summaryrefslogtreecommitdiff
path: root/portato/backend
diff options
context:
space:
mode:
Diffstat (limited to 'portato/backend')
-rw-r--r--portato/backend/__init__.py75
-rw-r--r--portato/backend/flags.py117
-rw-r--r--portato/backend/package.py447
-rw-r--r--portato/backend/portage/__init__.py14
-rw-r--r--portato/backend/portage/package.py234
-rw-r--r--portato/backend/portage/settings.py55
-rw-r--r--portato/backend/portage/system.py370
-rw-r--r--portato/backend/portage_helper.py467
-rw-r--r--portato/backend/system_interface.py283
9 files changed, 1055 insertions, 1007 deletions
diff --git a/portato/backend/__init__.py b/portato/backend/__init__.py
index a72923b..83ee1d6 100644
--- a/portato/backend/__init__.py
+++ b/portato/backend/__init__.py
@@ -10,67 +10,32 @@
#
# Written by René 'Necoro' Neumann <necoro@necoro.net>
-import os
-from threading import Lock
+from system_interface import SystemInterface
-# import portage
-import portage
+SYSTEM = "portage"
+_sys = None
-class PortageSettings:
- """Encapsulation of the portage settings.
-
- @ivar settings: portage settings
- @ivar settingslock: a simple Lock
- @ivar trees: a dictionary of the trees
- @ivar porttree: shortcut to C{trees[root]["porttree"]}
- @ivar vartree: shortcut to C{trees[root]["vartree"]}
- @ivar virtuals: shortcut to C{trees[root]["virtuals"]}"""
+class SystemWrapper (object, SystemInterface):
+ def __getattribute__ (self, name):
+ global _sys
+ return eval ("_sys.%s" % name)
- def __init__ (self):
- """Initializes the instance. Calls L{load()}."""
- self.settingslock = Lock()
- self.load()
-
- def load(self):
- """(Re)loads the portage settings and sets the variables."""
+def set_system (new_sys):
+ global SYSTEM
+ SYSTEM = new_sys
+ load_system()
- kwargs = {}
- for k, envvar in (("config_root", "PORTAGE_CONFIGROOT"), ("target_root", "ROOT")):
- kwargs[k] = os.environ.get(envvar, None)
- self.trees = portage.create_trees(trees=None, **kwargs)
+def load_system ():
+ global _sys
- self.settings = self.trees["/"]["vartree"].settings
+ if SYSTEM == "portage":
+ from portato.backend.portage import PortageSystem
+ _sys = PortageSystem ()
- for myroot in self.trees:
- if myroot != "/":
- self.settings = self.trees[myroot]["vartree"].settings
- break
+system = SystemWrapper()
- self.settings.unlock()
-
- root = self.settings["ROOT"]
-
- self.porttree = self.trees[root]["porttree"]
- self.vartree = self.trees[root]["vartree"]
- self.virtuals = self.trees[root]["virtuals"]
-
- portage.settings = None # we use our own one ...
-
-# portage tree vars
-portage_settings = PortageSettings()
-
-# for the moment ;)
-settingslock = portage_settings.settingslock
-settings = portage_settings.settings
-trees = portage_settings.trees
-porttree = portage_settings.porttree
-vartree = portage_settings.vartree
-virtuals = portage_settings.virtuals
+from exceptions import *
+from package import Package
-# this is set to "var/lib/portage/world" by default - so we add the leading /
-portage.WORLD_FILE = portage_settings.settings["ROOT"]+portage.WORLD_FILE
+load_system()
-# import our packages
-from exceptions import *
-from package import *
-from portage_helper import *
diff --git a/portato/backend/flags.py b/portato/backend/flags.py
index a7ba125..a7241ca 100644
--- a/portato/backend/flags.py
+++ b/portato/backend/flags.py
@@ -15,12 +15,9 @@ import os.path
from subprocess import Popen, PIPE # needed for grep
from portato.helper import *
-from portage_helper import split_package_name
+from portato.backend import system
import package
-import portage
-from portage_util import unique_array
-
CONFIG = {
"usefile" : "portato",
"maskfile" : "portato",
@@ -30,6 +27,59 @@ CONFIG = {
"testingPerVersion" : True
}
+class Constants:
+
+ def __init__ (self):
+ self.clear()
+
+ def clear (self):
+ self._use_path = None
+ self._mask_path = None
+ self._unmask_path = None
+ self._testing_path = None
+ self._use_path_is_dir = None
+ self._mask_path_is_dir = None
+ self._unmask_path_is_dir = None
+ self._testing_path_is_dir = None
+
+ def __get (self, name, path):
+ if self.__dict__[name] is None:
+ self.__dict__[name] = os.path.join(system.get_config_path(), path)
+
+ return self.__dict__[name]
+
+ def __is_dir(self, path):
+ name = "_" + path + "_is_dir"
+ if self.__dict__[name] is None:
+ self.__dict__[name] = os.path.isdir(self.__class__.__dict__[path](self))
+ return self.__dict__[name]
+
+ def use_path (self):
+ return self.__get("_use_path", "package.use")
+
+ def use_path_is_dir (self):
+ return self.__is_dir("use_path")
+
+ def mask_path (self):
+ return self.__get("_mask_path", "package.mask")
+
+ def mask_path_is_dir (self):
+ return self.__is_dir("mask_path")
+
+ def unmask_path (self):
+ return self.__get("_unmask_path", "package.unmask")
+
+ def unmask_path_is_dir (self):
+ return self.__is_dir("unmask_path")
+
+ def testing_path (self):
+ return self.__get("_testing_path", "package.keywords")
+
+ def testing_path_is_dir (self):
+ return self.__is_dir("testing_path")
+
+CONST = Constants()
+
### GENERAL PART ###
def grep (pkg, path):
@@ -43,7 +93,7 @@ def grep (pkg, path):
@rtype: string"""
if not isinstance(pkg, package.Package):
- pkg = package.Package(pkg) # assume it is a cpv or a gentoolkit.Package
+ pkg = system.new_package(pkg) # assume it is a cpv or a gentoolkit.Package
command = "egrep -x -n -r -H '^[<>!=~]{0,2}%s(-[0-9].*)?[[:space:]]?.*$' %s" # %s is replaced in the next line ;)
return Popen((command % (pkg.get_cp(), path)), shell = True, stdout = PIPE).communicate()[0].splitlines()
@@ -105,7 +155,7 @@ def generate_path (cpv, exp):
@returns: rendered path
@rtype string"""
- cat, pkg, ver, rev = split_package_name(cpv)
+ cat, pkg, ver, rev = system.split_cpv(cpv)
if exp.find("$(") != -1:
exp = exp.replace("$(cat)",cat).\
@@ -115,8 +165,6 @@ def generate_path (cpv, exp):
return exp
### USE FLAG PART ###
-USE_PATH = os.path.join(portage.USER_CONFIG_PATH,"package.use")
-USE_PATH_IS_DIR = os.path.isdir(USE_PATH)
useFlags = {} # useFlags in the file
newUseFlags = {} # useFlags as we want them to be: format: cpv -> [(file, line, useflag, (true if removed from list / false if added))]
@@ -150,7 +198,7 @@ def set_use_flag (pkg, flag):
global useFlags, newUseFlags
if not isinstance(pkg, package.Package):
- pkg = package.Package(pkg) # assume cpv or gentoolkit.Package
+ pkg = system.new_package(pkg) # assume cpv or gentoolkit.Package
cpv = pkg.get_cpv()
invFlag = invert_use_flag(flag)
@@ -158,7 +206,7 @@ def set_use_flag (pkg, flag):
# if not saved in useFlags, get it by calling get_data() which calls grep()
data = None
if not cpv in useFlags:
- data = get_data(pkg, USE_PATH)
+ data = get_data(pkg, CONST.use_path())
useFlags[cpv] = data
else:
data = useFlags[cpv]
@@ -196,9 +244,9 @@ def set_use_flag (pkg, flag):
# create a new line
if not added:
- path = USE_PATH
- if USE_PATH_IS_DIR:
- path = os.path.join(USE_PATH, generate_path(cpv, CONFIG["usefile"]))
+ path = CONST.use_path()
+ if CONST.use_path_is_dir():
+ path = os.path.join(CONST.use_path(), generate_path(cpv, CONFIG["usefile"]))
try:
newUseFlags[cpv].remove((path, -1, invFlag, False))
except ValueError: # not in UseFlags
@@ -313,7 +361,7 @@ def write_use_flags ():
if CONFIG["usePerVersion"]: # add on a per-version-base
msg += "=%s %s\n" % (cpv, ' '.join(flagsToAdd))
else: # add on a per-package-base
- list = split_package_name(cpv)
+ list = system.split_cpv(cpv)
msg += "%s/%s %s\n" % (list[0], list[1], ' '.join(flagsToAdd))
if not file in file_cache:
f = open(file, "a")
@@ -332,11 +380,6 @@ def write_use_flags ():
newUseFlags = {}
### MASKING PART ###
-MASK_PATH = os.path.join(portage.USER_CONFIG_PATH,"package.mask")
-UNMASK_PATH = os.path.join(portage.USER_CONFIG_PATH,"package.unmask")
-MASK_PATH_IS_DIR = os.path.isdir(MASK_PATH)
-UNMASK_PATH_IS_DIR = os.path.isdir(UNMASK_PATH)
-
new_masked = {}
new_unmasked = {}
@@ -351,7 +394,7 @@ def set_masked (pkg, masked = True):
global new_masked, newunmasked
if not isinstance(pkg, package.Package):
- pkg = package.Package(pkg)
+ pkg = system.new_package(pkg)
cpv = pkg.get_cpv()
@@ -363,13 +406,13 @@ def set_masked (pkg, masked = True):
if masked:
link_neq = new_masked
link_eq = new_unmasked
- path = UNMASK_PATH
+ path = CONST.unmask_path()
else:
link_neq = new_unmasked
link_eq = new_masked
- path = MASK_PATH
+ path = CONST.mask_path()
- copy = link_eq[cpv]
+ copy = link_eq[cpv][:]
for file, line in copy:
if line == "-1":
link_eq[cpv].remove((file, line))
@@ -393,11 +436,11 @@ def set_masked (pkg, masked = True):
if done: return
if masked:
- is_dir = MASK_PATH_IS_DIR
- path = MASK_PATH
+ is_dir = CONST.mask_path_is_dir()
+ path = CONST.mask_path()
else:
- is_dir = UNMASK_PATH_IS_DIR
- path = UNMASK_PATH
+ is_dir = CONST.unmask_path_is_dir()
+ path = CONST.unmask_path()
if is_dir:
file = os.path.join(path, generate_path(cpv, CONFIG["usefile"]))
@@ -426,9 +469,9 @@ def new_masking_status (cpv):
if isinstance(cpv, package.Package):
cpv = cpv.get_cpv()
- if cpv in new_masked and new_masked[cpv]:
+ if cpv in new_masked and new_masked[cpv] != []:
return "masked"
- elif cpv in new_unmasked and new_unmasked[cpv]:
+ elif cpv in new_unmasked and new_unmasked[cpv] != []:
return "unmasked"
else: return None
@@ -444,7 +487,7 @@ def write_masked ():
if CONFIG["maskPerVersion"]:
msg += "=%s\n" % cpv
else:
- list = split_package_name(cpv)
+ list = system.split_cpv(cpv)
msg += "%s/%s\n" % (list[0],list[1])
if not file in file_cache:
f = open(file, "a")
@@ -497,8 +540,6 @@ def write_masked ():
new_unmasked = {}
### TESTING PART ###
-TESTING_PATH = os.path.join(portage.USER_CONFIG_PATH, "package.keywords")
-TESTING_PATH_IS_DIR = os.path.isdir(TESTING_PATH)
newTesting = {}
arch = ""
@@ -531,7 +572,7 @@ def set_testing (pkg, enable):
global arch, newTesting
if not isinstance(pkg, package.Package):
- pkg = package.Package(pkg)
+ pkg = system.new_package(pkg)
arch = pkg.get_settings("ARCH")
cpv = pkg.get_cpv()
@@ -546,16 +587,16 @@ def set_testing (pkg, enable):
return
if not enable:
- test = get_data(pkg, TESTING_PATH)
+ test = get_data(pkg, CONST.testing_path())
debug("data (test): "+str(test))
for file, line, crit, flags in test:
if pkg.matches(crit) and flags[0] == "~"+arch:
newTesting[cpv].append((file, line))
else:
- if TESTING_PATH_IS_DIR:
- file = os.path.join(TESTING_PATH, CONFIG["testingfile"])
+ if CONST.testing_path_is_dir():
+ file = os.path.join(CONST.testing_path(), CONFIG["testingfile"])
else:
- file = TESTING_PATH
+ file = CONST.testing_path()
newTesting[cpv].append((file, "-1"))
newTesting[cpv] = unique_array(newTesting[cpv])
@@ -574,7 +615,7 @@ def write_testing ():
if CONFIG["testingPerVersion"]:
msg += "=%s ~%s\n" % (cpv, arch)
else:
- list = split_package_name(cpv)
+ list = system.split_cpv(cpv)
msg += "%s/%s ~%s\n" % (list[0],list[1],arch)
if not file in file_cache:
f = open(file, "a")
diff --git a/portato/backend/package.py b/portato/backend/package.py
deleted file mode 100644
index 6ba47fe..0000000
--- a/portato/backend/package.py
+++ /dev/null
@@ -1,447 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# File: portato/backend/package.py
-# This file is part of the Portato-Project, a graphical portage-frontend.
-#
-# Copyright (C) 2006-2007 René 'Necoro' Neumann
-# This is free software. You may redistribute copies of it under the terms of
-# the GNU General Public License version 2.
-# There is NO WARRANTY, to the extent permitted by law.
-#
-# Written by René 'Necoro' Neumann <necoro@necoro.net>
-
-from portato.backend import portage_settings
-from portato.helper import *
-from portage_helper import *
-from exceptions import *
-import flags
-
-import portage, portage_dep
-from portage_util import unique_array
-
-import types
-import os.path
-
-class Package:
- """This is a class abstracting a normal package which can be installed."""
-
- def __init__ (self, cpv):
- """Constructor.
-
- @param cpv: The cpv which describes the package to create.
- @type cpv: string (cat/pkg-ver)"""
-
- self._cpv = cpv
- self._scpv = portage.catpkgsplit(self._cpv)
-
- if not self._scpv:
- raise ValueError("invalid cpv: %s" % cpv)
-
- self._settings = portage_settings.settings
- self._settingslock = portage_settings.settingslock
-
- self._trees = portage_settings.trees
-
- self.forced_flags = set()
- self.forced_flags.update(self._settings.usemask)
- self.forced_flags.update(self._settings.useforce)
-
- try:
- self._status = portage.getmaskingstatus(self.get_cpv(), settings = self._settings)
- except KeyError: # package is not located in the system
- self._status = None
-
- def is_installed(self):
- """Returns true if this package is installed (merged)
- @rtype: boolean"""
-
- return portage_settings.vartree.dbapi.cpv_exists(self._cpv)
-
- def is_overlay(self):
- """Returns true if the package is in an overlay.
- @rtype: boolean"""
-
- dir,ovl = portage_settings.porttree.dbapi.findname2(self._cpv)
- return ovl != self._settings["PORTDIR"]
-
- def is_in_system (self):
- """Returns False if the package could not be found in the portage system.
-
- @return: True if in portage system; else False
- @rtype: boolean"""
-
- return (self._status != None)
-
- def is_missing_keyword(self):
- """Returns True if the package is missing the needed keyword.
-
- @return: True if keyword is missing; else False
- @rtype: boolean"""
-
- if self._status and "missing keyword" in self._status:
- return True
- return False
-
- def is_testing(self, use_keywords = False):
- """Checks whether a package is marked as testing.
-
- @param use_keywords: Controls whether possible keywords are taken into account or not.
- @type use_keywords: boolean
- @returns: True if the package is marked as testing; else False.
- @rtype: boolean"""
-
- testArch = "~" + self.get_settings("ARCH")
- if not use_keywords: # keywords are NOT taken into account
- if testArch in self.get_env_var("KEYWORDS").split():
- return True
- return False
-
- else: # keywords are taken into account
- status = flags.new_testing_status(self.get_cpv())
- if status is None: # we haven't changed it in any way
- if self._status and testArch+" keyword" in self._status:
- return True
- return False
- else:
- return status
-
- def set_testing(self, enable = True):
- """Sets the actual testing status of the package.
-
- @param enable: if True it is masked as stable; if False it is marked as testing
- @type enable: boolean"""
-
- flags.set_testing(self, enable)
-
- def remove_new_testing(self):
- """Removes possible changed testing status."""
-
- flags.remove_new_testing(self.get_cpv())
-
- def is_masked (self):
- """Returns True if either masked by package.mask or by profile.
-
- @returns: True if masked / False otherwise
- @rtype: boolean"""
-
- status = flags.new_masking_status(self.get_cpv())
- if status != None: # we have locally changed it
- if status == "masked": return True
- elif status == "unmasked": return False
- else:
- debug("BUG in flags.new_masking_status. It returns",status)
- else: # we have not touched the status
- if self._status and ("profile" in self._status or "package.mask" in self._status):
- return True
- return False
-
- def set_masked (self, masking = False):
- """Sets the masking status of the package.
-
- @param masking: if True: mask it; if False: unmask it
- @type masking: boolean"""
-
- flags.set_masked(self, masked = masking)
-
- def remove_new_masked (self):
- """Removes possible changed masking status."""
-
- flags.remove_new_masked(self.get_cpv())
-
- def get_all_use_flags (self, installed = False):
- """Returns a list of _all_ useflags for this package, i.e. all useflags you can set for this package.
-
- @param installed: do not take the ones stated in the ebuild, but the ones it has been installed with
- @type installed: boolean
-
- @returns: list of use-flags
- @rtype: string[]"""
-
- if installed or not self.is_in_system():
- tree = portage_settings.vartree
- else:
- tree = portage_settings.porttree
-
- return list(set(self.get_env_var("IUSE", tree = tree).split()).difference(self.forced_flags))
-
- def get_installed_use_flags (self):
- """Returns a list of the useflags enabled at installation time. If package is not installed, it returns an empty list.
-
- @returns: list of useflags enabled at installation time or an empty list
- @rtype: string[]"""
-
- if self.is_installed():
- uses = set(self.get_use_flags().split()) # all set at installation time
- iuses = set(self.get_all_use_flags(installed=True)) # all you can set for the package
-
- return list(uses.intersection(iuses))
- else:
- return []
-
- def get_new_use_flags (self):
- """Returns a list of the new useflags, i.e. these flags which are not written to the portage-system yet.
-
- @returns: list of flags or []
- @rtype: string[]"""
-
- return flags.get_new_use_flags(self)
-
- def get_actual_use_flags (self):
- """This returns the result of installed_use_flags + new_use_flags. If the package is not installed, it returns only the new flags.
-
- @return: list of flags
- @rtype: string[]"""
-
- if self.is_installed():
- i_flags = self.get_installed_use_flags()
- for f in self.get_new_use_flags():
-
- if flags.invert_use_flag(f) in i_flags:
- i_flags.remove(flags.invert_use_flag(f))
-
- elif f not in i_flags:
- i_flags.append(f)
- return i_flags
- else:
- return self.get_new_use_flags()
-
- def set_use_flag (self, flag):
- """Set a use-flag.
-
- @param flag: the flag to set
- @type flag: string"""
-
- flags.set_use_flag(self, flag)
-
- def remove_new_use_flags (self):
- """Remove all the new use-flags."""
-
- flags.remove_new_use_flags(self)
-
- def is_use_flag_enabled (self, flag):
- """Looks whether a given useflag is enabled for the package, taking all options
- (ie. even the new flags) into account.
-
- @param flag: the flag to check
- @type flag: string
- @returns: True or False
- @rtype: bool"""
-
- if self.is_installed() and flag in self.get_actual_use_flags(): # flags set during install
- return True
-
- elif (not self.is_installed()) and flag in self.get_settings("USE").split() \
- and not flags.invert_use_flag(flag) in self.get_new_use_flags(): # flags that would be set
- return True
-
- elif flag in self.get_new_use_flags():
- return True
-
- else:
- return False
-
- def get_matched_dep_packages (self, depvar):
- """This function looks for all dependencies which are resolved. In normal case it makes only sense for installed packages, but should work for uninstalled ones too.
-
- @returns: unique list of dependencies resolved (with elements like "<=net-im/foobar-1.2.3")
- @rtype: string[]
-
- @raises portato.DependencyCalcError: when an error occured during executing portage.dep_check()"""
-
- # change the useflags, because we have internally changed some, but not made them visible for portage
- newUseFlags = self.get_new_use_flags()
- actual = self.get_settings("USE").split()
- if newUseFlags:
- for u in newUseFlags:
- if u[0] == "-" and flags.invert_use_flag(u) in actual:
- actual.remove(flags.invert_use_flag(u))
- elif u not in actual:
- actual.append(u)
-
- depstring = ""
- for d in depvar:
- depstring += self.get_env_var(d)+" "
-
- portage_dep._dep_check_strict = False
- deps = portage.dep_check(depstring, None, self._settings, myuse = actual, trees = self._trees)
- portage_dep._dep_check_strict = True
-
- if not deps: # FIXME: what is the difference to [1, []] ?
- return []
-
- if deps[0] == 0: # error
- raise DependencyCalcError, deps[1]
-
- deps = deps[1]
-
- retlist = []
-
- for d in deps:
- if not d[0] == "!":
- retlist.append(d)
-
- return retlist
-
- def get_dep_packages (self, depvar = ["RDEPEND", "PDEPEND", "DEPEND"]):
- """Returns a cpv-list of packages on which this package depends and which have not been installed yet. This does not check the dependencies in a recursive manner.
-
- @returns: list of cpvs on which the package depend
- @rtype: string[]
-
- @raises portato.BlockedException: when a package in the dependency-list is blocked by an installed one
- @raises portato.PackageNotFoundException: when a package in the dependency list could not be found in the system
- @raises portato.DependencyCalcError: when an error occured during executing portage.dep_check()"""
-
- dep_pkgs = [] # the package list
-
- # change the useflags, because we have internally changed some, but not made them visible for portage
- newUseFlags = self.get_new_use_flags()
- actual = self.get_settings("USE").split()
- if newUseFlags:
- for u in newUseFlags:
- if u[0] == "-" and flags.invert_use_flag(u) in actual:
- actual.remove(flags.invert_use_flag(u))
- elif u not in actual:
- actual.append(u)
-
- depstring = ""
- for d in depvar:
- depstring += self.get_env_var(d)+" "
-
- # let portage do the main stuff ;)
- # pay attention to any changes here
- deps = portage.dep_check (depstring, portage_settings.vartree.dbapi, self._settings, myuse = actual, trees = self._trees)
-
- if not deps: # FIXME: what is the difference to [1, []] ?
- return []
-
- if deps[0] == 0: # error
- raise DependencyCalcError, deps[1]
-
- deps = deps[1]
-
- for dep in deps:
- if dep[0] == '!': # blocking sth
- dep = dep[1:]
- if dep != self.get_cp(): # not cpv, because a version might explicitly block another one
- blocked = find_installed_packages(dep)
- if blocked != []:
- raise BlockedException, (self.get_cpv(), blocked[0].get_cpv())
- continue # finished with the blocking one -> next
-
- pkg = find_best_match(dep)
- if not pkg: # try to find masked ones
- list = find_packages(dep, masked = True)
- if not list:
- raise PackageNotFoundException, dep
-
- list = sort_package_list(list)
- done = False
- for i in range(len(list)-1,0,-1):
- p = list[i]
- if not p.is_masked():
- dep_pkgs.append(p.get_cpv())
- done = True
- break
- if not done:
- dep_pkgs.append(list[-1].get_cpv())
- else:
- dep_pkgs.append(pkg.get_cpv())
-
- return dep_pkgs
-
- def get_cpv(self):
- """Returns full Category/Package-Version string"""
- return self._cpv
-
- def get_cp (self):
- """Returns the cp-string.
-
- @returns: category/package.
- @rtype: string"""
-
- return self.get_category()+"/"+self.get_name()
-
- def get_slot_cp (self):
-
- return ("%s:%s" % (self.get_cp(), self.get_env_var("SLOT")))
-
- def get_name(self):
- """Returns base name of package, no category nor version"""
- return self._scpv[1]
-
- def get_version(self):
- """Returns version of package, with revision number"""
- v = self._scpv[2]
- if self._scpv[3] != "r0":
- v += "-" + self._scpv[3]
- return v
-
- def get_category(self):
- """Returns category of package"""
- return self._scpv[0]
-
- def get_settings(self, key):
- """Returns the value of the given key for this package (useful
- for package.* files)."""
- self._settingslock.acquire()
- self._settings.setcpv(self._cpv)
- v = self._settings[key]
- self._settingslock.release()
- return v
-
- def get_ebuild_path(self):
- """Returns the complete path to the .ebuild file"""
- return portage_settings.porttree.dbapi.findname(self._cpv)
-
- def get_package_path(self):
- """Returns the path to where the ChangeLog, Manifest, .ebuild files reside"""
- p = self.get_ebuild_path()
- sp = p.split("/")
- if len(sp):
- import string
- return string.join(sp[:-1],"/")
-
- def get_env_var(self, var, tree = None):
- """Returns one of the predefined env vars DEPEND, RDEPEND, SRC_URI,...."""
- if not tree:
- mytree = portage_settings.vartree
- if not self.is_installed():
- mytree = portage_settings.porttree
- else:
- mytree = tree
- r = mytree.dbapi.aux_get(self._cpv,[var])
-
- return r[0]
-
- def get_use_flags(self):
- if self.is_installed():
- return self.get_env_var("USE", tree = portage_settings.vartree)
- else: return ""
-
- def compare_version(self,other):
- """Compares this package's version to another's CPV; returns -1, 0, 1"""
- v1 = self._scpv
- v2 = portage.catpkgsplit(other.get_cpv())
- # if category is different
- if v1[0] != v2[0]:
- return cmp(v1[0],v2[0])
- # if name is different
- elif v1[1] != v2[1]:
- return cmp(v1[1],v2[1])
- # Compare versions
- else:
- return portage.pkgcmp(v1[1:],v2[1:])
-
- def matches (self, criterion):
- """This checks, whether this package matches a specific verisioning criterion - e.g.: "<=net-im/foobar-1.2".
-
- @param criterion: the criterion to match against
- @type criterion: string
- @returns: True if matches; False if not
- @rtype: boolean"""
-
- if portage.match_from_list(criterion, [self.get_cpv()]) == []:
- return False
- else:
- return True
diff --git a/portato/backend/portage/__init__.py b/portato/backend/portage/__init__.py
new file mode 100644
index 0000000..e21278c
--- /dev/null
+++ b/portato/backend/portage/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+#
+# File: portato/backend/portage/__init__.py
+# This file is part of the Portato-Project, a graphical portage-frontend.
+#
+# Copyright (C) 2006-2007 René 'Necoro' Neumann
+# This is free software. You may redistribute copies of it under the terms of
+# the GNU General Public License version 2.
+# There is NO WARRANTY, to the extent permitted by law.
+#
+# Written by René 'Necoro' Neumann <necoro@necoro.net>
+
+from system import PortageSystem
+from package import PortagePackage
diff --git a/portato/backend/portage/package.py b/portato/backend/portage/package.py
new file mode 100644
index 0000000..74a5107
--- /dev/null
+++ b/portato/backend/portage/package.py
@@ -0,0 +1,234 @@
+# -*- coding: utf-8 -*-
+#
+# File: portato/backend/portage/package.py
+# This file is part of the Portato-Project, a graphical portage-frontend.
+#
+# Copyright (C) 2006-2007 René 'Necoro' Neumann
+# This is free software. You may redistribute copies of it under the terms of
+# the GNU General Public License version 2.
+# There is NO WARRANTY, to the extent permitted by law.
+#
+# Written by René 'Necoro' Neumann <necoro@necoro.net>
+
+from portato.helper import *
+from portato.backend.exceptions import *
+from portato.backend import flags, Package, system
+
+import portage, portage_dep
+from portage_util import unique_array
+
+import os.path
+
+class PortagePackage (Package):
+ """This is a class abstracting a normal package which can be installed for the portage-system."""
+
+ def __init__ (self, cpv):
+ """Constructor.
+
+ @param cpv: The cpv which describes the package to create.
+ @type cpv: string (cat/pkg-ver)"""
+
+ Package.__init__(self, cpv)
+ self._settings = system.settings
+ self._settingslock = system.settings.settingslock
+
+ self._trees = system.settings.trees
+
+ self.forced_flags = set()
+ self.forced_flags.update(self._settings.settings.usemask)
+ self.forced_flags.update(self._settings.settings.useforce)
+
+ try:
+ self._status = portage.getmaskingstatus(self.get_cpv(), settings = self._settings.settings)
+ except KeyError: # package is not located in the system
+ self._status = None
+
+ def is_installed(self):
+ return self._settings.vartree.dbapi.cpv_exists(self._cpv)
+
+ def is_overlay(self):
+ dir,ovl = self._settings.porttree.dbapi.findname2(self._cpv)
+ return ovl != self._settings.settings["PORTDIR"]
+
+ def is_in_system (self):
+ return (self._status != None)
+
+ def is_missing_keyword(self):
+ if self._status and "missing keyword" in self._status:
+ return True
+ return False
+
+ def is_testing(self, use_keywords = False):
+ testArch = "~" + self.get_settings("ARCH")
+ if not use_keywords: # keywords are NOT taken into account
+ if testArch in self.get_env_var("KEYWORDS").split():
+ return True
+ return False
+
+ else: # keywords are taken into account
+ status = flags.new_testing_status(self.get_cpv())
+ if status is None: # we haven't changed it in any way
+ if self._status and testArch+" keyword" in self._status:
+ return True
+ return False
+ else:
+ return status
+
+ def is_masked (self):
+ status = flags.new_masking_status(self.get_cpv())
+ if status != None: # we have locally changed it
+ if status == "masked": return True
+ elif status == "unmasked": return False
+ else:
+ debug("BUG in flags.new_masking_status. It returns",status)
+ else: # we have not touched the status
+ if self._status and ("profile" in self._status or "package.mask" in self._status):
+ return True
+ return False
+
+ def get_all_use_flags (self, installed = False):
+ if installed or not self.is_in_system():
+ tree = self._settings.vartree
+ else:
+ tree = self._settings.porttree
+
+ return list(set(self.get_env_var("IUSE", tree = tree).split()).difference(self.forced_flags))
+
+ def get_matched_dep_packages (self, depvar):
+ # change the useflags, because we have internally changed some, but not made them visible for portage
+ newUseFlags = self.get_new_use_flags()
+ actual = self.get_settings("USE").split()
+ if newUseFlags:
+ for u in newUseFlags:
+ if u[0] == "-" and flags.invert_use_flag(u) in actual:
+ actual.remove(flags.invert_use_flag(u))
+ elif u not in actual:
+ actual.append(u)
+
+ depstring = ""
+ for d in depvar:
+ depstring += self.get_env_var(d)+" "
+
+ portage_dep._dep_check_strict = False
+ deps = portage.dep_check(depstring, None, self._settings.settings, myuse = actual, trees = self._trees)
+ portage_dep._dep_check_strict = True
+
+ if not deps: # FIXME: what is the difference to [1, []] ?
+ return []
+
+ if deps[0] == 0: # error
+ raise DependencyCalcError, deps[1]
+
+ deps = deps[1]
+
+ retlist = []
+
+ for d in deps:
+ if not d[0] == "!":
+ retlist.append(d)
+
+ return retlist
+
+ def get_dep_packages (self, depvar = ["RDEPEND", "PDEPEND", "DEPEND"]):
+ dep_pkgs = [] # the package list
+
+ # change the useflags, because we have internally changed some, but not made them visible for portage
+ newUseFlags = self.get_new_use_flags()
+ actual = self.get_settings("USE").split()
+ if newUseFlags:
+ for u in newUseFlags:
+ if u[0] == "-" and flags.invert_use_flag(u) in actual:
+ actual.remove(flags.invert_use_flag(u))
+ elif u not in actual:
+ actual.append(u)
+
+ depstring = ""
+ for d in depvar:
+ depstring += self.get_env_var(d)+" "
+
+ # let portage do the main stuff ;)
+ # pay attention to any changes here
+ deps = portage.dep_check (depstring, self._settings.vartree.dbapi, self._settings.settings, myuse = actual, trees = self._trees)
+
+ if not deps: # FIXME: what is the difference to [1, []] ?
+ return []
+
+ if deps[0] == 0: # error
+ raise DependencyCalcError, deps[1]
+
+ deps = deps[1]
+
+ for dep in deps:
+ if dep[0] == '!': # blocking sth
+ dep = dep[1:]
+ if dep != self.get_cp(): # not cpv, because a version might explicitly block another one
+ blocked = system.find_installed_packages(dep)
+ if blocked != []:
+ raise BlockedException, (self.get_cpv(), blocked[0].get_cpv())
+ continue # finished with the blocking one -> next
+
+ pkg = system.find_best_match(dep)
+ if not pkg: # try to find masked ones
+ list = system.find_packages(dep, masked = True)
+ if not list:
+ raise PackageNotFoundException, dep
+
+ list = system.sort_package_list(list)
+ done = False
+ for i in range(len(list)-1,0,-1):
+ p = list[i]
+ if not p.is_masked():
+ dep_pkgs.append(p.get_cpv())
+ done = True
+ break
+ if not done:
+ dep_pkgs.append(list[-1].get_cpv())
+ else:
+ dep_pkgs.append(pkg.get_cpv())
+
+ return dep_pkgs
+
+ def get_settings(self, key):
+ self._settingslock.acquire()
+ self._settings.settings.setcpv(self._cpv)
+ v = self._settings.settings[key]
+ self._settingslock.release()
+ return v
+
+ def get_ebuild_path(self):
+ return self._settings.porttree.dbapi.findname(self._cpv)
+
+ def get_env_var(self, var, tree = None):
+ if not tree:
+ mytree = self._settings.vartree
+ if not self.is_installed():
+ mytree = self._settings.porttree
+ else:
+ mytree = tree
+ r = mytree.dbapi.aux_get(self._cpv,[var])
+
+ return r[0]
+
+ def get_use_flags(self):
+ if self.is_installed():
+ return self.get_env_var("USE", tree = self._settings.vartree)
+ else: return ""
+
+ def compare_version(self,other):
+ v1 = self._scpv
+ v2 = portage.catpkgsplit(other.get_cpv())
+ # if category is different
+ if v1[0] != v2[0]:
+ return cmp(v1[0],v2[0])
+ # if name is different
+ elif v1[1] != v2[1]:
+ return cmp(v1[1],v2[1])
+ # Compare versions
+ else:
+ return portage.pkgcmp(v1[1:],v2[1:])
+
+ def matches (self, criterion):
+ if portage.match_from_list(criterion, [self.get_cpv()]) == []:
+ return False
+ else:
+ return True
diff --git a/portato/backend/portage/settings.py b/portato/backend/portage/settings.py
new file mode 100644
index 0000000..5d3aef0
--- /dev/null
+++ b/portato/backend/portage/settings.py
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+#
+# File: portato/backend/portage/settings.py
+# This file is part of the Portato-Project, a graphical portage-frontend.
+#
+# Copyright (C) 2006-2007 René 'Necoro' Neumann
+# This is free software. You may redistribute copies of it under the terms of
+# the GNU General Public License version 2.
+# There is NO WARRANTY, to the extent permitted by law.
+#
+# Written by René 'Necoro' Neumann <necoro@necoro.net>
+
+import os
+import portage
+from threading import Lock
+
+class PortageSettings:
+ """Encapsulation of the portage settings.
+
+ @ivar settings: portage settings
+ @ivar settingslock: a simple Lock
+ @ivar trees: a dictionary of the trees
+ @ivar porttree: shortcut to C{trees[root]["porttree"]}
+ @ivar vartree: shortcut to C{trees[root]["vartree"]}
+ @ivar virtuals: shortcut to C{trees[root]["virtuals"]}"""
+
+ def __init__ (self):
+ """Initializes the instance. Calls L{load()}."""
+ self.settingslock = Lock()
+ self.load()
+
+ def load(self):
+ """(Re)loads the portage settings and sets the variables."""
+
+ kwargs = {}
+ for k, envvar in (("config_root", "PORTAGE_CONFIGROOT"), ("target_root", "ROOT")):
+ kwargs[k] = os.environ.get(envvar, None)
+ self.trees = portage.create_trees(trees=None, **kwargs)
+
+ self.settings = self.trees["/"]["vartree"].settings
+
+ for myroot in self.trees:
+ if myroot != "/":
+ self.settings = self.trees[myroot]["vartree"].settings
+ break
+
+ self.settings.unlock()
+
+ root = self.settings["ROOT"]
+
+ self.porttree = self.trees[root]["porttree"]
+ self.vartree = self.trees[root]["vartree"]
+ self.virtuals = self.trees[root]["virtuals"]
+
+ portage.settings = None # we use our own one ...
diff --git a/portato/backend/portage/system.py b/portato/backend/portage/system.py
new file mode 100644
index 0000000..1cfda62
--- /dev/null
+++ b/portato/backend/portage/system.py
@@ -0,0 +1,370 @@
+# -*- coding: utf-8 -*-
+#
+# File: portato/backend/portage/system.py
+# This file is part of the Portato-Project, a graphical portage-frontend.
+#
+# Copyright (C) 2006-2007 René 'Necoro' Neumann
+# This is free software. You may redistribute copies of it under the terms of
+# the GNU General Public License version 2.
+# There is NO WARRANTY, to the extent permitted by law.
+#
+# Written by René 'Necoro' Neumann <necoro@necoro.net>
+
+import re, os
+import types
+import portage
+
+import package
+from settings import PortageSettings
+from portato.helper import debug, unique_array
+from portato.backend.system_interface import SystemInterface
+
+class PortageSystem (SystemInterface):
+
+ def __init__ (self):
+
+ self.settings = PortageSettings()
+ portage.WORLD_FILE = self.settings.settings["ROOT"]+portage.WORLD_FILE
+
+ def new_package (self, cpv):
+ return package.PortagePackage(cpv)
+
+ def get_config_path (self):
+ return portage.USER_CONFIG_PATH
+
+ def get_world_file_path (self):
+ return portage.WORLD_FILE
+
+ def get_merge_command (self):
+ return ["/usr/bin/python", "/usr/bin/emerge"]
+
+ def get_sync_command (self):
+ return self.get_merge_command()+["--sync"]
+
+ def get_oneshot_option (self):
+ return ["--oneshot"]
+
+ def get_newuse_option (self):
+ return ["--newuse"]
+
+ def get_deep_option (self):
+ return ["--deep"]
+
+ def get_update_option (self):
+ return ["--update"]
+
+ def get_pretend_option (self):
+ return ["--pretend", "--verbose"]
+
+ def get_unmerge_option (self):
+ return ["--unmerge"]
+
+ def find_lambda (self, name):
+ """Returns the function needed by all the find_all_*-functions. Returns None if no name is given.
+
+ @param name: name to build the function of
+ @type name: string
+ @returns:
+ 1. None if no name is given
+ 2. a lambda function
+ @rtype: function"""
+
+ if name != None:
+ return lambda x: re.match(".*"+name+".*",x)
+ else:
+ return lambda x: True
+
+ def geneticize_list (self, list_of_packages):
+ """Convertes a list of cpv's into L{backend.Package}s.
+
+ @param list_of_packages: the list of packages
+ @type list_of_packages: list of gentoolkit.Packages
+ @returns: converted list
+ @rtype: PortagePackage[]"""
+
+ return [package.PortagePackage(x) for x in list_of_packages]
+
+ def find_best (self, list):
+ return package.PortagePackage(portage.best(list))
+
+ def find_best_match (self, search_key, only_installed = False):
+ t = None
+ if not only_installed:
+ t = self.settings.porttree.dep_bestmatch(search_key)
+ else:
+ t = self.settings.vartree.dep_bestmatch(search_key)
+ if t:
+ return package.PortagePackage(t)
+ return None
+
+ def find_packages (self, search_key, masked=False):
+ try:
+ if masked:
+ t = self.settings.porttree.dbapi.xmatch("match-all", search_key)
+ t += self.settings.vartree.dbapi.match(search_key)
+ else:
+ t = self.settings.porttree.dbapi.match(search_key)
+ t += self.settings.vartree.dbapi.match(search_key)
+ # catch the "ambigous package" Exception
+ except ValueError, e:
+ if type(e[0]) == types.ListType:
+ t = []
+ for cp in e[0]:
+ if masked:
+ t += self.settings.porttree.dbapi.xmatch("match-all", cp)
+ t += self.settings.vartree.dbapi.match(cp)
+ else:
+ t += self.settings.porttree.dbapi.match(cp)
+ t += self.settings.vartree.dbapi.match(cp)
+ else:
+ raise ValueError(e)
+ # Make the list of packages unique
+ t = unique_array(t)
+ t.sort()
+ return self.geneticize_list(t)
+
+ def find_installed_packages (self, search_key, masked = False):
+ try:
+ t = self.settings.vartree.dbapi.match(search_key)
+ # catch the "ambigous package" Exception
+ except ValueError, e:
+ if type(e[0]) == types.ListType:
+ t = []
+ for cp in e[0]:
+ t += self.settings.vartree.dbapi.match(cp)
+ else:
+ raise ValueError(e)
+
+ return self.geneticize_list(t)
+
+ def __find_resolved_unresolved (self, list, check):
+ resolved = []
+ unresolved = []
+ for x in list:
+ cpv = x.strip()
+ if len(cpv) and check(cpv):
+ pkg = self.find_best_match(cpv)
+ if pkg:
+ resolved.append(pkg)
+ else:
+ unresolved.append(cpv)
+ return (resolved, self.geneticize_list(unresolved))
+
+ def find_system_packages (self):
+ pkglist = self.settings.settings.packages
+
+ return self.__find_resolved_unresolved(pkglist, lambda cpv: cpv[0] == "*")
+
+ def find_world_packages (self):
+ f = open(portage.WORLD_FILE)
+ pkglist = f.readlines()
+ f.close()
+
+ return self.__find_resolved_unresolved(pkglist, lambda cpv: cpv[0] != "#")
+
+ def find_all_installed_packages (self, name = None, withVersion=True):
+ if withVersion:
+ t = self.settings.vartree.dbapi.cpv_all()
+ if name:
+ t = filter(self.find_lambda(name),t)
+ return self.geneticize_list(t)
+
+ else:
+ t = self.settings.vartree.dbapi.cp_all()
+ if name:
+ t = filter(self.find_lambda(name),t)
+ return t
+
+ def find_all_uninstalled_packages (self, name = None):
+ alist = self.find_all_packages(name)
+ return self.geneticize_list([x for x in alist if not x.is_installed()])
+
+ def find_all_packages (self, name = None, withVersion = True):
+ t = self.settings.porttree.dbapi.cp_all()
+ t += self.settings.vartree.dbapi.cp_all()
+ if name:
+ t = filter(self.find_lambda(name),t)
+ t = unique_array(t)
+
+ if (withVersion):
+ t2 = []
+ for x in t:
+ t2 += self.settings.porttree.dbapi.cp_list(x)
+ t2 += self.settings.vartree.dbapi.cp_list(x)
+ t2 = unique_array(t2)
+ return self.geneticize_list(t2)
+ else:
+ return t;
+
+ def find_all_world_packages (self, name = None):
+ world = filter(self.find_lambda(name), [x.get_cpv() for x in self.find_world_packages()[0]])
+ world = unique_array(world)
+ return self.geneticize_list(world)
+
+ def find_all_system_packages (self, name = None):
+ sys = filter(self.find_lambda(name), [x.get_cpv() for x in self.find_system_packages()[0]])
+ sys = unique_array(sys)
+ return self.geneticize_list(sys)
+
+ def list_categories (self, name = None):
+ categories = self.settings.settings.categories
+ return filter(self.find_lambda(name), categories)
+
+ def split_cpv (self, cpv):
+ cpv = portage.dep_getcpv(cpv)
+ return portage.catpkgsplit(cpv)
+
+ def sort_package_list(self, pkglist):
+ pkglist.sort(package.PortagePackage.compare_version)
+ return pkglist
+
+ def reload_settings (self):
+ self.settings.load()
+
+ def update_world (self, newuse = False, deep = False):
+ # read world file
+ world = open(portage.WORLD_FILE)
+ packages = []
+ for line in world:
+ line = line.strip()
+ if len(line) == 0: continue # empty line
+ if line[0] == "#": continue # comment
+ packages.append(line)
+ world.close()
+
+ # append system packages
+ packages.extend(unique_array([p.get_cp() for p in self.find_all_system_packages()]))
+
+ def get_new_packages (packages):
+ new_packages = []
+ for p in packages:
+ inst = self.find_installed_packages(p)
+ if len(inst) > 1:
+ myslots = set()
+ for i in inst: # get the slots of the installed packages
+ myslots.add(i.get_env_var("SLOT"))
+
+ myslots.add(self.find_best_match(p).get_env_var("SLOT")) # add the slot of the best package in portage
+ for slot in myslots:
+ new_packages.append(\
+ self.find_best(\
+ [x.get_cpv() for x in self.find_packages("%s:%s" % (i.get_cp(), slot))]\
+ ))
+ else:
+ new_packages.append(self.find_best_match(p))
+
+ return new_packages
+
+ checked = []
+ updating = []
+ raw_checked = []
+ def check (p, add_not_installed = True):
+ """Checks whether a package is updated or not."""
+ if p.get_cp() in checked: return
+ else: checked.append(p.get_cp())
+
+ appended = False
+ tempDeep = False
+
+ if not p.is_installed():
+ oldList = self.find_installed_packages(p.get_slot_cp())
+ if oldList:
+ old = oldList[0] # we should only have one package here - else it is a bug
+ else:
+ oldList = self.sort_package_list(self.find_installed_packages(p.get_cp()))
+ if not oldList:
+ if add_not_installed:
+ debug("Not found installed",p.get_cpv(),"==> adding")
+ oldList = [p]
+ else:
+ return
+ old = oldList[-1]
+
+ updating.append((p, old))
+ appended = True
+ p = old
+
+ if newuse and p.is_installed() and p.is_in_system(): # there is no use to check newuse for a package which is not existing in portage anymore :)
+
+ new_iuse = set(p.get_all_use_flags(installed = False)) # IUSE in the ebuild
+ old_iuse = set(p.get_all_use_flags(installed = True)) # IUSE in the vardb
+
+ if new_iuse.symmetric_difference(old_iuse): # difference between new_iuse and old_iuse
+ tempDeep = True
+ if not appended:
+ updating.append((p,p))
+ appended = True
+
+ else:
+ old = set(p.get_installed_use_flags())
+ new = set(p.get_settings("USE").split())
+
+ if new_iuse.intersection(new) != old_iuse.intersection(old):
+ tempDeep = True
+ if not appended:
+ updating.append((p,p))
+ appended = True
+
+ if deep or tempDeep:
+ states = [(["RDEPEND","PDEPEND"],True), (["DEPEND"], False)]
+
+ for state in states:
+ for i in p.get_matched_dep_packages(state[0]):
+ if i not in raw_checked:
+ raw_checked.append(i)
+ bm = get_new_packages([i])
+ if not bm:
+ debug("Bug? No best match could be found:",i)
+ else:
+ for pkg in bm:
+ if not pkg: continue
+ check(pkg, state[1])
+
+ for p in get_new_packages(packages):
+ if not p: continue # if a masked package is installed we have "None" here
+ check(p, True)
+
+ return updating
+
+ use_descs = {}
+ local_use_descs = {}
+ def get_use_desc (self, flag, package = None):
+ # In the first run the dictionaries 'use_descs' and 'local_use_descs' are filled.
+
+ # fill cache if needed
+ if self.use_descs == {} or self.local_use_descs == {}:
+ # read use.desc
+ fd = open(self.settings.settings["PORTDIR"]+"/profiles/use.desc")
+ lines = fd.readlines()
+ fd.close()
+ for line in lines:
+ line = line.strip()
+ if line != "" and line[0] != '#':
+ fields = [x.strip() for x in line.split(" - ",1)]
+ if len(fields) == 2:
+ self.use_descs[fields[0]] = fields[1]
+
+ # read use.local.desc
+ fd = open(self.settings.settings["PORTDIR"]+"/profiles/use.local.desc")
+ lines = fd.readlines()
+ fd.close()
+ for line in lines:
+ line = line.strip()
+ if line != "" and line[0] != '#':
+ fields = [x.strip() for x in line.split(":",1)]
+ if len(fields) == 2:
+ if not fields[0] in self.local_use_descs: # create
+ self.local_use_descs[fields[0]] = {}
+ subfields = [x.strip() for x in fields[1].split(" - ",1)]
+ if len(subfields) == 2:
+ self.local_use_descs[fields[0]][subfields[0]] = subfields[1]
+
+ # start
+ desc = None
+ if flag in self.use_descs:
+ desc = self.use_descs[flag]
+ if package != None:
+ if package in self.local_use_descs:
+ if flag in self.local_use_descs[package]:
+ desc = self.local_use_descs[package][flag]
+ return desc
diff --git a/portato/backend/portage_helper.py b/portato/backend/portage_helper.py
deleted file mode 100644
index be4c76c..0000000
--- a/portato/backend/portage_helper.py
+++ /dev/null
@@ -1,467 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# File: portato/backend/portage_helper.py
-# This file is part of the Portato-Project, a graphical portage-frontend.
-#
-# Copyright (C) 2006-2007 René 'Necoro' Neumann
-# This is free software. You may redistribute copies of it under the terms of
-# the GNU General Public License version 2.
-# There is NO WARRANTY, to the extent permitted by law.
-#
-# Written by René 'Necoro' Neumann <necoro@necoro.net>
-
-import re, os
-import types
-import portage
-
-from portage_util import unique_array
-
-from portato.backend import portage_settings
-import package
-
-from portato.helper import debug
-
-def find_lambda (name):
- """Returns the function needed by all the find_all_*-functions. Returns None if no name is given.
-
- @param name: name to build the function of
- @type name: string
- @returns:
- 1. None if no name is given
- 2. a lambda function
- @rtype: function"""
-
- if name != None:
- return lambda x: re.match(".*"+name+".*",x)
- else:
- return lambda x: True
-
-def geneticize_list (list_of_packages):
- """Convertes a list of cpv's into L{backend.Package}s.
-
- @param list_of_packages: the list of packages
- @type list_of_packages: list of gentoolkit.Packages
- @returns: converted list
- @rtype: backend.Package[]"""
-
- return [package.Package(x) for x in list_of_packages]
-
-def find_best (list):
- """Returns the best package out of a list of packages.
-
- @param list: the list of packages to select from
- @type list: string[]
- @returns: the best package
- @rtype: backend.Package"""
-
- return package.Package(portage.best(list))
-
-def find_best_match (search_key, only_installed = False):
- """Finds the best match in the portage tree. It does not find masked packages!
-
- @param search_key: the key to find in the portage tree
- @type search_key: string
- @param only_installed: if True, only installed packages are searched
- @type only_installed: boolean
-
- @returns: the package found or None
- @rtype: backend.Package"""
-
- t = None
- if not only_installed:
- t = portage_settings.porttree.dep_bestmatch(search_key)
- else:
- t = portage_settings.vartree.dep_bestmatch(search_key)
- if t:
- return package.Package(t)
- return None
-
-def find_packages (search_key, masked=False):
- """This returns a list of packages which have to fit exactly. Additionally ranges like '<,>,=,~,!' et. al. are possible.
-
- @param search_key: the key to look for
- @type search_key: string
- @param masked: if True, also look for masked packages
- @type masked: boolean
-
- @returns: list of found packages
- @rtype: backend.Package[]"""
-
- try:
- if masked:
- t = portage_settings.porttree.dbapi.xmatch("match-all", search_key)
- t += portage_settings.vartree.dbapi.match(search_key)
- else:
- t = portage_settings.porttree.dbapi.match(search_key)
- t += portage_settings.vartree.dbapi.match(search_key)
- # catch the "ambigous package" Exception
- except ValueError, e:
- if type(e[0]) == types.ListType:
- t = []
- for cp in e[0]:
- if masked:
- t += portage_settings.porttree.dbapi.xmatch("match-all", cp)
- t += portage_settings.vartree.dbapi.match(cp)
- else:
- t += portage_settings.porttree.dbapi.match(cp)
- t += portage_settings.vartree.dbapi.match(cp)
- else:
- raise ValueError(e)
- # Make the list of packages unique
- t = unique_array(t)
- t.sort()
- return geneticize_list(t)
-
-def find_installed_packages (search_key, masked=False):
- """This returns a list of packages which have to fit exactly. Additionally ranges like '<,>,=,~,!' et. al. are possible.
-
- @param search_key: the key to look for
- @type search_key: string
- @param masked: if True, also look for masked packages
- @type masked: boolean
-
- @returns: list of found packages
- @rtype: backend.Package[]"""
-
- try:
- t = portage_settings.vartree.dbapi.match(search_key)
- # catch the "ambigous package" Exception
- except ValueError, e:
- if type(e[0]) == types.ListType:
- t = []
- for cp in e[0]:
- t += portage_settings.vartree.dbapi.match(cp)
- else:
- raise ValueError(e)
-
- return geneticize_list(t)
-
-def find_system_packages ():
- """Looks for all packages saved as "system-packages".
-
- @returns: a tuple of (resolved_packages, unresolved_packages).
- @rtype: (backend.Package[], backend.Package[])"""
-
- pkglist = portage_settings.settings.packages
- resolved = []
- unresolved = []
- for x in pkglist:
- cpv = x.strip()
- if len(cpv) and cpv[0] == "*":
- pkg = find_best_match(cpv)
- if pkg:
- resolved.append(pkg)
- else:
- unresolved.append(cpv)
- return (resolved, geneticize_list(unresolved))
-
-def find_world_packages ():
- """Looks for all packages saved in the world-file.
-
- @returns: a tuple of (resolved_packages, unresolved_packages).
- @rtype: (backend.Package[], backend.Package[])"""
-
- f = open(portage.WORLD_FILE)
- pkglist = f.readlines()
- resolved = []
- unresolved = []
- for x in pkglist:
- cpv = x.strip()
- if len(cpv) and cpv[0] != "#":
- pkg = find_best_match(cpv)
- if pkg:
- resolved.append(pkg)
- else:
- unresolved.append(cpv)
- return (resolved, geneticize_list(unresolved))
-
-def find_all_installed_packages (name=None, withVersion=True):
- """Finds all installed packages matching a name or all if no name is specified.
-
- @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
- @type name: string or None
- @param withVersion: if True version-specific packages are returned; else only the cat/package-strings a delivered
- @type withVersion: boolean
-
- @returns: all packages/cp-strings found
- @rtype: backend.Package[] or cp-string[]"""
-
- if withVersion:
- t = portage_settings.vartree.dbapi.cpv_all()
- if name:
- t = filter(find_lambda(name),t)
- return geneticize_list(t)
-
- else:
- t = portage_settings.vartree.dbapi.cp_all()
- if name:
- t = filter(find_lambda(name),t)
- return t
-
-def find_all_uninstalled_packages (name=None):
- """Finds all uninstalled packages matching a name or all if no name is specified.
-
- @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
- @type name: string or None
- @returns: all packages found
- @rtype: backend.Package[]"""
-
- alist = find_all_packages(name)
- return geneticize_list([x for x in alist if not x.is_installed()])
-
-def find_all_packages (name=None, withVersion=True):
- """Finds all packages matching a name or all if no name is specified.
-
- @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
- @type name: string or None
- @param withVersion: if True version-specific packages are returned; else only the cat/package-strings a delivered
- @type withVersion: boolean
-
- @returns: all packages/cp-strings found
- @rtype: backend.Package[] or cp-string[]"""
-
- t = portage_settings.porttree.dbapi.cp_all()
- t += portage_settings.vartree.dbapi.cp_all()
- if name:
- t = filter(find_lambda(name),t)
- t = unique_array(t)
-
- if (withVersion):
- t2 = []
- for x in t:
- t2 += portage_settings.porttree.dbapi.cp_list(x)
- t2 += portage_settings.vartree.dbapi.cp_list(x)
- t2 = unique_array(t2)
- return geneticize_list(t2)
- else:
- return t;
-
-def find_all_world_packages (name=None):
- """Finds all world packages matching a name or all if no name is specified.
-
- @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
- @type name: string or None
- @returns: all packages found
- @rtype: backend.Package[]"""
-
- world = filter(find_lambda(name), [x.get_cpv() for x in find_world_packages()[0]])
- world = unique_array(world)
- return geneticize_list(world)
-
-def find_all_system_packages (name=None):
- """Finds all system packages matching a name or all if no name is specified.
-
- @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
- @type name: string or None
- @returns: all packages found
- @rtype: backend.Package[]"""
-
- sys = filter(find_lambda(name), [x.get_cpv() for x in find_system_packages()[0]])
- sys = unique_array(sys)
- return geneticize_list(sys)
-
-def list_categories (name=None):
- """Finds all categories matching a name or all if no name is specified.
-
- @param name: the name to look for - it is expanded to .*name.* ; if None, all categories are returned
- @type name: string or None
- @returns: all categories found
- @rtype: string[]"""
-
- categories = portage_settings.settings.categories
- return filter(find_lambda(name), categories)
-
-def split_package_name (name):
- """Splits a package name in its elements.
-
- @param name: name to split
- @type name: string
- @returns: list: [category, name, version, rev] whereby rev is "r0" if not specified in the name
- @rtype: string[]"""
-
- r = portage.catpkgsplit(portage.dep_getcpv(name))
- if not r:
- r = name.split("/")
- if len(r) == 1:
- return ["", name, "", "r0"]
- else:
- return r + ["", "r0"]
- if r[0] == 'null':
- r[0] = ''
- return r
-
-def sort_package_list(pkglist):
- """Sorts a package list in the same manner portage does.
-
- @param pkglist: list to sort
- @type pkglist: Packages[]"""
-
- pkglist.sort(package.Package.compare_version)
- return pkglist
-
-def reload_settings ():
- """Reloads portage."""
- portage_settings.load()
-
-def update_world (newuse = False, deep = False):
- """Calculates the packages to get updated in an update world.
-
- @param newuse: Checks if a use-flag has a different state then to install time.
- @type newuse: boolean
- @param deep: Not only check world packages but also there dependencies.
- @type deep: boolean
- @returns: a list containing of the tuple (new_package, old_package)
- @rtype: (backend.Package, backend.Package)[]"""
-
- # read world file
- world = open(portage.WORLD_FILE)
- packages = []
- for line in world:
- line = line.strip()
- if len(line) == 0: continue # empty line
- if line[0] == "#": continue # comment
- packages.append(line)
- world.close()
-
- # append system packages
- packages.extend(unique_array([p.get_cp() for p in find_all_system_packages()]))
-
- def get_new_packages (packages):
- new_packages = []
- for p in packages:
- inst = find_installed_packages(p)
- if len(inst) > 1:
- myslots = set()
- for i in inst: # get the slots of the installed packages
- myslots.add(i.get_env_var("SLOT"))
-
- myslots.add(find_best_match(p).get_env_var("SLOT")) # add the slot of the best package in portage
- for slot in myslots:
- new_packages.append(\
- find_best(\
- [x.get_cpv() for x in find_packages("%s:%s" % (i.get_cp(), slot))]\
- ))
- else:
- new_packages.append(find_best_match(p))
-
- return new_packages
-
- checked = []
- updating = []
- raw_checked = []
- def check (p, add_not_installed = True):
- """Checks whether a package is updated or not."""
- if p.get_cp() in checked: return
- else: checked.append(p.get_cp())
-
- appended = False
- tempDeep = False
-
- if not p.is_installed():
- oldList = find_installed_packages(p.get_slot_cp())
- if oldList:
- old = oldList[0] # we should only have one package here - else it is a bug
- else:
- oldList = sort_package_list(find_installed_packages(p.get_cp()))
- if not oldList:
- if add_not_installed:
- debug("Not found installed",p.get_cpv(),"==> adding")
- oldList = [p]
- else:
- return
- old = oldList[-1]
-
- updating.append((p, old))
- appended = True
- p = old
-
- if newuse and p.is_installed() and p.is_in_system(): # there is no use to check newuse for a package which is not existing in portage anymore :)
-
- new_iuse = set(p.get_all_use_flags(installed = False)) # IUSE in the ebuild
- old_iuse = set(p.get_all_use_flags(installed = True)) # IUSE in the vardb
-
- if new_iuse.symmetric_difference(old_iuse): # difference between new_iuse and old_iuse
- tempDeep = True
- if not appended:
- updating.append((p,p))
- appended = True
-
- else:
- old = set(p.get_installed_use_flags())
- new = set(p.get_settings("USE").split())
-
- if new_iuse.intersection(new) != old_iuse.intersection(old):
- tempDeep = True
- if not appended:
- updating.append((p,p))
- appended = True
-
- if deep or tempDeep:
- states = [(["RDEPEND","PDEPEND"],True), (["DEPEND"], False)]
-
- for state in states:
- for i in p.get_matched_dep_packages(state[0]):
- if i not in raw_checked:
- raw_checked.append(i)
- bm = get_new_packages([i])
- if not bm:
- debug("Bug? No best match could be found:",i)
- else:
- for pkg in bm:
- if not pkg: continue
- check(pkg, state[1])
-
- for p in get_new_packages(packages):
- if not p: continue # if a masked package is installed we have "None" here
- check(p, True)
-
- return updating
-
-use_descs = {}
-local_use_descs = {}
-def get_use_desc (flag, package = None):
- """Returns the description of a specific useflag or None if no desc was found.
- If a package is given (in the <cat>/<name> format) the local use descriptions are searched too.
-
- @param flag: flag to get the description for
- @type flag: string
- @param package: name of a package: if given local use descriptions are searched too
- @type package: cp-string
- @returns: found description
- @rtype: string"""
-
- # In the first run the dictionaries 'use_descs' and 'local_use_descs' are filled.
-
- # fill cache if needed
- if use_descs == {} or local_use_descs == {}:
- # read use.desc
- fd = open(portage_settings.settings["PORTDIR"]+"/profiles/use.desc")
- for line in fd.readlines():
- line = line.strip()
- if line != "" and line[0] != '#':
- fields = [x.strip() for x in line.split(" - ",1)]
- if len(fields) == 2:
- use_descs[fields[0]] = fields[1]
-
- # read use.local.desc
- fd = open(portage_settings.settings["PORTDIR"]+"/profiles/use.local.desc")
- for line in fd.readlines():
- line = line.strip()
- if line != "" and line[0] != '#':
- fields = [x.strip() for x in line.split(":",1)]
- if len(fields) == 2:
- if not fields[0] in local_use_descs: # create
- local_use_descs[fields[0]] = {}
- subfields = [x.strip() for x in fields[1].split(" - ",1)]
- if len(subfields) == 2:
- local_use_descs[fields[0]][subfields[0]] = subfields[1]
-
- # start
- desc = None
- if flag in use_descs:
- desc = use_descs[flag]
- if package != None:
- if package in local_use_descs:
- if flag in local_use_descs[package]:
- desc = local_use_descs[package][flag]
- return desc
diff --git a/portato/backend/system_interface.py b/portato/backend/system_interface.py
new file mode 100644
index 0000000..eb81401
--- /dev/null
+++ b/portato/backend/system_interface.py
@@ -0,0 +1,283 @@
+# -*- coding: utf-8 -*-
+#
+# File: portato/backend/system_interface.py
+# This file is part of the Portato-Project, a graphical portage-frontend.
+#
+# Copyright (C) 2007 René 'Necoro' Neumann
+# This is free software. You may redistribute copies of it under the terms of
+# the GNU General Public License version 2.
+# There is NO WARRANTY, to the extent permitted by law.
+#
+# Written by René 'Necoro' Neumann <necoro@necoro.net>
+
+class SystemInterface:
+
+ def split_cpv (self, cpv):
+ """Splits a cpv into all its parts.
+
+ @param cpv: the cpv to split
+ @type cpv: string
+ @returns: the splitted cpv
+ @rtype: string[]"""
+
+ raise NotImplementedError
+
+ def find_best(self, list):
+ """Returns the best package out of a list of packages.
+
+ @param list: the list of packages to select from
+ @type list: string[]
+ @returns: the best package
+ @rtype: backend.Package"""
+
+ raise NotImplementedError
+
+ def find_best_match (self, search_key, only_installed = False):
+ """Finds the best match in the portage tree. It does not find masked packages!
+
+ @param search_key: the key to find in the portage tree
+ @type search_key: string
+ @param only_installed: if True, only installed packages are searched
+ @type only_installed: boolean
+
+ @returns: the package found or None
+ @rtype: backend.Package"""
+
+ raise NotImplementedError
+
+ def find_packages (self, search_key, masked=False):
+ """This returns a list of packages which have to fit exactly. Additionally ranges like '<,>,=,~,!' et. al. are possible.
+
+ @param search_key: the key to look for
+ @type search_key: string
+ @param masked: if True, also look for masked packages
+ @type masked: boolean
+
+ @returns: list of found packages
+ @rtype: backend.Package[]"""
+
+ raise NotImplementedError
+
+
+ def find_installed_packages (self, search_key, masked = False):
+ """This returns a list of packages which have to fit exactly. Additionally ranges like '<,>,=,~,!' et. al. are possible.
+
+ @param search_key: the key to look for
+ @type search_key: string
+ @param masked: if True, also look for masked packages
+ @type masked: boolean
+
+ @returns: list of found packages
+ @rtype: backend.Package[]"""
+
+ raise NotImplementedError
+
+ def find_system_packages (self):
+ """Looks for all packages saved as "system-packages".
+
+ @returns: a tuple of (resolved_packages, unresolved_packages).
+ @rtype: (backend.Package[], backend.Package[])"""
+
+ raise NotImplementedError
+
+ def find_world_packages (self):
+ """Looks for all packages saved in the world-file.
+
+ @returns: a tuple of (resolved_packages, unresolved_packages).
+ @rtype: (backend.Package[], backend.Package[])"""
+
+ raise NotImplementedError
+
+ def find_all_installed_packages (self, name = None, withVersion = True):
+ """Finds all installed packages matching a name or all if no name is specified.
+
+ @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
+ @type name: string or None
+ @param withVersion: if True version-specific packages are returned; else only the cat/package-strings a delivered
+ @type withVersion: boolean
+
+ @returns: all packages/cp-strings found
+ @rtype: backend.Package[] or cp-string[]"""
+
+ raise NotImplementedError
+
+ def find_all_uninstalled_packages (self, name = None):
+ """Finds all uninstalled packages matching a name or all if no name is specified.
+
+ @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
+ @type name: string or None
+ @returns: all packages found
+ @rtype: backend.Package[]"""
+
+ raise NotImplementedError
+
+ def find_all_packages (self, name = None, withVersion = True):
+ """Finds all packages matching a name or all if no name is specified.
+
+ @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
+ @type name: string or None
+ @param withVersion: if True version-specific packages are returned; else only the cat/package-strings a delivered
+ @type withVersion: boolean
+
+ @returns: all packages/cp-strings found
+ @rtype: backend.Package[] or cp-string[]"""
+
+ raise NotImplementedError
+
+ def find_all_world_packages (self, name = None):
+ """Finds all world packages matching a name or all if no name is specified.
+
+ @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
+ @type name: string or None
+ @returns: all packages found
+ @rtype: backend.Package[]"""
+
+ raise NotImplementedError
+
+ def find_all_system_packages (self, name = None):
+ """Finds all system packages matching a name or all if no name is specified.
+
+ @param name: the name to look for - it is expanded to .*name.* ; if None, all packages are returned
+ @type name: string or None
+ @returns: all packages found
+ @rtype: backend.Package[]"""
+
+ raise NotImplementedError
+
+ def list_categories (self, name = None):
+ """Finds all categories matching a name or all if no name is specified.
+
+ @param name: the name to look for - it is expanded to .*name.* ; if None, all categories are returned
+ @type name: string or None
+ @returns: all categories found
+ @rtype: string[]"""
+
+ raise NotImplementedError
+
+ def sort_package_list(self, pkglist):
+ """Sorts a package list in the same manner portage does.
+
+ @param pkglist: list to sort
+ @type pkglist: Packages[]"""
+
+ raise NotImplementedError
+
+ def reload_settings (self):
+ """Reloads portage."""
+
+ raise NotImplementedError
+
+ def update_world (self, newuse = False, deep = False):
+ """Calculates the packages to get updated in an update world.
+
+ @param newuse: Checks if a use-flag has a different state then to install time.
+ @type newuse: boolean
+ @param deep: Not only check world packages but also there dependencies.
+ @type deep: boolean
+ @returns: a list containing of the tuple (new_package, old_package)
+ @rtype: (backend.Package, backend.Package)[]"""
+
+ raise NotImplementedError
+
+ def get_use_desc (self, flag, package = None):
+ """Returns the description of a specific useflag or None if no desc was found.
+ If a package is given (in the <cat>/<name> format) the local use descriptions are searched too.
+
+ @param flag: flag to get the description for
+ @type flag: string
+ @param package: name of a package: if given local use descriptions are searched too
+ @type package: cp-string
+ @returns: found description
+ @rtype: string"""
+
+ raise NotImplementedError
+
+ def new_package (self, cpv):
+ """Returns an instance of the appropriate Package-Subclass.
+
+ @param cpv: the cpv to create the package from
+ @type cpv: string
+ @returns: a new Package-object.
+ @rtype: Package"""
+
+ raise NotImplementedError
+
+ def get_config_path (self):
+ """Returns the actual path to the config files.
+
+ @returns: the path, e.g. /etc/portage
+ @rtyoe: string"""
+
+ raise NotImplementedError
+
+ def get_world_file_path (self):
+ """Returns the path to the world file.
+
+ @returns: the path of the world file
+ @rtype: string"""
+
+ raise NotImplementedError
+
+ def get_sync_command (self):
+ """Returns the command(s) to run for syncing. This can be overridden by the user.
+
+ @returns: command to run
+ @rtype: string[]"""
+
+ raise NotImplementedError
+
+ def get_merge_command (self):
+ """Returns the command(s) to run for the merging.
+
+ @returns: command to run
+ @rtype: string[]"""
+
+ raise NotImplementedError
+
+ def get_oneshot_option (self):
+ """Returns the options to append for marking a merge as "oneshot".
+
+ @returns: option(s) to append
+ @rtype: string[]"""
+
+ raise NotImplementedError
+
+ def get_newuse_option (self):
+ """Returns the options to append for marking a merge as "newuse".
+
+ @returns: option(s) to append
+ @rtype: string[]"""
+
+ raise NotImplementedError
+
+ def get_deep_option (self):
+ """Returns the options to append for marking a merge as "deep".
+
+ @returns: option(s) to append
+ @rtype: string[]"""
+
+ raise NotImplementedError
+
+ def get_update_option (self):
+ """Returns the options to append for marking a merge as "update".
+
+ @returns: option(s) to append
+ @rtype: string[]"""
+
+ raise NotImplementedError
+
+ def get_pretend_option (self):
+ """Returns the options to append for marking a merge as "pretend".
+
+ @returns: option(s) to append
+ @rtype: string[]"""
+
+ raise NotImplementedError
+
+ def get_unmerge_option (self):
+ """Returns the options to append for marking a merge as "unmerge".
+
+ @returns: option(s) to append
+ @rtype: string[]"""
+
+ raise NotImplementedError