From 0a8814713917548767f0ff823e34d412061b3ffe Mon Sep 17 00:00:00 2001 From: René 'Necoro' Neumann Date: Tue, 8 Jul 2008 14:58:20 +0200 Subject: Removed catapult --- portato/backend/catapult/__init__.py | 19 --- portato/backend/catapult/package.py | 156 ---------------------- portato/backend/catapult/system.py | 252 ----------------------------------- 3 files changed, 427 deletions(-) delete mode 100644 portato/backend/catapult/__init__.py delete mode 100644 portato/backend/catapult/package.py delete mode 100644 portato/backend/catapult/system.py (limited to 'portato/backend') diff --git a/portato/backend/catapult/__init__.py b/portato/backend/catapult/__init__.py deleted file mode 100644 index 348b941..0000000 --- a/portato/backend/catapult/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -# -*- coding: utf-8 -*- -# -# File: portato/backend/catapult/__init__.py -# This file is part of the Portato-Project, a graphical portage-frontend. -# -# Copyright (C) 2007 René 'Necoro' Neumann -# This is free software. You may redistribute copies of it under the terms of -# the GNU General Public License version 2. -# There is NO WARRANTY, to the extent permitted by law. -# -# Written by René 'Necoro' Neumann - -from __future__ import absolute_import - -from dbus.mainloop.glib import DBusGMainLoop -DBusGMainLoop(set_as_default=True) - -from .system import CatapultSystem -from .package import CatapultPackage diff --git a/portato/backend/catapult/package.py b/portato/backend/catapult/package.py deleted file mode 100644 index 2e4f471..0000000 --- a/portato/backend/catapult/package.py +++ /dev/null @@ -1,156 +0,0 @@ -# -*- coding: utf-8 -*- -# -# File: portato/backend/catapult/package.py -# This file is part of the Portato-Project, a graphical portage-frontend. -# -# Copyright (C) 2007-2008 René 'Necoro' Neumann -# This is free software. You may redistribute copies of it under the terms of -# the GNU General Public License version 2. -# There is NO WARRANTY, to the extent permitted by law. -# -# Written by René 'Necoro' Neumann - -from __future__ import absolute_import, with_statement - -from ..package import Package -from .. import flags -from .. import system -from ..exceptions import BlockedException, PackageNotFoundException -from ...helper import debug, unique_array - -import dbus -import catapult - -import os.path - -class CatapultPackage(Package): - - bus = dbus.SessionBus() - dbus_object = bus.get_object(catapult.get_dbus_address(catapult.DEFAULT), catapult.CATAPULT_PACKAGE_BUS, follow_name_owner_changes = True) - proxy = dbus.Interface(dbus_object, catapult.CATAPULT_PACKAGE_IFACE) - - _expand = {} - - def _new_flags (self): - flags = self.get_new_use_flags() - - nflags = [] - - for flag in flags: - if flag[0] == "~": - nflags.append((flag[1:], True)) - else: - nflags.append((flag, False)) - - return nflags - - def use_expanded (self, flag, suggest = None): - - exp = self._expand.get(flag, False) - - if exp is False: - if not suggest: - suggest = "" - s = str(self.proxy.use_expanded(self.get_cpv(), flag, suggest)) - if not s: - s = None - - self._expand[flag] = s - return s - else: - return exp - - def get_package_path(self): - return str(self.proxy.get_package_path(self.get_cpv())) - - def is_installed(self): - return self.proxy.is_installed(self.get_cpv()) - - def is_overlay(self): - return self.proxy.is_in_overlay(self.get_cpv()) - - def get_overlay_path(self): - return str(self.proxy.get_overlay_path(self.get_cpv())) - - def is_in_system (self): - return self.proxy.is_in_system(self.get_cpv()) - - def is_missing_keyword(self): - return self.proxy.is_missing_keyword(self.get_cpv()) - - def is_testing(self, use_keywords = False): - if not use_keywords: - return self.proxy.is_testing(self.get_cpv(), False) - else: - status = flags.new_testing_status(self.get_cpv()) - if status is None: - return self.proxy.is_testing(self.get_cpv(), True) - else: - return status - - def is_masked (self, use_changed = True): - if use_changed: - status = flags.new_masking_status(self.get_cpv()) - if status != None: # we have locally changed it - if status == "masked": return True - elif status == "unmasked": return False - else: - error(_("BUG in flags.new_masking_status. It returns \'%s\'"), status) - else: # we have not touched the status - return self.proxy.is_masked(self.get_cpv()) - else: # we want the original portage value XXX: bug if masked by user AND by system - if self.proxy.is_masked(self.get_cpv()): - if not flags.is_locally_masked(self, changes = False): # assume that if it is locally masked, it is not masked by the system - return True - - return False - - def get_masking_reason (self): - return str(self.proxy.get_masking_reason(self.get_cpv())) - - def get_iuse_flags (self, installed = False, removeForced = True): - return [str(x) for x in self.proxy.get_iuse_flags(self.get_cpv(), installed, removeForced)] - - def get_matched_dep_packages (self, depvar): - return [str(x) for x in self.proxy.get_matched_dep_packages(self.get_cpv(), self._new_flags())] - - def get_dep_packages (self, depvar = ["RDEPEND", "PDEPEND", "DEPEND"], with_criterions = False): - pkgs = self.proxy.get_dep_packages(self.get_cpv(), depvar, self._new_flags()) - - if not with_criterions: - return [str(x) for x,y in pkgs] - else: - return [(str(x),str(y)) for x,y in pkgs] - - def get_global_settings(self, key, installed = True): - return str(self.proxy.get_global_settings(self.get_cpv(), key, installed)) - - def get_ebuild_path(self): - return str(self.proxy.get_ebuild_path(self.get_cpv())) - - def get_package_settings(self, var, tree = True): - return str(self.proxy.get_package_settings(self.get_cpv(), var, tree)) - - def get_installed_use_flags(self): - return self.proxy.get_installed_use_flags(self.get_cpv()) - - def get_actual_use_flags(self): - return self.proxy.get_actual_use_flags(self.get_cpv(), self._new_flags()) - - def compare_version(self, other): - return self.proxy.compare_version(self.get_cpv(), other.get_cpv()) - - def matches (self, criterion): - return self.proxy.matches(self.get_cpv(), criterion) - - def get_files (self): - return self.proxy.get_files(self.get_cpv()) - - def get_name(self): - return str(self.proxy.get_name(self.get_cpv())) - - def get_version(self): - return str(self.proxy.get_version(self.get_cpv())) - - def get_category(self): - return str(self.proxy.get_category(self.get_cpv())) diff --git a/portato/backend/catapult/system.py b/portato/backend/catapult/system.py deleted file mode 100644 index 3a3bac5..0000000 --- a/portato/backend/catapult/system.py +++ /dev/null @@ -1,252 +0,0 @@ -# -*- coding: utf-8 -*- -# -# File: portato/backend/catapult/system.py -# This file is part of the Portato-Project, a graphical portage-frontend. -# -# Copyright (C) 2007-2008 René 'Necoro' Neumann -# This is free software. You may redistribute copies of it under the terms of -# the GNU General Public License version 2. -# There is NO WARRANTY, to the extent permitted by law. -# -# Written by René 'Necoro' Neumann - -from __future__ import absolute_import - -import re, os -from threading import Event -import dbus -import catapult - -from .package import CatapultPackage -from ..system_interface import SystemInterface -from ...helper import debug, info, warning, unique_array - -class CatapultSystem (SystemInterface): - - def __init__ (self): - SystemInterface.__init__(self) - - self.bus = dbus.SessionBus() - # get the system - so = self.bus.get_object(catapult.get_dbus_address(catapult.DEFAULT), catapult.CATAPULT_SYSTEM_BUS, follow_name_owner_changes = True) - self.proxy = dbus.Interface(so, catapult.CATAPULT_SYSTEM_IFACE) - - def get_version (self): - admint = dbus.Interface(self.bus.get_object(catapult.get_dbus_address(catapult.DEFAULT), catapult.CATAPULT_BUS), catapult.CATAPULT_ADMIN_IFACE) - return "Catapult: %s v. %s" % (self.proxy.bus_name.split(".")[-1], str(admint.version())) - - def geneticize_list (self, list_of_packages, only_cpv = False): - """Convertes a list of cpv's into L{backend.Package}s. - - @param list_of_packages: the list of packages - @type list_of_packages: string[] - @param only_cpv: do nothing - return the passed list - @type only_cpv: boolean - @returns: converted list - @rtype: PortagePackage[] - """ - - if not only_cpv: - return [CatapultPackage(x) for x in list_of_packages] - else: - return [str(x) for x in list_of_packages] - - - def split_cpv (self, cpv): - split = self.proxy.split_cpv(cpv) - if all(split): - return map(str, split) - else: - return None - - def cpv_matches (self, cpv, criterion): - return CatapultPackage(cpv).matches(criterion) - - def find_best(self, list, only_cpv = False): - if only_cpv: - return str(self.proxy.find_best(list)) - else: - return CatapultPackage(self.proxy.find_best(list)) - - def find_best_match (self, search_key, masked = False, only_installed = False, only_cpv = False): - p = self.proxy.find_best_match(search_key, masked, only_installed) - - if p : - if not only_cpv: - return CatapultPackage(p) - else: - return str(p) - return None - - def _wrap_find(self, key, masked, set, withVersion, only_cpv): - - l = [] - try: - l = self.proxy.find_packages(key, set, masked, withVersion) - except dbus.DBusException, e: - name, data = str(e).split("\n")[-2].split(": ")[1:] - - if name == catapult.CATAPULT_ERR_AMBIGUOUS_PACKAGE: - debug("Ambigous packages: %s.", data) - l = [] - for cp in data.split(","): - l += self.proxy.find_packages(cp, set, masked, withVersion) - else: - raise - - return self.geneticize_list(l, not(withVersion) or only_cpv) - - def find_packages (self, search_key, masked = False, only_cpv = False): - return self._wrap_find(search_key, masked, "all", True, only_cpv) - - def find_installed_packages (self, search_key, masked = False, only_cpv = False): - return self._wrap_find(search_key, masked, "installed", True, only_cpv) - - def find_system_packages (self, only_cpv = False): -# result = self.proxy.find_system_packages() -# if only_cpv: -# return result -# else: -# return tuple(map(self.geneticize_list, result)) - return (self._wrap_find(search_key, False, "system", True, only_cpv), []) - - def find_world_packages (self, only_cpv = False): -# result = self.proxy.find_world_packages() -# if only_cpv: -# return result -# else: -# return tuple(map(self.geneticize_list, result)) - return (self._wrap_find(search_key, False, "world", True, only_cpv), []) - - def _wrap_find_all (self, key, masked, set, withVersion, only_cpv): - if not key: - key = "" - else: - key = "*%s*" % key - - l = self.proxy.find_packages("", set, masked, withVersion) - - if key: - l = catapult.filter_list(key, l) - - return self.geneticize_list(l, not(withVersion) or only_cpv) - - def find_all_installed_packages (self, name = None, withVersion = True, only_cpv = False): - return self._wrap_find_all(name, True, "installed", withVersion, only_cpv) - - def find_all_uninstalled_packages (self, name = None, only_cpv = False): - return self._wrap_find_all(name, True, "uninstalled", True, only_cpv) - - def find_all_packages (self, name = None, withVersion = True, only_cpv = False): - return self._wrap_find_all(name, True, "all", withVersion, only_cpv) - - def find_all_world_packages (self, name = None, only_cpv = False): - return self._wrap_find_all(name, True, "world", withVersion, only_cpv) - - def find_all_system_packages (self, name = None, only_cpv = False): - return self._wrap_find_all(name, True, "system", withVersion, only_cpv) - - def list_categories (self, name = None): - cats = self.proxy.list_categories() - if name: - cats = catapult.filter_list("*%s*" % name, cats) - - return map(str, cats) - - def sort_package_list(self, pkglist): - return self.geneticize_list(self.proxy.sort_package_list([x.get_cpv() for x in pkglist])) - - def reload_settings (self): - return self.proxy.reload_settings() - - def update_world (self, newuse = False, deep = False): - - ret = [] - e = Event() - - def wait (list): - ret.extend([(CatapultPackage(x), CatapultPackage(y)) for x,y in list]) - e.set() - - def error (ex): - e.set() - raise ex - - self.proxy.update_world(newuse, deep, {}, reply_handler = wait, error_handler = error, timeout = 300) - e.wait() - return ret - # return [(CatapultPackage(x), CatapultPackage(y)) for x,y in self.proxy.update_world(newuse, deep, {}, timeout = 300)] - - def get_updated_packages (self): - ret = [] - e = Event() - - def wait (list): - ret.extend([CatapultPackage(x) for x in list]) - e.set() - - def error (ex): - e.set() - raise ex - - self.proxy.get_updated_packages(reply_handler = wait, error_handler = error, timeout = 300) - e.wait() - return ret - - def get_use_desc (self, flag, package = None): - if not package: - package = "" - return str(self.proxy.get_use_desc(flag, package)) - - def get_global_settings(self, key): - return str(self.proxy.get_global_settings(key)) - - def new_package (self, cpv): - return CatapultPackage(cpv) - - def get_config_path (self): - return str(self.proxy.get_config_path()) - - def get_sync_command (self): - return [str(x) for x in self.proxy.get_sync_command()] - - def get_merge_command (self): - return [str(x) for x in self.proxy.get_merge_command()] - - def get_oneshot_option (self): - return [str(x) for x in self.proxy.get_oneshot_option()] - - def get_newuse_option (self): - return [str(x) for x in self.proxy.get_newuse_option()] - - def get_deep_option (self): - return [str(x) for x in self.proxy.get_deep_option()] - - def get_update_option (self): - return [str(x) for x in self.proxy.get_update_option()] - - def get_pretend_option (self): - return [str(x) for x in self.proxy.get_pretend_option()] - - def get_unmerge_option (self): - return [str(x) for x in self.proxy.get_unmerge_option()] - - def get_environment (self): - default_opts = self.get_global_settings("EMERGE_DEFAULT_OPTS") - opts = dict(os.environ) - - if default_opts: - opt_list = default_opts.split() - changed = False - - for option in ["--ask", "-a", "--pretend", "-p"]: - if option in opt_list: - opt_list.remove(option) - changed = True - - if changed: - opts.update(EMERGE_DEFAULT_OPTS = " ".join(opt_list)) - - opts.update(TERM = "xterm") - - return opts -- cgit v1.2.3-54-g00ecf