diff options
-rw-r--r-- | doc/Changelog | 1 | ||||
-rw-r--r-- | portato/__init__.py | 23 | ||||
-rw-r--r-- | portato/ipc.pxd | 52 | ||||
-rw-r--r-- | portato/ipc.pyx | 173 | ||||
-rw-r--r-- | portato/listener.py | 64 | ||||
-rw-r--r-- | setup.py | 11 |
6 files changed, 265 insertions, 59 deletions
diff --git a/doc/Changelog b/doc/Changelog index ea9605c..8fcb404 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -1,6 +1,7 @@ next: - allow eix as backend DB +- use an internal messagequeue module instead of external shm 0.13: - allow lines w/o keyword in package.keywords diff --git a/portato/__init__.py b/portato/__init__.py index 60aeec4..74145e7 100644 --- a/portato/__init__.py +++ b/portato/__init__.py @@ -46,7 +46,7 @@ def get_parser (use_ = False): parser = OptionParser(version = vers, prog = "portato", description = desc, usage = usage) - parser.add_option("--shm", action = "store", nargs = 3, type="long", dest = "shm", + parser.add_option("--mq", action = "store", nargs = 1, type="long", dest = "mq", default = None, help = SUPPRESS_HELP) parser.add_option("-F", "--no-fork", action = "store_true", dest = "nofork", default = False, @@ -80,10 +80,7 @@ def start(): from .gui import run info("%s v. %s", _("Starting Portato"), VERSION) - if options.shm: - get_listener().set_send(*options.shm) - else: - get_listener().set_send() + get_listener().set_send(options.mq) try: run() @@ -92,14 +89,12 @@ def start(): else: # start us again in root modus and launch listener - import shm_wrapper as shm + from . import ipc - mem = shm.create_memory(1024, permissions=0600) - sig = shm.create_semaphore(InitialValue = 0, permissions = 0600) - rw = shm.create_semaphore(InitialValue = 1, permissions = 0600) + mq = ipc.MessageQueue(None, ipc.MessageQueue.CREAT | ipc.MessageQueue.EXCL) # start listener - lt = threading.Thread(target=get_listener().set_recv, args = (mem, sig, rw)) + lt = threading.Thread(target=get_listener().set_recv, args = (mq,)) lt.setDaemon(False) lt.start() @@ -111,7 +106,7 @@ def start(): su = detect_su_command() if su: debug("Using '%s' as su command.", su.bin) - cmd = su.cmd("%s --no-fork --shm %ld %ld %ld" % (sys.argv[0], mem.key, sig.key, rw.key)) + cmd = su.cmd("%s --no-fork --mq %ld" % (sys.argv[0], mq.key)) sp = subprocess.Popen(cmd, env = env) @@ -129,3 +124,9 @@ def start(): if lt.isAlive(): debug("Listener is still running. Close it.") get_listener().close() + lt.join() + + try: + mq.remove() + except ipc.MessageQueueRemovedError: + debug("MessageQueue already removed. Ignore.") diff --git a/portato/ipc.pxd b/portato/ipc.pxd new file mode 100644 index 0000000..64ca05d --- /dev/null +++ b/portato/ipc.pxd @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +# +# File: portato/ipc.pxd +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006-2009 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann <necoro@necoro.net> + +from python_string cimport * +from python_mem cimport * + +cdef extern from "errno.h": + int errno + cdef enum: + EACCES, EEXIST, ENOENT, ENOMEM, ENOSPC, + EINVAL, EPERM, EIDRM, EINTR + +cdef extern from *: + int INT_MAX + int RAND_MAX + ctypedef size_t int + int rand() + +cdef extern from "string.h": + char* strerror(int errno) + void* memcpy (void* dst, void* src, size_t len) + +cdef extern from "sys/msg.h" nogil: + cdef enum: + IPC_CREAT, IPC_EXCL, IPC_NOWAIT, + IPC_RMID + + ctypedef int key_t + + struct msqid_ds: + pass + + int msgget(key_t key, int msgflg) + int msgctl(int msqid, int cmd, msqid_ds* buf) + int msgsnd(int msgid, void* msgp, size_t msgsz, int msgflg) + int msgrcv(int msgid, void* msgp, size_t msgsz, long msgtype, int msgflg) + +cdef struct msg_data: + long mtype + char mtext[1] + +cdef enum: + MAX_MESSAGE_SIZE = 2048 diff --git a/portato/ipc.pyx b/portato/ipc.pyx new file mode 100644 index 0000000..e9340cf --- /dev/null +++ b/portato/ipc.pyx @@ -0,0 +1,173 @@ +# -*- coding: utf-8 -*- +# +# File: portato/ipc.pyx +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006-2009 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann <necoro@necoro.net> + +class MessageQueueError(Exception): + """ + Base class for different queue errors. + """ + pass + +class MessageQueueRemovedError (MessageQueueError): + """ + This class is used iff the queue is already removed. + """ + pass + +cdef class MessageQueue (object): + """ + A simple interface to the SysV message queues. + """ + + CREAT = IPC_CREAT + EXCL = IPC_EXCL + + cdef int msgid + cdef readonly key_t key + + def __init__ (self, key = None, int flags = 0): + """ + Create a new MessageQueue instance. Depending on the passed in flags, + different behavior occurs. See man msgget for the details. + + If key is None, a random key is created. + """ + + if (flags & IPC_EXCL) and not (flags & IPC_CREAT): + raise ValueError("EXCL must be combined with CREAT.") + + if key is None and not (flags & IPC_EXCL): + raise ValueError("The key can only be None if EXCL is set.") + + # make sure there is nothing ... obscure + flags &= (IPC_CREAT | IPC_EXCL) + + flags |= 0600 # mode + + if key is None: + check = True + while check: + self.key = self.random_key() + self.msgid = msgget(self.key, flags) + check = (self.msgid == -1 and errno == EEXIST) + else: + self.key = key + self.msgid = msgget(key, flags) + + if self.msgid == -1: + if errno == EACCES: + raise MessageQueueError("Permission denied.") + elif errno == EEXIST: + raise MessageQueueError("Queue already exists.") + elif errno == ENOENT: + raise MessageQueueError("Queue does not exist and CREAT is not set.") + elif errno == ENOMEM or errno == ENOSPC: + raise MemoryError("Insufficient ressources.") + else: + raise OSError(errno, strerror(errno)) + + def remove (self): + """ + Removes the message queue. + """ + cdef msqid_ds info + cdef int ret + + ret = msgctl(self.msgid, IPC_RMID, &info) + + if ret == -1: + if errno == EIDRM or errno == EINVAL: + raise MessageQueueRemovedError("Queue already removed.") + elif errno == EPERM: + raise MessageQueueError("Permission denied.") + else: + raise OSError(errno, strerror(errno)) + + def send (self, message, int type = 1): + """ + Sends a message with a specific type. + + The type must be larger zero. + Also note, that this is always blocking. + """ + cdef msg_data * msg + cdef int ret + cdef long size = len(message) + + if type <= 0: + raise ValueError("type must be > 0") + + if size >= MAX_MESSAGE_SIZE: + raise ValueError("Message must be smaller than %d", MAX_MESSAGE_SIZE) + + msg = <msg_data*>PyMem_Malloc(sizeof(msg_data) + size) + + if msg is NULL: + raise MemoryError("Out of memory") + + memcpy(msg.mtext, <char*>message, size) + msg.mtype = type + + with nogil: + ret = msgsnd(self.msgid, msg, size, 0) + + try: + if ret == -1: + if errno == EIDRM or errno == EINVAL: + raise MessageQueueRemovedError("Queue was removed.") + elif errno == EINTR: + raise MessageQueueError("Signaled while waiting.") + elif errno == EACCES: + raise MessageQueueError("Permission denied.") + else: + raise OSError(errno, strerror(errno)) + finally: + PyMem_Free(msg) + + def receive (self): + """ + Receives a message from the queue and returns the (msg, type) pair. + + Note that this method is always blocking. + """ + cdef msg_data * msg + cdef int ret + cdef object retTuple + + msg = <msg_data*>PyMem_Malloc(sizeof(msg_data) + MAX_MESSAGE_SIZE) + + if msg is NULL: + raise MemoryError("Out of memory") + + msg.mtype = 0 + + with nogil: + ret = msgrcv(self.msgid, msg, <size_t>MAX_MESSAGE_SIZE, 0, 0) + + try: + if ret == -1: + if errno == EIDRM or errno == EINVAL: + raise MessageQueueRemovedError("Queue was removed.") + elif errno == EINTR: + raise MessageQueueError("Signaled while waiting.") + elif errno == EACCES: + raise MessageQueueError("Permission denied.") + else: + raise OSError(errno, strerror(errno)) + + retTuple = (PyString_FromStringAndSize(msg.mtext, ret), msg.mtype) + finally: + PyMem_Free(msg) + + return retTuple + + cdef key_t random_key (self): + return <int>(<double>rand() / (<double>RAND_MAX + 1) * INT_MAX) diff --git a/portato/listener.py b/portato/listener.py index 3d2dd53..c96b637 100644 --- a/portato/listener.py +++ b/portato/listener.py @@ -21,7 +21,8 @@ except ImportError: pynotify = None from .constants import APP -from .helper import debug, warning +from .helper import debug, warning, error +from . import ipc class Listener (object): """This class handles the communication between the "listener" and the GUI. @@ -32,22 +33,15 @@ class Listener (object): @ivar _send: sender socket @type _send: int""" - def set_recv (self, mem, sig, rw): - self._mem = mem - self._sig = sig - self._rw = rw + def set_recv (self, mq): + + self.mq = mq while True: try: - try: - self._sig.P() - self._rw.P() - len = self._mem.read(NumberOfBytes = 4) - string = self._mem.read(NumberOfBytes = int(len), offset = 4) - finally: - self._rw.V() - - data = string.split("\0") + msg, type = self.mq.receive() + + data = msg.split("\0") debug("Listener received: %s", data) if data[0] == "notify": @@ -59,15 +53,12 @@ class Listener (object): except KeyboardInterrupt: debug("Got KeyboardInterrupt. Aborting.") break + except ipc.MessageQueueRemovedError: + debug("MessageQueue removed. Aborting.") + break - self._mem.remove() - self._sig.remove() - self._rw.remove() - - self._mem = None - self._sig = None - self._rw = None - + self.mq = None + def do_cmd (self, cmdlist): """Starts a command as the user. @@ -89,30 +80,21 @@ class Listener (object): n.set_urgency(int(urgency)) n.show() - def set_send (self, mem = None, sig = None, rw = None): - if mem is None or sig is None or rw is None: + def set_send (self, mq = None): + if mq is None: warning(_("Listener has not been started.")) - self._mem = None - self._sig = None - self._rw = None + self.mq = None else: - import shm_wrapper as shm - - self._mem = shm.SharedMemoryHandle(mem) - self._sig = shm.SemaphoreHandle(sig) - self._rw = shm.SemaphoreHandle(rw) + self.mq = ipc.MessageQueue(mq) def __send (self, string): - self._rw.P() - self._sig.Z() try: - self._mem.write("%4d%s" % (len(string), string)) - self._sig.V() - finally: - self._rw.V() + self.mq.send(string) + except ipc.MessageQueueError, e: + error(_("An exception occured while accessing the message queue: %s"), e) def send_notify (self, base = "", descr = "", icon = "", urgency = None): - if self._sig is None: + if self.mq is None: self.do_notify(base, descr, icon, urgency) else: string = "\0".join(["notify", base, descr, icon]) @@ -125,11 +107,11 @@ class Listener (object): self.__send(string) def send_cmd (self, cmdlist): - if self._sig is None: + if self.mq is None: self.do_cmd(cmdlist) else: self.__send("\0".join(["cmd"] +cmdlist)) def close (self): - if self._sig is not None: + if self.mq is not None: self.__send("close") @@ -15,6 +15,8 @@ import os import sys from distutils.core import setup +from distutils.extension import Extension +from Cython.Distutils import build_ext from portato.constants import VERSION, ICON_DIR, PLUGIN_DIR, TEMPLATE_DIR, APP @@ -38,17 +40,12 @@ data_files = [ (PLUGIN_DIR, plugin_list("gpytage", "notify", "etc_proposals", "reload_portage", "package_details"))] # extension stuff -ext_modules = [] -cmdclass={'build_manpage': build_manpage} +ext_modules = [Extension("portato.ipc", ["portato/ipc.pyx"])] if "--disable-eix" in sys.argv: sys.argv.remove("--disable-eix") else: - from Cython.Distutils import build_ext - from distutils.extension import Extension - ext_modules.append(Extension("portato.eix.parser", ["portato/eix/parser.pyx"])) - cmdclass['build_ext'] = build_ext packages.append("portato.eix") if "--enable-eix" in sys.argv: @@ -67,5 +64,5 @@ setup(name=APP, packages = packages, data_files = data_files, ext_modules = ext_modules, - cmdclass = cmdclass + cmdclass={'build_manpage': build_manpage, 'build_ext' : build_ext} ) |