summaryrefslogtreecommitdiff
path: root/portato/mq.pyx
blob: a1e1dd861615cc42f8a4551ddfb6de16547dfe06 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from stdlib cimport *

# errno and the error codes that this module compares it against after a
# msgget/msgctl/msgsnd/msgrcv call has returned -1.
cdef extern from "errno.h":
    int errno
    cdef enum:
        EACCES, EEXIST, ENOENT, ENOMEM, ENOSPC,
        EINVAL, EPERM, EIDRM, EINTR

# Declarations that are not tied to a specific header here ("from *").
# INT_MAX, RAND_MAX and rand() are used by MessageQueue.random_key().
cdef extern from *:
    int INT_MAX
    int RAND_MAX
    # NOTE(review): Cython's form is `ctypedef <existing type> <new name>`;
    # the operands below look swapped (`ctypedef int size_t` intended?) --
    # confirm against the Cython version this file is built with.
    ctypedef size_t int
    int rand()

# C string helpers: strerror() supplies the text for the generic OSError
# fallbacks, memcpy() copies the payload into the message buffer in send().
cdef extern from "string.h":
    char* strerror(int errno)
    void* memcpy (void* dst, void* src, size_t len)

# System V message queue API. The block is declared nogil so that the
# functions may be called inside `with nogil:` sections (see send/receive).
cdef extern from "sys/msg.h" nogil:
    # Flags for msgget()/msgsnd()/msgrcv() and the msgctl() remove command.
    cdef enum:
        IPC_CREAT, IPC_EXCL, IPC_NOWAIT,
        IPC_RMID

    ctypedef int key_t

    # Only used as an opaque buffer whose address is handed to msgctl();
    # no fields are accessed from this module, hence the empty body.
    struct msqid_ds:
        pass

    int msgget(key_t key, int msgflg)
    int msgctl(int msqid, int cmd, msqid_ds* buf)
    int msgsnd(int msgid, void* msgp, size_t msgsz, int msgflg)
    int msgrcv(int msgid, void* msgp, size_t msgsz, long msgtype, int msgflg)

# Message buffer handed to msgsnd()/msgrcv(): a type tag followed by the
# payload. mtext is declared with one element only; send() and receive()
# malloc sizeof(msg_data) plus the payload size, so the text runs past the
# declared array into the extra allocated space.
cdef struct msg_data:
    long mtype
    char mtext[1]

# Payload size limit in bytes: send() rejects messages whose length is
# greater than or equal to this value, and receive() allocates room for
# this many payload bytes.
cdef enum:
    MAX_MESSAGE_SIZE = 2048

class MessageQueueError(Exception):
    """Base class for the errors raised by MessageQueue operations."""

class MessageQueueRemovedError(MessageQueueError):
    """Raised when a queue operation fails with EIDRM or EINVAL, i.e. the
    queue is reported as removed."""

