summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--portato/__init__.py10
-rw-r--r--portato/listener.py16
-rw-r--r--portato/mq.pyx172
3 files changed, 185 insertions, 13 deletions
diff --git a/portato/__init__.py b/portato/__init__.py
index ae50929..bc1783b 100644
--- a/portato/__init__.py
+++ b/portato/__init__.py
@@ -89,12 +89,12 @@ def start():
else: # start us again in root modus and launch listener
- from . import sysv_ipc as ipc
+ from . import mq
- mq = ipc.MessageQueue(None, ipc.IPC_CREX)
+ _mq = mq.MessageQueue(None, mq.MessageQueue.CREAT | mq.MessageQueue.EXCL)
# start listener
- lt = threading.Thread(target=get_listener().set_recv, args = (mq,))
+ lt = threading.Thread(target=get_listener().set_recv, args = (_mq,))
lt.setDaemon(False)
lt.start()
@@ -106,7 +106,7 @@ def start():
su = detect_su_command()
if su:
debug("Using '%s' as su command.", su.bin)
- cmd = su.cmd("%s --no-fork --mq %ld" % (sys.argv[0], mq.key))
+ cmd = su.cmd("%s --no-fork --mq %ld" % (sys.argv[0], _mq.key))
sp = subprocess.Popen(cmd, env = env)
@@ -126,4 +126,4 @@ def start():
get_listener().close()
lt.join()
- mq.remove()
+ _mq.remove()
diff --git a/portato/listener.py b/portato/listener.py
index 1c12b00..4666269 100644
--- a/portato/listener.py
+++ b/portato/listener.py
@@ -32,13 +32,13 @@ class Listener (object):
@ivar _send: sender socket
@type _send: int"""
- def set_recv (self, mq):
+ def set_recv (self, _mq):
- self._mq = mq
+ self._mq = _mq
while True:
try:
- msg, type = self._mq.receive(block = True)
+ msg, type = self._mq.receive()
data = msg.split("\0")
debug("Listener received: %s", data)
@@ -76,17 +76,17 @@ class Listener (object):
n.set_urgency(int(urgency))
n.show()
- def set_send (self, mq = None):
- if mq is None:
+ def set_send (self, _mq = None):
+ if _mq is None:
warning(_("Listener has not been started."))
self._mq = None
else:
- from . import sysv_ipc as ipc
+ from . import mq
- self._mq = ipc.MessageQueue(mq)
+ self._mq = mq.MessageQueue(_mq)
def __send (self, string):
- self._mq.send(string, block = True)
+ self._mq.send(string)
def send_notify (self, base = "", descr = "", icon = "", urgency = None):
if self._mq is None:
diff --git a/portato/mq.pyx b/portato/mq.pyx
new file mode 100644
index 0000000..a1e1dd8
--- /dev/null
+++ b/portato/mq.pyx
@@ -0,0 +1,172 @@
+from stdlib cimport *
+
+cdef extern from "errno.h":
+ int errno
+ cdef enum:
+ EACCES, EEXIST, ENOENT, ENOMEM, ENOSPC,
+ EINVAL, EPERM, EIDRM, EINTR
+
+cdef extern from *:
+ int INT_MAX
+ int RAND_MAX
+ ctypedef size_t int
+ int rand()
+
+cdef extern from "string.h":
+ char* strerror(int errno)
+ void* memcpy (void* dst, void* src, size_t len)
+
+cdef extern from "sys/msg.h" nogil:
+ cdef enum:
+ IPC_CREAT, IPC_EXCL, IPC_NOWAIT,
+ IPC_RMID
+
+ ctypedef int key_t
+
+ struct msqid_ds:
+ pass
+
+ int msgget(key_t key, int msgflg)
+ int msgctl(int msqid, int cmd, msqid_ds* buf)
+ int msgsnd(int msgid, void* msgp, size_t msgsz, int msgflg)
+ int msgrcv(int msgid, void* msgp, size_t msgsz, long msgtype, int msgflg)
+
+cdef struct msg_data:
+ long mtype
+ char mtext[1]
+
+cdef enum:
+ MAX_MESSAGE_SIZE = 2048
+
+class MessageQueueError(Exception):
+ pass
+
+class MessageQueueRemovedError (MessageQueueError):
+ pass
+
+cdef class MessageQueue (object):
+
+ CREAT = IPC_CREAT
+ EXCL = IPC_EXCL
+
+ cdef int msgid
+ cdef readonly key_t key
+
+ def __init__ (self, key = None, int flags = 0):
+
+ if (flags & IPC_EXCL) and not (flags & IPC_CREAT):
+ raise MessageQueueError("EXCL must be combined with CREAT.")
+
+ if key is None and not (flags & IPC_EXCL):
+ raise MessageQueueError("The key can only be None if EXCL is set.")
+
+ # make sure there is nothing ... obscure
+ flags &= (IPC_CREAT | IPC_EXCL)
+
+ flags |= 0600 # mode
+
+ if key is None:
+ check = True
+ while check:
+ self.key = self.random_key()
+ self.msgid = msgget(self.key, flags)
+ check = (self.msgid == -1 and errno == EEXIST)
+ else:
+ self.key = key
+ self.msgid = msgget(key, flags)
+
+ if self.msgid == -1:
+ if errno == EACCES:
+ raise MessageQueueError("Permission denied.")
+ elif errno == EEXIST:
+ raise MessageQueueError("Queue already exists.")
+ elif errno == ENOENT:
+ raise MessageQueueError("Queue does not exist and CREAT is not set.")
+ elif errno == ENOMEM or errno == ENOSPC:
+ raise MessageQueueError("Insufficient ressources.")
+ else:
+ raise OSError(errno, strerror(errno))
+
+ def remove (self):
+ cdef msqid_ds info
+ cdef int ret
+
+ ret = msgctl(self.msgid, IPC_RMID, &info)
+
+ if ret == -1:
+ if errno == EIDRM or errno == EINVAL:
+ raise MessageQueueRemovedError("Queue already removed.")
+ elif errno == EPERM:
+ raise MessageQueueError("Permission denied.")
+ else:
+ raise OSError(errno, strerror(errno))
+
+ def send (self, message, int type = 1):
+ cdef msg_data * msg
+ cdef int ret
+ cdef long size = len(message)
+
+ if type <= 0:
+ raise ValueError("type must be > 0")
+
+ if size >= MAX_MESSAGE_SIZE:
+ raise ValueError("Message must be smaller than %d", MAX_MESSAGE_SIZE)
+
+ msg = <msg_data*>malloc(sizeof(msg_data) + size)
+
+ if msg is NULL:
+ raise MemoryError("Out of memory")
+
+ memcpy(msg.mtext, <char*>message, size)
+ msg.mtype = type
+
+ with nogil:
+ ret = msgsnd(self.msgid, &msg, size, 0)
+
+ try:
+ if ret == -1:
+ if errno == EIDRM or errno == EINVAL:
+ raise MessageQueueRemovedError("Queue was removed.")
+ elif errno == EINTR:
+ raise MessageQueueError("Signaled while waiting.")
+ elif errno == EACCES:
+ raise MessageQueueError("Permission denied.")
+ else:
+ raise OSError(errno, strerror(errno))
+ finally:
+ free(msg)
+
+ def receive (self):
+ cdef msg_data * msg
+ cdef int ret
+ cdef object retTuple
+
+ msg = <msg_data*>malloc(sizeof(msg_data) + MAX_MESSAGE_SIZE)
+
+ if msg is NULL:
+ raise MemoryError("Out of memory")
+
+ msg.mtype = 0
+
+ with nogil:
+ ret = msgrcv(self.msgid, msg, <size_t>MAX_MESSAGE_SIZE, 0, 0)
+
+ try:
+ if ret == -1:
+ if errno == EIDRM or errno == EINVAL:
+ raise MessageQueueRemovedError("Queue was removed.")
+ elif errno == EINTR:
+ raise MessageQueueError("Signaled while waiting.")
+ elif errno == EACCES:
+ raise MessageQueueError("Permission denied.")
+ else:
+ raise OSError(errno, strerror(errno))
+
+ retTuple = (msg.mtext, msg.mtype)
+ finally:
+ free(msg)
+
+ return retTuple
+
+ cdef key_t random_key (self):
+ return <int>(<double>rand() / (<double>RAND_MAX + 1) * INT_MAX)