jens@11
|
1 |
# encoding: utf-8
|
jens@11
|
2 |
"""
|
jens@11
|
3 |
BLIP.py
|
jens@11
|
4 |
|
jens@11
|
5 |
Created by Jens Alfke on 2008-06-03.
|
jens@13
|
6 |
Copyright notice and BSD license at end of file.
|
jens@11
|
7 |
"""
|
jens@11
|
8 |
|
jens@11
|
9 |
import asynchat
|
jens@11
|
10 |
import asyncore
|
jens@11
|
11 |
from cStringIO import StringIO
|
jens@11
|
12 |
import logging
|
jens@11
|
13 |
import socket
|
jens@11
|
14 |
import struct
|
jens@11
|
15 |
import sys
|
jens@11
|
16 |
import traceback
|
jens@11
|
17 |
import zlib
|
jens@11
|
18 |
|
jens@11
|
19 |
|
jens@13
|
20 |
# Connection status enumeration (the values Connection.status takes):
kDisconnected, kClosed, kOpening, kOpen, kClosing = -1, 0, 1, 2, 3
|
jens@13
|
26 |
|
jens@13
|
27 |
|
jens@12
|
28 |
# INTERNAL CONSTANTS -- NO TOUCHIES!

# Every frame starts with a fixed-size binary header, packed in network byte
# order: magic number, request number, flags, and frame length (the length
# counts the header itself).
kFrameMagicNumber   = 0x9B34F206
kFrameHeaderFormat  = '!LLHH'
kFrameHeaderSize    = struct.calcsize(kFrameHeaderFormat)   # 12 bytes

# Layout of the header's flags field: the low four bits carry a kMsgType_*
# value, the remaining constants are single-bit flags.
kMsgFlag_TypeMask   = 0x000F
kMsgFlag_Compressed = 1 << 4
kMsgFlag_Urgent     = 1 << 5
kMsgFlag_NoReply    = 1 << 6
kMsgFlag_MoreComing = 1 << 7
kMsgFlag_Meta       = 1 << 8

kMsgType_Request, kMsgType_Response, kMsgType_Error = 0, 1, 2

# 'Profile' property values of meta requests (kMsgProfile_Bye is the close request).
kMsgProfile_Hi  = "Hi"
kMsgProfile_Bye = "Bye"
|
morrowa@51
|
47 |
|
morrowa@56
|
48 |
# Logging Setup
|
morrowa@56
|
49 |
class NullLoggingHandler(logging.Handler):
    """Logging handler that discards every record handed to it."""

    def emit(self, record):
        """Ignore `record`; nothing is written anywhere."""
        return None
|
jens@11
|
52 |
|
jens@11
|
53 |
# Module-wide logger used by every class in this file.
log = logging.getLogger('BLIP')
log.propagate = True
# A do-nothing handler prevents the "No handlers found" warning if the calling
# code does not use logging.
log.addHandler(NullLoggingHandler())
|
jens@11
|
57 |
|
jens@12
|
58 |
|
jens@11
|
59 |
class MessageException(Exception):
    """Raised when a received message cannot be decoded."""
|
jens@11
|
61 |
|
jens@11
|
62 |
class ConnectionException(Exception):
    """Raised when a received frame header is invalid."""
