From 5355e5ad97e9c235c0cb1aecabae3b4fd38eea2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20=27Necoro=27=20Neumann?= Date: Sat, 15 Aug 2009 02:07:51 +0200 Subject: Use message queue instead of SHM --- portato/__init__.py | 20 +++++++++---------- portato/listener.py | 57 ++++++++++++++++------------------------------------- 2 files changed, 26 insertions(+), 51 deletions(-) (limited to 'portato') diff --git a/portato/__init__.py b/portato/__init__.py index 60aeec4..ae50929 100644 --- a/portato/__init__.py +++ b/portato/__init__.py @@ -46,7 +46,7 @@ def get_parser (use_ = False): parser = OptionParser(version = vers, prog = "portato", description = desc, usage = usage) - parser.add_option("--shm", action = "store", nargs = 3, type="long", dest = "shm", + parser.add_option("--mq", action = "store", nargs = 1, type="long", dest = "mq", default = None, help = SUPPRESS_HELP) parser.add_option("-F", "--no-fork", action = "store_true", dest = "nofork", default = False, @@ -80,10 +80,7 @@ def start(): from .gui import run info("%s v. %s", _("Starting Portato"), VERSION) - if options.shm: - get_listener().set_send(*options.shm) - else: - get_listener().set_send() + get_listener().set_send(options.mq) try: run() @@ -92,14 +89,12 @@ def start(): else: # start us again in root modus and launch listener - import shm_wrapper as shm + from . import sysv_ipc as ipc - mem = shm.create_memory(1024, permissions=0600) - sig = shm.create_semaphore(InitialValue = 0, permissions = 0600) - rw = shm.create_semaphore(InitialValue = 1, permissions = 0600) + mq = ipc.MessageQueue(None, ipc.IPC_CREX) # start listener - lt = threading.Thread(target=get_listener().set_recv, args = (mem, sig, rw)) + lt = threading.Thread(target=get_listener().set_recv, args = (mq,)) lt.setDaemon(False) lt.start() @@ -111,7 +106,7 @@ def start(): su = detect_su_command() if su: debug("Using '%s' as su command.", su.bin) - cmd = su.cmd("%s --no-fork --shm %ld %ld %ld" % (sys.argv[0], mem.key, sig.key, rw.key)) + cmd = su.cmd("%s --no-fork --mq %ld" % (sys.argv[0], mq.key)) sp = subprocess.Popen(cmd, env = env) @@ -129,3 +124,6 @@ def start(): if lt.isAlive(): debug("Listener is still running. Close it.") get_listener().close() + lt.join() + + mq.remove() diff --git a/portato/listener.py b/portato/listener.py index 3d2dd53..1c12b00 100644 --- a/portato/listener.py +++ b/portato/listener.py @@ -32,22 +32,15 @@ class Listener (object): @ivar _send: sender socket @type _send: int""" - def set_recv (self, mem, sig, rw): - self._mem = mem - self._sig = sig - self._rw = rw + def set_recv (self, mq): + + self._mq = mq while True: try: - try: - self._sig.P() - self._rw.P() - len = self._mem.read(NumberOfBytes = 4) - string = self._mem.read(NumberOfBytes = int(len), offset = 4) - finally: - self._rw.V() - - data = string.split("\0") + msg, type = self._mq.receive(block = True) + + data = msg.split("\0") debug("Listener received: %s", data) if data[0] == "notify": @@ -60,14 +53,8 @@ class Listener (object): debug("Got KeyboardInterrupt. Aborting.") break - self._mem.remove() - self._sig.remove() - self._rw.remove() - - self._mem = None - self._sig = None - self._rw = None - + self._mq = None + def do_cmd (self, cmdlist): """Starts a command as the user. @@ -89,30 +76,20 @@ class Listener (object): n.set_urgency(int(urgency)) n.show() - def set_send (self, mem = None, sig = None, rw = None): - if mem is None or sig is None or rw is None: + def set_send (self, mq = None): + if mq is None: warning(_("Listener has not been started.")) - self._mem = None - self._sig = None - self._rw = None + self._mq = None else: - import shm_wrapper as shm + from . import sysv_ipc as ipc - self._mem = shm.SharedMemoryHandle(mem) - self._sig = shm.SemaphoreHandle(sig) - self._rw = shm.SemaphoreHandle(rw) + self._mq = ipc.MessageQueue(mq) def __send (self, string): - self._rw.P() - self._sig.Z() - try: - self._mem.write("%4d%s" % (len(string), string)) - self._sig.V() - finally: - self._rw.V() + self._mq.send(string, block = True) def send_notify (self, base = "", descr = "", icon = "", urgency = None): - if self._sig is None: + if self._mq is None: self.do_notify(base, descr, icon, urgency) else: string = "\0".join(["notify", base, descr, icon]) @@ -125,11 +102,11 @@ class Listener (object): self.__send(string) def send_cmd (self, cmdlist): - if self._sig is None: + if self._mq is None: self.do_cmd(cmdlist) else: self.__send("\0".join(["cmd"] +cmdlist)) def close (self): - if self._sig is not None: + if self._mq is not None: self.__send("close") -- cgit v1.2.3 From 4feddaea01a755bcabc331a7a20836bc319f8b3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20=27Necoro=27=20Neumann?= Date: Sat, 15 Aug 2009 04:28:53 +0200 Subject: First try of own mq-module --- portato/__init__.py | 10 +-- portato/listener.py | 16 ++--- portato/mq.pyx | 172 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 185 insertions(+), 13 deletions(-) create mode 100644 portato/mq.pyx (limited to 'portato') diff --git a/portato/__init__.py b/portato/__init__.py index ae50929..bc1783b 100644 --- a/portato/__init__.py +++ b/portato/__init__.py @@ -89,12 +89,12 @@ def start(): else: # start us again in root modus and launch listener - from . import sysv_ipc as ipc + from . import mq - mq = ipc.MessageQueue(None, ipc.IPC_CREX) + _mq = mq.MessageQueue(None, mq.MessageQueue.CREAT | mq.MessageQueue.EXCL) # start listener - lt = threading.Thread(target=get_listener().set_recv, args = (mq,)) + lt = threading.Thread(target=get_listener().set_recv, args = (_mq,)) lt.setDaemon(False) lt.start() @@ -106,7 +106,7 @@ def start(): su = detect_su_command() if su: debug("Using '%s' as su command.", su.bin) - cmd = su.cmd("%s --no-fork --mq %ld" % (sys.argv[0], mq.key)) + cmd = su.cmd("%s --no-fork --mq %ld" % (sys.argv[0], _mq.key)) sp = subprocess.Popen(cmd, env = env) @@ -126,4 +126,4 @@ def start(): get_listener().close() lt.join() - mq.remove() + _mq.remove() diff --git a/portato/listener.py b/portato/listener.py index 1c12b00..4666269 100644 --- a/portato/listener.py +++ b/portato/listener.py @@ -32,13 +32,13 @@ class Listener (object): @ivar _send: sender socket @type _send: int""" - def set_recv (self, mq): + def set_recv (self, _mq): - self._mq = mq + self._mq = _mq while True: try: - msg, type = self._mq.receive(block = True) + msg, type = self._mq.receive() data = msg.split("\0") debug("Listener received: %s", data) @@ -76,17 +76,17 @@ class Listener (object): n.set_urgency(int(urgency)) n.show() - def set_send (self, mq = None): - if mq is None: + def set_send (self, _mq = None): + if _mq is None: warning(_("Listener has not been started.")) self._mq = None else: - from . import sysv_ipc as ipc + from . import mq - self._mq = ipc.MessageQueue(mq) + self._mq = mq.MessageQueue(_mq) def __send (self, string): - self._mq.send(string, block = True) + self._mq.send(string) def send_notify (self, base = "", descr = "", icon = "", urgency = None): if self._mq is None: diff --git a/portato/mq.pyx b/portato/mq.pyx new file mode 100644 index 0000000..a1e1dd8 --- /dev/null +++ b/portato/mq.pyx @@ -0,0 +1,172 @@ +from stdlib cimport * + +cdef extern from "errno.h": + int errno + cdef enum: + EACCES, EEXIST, ENOENT, ENOMEM, ENOSPC, + EINVAL, EPERM, EIDRM, EINTR + +cdef extern from *: + int INT_MAX + int RAND_MAX + ctypedef size_t int + int rand() + +cdef extern from "string.h": + char* strerror(int errno) + void* memcpy (void* dst, void* src, size_t len) + +cdef extern from "sys/msg.h" nogil: + cdef enum: + IPC_CREAT, IPC_EXCL, IPC_NOWAIT, + IPC_RMID + + ctypedef int key_t + + struct msqid_ds: + pass + + int msgget(key_t key, int msgflg) + int msgctl(int msqid, int cmd, msqid_ds* buf) + int msgsnd(int msgid, void* msgp, size_t msgsz, int msgflg) + int msgrcv(int msgid, void* msgp, size_t msgsz, long msgtype, int msgflg) + +cdef struct msg_data: + long mtype + char mtext[1] + +cdef enum: + MAX_MESSAGE_SIZE = 2048 + +class MessageQueueError(Exception): + pass + +class MessageQueueRemovedError (MessageQueueError): + pass + +cdef class MessageQueue (object): + + CREAT = IPC_CREAT + EXCL = IPC_EXCL + + cdef int msgid + cdef readonly key_t key + + def __init__ (self, key = None, int flags = 0): + + if (flags & IPC_EXCL) and not (flags & IPC_CREAT): + raise MessageQueueError("EXCL must be combined with CREAT.") + + if key is None and not (flags & IPC_EXCL): + raise MessageQueueError("The key can only be None if EXCL is set.") + + # make sure there is nothing ... obscure + flags &= (IPC_CREAT | IPC_EXCL) + + flags |= 0600 # mode + + if key is None: + check = True + while check: + self.key = self.random_key() + self.msgid = msgget(self.key, flags) + check = (self.msgid == -1 and errno == EEXIST) + else: + self.key = key + self.msgid = msgget(key, flags) + + if self.msgid == -1: + if errno == EACCES: + raise MessageQueueError("Permission denied.") + elif errno == EEXIST: + raise MessageQueueError("Queue already exists.") + elif errno == ENOENT: + raise MessageQueueError("Queue does not exist and CREAT is not set.") + elif errno == ENOMEM or errno == ENOSPC: + raise MessageQueueError("Insufficient ressources.") + else: + raise OSError(errno, strerror(errno)) + + def remove (self): + cdef msqid_ds info + cdef int ret + + ret = msgctl(self.msgid, IPC_RMID, &info) + + if ret == -1: + if errno == EIDRM or errno == EINVAL: + raise MessageQueueRemovedError("Queue already removed.") + elif errno == EPERM: + raise MessageQueueError("Permission denied.") + else: + raise OSError(errno, strerror(errno)) + + def send (self, message, int type = 1): + cdef msg_data * msg + cdef int ret + cdef long size = len(message) + + if type <= 0: + raise ValueError("type must be > 0") + + if size >= MAX_MESSAGE_SIZE: + raise ValueError("Message must be smaller than %d", MAX_MESSAGE_SIZE) + + msg = malloc(sizeof(msg_data) + size) + + if msg is NULL: + raise MemoryError("Out of memory") + + memcpy(msg.mtext, message, size) + msg.mtype = type + + with nogil: + ret = msgsnd(self.msgid, &msg, size, 0) + + try: + if ret == -1: + if errno == EIDRM or errno == EINVAL: + raise MessageQueueRemovedError("Queue was removed.") + elif errno == EINTR: + raise MessageQueueError("Signaled while waiting.") + elif errno == EACCES: + raise MessageQueueError("Permission denied.") + else: + raise OSError(errno, strerror(errno)) + finally: + free(msg) + + def receive (self): + cdef msg_data * msg + cdef int ret + cdef object retTuple + + msg = malloc(sizeof(msg_data) + MAX_MESSAGE_SIZE) + + if msg is NULL: + raise MemoryError("Out of memory") + + msg.mtype = 0 + + with nogil: + ret = msgrcv(self.msgid, msg, MAX_MESSAGE_SIZE, 0, 0) + + try: + if ret == -1: + if errno == EIDRM or errno == EINVAL: + raise MessageQueueRemovedError("Queue was removed.") + elif errno == EINTR: + raise MessageQueueError("Signaled while waiting.") + elif errno == EACCES: + raise MessageQueueError("Permission denied.") + else: + raise OSError(errno, strerror(errno)) + + retTuple = (msg.mtext, msg.mtype) + finally: + free(msg) + + return retTuple + + cdef key_t random_key (self): + return (rand() / (RAND_MAX + 1) * INT_MAX) -- cgit v1.2.3 From 327be30ad41bd0ea9c6757b7d527ab9d5baee2ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20=27Necoro=27=20Neumann?= Date: Sat, 15 Aug 2009 04:37:11 +0200 Subject: Moved all the include stuff to the pxd --- portato/mq.pxd | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ portato/mq.pyx | 50 +++++++++++--------------------------------------- 2 files changed, 62 insertions(+), 39 deletions(-) create mode 100644 portato/mq.pxd (limited to 'portato') diff --git a/portato/mq.pxd b/portato/mq.pxd new file mode 100644 index 0000000..ab5cfbe --- /dev/null +++ b/portato/mq.pxd @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# +# File: portato/mq.pxd +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006-2009 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann + +from stdlib cimport * + +cdef extern from "errno.h": + int errno + cdef enum: + EACCES, EEXIST, ENOENT, ENOMEM, ENOSPC, + EINVAL, EPERM, EIDRM, EINTR + +cdef extern from *: + int INT_MAX + int RAND_MAX + ctypedef size_t int + int rand() + +cdef extern from "string.h": + char* strerror(int errno) + void* memcpy (void* dst, void* src, size_t len) + +cdef extern from "sys/msg.h" nogil: + cdef enum: + IPC_CREAT, IPC_EXCL, IPC_NOWAIT, + IPC_RMID + + ctypedef int key_t + + struct msqid_ds: + pass + + int msgget(key_t key, int msgflg) + int msgctl(int msqid, int cmd, msqid_ds* buf) + int msgsnd(int msgid, void* msgp, size_t msgsz, int msgflg) + int msgrcv(int msgid, void* msgp, size_t msgsz, long msgtype, int msgflg) + +cdef struct msg_data: + long mtype + char mtext[1] + +cdef enum: + MAX_MESSAGE_SIZE = 2048 diff --git a/portato/mq.pyx b/portato/mq.pyx index a1e1dd8..a5d0745 100644 --- a/portato/mq.pyx +++ b/portato/mq.pyx @@ -1,42 +1,14 @@ -from stdlib cimport * - -cdef extern from "errno.h": - int errno - cdef enum: - EACCES, EEXIST, ENOENT, ENOMEM, ENOSPC, - EINVAL, EPERM, EIDRM, EINTR - -cdef extern from *: - int INT_MAX - int RAND_MAX - ctypedef size_t int - int rand() - -cdef extern from "string.h": - char* strerror(int errno) - void* memcpy (void* dst, void* src, size_t len) - -cdef extern from "sys/msg.h" nogil: - cdef enum: - IPC_CREAT, IPC_EXCL, IPC_NOWAIT, - IPC_RMID - - ctypedef int key_t - - struct msqid_ds: - pass - - int msgget(key_t key, int msgflg) - int msgctl(int msqid, int cmd, msqid_ds* buf) - int msgsnd(int msgid, void* msgp, size_t msgsz, int msgflg) - int msgrcv(int msgid, void* msgp, size_t msgsz, long msgtype, int msgflg) - -cdef struct msg_data: - long mtype - char mtext[1] - -cdef enum: - MAX_MESSAGE_SIZE = 2048 +# -*- coding: utf-8 -*- +# +# File: portato/mq.pyx +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006-2009 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann class MessageQueueError(Exception): pass -- cgit v1.2.3 From 44d410d007e88d794e35e322082bec3ec69d5fa7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20=27Necoro=27=20Neumann?= Date: Sat, 15 Aug 2009 11:46:13 +0200 Subject: Finish mq module --- portato/mq.pxd | 3 ++- portato/mq.pyx | 12 ++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) (limited to 'portato') diff --git a/portato/mq.pxd b/portato/mq.pxd index ab5cfbe..be7631b 100644 --- a/portato/mq.pxd +++ b/portato/mq.pxd @@ -10,7 +10,8 @@ # # Written by René 'Necoro' Neumann -from stdlib cimport * +from python_string cimport * +from python_mem cimport * cdef extern from "errno.h": int errno diff --git a/portato/mq.pyx b/portato/mq.pyx index a5d0745..e9ce464 100644 --- a/portato/mq.pyx +++ b/portato/mq.pyx @@ -84,7 +84,7 @@ cdef class MessageQueue (object): if size >= MAX_MESSAGE_SIZE: raise ValueError("Message must be smaller than %d", MAX_MESSAGE_SIZE) - msg = malloc(sizeof(msg_data) + size) + msg = PyMem_Malloc(sizeof(msg_data) + size) if msg is NULL: raise MemoryError("Out of memory") @@ -93,7 +93,7 @@ cdef class MessageQueue (object): msg.mtype = type with nogil: - ret = msgsnd(self.msgid, &msg, size, 0) + ret = msgsnd(self.msgid, msg, size, 0) try: if ret == -1: @@ -106,14 +106,14 @@ cdef class MessageQueue (object): else: raise OSError(errno, strerror(errno)) finally: - free(msg) + PyMem_Free(msg) def receive (self): cdef msg_data * msg cdef int ret cdef object retTuple - msg = malloc(sizeof(msg_data) + MAX_MESSAGE_SIZE) + msg = PyMem_Malloc(sizeof(msg_data) + MAX_MESSAGE_SIZE) if msg is NULL: raise MemoryError("Out of memory") @@ -134,9 +134,9 @@ cdef class MessageQueue (object): else: raise OSError(errno, strerror(errno)) - retTuple = (msg.mtext, msg.mtype) + retTuple = (PyString_FromStringAndSize(msg.mtext, ret), msg.mtype) finally: - free(msg) + PyMem_Free(msg) return retTuple -- cgit v1.2.3 From 088452702812614cbbbaa64b116f29971920fac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20=27Necoro=27=20Neumann?= Date: Sat, 15 Aug 2009 11:51:15 +0200 Subject: Renamed 'mq' to 'ipc' --- portato/__init__.py | 10 ++-- portato/ipc.pxd | 52 +++++++++++++++++++ portato/ipc.pyx | 144 ++++++++++++++++++++++++++++++++++++++++++++++++++++ portato/listener.py | 26 +++++----- portato/mq.pxd | 52 ------------------- portato/mq.pyx | 144 ---------------------------------------------------- 6 files changed, 214 insertions(+), 214 deletions(-) create mode 100644 portato/ipc.pxd create mode 100644 portato/ipc.pyx delete mode 100644 portato/mq.pxd delete mode 100644 portato/mq.pyx (limited to 'portato') diff --git a/portato/__init__.py b/portato/__init__.py index bc1783b..433c89a 100644 --- a/portato/__init__.py +++ b/portato/__init__.py @@ -89,12 +89,12 @@ def start(): else: # start us again in root modus and launch listener - from . import mq + from . import ipc - _mq = mq.MessageQueue(None, mq.MessageQueue.CREAT | mq.MessageQueue.EXCL) + mq = ipc.MessageQueue(None, ipc.MessageQueue.CREAT | ipc.MessageQueue.EXCL) # start listener - lt = threading.Thread(target=get_listener().set_recv, args = (_mq,)) + lt = threading.Thread(target=get_listener().set_recv, args = (mq,)) lt.setDaemon(False) lt.start() @@ -106,7 +106,7 @@ def start(): su = detect_su_command() if su: debug("Using '%s' as su command.", su.bin) - cmd = su.cmd("%s --no-fork --mq %ld" % (sys.argv[0], _mq.key)) + cmd = su.cmd("%s --no-fork --mq %ld" % (sys.argv[0], mq.key)) sp = subprocess.Popen(cmd, env = env) @@ -126,4 +126,4 @@ def start(): get_listener().close() lt.join() - _mq.remove() + mq.remove() diff --git a/portato/ipc.pxd b/portato/ipc.pxd new file mode 100644 index 0000000..64ca05d --- /dev/null +++ b/portato/ipc.pxd @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +# +# File: portato/ipc.pxd +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006-2009 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann + +from python_string cimport * +from python_mem cimport * + +cdef extern from "errno.h": + int errno + cdef enum: + EACCES, EEXIST, ENOENT, ENOMEM, ENOSPC, + EINVAL, EPERM, EIDRM, EINTR + +cdef extern from *: + int INT_MAX + int RAND_MAX + ctypedef size_t int + int rand() + +cdef extern from "string.h": + char* strerror(int errno) + void* memcpy (void* dst, void* src, size_t len) + +cdef extern from "sys/msg.h" nogil: + cdef enum: + IPC_CREAT, IPC_EXCL, IPC_NOWAIT, + IPC_RMID + + ctypedef int key_t + + struct msqid_ds: + pass + + int msgget(key_t key, int msgflg) + int msgctl(int msqid, int cmd, msqid_ds* buf) + int msgsnd(int msgid, void* msgp, size_t msgsz, int msgflg) + int msgrcv(int msgid, void* msgp, size_t msgsz, long msgtype, int msgflg) + +cdef struct msg_data: + long mtype + char mtext[1] + +cdef enum: + MAX_MESSAGE_SIZE = 2048 diff --git a/portato/ipc.pyx b/portato/ipc.pyx new file mode 100644 index 0000000..4a06102 --- /dev/null +++ b/portato/ipc.pyx @@ -0,0 +1,144 @@ +# -*- coding: utf-8 -*- +# +# File: portato/ipc.pyx +# This file is part of the Portato-Project, a graphical portage-frontend. +# +# Copyright (C) 2006-2009 René 'Necoro' Neumann +# This is free software. You may redistribute copies of it under the terms of +# the GNU General Public License version 2. +# There is NO WARRANTY, to the extent permitted by law. +# +# Written by René 'Necoro' Neumann + +class MessageQueueError(Exception): + pass + +class MessageQueueRemovedError (MessageQueueError): + pass + +cdef class MessageQueue (object): + + CREAT = IPC_CREAT + EXCL = IPC_EXCL + + cdef int msgid + cdef readonly key_t key + + def __init__ (self, key = None, int flags = 0): + + if (flags & IPC_EXCL) and not (flags & IPC_CREAT): + raise MessageQueueError("EXCL must be combined with CREAT.") + + if key is None and not (flags & IPC_EXCL): + raise MessageQueueError("The key can only be None if EXCL is set.") + + # make sure there is nothing ... obscure + flags &= (IPC_CREAT | IPC_EXCL) + + flags |= 0600 # mode + + if key is None: + check = True + while check: + self.key = self.random_key() + self.msgid = msgget(self.key, flags) + check = (self.msgid == -1 and errno == EEXIST) + else: + self.key = key + self.msgid = msgget(key, flags) + + if self.msgid == -1: + if errno == EACCES: + raise MessageQueueError("Permission denied.") + elif errno == EEXIST: + raise MessageQueueError("Queue already exists.") + elif errno == ENOENT: + raise MessageQueueError("Queue does not exist and CREAT is not set.") + elif errno == ENOMEM or errno == ENOSPC: + raise MessageQueueError("Insufficient ressources.") + else: + raise OSError(errno, strerror(errno)) + + def remove (self): + cdef msqid_ds info + cdef int ret + + ret = msgctl(self.msgid, IPC_RMID, &info) + + if ret == -1: + if errno == EIDRM or errno == EINVAL: + raise MessageQueueRemovedError("Queue already removed.") + elif errno == EPERM: + raise MessageQueueError("Permission denied.") + else: + raise OSError(errno, strerror(errno)) + + def send (self, message, int type = 1): + cdef msg_data * msg + cdef int ret + cdef long size = len(message) + + if type <= 0: + raise ValueError("type must be > 0") + + if size >= MAX_MESSAGE_SIZE: + raise ValueError("Message must be smaller than %d", MAX_MESSAGE_SIZE) + + msg = PyMem_Malloc(sizeof(msg_data) + size) + + if msg is NULL: + raise MemoryError("Out of memory") + + memcpy(msg.mtext, message, size) + msg.mtype = type + + with nogil: + ret = msgsnd(self.msgid, msg, size, 0) + + try: + if ret == -1: + if errno == EIDRM or errno == EINVAL: + raise MessageQueueRemovedError("Queue was removed.") + elif errno == EINTR: + raise MessageQueueError("Signaled while waiting.") + elif errno == EACCES: + raise MessageQueueError("Permission denied.") + else: + raise OSError(errno, strerror(errno)) + finally: + PyMem_Free(msg) + + def receive (self): + cdef msg_data * msg + cdef int ret + cdef object retTuple + + msg = PyMem_Malloc(sizeof(msg_data) + MAX_MESSAGE_SIZE) + + if msg is NULL: + raise MemoryError("Out of memory") + + msg.mtype = 0 + + with nogil: + ret = msgrcv(self.msgid, msg, MAX_MESSAGE_SIZE, 0, 0) + + try: + if ret == -1: + if errno == EIDRM or errno == EINVAL: + raise MessageQueueRemovedError("Queue was removed.") + elif errno == EINTR: + raise MessageQueueError("Signaled while waiting.") + elif errno == EACCES: + raise MessageQueueError("Permission denied.") + else: + raise OSError(errno, strerror(errno)) + + retTuple = (PyString_FromStringAndSize(msg.mtext, ret), msg.mtype) + finally: + PyMem_Free(msg) + + return retTuple + + cdef key_t random_key (self): + return (rand() / (RAND_MAX + 1) * INT_MAX) diff --git a/portato/listener.py b/portato/listener.py index 4666269..aa9e31d 100644 --- a/portato/listener.py +++ b/portato/listener.py @@ -32,13 +32,13 @@ class Listener (object): @ivar _send: sender socket @type _send: int""" - def set_recv (self, _mq): + def set_recv (self, mq): - self._mq = _mq + self.mq = mq while True: try: - msg, type = self._mq.receive() + msg, type = self.mq.receive() data = msg.split("\0") debug("Listener received: %s", data) @@ -53,7 +53,7 @@ class Listener (object): debug("Got KeyboardInterrupt. Aborting.") break - self._mq = None + self.mq = None def do_cmd (self, cmdlist): """Starts a command as the user. @@ -76,20 +76,20 @@ class Listener (object): n.set_urgency(int(urgency)) n.show() - def set_send (self, _mq = None): - if _mq is None: + def set_send (self, mq = None): + if mq is None: warning(_("Listener has not been started.")) - self._mq = None + self.mq = None else: - from . import mq + from . import ipc - self._mq = mq.MessageQueue(_mq) + self.mq = ipc.MessageQueue(mq) def __send (self, string): - self._mq.send(string) + self.mq.send(string) def send_notify (self, base = "", descr = "", icon = "", urgency = None): - if self._mq is None: + if self.mq is None: self.do_notify(base, descr, icon, urgency) else: string = "\0".join(["notify", base, descr, icon]) @@ -102,11 +102,11 @@ class Listener (object): self.__send(string) def send_cmd (self, cmdlist): - if self._mq is None: + if self.mq is None: self.do_cmd(cmdlist) else: self.__send("\0".join(["cmd"] +cmdlist)) def close (self): - if self._mq is not None: + if self.mq is not None: self.__send("close") diff --git a/portato/mq.pxd b/portato/mq.pxd deleted file mode 100644 index be7631b..0000000 --- a/portato/mq.pxd +++ /dev/null @@ -1,52 +0,0 @@ -# -*- coding: utf-8 -*- -# -# File: portato/mq.pxd -# This file is part of the Portato-Project, a graphical portage-frontend. -# -# Copyright (C) 2006-2009 René 'Necoro' Neumann -# This is free software. You may redistribute copies of it under the terms of -# the GNU General Public License version 2. -# There is NO WARRANTY, to the extent permitted by law. -# -# Written by René 'Necoro' Neumann - -from python_string cimport * -from python_mem cimport * - -cdef extern from "errno.h": - int errno - cdef enum: - EACCES, EEXIST, ENOENT, ENOMEM, ENOSPC, - EINVAL, EPERM, EIDRM, EINTR - -cdef extern from *: - int INT_MAX - int RAND_MAX - ctypedef size_t int - int rand() - -cdef extern from "string.h": - char* strerror(int errno) - void* memcpy (void* dst, void* src, size_t len) - -cdef extern from "sys/msg.h" nogil: - cdef enum: - IPC_CREAT, IPC_EXCL, IPC_NOWAIT, - IPC_RMID - - ctypedef int key_t - - struct msqid_ds: - pass - - int msgget(key_t key, int msgflg) - int msgctl(int msqid, int cmd, msqid_ds* buf) - int msgsnd(int msgid, void* msgp, size_t msgsz, int msgflg) - int msgrcv(int msgid, void* msgp, size_t msgsz, long msgtype, int msgflg) - -cdef struct msg_data: - long mtype - char mtext[1] - -cdef enum: - MAX_MESSAGE_SIZE = 2048 diff --git a/portato/mq.pyx b/portato/mq.pyx deleted file mode 100644 index e9ce464..0000000 --- a/portato/mq.pyx +++ /dev/null @@ -1,144 +0,0 @@ -# -*- coding: utf-8 -*- -# -# File: portato/mq.pyx -# This file is part of the Portato-Project, a graphical portage-frontend. -# -# Copyright (C) 2006-2009 René 'Necoro' Neumann -# This is free software. You may redistribute copies of it under the terms of -# the GNU General Public License version 2. -# There is NO WARRANTY, to the extent permitted by law. -# -# Written by René 'Necoro' Neumann - -class MessageQueueError(Exception): - pass - -class MessageQueueRemovedError (MessageQueueError): - pass - -cdef class MessageQueue (object): - - CREAT = IPC_CREAT - EXCL = IPC_EXCL - - cdef int msgid - cdef readonly key_t key - - def __init__ (self, key = None, int flags = 0): - - if (flags & IPC_EXCL) and not (flags & IPC_CREAT): - raise MessageQueueError("EXCL must be combined with CREAT.") - - if key is None and not (flags & IPC_EXCL): - raise MessageQueueError("The key can only be None if EXCL is set.") - - # make sure there is nothing ... obscure - flags &= (IPC_CREAT | IPC_EXCL) - - flags |= 0600 # mode - - if key is None: - check = True - while check: - self.key = self.random_key() - self.msgid = msgget(self.key, flags) - check = (self.msgid == -1 and errno == EEXIST) - else: - self.key = key - self.msgid = msgget(key, flags) - - if self.msgid == -1: - if errno == EACCES: - raise MessageQueueError("Permission denied.") - elif errno == EEXIST: - raise MessageQueueError("Queue already exists.") - elif errno == ENOENT: - raise MessageQueueError("Queue does not exist and CREAT is not set.") - elif errno == ENOMEM or errno == ENOSPC: - raise MessageQueueError("Insufficient ressources.") - else: - raise OSError(errno, strerror(errno)) - - def remove (self): - cdef msqid_ds info - cdef int ret - - ret = msgctl(self.msgid, IPC_RMID, &info) - - if ret == -1: - if errno == EIDRM or errno == EINVAL: - raise MessageQueueRemovedError("Queue already removed.") - elif errno == EPERM: - raise MessageQueueError("Permission denied.") - else: - raise OSError(errno, strerror(errno)) - - def send (self, message, int type = 1): - cdef msg_data * msg - cdef int ret - cdef long size = len(message) - - if type <= 0: - raise ValueError("type must be > 0") - - if size >= MAX_MESSAGE_SIZE: - raise ValueError("Message must be smaller than %d", MAX_MESSAGE_SIZE) - - msg = PyMem_Malloc(sizeof(msg_data) + size) - - if msg is NULL: - raise MemoryError("Out of memory") - - memcpy(msg.mtext, message, size) - msg.mtype = type - - with nogil: - ret = msgsnd(self.msgid, msg, size, 0) - - try: - if ret == -1: - if errno == EIDRM or errno == EINVAL: - raise MessageQueueRemovedError("Queue was removed.") - elif errno == EINTR: - raise MessageQueueError("Signaled while waiting.") - elif errno == EACCES: - raise MessageQueueError("Permission denied.") - else: - raise OSError(errno, strerror(errno)) - finally: - PyMem_Free(msg) - - def receive (self): - cdef msg_data * msg - cdef int ret - cdef object retTuple - - msg = PyMem_Malloc(sizeof(msg_data) + MAX_MESSAGE_SIZE) - - if msg is NULL: - raise MemoryError("Out of memory") - - msg.mtype = 0 - - with nogil: - ret = msgrcv(self.msgid, msg, MAX_MESSAGE_SIZE, 0, 0) - - try: - if ret == -1: - if errno == EIDRM or errno == EINVAL: - raise MessageQueueRemovedError("Queue was removed.") - elif errno == EINTR: - raise MessageQueueError("Signaled while waiting.") - elif errno == EACCES: - raise MessageQueueError("Permission denied.") - else: - raise OSError(errno, strerror(errno)) - - retTuple = (PyString_FromStringAndSize(msg.mtext, ret), msg.mtype) - finally: - PyMem_Free(msg) - - return retTuple - - cdef key_t random_key (self): - return (rand() / (RAND_MAX + 1) * INT_MAX) -- cgit v1.2.3 From cc13f5299417b0a08db0399cee9c548013c87513 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20=27Necoro=27=20Neumann?= Date: Sat, 15 Aug 2009 12:03:02 +0200 Subject: Correctly handle exceptions --- portato/__init__.py | 5 ++++- portato/listener.py | 13 +++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) (limited to 'portato') diff --git a/portato/__init__.py b/portato/__init__.py index 433c89a..74145e7 100644 --- a/portato/__init__.py +++ b/portato/__init__.py @@ -126,4 +126,7 @@ def start(): get_listener().close() lt.join() - mq.remove() + try: + mq.remove() + except ipc.MessageQueueRemovedError: + debug("MessageQueue already removed. Ignore.") diff --git a/portato/listener.py b/portato/listener.py index aa9e31d..c96b637 100644 --- a/portato/listener.py +++ b/portato/listener.py @@ -21,7 +21,8 @@ except ImportError: pynotify = None from .constants import APP -from .helper import debug, warning +from .helper import debug, warning, error +from . import ipc class Listener (object): """This class handles the communication between the "listener" and the GUI. @@ -52,6 +53,9 @@ class Listener (object): except KeyboardInterrupt: debug("Got KeyboardInterrupt. Aborting.") break + except ipc.MessageQueueRemovedError: + debug("MessageQueue removed. Aborting.") + break self.mq = None @@ -81,12 +85,13 @@ class Listener (object): warning(_("Listener has not been started.")) self.mq = None else: - from . import ipc - self.mq = ipc.MessageQueue(mq) def __send (self, string): - self.mq.send(string) + try: + self.mq.send(string) + except ipc.MessageQueueError, e: + error(_("An exception occured while accessing the message queue: %s"), e) def send_notify (self, base = "", descr = "", icon = "", urgency = None): if self.mq is None: -- cgit v1.2.3 From 2fdd70e3a102f666ab9f036d76e7e892421f6840 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20=27Necoro=27=20Neumann?= Date: Sat, 15 Aug 2009 12:09:43 +0200 Subject: documentation --- portato/ipc.pyx | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) (limited to 'portato') diff --git a/portato/ipc.pyx b/portato/ipc.pyx index 4a06102..e9340cf 100644 --- a/portato/ipc.pyx +++ b/portato/ipc.pyx @@ -11,12 +11,21 @@ # Written by René 'Necoro' Neumann class MessageQueueError(Exception): + """ + Base class for different queue errors. + """ pass class MessageQueueRemovedError (MessageQueueError): + """ + This class is used iff the queue is already removed. + """ pass cdef class MessageQueue (object): + """ + A simple interface to the SysV message queues. + """ CREAT = IPC_CREAT EXCL = IPC_EXCL @@ -25,12 +34,18 @@ cdef class MessageQueue (object): cdef readonly key_t key def __init__ (self, key = None, int flags = 0): + """ + Create a new MessageQueue instance. Depending on the passed in flags, + different behavior occurs. See man msgget for the details. + + If key is None, a random key is created. + """ if (flags & IPC_EXCL) and not (flags & IPC_CREAT): - raise MessageQueueError("EXCL must be combined with CREAT.") + raise ValueError("EXCL must be combined with CREAT.") if key is None and not (flags & IPC_EXCL): - raise MessageQueueError("The key can only be None if EXCL is set.") + raise ValueError("The key can only be None if EXCL is set.") # make sure there is nothing ... obscure flags &= (IPC_CREAT | IPC_EXCL) @@ -55,11 +70,14 @@ cdef class MessageQueue (object): elif errno == ENOENT: raise MessageQueueError("Queue does not exist and CREAT is not set.") elif errno == ENOMEM or errno == ENOSPC: - raise MessageQueueError("Insufficient ressources.") + raise MemoryError("Insufficient ressources.") else: raise OSError(errno, strerror(errno)) def remove (self): + """ + Removes the message queue. + """ cdef msqid_ds info cdef int ret @@ -74,6 +92,12 @@ cdef class MessageQueue (object): raise OSError(errno, strerror(errno)) def send (self, message, int type = 1): + """ + Sends a message with a specific type. + + The type must be larger zero. + Also note, that this is always blocking. + """ cdef msg_data * msg cdef int ret cdef long size = len(message) @@ -109,6 +133,11 @@ cdef class MessageQueue (object): PyMem_Free(msg) def receive (self): + """ + Receives a message from the queue and returns the (msg, type) pair. + + Note that this method is always blocking. + """ cdef msg_data * msg cdef int ret cdef object retTuple -- cgit v1.2.3