summaryrefslogtreecommitdiff
path: root/portato/backend
diff options
context:
space:
mode:
Diffstat (limited to 'portato/backend')
-rw-r--r--portato/backend/__init__.py33
-rw-r--r--portato/backend/exceptions.py23
-rw-r--r--portato/backend/flags.py617
-rw-r--r--portato/backend/package.py324
-rw-r--r--portato/backend/portage_helper.py374
5 files changed, 1371 insertions, 0 deletions
diff --git a/portato/backend/__init__.py b/portato/backend/__init__.py
new file mode 100644
index 0000000..55cb10b
--- /dev/null
+++ b/portato/backend/__init__.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+#
+# File: portato/backend/__init__.py
+# This file is part of the Portato-Project, a graphical portage-frontend.
+#
+# Copyright (C) 2006 René 'Necoro' Neumann
+# This is free software. You may redistribute copies of it under the terms of
+# the GNU General Public License version 2.
+# There is NO WARRANTY, to the extent permitted by law.
+#
+# Written by René 'Necoro' Neumann <necoro@necoro.net>
+
+import sys
+
+# insert the gentoolkit-location into syspath
+sys.path.insert(0, "/usr/lib/gentoolkit/pym")
+
+# import gentoolkit and portage
+import gentoolkit
+import portage
+
+# this is set to "var/lib/portage/world" by default - so we add the leading /
+portage.WORLD_FILE = portage.settings["ROOT"]+portage.WORLD_FILE
+portage.settings = None # we use our own one ...
+
+# portage tree vars
+porttree = gentoolkit.porttree
+vartree = gentoolkit.vartree
+
+# import our packages
+from exceptions import *
+from package import *
+from portage_helper import *
diff --git a/portato/backend/exceptions.py b/portato/backend/exceptions.py
new file mode 100644
index 0000000..a5cb2fb
--- /dev/null
+++ b/portato/backend/exceptions.py
@@ -0,0 +1,23 @@
+# -*- coding: utf-8 -*-
+#
+# File: portato/backend/exceptions.py
+# This file is part of the Portato-Project, a graphical portage-frontend.
+#
+# Copyright (C) 2006 René 'Necoro' Neumann
+# This is free software. You may redistribute copies of it under the terms of
+# the GNU General Public License version 2.
+# There is NO WARRANTY, to the extent permitted by law.
+#
+# Written by René 'Necoro' Neumann <necoro@necoro.net>
+
+class BlockedException (Exception):
+ """An exception marking, that some package is blocking another one."""
+ pass
+
+class PackageNotFoundException (Exception):
+ """An exception marking that a package could not be found."""
+ pass
+
+class DependencyCalcError (Exception):
+ """An error occured during dependency calculation."""
+ pass
diff --git a/portato/backend/flags.py b/portato/backend/flags.py
new file mode 100644
index 0000000..d1187c9
--- /dev/null
+++ b/portato/backend/flags.py
@@ -0,0 +1,617 @@
+# -*- coding: utf-8 -*-
+#
+# File: portato/backend/flags.py
+# This file is part of the Portato-Project, a graphical portage-frontend.
+#
+# Copyright (C) 2006 René 'Necoro' Neumann
+# This is free software. You may redistribute copies of it under the terms of
+# the GNU General Public License version 2.
+# There is NO WARRANTY, to the extent permitted by law.
+#
+# Written by René 'Necoro' Neumann <necoro@necoro.net>
+
+import os
+import os.path
+from subprocess import Popen, PIPE # needed for grep
+
+from portato.helper import *
+from portage_helper import split_package_name
+import package
+
+import portage
+from portage_util import unique_array
+
+CONFIG = {
+ "usefile" : "portato",
+ "maskfile" : "portato",
+ "testingfile" : "portato",
+ "usePerVersion" : True,
+ "maskPerVersion" : True,
+ "testingPerVersion" : True
+ }
+
+### GENERAL PART ###
+
+def grep (pkg, path):
+ """Grep runs "egrep" on a given path and looks for occurences of a given package.
+ @param pkg: the package
+ @type pkg: string (cpv) or L{backend.Package}-object
+ @param path: path to look in
+ @type path: string
+
+ @returns: occurences of pkg in the format: "file:line-no:complete_line_found"
+ @rtype: string"""
+
+ if not isinstance(pkg, package.Package):
+ pkg = package.Package(pkg) # assume it is a cpv or a gentoolkit.Package
+
+ command = "egrep -x -n -r -H '^[<>!=~]{0,2}%s(-[0-9].*)?[[:space:]]?.*$' %s" # %s is replaced in the next line ;)
+ return Popen((command % (pkg.get_cp(), path)), shell = True, stdout = PIPE).communicate()[0].splitlines()
+
+def get_data(pkg, path):
+ """This splits up the data of L{grep} and builds tuples in the format (file,line,criterion,list_of_flags).
+ @param pkg: package to find
+ @type pkg: string (cpv) or L{backend.Package}-object
+ @param path: path to look in
+ @type path: string
+
+ @returns: a list of tuples in the form (file,line,criterion,list_of_flags)
+ @rtype: (string,string,string,string[])[]"""
+
+ flags = []
+
+ # do grep
+ list = grep(pkg, path)
+
+ for i in range(len(list)):
+ file, line, fl = tuple(list[i].split(":")) # get file, line and flag-list
+ fl = fl.split()
+ crit = fl[0]
+ fl = fl[1:]
+ # stop after first comment
+ for j in range(len(fl)):
+ if fl[j][0] == "#": # comment - stop here
+ fl = fl[:j]
+ break
+ flags.append((file,line,crit,fl))
+
+ return flags
+
+def set_config (cfg):
+ """This function sets the CONFIG-variable for the whole module. Use this instead of modifying L{CONFIG} directly.
+ @param cfg: a dictionary with at least all the keys of the CONFIG-var
+ @type cfg: dict
+ @raises KeyError: if a keyword is missing in the new cfg"""
+
+ for i in CONFIG.keys():
+ if not i in cfg:
+ raise KeyError, "Missing keyword in config: "+i
+
+ for i in CONFIG:
+ CONFIG[i] = cfg[i]
+
+def generate_path (cpv, exp):
+ """Generates the correct path out of given wildcards.
+ These wildcards can be:
+ - $(cat) : category
+ - $(cat-1): first part of the category (e.g. "app")
+ - $(cat-2): second part of the category
+ - $(pkg) : name of the package
+
+ @param cpv: the cpv of the current package
+ @type cpv: string (cat/pkg-ver)
+ @param exp: the expression to render the path from
+ @type exp: string
+ @returns: rendered path
+ @rtype string"""
+
+ cat, pkg, ver, rev = split_package_name(cpv)
+
+ if exp.find("$(") != -1:
+ exp = exp.replace("$(cat)",cat).\
+ replace("$(pkg)",pkg).\
+ replace("$(cat-1)",cat.split("-")[0]).\
+ replace("$(cat-2)",cat.split("-")[1])
+ return exp
+
+### USE FLAG PART ###
+USE_PATH = os.path.join(portage.USER_CONFIG_PATH,"package.use")
+USE_PATH_IS_DIR = os.path.isdir(USE_PATH)
+useFlags = {} # useFlags in the file
+newUseFlags = {} # useFlags as we want them to be: format: cpv -> [(file, line, useflag, (true if removed from list / false if added))]
+
+def invert_use_flag (flag):
+ """Invertes a flag.
+
+ >>> invert_use_flag("foo")
+ -foo
+ >>> invert_use_flag("-bar")
+ bar
+
+ @param flag: the flag
+ @type flag: string
+ @returns: inverted flag
+ @rtype: string
+ """
+
+ if flag[0] == "-":
+ return flag[1:]
+ else:
+ return "-"+flag
+
+def set_use_flag (pkg, flag):
+ """Sets the useflag for a given package.
+
+ @param pkg: the package
+ @type pkg: string (cpv) or L{backend.Package}-object
+ @param flag: the flag to set
+ @type flag: string"""
+
+ global useFlags, newUseFlags
+
+ if not isinstance(pkg, package.Package):
+ pkg = package.Package(pkg) # assume cpv or gentoolkit.Package
+
+ cpv = pkg.get_cpv()
+ invFlag = invert_use_flag(flag)
+
+ # if not saved in useFlags, get it by calling get_data() which calls grep()
+ data = None
+ if not cpv in useFlags:
+ data = get_data(pkg, USE_PATH)
+ useFlags[cpv] = data
+ else:
+ data = useFlags[cpv]
+
+ if not cpv in newUseFlags:
+ newUseFlags[cpv] = []
+
+ debug("data: "+str(data))
+ # add a useflag / delete one
+ added = False
+ for file, line, crit, flags in data:
+ if pkg.matches(crit):
+ # we have the inverted flag in the uselist/newuselist --> delete it
+ if invFlag in flags or (file, line, invFlag, False) in newUseFlags[cpv] or (file, line, flag, True) in newUseFlags[cpv]:
+ if added: del newUseFlags[-1] # we currently added it as an extra option - delete it
+ added = True
+ jumpOut = False
+ for t in [(file, line, invFlag, False),(file, line, flag, True)]:
+ if t in newUseFlags[cpv]:
+ newUseFlags[cpv].remove(t)
+ jumpOut = True
+ break
+ if not jumpOut: newUseFlags[cpv].append((file, line, invFlag, True))
+ break
+
+ # we want to duplicate the flag --> ignore
+ elif flag in flags:
+ added = True # emulate adding
+ break
+
+ # add as an extra flag
+ else:
+ if not added: newUseFlags[cpv].append((file, line, flag, False))
+ added = True
+
+ # create a new line
+ if not added:
+ path = USE_PATH
+ if USE_PATH_IS_DIR:
+ path = os.path.join(USE_PATH, generate_path(cpv, CONFIG["usefile"]))
+ try:
+ newUseFlags[cpv].remove((path, -1, invFlag, False))
+ except ValueError: # not in UseFlags
+ newUseFlags[cpv].append((path, -1, flag, False))
+
+ newUseFlags[cpv] = unique_array(newUseFlags[cpv])
+ debug("newUseFlags: "+str(newUseFlags))
+
+def remove_new_use_flags (cpv):
+ """Removes all new use-flags for a specific package.
+
+ @param cpv: the package for which to remove the flags
+ @type cpv: string (cpv) or L{backend.Package}-object"""
+
+ if isinstance(cpv, package.Package):
+ cpv = cpv.get_cpv()
+
+ try:
+ del newUseFlags[cpv]
+ except KeyError:
+ pass
+
+def get_new_use_flags (cpv):
+ """Gets all the new use-flags for a specific package.
+
+ @param cpv: the package from which to get the flags
+ @type cpv: string (cpv) or L{backend.Package}-object
+ @returns: list of flags
+ @rtype: string[]"""
+
+ if isinstance(cpv, package.Package):
+ cpv = cpv.get_cpv()
+
+ list2return = []
+ try:
+ for file, line, flag, remove in newUseFlags[cpv]:
+ if remove:
+ list2return.append(invert_use_flag(flag))
+ else:
+ list2return.append(flag)
+ except KeyError:
+ pass
+
+ return list2return
+
+def write_use_flags ():
+ """This writes our changed useflags into the file."""
+ global newUseFlags, useFlags
+
+ def insert (flag, list):
+ """Shortcut for inserting a new flag right after the package-name."""
+ list.insert(1,flag)
+
+ def remove (flag, list):
+ """Removes a flag."""
+ try:
+ list.remove(flag)
+ except ValueError: # flag is given as flag\n
+ list.remove(flag+"\n")
+ list.append("\n") #re-insert the newline
+
+ # no more flags there - comment it out
+ if len(list) == 1 or list[1][0] in ("#","\n"):
+ list[0] = "#"+list[0]
+ insert("#removed by portato#",list)
+
+ file_cache = {} # cache for having to read the file only once: name->[lines]
+ for cpv in newUseFlags:
+ flagsToAdd = [] # this is used for collecting the flags to be inserted in a _new_ line
+ for file, line, flag, delete in newUseFlags[cpv]:
+ line = int(line) # it is saved as a string so far!
+
+ # add new line
+ if line == -1:
+ flagsToAdd.append(flag)
+ # change a line
+ else:
+ if not file in file_cache:
+ # read file
+ f = open(file, "r")
+ lines = []
+ i = 1
+ while i < line: # stop at the given line
+ lines.append(f.readline())
+ i += 1
+ l = f.readline().split(" ")
+
+ # delete or insert
+ if delete:
+ remove(flag,l)
+ else:
+ insert(flag,l)
+ lines.append(" ".join(l))
+
+ # read the rest
+ lines.extend(f.readlines())
+
+ file_cache[file] = lines
+ f.close()
+
+ else: # in cache
+ l = file_cache[file][line-1].split(" ")
+ if delete:
+ remove(flag,l)
+ else:
+ insert(flag,l)
+ file_cache[file][line-1] = " ".join(l)
+
+ if flagsToAdd:
+ # write new lines
+ msg = "\n#portato update#\n"
+ if CONFIG["usePerVersion"]: # add on a per-version-base
+ msg += "=%s %s\n" % (cpv, ' '.join(flagsToAdd))
+ else: # add on a per-package-base
+ list = split_package_name(cpv)
+ msg += "%s/%s %s\n" % (list[0], list[1], ' '.join(flagsToAdd))
+ if not file in file_cache:
+ f = open(file, "a")
+ f.write(msg)
+ f.close()
+ else:
+ file_cache[file].append(msg)
+
+ # write to disk
+ for file in file_cache.keys():
+ f = open(file, "w")
+ f.writelines(file_cache[file])
+ f.close()
+ # reset
+ useFlags = {}
+ newUseFlags = {}
+
+### MASKING PART ###
+MASK_PATH = os.path.join(portage.USER_CONFIG_PATH,"package.mask")
+UNMASK_PATH = os.path.join(portage.USER_CONFIG_PATH,"package.unmask")
+MASK_PATH_IS_DIR = os.path.isdir(MASK_PATH)
+UNMASK_PATH_IS_DIR = os.path.isdir(UNMASK_PATH)
+
+new_masked = {}
+new_unmasked = {}
+
+def set_masked (pkg, masked = True):
+ """Sets the masking status of the package.
+
+ @param pkg: the package from which to get the flags
+ @type pkg: string (cpv) or L{backend.Package}-object
+ @param masked: if True: mask it; if False: unmask it
+ @type masked: boolean"""
+
+ global new_masked, newunmasked
+
+ if not isinstance(pkg, package.Package):
+ pkg = package.Package(pkg)
+
+ cpv = pkg.get_cpv()
+
+ if not cpv in new_unmasked:
+ new_unmasked[cpv] = []
+ if not cpv in new_masked:
+ new_masked[cpv] = []
+
+ if masked:
+ link_neq = new_masked
+ link_eq = new_unmasked
+ path = UNMASK_PATH
+ else:
+ link_neq = new_unmasked
+ link_eq = new_masked
+ path = MASK_PATH
+
+ copy = link_eq[cpv]
+ for file, line in copy:
+ if line == "-1":
+ link_eq[cpv].remove((file, line))
+
+ copy = link_neq[cpv][:]
+ for file, line in copy:
+ if line != "-1":
+ link_neq[cpv].remove(file, line)
+
+ if masked == pkg.is_masked():
+ return
+
+ data = get_data(pkg, path)
+ debug("data: "+str(link_eq))
+ done = False
+ for file, line, crit, flags in data:
+ if pkg.matches(crit):
+ link_eq[cpv].append((file, line))
+ done = True
+
+ if done: return
+
+ if masked:
+ is_dir = MASK_PATH_IS_DIR
+ path = MASK_PATH
+ else:
+ is_dir = UNMASK_PATH_IS_DIR
+ path = UNMASK_PATH
+
+ if is_dir:
+ file = os.path.join(path, generate_path(cpv, CONFIG["usefile"]))
+ else:
+ file = path
+
+ link_neq[cpv].append((file, "-1"))
+ link_neq[cpv] = unique_array(link_neq[cpv])
+ debug("new_(un)masked: "+str(link_neq))
+
+def remove_new_masked (cpv):
+ if isinstance(cpv, package.Package):
+ cpv = cpv.get_cpv()
+
+ try:
+ del new_masked[cpv]
+ except KeyError:
+ pass
+
+ try:
+ del new_unmasked[cpv]
+ except KeyError:
+ pass
+
+def new_masking_status (cpv):
+ if isinstance(cpv, package.Package):
+ cpv = cpv.get_cpv()
+
+ if cpv in new_masked and new_masked[cpv]:
+ return "masked"
+ elif cpv in new_unmasked and new_unmasked[cpv]:
+ return "unmasked"
+ else: return None
+
+def write_masked ():
+ global new_unmasked, new_masked
+ file_cache = {}
+
+ def write(cpv, file, line):
+ line = int(line)
+ # add new line
+ if line == -1:
+ msg = "\n#portato update#\n"
+ if CONFIG["maskPerVersion"]:
+ msg += "=%s\n" % cpv
+ else:
+ list = split_package_name(cpv)
+ msg += "%s/%s\n" % (list[0],list[1])
+ if not file in file_cache:
+ f = open(file, "a")
+ f.write(msg)
+ f.close()
+ else:
+ file_cache[file].append(msg)
+ # change a line
+ else:
+ if not file in file_cache:
+ # read file
+ f = open(file, "r")
+ lines = []
+ i = 1
+ while i < line: # stop at the given line
+ lines.append(f.readline())
+ i = i+1
+ # delete
+ l = f.readline()
+ l = "#"+l[:-1]+" # removed by portato\n"
+ lines.append(l)
+
+ # read the rest
+ lines.extend(f.readlines())
+
+ file_cache[file] = lines
+ f.close()
+ else: # in cache
+ l = file_cache[file][line-1]
+ # delete:
+ l = "#"+l[:-1]+" # removed by portato\n"
+ file_cache[file][line-1] = l
+
+
+ for cpv in new_masked:
+ for file, line in new_masked[cpv]:
+ write(cpv, file, line)
+
+ for cpv in new_unmasked:
+ for file, line in new_unmasked[cpv]:
+ write(cpv, file, line)
+
+ # write to disk
+ for file in file_cache.keys():
+ f = open(file, "w")
+ f.writelines(file_cache[file])
+ f.close()
+ # reset
+ new_masked = {}
+ new_unmasked = {}
+
+### TESTING PART ###
+TESTING_PATH = os.path.join(portage.USER_CONFIG_PATH, "package.keywords")
+TESTING_PATH_IS_DIR = os.path.isdir(TESTING_PATH)
+newTesting = {}
+arch = ""
+
+def remove_new_testing (cpv):
+ if isinstance(cpv, package.Package):
+ cpv = cpv.get_cpv()
+
+ try:
+ del newTesting[cpv]
+ except KeyError:
+ pass
+
+def new_testing_status (cpv):
+ if isinstance(cpv, package.Package):
+ cpv = cpv.get_cpv()
+
+ if cpv in newTesting:
+ for file, line in newTesting[cpv]:
+ if line == "-1": return False
+ else: return True
+
+ return None
+
+def set_testing (pkg, enable):
+ """Enables the package for installing when it is marked as testing (~ARCH).
+ @param pkg: the package
+ @type pkg: string (cpv) or L{backend.Package}-object
+ @param enable: controls whether to enable (True) or disable (False) for test-installing
+ @type enable: boolean"""
+
+ global arch, newTesting
+ if not isinstance(pkg, package.Package):
+ pkg = package.Package(pkg)
+
+ arch = pkg.get_settings("ARCH")
+ cpv = pkg.get_cpv()
+ if not cpv in newTesting:
+ newTesting[cpv] = []
+
+ for file, line in newTesting[cpv]:
+ if (enable and line != "-1") or (not enable and line == "-1"):
+ newTesting[cpv].remove((file, line))
+
+ if (enable and not pkg.is_testing(allowed=True)) or (not enable and pkg.is_testing(allowed=True)):
+ return
+
+ if not enable:
+ test = get_data(pkg, TESTING_PATH)
+ debug("data (test): "+str(test))
+ for file, line, crit, flags in test:
+ if pkg.matches(crit) and flags[0] == "~"+arch:
+ newTesting[cpv].append((file, line))
+ else:
+ if TESTING_PATH_IS_DIR:
+ file = os.path.join(TESTING_PATH, CONFIG["testingfile"])
+ else:
+ file = TESTING_PATH
+ newTesting[cpv].append((file, "-1"))
+
+ newTesting[cpv] = unique_array(newTesting[cpv])
+ debug("newTesting: "+str(newTesting))
+
+def write_testing ():
+ global arch, newTesting
+ file_cache = {}
+
+ for cpv in newTesting:
+ for file, line in newTesting[cpv]:
+ line = int(line)
+ # add new line
+ if line == -1:
+ msg = "\n#portato update#\n"
+ if CONFIG["testingPerVersion"]:
+ msg += "=%s ~%s\n" % (cpv, arch)
+ else:
+ list = split_package_name(cpv)
+ msg += "%s/%s ~%s\n" % (list[0],list[1],arch)
+ if not file in file_cache:
+ f = open(file, "a")
+ f.write(msg)
+ f.close()
+ else:
+ file_cache[file].append(msg)
+ # change a line
+ else:
+ if not file in file_cache:
+ # read file
+ f = open(file, "r")
+ lines = []
+ i = 1
+ while i < line: # stop at the given line
+ lines.append(f.readline())
+ i = i+1
+ # delete
+ l = f.readline()
+ l = "#"+l[:-1]+" # removed by portato\n"
+ lines.append(l)
+
+ # read the rest
+ lines.extend(f.readlines())
+
+ file_cache[file] = lines
+ f.close()
+ else: # in cache
+ l = file_cache[file][line-1]
+ # delete:
+ l = "#"+l[:-1]+" # removed by portato\n"
+ file_cache[file][line-1] = l
+
+ # write to disk
+ for file in file_cache.keys():
+ f = open(file, "w")
+ f.writelines(file_cache[file])
+ f.close()
+ # reset
+ newTesting = {}
diff --git a/portato/backend/package.py b/portato/backend/package.py
new file mode 100644
index 0000000..8b56eb5
--- /dev/null
+++ b/portato/backend/package.py
@@ -0,0 +1,324 @@
+# -*- coding: utf-8 -*-
+#
+# File: portato/backend/package.py
+# This file is part of the Portato-Project, a graphical portage-frontend.
+#
+# Copyright (C) 2006 René 'Necoro' Neumann
+# This is free software. You may redistribute copies of it under the terms of
+# the GNU General Public License version 2.
+# There is NO WARRANTY, to the extent permitted by law.
+#
+# Written by René 'Necoro' Neumann <necoro@necoro.net>
+
+from portato.backend import *
+from portato.helper import *
+from portage_helper import *
+import flags
+
+import portage, portage_dep, gentoolkit
+from portage_util import unique_array
+
+import types
+
+class Package (gentoolkit.Package):
+ """This is a subclass of the gentoolkit.Package-class which a lot of additional functionality we need in Portato."""
+
+ def __init__ (self, cpv):
+ """Constructor.
+
+ @param cpv: The cpv or gentoolkit.Package which describes the package to create.
+ @type cpv: string (cat/pkg-ver) or gentoolkit.Package-object."""
+
+ if isinstance(cpv, gentoolkit.Package):
+ cpv = cpv.get_cpv()
+ gentoolkit.Package.__init__(self, cpv)
+ try:
+ self._status = portage.getmaskingstatus(self.get_cpv(), settings = gentoolkit.settings)
+ except KeyError: # package is not located in the system
+ self._status = None
+
+ def is_in_system (self):
+ """Returns False if the package could not be found in the portage system.
+
+ @return: True if in portage system; else False
+ @rtype: boolean"""
+
+ return (self._status != None)
+
+ def is_missing_keyword(self):
+ """Returns True if the package is missing the needed keyword.
+
+ @return: True if keyword is missing; else False
+ @rtype: boolean"""
+
+ if self._status and "missing keyword" in self._status:
+ return True
+ return False
+
+ def is_testing(self, allowed = False):
+ """Checks whether a package is marked as testing.
+
+ @param allowed: Controls whether possible keywords are taken into account or not.
+ @type allowed: boolean
+ @returns: True if the package is marked as testing; else False.
+ @rtype: boolean"""
+
+ testArch = "~" + self.get_settings("ARCH")
+ if not allowed: # keywords are NOT taken into account
+ if testArch in self.get_env_var("KEYWORDS").split():
+ return True
+ return False
+
+ else: # keywords are taken into account
+ status = flags.new_testing_status(self.get_cpv())
+ if status == None: # we haven't changed it in any way
+ if self._status and testArch+" keyword" in self._status:
+ return True
+ return False
+ else:
+ return status
+
+ def set_testing(self, enable = True):
+ """Sets the actual testing status of the package.
+
+ @param enable: if True it is masked as stable; if False it is marked as testing
+ @type enable: boolean"""
+
+ flags.set_testing(self, enable)
+
+ def remove_new_testing(self):
+ """Removes possible changed testing status."""
+
+ flags.remove_new_testing(self.get_cpv())
+
+ def is_masked (self):
+ """Returns True if either masked by package.mask or by profile.
+
+ @returns: True if masked / False otherwise
+ @rtype: boolean"""
+
+ status = flags.new_masking_status(self.get_cpv())
+ if status != None: # we have locally changed it
+ if status == "masked": return True
+ elif status == "unmasked": return False
+ else:
+ debug("BUG in flags.new_masking_status. It returns",status)
+ else: # we have not touched the status
+ if self._status and ("profile" in self._status or "package.mask" in self._status):
+ return True
+ return False
+
+ def set_masked (self, masking = False):
+ """Sets the masking status of the package.
+
+ @param masking: if True: mask it; if False: unmask it
+ @type masking: boolean"""
+
+ flags.set_masked(self, masked = masking)
+
+ def remove_new_masked (self):
+ """Removes possible changed masking status."""
+
+ flags.remove_new_masked(self.get_cpv())
+
+ def get_all_use_flags (self):
+ """Returns a list of _all_ useflags for this package, i.e. all useflags you can set for this package.
+
+ @returns: list of use-flags
+ @rtype: string[]"""
+
+ return unique_array(self.get_env_var("IUSE").split())
+
+ def get_installed_use_flags (self):
+ """Returns a list of the useflags enabled at installation time. If package is not installed, it returns an empty list.
+
+ @returns: list of useflags enabled at installation time or an empty list
+ @rtype: string[]"""
+
+ if self.is_installed():
+ uses = self.get_use_flags().split() # all set at installation time
+ iuses = self.get_all_use_flags() # all you can set for the package
+ set = []
+ for u in iuses:
+ if u in uses:
+ set.append(u)
+ return set
+ else:
+ return []
+
+ def get_new_use_flags (self):
+ """Returns a list of the new useflags, i.e. these flags which are not written to the portage-system yet.
+
+ @returns: list of flags or []
+ @rtype: string[]"""
+
+ return flags.get_new_use_flags(self)
+
+ def get_actual_use_flags (self):
+ """This returns the result of installed_use_flags + new_use_flags. If the package is not installed, it returns only the new flags.
+
+ @return: list of flags
+ @rtype: string[]"""
+
+ if self.is_installed():
+ i_flags = self.get_installed_use_flags()
+ for f in self.get_new_use_flags():
+
+ if flags.invert_flag(f) in i_flags:
+ i_flags.remove(flags.invert_flag(f))
+
+ elif f not in i_flags:
+ i_flags.append(f)
+ return i_flags
+ else:
+ return self.get_new_flags()
+
+ def set_use_flag (self, flag):
+ """Set a use-flag.
+
+ @param flag: the flag to set
+ @type flag: string"""
+
+ flags.set_use_flag(self, flag)
+
+ def remove_new_use_flags (self):
+ """Remove all the new use-flags."""
+
+ flags.remove_new_use_flags(self)
+
+ def get_matched_dep_packages (self):
+ """This function looks for all dependencies which are resolved. In normal case it makes only sense for installed packages, but should work for uninstalled ones too.
+
+ @returns: unique list of dependencies resolved (with elements like "<=net-im/foobar-1.2.3")
+ @rtype: string[]"""
+
+ # change the useflags, because we have internally changed some, but not made them visible for portage
+ newUseFlags = self.get_new_use_flags()
+ actual = self.get_settings("USE").split()
+ if newUseFlags:
+ for u in newUseFlags:
+ if u[0] == "-" and flags.invert_use_flag(u) in actual:
+ actual.remove(flags.invert_use_flag(u))
+ elif u not in actual:
+ actual.append(u)
+
+ #
+ # the following stuff is mostly adapted from portage.dep_check()
+ #
+
+ depstring = self.get_env_var("RDEPEND")+" "+self.get_env_var("DEPEND")+" "+self.get_env_var("PDEPEND")
+
+ # change the parentheses into lists
+ mysplit = portage_dep.paren_reduce(depstring)
+
+ # strip off these deps we don't have a flag for
+ mysplit = portage_dep.use_reduce(mysplit, uselist = actual, masklist = [], matchall = False, excludeall = self.get_settings("ARCH"))
+
+ # move the || (or) into the lists
+ mysplit = portage_dep.dep_opconvert(mysplit)
+
+ # turn virtuals into real packages
+ mysplit = portage.dep_virtual(mysplit, self._settings)
+
+ mysplit_reduced= portage.dep_wordreduce(mysplit, self._settings, vartree.dbapi, mode = None)
+
+ retlist = []
+ def add (list, red_list):
+ """Adds the packages to retlist."""
+ for i in range(len(list)):
+ if type(list[i]) == types.ListType:
+ add(list[i], red_list[i])
+ elif list[i] == "||":
+ continue
+ else:
+ if red_list[i]:
+ retlist.append(list[i])
+
+ add(mysplit, mysplit_reduced)
+
+ return unique_array(retlist)
+
+ def get_dep_packages (self):
+ """Returns a cpv-list of packages on which this package depends and which have not been installed yet. This does not check the dependencies in a recursive manner.
+
+ @returns: list of cpvs on which the package depend
+ @rtype: string[]
+
+ @raises portato.BlockedException: when a package in the dependency-list is blocked by an installed one
+ @raises portato.PackageNotFoundException: when a package in the dependency list could not be found in the system
+ @raises portato.DependencyCalcError: when an error occured during executing portage.dep_check()"""
+
+ dep_pkgs = [] # the package list
+
+ # change the useflags, because we have internally changed some, but not made them visible for portage
+ newUseFlags = self.get_new_use_flags()
+ actual = self.get_settings("USE").split()
+ if newUseFlags:
+ for u in newUseFlags:
+ if u[0] == "-" and flags.invert_use_flag(u) in actual:
+ actual.remove(flags.invert_use_flag(u))
+ elif u not in actual:
+ actual.append(u)
+
+ # let portage do the main stuff ;)
+ # pay attention to any changes here
+ deps = portage.dep_check (self.get_env_var("RDEPEND")+" "+self.get_env_var("DEPEND")+" "+self.get_env_var("PDEPEND"), vartree.dbapi, self._settings, myuse = actual)
+
+ if not deps: # FIXME: what is the difference to [1, []] ?
+ return []
+
+ if deps[0] == 0: # error
+ raise DependencyCalcError, deps[1]
+
+ deps = deps[1]
+
+ for dep in deps:
+ if dep[0] == '!': # blocking sth
+ dep = dep[1:]
+ if dep != self.get_cp(): # not cpv, because a version might explicitly block another one
+ blocked = find_installed_packages(dep)
+ if blocked != []:
+ raise BlockedException, (self.get_cpv(), blocked[0].get_cpv())
+ continue # finished with the blocking one -> next
+
+ pkg = find_best_match(dep)
+ if not pkg: # try to find masked ones
+ list = find_packages(dep, masked = True)
+ if not list:
+ raise PackageNotFoundException, dep
+
+ list = sort_package_list(list)
+ done = False
+ for i in range(len(list)-1,0,-1):
+ p = list[i]
+ if not p.is_masked():
+ dep_pkgs.append(p.get_cpv())
+ done = True
+ break
+ if not done:
+ dep_pkgs.append(list[-1].get_cpv())
+ else:
+ dep_pkgs.append(pkg.get_cpv())
+
+ return dep_pkgs
+
+ def get_cp (self):
+ """Returns the cp-string.
+
+ @returns: category/package.
+ @rtype: string"""
+
+ return self.get_category()+"/"+self.get_name()
+
+ def matches (self, criterion):
+ """This checks, whether this package matches a specific verisioning criterion - e.g.: "<=net-im/foobar-1.2".
+
+ @param criterion: the criterion to match against
+ @type criterion: string
+ @returns: True if matches; False if not
+ @rtype: boolean"""
+
+ if portage.match_from_list(criterion, [self.get_cpv()]) == []:
+ return False
+ else:
+ return True
diff --git a/portato/backend/portage_helper.py b/portato/backend/portage_helper.py
new file mode 100644
index 0000000..6e8fc84
--- /dev/null
+++ b/portato/backend/portage_helper.py
@@ -0,0 +1,374 @@
+# -*- coding: utf-8 -*-
+#
+# File: portato/backend/portage_helper.py
+# This file is part of the Portato-Project, a graphical portage-frontend.
+#
+# Copyright (C) 2006 René 'Necoro' Neumann
+# This is free software. You may redistribute copies of it under the terms of
+# the GNU General Public License version 2.
+# There is NO WARRANTY, to the extent permitted by law.
+#
+# Written by René 'Necoro' Neumann <necoro@necoro.net>
+
+import re, os, copy
+
+import portage, gentoolkit
+from portage_util import unique_array
+
+from portato.backend import *
+import package
+
+from portato.helper import debug
+
+def find_lambda (name):
+ """Returns the function needed by all the find_all_*-functions. Returns None if no name is given.
+
+ @param name: name to build the function of
+ @type name: string
+ @returns:
+ 1. None if no name is given
+ 2. a lambda function
+ @rtype: function"""
+
+ if name != None:
+ return lambda x: re.match(".*"+name+".*",x)
+ else:
+ return lambda x: True
+
+def geneticize_list (list_of_packages):
+ """Convertes a list of gentoolkit.Packages into L{backend.Package}s.
+
+ @param list_of_packages: the list of packages
+ @type list_of_packages: list of gentoolkit.Packages
+ @returns: converted list
+ @rtype: backend.Package[]"""
+
+ return [package.Package(x) for x in list_of_packages]
+
+def find_best_match (search_key, only_installed = False):
+ """Finds the best match in the portage tree. It does not find masked packages!
+
+ @param search_key: the key to find in the portage tree
+ @type search_key: string
+ @param only_installed: if True, only installed packages are searched
+ @type only_installed: boolean
+
+ @returns: the package found or None
+ @rtype: backend.Package"""
+
+ t = None
+ if not only_installed:
+ t = porttree.dep_bestmatch(search_key)
+ else:
+ t = vartree.dep_bestmatch(search_key)
+ if t:
+ return package.Package(t)
+ return None
+
+def find_packages (search_key, masked=False):
+ """This returns a list of packages which have to fit exactly. Additionally ranges like '<,>,=,~,!' et. al. are possible.
+
+ @param search_key: the key to look for
+ @type search_key: string
+ @param masked: if True, also look for masked packages
+ @type masked: boolean
+
+ @returns: list of found packages
+ @rtype: backend.Package[]"""
+
+ return geneticize_list(gentoolkit.find_packages(search_key, masked))
+
+def find_installed_packages (search_key, masked=False):
+ """This returns a list of packages which have to fit exactly. Additionally ranges like '<,>,=,~,!' et. al. are possible.
+
+ @param search_key: the key to look for
+ @type search_key: string
+ @param masked: if True, also look for masked packages
+ @type masked: boolean
+
+ @returns: list of found packages
+ @rtype: backend.Package[]"""
+
+ return geneticize_list(gentoolkit.find_installed_packages(search_key, masked))
+
+def find_system_packages ():
+ """Looks for all packages saved as "system-packages".
+
+ @returns: a tuple of (resolved_packages, unresolved_packages).
+ @rtype: (backend.Package[], backend.Package[])"""
+
+ list = gentoolkit.find_system_packages()
+ return (geneticize_list(list[0]), geneticize_list(list[1]))
+
+def find_world_packages ():
+ """Looks for all packages saved in the world-file.
+
+ @returns: a tuple of (resolved_packages, unresolved_packages).
+ @rtype: (backend.Package[], backend.Package[])"""
+
+ list = gentoolkit.find_world_packages()
+ return geneticize_list(list[0]),geneticize_list(list[1])
+
+def find_all_installed_packages (name=None, withVersion=True):
+ """Finds all installed packages matching a name or all if no name is specified.
+
+ @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
+ @type name: string or None
+ @param withVersion: if True version-specific packages are returned; else only the cat/package-strings a delivered
+ @type withVersion: boolean
+
+ @returns: all packages/cp-strings found
+ @rtype: backend.Package[] or cp-string[]"""
+
+ if withVersion:
+ return geneticize_list(gentoolkit.find_all_installed_packages(find_lambda(name)))
+ else:
+ t = vartree.dbapi.cp_all()
+ if name:
+ t = filter(find_lambda(name),t)
+ return t
+
+def find_all_uninstalled_packages (name=None):
+ """Finds all uninstalled packages matching a name or all if no name is specified.
+
+ @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
+ @type name: string or None
+ @returns: all packages found
+ @rtype: backend.Package[]"""
+
+ return geneticize_list(gentoolkit.find_all_uninstalled_packages(find_lambda(name)))
+
+def find_all_packages (name=None, withVersion=True):
+ """Finds all packages matching a name or all if no name is specified.
+
+ @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
+ @type name: string or None
+ @param withVersion: if True version-specific packages are returned; else only the cat/package-strings a delivered
+ @type withVersion: boolean
+
+ @returns: all packages/cp-strings found
+ @rtype: backend.Package[] or cp-string[]"""
+
+ if (withVersion):
+ return geneticize_list(gentoolkit.find_all_packages(find_lambda(name)))
+ else:
+ t = porttree.dbapi.cp_all()
+ t += vartree.dbapi.cp_all()
+ t = unique_array(t)
+ if name:
+ t = filter(find_lambda(name),t)
+ return t
+
+def find_all_world_packages (name=None):
+ """Finds all world packages matching a name or all if no name is specified.
+
+ @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
+ @type name: string or None
+ @returns: all packages found
+ @rtype: backend.Package[]"""
+
+ world = filter(find_lambda(name), [x.get_cpv() for x in find_world_packages()[0]])
+ world = unique_array(world)
+ return geneticize_list(world)
+
+def find_all_system_packages (name=None):
+ """Finds all system packages matching a name or all if no name is specified.
+
+ @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
+ @type name: string or None
+ @returns: all packages found
+ @rtype: backend.Package[]"""
+
+ sys = filter(find_lambda(name), [x.get_cpv() for x in find_system_packages()[0]])
+ sys = unique_array(sys)
+ return geneticize_list(sys)
+
+def get_all_versions (cp):
+ """Returns all versions of a certain package.
+
+ @param cp: the package
+ @type cp: string (cat/pkg)
+ @returns: the list of found packages
+ @rtype: backend.Package[]"""
+
+ t = porttree.dbapi.cp_list(cp)
+ t += vartree.dbapi.cp_list(cp)
+ t = unique_array(t)
+ return geneticize_list(t)
+
+def get_all_installed_versions (cp):
+ """Returns all installed versions of a certain package.
+
+ @param cp: the package
+ @type cp: string (cat/pkg)
+ @returns: the list of found packages
+ @rtype: backend.Package[]"""
+
+ return geneticize_list(vartree.dbapi.cp_list(cp))
+
+def list_categories (name=None):
+ """Finds all categories matching a name or all if no name is specified.
+
+ @param name: the name to look for - it is expanded to .*name.* ; if None, all categories are returned
+ @type name: string or None
+ @returns: all categories found
+ @rtype: string[]"""
+
+ categories = gentoolkit.settings.categories
+ return filter(find_lambda(name), categories)
+
+def split_package_name (name):
+ """Splits a package name in its elements.
+
+ @param name: name to split
+ @type name: string
+ @returns: list: [category, name, version, rev] whereby rev is "r0" if not specified in the name
+ @rtype: string[]"""
+
+ return gentoolkit.split_package_name(name)
+
+def sort_package_list(pkglist):
+ """Sorts a package list in the same manner portage does.
+
+ @param pkglist: list to sort
+ @type pkglist: Packages[]"""
+
+ return gentoolkit.sort_package_list(pkglist)
+
+def reload_settings ():
+ """Reloads portage."""
+ gentoolkit.settings = portage.config(config_incrementals = copy.deepcopy(gentoolkit.settings.incrementals))
+
+def update_world (newuse = False, deep = False):
+ """Calculates the packages to get updated in an update world.
+
+ @param newuse: Checks if a use-flag has a different state then to install time.
+ @type newuse: boolean
+ @param deep: Not only check world packages but also there dependencies.
+ @type deep: boolean
+ @returns: a list containing of the tuple (new_package, old_package)
+ @rtype: (backend.Package, backend.Package)[]"""
+
+ # read world file
+ world = open(portage.WORLD_FILE)
+ packages = []
+ for line in world:
+ line = line.strip()
+ if not len(line): continue # empty line
+ if line[0] == "#": continue
+ packages.append(line)
+ world.close()
+
+ sys = gentoolkit.settings.packages
+ for x in sys:
+ if x[0] == "*":
+ x = x[1:]
+ packages.append(x.strip())
+
+ # Remove everything that is package.provided from our list
+ # This is copied from emerge.getlist()
+ for atom in packages[:]:
+ for expanded_atom in portage.flatten(portage.dep_virtual([atom], gentoolkit.settings)):
+ mykey = portage.dep_getkey(expanded_atom)
+ if mykey in gentoolkit.settings.pprovideddict and portage.match_from_list(expanded_atom, settings.pprovideddict[mykey]):
+ packages.remove(atom)
+ break
+
+ packages = [find_best_match(x) for x in packages]
+
+ checked = []
+ updating = []
+ raw_checked = []
+ def check (p):
+ """Checks whether a package is updated or not."""
+ if p.get_cp() in checked: return
+ else: checked.append(p.get_cp())
+
+ appended = False
+ tempDeep = False
+
+ if not p.is_installed():
+ oldList = find_installed_packages(p.get_cp())
+ if oldList:
+ old = oldList[0] # assume we have only one there; FIXME: slotted packages
+ else:
+ debug("Bug? Not found installed one:",p.get_cp())
+ return
+ updating.append((p, old))
+ appended = True
+ p = old
+
+ if newuse:
+ old = p.get_installed_use_flags()
+ new = p.get_settings("USE").split()
+
+ for u in p.get_all_use_flags():
+ if (u in new) != (u in old):
+ if not appended:
+ updating.append((p,p))
+ tempDeep = True
+
+ if deep or tempDeep:
+ for i in p.get_matched_dep_packages():
+ if i not in raw_checked:
+ raw_checked.append(i)
+ bm = find_best_match(i)
+ if not bm:
+ debug("Bug? No best match could be found:",i)
+ else:
+ check(bm)
+
+ for p in packages:
+ if not p: continue # if a masked package is installed we have "None" here
+ check(p)
+
+ return updating
+
+use_descs = {}
+local_use_descs = {}
+def get_use_desc (flag, package = None):
+ """Returns the description of a specific useflag or None if no desc was found.
+ If a package is given (in the <cat>/<name> format) the local use descriptions are searched too.
+
+ @param flag: flag to get the description for
+ @type flag: string
+ @param package: name of a package: if given local use descriptions are searched too
+ @type package: cp-string
+ @returns: found description
+ @rtype: string"""
+
+ # In the first run the dictionaries 'use_descs' and 'local_use_descs' are filled.
+
+ # fill cache if needed
+ if use_descs == {} or local_use_descs == {}:
+ # read use.desc
+ fd = open(gentoolkit.settings["PORTDIR"]+"/profiles/use.desc")
+ for line in fd.readlines():
+ line = line.strip()
+ if line != "" and line[0] != '#':
+ fields = [x.strip() for x in line.split(" - ",1)]
+ if len(fields) == 2:
+ use_descs[fields[0]] = fields[1]
+
+ # read use.local.desc
+ fd = open(gentoolkit.settings["PORTDIR"]+"/profiles/use.local.desc")
+ for line in fd.readlines():
+ line = line.strip()
+ if line != "" and line[0] != '#':
+ fields = [x.strip() for x in line.split(":",1)]
+ if len(fields) == 2:
+ if not fields[0] in local_use_descs: # create
+ local_use_descs[fields[0]] = {}
+ subfields = [x.strip() for x in fields[1].split(" - ",1)]
+ if len(subfields) == 2:
+ local_use_descs[fields[0]][subfields[0]] = subfields[1]
+
+ # start
+ desc = None
+ if flag in use_descs:
+ desc = use_descs[flag]
+ if package != None:
+ if package in local_use_descs:
+ if flag in local_use_descs[package]:
+ desc = local_use_descs[package][flag]
+ return desc