|
jens@11
|
64 |
|
jens@11
|
65 |
|
jens@13
|
66 |
### LISTENER AND CONNECTION CLASSES:
|
jens@13
|
67 |
|
jens@13
|
68 |
|
jens@11
|
69 |
class Listener (asyncore.dispatcher):
    """BLIP listener/server class.

    Listens on a TCP port and wraps every accepted socket in a Connection.

    Attributes:
        onConnected: optional callable, invoked with each new Connection.
        onRequest: copied onto each new Connection's `onRequest`.
        sslKeyFile, sslCertFile: when sslKeyFile is set, both are passed to
            socket.ssl() for every accepted socket.
    """

    def __init__(self, port, sslKeyFile=None, sslCertFile=None):
        """Create a listener on a port (bound to all local addresses)."""
        asyncore.dispatcher.__init__(self)
        self.onConnected = self.onRequest = None
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(('', port))
        self.listen(5)
        self.sslKeyFile = sslKeyFile
        self.sslCertFile = sslCertFile
        log.info("Listening on port %u", port)

    def handle_accept(self):
        """asyncore callback: accept a pending connection and hand it to a Connection."""
        accepted = self.accept()
        if accepted is None:
            # accept() may return None when no connection is actually ready.
            return
        # The local must not be called `socket`: that would shadow the socket
        # module and make the socket.ssl() lookup below hit the socket object.
        sock, address = accepted
        if self.sslKeyFile:
            # NOTE(review): the object returned by socket.ssl() is discarded, so
            # the Connection below still wraps the plain socket -- confirm
            # whether TLS is really in effect for accepted connections.
            socket.ssl(sock, self.sslKeyFile, self.sslCertFile)
        conn = Connection(address, sock=sock, listener=self)
        conn.onRequest = self.onRequest
        if self.onConnected:
            self.onConnected(conn)

    def handle_error(self):
        """asyncore callback for an uncaught exception: log it and stop listening."""
        typ, val = sys.exc_info()[:2]
        log.error("Listener caught: %s %s\n%s", typ, val, traceback.format_exc())
        self.close()
|
jens@13
|
96 |
|
jens@13
|
97 |
|
jens@11
|
98 |
|
jens@11
|
99 |
class Connection (asynchat.async_chat):
|
jens@13
|
100 |
def __init__( self, address, sock=None, listener=None, ssl=None ):
    """Opens a connection with the given address.

    If a connection/socket object is provided it'll use that, otherwise it'll
    open a new outgoing socket. For an outgoing socket, `ssl` (when given) is
    called with the new socket before connecting.
    """
    if not sock:
        # Outgoing: create our own socket and start connecting.
        asynchat.async_chat.__init__(self)
        log.info("Opening connection to %s",address)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.status = kOpening
        if ssl:
            ssl(self.socket)
        self.connect(address)
    else:
        # Incoming: adopt the already-connected socket.
        asynchat.async_chat.__init__(self,sock)
        log.info("Accepted connection from %s",address)
        self.status = kOpen

    self.address = address
    self.listener = listener

    # Callbacks the owner may install.
    self.onRequest = None
    self.onCloseRequest = None
    self.onCloseRefused = None

    # Bookkeeping for messages in flight, keyed by request number.
    self.pendingRequests = {}
    self.pendingResponses = {}
    self.outBox = []
    self.inMessage = None
    self.inNumRequests = 0
    self.outNumRequests = 0
    self.sending = False
    self._closeWhenPossible = False

    # Put the receiver into its "waiting for a frame header" state.
    self._endOfFrame()
|
jens@11
|
126 |
|
jens@13
|
127 |
def handle_connect(self):
    """asyncore callback: the socket has connected, so mark the connection open."""
    log.info("Connection open!")
    self.status = kOpen
|
jens@13
|
130 |
|
jens@13
|
131 |
def handle_error(self):
    """asyncore callback for an uncaught exception on this connection.

    Logs the exception, drops any buffered data, marks the connection
    kDisconnected and closes the socket.
    """
    typ, val = sys.exc_info()[:2]
    log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
    self.discard_buffers()
    self.status = kDisconnected
    # Close the socket through the base class. self.close() is the BLIP-level
    # close, which returns immediately unless status is kOpen -- and status was
    # just set to kDisconnected -- so it would leave the socket open.
    asynchat.async_chat.close(self)
|
jens@11
|
137 |
|
jens@11
|
138 |
|
jens@11
|
139 |
### SENDING:
|
jens@11
|
140 |
|
jens@13
|
141 |
@property
def isOpen(self):
    """True while the connection is opening or fully open."""
    return self.status in (kOpening, kOpen)
