diff options
Diffstat (limited to 'portato/backend')
-rw-r--r-- | portato/backend/__init__.py | 75 | ||||
-rw-r--r-- | portato/backend/flags.py | 117 | ||||
-rw-r--r-- | portato/backend/package.py | 447 | ||||
-rw-r--r-- | portato/backend/portage/__init__.py | 14 | ||||
-rw-r--r-- | portato/backend/portage/package.py | 234 | ||||
-rw-r--r-- | portato/backend/portage/settings.py | 55 | ||||
-rw-r--r-- | portato/backend/portage/system.py | 370 | ||||
-rw-r--r-- | portato/backend/portage_helper.py | 467 | ||||
-rw-r--r-- | portato/backend/system_interface.py | 283 |
9 files changed, 1055 insertions, 1007 deletions
diff --git a/portato/backend/__init__.py b/portato/backend/__init__.py index a72923b..83ee1d6 100644 --- a/portato/backend/__init__.py +++ b/portato/backend/__init__.py @@ -10,67 +10,32 @@ # # Written by René 'Necoro' Neumann <necoro@necoro.net> -import os -from threading import Lock +from system_interface import SystemInterface -# import portage -import portage +SYSTEM = "portage" +_sys = None -class PortageSettings: - """Encapsulation of the portage settings. - - @ivar settings: portage settings - @ivar settingslock: a simple Lock - @ivar trees: a dictionary of the trees - @ivar porttree: shortcut to C{trees[root]["porttree"]} - @ivar vartree: shortcut to C{trees[root]["vartree"]} - @ivar virtuals: shortcut to C{trees[root]["virtuals"]}""" +class SystemWrapper (object, SystemInterface): + def __getattribute__ (self, name): + global _sys + return eval ("_sys.%s" % name) - def __init__ (self): - """Initializes the instance. Calls L{load()}.""" - self.settingslock = Lock() - self.load() - - def load(self): - """(Re)loads the portage settings and sets the variables.""" +def set_system (new_sys): + global SYSTEM + SYSTEM = new_sys + load_system() - kwargs = {} - for k, envvar in (("config_root", "PORTAGE_CONFIGROOT"), ("target_root", "ROOT")): - kwargs[k] = os.environ.get(envvar, None) - self.trees = portage.create_trees(trees=None, **kwargs) +def load_system (): + global _sys - self.settings = self.trees["/"]["vartree"].settings + if SYSTEM == "portage": + from portato.backend.portage import PortageSystem + _sys = PortageSystem () - for myroot in self.trees: - if myroot != "/": - self.settings = self.trees[myroot]["vartree"].settings - break +system = SystemWrapper() - self.settings.unlock() - - root = self.settings["ROOT"] - - self.porttree = self.trees[root]["porttree"] - self.vartree = self.trees[root]["vartree"] - self.virtuals = self.trees[root]["virtuals"] - - portage.settings = None # we use our own one ... - -# portage tree vars -portage_settings = PortageSettings() - -# for the moment ;) -settingslock = portage_settings.settingslock -settings = portage_settings.settings -trees = portage_settings.trees -porttree = portage_settings.porttree -vartree = portage_settings.vartree -virtuals = portage_settings.virtuals +from exceptions import * +from package import Package -# this is set to "var/lib/portage/world" by default - so we add the leading / -portage.WORLD_FILE = portage_settings.settings["ROOT"]+portage.WORLD_FILE +load_system() -# import our packages -from exceptions import * -from package import * -from portage_helper import * diff --git a/portato/backend/flags.py b/portato/backend/flags.py index a7ba125..a7241ca 100644 --- a/portato/backend/flags.py +++ b/portato/backend/flags.py @@ -15,12 +15,9 @@ import os.path from subprocess import Popen, PIPE # needed for grep from portato.helper import * -from portage_helper import split_package_name +from portato.backend import system import package -import portage -from portage_util import unique_array - CONFIG = { "usefile" : "portato", "maskfile" : "portato", @@ -30,6 +27,59 @@ CONFIG = { "testingPerVersion" : True } +class Constants: + + def __init__ (self): + self.clear() + + def clear (self): + self._use_path = None + self._mask_path = None + self._unmask_path = None + self._testing_path = None + self._use_path_is_dir = None + self._mask_path_is_dir = None + self._unmask_path_is_dir = None + self._testing_path_is_dir = None + + def __get (self, name, path): + if self.__dict__[name] is None: + self.__dict__[name] = os.path.join(system.get_config_path(), path) + + return self.__dict__[name] + + def __is_dir(self, path): + name = "_" + path + "_is_dir" + if self.__dict__[name] is None: + self.__dict__[name] = os.path.isdir(self.__class__.__dict__[path](self)) + return self.__dict__[name] + + def use_path (self): + return self.__get("_use_path", "package.use") + + def use_path_is_dir (self): + return self.__is_dir("use_path") + + def mask_path (self): + return self.__get("_mask_path", "package.mask") + + def mask_path_is_dir (self): + return self.__is_dir("mask_path") + + def unmask_path (self): + return self.__get("_unmask_path", "package.unmask") + + def unmask_path_is_dir (self): + return self.__is_dir("unmask_path") + + def testing_path (self): + return self.__get("_testing_path", "package.keywords") + + def testing_path_is_dir (self): + return self.__is_dir("testing_path") + +CONST = Constants() + ### GENERAL PART ### def grep (pkg, path): @@ -43,7 +93,7 @@ def grep (pkg, path): @rtype: string""" if not isinstance(pkg, package.Package): - pkg = package.Package(pkg) # assume it is a cpv or a gentoolkit.Package + pkg = system.new_package(pkg) # assume it is a cpv or a gentoolkit.Package command = "egrep -x -n -r -H '^[<>!=~]{0,2}%s(-[0-9].*)?[[:space:]]?.*$' %s" # %s is replaced in the next line ;) return Popen((command % (pkg.get_cp(), path)), shell = True, stdout = PIPE).communicate()[0].splitlines() @@ -105,7 +155,7 @@ def generate_path (cpv, exp): @returns: rendered path @rtype string""" - cat, pkg, ver, rev = split_package_name(cpv) + cat, pkg, ver, rev = system.split_cpv(cpv) if exp.find("$(") != -1: exp = exp.replace("$(cat)",cat).\ @@ -115,8 +165,6 @@ def generate_path (cpv, exp): return exp ### USE FLAG PART ### -USE_PATH = os.path.join(portage.USER_CONFIG_PATH,"package.use") -USE_PATH_IS_DIR = os.path.isdir(USE_PATH) useFlags = {} # useFlags in the file newUseFlags = {} # useFlags as we want them to be: format: cpv -> [(file, line, useflag, (true if removed from list / false if added))] @@ -150,7 +198,7 @@ def set_use_flag (pkg, flag): global useFlags, newUseFlags if not isinstance(pkg, package.Package): - pkg = package.Package(pkg) # assume cpv or gentoolkit.Package + pkg = system.new_package(pkg) # assume cpv or gentoolkit.Package cpv = pkg.get_cpv() invFlag = invert_use_flag(flag) @@ -158,7 +206,7 @@ def set_use_flag (pkg, flag): # if not saved in useFlags, get it by calling get_data() which calls grep() data = None if not cpv in useFlags: - data = get_data(pkg, USE_PATH) + data = get_data(pkg, CONST.use_path()) useFlags[cpv] = data else: data = useFlags[cpv] @@ -196,9 +244,9 @@ def set_use_flag (pkg, flag): # create a new line if not added: - path = USE_PATH - if USE_PATH_IS_DIR: - path = os.path.join(USE_PATH, generate_path(cpv, CONFIG["usefile"])) + path = CONST.use_path() + if CONST.use_path_is_dir(): + path = os.path.join(CONST.use_path(), generate_path(cpv, CONFIG["usefile"])) try: newUseFlags[cpv].remove((path, -1, invFlag, False)) except ValueError: # not in UseFlags @@ -313,7 +361,7 @@ def write_use_flags (): if CONFIG["usePerVersion"]: # add on a per-version-base msg += "=%s %s\n" % (cpv, ' '.join(flagsToAdd)) else: # add on a per-package-base - list = split_package_name(cpv) + list = system.split_cpv(cpv) msg += "%s/%s %s\n" % (list[0], list[1], ' '.join(flagsToAdd)) if not file in file_cache: f = open(file, "a") @@ -332,11 +380,6 @@ def write_use_flags (): newUseFlags = {} ### MASKING PART ### -MASK_PATH = os.path.join(portage.USER_CONFIG_PATH,"package.mask") -UNMASK_PATH = os.path.join(portage.USER_CONFIG_PATH,"package.unmask") -MASK_PATH_IS_DIR = os.path.isdir(MASK_PATH) -UNMASK_PATH_IS_DIR = os.path.isdir(UNMASK_PATH) - new_masked = {} new_unmasked = {} @@ -351,7 +394,7 @@ def set_masked (pkg, masked = True): global new_masked, newunmasked if not isinstance(pkg, package.Package): - pkg = package.Package(pkg) + pkg = system.new_package(pkg) cpv = pkg.get_cpv() @@ -363,13 +406,13 @@ def set_masked (pkg, masked = True): if masked: link_neq = new_masked link_eq = new_unmasked - path = UNMASK_PATH + path = CONST.unmask_path() else: link_neq = new_unmasked link_eq = new_masked - path = MASK_PATH + path = CONST.mask_path() - copy = link_eq[cpv] + copy = link_eq[cpv][:] for file, line in copy: if line == "-1": link_eq[cpv].remove((file, line)) @@ -393,11 +436,11 @@ def set_masked (pkg, masked = True): if done: return if masked: - is_dir = MASK_PATH_IS_DIR - path = MASK_PATH + is_dir = CONST.mask_path_is_dir() + path = CONST.mask_path() else: - is_dir = UNMASK_PATH_IS_DIR - path = UNMASK_PATH + is_dir = CONST.unmask_path_is_dir() + path = CONST.unmask_path() if is_dir: file = os.path.join(path, generate_path(cpv, CONFIG["usefile"])) @@ -426,9 +469,9 @@ def new_masking_status (cpv): if isinstance(cpv, package.Package): cpv = cpv.get_cpv() - if cpv in new_masked and new_masked[cpv]: + if cpv in new_masked and new_masked[cpv] != []: return "masked" - elif cpv in new_unmasked and new_unmasked[cpv]: + elif cpv in new_unmasked and new_unmasked[cpv] != []: return "unmasked" else: return None @@ -444,7 +487,7 @@ def write_masked (): if CONFIG["maskPerVersion"]: msg += "=%s\n" % cpv else: - list = split_package_name(cpv) + list = system.split_cpv(cpv) msg += "%s/%s\n" % (list[0],list[1]) if not file in file_cache: f = open(file, "a") @@ -497,8 +540,6 @@ def write_masked (): new_unmasked = {} ### TESTING PART ### -TESTING_PATH = os.path.join(portage.USER_CONFIG_PATH, "package.keywords") -TESTING_PATH_IS_DIR = os.path.isdir(TESTING_PATH) newTesting = {} arch = "" @@ -531,7 +572,7 @@ def set_testing (pkg, enable): global arch, newTesting if not isinstance(pkg, package.Package): - pkg = package.Package(pkg) + pkg = system.new_package(pkg) arch = pkg.get_settings("ARCH") cpv = pkg.get_cpv() @@ -546,16 +587,16 @@ def set_testing (pkg, enable): return if not enable: - test = get_data(pkg, TESTING_PATH) + test = get_data(pkg, CONST.testing_path()) debug("data (test): "+str(test)) for file, line, crit, flags in test: if pkg.matches(crit) and flags[0] == "~"+arch: newTesting[cpv].append((file, line)) else: - if TESTING_PATH_IS_DIR: - file = os.path.join(TESTING_PATH, CONFIG["testingfile"]) + if CONST.testing_path_is_dir(): + file = os.path.join(CONST.testing_path(), CONFIG["testingfile"]) else: - file = TESTING_PATH + file = CONST.testing_path() newTesting[cpv].append((file, "-1")) newTesting[cpv] = unique_array(newTesting[cpv]) @@ -574,7 +615,7 @@ def write_testing (): if CONFIG["testingPerVersion"]: msg += "=%s ~%s\n" % (cpv, arch) else: - list = split_package_name(cpv) + list = system.split_cpv(cpv) msg += "%s/%s ~%s\n" % (list[0],list[1],arch) if not file in file_cache: f = open(file, "a") diff --git a/portato/backend/package.py b/portato/backend/package.py deleted file mode 100644 index 6ba47fe..0000000 --- a/portato/backend/package.py +++ /dev/null @@ -1,447 +0,0 @@ -# -*- coding: utf-8 -*- -# -# File: portato/backend/package.py -# This file is part of the Portato-Project, a graphical portage-frontend. -# -# Copyright (C) 2006-2007 René 'Necoro' Neumann -# This is free software. You may redistribute copies of it under the terms of -# the GNU General Public License version 2. -# There is NO WARRANTY, to the extent permitted by law. -# -# Written by René 'Necoro' Neumann <necoro@necoro.net> - -from portato.backend import portage_settings -from portato.helper import * -from portage_helper import * -from exceptions import * -import flags - -import portage, portage_dep -from portage_util import unique_array - -import types -import os.path - -class Package: - """This is a class abstracting a normal package which can be installed.""" - - def __init__ (self, cpv): - """Constructor. - - @param cpv: The cpv which describes the package to create. - @type cpv: string (cat/pkg-ver)""" - - self._cpv = cpv - self._scpv = portage.catpkgsplit(self._cpv) - - if not self._scpv: - raise ValueError("invalid cpv: %s" % cpv) - - self._settings = portage_settings.settings - self._settingslock = portage_settings.settingslock - - self._trees = portage_settings.trees - - self.forced_flags = set() - self.forced_flags.update(self._settings.usemask) - self.forced_flags.update(self._settings.useforce) - - try: - self._status = portage.getmaskingstatus(self.get_cpv(), settings = self._settings) - except KeyError: # package is not located in the system - self._status = None - - def is_installed(self): - """Returns true if this package is installed (merged) - @rtype: boolean""" - - return portage_settings.vartree.dbapi.cpv_exists(self._cpv) - - def is_overlay(self): - """Returns true if the package is in an overlay. - @rtype: boolean""" - - dir,ovl = portage_settings.porttree.dbapi.findname2(self._cpv) - return ovl != self._settings["PORTDIR"] - - def is_in_system (self): - """Returns False if the package could not be found in the portage system. - - @return: True if in portage system; else False - @rtype: boolean""" - - return (self._status != None) - - def is_missing_keyword(self): - """Returns True if the package is missing the needed keyword. - - @return: True if keyword is missing; else False - @rtype: boolean""" - - if self._status and "missing keyword" in self._status: - return True - return False - - def is_testing(self, use_keywords = False): - """Checks whether a package is marked as testing. - - @param use_keywords: Controls whether possible keywords are taken into account or not. - @type use_keywords: boolean - @returns: True if the package is marked as testing; else False. - @rtype: boolean""" - - testArch = "~" + self.get_settings("ARCH") - if not use_keywords: # keywords are NOT taken into account - if testArch in self.get_env_var("KEYWORDS").split(): - return True - return False - - else: # keywords are taken into account - status = flags.new_testing_status(self.get_cpv()) - if status is None: # we haven't changed it in any way - if self._status and testArch+" keyword" in self._status: - return True - return False - else: - return status - - def set_testing(self, enable = True): - """Sets the actual testing status of the package. - - @param enable: if True it is masked as stable; if False it is marked as testing - @type enable: boolean""" - - flags.set_testing(self, enable) - - def remove_new_testing(self): - """Removes possible changed testing status.""" - - flags.remove_new_testing(self.get_cpv()) - - def is_masked (self): - """Returns True if either masked by package.mask or by profile. - - @returns: True if masked / False otherwise - @rtype: boolean""" - - status = flags.new_masking_status(self.get_cpv()) - if status != None: # we have locally changed it - if status == "masked": return True - elif status == "unmasked": return False - else: - debug("BUG in flags.new_masking_status. It returns",status) - else: # we have not touched the status - if self._status and ("profile" in self._status or "package.mask" in self._status): - return True - return False - - def set_masked (self, masking = False): - """Sets the masking status of the package. - - @param masking: if True: mask it; if False: unmask it - @type masking: boolean""" - - flags.set_masked(self, masked = masking) - - def remove_new_masked (self): - """Removes possible changed masking status.""" - - flags.remove_new_masked(self.get_cpv()) - - def get_all_use_flags (self, installed = False): - """Returns a list of _all_ useflags for this package, i.e. all useflags you can set for this package. - - @param installed: do not take the ones stated in the ebuild, but the ones it has been installed with - @type installed: boolean - - @returns: list of use-flags - @rtype: string[]""" - - if installed or not self.is_in_system(): - tree = portage_settings.vartree - else: - tree = portage_settings.porttree - - return list(set(self.get_env_var("IUSE", tree = tree).split()).difference(self.forced_flags)) - - def get_installed_use_flags (self): - """Returns a list of the useflags enabled at installation time. If package is not installed, it returns an empty list. - - @returns: list of useflags enabled at installation time or an empty list - @rtype: string[]""" - - if self.is_installed(): - uses = set(self.get_use_flags().split()) # all set at installation time - iuses = set(self.get_all_use_flags(installed=True)) # all you can set for the package - - return list(uses.intersection(iuses)) - else: - return [] - - def get_new_use_flags (self): - """Returns a list of the new useflags, i.e. these flags which are not written to the portage-system yet. - - @returns: list of flags or [] - @rtype: string[]""" - - return flags.get_new_use_flags(self) - - def get_actual_use_flags (self): - """This returns the result of installed_use_flags + new_use_flags. If the package is not installed, it returns only the new flags. - - @return: list of flags - @rtype: string[]""" - - if self.is_installed(): - i_flags = self.get_installed_use_flags() - for f in self.get_new_use_flags(): - - if flags.invert_use_flag(f) in i_flags: - i_flags.remove(flags.invert_use_flag(f)) - - elif f not in i_flags: - i_flags.append(f) - return i_flags - else: - return self.get_new_use_flags() - - def set_use_flag (self, flag): - """Set a use-flag. - - @param flag: the flag to set - @type flag: string""" - - flags.set_use_flag(self, flag) - - def remove_new_use_flags (self): - """Remove all the new use-flags.""" - - flags.remove_new_use_flags(self) - - def is_use_flag_enabled (self, flag): - """Looks whether a given useflag is enabled for the package, taking all options - (ie. even the new flags) into account. - - @param flag: the flag to check - @type flag: string - @returns: True or False - @rtype: bool""" - - if self.is_installed() and flag in self.get_actual_use_flags(): # flags set during install - return True - - elif (not self.is_installed()) and flag in self.get_settings("USE").split() \ - and not flags.invert_use_flag(flag) in self.get_new_use_flags(): # flags that would be set - return True - - elif flag in self.get_new_use_flags(): - return True - - else: - return False - - def get_matched_dep_packages (self, depvar): - """This function looks for all dependencies which are resolved. In normal case it makes only sense for installed packages, but should work for uninstalled ones too. - - @returns: unique list of dependencies resolved (with elements like "<=net-im/foobar-1.2.3") - @rtype: string[] - - @raises portato.DependencyCalcError: when an error occured during executing portage.dep_check()""" - - # change the useflags, because we have internally changed some, but not made them visible for portage - newUseFlags = self.get_new_use_flags() - actual = self.get_settings("USE").split() - if newUseFlags: - for u in newUseFlags: - if u[0] == "-" and flags.invert_use_flag(u) in actual: - actual.remove(flags.invert_use_flag(u)) - elif u not in actual: - actual.append(u) - - depstring = "" - for d in depvar: - depstring += self.get_env_var(d)+" " - - portage_dep._dep_check_strict = False - deps = portage.dep_check(depstring, None, self._settings, myuse = actual, trees = self._trees) - portage_dep._dep_check_strict = True - - if not deps: # FIXME: what is the difference to [1, []] ? - return [] - - if deps[0] == 0: # error - raise DependencyCalcError, deps[1] - - deps = deps[1] - - retlist = [] - - for d in deps: - if not d[0] == "!": - retlist.append(d) - - return retlist - - def get_dep_packages (self, depvar = ["RDEPEND", "PDEPEND", "DEPEND"]): - """Returns a cpv-list of packages on which this package depends and which have not been installed yet. This does not check the dependencies in a recursive manner. - - @returns: list of cpvs on which the package depend - @rtype: string[] - - @raises portato.BlockedException: when a package in the dependency-list is blocked by an installed one - @raises portato.PackageNotFoundException: when a package in the dependency list could not be found in the system - @raises portato.DependencyCalcError: when an error occured during executing portage.dep_check()""" - - dep_pkgs = [] # the package list - - # change the useflags, because we have internally changed some, but not made them visible for portage - newUseFlags = self.get_new_use_flags() - actual = self.get_settings("USE").split() - if newUseFlags: - for u in newUseFlags: - if u[0] == "-" and flags.invert_use_flag(u) in actual: - actual.remove(flags.invert_use_flag(u)) - elif u not in actual: - actual.append(u) - - depstring = "" - for d in depvar: - depstring += self.get_env_var(d)+" " - - # let portage do the main stuff ;) - # pay attention to any changes here - deps = portage.dep_check (depstring, portage_settings.vartree.dbapi, self._settings, myuse = actual, trees = self._trees) - - if not deps: # FIXME: what is the difference to [1, []] ? - return [] - - if deps[0] == 0: # error - raise DependencyCalcError, deps[1] - - deps = deps[1] - - for dep in deps: - if dep[0] == '!': # blocking sth - dep = dep[1:] - if dep != self.get_cp(): # not cpv, because a version might explicitly block another one - blocked = find_installed_packages(dep) - if blocked != []: - raise BlockedException, (self.get_cpv(), blocked[0].get_cpv()) - continue # finished with the blocking one -> next - - pkg = find_best_match(dep) - if not pkg: # try to find masked ones - list = find_packages(dep, masked = True) - if not list: - raise PackageNotFoundException, dep - - list = sort_package_list(list) - done = False - for i in range(len(list)-1,0,-1): - p = list[i] - if not p.is_masked(): - dep_pkgs.append(p.get_cpv()) - done = True - break - if not done: - dep_pkgs.append(list[-1].get_cpv()) - else: - dep_pkgs.append(pkg.get_cpv()) - - return dep_pkgs - - def get_cpv(self): - """Returns full Category/Package-Version string""" - return self._cpv - - def get_cp (self): - """Returns the cp-string. - - @returns: category/package. - @rtype: string""" - - return self.get_category()+"/"+self.get_name() - - def get_slot_cp (self): - - return ("%s:%s" % (self.get_cp(), self.get_env_var("SLOT"))) - - def get_name(self): - """Returns base name of package, no category nor version""" - return self._scpv[1] - - def get_version(self): - """Returns version of package, with revision number""" - v = self._scpv[2] - if self._scpv[3] != "r0": - v += "-" + self._scpv[3] - return v - - def get_category(self): - """Returns category of package""" - return self._scpv[0] - - def get_settings(self, key): - """Returns the value of the given key for this package (useful - for package.* files).""" - self._settingslock.acquire() - self._settings.setcpv(self._cpv) - v = self._settings[key] - self._settingslock.release() - return v - - def get_ebuild_path(self): - """Returns the complete path to the .ebuild file""" - return portage_settings.porttree.dbapi.findname(self._cpv) - - def get_package_path(self): - """Returns the path to where the ChangeLog, Manifest, .ebuild files reside""" - p = self.get_ebuild_path() - sp = p.split("/") - if len(sp): - import string - return string.join(sp[:-1],"/") - - def get_env_var(self, var, tree = None): - """Returns one of the predefined env vars DEPEND, RDEPEND, SRC_URI,....""" - if not tree: - mytree = portage_settings.vartree - if not self.is_installed(): - mytree = portage_settings.porttree - else: - mytree = tree - r = mytree.dbapi.aux_get(self._cpv,[var]) - - return r[0] - - def get_use_flags(self): - if self.is_installed(): - return self.get_env_var("USE", tree = portage_settings.vartree) - else: return "" - - def compare_version(self,other): - """Compares this package's version to another's CPV; returns -1, 0, 1""" - v1 = self._scpv - v2 = portage.catpkgsplit(other.get_cpv()) - # if category is different - if v1[0] != v2[0]: - return cmp(v1[0],v2[0]) - # if name is different - elif v1[1] != v2[1]: - return cmp(v1[1],v2[1]) - # Compare versions - else: - return portage.pkgcmp(v1[1:],v2[1:]) - - def matches (self, criterion): - """This checks, whether this package matches a specific verisioning criterion - e.g.: "<=net-im/foobar-1.2". - - @param criterion: the criterion to match against - @type criterion: string - @returns: True if matches; False if not - @rtype: boolean""" - - if portage.match_from_list(criterion, [self.get_cpv()]) == []: - return False - else: - return True diff --git a/portato/backend/portage/__init__.py b/portato/backend/portage/__init__.py new file mode 100644 index 0000000..e21278c --- /dev/null +++ b/portato/backend/portage/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# +# File: portato/backend/portage/__init__.py +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006-2007 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann <necoro@necoro.net> + +from system import PortageSystem +from package import PortagePackage diff --git a/portato/backend/portage/package.py b/portato/backend/portage/package.py new file mode 100644 index 0000000..74a5107 --- /dev/null +++ b/portato/backend/portage/package.py @@ -0,0 +1,234 @@ +# -*- coding: utf-8 -*- +# +# File: portato/backend/portage/package.py +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006-2007 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann <necoro@necoro.net> + +from portato.helper import * +from portato.backend.exceptions import * +from portato.backend import flags, Package, system + +import portage, portage_dep +from portage_util import unique_array + +import os.path + +class PortagePackage (Package): + """This is a class abstracting a normal package which can be installed for the portage-system.""" + + def __init__ (self, cpv): + """Constructor. + + @param cpv: The cpv which describes the package to create. + @type cpv: string (cat/pkg-ver)""" + + Package.__init__(self, cpv) + self._settings = system.settings + self._settingslock = system.settings.settingslock + + self._trees = system.settings.trees + + self.forced_flags = set() + self.forced_flags.update(self._settings.settings.usemask) + self.forced_flags.update(self._settings.settings.useforce) + + try: + self._status = portage.getmaskingstatus(self.get_cpv(), settings = self._settings.settings) + except KeyError: # package is not located in the system + self._status = None + + def is_installed(self): + return self._settings.vartree.dbapi.cpv_exists(self._cpv) + + def is_overlay(self): + dir,ovl = self._settings.porttree.dbapi.findname2(self._cpv) + return ovl != self._settings.settings["PORTDIR"] + + def is_in_system (self): + return (self._status != None) + + def is_missing_keyword(self): + if self._status and "missing keyword" in self._status: + return True + return False + + def is_testing(self, use_keywords = False): + testArch = "~" + self.get_settings("ARCH") + if not use_keywords: # keywords are NOT taken into account + if testArch in self.get_env_var("KEYWORDS").split(): + return True + return False + + else: # keywords are taken into account + status = flags.new_testing_status(self.get_cpv()) + if status is None: # we haven't changed it in any way + if self._status and testArch+" keyword" in self._status: + return True + return False + else: + return status + + def is_masked (self): + status = flags.new_masking_status(self.get_cpv()) + if status != None: # we have locally changed it + if status == "masked": return True + elif status == "unmasked": return False + else: + debug("BUG in flags.new_masking_status. It returns",status) + else: # we have not touched the status + if self._status and ("profile" in self._status or "package.mask" in self._status): + return True + return False + + def get_all_use_flags (self, installed = False): + if installed or not self.is_in_system(): + tree = self._settings.vartree + else: + tree = self._settings.porttree + + return list(set(self.get_env_var("IUSE", tree = tree).split()).difference(self.forced_flags)) + + def get_matched_dep_packages (self, depvar): + # change the useflags, because we have internally changed some, but not made them visible for portage + newUseFlags = self.get_new_use_flags() + actual = self.get_settings("USE").split() + if newUseFlags: + for u in newUseFlags: + if u[0] == "-" and flags.invert_use_flag(u) in actual: + actual.remove(flags.invert_use_flag(u)) + elif u not in actual: + actual.append(u) + + depstring = "" + for d in depvar: + depstring += self.get_env_var(d)+" " + + portage_dep._dep_check_strict = False + deps = portage.dep_check(depstring, None, self._settings.settings, myuse = actual, trees = self._trees) + portage_dep._dep_check_strict = True + + if not deps: # FIXME: what is the difference to [1, []] ? + return [] + + if deps[0] == 0: # error + raise DependencyCalcError, deps[1] + + deps = deps[1] + + retlist = [] + + for d in deps: + if not d[0] == "!": + retlist.append(d) + + return retlist + + def get_dep_packages (self, depvar = ["RDEPEND", "PDEPEND", "DEPEND"]): + dep_pkgs = [] # the package list + + # change the useflags, because we have internally changed some, but not made them visible for portage + newUseFlags = self.get_new_use_flags() + actual = self.get_settings("USE").split() + if newUseFlags: + for u in newUseFlags: + if u[0] == "-" and flags.invert_use_flag(u) in actual: + actual.remove(flags.invert_use_flag(u)) + elif u not in actual: + actual.append(u) + + depstring = "" + for d in depvar: + depstring += self.get_env_var(d)+" " + + # let portage do the main stuff ;) + # pay attention to any changes here + deps = portage.dep_check (depstring, self._settings.vartree.dbapi, self._settings.settings, myuse = actual, trees = self._trees) + + if not deps: # FIXME: what is the difference to [1, []] ? + return [] + + if deps[0] == 0: # error + raise DependencyCalcError, deps[1] + + deps = deps[1] + + for dep in deps: + if dep[0] == '!': # blocking sth + dep = dep[1:] + if dep != self.get_cp(): # not cpv, because a version might explicitly block another one + blocked = system.find_installed_packages(dep) + if blocked != []: + raise BlockedException, (self.get_cpv(), blocked[0].get_cpv()) + continue # finished with the blocking one -> next + + pkg = system.find_best_match(dep) + if not pkg: # try to find masked ones + list = system.find_packages(dep, masked = True) + if not list: + raise PackageNotFoundException, dep + + list = system.sort_package_list(list) + done = False + for i in range(len(list)-1,0,-1): + p = list[i] + if not p.is_masked(): + dep_pkgs.append(p.get_cpv()) + done = True + break + if not done: + dep_pkgs.append(list[-1].get_cpv()) + else: + dep_pkgs.append(pkg.get_cpv()) + + return dep_pkgs + + def get_settings(self, key): + self._settingslock.acquire() + self._settings.settings.setcpv(self._cpv) + v = self._settings.settings[key] + self._settingslock.release() + return v + + def get_ebuild_path(self): + return self._settings.porttree.dbapi.findname(self._cpv) + + def get_env_var(self, var, tree = None): + if not tree: + mytree = self._settings.vartree + if not self.is_installed(): + mytree = self._settings.porttree + else: + mytree = tree + r = mytree.dbapi.aux_get(self._cpv,[var]) + + return r[0] + + def get_use_flags(self): + if self.is_installed(): + return self.get_env_var("USE", tree = self._settings.vartree) + else: return "" + + def compare_version(self,other): + v1 = self._scpv + v2 = portage.catpkgsplit(other.get_cpv()) + # if category is different + if v1[0] != v2[0]: + return cmp(v1[0],v2[0]) + # if name is different + elif v1[1] != v2[1]: + return cmp(v1[1],v2[1]) + # Compare versions + else: + return portage.pkgcmp(v1[1:],v2[1:]) + + def matches (self, criterion): + if portage.match_from_list(criterion, [self.get_cpv()]) == []: + return False + else: + return True diff --git a/portato/backend/portage/settings.py b/portato/backend/portage/settings.py new file mode 100644 index 0000000..5d3aef0 --- /dev/null +++ b/portato/backend/portage/settings.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +# +# File: portato/backend/portage/settings.py +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006-2007 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann <necoro@necoro.net> + +import os +import portage +from threading import Lock + +class PortageSettings: + """Encapsulation of the portage settings. + + @ivar settings: portage settings + @ivar settingslock: a simple Lock + @ivar trees: a dictionary of the trees + @ivar porttree: shortcut to C{trees[root]["porttree"]} + @ivar vartree: shortcut to C{trees[root]["vartree"]} + @ivar virtuals: shortcut to C{trees[root]["virtuals"]}""" + + def __init__ (self): + """Initializes the instance. Calls L{load()}.""" + self.settingslock = Lock() + self.load() + + def load(self): + """(Re)loads the portage settings and sets the variables.""" + + kwargs = {} + for k, envvar in (("config_root", "PORTAGE_CONFIGROOT"), ("target_root", "ROOT")): + kwargs[k] = os.environ.get(envvar, None) + self.trees = portage.create_trees(trees=None, **kwargs) + + self.settings = self.trees["/"]["vartree"].settings + + for myroot in self.trees: + if myroot != "/": + self.settings = self.trees[myroot]["vartree"].settings + break + + self.settings.unlock() + + root = self.settings["ROOT"] + + self.porttree = self.trees[root]["porttree"] + self.vartree = self.trees[root]["vartree"] + self.virtuals = self.trees[root]["virtuals"] + + portage.settings = None # we use our own one ... diff --git a/portato/backend/portage/system.py b/portato/backend/portage/system.py new file mode 100644 index 0000000..1cfda62 --- /dev/null +++ b/portato/backend/portage/system.py @@ -0,0 +1,370 @@ +# -*- coding: utf-8 -*- +# +# File: portato/backend/portage/system.py +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006-2007 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann <necoro@necoro.net> + +import re, os +import types +import portage + +import package +from settings import PortageSettings +from portato.helper import debug, unique_array +from portato.backend.system_interface import SystemInterface + +class PortageSystem (SystemInterface): + + def __init__ (self): + + self.settings = PortageSettings() + portage.WORLD_FILE = self.settings.settings["ROOT"]+portage.WORLD_FILE + + def new_package (self, cpv): + return package.PortagePackage(cpv) + + def get_config_path (self): + return portage.USER_CONFIG_PATH + + def get_world_file_path (self): + return portage.WORLD_FILE + + def get_merge_command (self): + return ["/usr/bin/python", "/usr/bin/emerge"] + + def get_sync_command (self): + return self.get_merge_command()+["--sync"] + + def get_oneshot_option (self): + return ["--oneshot"] + + def get_newuse_option (self): + return ["--newuse"] + + def get_deep_option (self): + return ["--deep"] + + def get_update_option (self): + return ["--update"] + + def get_pretend_option (self): + return ["--pretend", "--verbose"] + + def get_unmerge_option (self): + return ["--unmerge"] + + def find_lambda (self, name): + """Returns the function needed by all the find_all_*-functions. Returns None if no name is given. + + @param name: name to build the function of + @type name: string + @returns: + 1. None if no name is given + 2. a lambda function + @rtype: function""" + + if name != None: + return lambda x: re.match(".*"+name+".*",x) + else: + return lambda x: True + + def geneticize_list (self, list_of_packages): + """Convertes a list of cpv's into L{backend.Package}s. + + @param list_of_packages: the list of packages + @type list_of_packages: list of gentoolkit.Packages + @returns: converted list + @rtype: PortagePackage[]""" + + return [package.PortagePackage(x) for x in list_of_packages] + + def find_best (self, list): + return package.PortagePackage(portage.best(list)) + + def find_best_match (self, search_key, only_installed = False): + t = None + if not only_installed: + t = self.settings.porttree.dep_bestmatch(search_key) + else: + t = self.settings.vartree.dep_bestmatch(search_key) + if t: + return package.PortagePackage(t) + return None + + def find_packages (self, search_key, masked=False): + try: + if masked: + t = self.settings.porttree.dbapi.xmatch("match-all", search_key) + t += self.settings.vartree.dbapi.match(search_key) + else: + t = self.settings.porttree.dbapi.match(search_key) + t += self.settings.vartree.dbapi.match(search_key) + # catch the "ambigous package" Exception + except ValueError, e: + if type(e[0]) == types.ListType: + t = [] + for cp in e[0]: + if masked: + t += self.settings.porttree.dbapi.xmatch("match-all", cp) + t += self.settings.vartree.dbapi.match(cp) + else: + t += self.settings.porttree.dbapi.match(cp) + t += self.settings.vartree.dbapi.match(cp) + else: + raise ValueError(e) + # Make the list of packages unique + t = unique_array(t) + t.sort() + return self.geneticize_list(t) + + def find_installed_packages (self, search_key, masked = False): + try: + t = self.settings.vartree.dbapi.match(search_key) + # catch the "ambigous package" Exception + except ValueError, e: + if type(e[0]) == types.ListType: + t = [] + for cp in e[0]: + t += self.settings.vartree.dbapi.match(cp) + else: + raise ValueError(e) + + return self.geneticize_list(t) + + def __find_resolved_unresolved (self, list, check): + resolved = [] + unresolved = [] + for x in list: + cpv = x.strip() + if len(cpv) and check(cpv): + pkg = self.find_best_match(cpv) + if pkg: + resolved.append(pkg) + else: + unresolved.append(cpv) + return (resolved, self.geneticize_list(unresolved)) + + def find_system_packages (self): + pkglist = self.settings.settings.packages + + return self.__find_resolved_unresolved(pkglist, lambda cpv: cpv[0] == "*") + + def find_world_packages (self): + f = open(portage.WORLD_FILE) + pkglist = f.readlines() + f.close() + + return self.__find_resolved_unresolved(pkglist, lambda cpv: cpv[0] != "#") + + def find_all_installed_packages (self, name = None, withVersion=True): + if withVersion: + t = self.settings.vartree.dbapi.cpv_all() + if name: + t = filter(self.find_lambda(name),t) + return self.geneticize_list(t) + + else: + t = self.settings.vartree.dbapi.cp_all() + if name: + t = filter(self.find_lambda(name),t) + return t + + def find_all_uninstalled_packages (self, name = None): + alist = self.find_all_packages(name) + return self.geneticize_list([x for x in alist if not x.is_installed()]) + + def find_all_packages (self, name = None, withVersion = True): + t = self.settings.porttree.dbapi.cp_all() + t += self.settings.vartree.dbapi.cp_all() + if name: + t = filter(self.find_lambda(name),t) + t = unique_array(t) + + if (withVersion): + t2 = [] + for x in t: + t2 += self.settings.porttree.dbapi.cp_list(x) + t2 += self.settings.vartree.dbapi.cp_list(x) + t2 = unique_array(t2) + return self.geneticize_list(t2) + else: + return t; + + def find_all_world_packages (self, name = None): + world = filter(self.find_lambda(name), [x.get_cpv() for x in self.find_world_packages()[0]]) + world = unique_array(world) + return self.geneticize_list(world) + + def find_all_system_packages (self, name = None): + sys = filter(self.find_lambda(name), [x.get_cpv() for x in self.find_system_packages()[0]]) + sys = unique_array(sys) + return self.geneticize_list(sys) + + def list_categories (self, name = None): + categories = self.settings.settings.categories + return filter(self.find_lambda(name), categories) + + def split_cpv (self, cpv): + cpv = portage.dep_getcpv(cpv) + return portage.catpkgsplit(cpv) + + def sort_package_list(self, pkglist): + pkglist.sort(package.PortagePackage.compare_version) + return pkglist + + def reload_settings (self): + self.settings.load() + + def update_world (self, newuse = False, deep = False): + # read world file + world = open(portage.WORLD_FILE) + packages = [] + for line in world: + line = line.strip() + if len(line) == 0: continue # empty line + if line[0] == "#": continue # comment + packages.append(line) + world.close() + + # append system packages + packages.extend(unique_array([p.get_cp() for p in self.find_all_system_packages()])) + + def get_new_packages (packages): + new_packages = [] + for p in packages: + inst = self.find_installed_packages(p) + if len(inst) > 1: + myslots = set() + for i in inst: # get the slots of the installed packages + myslots.add(i.get_env_var("SLOT")) + + myslots.add(self.find_best_match(p).get_env_var("SLOT")) # add the slot of the best package in portage + for slot in myslots: + new_packages.append(\ + self.find_best(\ + [x.get_cpv() for x in self.find_packages("%s:%s" % (i.get_cp(), slot))]\ + )) + else: + new_packages.append(self.find_best_match(p)) + + return new_packages + + checked = [] + updating = [] + raw_checked = [] + def check (p, add_not_installed = True): + """Checks whether a package is updated or not.""" + if p.get_cp() in checked: return + else: checked.append(p.get_cp()) + + appended = False + tempDeep = False + + if not p.is_installed(): + oldList = self.find_installed_packages(p.get_slot_cp()) + if oldList: + old = oldList[0] # we should only have one package here - else it is a bug + else: + oldList = self.sort_package_list(self.find_installed_packages(p.get_cp())) + if not oldList: + if add_not_installed: + debug("Not found installed",p.get_cpv(),"==> adding") + oldList = [p] + else: + return + old = oldList[-1] + + updating.append((p, old)) + appended = True + p = old + + if newuse and p.is_installed() and p.is_in_system(): # there is no use to check newuse for a package which is not existing in portage anymore :) + + new_iuse = set(p.get_all_use_flags(installed = False)) # IUSE in the ebuild + old_iuse = set(p.get_all_use_flags(installed = True)) # IUSE in the vardb + + if new_iuse.symmetric_difference(old_iuse): # difference between new_iuse and old_iuse + tempDeep = True + if not appended: + updating.append((p,p)) + appended = True + + else: + old = set(p.get_installed_use_flags()) + new = set(p.get_settings("USE").split()) + + if new_iuse.intersection(new) != old_iuse.intersection(old): + tempDeep = True + if not appended: + updating.append((p,p)) + appended = True + + if deep or tempDeep: + states = [(["RDEPEND","PDEPEND"],True), (["DEPEND"], False)] + + for state in states: + for i in p.get_matched_dep_packages(state[0]): + if i not in raw_checked: + raw_checked.append(i) + bm = get_new_packages([i]) + if not bm: + debug("Bug? No best match could be found:",i) + else: + for pkg in bm: + if not pkg: continue + check(pkg, state[1]) + + for p in get_new_packages(packages): + if not p: continue # if a masked package is installed we have "None" here + check(p, True) + + return updating + + use_descs = {} + local_use_descs = {} + def get_use_desc (self, flag, package = None): + # In the first run the dictionaries 'use_descs' and 'local_use_descs' are filled. + + # fill cache if needed + if self.use_descs == {} or self.local_use_descs == {}: + # read use.desc + fd = open(self.settings.settings["PORTDIR"]+"/profiles/use.desc") + lines = fd.readlines() + fd.close() + for line in lines: + line = line.strip() + if line != "" and line[0] != '#': + fields = [x.strip() for x in line.split(" - ",1)] + if len(fields) == 2: + self.use_descs[fields[0]] = fields[1] + + # read use.local.desc + fd = open(self.settings.settings["PORTDIR"]+"/profiles/use.local.desc") + lines = fd.readlines() + fd.close() + for line in lines: + line = line.strip() + if line != "" and line[0] != '#': + fields = [x.strip() for x in line.split(":",1)] + if len(fields) == 2: + if not fields[0] in self.local_use_descs: # create + self.local_use_descs[fields[0]] = {} + subfields = [x.strip() for x in fields[1].split(" - ",1)] + if len(subfields) == 2: + self.local_use_descs[fields[0]][subfields[0]] = subfields[1] + + # start + desc = None + if flag in self.use_descs: + desc = self.use_descs[flag] + if package != None: + if package in self.local_use_descs: + if flag in self.local_use_descs[package]: + desc = self.local_use_descs[package][flag] + return desc diff --git a/portato/backend/portage_helper.py b/portato/backend/portage_helper.py deleted file mode 100644 index be4c76c..0000000 --- a/portato/backend/portage_helper.py +++ /dev/null @@ -1,467 +0,0 @@ -# -*- coding: utf-8 -*- -# -# File: portato/backend/portage_helper.py -# This file is part of the Portato-Project, a graphical portage-frontend. -# -# Copyright (C) 2006-2007 René 'Necoro' Neumann -# This is free software. You may redistribute copies of it under the terms of -# the GNU General Public License version 2. -# There is NO WARRANTY, to the extent permitted by law. -# -# Written by René 'Necoro' Neumann <necoro@necoro.net> - -import re, os -import types -import portage - -from portage_util import unique_array - -from portato.backend import portage_settings -import package - -from portato.helper import debug - -def find_lambda (name): - """Returns the function needed by all the find_all_*-functions. Returns None if no name is given. - - @param name: name to build the function of - @type name: string - @returns: - 1. None if no name is given - 2. a lambda function - @rtype: function""" - - if name != None: - return lambda x: re.match(".*"+name+".*",x) - else: - return lambda x: True - -def geneticize_list (list_of_packages): - """Convertes a list of cpv's into L{backend.Package}s. - - @param list_of_packages: the list of packages - @type list_of_packages: list of gentoolkit.Packages - @returns: converted list - @rtype: backend.Package[]""" - - return [package.Package(x) for x in list_of_packages] - -def find_best (list): - """Returns the best package out of a list of packages. - - @param list: the list of packages to select from - @type list: string[] - @returns: the best package - @rtype: backend.Package""" - - return package.Package(portage.best(list)) - -def find_best_match (search_key, only_installed = False): - """Finds the best match in the portage tree. It does not find masked packages! - - @param search_key: the key to find in the portage tree - @type search_key: string - @param only_installed: if True, only installed packages are searched - @type only_installed: boolean - - @returns: the package found or None - @rtype: backend.Package""" - - t = None - if not only_installed: - t = portage_settings.porttree.dep_bestmatch(search_key) - else: - t = portage_settings.vartree.dep_bestmatch(search_key) - if t: - return package.Package(t) - return None - -def find_packages (search_key, masked=False): - """This returns a list of packages which have to fit exactly. Additionally ranges like '<,>,=,~,!' et. al. are possible. - - @param search_key: the key to look for - @type search_key: string - @param masked: if True, also look for masked packages - @type masked: boolean - - @returns: list of found packages - @rtype: backend.Package[]""" - - try: - if masked: - t = portage_settings.porttree.dbapi.xmatch("match-all", search_key) - t += portage_settings.vartree.dbapi.match(search_key) - else: - t = portage_settings.porttree.dbapi.match(search_key) - t += portage_settings.vartree.dbapi.match(search_key) - # catch the "ambigous package" Exception - except ValueError, e: - if type(e[0]) == types.ListType: - t = [] - for cp in e[0]: - if masked: - t += portage_settings.porttree.dbapi.xmatch("match-all", cp) - t += portage_settings.vartree.dbapi.match(cp) - else: - t += portage_settings.porttree.dbapi.match(cp) - t += portage_settings.vartree.dbapi.match(cp) - else: - raise ValueError(e) - # Make the list of packages unique - t = unique_array(t) - t.sort() - return geneticize_list(t) - -def find_installed_packages (search_key, masked=False): - """This returns a list of packages which have to fit exactly. Additionally ranges like '<,>,=,~,!' et. al. are possible. - - @param search_key: the key to look for - @type search_key: string - @param masked: if True, also look for masked packages - @type masked: boolean - - @returns: list of found packages - @rtype: backend.Package[]""" - - try: - t = portage_settings.vartree.dbapi.match(search_key) - # catch the "ambigous package" Exception - except ValueError, e: - if type(e[0]) == types.ListType: - t = [] - for cp in e[0]: - t += portage_settings.vartree.dbapi.match(cp) - else: - raise ValueError(e) - - return geneticize_list(t) - -def find_system_packages (): - """Looks for all packages saved as "system-packages". - - @returns: a tuple of (resolved_packages, unresolved_packages). - @rtype: (backend.Package[], backend.Package[])""" - - pkglist = portage_settings.settings.packages - resolved = [] - unresolved = [] - for x in pkglist: - cpv = x.strip() - if len(cpv) and cpv[0] == "*": - pkg = find_best_match(cpv) - if pkg: - resolved.append(pkg) - else: - unresolved.append(cpv) - return (resolved, geneticize_list(unresolved)) - -def find_world_packages (): - """Looks for all packages saved in the world-file. - - @returns: a tuple of (resolved_packages, unresolved_packages). - @rtype: (backend.Package[], backend.Package[])""" - - f = open(portage.WORLD_FILE) - pkglist = f.readlines() - resolved = [] - unresolved = [] - for x in pkglist: - cpv = x.strip() - if len(cpv) and cpv[0] != "#": - pkg = find_best_match(cpv) - if pkg: - resolved.append(pkg) - else: - unresolved.append(cpv) - return (resolved, geneticize_list(unresolved)) - -def find_all_installed_packages (name=None, withVersion=True): - """Finds all installed packages matching a name or all if no name is specified. - - @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned - @type name: string or None - @param withVersion: if True version-specific packages are returned; else only the cat/package-strings a delivered - @type withVersion: boolean - - @returns: all packages/cp-strings found - @rtype: backend.Package[] or cp-string[]""" - - if withVersion: - t = portage_settings.vartree.dbapi.cpv_all() - if name: - t = filter(find_lambda(name),t) - return geneticize_list(t) - - else: - t = portage_settings.vartree.dbapi.cp_all() - if name: - t = filter(find_lambda(name),t) - return t - -def find_all_uninstalled_packages (name=None): - """Finds all uninstalled packages matching a name or all if no name is specified. - - @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned - @type name: string or None - @returns: all packages found - @rtype: backend.Package[]""" - - alist = find_all_packages(name) - return geneticize_list([x for x in alist if not x.is_installed()]) - -def find_all_packages (name=None, withVersion=True): - """Finds all packages matching a name or all if no name is specified. - - @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned - @type name: string or None - @param withVersion: if True version-specific packages are returned; else only the cat/package-strings a delivered - @type withVersion: boolean - - @returns: all packages/cp-strings found - @rtype: backend.Package[] or cp-string[]""" - - t = portage_settings.porttree.dbapi.cp_all() - t += portage_settings.vartree.dbapi.cp_all() - if name: - t = filter(find_lambda(name),t) - t = unique_array(t) - - if (withVersion): - t2 = [] - for x in t: - t2 += portage_settings.porttree.dbapi.cp_list(x) - t2 += portage_settings.vartree.dbapi.cp_list(x) - t2 = unique_array(t2) - return geneticize_list(t2) - else: - return t; - -def find_all_world_packages (name=None): - """Finds all world packages matching a name or all if no name is specified. - - @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned - @type name: string or None - @returns: all packages found - @rtype: backend.Package[]""" - - world = filter(find_lambda(name), [x.get_cpv() for x in find_world_packages()[0]]) - world = unique_array(world) - return geneticize_list(world) - -def find_all_system_packages (name=None): - """Finds all system packages matching a name or all if no name is specified. - - @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned - @type name: string or None - @returns: all packages found - @rtype: backend.Package[]""" - - sys = filter(find_lambda(name), [x.get_cpv() for x in find_system_packages()[0]]) - sys = unique_array(sys) - return geneticize_list(sys) - -def list_categories (name=None): - """Finds all categories matching a name or all if no name is specified. - - @param name: the name to look for - it is expanded to .*name.* ; if None, all categories are returned - @type name: string or None - @returns: all categories found - @rtype: string[]""" - - categories = portage_settings.settings.categories - return filter(find_lambda(name), categories) - -def split_package_name (name): - """Splits a package name in its elements. - - @param name: name to split - @type name: string - @returns: list: [category, name, version, rev] whereby rev is "r0" if not specified in the name - @rtype: string[]""" - - r = portage.catpkgsplit(portage.dep_getcpv(name)) - if not r: - r = name.split("/") - if len(r) == 1: - return ["", name, "", "r0"] - else: - return r + ["", "r0"] - if r[0] == 'null': - r[0] = '' - return r - -def sort_package_list(pkglist): - """Sorts a package list in the same manner portage does. - - @param pkglist: list to sort - @type pkglist: Packages[]""" - - pkglist.sort(package.Package.compare_version) - return pkglist - -def reload_settings (): - """Reloads portage.""" - portage_settings.load() - -def update_world (newuse = False, deep = False): - """Calculates the packages to get updated in an update world. - - @param newuse: Checks if a use-flag has a different state then to install time. - @type newuse: boolean - @param deep: Not only check world packages but also there dependencies. - @type deep: boolean - @returns: a list containing of the tuple (new_package, old_package) - @rtype: (backend.Package, backend.Package)[]""" - - # read world file - world = open(portage.WORLD_FILE) - packages = [] - for line in world: - line = line.strip() - if len(line) == 0: continue # empty line - if line[0] == "#": continue # comment - packages.append(line) - world.close() - - # append system packages - packages.extend(unique_array([p.get_cp() for p in find_all_system_packages()])) - - def get_new_packages (packages): - new_packages = [] - for p in packages: - inst = find_installed_packages(p) - if len(inst) > 1: - myslots = set() - for i in inst: # get the slots of the installed packages - myslots.add(i.get_env_var("SLOT")) - - myslots.add(find_best_match(p).get_env_var("SLOT")) # add the slot of the best package in portage - for slot in myslots: - new_packages.append(\ - find_best(\ - [x.get_cpv() for x in find_packages("%s:%s" % (i.get_cp(), slot))]\ - )) - else: - new_packages.append(find_best_match(p)) - - return new_packages - - checked = [] - updating = [] - raw_checked = [] - def check (p, add_not_installed = True): - """Checks whether a package is updated or not.""" - if p.get_cp() in checked: return - else: checked.append(p.get_cp()) - - appended = False - tempDeep = False - - if not p.is_installed(): - oldList = find_installed_packages(p.get_slot_cp()) - if oldList: - old = oldList[0] # we should only have one package here - else it is a bug - else: - oldList = sort_package_list(find_installed_packages(p.get_cp())) - if not oldList: - if add_not_installed: - debug("Not found installed",p.get_cpv(),"==> adding") - oldList = [p] - else: - return - old = oldList[-1] - - updating.append((p, old)) - appended = True - p = old - - if newuse and p.is_installed() and p.is_in_system(): # there is no use to check newuse for a package which is not existing in portage anymore :) - - new_iuse = set(p.get_all_use_flags(installed = False)) # IUSE in the ebuild - old_iuse = set(p.get_all_use_flags(installed = True)) # IUSE in the vardb - - if new_iuse.symmetric_difference(old_iuse): # difference between new_iuse and old_iuse - tempDeep = True - if not appended: - updating.append((p,p)) - appended = True - - else: - old = set(p.get_installed_use_flags()) - new = set(p.get_settings("USE").split()) - - if new_iuse.intersection(new) != old_iuse.intersection(old): - tempDeep = True - if not appended: - updating.append((p,p)) - appended = True - - if deep or tempDeep: - states = [(["RDEPEND","PDEPEND"],True), (["DEPEND"], False)] - - for state in states: - for i in p.get_matched_dep_packages(state[0]): - if i not in raw_checked: - raw_checked.append(i) - bm = get_new_packages([i]) - if not bm: - debug("Bug? No best match could be found:",i) - else: - for pkg in bm: - if not pkg: continue - check(pkg, state[1]) - - for p in get_new_packages(packages): - if not p: continue # if a masked package is installed we have "None" here - check(p, True) - - return updating - -use_descs = {} -local_use_descs = {} -def get_use_desc (flag, package = None): - """Returns the description of a specific useflag or None if no desc was found. - If a package is given (in the <cat>/<name> format) the local use descriptions are searched too. - - @param flag: flag to get the description for - @type flag: string - @param package: name of a package: if given local use descriptions are searched too - @type package: cp-string - @returns: found description - @rtype: string""" - - # In the first run the dictionaries 'use_descs' and 'local_use_descs' are filled. - - # fill cache if needed - if use_descs == {} or local_use_descs == {}: - # read use.desc - fd = open(portage_settings.settings["PORTDIR"]+"/profiles/use.desc") - for line in fd.readlines(): - line = line.strip() - if line != "" and line[0] != '#': - fields = [x.strip() for x in line.split(" - ",1)] - if len(fields) == 2: - use_descs[fields[0]] = fields[1] - - # read use.local.desc - fd = open(portage_settings.settings["PORTDIR"]+"/profiles/use.local.desc") - for line in fd.readlines(): - line = line.strip() - if line != "" and line[0] != '#': - fields = [x.strip() for x in line.split(":",1)] - if len(fields) == 2: - if not fields[0] in local_use_descs: # create - local_use_descs[fields[0]] = {} - subfields = [x.strip() for x in fields[1].split(" - ",1)] - if len(subfields) == 2: - local_use_descs[fields[0]][subfields[0]] = subfields[1] - - # start - desc = None - if flag in use_descs: - desc = use_descs[flag] - if package != None: - if package in local_use_descs: - if flag in local_use_descs[package]: - desc = local_use_descs[package][flag] - return desc diff --git a/portato/backend/system_interface.py b/portato/backend/system_interface.py new file mode 100644 index 0000000..eb81401 --- /dev/null +++ b/portato/backend/system_interface.py @@ -0,0 +1,283 @@ +# -*- coding: utf-8 -*- +# +# File: portato/backend/system_interface.py +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2007 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann <necoro@necoro.net> + +class SystemInterface: + + def split_cpv (self, cpv): + """Splits a cpv into all its parts. + + @param cpv: the cpv to split + @type cpv: string + @returns: the splitted cpv + @rtype: string[]""" + + raise NotImplementedError + + def find_best(self, list): + """Returns the best package out of a list of packages. + + @param list: the list of packages to select from + @type list: string[] + @returns: the best package + @rtype: backend.Package""" + + raise NotImplementedError + + def find_best_match (self, search_key, only_installed = False): + """Finds the best match in the portage tree. It does not find masked packages! + + @param search_key: the key to find in the portage tree + @type search_key: string + @param only_installed: if True, only installed packages are searched + @type only_installed: boolean + + @returns: the package found or None + @rtype: backend.Package""" + + raise NotImplementedError + + def find_packages (self, search_key, masked=False): + """This returns a list of packages which have to fit exactly. Additionally ranges like '<,>,=,~,!' et. al. are possible. + + @param search_key: the key to look for + @type search_key: string + @param masked: if True, also look for masked packages + @type masked: boolean + + @returns: list of found packages + @rtype: backend.Package[]""" + + raise NotImplementedError + + + def find_installed_packages (self, search_key, masked = False): + """This returns a list of packages which have to fit exactly. Additionally ranges like '<,>,=,~,!' et. al. are possible. + + @param search_key: the key to look for + @type search_key: string + @param masked: if True, also look for masked packages + @type masked: boolean + + @returns: list of found packages + @rtype: backend.Package[]""" + + raise NotImplementedError + + def find_system_packages (self): + """Looks for all packages saved as "system-packages". + + @returns: a tuple of (resolved_packages, unresolved_packages). + @rtype: (backend.Package[], backend.Package[])""" + + raise NotImplementedError + + def find_world_packages (self): + """Looks for all packages saved in the world-file. + + @returns: a tuple of (resolved_packages, unresolved_packages). + @rtype: (backend.Package[], backend.Package[])""" + + raise NotImplementedError + + def find_all_installed_packages (self, name = None, withVersion = True): + """Finds all installed packages matching a name or all if no name is specified. + + @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned + @type name: string or None + @param withVersion: if True version-specific packages are returned; else only the cat/package-strings a delivered + @type withVersion: boolean + + @returns: all packages/cp-strings found + @rtype: backend.Package[] or cp-string[]""" + + raise NotImplementedError + + def find_all_uninstalled_packages (self, name = None): + """Finds all uninstalled packages matching a name or all if no name is specified. + + @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned + @type name: string or None + @returns: all packages found + @rtype: backend.Package[]""" + + raise NotImplementedError + + def find_all_packages (self, name = None, withVersion = True): + """Finds all packages matching a name or all if no name is specified. + + @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned + @type name: string or None + @param withVersion: if True version-specific packages are returned; else only the cat/package-strings a delivered + @type withVersion: boolean + + @returns: all packages/cp-strings found + @rtype: backend.Package[] or cp-string[]""" + + raise NotImplementedError + + def find_all_world_packages (self, name = None): + """Finds all world packages matching a name or all if no name is specified. + + @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned + @type name: string or None + @returns: all packages found + @rtype: backend.Package[]""" + + raise NotImplementedError + + def find_all_system_packages (self, name = None): + """Finds all system packages matching a name or all if no name is specified. + + @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned + @type name: string or None + @returns: all packages found + @rtype: backend.Package[]""" + + raise NotImplementedError + + def list_categories (self, name = None): + """Finds all categories matching a name or all if no name is specified. + + @param name: the name to look for - it is expanded to .*name.* ; if None, all categories are returned + @type name: string or None + @returns: all categories found + @rtype: string[]""" + + raise NotImplementedError + + def sort_package_list(self, pkglist): + """Sorts a package list in the same manner portage does. + + @param pkglist: list to sort + @type pkglist: Packages[]""" + + raise NotImplementedError + + def reload_settings (self): + """Reloads portage.""" + + raise NotImplementedError + + def update_world (self, newuse = False, deep = False): + """Calculates the packages to get updated in an update world. + + @param newuse: Checks if a use-flag has a different state then to install time. + @type newuse: boolean + @param deep: Not only check world packages but also there dependencies. + @type deep: boolean + @returns: a list containing of the tuple (new_package, old_package) + @rtype: (backend.Package, backend.Package)[]""" + + raise NotImplementedError + + def get_use_desc (self, flag, package = None): + """Returns the description of a specific useflag or None if no desc was found. + If a package is given (in the <cat>/<name> format) the local use descriptions are searched too. + + @param flag: flag to get the description for + @type flag: string + @param package: name of a package: if given local use descriptions are searched too + @type package: cp-string + @returns: found description + @rtype: string""" + + raise NotImplementedError + + def new_package (self, cpv): + """Returns an instance of the appropriate Package-Subclass. + + @param cpv: the cpv to create the package from + @type cpv: string + @returns: a new Package-object. + @rtype: Package""" + + raise NotImplementedError + + def get_config_path (self): + """Returns the actual path to the config files. + + @returns: the path, e.g. /etc/portage + @rtyoe: string""" + + raise NotImplementedError + + def get_world_file_path (self): + """Returns the path to the world file. + + @returns: the path of the world file + @rtype: string""" + + raise NotImplementedError + + def get_sync_command (self): + """Returns the command(s) to run for syncing. This can be overridden by the user. + + @returns: command to run + @rtype: string[]""" + + raise NotImplementedError + + def get_merge_command (self): + """Returns the command(s) to run for the merging. + + @returns: command to run + @rtype: string[]""" + + raise NotImplementedError + + def get_oneshot_option (self): + """Returns the options to append for marking a merge as "oneshot". + + @returns: option(s) to append + @rtype: string[]""" + + raise NotImplementedError + + def get_newuse_option (self): + """Returns the options to append for marking a merge as "newuse". + + @returns: option(s) to append + @rtype: string[]""" + + raise NotImplementedError + + def get_deep_option (self): + """Returns the options to append for marking a merge as "deep". + + @returns: option(s) to append + @rtype: string[]""" + + raise NotImplementedError + + def get_update_option (self): + """Returns the options to append for marking a merge as "update". + + @returns: option(s) to append + @rtype: string[]""" + + raise NotImplementedError + + def get_pretend_option (self): + """Returns the options to append for marking a merge as "pretend". + + @returns: option(s) to append + @rtype: string[]""" + + raise NotImplementedError + + def get_unmerge_option (self): + """Returns the options to append for marking a merge as "unmerge". + + @returns: option(s) to append + @rtype: string[]""" + + raise NotImplementedError |