# HG changeset patch # User Jens Alfke # Date 1212557061 25200 # Node ID 7101139617563f26052c9c5f6d998a86e72e4403 # Parent 29e8b03c05d45a31037f825fd29c5b3f2379cc0f BLIP.py working for listener side (it talks to the Obj-C BLIPConnectionTester.) diff -r 29e8b03c05d4 -r 710113961756 Python/BLIP.py --- a/Python/BLIP.py Tue Jun 03 16:56:33 2008 -0700 +++ b/Python/BLIP.py Tue Jun 03 22:24:21 2008 -0700 @@ -19,6 +19,8 @@ import zlib +# INTERNAL CONSTANTS -- NO TOUCHIES! + kFrameMagicNumber = 0x9B34F205 kFrameHeaderFormat = '!LLHH' kFrameHeaderSize = 12 @@ -37,6 +39,7 @@ log = logging.getLogger('BLIP') log.propagate = True + class MessageException(Exception): pass @@ -45,10 +48,12 @@ class Listener (asyncore.dispatcher): + "BLIP listener/server class" + def __init__(self, port): + "Create a listener on a port" asyncore.dispatcher.__init__(self) - self.onConnected = None - self.onRequest = None + self.onConnected = self.onRequest = None self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.bind( ('',port) ) self.listen(5) @@ -64,6 +69,8 @@ class Connection (asynchat.async_chat): def __init__( self, address, conn=None ): + "Opens a connection with the given address. If a connection/socket object is provided it'll use that," + "otherwise it'll open a new outgoing socket." asynchat.async_chat.__init__(self,conn) self.address = address if conn: @@ -92,11 +99,11 @@ ### SENDING: def _outQueueMessage(self, msg,isNew=True): - n = self.outBox.length + n = len(self.outBox) index = n if msg.urgent and n>1: while index > 0: - otherMsg = self.outBox[index] + otherMsg = self.outBox[index-1] if otherMsg.urgent: if index 0: - msg = self.outBox.pop(0) - frameSize = 4096 - if msg.urgent or n==1 or not self.outBox[0].urgent: - frameSize *= 4 - if msg._sendNextFrame(self): - self._outQueueMessage(msg,isNew=False) + while self.outBox: #FIX: Don't send everything at once; only as space becomes available! + n = len(self.outBox) + if n > 0: + msg = self.outBox.pop(0) + frameSize = 4096 + if msg.urgent or n==1 or not self.outBox[0].urgent: + frameSize *= 4 + if msg._sendNextFrame(self,frameSize): + self._outQueueMessage(msg,isNew=False) + else: + log.info("Finished sending %s",msg) ### RECEIVING: @@ -132,7 +146,7 @@ self.inHeader += data else: self.inMessage._receivedData(data) - + def found_terminator(self): if self.expectingHeader: # Got a header: @@ -150,7 +164,7 @@ self.set_terminator(frameLen) else: self._endOfFrame() - + else: # Got the frame's payload: self._endOfFrame() @@ -162,12 +176,13 @@ message = self.pendingRequests.get(requestNo) if message==None and requestNo == self.inNumRequests+1: message = IncomingRequest(self,requestNo,flags) + assert message!=None self.pendingRequests[requestNo] = message self.inNumRequests += 1 elif msgType==kMsgType_Response or msgType==kMsgType_Error: message = self.pendingResponses.get(requestNo) - - if message: + + if message != None: message._beginFrame(flags) else: log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo) @@ -183,7 +198,7 @@ log.debug("End of frame of %s",msg) if not msg.moreComing: self._receivedMessage(msg) - + def _receivedMessage(self, msg): log.info("Received: %s",msg) # Remove from pending: @@ -194,20 +209,19 @@ # Decode: try: msg._finished() + if not msg.isResponse: + self.onRequest(msg) except Exception, x: - log.error("Exception parsing message: %s", traceback.format_exc()) - return - # Dispatch: - try: - self.onRequest(msg) - except Exception, x: - log.error("Exception dispatching message: %s", traceback.format_exc()) + log.error("Exception handling incoming message: %s", traceback.format_exc()) #FIX: Send an error reply + ### MESSAGES: class Message (object): + "Abstract superclass of all request/response objects" + def __init__(self, connection, properties=None, body=None): self.connection = connection self.properties = properties or {} @@ -215,7 +229,10 @@ @property def flags(self): - flags = kMsgType_Request + if self.isResponse: + flags = kMsgType_Response + else: + flags = kMsgType_Request if self.urgent: flags |= kMsgFlag_Urgent if self.compressed: flags |= kMsgFlag_Compressed if self.noReply: flags |= kMsgFlag_NoReply @@ -235,17 +252,30 @@ s = str(self) if len(self.properties): s += repr(self.properties) return s - - @property + + @property def isResponse(self): + "Is this message a response?" return False + + @property + def contentType(self): + return self.properties.get('Content-Type') + + def __getitem__(self, key): return self.properties.get(key) + def __contains__(self, key): return key in self.properties + def __len__(self): return len(self.properties) + def __nonzero__(self): return True + def __iter__(self): return self.properties.__iter__() class IncomingMessage (Message): + "Abstract superclass of incoming messages." + def __init__(self, connection, requestNo, flags): super(IncomingMessage,self).__init__(connection) self.requestNo = requestNo - self.urgent = (flags & kMsgFlag_Urgent) != 0 + self.urgent = (flags & kMsgFlag_Urgent) != 0 self.compressed = (flags & kMsgFlag_Compressed) != 0 self.noReply = (flags & kMsgFlag_NoReply) != 0 self.moreComing = (flags & kMsgFlag_MoreComing) != 0 @@ -254,7 +284,7 @@ def _beginFrame(self, flags): if (flags & kMsgFlag_MoreComing)==0: self.moreComing = False - + def _receivedData(self, data): self.frames.append(data) @@ -295,30 +325,40 @@ '\x07' : "Channel", '\x08' : "Error-Code", '\x09' : "Error-Domain"} - + class OutgoingMessage (Message): + "Abstract superclass of outgoing requests/responses." + + def __init__(self, connection, properties=None, body=None): + Message.__init__(self,connection,properties,body) + self.urgent = self.compressed = self.noReply = False + self.moreComing = True + + def __setitem__(self, key,val): + self.properties[key] = val + def __delitem__(self, key): + del self.properties[key] def send(self): + "Sends this message." + log.info("Sending %s",self) out = StringIO() - out.write("xx") # placeholder for properties length (16 bits) - for (key,value) in self.properties: - def _writePropString(self, str): - out.write(str) - #FIX: Abbreviate + for (key,value) in self.properties.iteritems(): + def _writePropString(str): + out.write(str) #FIX: Abbreviate out.write('\000') - self._writePropString(key) - self._writePropString(value) - propsLen = out.tell() - self.encoded = out.stringvalue() + _writePropString(key) + _writePropString(value) + self.encoded = struct.pack('!H',out.tell()) + out.getvalue() out.close() - self.encoded[0:2] = struct.pack('!H',propsLen) body = self.body if self.compressed: body = zlib.compress(body,5) self.encoded += body + log.debug("Encoded %s into %u bytes", self,len(self.encoded)) self.bytesSent = 0 self.connection._outQueueMessage(self) @@ -327,46 +367,68 @@ pos = self.bytesSent payload = self.encoded[pos:pos+maxLen] pos += len(payload) - if pos >= len(self.encoded): - self.moreComing = False - - conn.push( struct.pack(kFrameHeaderFormat, - kFrameMagicNumber, - self.requestNo, - self.flags, - kFrameHeaderSize+len(payload)) ) + self.moreComing = (pos < len(self.encoded)) + log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos) + + conn.push( struct.pack(kFrameHeaderFormat, kFrameMagicNumber, + self.requestNo, + self.flags, + kFrameHeaderSize+len(payload)) ) conn.push( payload ) self.bytesSent = pos + return self.moreComing -class Request (Message): - pass +class Request (object): + @property + def response(self): + "The response object for this request." + r = self.__dict__.get('_response') + if r==None: + r = self._response = self._createResponse() + return r + class Response (Message): + def __init__(self, request): + assert not request.noReply + self.request = request + self.requestNo = request.requestNo + self.urgent = request.urgent + @property def isResponse(self): return True - @property - def flags(self): - flags = super(Response,self).flags() ^ kMsgType_Request - flags ^= kMsgType_Response - return flags - class IncomingRequest (IncomingMessage, Request): - pass + def _createResponse(self): + return OutgoingResponse(self) class OutgoingRequest (OutgoingMessage, Request): - pass + def _createResponse(self): + return IncomingResponse(self) class IncomingResponse (IncomingMessage, Response): - pass - + def __init__(self, request): + IncomingMessage.__init__(self,request.connection,request.requestNo,0) + Response.__init__(self,request) + self.onComplete = None + + def _finished(self): + super(IncomingResponse,self)._finished() + if self.onComplete: + try: + self.onComplete(self) + except Exception, x: + log.error("Exception dispatching response: %s", traceback.format_exc()) + class OutgoingResponse (OutgoingMessage, Response): - pass + def __init__(self, request): + OutgoingMessage.__init__(self,request.connection) + Response.__init__(self,request) ### UNIT TESTS: @@ -374,8 +436,23 @@ class BLIPTests(unittest.TestCase): def setUp(self): + def handleRequest(request): + logging.info("Got request!: %r",request) + body = request.body + assert len(body)<32768 + assert request.contentType == 'application/octet-stream' + assert int(request['Size']) == len(body) + assert request['User-Agent'] == 'BLIPConnectionTester' + for i in xrange(0,len(request.body)): + assert ord(body[i]) == i%256 + + response = request.response + response.body = request.body + response['Content-Type'] = request.contentType + response.send() + listener = Listener(46353) - listener.onRequest = lambda req: logging.info("Got request!: %r",req) + listener.onRequest = handleRequest def testListener(self): logging.info("Waiting...")