|
jens@13
|
144 |
|
morrowa@51
|
145 |
@property
def canSend(self):
    """True if the connection is open and no close is pending."""
    if self._closeWhenPossible:
        return False
    return self.isOpen
|
morrowa@51
|
148 |
|
jens@13
|
149 |
def _sendMessage(self, msg):
    """Queue `msg` for sending and wake the output stream if it is idle.

    Returns True if the message was queued, False if the connection is not open.
    """
    if not self.isOpen:
        return False
    self._outQueueMessage(msg,True)
    if not self.sending:
        log.debug("Waking up the output stream")
        self.sending = True
        # This object is its own producer: asynchat pulls frames via more().
        self.push_with_producer(self)
    return True
|
jens@13
|
159 |
|
jens@13
|
160 |
def _sendRequest(self, req):
    """Assign the next outgoing request number to `req` and queue it.

    If the request has a response object, it is registered in
    self.pendingResponses under the same number. Returns the result of
    _sendMessage(), or False if the connection can no longer send.
    """
    if not self.canSend:
        log.warning("%s: Attempt to send a request after the connection has started closing: %s" % (self, req))
        return False

    number = self.outNumRequests + 1
    req.requestNo = number
    self.outNumRequests = number

    reply = req.response
    if reply:
        reply.requestNo = number
        self.pendingResponses[number] = reply
        log.debug("pendingResponses[%i] := %s",number,reply)
    return self._sendMessage(req)
|
jens@13
|
172 |
|
jens@11
|
173 |
def _outQueueMessage(self, msg,isNew=True):
    """Insert `msg` into self.outBox.

    Non-urgent messages (and any message when the queue holds fewer than two
    entries) go at the end. An urgent message is placed by scanning backwards
    from the end of the queue; see the comments below.

    isNew is True for a message being queued for the first time and False
    when a partly-sent message is being put back.
    """
    queueLen = len(self.outBox)
    slot = queueLen
    if msg.urgent and queueLen>1:
        while slot > 0:
            ahead = self.outBox[slot-1]
            if ahead.urgent:
                # Found the last urgent message: go one position further
                # back than directly behind it, unless already at the end.
                if slot<queueLen:
                    slot += 1
                break
            if isNew and ahead.bytesSent==0:
                # A new message does not move ahead of one that has not
                # sent any bytes yet.
                break
            slot -= 1
        else:
            # Scanned the whole queue without stopping.
            slot = 1

    self.outBox.insert(slot,msg)
    if isNew:
        log.info("Queuing %s at index %i",msg,slot)
    else:
        log.debug("Re-queueing outgoing message at index %i of %i",slot,len(self.outBox))
|
jens@11
|
194 |
|
jens@14
|
195 |
def more(self):
    """Producer hook: return the next frame to transmit, or None when idle.

    Takes the message at the head of self.outBox, emits one frame of it, and
    re-queues the message if it has more data to send.
    """
    queued = len(self.outBox)
    if queued == 0:
        log.debug("Nothing more to send")
        self.sending = False
        self._closeIfReady()
        return None

    msg = self.outBox.pop(0)
    # Use the larger frame size unless a non-urgent message is being sent
    # while an urgent one is next in the queue.
    if msg.urgent or queued==1 or not self.outBox[0].urgent:
        frameSize = 4096 * 4
    else:
        frameSize = 4096
    data = msg._sendNextFrame(frameSize)
    if msg._moreComing:
        self._outQueueMessage(msg,isNew=False)
    else:
        log.info("Finished sending %s",msg)
    return data
|
jens@11
|
213 |
|
jens@11
|
214 |
### RECEIVING:
|
jens@11
|
215 |
|
jens@11
|
216 |
def collect_incoming_data(self, data):
    """asynchat callback: route received bytes to the header buffer or the current message.

    Payload bytes are dropped when there is no current message.
    """
    if not self.expectingHeader:
        if self.inMessage:
            self.inMessage._receivedData(data)
        return
    if self.inHeader is None:
        self.inHeader = data
    else:
        self.inHeader = self.inHeader + data
