summaryrefslogtreecommitdiff
path: root/portato
diff options
context:
space:
mode:
authorRené 'Necoro' Neumann <necoro@necoro.net>2009-08-15 12:10:15 +0200
committerRené 'Necoro' Neumann <necoro@necoro.net>2009-08-15 12:10:15 +0200
commit62c7271dc0fbb348be83304a2ffd823cb21c10d7 (patch)
treeb31e18e3e08ab7492e4af2fd16904faa10c952f6 /portato
parent1e59217d2b2266b4870a51667ad71b011aa4204f (diff)
parent2fdd70e3a102f666ab9f036d76e7e892421f6840 (diff)
downloadportato-62c7271dc0fbb348be83304a2ffd823cb21c10d7.tar.gz
portato-62c7271dc0fbb348be83304a2ffd823cb21c10d7.tar.bz2
portato-62c7271dc0fbb348be83304a2ffd823cb21c10d7.zip
Merge in own message queue module
Diffstat (limited to 'portato')
-rw-r--r--portato/__init__.py23
-rw-r--r--portato/ipc.pxd52
-rw-r--r--portato/ipc.pyx173
-rw-r--r--portato/listener.py64
4 files changed, 260 insertions, 52 deletions
diff --git a/portato/__init__.py b/portato/__init__.py
index 60aeec4..74145e7 100644
--- a/portato/__init__.py
+++ b/portato/__init__.py
@@ -46,7 +46,7 @@ def get_parser (use_ = False):
parser = OptionParser(version = vers, prog = "portato", description = desc, usage = usage)
- parser.add_option("--shm", action = "store", nargs = 3, type="long", dest = "shm",
+ parser.add_option("--mq", action = "store", nargs = 1, type="long", dest = "mq", default = None,
help = SUPPRESS_HELP)
parser.add_option("-F", "--no-fork", action = "store_true", dest = "nofork", default = False,
@@ -80,10 +80,7 @@ def start():
from .gui import run
info("%s v. %s", _("Starting Portato"), VERSION)
- if options.shm:
- get_listener().set_send(*options.shm)
- else:
- get_listener().set_send()
+ get_listener().set_send(options.mq)
try:
run()
@@ -92,14 +89,12 @@ def start():
else: # start us again in root modus and launch listener
- import shm_wrapper as shm
+ from . import ipc
- mem = shm.create_memory(1024, permissions=0600)
- sig = shm.create_semaphore(InitialValue = 0, permissions = 0600)
- rw = shm.create_semaphore(InitialValue = 1, permissions = 0600)
+ mq = ipc.MessageQueue(None, ipc.MessageQueue.CREAT | ipc.MessageQueue.EXCL)
# start listener
- lt = threading.Thread(target=get_listener().set_recv, args = (mem, sig, rw))
+ lt = threading.Thread(target=get_listener().set_recv, args = (mq,))
lt.setDaemon(False)
lt.start()
@@ -111,7 +106,7 @@ def start():
su = detect_su_command()
if su:
debug("Using '%s' as su command.", su.bin)
- cmd = su.cmd("%s --no-fork --shm %ld %ld %ld" % (sys.argv[0], mem.key, sig.key, rw.key))
+ cmd = su.cmd("%s --no-fork --mq %ld" % (sys.argv[0], mq.key))
sp = subprocess.Popen(cmd, env = env)
@@ -129,3 +124,9 @@ def start():
if lt.isAlive():
debug("Listener is still running. Close it.")
get_listener().close()
+ lt.join()
+
+ try:
+ mq.remove()
+ except ipc.MessageQueueRemovedError:
+ debug("MessageQueue already removed. Ignore.")
diff --git a/portato/ipc.pxd b/portato/ipc.pxd
new file mode 100644
index 0000000..64ca05d
--- /dev/null
+++ b/portato/ipc.pxd
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+#
+# File: portato/ipc.pxd
+# This file is part of the Portato-Project, a graphical portage-frontend.
+#
+# Copyright (C) 2006-2009 René 'Necoro' Neumann
+# This is free software. You may redistribute copies of it under the terms of
+# the GNU General Public License version 2.
+# There is NO WARRANTY, to the extent permitted by law.
+#
+# Written by René 'Necoro' Neumann <necoro@necoro.net>
+
+from python_string cimport *
+from python_mem cimport *
+
+cdef extern from "errno.h":
+ int errno
+ cdef enum:
+ EACCES, EEXIST, ENOENT, ENOMEM, ENOSPC,
+ EINVAL, EPERM, EIDRM, EINTR
+
+cdef extern from *:
+ int INT_MAX
+ int RAND_MAX
+ ctypedef size_t int
+ int rand()
+
+cdef extern from "string.h":
+ char* strerror(int errno)
+ void* memcpy (void* dst, void* src, size_t len)
+
+cdef extern from "sys/msg.h" nogil:
+ cdef enum:
+ IPC_CREAT, IPC_EXCL, IPC_NOWAIT,
+ IPC_RMID
+
+ ctypedef int key_t
+
+ struct msqid_ds:
+ pass
+
+ int msgget(key_t key, int msgflg)
+ int msgctl(int msqid, int cmd, msqid_ds* buf)
+ int msgsnd(int msgid, void* msgp, size_t msgsz, int msgflg)
+ int msgrcv(int msgid, void* msgp, size_t msgsz, long msgtype, int msgflg)
+
+cdef struct msg_data:
+ long mtype
+ char mtext[1]
+
+cdef enum:
+ MAX_MESSAGE_SIZE = 2048
diff --git a/portato/ipc.pyx b/portato/ipc.pyx
new file mode 100644
index 0000000..e9340cf
--- /dev/null
+++ b/portato/ipc.pyx
@@ -0,0 +1,173 @@
+# -*- coding: utf-8 -*-
+#
+# File: portato/ipc.pyx
+# This file is part of the Portato-Project, a graphical portage-frontend.
+#
+# Copyright (C) 2006-2009 René 'Necoro' Neumann
+# This is free software. You may redistribute copies of it under the terms of
+# the GNU General Public License version 2.
+# There is NO WARRANTY, to the extent permitted by law.
+#
+# Written by René 'Necoro' Neumann <necoro@necoro.net>
+
+class MessageQueueError(Exception):
+ """
+ Base class for different queue errors.
+ """
+ pass
+
+class MessageQueueRemovedError (MessageQueueError):
+ """
+ This class is used iff the queue is already removed.
+ """
+ pass
+
+cdef class MessageQueue (object):
+ """
+ A simple interface to the SysV message queues.
+ """
+
+ CREAT = IPC_CREAT
+ EXCL = IPC_EXCL
+
+ cdef int msgid
+ cdef readonly key_t key
+
+ def __init__ (self, key = None, int flags = 0):
+ """
+ Create a new MessageQueue instance. Depending on the passed in flags,
+ different behavior occurs. See man msgget for the details.
+
+ If key is None, a random key is created.
+ """
+
+ if (flags & IPC_EXCL) and not (flags & IPC_CREAT):
+ raise ValueError("EXCL must be combined with CREAT.")
+
+ if key is None and not (flags & IPC_EXCL):
+ raise ValueError("The key can only be None if EXCL is set.")
+
+ # make sure there is nothing ... obscure
+ flags &= (IPC_CREAT | IPC_EXCL)
+
+ flags |= 0600 # mode
+
+ if key is None:
+ check = True
+ while check:
+ self.key = self.random_key()
+ self.msgid = msgget(self.key, flags)
+ check = (self.msgid == -1 and errno == EEXIST)
+ else:
+ self.key = key
+ self.msgid = msgget(key, flags)
+
+ if self.msgid == -1:
+ if errno == EACCES:
+ raise MessageQueueError("Permission denied.")
+ elif errno == EEXIST:
+ raise MessageQueueError("Queue already exists.")
+ elif errno == ENOENT:
+ raise MessageQueueError("Queue does not exist and CREAT is not set.")
+ elif errno == ENOMEM or errno == ENOSPC:
+ raise MemoryError("Insufficient ressources.")
+ else:
+ raise OSError(errno, strerror(errno))
+
+ def remove (self):
+ """
+ Removes the message queue.
+ """
+ cdef msqid_ds info
+ cdef int ret
+
+ ret = msgctl(self.msgid, IPC_RMID, &info)
+
+ if ret == -1:
+ if errno == EIDRM or errno == EINVAL:
+ raise MessageQueueRemovedError("Queue already removed.")
+ elif errno == EPERM:
+ raise MessageQueueError("Permission denied.")
+ else:
+ raise OSError(errno, strerror(errno))
+
+ def send (self, message, int type = 1):
+ """
+ Sends a message with a specific type.
+
+ The type must be larger zero.
+ Also note, that this is always blocking.
+ """
+ cdef msg_data * msg
+ cdef int ret
+ cdef long size = len(message)
+
+ if type <= 0:
+ raise ValueError("type must be > 0")
+
+ if size >= MAX_MESSAGE_SIZE:
+ raise ValueError("Message must be smaller than %d", MAX_MESSAGE_SIZE)
+
+ msg = <msg_data*>PyMem_Malloc(sizeof(msg_data) + size)
+
+ if msg is NULL:
+ raise MemoryError("Out of memory")
+
+ memcpy(msg.mtext, <char*>message, size)
+ msg.mtype = type
+
+ with nogil:
+ ret = msgsnd(self.msgid, msg, size, 0)
+
+ try:
+ if ret == -1:
+ if errno == EIDRM or errno == EINVAL:
+ raise MessageQueueRemovedError("Queue was removed.")
+ elif errno == EINTR:
+ raise MessageQueueError("Signaled while waiting.")
+ elif errno == EACCES:
+ raise MessageQueueError("Permission denied.")
+ else:
+ raise OSError(errno, strerror(errno))
+ finally:
+ PyMem_Free(msg)
+
+ def receive (self):
+ """
+ Receives a message from the queue and returns the (msg, type) pair.
+
+ Note that this method is always blocking.
+ """
+ cdef msg_data * msg
+ cdef int ret
+ cdef object retTuple
+
+ msg = <msg_data*>PyMem_Malloc(sizeof(msg_data) + MAX_MESSAGE_SIZE)
+
+ if msg is NULL:
+ raise MemoryError("Out of memory")
+
+ msg.mtype = 0
+
+ with nogil:
+ ret = msgrcv(self.msgid, msg, <size_t>MAX_MESSAGE_SIZE, 0, 0)
+
+ try:
+ if ret == -1:
+ if errno == EIDRM or errno == EINVAL:
+ raise MessageQueueRemovedError("Queue was removed.")
+ elif errno == EINTR:
+ raise MessageQueueError("Signaled while waiting.")
+ elif errno == EACCES:
+ raise MessageQueueError("Permission denied.")
+ else:
+ raise OSError(errno, strerror(errno))
+
+ retTuple = (PyString_FromStringAndSize(msg.mtext, ret), msg.mtype)
+ finally:
+ PyMem_Free(msg)
+
+ return retTuple
+
+ cdef key_t random_key (self):
+ return <int>(<double>rand() / (<double>RAND_MAX + 1) * INT_MAX)
diff --git a/portato/listener.py b/portato/listener.py
index 3d2dd53..c96b637 100644
--- a/portato/listener.py
+++ b/portato/listener.py
@@ -21,7 +21,8 @@ except ImportError:
pynotify = None
from .constants import APP
-from .helper import debug, warning
+from .helper import debug, warning, error
+from . import ipc
class Listener (object):
"""This class handles the communication between the "listener" and the GUI.
@@ -32,22 +33,15 @@ class Listener (object):
@ivar _send: sender socket
@type _send: int"""
- def set_recv (self, mem, sig, rw):
- self._mem = mem
- self._sig = sig
- self._rw = rw
+ def set_recv (self, mq):
+
+ self.mq = mq
while True:
try:
- try:
- self._sig.P()
- self._rw.P()
- len = self._mem.read(NumberOfBytes = 4)
- string = self._mem.read(NumberOfBytes = int(len), offset = 4)
- finally:
- self._rw.V()
-
- data = string.split("\0")
+ msg, type = self.mq.receive()
+
+ data = msg.split("\0")
debug("Listener received: %s", data)
if data[0] == "notify":
@@ -59,15 +53,12 @@ class Listener (object):
except KeyboardInterrupt:
debug("Got KeyboardInterrupt. Aborting.")
break
+ except ipc.MessageQueueRemovedError:
+ debug("MessageQueue removed. Aborting.")
+ break
- self._mem.remove()
- self._sig.remove()
- self._rw.remove()
-
- self._mem = None
- self._sig = None
- self._rw = None
-
+ self.mq = None
+
def do_cmd (self, cmdlist):
"""Starts a command as the user.
@@ -89,30 +80,21 @@ class Listener (object):
n.set_urgency(int(urgency))
n.show()
- def set_send (self, mem = None, sig = None, rw = None):
- if mem is None or sig is None or rw is None:
+ def set_send (self, mq = None):
+ if mq is None:
warning(_("Listener has not been started."))
- self._mem = None
- self._sig = None
- self._rw = None
+ self.mq = None
else:
- import shm_wrapper as shm
-
- self._mem = shm.SharedMemoryHandle(mem)
- self._sig = shm.SemaphoreHandle(sig)
- self._rw = shm.SemaphoreHandle(rw)
+ self.mq = ipc.MessageQueue(mq)
def __send (self, string):
- self._rw.P()
- self._sig.Z()
try:
- self._mem.write("%4d%s" % (len(string), string))
- self._sig.V()
- finally:
- self._rw.V()
+ self.mq.send(string)
+ except ipc.MessageQueueError, e:
+ error(_("An exception occured while accessing the message queue: %s"), e)
def send_notify (self, base = "", descr = "", icon = "", urgency = None):
- if self._sig is None:
+ if self.mq is None:
self.do_notify(base, descr, icon, urgency)
else:
string = "\0".join(["notify", base, descr, icon])
@@ -125,11 +107,11 @@ class Listener (object):
self.__send(string)
def send_cmd (self, cmdlist):
- if self._sig is None:
+ if self.mq is None:
self.do_cmd(cmdlist)
else:
self.__send("\0".join(["cmd"] +cmdlist))
def close (self):
- if self._sig is not None:
+ if self.mq is not None:
self.__send("close")