diff options
-rw-r--r-- | portato/__init__.py | 10 | ||||
-rw-r--r-- | portato/listener.py | 16 | ||||
-rw-r--r-- | portato/mq.pyx | 172 |
3 files changed, 185 insertions, 13 deletions
diff --git a/portato/__init__.py b/portato/__init__.py index ae50929..bc1783b 100644 --- a/portato/__init__.py +++ b/portato/__init__.py @@ -89,12 +89,12 @@ def start(): else: # start us again in root modus and launch listener - from . import sysv_ipc as ipc + from . import mq - mq = ipc.MessageQueue(None, ipc.IPC_CREX) + _mq = mq.MessageQueue(None, mq.MessageQueue.CREAT | mq.MessageQueue.EXCL) # start listener - lt = threading.Thread(target=get_listener().set_recv, args = (mq,)) + lt = threading.Thread(target=get_listener().set_recv, args = (_mq,)) lt.setDaemon(False) lt.start() @@ -106,7 +106,7 @@ def start(): su = detect_su_command() if su: debug("Using '%s' as su command.", su.bin) - cmd = su.cmd("%s --no-fork --mq %ld" % (sys.argv[0], mq.key)) + cmd = su.cmd("%s --no-fork --mq %ld" % (sys.argv[0], _mq.key)) sp = subprocess.Popen(cmd, env = env) @@ -126,4 +126,4 @@ def start(): get_listener().close() lt.join() - mq.remove() + _mq.remove() diff --git a/portato/listener.py b/portato/listener.py index 1c12b00..4666269 100644 --- a/portato/listener.py +++ b/portato/listener.py @@ -32,13 +32,13 @@ class Listener (object): @ivar _send: sender socket @type _send: int""" - def set_recv (self, mq): + def set_recv (self, _mq): - self._mq = mq + self._mq = _mq while True: try: - msg, type = self._mq.receive(block = True) + msg, type = self._mq.receive() data = msg.split("\0") debug("Listener received: %s", data) @@ -76,17 +76,17 @@ class Listener (object): n.set_urgency(int(urgency)) n.show() - def set_send (self, mq = None): - if mq is None: + def set_send (self, _mq = None): + if _mq is None: warning(_("Listener has not been started.")) self._mq = None else: - from . import sysv_ipc as ipc + from . import mq - self._mq = ipc.MessageQueue(mq) + self._mq = mq.MessageQueue(_mq) def __send (self, string): - self._mq.send(string, block = True) + self._mq.send(string) def send_notify (self, base = "", descr = "", icon = "", urgency = None): if self._mq is None: diff --git a/portato/mq.pyx b/portato/mq.pyx new file mode 100644 index 0000000..a1e1dd8 --- /dev/null +++ b/portato/mq.pyx @@ -0,0 +1,172 @@ +from stdlib cimport * + +cdef extern from "errno.h": + int errno + cdef enum: + EACCES, EEXIST, ENOENT, ENOMEM, ENOSPC, + EINVAL, EPERM, EIDRM, EINTR + +cdef extern from *: + int INT_MAX + int RAND_MAX + ctypedef size_t int + int rand() + +cdef extern from "string.h": + char* strerror(int errno) + void* memcpy (void* dst, void* src, size_t len) + +cdef extern from "sys/msg.h" nogil: + cdef enum: + IPC_CREAT, IPC_EXCL, IPC_NOWAIT, + IPC_RMID + + ctypedef int key_t + + struct msqid_ds: + pass + + int msgget(key_t key, int msgflg) + int msgctl(int msqid, int cmd, msqid_ds* buf) + int msgsnd(int msgid, void* msgp, size_t msgsz, int msgflg) + int msgrcv(int msgid, void* msgp, size_t msgsz, long msgtype, int msgflg) + +cdef struct msg_data: + long mtype + char mtext[1] + +cdef enum: + MAX_MESSAGE_SIZE = 2048 + +class MessageQueueError(Exception): + pass + +class MessageQueueRemovedError (MessageQueueError): + pass + +cdef class MessageQueue (object): + + CREAT = IPC_CREAT + EXCL = IPC_EXCL + + cdef int msgid + cdef readonly key_t key + + def __init__ (self, key = None, int flags = 0): + + if (flags & IPC_EXCL) and not (flags & IPC_CREAT): + raise MessageQueueError("EXCL must be combined with CREAT.") + + if key is None and not (flags & IPC_EXCL): + raise MessageQueueError("The key can only be None if EXCL is set.") + + # make sure there is nothing ... obscure + flags &= (IPC_CREAT | IPC_EXCL) + + flags |= 0600 # mode + + if key is None: + check = True + while check: + self.key = self.random_key() + self.msgid = msgget(self.key, flags) + check = (self.msgid == -1 and errno == EEXIST) + else: + self.key = key + self.msgid = msgget(key, flags) + + if self.msgid == -1: + if errno == EACCES: + raise MessageQueueError("Permission denied.") + elif errno == EEXIST: + raise MessageQueueError("Queue already exists.") + elif errno == ENOENT: + raise MessageQueueError("Queue does not exist and CREAT is not set.") + elif errno == ENOMEM or errno == ENOSPC: + raise MessageQueueError("Insufficient ressources.") + else: + raise OSError(errno, strerror(errno)) + + def remove (self): + cdef msqid_ds info + cdef int ret + + ret = msgctl(self.msgid, IPC_RMID, &info) + + if ret == -1: + if errno == EIDRM or errno == EINVAL: + raise MessageQueueRemovedError("Queue already removed.") + elif errno == EPERM: + raise MessageQueueError("Permission denied.") + else: + raise OSError(errno, strerror(errno)) + + def send (self, message, int type = 1): + cdef msg_data * msg + cdef int ret + cdef long size = len(message) + + if type <= 0: + raise ValueError("type must be > 0") + + if size >= MAX_MESSAGE_SIZE: + raise ValueError("Message must be smaller than %d", MAX_MESSAGE_SIZE) + + msg = <msg_data*>malloc(sizeof(msg_data) + size) + + if msg is NULL: + raise MemoryError("Out of memory") + + memcpy(msg.mtext, <char*>message, size) + msg.mtype = type + + with nogil: + ret = msgsnd(self.msgid, &msg, size, 0) + + try: + if ret == -1: + if errno == EIDRM or errno == EINVAL: + raise MessageQueueRemovedError("Queue was removed.") + elif errno == EINTR: + raise MessageQueueError("Signaled while waiting.") + elif errno == EACCES: + raise MessageQueueError("Permission denied.") + else: + raise OSError(errno, strerror(errno)) + finally: + free(msg) + + def receive (self): + cdef msg_data * msg + cdef int ret + cdef object retTuple + + msg = <msg_data*>malloc(sizeof(msg_data) + MAX_MESSAGE_SIZE) + + if msg is NULL: + raise MemoryError("Out of memory") + + msg.mtype = 0 + + with nogil: + ret = msgrcv(self.msgid, msg, <size_t>MAX_MESSAGE_SIZE, 0, 0) + + try: + if ret == -1: + if errno == EIDRM or errno == EINVAL: + raise MessageQueueRemovedError("Queue was removed.") + elif errno == EINTR: + raise MessageQueueError("Signaled while waiting.") + elif errno == EACCES: + raise MessageQueueError("Permission denied.") + else: + raise OSError(errno, strerror(errno)) + + retTuple = (msg.mtext, msg.mtype) + finally: + free(msg) + + return retTuple + + cdef key_t random_key (self): + return <int>(<double>rand() / (<double>RAND_MAX + 1) * INT_MAX) |