|
jens@12
|
224 |
|
jens@11
|
225 |
def found_terminator(self):
    """asynchat callback: a complete frame header or frame payload has arrived.

    Raises ConnectionException if a header has the wrong magic number or a
    frame length smaller than the header size.
    """
    if not self.expectingHeader:
        # Got the frame's payload:
        self._endOfFrame()
        return

    # Got a header:
    fields = struct.unpack(kFrameHeaderFormat,self.inHeader)
    self.inHeader = None
    magic, requestNo, flags, frameLen = fields
    if magic!=kFrameMagicNumber:
        raise ConnectionException("Incorrect frame magic number %x" %magic)
    if frameLen < kFrameHeaderSize:
        raise ConnectionException("Invalid frame length %u" %frameLen)
    # The length in the header includes the header; keep only the payload size.
    frameLen -= kFrameHeaderSize
    log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
              (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
    self.inMessage = self._inMessageForFrame(requestNo,flags)

    if frameLen <= 0:
        # Empty frame: it is already complete.
        self._endOfFrame()
    else:
        self.expectingHeader = False
        self.set_terminator(frameLen)
|
jens@11
|
246 |
|
jens@11
|
247 |
def _inMessageForFrame(self, requestNo,flags):
    """Return the incoming message a newly received frame belongs to, or None.

    A request frame continues a pending request, or starts a new
    IncomingRequest when its number is the next one expected. A response or
    error frame belongs to the pending response with that number. A frame
    that matches nothing is logged and None is returned.
    """
    message = None
    msgType = flags & kMsgFlag_TypeMask
    if msgType==kMsgType_Request:
        message = self.pendingRequests.get(requestNo)
        if message is None and requestNo == self.inNumRequests+1:
            message = IncomingRequest(self,requestNo,flags)
            self.pendingRequests[requestNo] = message
            self.inNumRequests += 1
    elif msgType==kMsgType_Response or msgType==kMsgType_Error:
        message = self.pendingResponses.get(requestNo)
        # There may be no pending response with this number; without the
        # check the call below raises AttributeError on None instead of
        # reaching the "unexpected frame" warning.
        if message is not None:
            message._updateFlags(flags)

    if message is not None:
        message._beginFrame(flags)
    else:
        log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
    return message
|
jens@11
|
266 |
|
jens@11
|
267 |
def _endOfFrame(self):
    """Reset the receiver to expect a frame header; dispatch the message if it is complete."""
    finished = self.inMessage
    self.inMessage = None
    self.inHeader = None
    self.expectingHeader = True
    self.set_terminator(kFrameHeaderSize) # wait for binary header
    if not finished:
        return
    log.debug("End of frame of %s",finished)
    if not finished._moreComing:
        self._receivedMessage(finished)
|
jens@12
|
277 |
|
jens@11
|
278 |
def _receivedMessage(self, msg):
    """Handle a fully received message: un-register it, decode it and dispatch it.

    Requests go to _dispatchMetaRequest() (meta) or self.onRequest. Any
    exception raised while decoding or dispatching is logged, not propagated.
    """
    log.info("Received: %s",msg)
    # Remove from pending:
    pending = self.pendingResponses if msg.isResponse else self.pendingRequests
    del pending[msg.requestNo]
    # Decode:
    try:
        msg._finished()
        if not msg.isResponse:
            handler = self._dispatchMetaRequest if msg._meta else self.onRequest
            handler(msg)
    except Exception:
        log.error("Exception handling incoming message: %s", traceback.format_exc())
        #FIX: Send an error reply
    # Check to see if we're done and ready to close:
    self._closeIfReady()
|
morrowa@51
|
298 |
|
morrowa@51
|
299 |
def _dispatchMetaRequest(self, request):
    """Handles dispatching internal meta requests.

    A 'Bye' profile is a close request; any other profile is answered with a
    BLIP 404 error response.
    """
    if request['Profile'] == kMsgProfile_Bye:
        self._handleCloseRequest(request)
        return
    reply = request.response
    reply.isError = True
    reply['Error-Domain'] = "BLIP"
    reply['Error-Code'] = 404
    reply.body = "Unknown meta profile"
    reply.send()
|
morrowa@51
|
310 |
|
morrowa@51
|
311 |
### CLOSING:
|
morrowa@51
|
312 |
|
morrowa@54
|
313 |
def _handleCloseRequest(self, request):
    """Handles requests from a peer to close.

    self.onCloseRequest, when set, is called to decide; a false result is
    reported to the peer as a BLIP 403 error response. Otherwise an empty
    response is sent as permission to close.
    """
    shouldClose = True
    if self.onCloseRequest:
        shouldClose = self.onCloseRequest()

    response = request.response
    if response is None:
        # The request is flagged no-reply, so there is no response to send.
        log.warning("Close request does not accept a reply; not responding")
        return

    if not shouldClose:
        log.debug("Sending refusal to close...")
        response.isError = True
        response['Error-Domain'] = "BLIP"
        response['Error-Code'] = 403
        response.body = "Close request denied"
    else:
        log.debug("Sending permission to close...")
    response.send()
|
morrowa@54
|
330 |
|
morrowa@51
|
331 |
def close(self):
    """Publicly callable close method. Sends close request to peer.

    Returns True if the request was sent (status becomes kClosing), False if
    the connection is not open or the request could not be sent.
    """
    if self.status != kOpen:
        return False
    log.info("Sending close request...")
    bye = OutgoingRequest(self, None, {'Profile': kMsgProfile_Bye})
    bye._meta = True
    bye.response.onComplete = self._handleCloseResponse
    if bye.send():
        self.status = kClosing
        return True
    log.error("Error sending close request.")
    return False
|
morrowa@51
|
345 |
|
morrowa@51
|
346 |
def _handleCloseResponse(self, response):
    """Called when we receive a response to a close request."""
    log.info("Received close response.")
    if not response.isError:
        # now wait until everything has finished sending, then actually close
        log.info("No refusal, actually closing...")
        self._closeWhenPossible = True
        return
    # remote refused to close
    if self.onCloseRefused:
        self.onCloseRefused(response)
    self.status = kOpen
|
morrowa@51
|
358 |
|
morrowa@51
|
359 |
def _closeIfReady(self):
    """Checks if all transmissions are complete and then closes the actual socket."""
    if not self._closeWhenPossible:
        return
    # Anything still queued or awaiting completion keeps the socket open.
    if len(self.outBox) != 0:
        return
    if len(self.pendingRequests) != 0:
        return
    if len(self.pendingResponses) != 0:
        return
    log.debug("_closeIfReady closing.")
    asynchat.async_chat.close(self)
|
morrowa@51
|
365 |
|
morrowa@51
|
366 |
def handle_close(self):
    """Called when the socket actually closes.

    Drops all message bookkeeping and records whether the close was one we
    asked for (kClosed) or not (kDisconnected).
    """
    log.info("Connection closed!")
    self.pendingRequests = None
    self.pendingResponses = None
    self.outBox = None
    wasClosing = (self.status == kClosing)
    self.status = kClosed if wasClosing else kDisconnected
    asyncore.dispatcher.close(self)
|
jens@11
|
376 |
|
jens@12
|
377 |
|
jens@13
|
378 |
### MESSAGE CLASSES:
|
jens@11
|
379 |
|
jens@11
|
380 |
|
jens@11
|
381 |
class Message (object):
    """Abstract superclass of all request/response objects.

    Holds the owning connection, a body, a properties dictionary and the
    request number. The flag attributes read here (urgent, compressed,
    noReply, _moreComing, _meta, isError) are set by subclasses.
    """

    def __init__(self, connection, body=None, properties=None):
        self.connection = connection
        self.body = body
        self.properties = properties or {}
        self.requestNo = None

    @property
    def flags(self):
        """The flags value for this message's frame headers: message type plus flag bits."""
        if not self.isResponse:
            flags = kMsgType_Request
        elif self.isError:
            flags = kMsgType_Error
        else:
            flags = kMsgType_Response
        if self.urgent:
            flags |= kMsgFlag_Urgent
        if self.compressed:
            flags |= kMsgFlag_Compressed
        if self.noReply:
            flags |= kMsgFlag_NoReply
        if self._moreComing:
            flags |= kMsgFlag_MoreComing
        if self._meta:
            flags |= kMsgFlag_Meta
        return flags

    def __str__(self):
        """Short description: class name, request number, flag tags and body size."""
        pieces = ["%s[" %(type(self).__name__)]
        if self.requestNo is not None:
            pieces.append("#%i" %self.requestNo)
        if self.urgent:
            pieces.append(" URG")
        if self.compressed:
            pieces.append(" CMP")
        if self.noReply:
            pieces.append(" NOR")
        if self._moreComing:
            pieces.append(" MOR")
        if self._meta:
            pieces.append(" MET")
        if self.body:
            pieces.append(" %i bytes" %len(self.body))
        pieces.append("]")
        return "".join(pieces)

    def __repr__(self):
        """Like str(), followed by the properties dictionary when it is not empty."""
        text = str(self)
        if len(self.properties):
            text += repr(self.properties)
        return text

    @property
    def isResponse(self):
        """Is this message a response?"""
        return False

    @property
    def contentType(self):
        """The 'Content-Type' property, or None if it is not set."""
        return self.properties.get('Content-Type')

    # Read-only, dictionary-style access to the properties. Looking up a
    # missing key gives None instead of raising KeyError.
    def __getitem__(self, key):
        return self.properties.get(key)

    def __contains__(self, key):
        return key in self.properties

    def __len__(self):
        return len(self.properties)

    def __nonzero__(self):
        # A message is always true, even though __len__ may be 0.
        return True

    def __iter__(self):
        return self.properties.__iter__()
|
jens@11
|
437 |
|
jens@11
|
438 |
|
jens@11
|
439 |
class IncomingMessage (Message):
    """Abstract superclass of incoming messages.

    Frame payloads are collected in self.frames as they arrive and decoded
    into properties and body by _finished().
    """

    def __init__(self, connection, requestNo, flags):
        super(IncomingMessage,self).__init__(connection)
        self.requestNo = requestNo
        self._updateFlags(flags)
        self.frames = []

    def _updateFlags(self, flags):
        """Set the message's flag attributes from a frame header's flags value."""
        self.urgent     = (flags & kMsgFlag_Urgent) != 0
        self.compressed = (flags & kMsgFlag_Compressed) != 0
        self.noReply    = (flags & kMsgFlag_NoReply) != 0
        self._moreComing= (flags & kMsgFlag_MoreComing) != 0
        self._meta      = (flags & kMsgFlag_Meta) != 0
        # The message type is a small integer in the low bits, not a bit
        # flag, so compare the masked field instead of testing a single bit.
        self.isError    = (flags & kMsgFlag_TypeMask) == kMsgType_Error

    def _beginFrame(self, flags):
        """Received a frame header."""
        self._moreComing = (flags & kMsgFlag_MoreComing)!=0

    def _receivedData(self, data):
        """Received data from a frame."""
        self.frames.append(data)

    def _finished(self):
        """The entire message has been received; now decode it.

        Fills in self.properties and self.body.

        Raises:
            MessageException: if the properties section is malformed or a
                compressed body cannot be decompressed.
        """
        encoded = "".join(self.frames)
        self.frames = None

        # Decode the properties: a 2-byte big-endian size, then that many
        # bytes of NUL-terminated strings alternating key, value.
        if len(encoded) < 2:
            raise MessageException("missing properties length")
        propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
        if propSize>len(encoded):
            raise MessageException("properties too long to fit")
        if propSize>2 and encoded[propSize-1] != '\000':
            raise MessageException("properties are not nul-terminated")

        if propSize > 2:
            proplist = encoded[2:propSize-1].split('\000')
            if len(proplist) & 1:
                raise MessageException("odd number of property strings")

            def expand(text):
                # A one-character string may be an abbreviation.
                if len(text)==1:
                    text = IncomingMessage.__expandDict.get(text,text)
                return text

            for i in range(0,len(proplist),2):
                self.properties[ expand(proplist[i]) ] = expand(proplist[i+1])

        encoded = encoded[propSize:]
        # Decode the body:
        if self.compressed and len(encoded)>0:
            try:
                encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
            except zlib.error:
                raise MessageException(sys.exc_info()[1])
        self.body = encoded

    # One-character abbreviations that _finished() expands in property keys and values.
    __expandDict= {'\x01' : "Content-Type",
                   '\x02' : "Profile",
                   '\x03' : "application/octet-stream",
                   '\x04' : "text/plain; charset=UTF-8",
                   '\x05' : "text/xml",
                   '\x06' : "text/yaml",
                   '\x07' : "Channel",
                   '\x08' : "Error-Code",
                   '\x09' : "Error-Domain"}
|
jens@12
|
504 |
|
jens@11
|
505 |
|
jens@11
|
506 |
class OutgoingMessage (Message):
    """Abstract superclass of outgoing requests/responses.

    The message is serialized once by _encode() and then handed out one frame
    at a time by _sendNextFrame().
    """

    def __init__(self, connection, body=None, properties=None):
        Message.__init__(self,connection,body,properties)
        self.urgent = self.compressed = self.noReply = self._meta = self.isError = False
        self._moreComing = True

    def __setitem__(self, key,val):
        self.properties[key] = val

    def __delitem__(self, key):
        del self.properties[key]

    @property
    def sent(self):
        """True once _encode() has run on this message."""
        return hasattr(self,'encoded')

    def _encode(self):
        """Generates the message's encoded form, prior to sending it.

        Sets self.encoded (2-byte properties size, NUL-terminated property
        strings, then the body, gzip-compressed if self.compressed) and
        resets self.bytesSent to 0.

        Raises:
            MessageException: if the encoded properties do not fit in the
                16-bit size field.
        """
        out = StringIO()
        for (key,value) in self.properties.items():
            for text in (key, value):
                out.write(str(text))    #FIX: Abbreviate
                out.write('\000')
        propertiesSize = out.tell()
        # Checked explicitly rather than with assert, so the limit is still
        # enforced when Python runs with -O.
        if propertiesSize >= 65536:
            out.close()
            raise MessageException("properties too large to encode (%u bytes)" % propertiesSize)

        body = self.body or ""
        if self.compressed:
            z = zlib.compressobj(6,zlib.DEFLATED,31)   # window size of 31 needed for gzip format
            out.write(z.compress(body))
            body = z.flush()
        out.write(body)

        self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
        out.close()
        log.debug("Encoded %s into %u bytes", self,len(self.encoded))
        self.bytesSent = 0

    def _sendNextFrame(self, maxLen):
        """Return the next frame (header plus up to maxLen payload bytes).

        Advances self.bytesSent, updates self._moreComing, and releases
        self.encoded once the last frame has been produced.
        """
        pos = self.bytesSent
        payload = self.encoded[pos:pos+maxLen]
        pos += len(payload)
        self._moreComing = (pos < len(self.encoded))
        if not self._moreComing:
            self.encoded = None
        log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)

        # self.flags is read after _moreComing is updated, so the header
        # carries the right more-coming bit for this frame.
        header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
                                                 self.requestNo,
                                                 self.flags,
                                                 kFrameHeaderSize+len(payload))
        self.bytesSent = pos
        return header + payload
