summaryrefslogtreecommitdiff
path: root/portato/backend/catapult/system.py
diff options
context:
space:
mode:
Diffstat (limited to 'portato/backend/catapult/system.py')
-rw-r--r--portato/backend/catapult/system.py252
1 files changed, 0 insertions, 252 deletions
diff --git a/portato/backend/catapult/system.py b/portato/backend/catapult/system.py
deleted file mode 100644
index 3a3bac5..0000000
--- a/portato/backend/catapult/system.py
+++ /dev/null
@@ -1,252 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# File: portato/backend/catapult/system.py
-# This file is part of the Portato-Project, a graphical portage-frontend.
-#
-# Copyright (C) 2007-2008 René 'Necoro' Neumann
-# This is free software. You may redistribute copies of it under the terms of
-# the GNU General Public License version 2.
-# There is NO WARRANTY, to the extent permitted by law.
-#
-# Written by René 'Necoro' Neumann <necoro@necoro.net>
-
-from __future__ import absolute_import
-
-import re, os
-from threading import Event
-import dbus
-import catapult
-
-from .package import CatapultPackage
-from ..system_interface import SystemInterface
-from ...helper import debug, info, warning, unique_array
-
-class CatapultSystem (SystemInterface):
-
- def __init__ (self):
- SystemInterface.__init__(self)
-
- self.bus = dbus.SessionBus()
- # get the system
- so = self.bus.get_object(catapult.get_dbus_address(catapult.DEFAULT), catapult.CATAPULT_SYSTEM_BUS, follow_name_owner_changes = True)
- self.proxy = dbus.Interface(so, catapult.CATAPULT_SYSTEM_IFACE)
-
- def get_version (self):
- admint = dbus.Interface(self.bus.get_object(catapult.get_dbus_address(catapult.DEFAULT), catapult.CATAPULT_BUS), catapult.CATAPULT_ADMIN_IFACE)
- return "Catapult: %s v. %s" % (self.proxy.bus_name.split(".")[-1], str(admint.version()))
-
- def geneticize_list (self, list_of_packages, only_cpv = False):
- """Convertes a list of cpv's into L{backend.Package}s.
-
- @param list_of_packages: the list of packages
- @type list_of_packages: string[]
- @param only_cpv: do nothing - return the passed list
- @type only_cpv: boolean
- @returns: converted list
- @rtype: PortagePackage[]
- """
-
- if not only_cpv:
- return [CatapultPackage(x) for x in list_of_packages]
- else:
- return [str(x) for x in list_of_packages]
-
-
- def split_cpv (self, cpv):
- split = self.proxy.split_cpv(cpv)
- if all(split):
- return map(str, split)
- else:
- return None
-
- def cpv_matches (self, cpv, criterion):
- return CatapultPackage(cpv).matches(criterion)
-
- def find_best(self, list, only_cpv = False):
- if only_cpv:
- return str(self.proxy.find_best(list))
- else:
- return CatapultPackage(self.proxy.find_best(list))
-
- def find_best_match (self, search_key, masked = False, only_installed = False, only_cpv = False):
- p = self.proxy.find_best_match(search_key, masked, only_installed)
-
- if p :
- if not only_cpv:
- return CatapultPackage(p)
- else:
- return str(p)
- return None
-
- def _wrap_find(self, key, masked, set, withVersion, only_cpv):
-
- l = []
- try:
- l = self.proxy.find_packages(key, set, masked, withVersion)
- except dbus.DBusException, e:
- name, data = str(e).split("\n")[-2].split(": ")[1:]
-
- if name == catapult.CATAPULT_ERR_AMBIGUOUS_PACKAGE:
- debug("Ambigous packages: %s.", data)
- l = []
- for cp in data.split(","):
- l += self.proxy.find_packages(cp, set, masked, withVersion)
- else:
- raise
-
- return self.geneticize_list(l, not(withVersion) or only_cpv)
-
- def find_packages (self, search_key, masked = False, only_cpv = False):
- return self._wrap_find(search_key, masked, "all", True, only_cpv)
-
- def find_installed_packages (self, search_key, masked = False, only_cpv = False):
- return self._wrap_find(search_key, masked, "installed", True, only_cpv)
-
- def find_system_packages (self, only_cpv = False):
-# result = self.proxy.find_system_packages()
-# if only_cpv:
-# return result
-# else:
-# return tuple(map(self.geneticize_list, result))
- return (self._wrap_find(search_key, False, "system", True, only_cpv), [])
-
- def find_world_packages (self, only_cpv = False):
-# result = self.proxy.find_world_packages()
-# if only_cpv:
-# return result
-# else:
-# return tuple(map(self.geneticize_list, result))
- return (self._wrap_find(search_key, False, "world", True, only_cpv), [])
-
- def _wrap_find_all (self, key, masked, set, withVersion, only_cpv):
- if not key:
- key = ""
- else:
- key = "*%s*" % key
-
- l = self.proxy.find_packages("", set, masked, withVersion)
-
- if key:
- l = catapult.filter_list(key, l)
-
- return self.geneticize_list(l, not(withVersion) or only_cpv)
-
- def find_all_installed_packages (self, name = None, withVersion = True, only_cpv = False):
- return self._wrap_find_all(name, True, "installed", withVersion, only_cpv)
-
- def find_all_uninstalled_packages (self, name = None, only_cpv = False):
- return self._wrap_find_all(name, True, "uninstalled", True, only_cpv)
-
- def find_all_packages (self, name = None, withVersion = True, only_cpv = False):
- return self._wrap_find_all(name, True, "all", withVersion, only_cpv)
-
- def find_all_world_packages (self, name = None, only_cpv = False):
- return self._wrap_find_all(name, True, "world", withVersion, only_cpv)
-
- def find_all_system_packages (self, name = None, only_cpv = False):
- return self._wrap_find_all(name, True, "system", withVersion, only_cpv)
-
- def list_categories (self, name = None):
- cats = self.proxy.list_categories()
- if name:
- cats = catapult.filter_list("*%s*" % name, cats)
-
- return map(str, cats)
-
- def sort_package_list(self, pkglist):
- return self.geneticize_list(self.proxy.sort_package_list([x.get_cpv() for x in pkglist]))
-
- def reload_settings (self):
- return self.proxy.reload_settings()
-
- def update_world (self, newuse = False, deep = False):
-
- ret = []
- e = Event()
-
- def wait (list):
- ret.extend([(CatapultPackage(x), CatapultPackage(y)) for x,y in list])
- e.set()
-
- def error (ex):
- e.set()
- raise ex
-
- self.proxy.update_world(newuse, deep, {}, reply_handler = wait, error_handler = error, timeout = 300)
- e.wait()
- return ret
- # return [(CatapultPackage(x), CatapultPackage(y)) for x,y in self.proxy.update_world(newuse, deep, {}, timeout = 300)]
-
- def get_updated_packages (self):
- ret = []
- e = Event()
-
- def wait (list):
- ret.extend([CatapultPackage(x) for x in list])
- e.set()
-
- def error (ex):
- e.set()
- raise ex
-
- self.proxy.get_updated_packages(reply_handler = wait, error_handler = error, timeout = 300)
- e.wait()
- return ret
-
- def get_use_desc (self, flag, package = None):
- if not package:
- package = ""
- return str(self.proxy.get_use_desc(flag, package))
-
- def get_global_settings(self, key):
- return str(self.proxy.get_global_settings(key))
-
- def new_package (self, cpv):
- return CatapultPackage(cpv)
-
- def get_config_path (self):
- return str(self.proxy.get_config_path())
-
- def get_sync_command (self):
- return [str(x) for x in self.proxy.get_sync_command()]
-
- def get_merge_command (self):
- return [str(x) for x in self.proxy.get_merge_command()]
-
- def get_oneshot_option (self):
- return [str(x) for x in self.proxy.get_oneshot_option()]
-
- def get_newuse_option (self):
- return [str(x) for x in self.proxy.get_newuse_option()]
-
- def get_deep_option (self):
- return [str(x) for x in self.proxy.get_deep_option()]
-
- def get_update_option (self):
- return [str(x) for x in self.proxy.get_update_option()]
-
- def get_pretend_option (self):
- return [str(x) for x in self.proxy.get_pretend_option()]
-
- def get_unmerge_option (self):
- return [str(x) for x in self.proxy.get_unmerge_option()]
-
- def get_environment (self):
- default_opts = self.get_global_settings("EMERGE_DEFAULT_OPTS")
- opts = dict(os.environ)
-
- if default_opts:
- opt_list = default_opts.split()
- changed = False
-
- for option in ["--ask", "-a", "--pretend", "-p"]:
- if option in opt_list:
- opt_list.remove(option)
- changed = True
-
- if changed:
- opts.update(EMERGE_DEFAULT_OPTS = " ".join(opt_list))
-
- opts.update(TERM = "xterm")
-
- return opts