diff options
Diffstat (limited to 'portato/backend')
-rw-r--r-- | portato/backend/__init__.py | 33 | ||||
-rw-r--r-- | portato/backend/exceptions.py | 23 | ||||
-rw-r--r-- | portato/backend/flags.py | 617 | ||||
-rw-r--r-- | portato/backend/package.py | 324 | ||||
-rw-r--r-- | portato/backend/portage_helper.py | 374 |
5 files changed, 1371 insertions, 0 deletions
diff --git a/portato/backend/__init__.py b/portato/backend/__init__.py new file mode 100644 index 0000000..55cb10b --- /dev/null +++ b/portato/backend/__init__.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# +# File: portato/backend/__init__.py +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann <necoro@necoro.net> + +import sys + +# insert the gentoolkit-location into syspath +sys.path.insert(0, "/usr/lib/gentoolkit/pym") + +# import gentoolkit and portage +import gentoolkit +import portage + +# this is set to "var/lib/portage/world" by default - so we add the leading / +portage.WORLD_FILE = portage.settings["ROOT"]+portage.WORLD_FILE +portage.settings = None # we use our own one ... + +# portage tree vars +porttree = gentoolkit.porttree +vartree = gentoolkit.vartree + +# import our packages +from exceptions import * +from package import * +from portage_helper import * diff --git a/portato/backend/exceptions.py b/portato/backend/exceptions.py new file mode 100644 index 0000000..a5cb2fb --- /dev/null +++ b/portato/backend/exceptions.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# +# File: portato/backend/exceptions.py +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann <necoro@necoro.net> + +class BlockedException (Exception): + """An exception marking, that some package is blocking another one.""" + pass + +class PackageNotFoundException (Exception): + """An exception marking that a package could not be found.""" + pass + +class DependencyCalcError (Exception): + """An error occured during dependency calculation.""" + pass diff --git a/portato/backend/flags.py b/portato/backend/flags.py new file mode 100644 index 0000000..d1187c9 --- /dev/null +++ b/portato/backend/flags.py @@ -0,0 +1,617 @@ +# -*- coding: utf-8 -*- +# +# File: portato/backend/flags.py +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann <necoro@necoro.net> + +import os +import os.path +from subprocess import Popen, PIPE # needed for grep + +from portato.helper import * +from portage_helper import split_package_name +import package + +import portage +from portage_util import unique_array + +CONFIG = { + "usefile" : "portato", + "maskfile" : "portato", + "testingfile" : "portato", + "usePerVersion" : True, + "maskPerVersion" : True, + "testingPerVersion" : True + } + +### GENERAL PART ### + +def grep (pkg, path): + """Grep runs "egrep" on a given path and looks for occurences of a given package. + @param pkg: the package + @type pkg: string (cpv) or L{backend.Package}-object + @param path: path to look in + @type path: string + + @returns: occurences of pkg in the format: "file:line-no:complete_line_found" + @rtype: string""" + + if not isinstance(pkg, package.Package): + pkg = package.Package(pkg) # assume it is a cpv or a gentoolkit.Package + + command = "egrep -x -n -r -H '^[<>!=~]{0,2}%s(-[0-9].*)?[[:space:]]?.*$' %s" # %s is replaced in the next line ;) + return Popen((command % (pkg.get_cp(), path)), shell = True, stdout = PIPE).communicate()[0].splitlines() + +def get_data(pkg, path): + """This splits up the data of L{grep} and builds tuples in the format (file,line,criterion,list_of_flags). + @param pkg: package to find + @type pkg: string (cpv) or L{backend.Package}-object + @param path: path to look in + @type path: string + + @returns: a list of tuples in the form (file,line,criterion,list_of_flags) + @rtype: (string,string,string,string[])[]""" + + flags = [] + + # do grep + list = grep(pkg, path) + + for i in range(len(list)): + file, line, fl = tuple(list[i].split(":")) # get file, line and flag-list + fl = fl.split() + crit = fl[0] + fl = fl[1:] + # stop after first comment + for j in range(len(fl)): + if fl[j][0] == "#": # comment - stop here + fl = fl[:j] + break + flags.append((file,line,crit,fl)) + + return flags + +def set_config (cfg): + """This function sets the CONFIG-variable for the whole module. Use this instead of modifying L{CONFIG} directly. + @param cfg: a dictionary with at least all the keys of the CONFIG-var + @type cfg: dict + @raises KeyError: if a keyword is missing in the new cfg""" + + for i in CONFIG.keys(): + if not i in cfg: + raise KeyError, "Missing keyword in config: "+i + + for i in CONFIG: + CONFIG[i] = cfg[i] + +def generate_path (cpv, exp): + """Generates the correct path out of given wildcards. + These wildcards can be: + - $(cat) : category + - $(cat-1): first part of the category (e.g. "app") + - $(cat-2): second part of the category + - $(pkg) : name of the package + + @param cpv: the cpv of the current package + @type cpv: string (cat/pkg-ver) + @param exp: the expression to render the path from + @type exp: string + @returns: rendered path + @rtype string""" + + cat, pkg, ver, rev = split_package_name(cpv) + + if exp.find("$(") != -1: + exp = exp.replace("$(cat)",cat).\ + replace("$(pkg)",pkg).\ + replace("$(cat-1)",cat.split("-")[0]).\ + replace("$(cat-2)",cat.split("-")[1]) + return exp + +### USE FLAG PART ### +USE_PATH = os.path.join(portage.USER_CONFIG_PATH,"package.use") +USE_PATH_IS_DIR = os.path.isdir(USE_PATH) +useFlags = {} # useFlags in the file +newUseFlags = {} # useFlags as we want them to be: format: cpv -> [(file, line, useflag, (true if removed from list / false if added))] + +def invert_use_flag (flag): + """Invertes a flag. + + >>> invert_use_flag("foo") + -foo + >>> invert_use_flag("-bar") + bar + + @param flag: the flag + @type flag: string + @returns: inverted flag + @rtype: string + """ + + if flag[0] == "-": + return flag[1:] + else: + return "-"+flag + +def set_use_flag (pkg, flag): + """Sets the useflag for a given package. + + @param pkg: the package + @type pkg: string (cpv) or L{backend.Package}-object + @param flag: the flag to set + @type flag: string""" + + global useFlags, newUseFlags + + if not isinstance(pkg, package.Package): + pkg = package.Package(pkg) # assume cpv or gentoolkit.Package + + cpv = pkg.get_cpv() + invFlag = invert_use_flag(flag) + + # if not saved in useFlags, get it by calling get_data() which calls grep() + data = None + if not cpv in useFlags: + data = get_data(pkg, USE_PATH) + useFlags[cpv] = data + else: + data = useFlags[cpv] + + if not cpv in newUseFlags: + newUseFlags[cpv] = [] + + debug("data: "+str(data)) + # add a useflag / delete one + added = False + for file, line, crit, flags in data: + if pkg.matches(crit): + # we have the inverted flag in the uselist/newuselist --> delete it + if invFlag in flags or (file, line, invFlag, False) in newUseFlags[cpv] or (file, line, flag, True) in newUseFlags[cpv]: + if added: del newUseFlags[-1] # we currently added it as an extra option - delete it + added = True + jumpOut = False + for t in [(file, line, invFlag, False),(file, line, flag, True)]: + if t in newUseFlags[cpv]: + newUseFlags[cpv].remove(t) + jumpOut = True + break + if not jumpOut: newUseFlags[cpv].append((file, line, invFlag, True)) + break + + # we want to duplicate the flag --> ignore + elif flag in flags: + added = True # emulate adding + break + + # add as an extra flag + else: + if not added: newUseFlags[cpv].append((file, line, flag, False)) + added = True + + # create a new line + if not added: + path = USE_PATH + if USE_PATH_IS_DIR: + path = os.path.join(USE_PATH, generate_path(cpv, CONFIG["usefile"])) + try: + newUseFlags[cpv].remove((path, -1, invFlag, False)) + except ValueError: # not in UseFlags + newUseFlags[cpv].append((path, -1, flag, False)) + + newUseFlags[cpv] = unique_array(newUseFlags[cpv]) + debug("newUseFlags: "+str(newUseFlags)) + +def remove_new_use_flags (cpv): + """Removes all new use-flags for a specific package. + + @param cpv: the package for which to remove the flags + @type cpv: string (cpv) or L{backend.Package}-object""" + + if isinstance(cpv, package.Package): + cpv = cpv.get_cpv() + + try: + del newUseFlags[cpv] + except KeyError: + pass + +def get_new_use_flags (cpv): + """Gets all the new use-flags for a specific package. + + @param cpv: the package from which to get the flags + @type cpv: string (cpv) or L{backend.Package}-object + @returns: list of flags + @rtype: string[]""" + + if isinstance(cpv, package.Package): + cpv = cpv.get_cpv() + + list2return = [] + try: + for file, line, flag, remove in newUseFlags[cpv]: + if remove: + list2return.append(invert_use_flag(flag)) + else: + list2return.append(flag) + except KeyError: + pass + + return list2return + +def write_use_flags (): + """This writes our changed useflags into the file.""" + global newUseFlags, useFlags + + def insert (flag, list): + """Shortcut for inserting a new flag right after the package-name.""" + list.insert(1,flag) + + def remove (flag, list): + """Removes a flag.""" + try: + list.remove(flag) + except ValueError: # flag is given as flag\n + list.remove(flag+"\n") + list.append("\n") #re-insert the newline + + # no more flags there - comment it out + if len(list) == 1 or list[1][0] in ("#","\n"): + list[0] = "#"+list[0] + insert("#removed by portato#",list) + + file_cache = {} # cache for having to read the file only once: name->[lines] + for cpv in newUseFlags: + flagsToAdd = [] # this is used for collecting the flags to be inserted in a _new_ line + for file, line, flag, delete in newUseFlags[cpv]: + line = int(line) # it is saved as a string so far! + + # add new line + if line == -1: + flagsToAdd.append(flag) + # change a line + else: + if not file in file_cache: + # read file + f = open(file, "r") + lines = [] + i = 1 + while i < line: # stop at the given line + lines.append(f.readline()) + i += 1 + l = f.readline().split(" ") + + # delete or insert + if delete: + remove(flag,l) + else: + insert(flag,l) + lines.append(" ".join(l)) + + # read the rest + lines.extend(f.readlines()) + + file_cache[file] = lines + f.close() + + else: # in cache + l = file_cache[file][line-1].split(" ") + if delete: + remove(flag,l) + else: + insert(flag,l) + file_cache[file][line-1] = " ".join(l) + + if flagsToAdd: + # write new lines + msg = "\n#portato update#\n" + if CONFIG["usePerVersion"]: # add on a per-version-base + msg += "=%s %s\n" % (cpv, ' '.join(flagsToAdd)) + else: # add on a per-package-base + list = split_package_name(cpv) + msg += "%s/%s %s\n" % (list[0], list[1], ' '.join(flagsToAdd)) + if not file in file_cache: + f = open(file, "a") + f.write(msg) + f.close() + else: + file_cache[file].append(msg) + + # write to disk + for file in file_cache.keys(): + f = open(file, "w") + f.writelines(file_cache[file]) + f.close() + # reset + useFlags = {} + newUseFlags = {} + +### MASKING PART ### +MASK_PATH = os.path.join(portage.USER_CONFIG_PATH,"package.mask") +UNMASK_PATH = os.path.join(portage.USER_CONFIG_PATH,"package.unmask") +MASK_PATH_IS_DIR = os.path.isdir(MASK_PATH) +UNMASK_PATH_IS_DIR = os.path.isdir(UNMASK_PATH) + +new_masked = {} +new_unmasked = {} + +def set_masked (pkg, masked = True): + """Sets the masking status of the package. + + @param pkg: the package from which to get the flags + @type pkg: string (cpv) or L{backend.Package}-object + @param masked: if True: mask it; if False: unmask it + @type masked: boolean""" + + global new_masked, newunmasked + + if not isinstance(pkg, package.Package): + pkg = package.Package(pkg) + + cpv = pkg.get_cpv() + + if not cpv in new_unmasked: + new_unmasked[cpv] = [] + if not cpv in new_masked: + new_masked[cpv] = [] + + if masked: + link_neq = new_masked + link_eq = new_unmasked + path = UNMASK_PATH + else: + link_neq = new_unmasked + link_eq = new_masked + path = MASK_PATH + + copy = link_eq[cpv] + for file, line in copy: + if line == "-1": + link_eq[cpv].remove((file, line)) + + copy = link_neq[cpv][:] + for file, line in copy: + if line != "-1": + link_neq[cpv].remove(file, line) + + if masked == pkg.is_masked(): + return + + data = get_data(pkg, path) + debug("data: "+str(link_eq)) + done = False + for file, line, crit, flags in data: + if pkg.matches(crit): + link_eq[cpv].append((file, line)) + done = True + + if done: return + + if masked: + is_dir = MASK_PATH_IS_DIR + path = MASK_PATH + else: + is_dir = UNMASK_PATH_IS_DIR + path = UNMASK_PATH + + if is_dir: + file = os.path.join(path, generate_path(cpv, CONFIG["usefile"])) + else: + file = path + + link_neq[cpv].append((file, "-1")) + link_neq[cpv] = unique_array(link_neq[cpv]) + debug("new_(un)masked: "+str(link_neq)) + +def remove_new_masked (cpv): + if isinstance(cpv, package.Package): + cpv = cpv.get_cpv() + + try: + del new_masked[cpv] + except KeyError: + pass + + try: + del new_unmasked[cpv] + except KeyError: + pass + +def new_masking_status (cpv): + if isinstance(cpv, package.Package): + cpv = cpv.get_cpv() + + if cpv in new_masked and new_masked[cpv]: + return "masked" + elif cpv in new_unmasked and new_unmasked[cpv]: + return "unmasked" + else: return None + +def write_masked (): + global new_unmasked, new_masked + file_cache = {} + + def write(cpv, file, line): + line = int(line) + # add new line + if line == -1: + msg = "\n#portato update#\n" + if CONFIG["maskPerVersion"]: + msg += "=%s\n" % cpv + else: + list = split_package_name(cpv) + msg += "%s/%s\n" % (list[0],list[1]) + if not file in file_cache: + f = open(file, "a") + f.write(msg) + f.close() + else: + file_cache[file].append(msg) + # change a line + else: + if not file in file_cache: + # read file + f = open(file, "r") + lines = [] + i = 1 + while i < line: # stop at the given line + lines.append(f.readline()) + i = i+1 + # delete + l = f.readline() + l = "#"+l[:-1]+" # removed by portato\n" + lines.append(l) + + # read the rest + lines.extend(f.readlines()) + + file_cache[file] = lines + f.close() + else: # in cache + l = file_cache[file][line-1] + # delete: + l = "#"+l[:-1]+" # removed by portato\n" + file_cache[file][line-1] = l + + + for cpv in new_masked: + for file, line in new_masked[cpv]: + write(cpv, file, line) + + for cpv in new_unmasked: + for file, line in new_unmasked[cpv]: + write(cpv, file, line) + + # write to disk + for file in file_cache.keys(): + f = open(file, "w") + f.writelines(file_cache[file]) + f.close() + # reset + new_masked = {} + new_unmasked = {} + +### TESTING PART ### +TESTING_PATH = os.path.join(portage.USER_CONFIG_PATH, "package.keywords") +TESTING_PATH_IS_DIR = os.path.isdir(TESTING_PATH) +newTesting = {} +arch = "" + +def remove_new_testing (cpv): + if isinstance(cpv, package.Package): + cpv = cpv.get_cpv() + + try: + del newTesting[cpv] + except KeyError: + pass + +def new_testing_status (cpv): + if isinstance(cpv, package.Package): + cpv = cpv.get_cpv() + + if cpv in newTesting: + for file, line in newTesting[cpv]: + if line == "-1": return False + else: return True + + return None + +def set_testing (pkg, enable): + """Enables the package for installing when it is marked as testing (~ARCH). + @param pkg: the package + @type pkg: string (cpv) or L{backend.Package}-object + @param enable: controls whether to enable (True) or disable (False) for test-installing + @type enable: boolean""" + + global arch, newTesting + if not isinstance(pkg, package.Package): + pkg = package.Package(pkg) + + arch = pkg.get_settings("ARCH") + cpv = pkg.get_cpv() + if not cpv in newTesting: + newTesting[cpv] = [] + + for file, line in newTesting[cpv]: + if (enable and line != "-1") or (not enable and line == "-1"): + newTesting[cpv].remove((file, line)) + + if (enable and not pkg.is_testing(allowed=True)) or (not enable and pkg.is_testing(allowed=True)): + return + + if not enable: + test = get_data(pkg, TESTING_PATH) + debug("data (test): "+str(test)) + for file, line, crit, flags in test: + if pkg.matches(crit) and flags[0] == "~"+arch: + newTesting[cpv].append((file, line)) + else: + if TESTING_PATH_IS_DIR: + file = os.path.join(TESTING_PATH, CONFIG["testingfile"]) + else: + file = TESTING_PATH + newTesting[cpv].append((file, "-1")) + + newTesting[cpv] = unique_array(newTesting[cpv]) + debug("newTesting: "+str(newTesting)) + +def write_testing (): + global arch, newTesting + file_cache = {} + + for cpv in newTesting: + for file, line in newTesting[cpv]: + line = int(line) + # add new line + if line == -1: + msg = "\n#portato update#\n" + if CONFIG["testingPerVersion"]: + msg += "=%s ~%s\n" % (cpv, arch) + else: + list = split_package_name(cpv) + msg += "%s/%s ~%s\n" % (list[0],list[1],arch) + if not file in file_cache: + f = open(file, "a") + f.write(msg) + f.close() + else: + file_cache[file].append(msg) + # change a line + else: + if not file in file_cache: + # read file + f = open(file, "r") + lines = [] + i = 1 + while i < line: # stop at the given line + lines.append(f.readline()) + i = i+1 + # delete + l = f.readline() + l = "#"+l[:-1]+" # removed by portato\n" + lines.append(l) + + # read the rest + lines.extend(f.readlines()) + + file_cache[file] = lines + f.close() + else: # in cache + l = file_cache[file][line-1] + # delete: + l = "#"+l[:-1]+" # removed by portato\n" + file_cache[file][line-1] = l + + # write to disk + for file in file_cache.keys(): + f = open(file, "w") + f.writelines(file_cache[file]) + f.close() + # reset + newTesting = {} diff --git a/portato/backend/package.py b/portato/backend/package.py new file mode 100644 index 0000000..8b56eb5 --- /dev/null +++ b/portato/backend/package.py @@ -0,0 +1,324 @@ +# -*- coding: utf-8 -*- +# +# File: portato/backend/package.py +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann <necoro@necoro.net> + +from portato.backend import * +from portato.helper import * +from portage_helper import * +import flags + +import portage, portage_dep, gentoolkit +from portage_util import unique_array + +import types + +class Package (gentoolkit.Package): + """This is a subclass of the gentoolkit.Package-class which a lot of additional functionality we need in Portato.""" + + def __init__ (self, cpv): + """Constructor. + + @param cpv: The cpv or gentoolkit.Package which describes the package to create. + @type cpv: string (cat/pkg-ver) or gentoolkit.Package-object.""" + + if isinstance(cpv, gentoolkit.Package): + cpv = cpv.get_cpv() + gentoolkit.Package.__init__(self, cpv) + try: + self._status = portage.getmaskingstatus(self.get_cpv(), settings = gentoolkit.settings) + except KeyError: # package is not located in the system + self._status = None + + def is_in_system (self): + """Returns False if the package could not be found in the portage system. + + @return: True if in portage system; else False + @rtype: boolean""" + + return (self._status != None) + + def is_missing_keyword(self): + """Returns True if the package is missing the needed keyword. + + @return: True if keyword is missing; else False + @rtype: boolean""" + + if self._status and "missing keyword" in self._status: + return True + return False + + def is_testing(self, allowed = False): + """Checks whether a package is marked as testing. + + @param allowed: Controls whether possible keywords are taken into account or not. + @type allowed: boolean + @returns: True if the package is marked as testing; else False. + @rtype: boolean""" + + testArch = "~" + self.get_settings("ARCH") + if not allowed: # keywords are NOT taken into account + if testArch in self.get_env_var("KEYWORDS").split(): + return True + return False + + else: # keywords are taken into account + status = flags.new_testing_status(self.get_cpv()) + if status == None: # we haven't changed it in any way + if self._status and testArch+" keyword" in self._status: + return True + return False + else: + return status + + def set_testing(self, enable = True): + """Sets the actual testing status of the package. + + @param enable: if True it is masked as stable; if False it is marked as testing + @type enable: boolean""" + + flags.set_testing(self, enable) + + def remove_new_testing(self): + """Removes possible changed testing status.""" + + flags.remove_new_testing(self.get_cpv()) + + def is_masked (self): + """Returns True if either masked by package.mask or by profile. + + @returns: True if masked / False otherwise + @rtype: boolean""" + + status = flags.new_masking_status(self.get_cpv()) + if status != None: # we have locally changed it + if status == "masked": return True + elif status == "unmasked": return False + else: + debug("BUG in flags.new_masking_status. It returns",status) + else: # we have not touched the status + if self._status and ("profile" in self._status or "package.mask" in self._status): + return True + return False + + def set_masked (self, masking = False): + """Sets the masking status of the package. + + @param masking: if True: mask it; if False: unmask it + @type masking: boolean""" + + flags.set_masked(self, masked = masking) + + def remove_new_masked (self): + """Removes possible changed masking status.""" + + flags.remove_new_masked(self.get_cpv()) + + def get_all_use_flags (self): + """Returns a list of _all_ useflags for this package, i.e. all useflags you can set for this package. + + @returns: list of use-flags + @rtype: string[]""" + + return unique_array(self.get_env_var("IUSE").split()) + + def get_installed_use_flags (self): + """Returns a list of the useflags enabled at installation time. If package is not installed, it returns an empty list. + + @returns: list of useflags enabled at installation time or an empty list + @rtype: string[]""" + + if self.is_installed(): + uses = self.get_use_flags().split() # all set at installation time + iuses = self.get_all_use_flags() # all you can set for the package + set = [] + for u in iuses: + if u in uses: + set.append(u) + return set + else: + return [] + + def get_new_use_flags (self): + """Returns a list of the new useflags, i.e. these flags which are not written to the portage-system yet. + + @returns: list of flags or [] + @rtype: string[]""" + + return flags.get_new_use_flags(self) + + def get_actual_use_flags (self): + """This returns the result of installed_use_flags + new_use_flags. If the package is not installed, it returns only the new flags. + + @return: list of flags + @rtype: string[]""" + + if self.is_installed(): + i_flags = self.get_installed_use_flags() + for f in self.get_new_use_flags(): + + if flags.invert_flag(f) in i_flags: + i_flags.remove(flags.invert_flag(f)) + + elif f not in i_flags: + i_flags.append(f) + return i_flags + else: + return self.get_new_flags() + + def set_use_flag (self, flag): + """Set a use-flag. + + @param flag: the flag to set + @type flag: string""" + + flags.set_use_flag(self, flag) + + def remove_new_use_flags (self): + """Remove all the new use-flags.""" + + flags.remove_new_use_flags(self) + + def get_matched_dep_packages (self): + """This function looks for all dependencies which are resolved. In normal case it makes only sense for installed packages, but should work for uninstalled ones too. + + @returns: unique list of dependencies resolved (with elements like "<=net-im/foobar-1.2.3") + @rtype: string[]""" + + # change the useflags, because we have internally changed some, but not made them visible for portage + newUseFlags = self.get_new_use_flags() + actual = self.get_settings("USE").split() + if newUseFlags: + for u in newUseFlags: + if u[0] == "-" and flags.invert_use_flag(u) in actual: + actual.remove(flags.invert_use_flag(u)) + elif u not in actual: + actual.append(u) + + # + # the following stuff is mostly adapted from portage.dep_check() + # + + depstring = self.get_env_var("RDEPEND")+" "+self.get_env_var("DEPEND")+" "+self.get_env_var("PDEPEND") + + # change the parentheses into lists + mysplit = portage_dep.paren_reduce(depstring) + + # strip off these deps we don't have a flag for + mysplit = portage_dep.use_reduce(mysplit, uselist = actual, masklist = [], matchall = False, excludeall = self.get_settings("ARCH")) + + # move the || (or) into the lists + mysplit = portage_dep.dep_opconvert(mysplit) + + # turn virtuals into real packages + mysplit = portage.dep_virtual(mysplit, self._settings) + + mysplit_reduced= portage.dep_wordreduce(mysplit, self._settings, vartree.dbapi, mode = None) + + retlist = [] + def add (list, red_list): + """Adds the packages to retlist.""" + for i in range(len(list)): + if type(list[i]) == types.ListType: + add(list[i], red_list[i]) + elif list[i] == "||": + continue + else: + if red_list[i]: + retlist.append(list[i]) + + add(mysplit, mysplit_reduced) + + return unique_array(retlist) + + def get_dep_packages (self): + """Returns a cpv-list of packages on which this package depends and which have not been installed yet. This does not check the dependencies in a recursive manner. + + @returns: list of cpvs on which the package depend + @rtype: string[] + + @raises portato.BlockedException: when a package in the dependency-list is blocked by an installed one + @raises portato.PackageNotFoundException: when a package in the dependency list could not be found in the system + @raises portato.DependencyCalcError: when an error occured during executing portage.dep_check()""" + + dep_pkgs = [] # the package list + + # change the useflags, because we have internally changed some, but not made them visible for portage + newUseFlags = self.get_new_use_flags() + actual = self.get_settings("USE").split() + if newUseFlags: + for u in newUseFlags: + if u[0] == "-" and flags.invert_use_flag(u) in actual: + actual.remove(flags.invert_use_flag(u)) + elif u not in actual: + actual.append(u) + + # let portage do the main stuff ;) + # pay attention to any changes here + deps = portage.dep_check (self.get_env_var("RDEPEND")+" "+self.get_env_var("DEPEND")+" "+self.get_env_var("PDEPEND"), vartree.dbapi, self._settings, myuse = actual) + + if not deps: # FIXME: what is the difference to [1, []] ? + return [] + + if deps[0] == 0: # error + raise DependencyCalcError, deps[1] + + deps = deps[1] + + for dep in deps: + if dep[0] == '!': # blocking sth + dep = dep[1:] + if dep != self.get_cp(): # not cpv, because a version might explicitly block another one + blocked = find_installed_packages(dep) + if blocked != []: + raise BlockedException, (self.get_cpv(), blocked[0].get_cpv()) + continue # finished with the blocking one -> next + + pkg = find_best_match(dep) + if not pkg: # try to find masked ones + list = find_packages(dep, masked = True) + if not list: + raise PackageNotFoundException, dep + + list = sort_package_list(list) + done = False + for i in range(len(list)-1,0,-1): + p = list[i] + if not p.is_masked(): + dep_pkgs.append(p.get_cpv()) + done = True + break + if not done: + dep_pkgs.append(list[-1].get_cpv()) + else: + dep_pkgs.append(pkg.get_cpv()) + + return dep_pkgs + + def get_cp (self): + """Returns the cp-string. + + @returns: category/package. + @rtype: string""" + + return self.get_category()+"/"+self.get_name() + + def matches (self, criterion): + """This checks, whether this package matches a specific verisioning criterion - e.g.: "<=net-im/foobar-1.2". + + @param criterion: the criterion to match against + @type criterion: string + @returns: True if matches; False if not + @rtype: boolean""" + + if portage.match_from_list(criterion, [self.get_cpv()]) == []: + return False + else: + return True diff --git a/portato/backend/portage_helper.py b/portato/backend/portage_helper.py new file mode 100644 index 0000000..6e8fc84 --- /dev/null +++ b/portato/backend/portage_helper.py @@ -0,0 +1,374 @@ +# -*- coding: utf-8 -*- +# +# File: portato/backend/portage_helper.py +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann <necoro@necoro.net> + +import re, os, copy + +import portage, gentoolkit +from portage_util import unique_array + +from portato.backend import * +import package + +from portato.helper import debug + +def find_lambda (name): + """Returns the function needed by all the find_all_*-functions. Returns None if no name is given. + + @param name: name to build the function of + @type name: string + @returns: + 1. None if no name is given + 2. a lambda function + @rtype: function""" + + if name != None: + return lambda x: re.match(".*"+name+".*",x) + else: + return lambda x: True + +def geneticize_list (list_of_packages): + """Convertes a list of gentoolkit.Packages into L{backend.Package}s. + + @param list_of_packages: the list of packages + @type list_of_packages: list of gentoolkit.Packages + @returns: converted list + @rtype: backend.Package[]""" + + return [package.Package(x) for x in list_of_packages] + +def find_best_match (search_key, only_installed = False): + """Finds the best match in the portage tree. It does not find masked packages! + + @param search_key: the key to find in the portage tree + @type search_key: string + @param only_installed: if True, only installed packages are searched + @type only_installed: boolean + + @returns: the package found or None + @rtype: backend.Package""" + + t = None + if not only_installed: + t = porttree.dep_bestmatch(search_key) + else: + t = vartree.dep_bestmatch(search_key) + if t: + return package.Package(t) + return None + +def find_packages (search_key, masked=False): + """This returns a list of packages which have to fit exactly. Additionally ranges like '<,>,=,~,!' et. al. are possible. + + @param search_key: the key to look for + @type search_key: string + @param masked: if True, also look for masked packages + @type masked: boolean + + @returns: list of found packages + @rtype: backend.Package[]""" + + return geneticize_list(gentoolkit.find_packages(search_key, masked)) + +def find_installed_packages (search_key, masked=False): + """This returns a list of packages which have to fit exactly. Additionally ranges like '<,>,=,~,!' et. al. are possible. + + @param search_key: the key to look for + @type search_key: string + @param masked: if True, also look for masked packages + @type masked: boolean + + @returns: list of found packages + @rtype: backend.Package[]""" + + return geneticize_list(gentoolkit.find_installed_packages(search_key, masked)) + +def find_system_packages (): + """Looks for all packages saved as "system-packages". + + @returns: a tuple of (resolved_packages, unresolved_packages). + @rtype: (backend.Package[], backend.Package[])""" + + list = gentoolkit.find_system_packages() + return (geneticize_list(list[0]), geneticize_list(list[1])) + +def find_world_packages (): + """Looks for all packages saved in the world-file. + + @returns: a tuple of (resolved_packages, unresolved_packages). + @rtype: (backend.Package[], backend.Package[])""" + + list = gentoolkit.find_world_packages() + return geneticize_list(list[0]),geneticize_list(list[1]) + +def find_all_installed_packages (name=None, withVersion=True): + """Finds all installed packages matching a name or all if no name is specified. + + @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned + @type name: string or None + @param withVersion: if True version-specific packages are returned; else only the cat/package-strings a delivered + @type withVersion: boolean + + @returns: all packages/cp-strings found + @rtype: backend.Package[] or cp-string[]""" + + if withVersion: + return geneticize_list(gentoolkit.find_all_installed_packages(find_lambda(name))) + else: + t = vartree.dbapi.cp_all() + if name: + t = filter(find_lambda(name),t) + return t + +def find_all_uninstalled_packages (name=None): + """Finds all uninstalled packages matching a name or all if no name is specified. + + @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned + @type name: string or None + @returns: all packages found + @rtype: backend.Package[]""" + + return geneticize_list(gentoolkit.find_all_uninstalled_packages(find_lambda(name))) + +def find_all_packages (name=None, withVersion=True): + """Finds all packages matching a name or all if no name is specified. + + @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned + @type name: string or None + @param withVersion: if True version-specific packages are returned; else only the cat/package-strings a delivered + @type withVersion: boolean + + @returns: all packages/cp-strings found + @rtype: backend.Package[] or cp-string[]""" + + if (withVersion): + return geneticize_list(gentoolkit.find_all_packages(find_lambda(name))) + else: + t = porttree.dbapi.cp_all() + t += vartree.dbapi.cp_all() + t = unique_array(t) + if name: + t = filter(find_lambda(name),t) + return t + +def find_all_world_packages (name=None): + """Finds all world packages matching a name or all if no name is specified. + + @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned + @type name: string or None + @returns: all packages found + @rtype: backend.Package[]""" + + world = filter(find_lambda(name), [x.get_cpv() for x in find_world_packages()[0]]) + world = unique_array(world) + return geneticize_list(world) + +def find_all_system_packages (name=None): + """Finds all system packages matching a name or all if no name is specified. + + @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned + @type name: string or None + @returns: all packages found + @rtype: backend.Package[]""" + + sys = filter(find_lambda(name), [x.get_cpv() for x in find_system_packages()[0]]) + sys = unique_array(sys) + return geneticize_list(sys) + +def get_all_versions (cp): + """Returns all versions of a certain package. + + @param cp: the package + @type cp: string (cat/pkg) + @returns: the list of found packages + @rtype: backend.Package[]""" + + t = porttree.dbapi.cp_list(cp) + t += vartree.dbapi.cp_list(cp) + t = unique_array(t) + return geneticize_list(t) + +def get_all_installed_versions (cp): + """Returns all installed versions of a certain package. + + @param cp: the package + @type cp: string (cat/pkg) + @returns: the list of found packages + @rtype: backend.Package[]""" + + return geneticize_list(vartree.dbapi.cp_list(cp)) + +def list_categories (name=None): + """Finds all categories matching a name or all if no name is specified. + + @param name: the name to look for - it is expanded to .*name.* ; if None, all categories are returned + @type name: string or None + @returns: all categories found + @rtype: string[]""" + + categories = gentoolkit.settings.categories + return filter(find_lambda(name), categories) + +def split_package_name (name): + """Splits a package name in its elements. + + @param name: name to split + @type name: string + @returns: list: [category, name, version, rev] whereby rev is "r0" if not specified in the name + @rtype: string[]""" + + return gentoolkit.split_package_name(name) + +def sort_package_list(pkglist): + """Sorts a package list in the same manner portage does. + + @param pkglist: list to sort + @type pkglist: Packages[]""" + + return gentoolkit.sort_package_list(pkglist) + +def reload_settings (): + """Reloads portage.""" + gentoolkit.settings = portage.config(config_incrementals = copy.deepcopy(gentoolkit.settings.incrementals)) + +def update_world (newuse = False, deep = False): + """Calculates the packages to get updated in an update world. + + @param newuse: Checks if a use-flag has a different state then to install time. + @type newuse: boolean + @param deep: Not only check world packages but also there dependencies. + @type deep: boolean + @returns: a list containing of the tuple (new_package, old_package) + @rtype: (backend.Package, backend.Package)[]""" + + # read world file + world = open(portage.WORLD_FILE) + packages = [] + for line in world: + line = line.strip() + if not len(line): continue # empty line + if line[0] == "#": continue + packages.append(line) + world.close() + + sys = gentoolkit.settings.packages + for x in sys: + if x[0] == "*": + x = x[1:] + packages.append(x.strip()) + + # Remove everything that is package.provided from our list + # This is copied from emerge.getlist() + for atom in packages[:]: + for expanded_atom in portage.flatten(portage.dep_virtual([atom], gentoolkit.settings)): + mykey = portage.dep_getkey(expanded_atom) + if mykey in gentoolkit.settings.pprovideddict and portage.match_from_list(expanded_atom, settings.pprovideddict[mykey]): + packages.remove(atom) + break + + packages = [find_best_match(x) for x in packages] + + checked = [] + updating = [] + raw_checked = [] + def check (p): + """Checks whether a package is updated or not.""" + if p.get_cp() in checked: return + else: checked.append(p.get_cp()) + + appended = False + tempDeep = False + + if not p.is_installed(): + oldList = find_installed_packages(p.get_cp()) + if oldList: + old = oldList[0] # assume we have only one there; FIXME: slotted packages + else: + debug("Bug? Not found installed one:",p.get_cp()) + return + updating.append((p, old)) + appended = True + p = old + + if newuse: + old = p.get_installed_use_flags() + new = p.get_settings("USE").split() + + for u in p.get_all_use_flags(): + if (u in new) != (u in old): + if not appended: + updating.append((p,p)) + tempDeep = True + + if deep or tempDeep: + for i in p.get_matched_dep_packages(): + if i not in raw_checked: + raw_checked.append(i) + bm = find_best_match(i) + if not bm: + debug("Bug? No best match could be found:",i) + else: + check(bm) + + for p in packages: + if not p: continue # if a masked package is installed we have "None" here + check(p) + + return updating + +use_descs = {} +local_use_descs = {} +def get_use_desc (flag, package = None): + """Returns the description of a specific useflag or None if no desc was found. + If a package is given (in the <cat>/<name> format) the local use descriptions are searched too. + + @param flag: flag to get the description for + @type flag: string + @param package: name of a package: if given local use descriptions are searched too + @type package: cp-string + @returns: found description + @rtype: string""" + + # In the first run the dictionaries 'use_descs' and 'local_use_descs' are filled. + + # fill cache if needed + if use_descs == {} or local_use_descs == {}: + # read use.desc + fd = open(gentoolkit.settings["PORTDIR"]+"/profiles/use.desc") + for line in fd.readlines(): + line = line.strip() + if line != "" and line[0] != '#': + fields = [x.strip() for x in line.split(" - ",1)] + if len(fields) == 2: + use_descs[fields[0]] = fields[1] + + # read use.local.desc + fd = open(gentoolkit.settings["PORTDIR"]+"/profiles/use.local.desc") + for line in fd.readlines(): + line = line.strip() + if line != "" and line[0] != '#': + fields = [x.strip() for x in line.split(":",1)] + if len(fields) == 2: + if not fields[0] in local_use_descs: # create + local_use_descs[fields[0]] = {} + subfields = [x.strip() for x in fields[1].split(" - ",1)] + if len(subfields) == 2: + local_use_descs[fields[0]][subfields[0]] = subfields[1] + + # start + desc = None + if flag in use_descs: + desc = use_descs[flag] + if package != None: + if package in local_use_descs: + if flag in local_use_descs[package]: + desc = local_use_descs[package][flag] + return desc |