|
jens@11
|
562 |
|
jens@11
|
563 |
|
jens@12
|
564 |
class Request (object):
    """Mixin giving request classes a lazily created `response` object.

    Subclasses provide `noReply` and `_createResponse()`.
    """

    @property
    def response(self):
        """The response object for this request.

        None if the request is flagged noReply; otherwise created on first
        access and cached in self._response.
        """
        if self.noReply:
            return None
        cached = vars(self).get('_response')
        if cached is None:
            cached = self._createResponse()
            self._response = cached
        return cached
|
jens@12
|
574 |
|
jens@11
|
575 |
|
jens@11
|
576 |
class Response (Message):
    """Base class of response messages; links a response to its request."""

    def _setRequest(self, request):
        """Attach this response to `request`, copying its request number and urgency."""
        assert not request.noReply
        self.request = request
        self.urgent = request.urgent
        self.requestNo = request.requestNo

    @property
    def isResponse(self):
        """Always True for responses."""
        return True
|
jens@11
|
586 |
|
jens@11
|
587 |
|
jens@11
|
588 |
class IncomingRequest (IncomingMessage, Request):
    """A request received from the peer; its response is an OutgoingResponse."""

    def _createResponse(self):
        reply = OutgoingResponse(self)
        return reply
|
jens@11
|
591 |
|
jens@13
|
592 |
|
jens@11
|
593 |
class OutgoingRequest (OutgoingMessage, Request):
    """A request sent to the peer; its response is an IncomingResponse."""

    def _createResponse(self):
        reply = IncomingResponse(self)
        return reply

    def send(self):
        """Encode the request and queue it on the connection.

        Returns the response object (None for a noReply request) if the
        request was queued, otherwise the false result of _sendRequest().
        """
        self._encode()
        queued = self.connection._sendRequest(self)
        if not queued:
            return queued
        return self.response