cdef class MessageQueue (object):
    """Wrapper around a System V message queue.

    The queue is obtained with msgget() on construction; messages are
    exchanged with send() / receive() and the queue is destroyed with
    remove().

    Class attributes:
        CREAT -- flag: create the queue if it does not exist (IPC_CREAT).
        EXCL  -- flag: fail if the queue already exists (IPC_EXCL).

    Instance attributes:
        key -- the key the queue was opened with (read-only).
    """

    CREAT = IPC_CREAT
    EXCL = IPC_EXCL

    cdef int msgid
    cdef readonly key_t key

    def __init__ (self, key = None, int flags = 0):
        """Open (or create) the queue identified by `key`.

        key   -- integer queue key, or None to pick a random unused key
                 (only allowed together with CREAT | EXCL).
        flags -- combination of MessageQueue.CREAT and MessageQueue.EXCL;
                 all other bits are ignored.

        Raises MessageQueueError for invalid flag/key combinations and for
        the errors EACCES, EEXIST, ENOENT, ENOMEM and ENOSPC; any other
        failure is raised as OSError.
        """
        # errno of the last failed msgget(), captured right after the call
        # so that later work cannot overwrite it
        cdef int err = 0

        if (flags & IPC_EXCL) and not (flags & IPC_CREAT):
            raise MessageQueueError("EXCL must be combined with CREAT.")

        if key is None and not (flags & IPC_EXCL):
            raise MessageQueueError("The key can only be None if EXCL is set.")

        # make sure there is nothing ... obscure
        flags &= (IPC_CREAT | IPC_EXCL)

        flags |= 0600 # mode

        if key is None:
            # EXCL is guaranteed here, so EEXIST means the random key is
            # already taken: simply try the next one
            while True:
                self.key = self.random_key()
                self.msgid = msgget(self.key, flags)
                if self.msgid != -1:
                    break
                err = errno
                if err != EEXIST:
                    break
        else:
            self.key = key
            self.msgid = msgget(self.key, flags)
            if self.msgid == -1:
                err = errno

        if self.msgid == -1:
            if err == EACCES:
                raise MessageQueueError("Permission denied.")
            elif err == EEXIST:
                raise MessageQueueError("Queue already exists.")
            elif err == ENOENT:
                raise MessageQueueError("Queue does not exist and CREAT is not set.")
            elif err == ENOMEM or err == ENOSPC:
                raise MessageQueueError("Insufficient ressources.")
            else:
                raise OSError(err, strerror(err))

    def remove (self):
        """Remove the queue (msgctl with IPC_RMID).

        Raises MessageQueueRemovedError if the queue is already gone
        (EIDRM / EINVAL), MessageQueueError on EPERM and OSError for any
        other failure.
        """
        cdef msqid_ds info
        cdef int ret
        cdef int err

        ret = msgctl(self.msgid, IPC_RMID, &info)

        if ret == -1:
            err = errno
            if err == EIDRM or err == EINVAL:
                raise MessageQueueRemovedError("Queue already removed.")
            elif err == EPERM:
                raise MessageQueueError("Permission denied.")
            else:
                raise OSError(err, strerror(err))

    def send (self, message, int type = 1):
        """Put `message` on the queue.

        message -- byte string with fewer than MAX_MESSAGE_SIZE bytes.
        type    -- message type tag, must be > 0.

        The GIL is released for the duration of the msgsnd() call.

        Raises ValueError for a bad type or an oversized message,
        MemoryError if the buffer cannot be allocated,
        MessageQueueRemovedError on EIDRM / EINVAL, MessageQueueError on
        EINTR / EACCES and OSError for any other failure.
        """
        cdef msg_data * msg
        cdef char * data
        cdef int ret
        cdef int err = 0
        cdef long size = len(message)

        if type <= 0:
            raise ValueError("type must be > 0")

        if size >= MAX_MESSAGE_SIZE:
            raise ValueError("Message must be smaller than %d" % MAX_MESSAGE_SIZE)

        # Convert before allocating: the conversion can raise, and doing it
        # first means there is no buffer to leak in that case.
        data = <char*>message

        msg = <msg_data*>malloc(sizeof(msg_data) + size)

        if msg is NULL:
            raise MemoryError("Out of memory")

        memcpy(msg.mtext, data, size)
        msg.mtype = type

        with nogil:
            # pass the buffer itself, not the address of the local pointer
            ret = msgsnd(self.msgid, msg, size, 0)
            if ret == -1:
                err = errno

        # the buffer is not needed any more, whatever the outcome was
        free(msg)

        if ret == -1:
            if err == EIDRM or err == EINVAL:
                raise MessageQueueRemovedError("Queue was removed.")
            elif err == EINTR:
                raise MessageQueueError("Signaled while waiting.")
            elif err == EACCES:
                raise MessageQueueError("Permission denied.")
            else:
                raise OSError(err, strerror(err))

    def receive (self):
        """Take the next message of any type from the queue.

        The GIL is released for the duration of the msgrcv() call.

        Returns a tuple (message, type) where message is a byte string
        holding exactly the bytes that were received.

        Raises MemoryError if the buffer cannot be allocated,
        MessageQueueRemovedError on EIDRM / EINVAL, MessageQueueError on
        EINTR / EACCES and OSError for any other failure.
        """
        cdef msg_data * msg
        cdef int ret
        cdef int err = 0
        cdef object retTuple

        msg = <msg_data*>malloc(sizeof(msg_data) + MAX_MESSAGE_SIZE)

        if msg is NULL:
            raise MemoryError("Out of memory")

        msg.mtype = 0

        with nogil:
            ret = msgrcv(self.msgid, msg, <size_t>MAX_MESSAGE_SIZE, 0, 0)
            if ret == -1:
                err = errno

        try:
            if ret == -1:
                if err == EIDRM or err == EINVAL:
                    raise MessageQueueRemovedError("Queue was removed.")
                elif err == EINTR:
                    raise MessageQueueError("Signaled while waiting.")
                elif err == EACCES:
                    raise MessageQueueError("Permission denied.")
                else:
                    raise OSError(err, strerror(err))

            # msgrcv() does not NUL-terminate the payload and the buffer is
            # uninitialised, so take exactly the `ret` bytes it reported
            # instead of relying on C string conversion.
            retTuple = ((<char*>msg.mtext)[:ret], msg.mtype)
        finally:
            free(msg)

        return retTuple

    cdef key_t random_key (self):
        """Return a pseudo-random key, scaled from rand() into [0, INT_MAX)."""
        return <int>(<double>rand() / (<double>RAND_MAX + 1) * INT_MAX)