BLIP.py working for listener side (it talks to the Obj-C BLIPConnectionTester.)
6 Created by Jens Alfke on 2008-06-03.
7 Copyright (c) 2008 Jens Alfke. All rights reserved.
12 from cStringIO import StringIO
22 # INTERNAL CONSTANTS -- NO TOUCHIES!
24 kFrameMagicNumber = 0x9B34F205
25 kFrameHeaderFormat = '!LLHH'
28 kMsgFlag_TypeMask = 0x000F
29 kMsgFlag_Compressed = 0x0010
30 kMsgFlag_Urgent = 0x0020
31 kMsgFlag_NoReply = 0x0040
32 kMsgFlag_MoreComing = 0x0080
39 log = logging.getLogger('BLIP')
43 class MessageException(Exception):
46 class ConnectionException(Exception):
50 class Listener (asyncore.dispatcher):
51 "BLIP listener/server class"
53 def __init__(self, port):
54 "Create a listener on a port"
55 asyncore.dispatcher.__init__(self)
56 self.onConnected = self.onRequest = None
57 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
58 self.bind( ('',port) )
60 log.info("Listening on port %u", port)
62 def handle_accept( self ):
63 client,address = self.accept()
64 conn = Connection(address,client)
65 conn.onRequest = self.onRequest
67 self.onConnected(conn)
70 class Connection (asynchat.async_chat):
71 def __init__( self, address, conn=None ):
72 "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
73 "otherwise it'll open a new outgoing socket."
74 asynchat.async_chat.__init__(self,conn)
75 self.address = address
77 log.info("Accepted connection from %s",address)
79 log.info("Opening connection to %s",address)
80 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
83 self.pendingRequests = {}
84 self.pendingResponses = {}
87 self.inNumRequests = 0
90 #def handle_error(self,x):
91 # log.error("Uncaught exception: %s",x)
94 def _fatal(self, error):
95 log.error("Fatal BLIP connection error: %s",error)
101 def _outQueueMessage(self, msg,isNew=True):
104 if msg.urgent and n>1:
106 otherMsg = self.outBox[index-1]
111 elif isNew and otherMsg._bytesWritten==0:
117 self.outBox.insert(index,msg)
119 log.info("Queuing outgoing message at index %i",index)
121 self._sendNextFrame()
123 log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
125 def _sendNextFrame(self):
126 while self.outBox: #FIX: Don't send everything at once; only as space becomes available!
129 msg = self.outBox.pop(0)
131 if msg.urgent or n==1 or not self.outBox[0].urgent:
133 if msg._sendNextFrame(self,frameSize):
134 self._outQueueMessage(msg,isNew=False)
136 log.info("Finished sending %s",msg)
141 def collect_incoming_data(self, data):
142 if self.expectingHeader:
143 if self.inHeader==None:
146 self.inHeader += data
148 self.inMessage._receivedData(data)
150 def found_terminator(self):
151 if self.expectingHeader:
153 (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
155 if magic!=kFrameMagicNumber: self._fatal("Incorrect frame magic number %x" %magic)
156 if frameLen < kFrameHeaderSize: self._fatal("Invalid frame length %u" %frameLen)
157 frameLen -= kFrameHeaderSize
158 log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
159 (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
160 self.inMessage = self._inMessageForFrame(requestNo,flags)
163 self.expectingHeader = False
164 self.set_terminator(frameLen)
169 # Got the frame's payload:
172 def _inMessageForFrame(self, requestNo,flags):
174 msgType = flags & kMsgFlag_TypeMask
175 if msgType==kMsgType_Request:
176 message = self.pendingRequests.get(requestNo)
177 if message==None and requestNo == self.inNumRequests+1:
178 message = IncomingRequest(self,requestNo,flags)
180 self.pendingRequests[requestNo] = message
181 self.inNumRequests += 1
182 elif msgType==kMsgType_Response or msgType==kMsgType_Error:
183 message = self.pendingResponses.get(requestNo)
186 message._beginFrame(flags)
188 log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
191 def _endOfFrame(self):
193 self.inMessage = None
194 self.expectingHeader = True
196 self.set_terminator(kFrameHeaderSize) # wait for binary header
198 log.debug("End of frame of %s",msg)
199 if not msg.moreComing:
200 self._receivedMessage(msg)
202 def _receivedMessage(self, msg):
203 log.info("Received: %s",msg)
204 # Remove from pending:
206 del self.pendingReplies[msg.requestNo]
208 del self.pendingRequests[msg.requestNo]
212 if not msg.isResponse:
215 log.error("Exception handling incoming message: %s", traceback.format_exc())
216 #FIX: Send an error reply
222 class Message (object):
223 "Abstract superclass of all request/response objects"
225 def __init__(self, connection, properties=None, body=None):
226 self.connection = connection
227 self.properties = properties or {}
233 flags = kMsgType_Response
235 flags = kMsgType_Request
236 if self.urgent: flags |= kMsgFlag_Urgent
237 if self.compressed: flags |= kMsgFlag_Compressed
238 if self.noReply: flags |= kMsgFlag_NoReply
239 if self.moreComing: flags |= kMsgFlag_MoreComing
243 s = "%s[#%i" %(type(self).__name__,self.requestNo)
244 if self.urgent: s += " URG"
245 if self.compressed: s += " CMP"
246 if self.noReply: s += " NOR"
247 if self.moreComing: s += " MOR"
248 if self.body: s += " %i bytes" %len(self.body)
253 if len(self.properties): s += repr(self.properties)
257 def isResponse(self):
258 "Is this message a response?"
262 def contentType(self):
263 return self.properties.get('Content-Type')
265 def __getitem__(self, key): return self.properties.get(key)
266 def __contains__(self, key): return key in self.properties
267 def __len__(self): return len(self.properties)
268 def __nonzero__(self): return True
269 def __iter__(self): return self.properties.__iter__()
272 class IncomingMessage (Message):
273 "Abstract superclass of incoming messages."
275 def __init__(self, connection, requestNo, flags):
276 super(IncomingMessage,self).__init__(connection)
277 self.requestNo = requestNo
278 self.urgent = (flags & kMsgFlag_Urgent) != 0
279 self.compressed = (flags & kMsgFlag_Compressed) != 0
280 self.noReply = (flags & kMsgFlag_NoReply) != 0
281 self.moreComing = (flags & kMsgFlag_MoreComing) != 0
284 def _beginFrame(self, flags):
285 if (flags & kMsgFlag_MoreComing)==0:
286 self.moreComing = False
288 def _receivedData(self, data):
289 self.frames.append(data)
292 encoded = "".join(self.frames)
295 # Decode the properties:
296 if len(encoded) < 2: raise MessageException, "missing properties length"
297 propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
298 if propSize>len(encoded): raise MessageException, "properties too long to fit"
299 if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
301 proplist = encoded[2:propSize-1].split('\000')
302 encoded = encoded[propSize:]
303 if len(proplist) & 1: raise MessageException, "odd number of property strings"
304 for i in xrange(0,len(proplist),2):
307 str = IncomingMessage.__expandDict.get(str,str)
309 self.properties[ expand(proplist[i])] = expand(proplist[i+1])
312 if self.compressed and len(encoded)>0:
314 encoded = zlib.decompress(encoded,31) # window size of 31 needed for gzip format
316 raise MessageException, sys.exc_info()[1]
319 __expandDict= {'\x01' : "Content-Type",
321 '\x03' : "application/octet-stream",
322 '\x04' : "text/plain; charset=UTF-8",
324 '\x06' : "text/yaml",
326 '\x08' : "Error-Code",
327 '\x09' : "Error-Domain"}
331 class OutgoingMessage (Message):
332 "Abstract superclass of outgoing requests/responses."
334 def __init__(self, connection, properties=None, body=None):
335 Message.__init__(self,connection,properties,body)
336 self.urgent = self.compressed = self.noReply = False
337 self.moreComing = True
339 def __setitem__(self, key,val):
340 self.properties[key] = val
341 def __delitem__(self, key):
342 del self.properties[key]
345 "Sends this message."
346 log.info("Sending %s",self)
348 for (key,value) in self.properties.iteritems():
349 def _writePropString(str):
350 out.write(str) #FIX: Abbreviate
352 _writePropString(key)
353 _writePropString(value)
354 self.encoded = struct.pack('!H',out.tell()) + out.getvalue()
359 body = zlib.compress(body,5)
361 log.debug("Encoded %s into %u bytes", self,len(self.encoded))
364 self.connection._outQueueMessage(self)
366 def _sendNextFrame(self, conn,maxLen):
368 payload = self.encoded[pos:pos+maxLen]
370 self.moreComing = (pos < len(self.encoded))
371 log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
373 conn.push( struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
376 kFrameHeaderSize+len(payload)) )
380 return self.moreComing
383 class Request (object):
386 "The response object for this request."
387 r = self.__dict__.get('_response')
389 r = self._response = self._createResponse()
393 class Response (Message):
394 def __init__(self, request):
395 assert not request.noReply
396 self.request = request
397 self.requestNo = request.requestNo
398 self.urgent = request.urgent
401 def isResponse(self):
406 class IncomingRequest (IncomingMessage, Request):
407 def _createResponse(self):
408 return OutgoingResponse(self)
410 class OutgoingRequest (OutgoingMessage, Request):
411 def _createResponse(self):
412 return IncomingResponse(self)
414 class IncomingResponse (IncomingMessage, Response):
415 def __init__(self, request):
416 IncomingMessage.__init__(self,request.connection,request.requestNo,0)
417 Response.__init__(self,request)
418 self.onComplete = None
421 super(IncomingResponse,self)._finished()
424 self.onComplete(self)
426 log.error("Exception dispatching response: %s", traceback.format_exc())
428 class OutgoingResponse (OutgoingMessage, Response):
429 def __init__(self, request):
430 OutgoingMessage.__init__(self,request.connection)
431 Response.__init__(self,request)
437 class BLIPTests(unittest.TestCase):
439 def handleRequest(request):
440 logging.info("Got request!: %r",request)
442 assert len(body)<32768
443 assert request.contentType == 'application/octet-stream'
444 assert int(request['Size']) == len(body)
445 assert request['User-Agent'] == 'BLIPConnectionTester'
446 for i in xrange(0,len(request.body)):
447 assert ord(body[i]) == i%256
449 response = request.response
450 response.body = request.body
451 response['Content-Type'] = request.contentType
454 listener = Listener(46353)
455 listener.onRequest = handleRequest
457 def testListener(self):
458 logging.info("Waiting...")
461 if __name__ == '__main__':
462 logging.basicConfig(level=logging.INFO)