|
jens@13
|
600 |
|
jens@11
|
601 |
|
jens@11
|
602 |
class IncomingResponse (IncomingMessage, Response):
    """The peer's reply to an OutgoingRequest.

    Attributes:
        onComplete: optional callable, invoked with this response once it
            has been completely received and decoded.
    """

    def __init__(self, request):
        IncomingMessage.__init__(self,request.connection,None,0)
        self._setRequest(request)
        self.onComplete = None

    def _finished(self):
        """Decode the response, then notify onComplete; callback errors are logged."""
        super(IncomingResponse,self)._finished()
        if not self.onComplete:
            return
        try:
            self.onComplete(self)
        except Exception:
            log.error("Exception dispatching response: %s", traceback.format_exc())
|
jens@13
|
615 |
|
jens@13
|
616 |
|
jens@11
|
617 |
class OutgoingResponse (OutgoingMessage, Response):
    """Our reply to an IncomingRequest."""

    def __init__(self, request):
        OutgoingMessage.__init__(self,request.connection)
        self._setRequest(request)

    def send(self):
        """Encode the response and queue it on the connection; returns _sendMessage()'s result."""
        self._encode()
        queued = self.connection._sendMessage(self)
        return queued
|
jens@11
|
625 |
|
jens@11
|
626 |
|
jens@13
|
627 |
"""
|
jens@13
|
628 |
Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
|
jens@13
|
629 |
|
jens@13
|
630 |
Redistribution and use in source and binary forms, with or without modification, are permitted
|
jens@13
|
631 |
provided that the following conditions are met:
|
jens@13
|
632 |
|
jens@13
|
633 |
* Redistributions of source code must retain the above copyright notice, this list of conditions
|
jens@13
|
634 |
and the following disclaimer.
|
jens@13
|
635 |
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions
|
jens@13
|
636 |
and the following disclaimer in the documentation and/or other materials provided with the
|
jens@13
|
637 |
distribution.
|
jens@13
|
638 |
|
jens@13
|
639 |
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
|
jens@13
|
640 |
IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
|
jens@13
|
641 |
FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
|
jens@13
|
642 |
BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
jens@13
|
643 |
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
|
jens@13
|
644 |
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
jens@13
|
645 |
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
jens@13
|
646 |
THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
jens@13
|
647 |
"""
|