# HG changeset patch # User Jens Alfke # Date 1212624680 25200 # Node ID 84c2d38f924cbc44fbc31bc724c56d548a913071 # Parent 7101139617563f26052c9c5f6d998a86e72e4403 Python implementation much improved. Can send requests now. Fully interoperable with Obj-C implementation's test cases. diff -r 710113961756 -r 84c2d38f924c BLIP/BLIPTest.m --- a/BLIP/BLIPTest.m Tue Jun 03 22:24:21 2008 -0700 +++ b/BLIP/BLIPTest.m Wed Jun 04 17:11:20 2008 -0700 @@ -308,7 +308,7 @@ AssertEq(bytes[i],i % 256); AssertEqual([request valueOfProperty: @"Content-Type"], @"application/octet-stream"); - AssertEqual([request valueOfProperty: @"User-Agent"], @"BLIPConnectionTester"); + Assert([request valueOfProperty: @"User-Agent"] != nil); AssertEq([[request valueOfProperty: @"Size"] intValue], size); [request respondWithData: body contentType: request.contentType]; diff -r 710113961756 -r 84c2d38f924c Python/BLIP.py --- a/Python/BLIP.py Tue Jun 03 22:24:21 2008 -0700 +++ b/Python/BLIP.py Wed Jun 04 17:11:20 2008 -0700 @@ -1,10 +1,9 @@ -#!/usr/bin/env python # encoding: utf-8 """ BLIP.py Created by Jens Alfke on 2008-06-03. -Copyright (c) 2008 Jens Alfke. All rights reserved. +Copyright notice and BSD license at end of file. """ import asynchat @@ -15,10 +14,17 @@ import struct import sys import traceback -import unittest import zlib +# Connection status enumeration: +kDisconnected = -1 +kClosed = 0 +kOpening = 1 +kOpen = 2 +kClosing = 3 + + # INTERNAL CONSTANTS -- NO TOUCHIES! kFrameMagicNumber = 0x9B34F205 @@ -47,57 +53,118 @@ pass +### LISTENER AND CONNECTION CLASSES: + + class Listener (asyncore.dispatcher): "BLIP listener/server class" - def __init__(self, port): + def __init__(self, port, sslKeyFile=None, sslCertFile=None): "Create a listener on a port" asyncore.dispatcher.__init__(self) self.onConnected = self.onRequest = None self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.bind( ('',port) ) self.listen(5) + self.sslKeyFile=sslKeyFile + self.sslCertFile=sslCertFile log.info("Listening on port %u", port) def handle_accept( self ): - client,address = self.accept() - conn = Connection(address,client) + socket,address = self.accept() + if self.sslKeyFile: + socket.ssl(socket,self.sslKeyFile,self.sslCertFile) + conn = Connection(address, sock=socket, listener=self) conn.onRequest = self.onRequest if self.onConnected: self.onConnected(conn) + def handle_error(self): + (typ,val,trace) = sys.exc_info() + log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc()) + self.close() + + class Connection (asynchat.async_chat): - def __init__( self, address, conn=None ): + def __init__( self, address, sock=None, listener=None, ssl=None ): "Opens a connection with the given address. If a connection/socket object is provided it'll use that," "otherwise it'll open a new outgoing socket." - asynchat.async_chat.__init__(self,conn) - self.address = address - if conn: + if sock: + asynchat.async_chat.__init__(self,sock) log.info("Accepted connection from %s",address) + self.status = kOpen else: + asynchat.async_chat.__init__(self) log.info("Opening connection to %s",address) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.status = kOpening + if ssl: + ssl(self.socket) self.connect(address) + self.address = address + self.listener = listener self.onRequest = None self.pendingRequests = {} self.pendingResponses = {} self.outBox = [] self.inMessage = None - self.inNumRequests = 0 + self.inNumRequests = self.outNumRequests = 0 self._endOfFrame() - #def handle_error(self,x): - # log.error("Uncaught exception: %s",x) - # self.close() + def close(self): + if self.status > kClosed: + self.status = kClosing + log.info("Connection closing...") + asynchat.async_chat.close(self) - def _fatal(self, error): - log.error("Fatal BLIP connection error: %s",error) + def handle_connect(self): + log.info("Connection open!") + self.status = kOpen + + def handle_error(self): + (typ,val,trace) = sys.exc_info() + log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc()) + self.discard_buffers() + self.status = kDisconnected self.close() + def handle_close(self): + log.info("Connection closed!") + self.pendingRequests = self.pendingResponses = None + self.outBox = None + if self.status == kClosing: + self.status = kClosed + else: + self.status = kDisconnected + asynchat.async_chat.handle_close(self) + ### SENDING: + @property + def canSend(self): + return self.status==kOpening or self.status==kOpen + + def _sendMessage(self, msg): + if self.canSend: + self._outQueueMessage(msg,True) + return True + else: + return False + + def _sendRequest(self, req): + if self.canSend: + requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1 + response = req.response + if response: + response.requestNo = requestNo + self.pendingResponses[requestNo] = response + log.debug("pendingResponses[%i] := %s",requestNo,response) + return self._sendMessage(req) + else: + return False + def _outQueueMessage(self, msg,isNew=True): n = len(self.outBox) index = n @@ -116,7 +183,7 @@ self.outBox.insert(index,msg) if isNew: - log.info("Queuing outgoing message at index %i",index) + log.info("Queuing %s at index %i",msg,index) if n==0: self._sendNextFrame() else: @@ -144,7 +211,7 @@ self.inHeader = data else: self.inHeader += data - else: + elif self.inMessage: self.inMessage._receivedData(data) def found_terminator(self): @@ -152,8 +219,8 @@ # Got a header: (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader) self.inHeader = None - if magic!=kFrameMagicNumber: self._fatal("Incorrect frame magic number %x" %magic) - if frameLen < kFrameHeaderSize: self._fatal("Invalid frame length %u" %frameLen) + if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic + if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen frameLen -= kFrameHeaderSize log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i", (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen) @@ -196,14 +263,14 @@ self.set_terminator(kFrameHeaderSize) # wait for binary header if msg: log.debug("End of frame of %s",msg) - if not msg.moreComing: + if not msg._moreComing: self._receivedMessage(msg) def _receivedMessage(self, msg): log.info("Received: %s",msg) # Remove from pending: if msg.isResponse: - del self.pendingReplies[msg.requestNo] + del self.pendingResponses[msg.requestNo] else: del self.pendingRequests[msg.requestNo] # Decode: @@ -216,16 +283,17 @@ #FIX: Send an error reply -### MESSAGES: +### MESSAGE CLASSES: class Message (object): "Abstract superclass of all request/response objects" - def __init__(self, connection, properties=None, body=None): + def __init__(self, connection, body=None, properties=None): self.connection = connection + self.body = body self.properties = properties or {} - self.body = body + self.requestNo = None @property def flags(self): @@ -236,15 +304,17 @@ if self.urgent: flags |= kMsgFlag_Urgent if self.compressed: flags |= kMsgFlag_Compressed if self.noReply: flags |= kMsgFlag_NoReply - if self.moreComing: flags |= kMsgFlag_MoreComing + if self._moreComing:flags |= kMsgFlag_MoreComing return flags def __str__(self): - s = "%s[#%i" %(type(self).__name__,self.requestNo) + s = "%s[" %(type(self).__name__) + if self.requestNo != None: + s += "#%i" %self.requestNo if self.urgent: s += " URG" if self.compressed: s += " CMP" if self.noReply: s += " NOR" - if self.moreComing: s += " MOR" + if self._moreComing:s += " MOR" if self.body: s += " %i bytes" %len(self.body) return s+"]" @@ -253,12 +323,12 @@ if len(self.properties): s += repr(self.properties) return s - @property + @property def isResponse(self): "Is this message a response?" return False - @property + @property def contentType(self): return self.properties.get('Content-Type') @@ -278,17 +348,19 @@ self.urgent = (flags & kMsgFlag_Urgent) != 0 self.compressed = (flags & kMsgFlag_Compressed) != 0 self.noReply = (flags & kMsgFlag_NoReply) != 0 - self.moreComing = (flags & kMsgFlag_MoreComing) != 0 + self._moreComing= (flags & kMsgFlag_MoreComing) != 0 self.frames = [] def _beginFrame(self, flags): - if (flags & kMsgFlag_MoreComing)==0: - self.moreComing = False + """Received a frame header.""" + self._moreComing = (flags & kMsgFlag_MoreComing)!=0 def _receivedData(self, data): + """Received data from a frame.""" self.frames.append(data) def _finished(self): + """The entire message has been received; now decode it.""" encoded = "".join(self.frames) self.frames = None @@ -327,47 +399,54 @@ '\x09' : "Error-Domain"} - class OutgoingMessage (Message): "Abstract superclass of outgoing requests/responses." - def __init__(self, connection, properties=None, body=None): - Message.__init__(self,connection,properties,body) + def __init__(self, connection, body=None, properties=None): + Message.__init__(self,connection,body,properties) self.urgent = self.compressed = self.noReply = False - self.moreComing = True + self._moreComing = True def __setitem__(self, key,val): self.properties[key] = val def __delitem__(self, key): del self.properties[key] - def send(self): - "Sends this message." - log.info("Sending %s",self) + @property + def sent(self): + return 'encoded' in self.__dict__ + + def _encode(self): + "Generates the message's encoded form, prior to sending it." out = StringIO() for (key,value) in self.properties.iteritems(): - def _writePropString(str): - out.write(str) #FIX: Abbreviate + def _writePropString(s): + out.write(str(s)) #FIX: Abbreviate out.write('\000') _writePropString(key) _writePropString(value) - self.encoded = struct.pack('!H',out.tell()) + out.getvalue() - out.close() + propertiesSize = out.tell() + assert propertiesSize<65536 #FIX: Return an error instead body = self.body if self.compressed: - body = zlib.compress(body,5) - self.encoded += body + z = zlib.compressobj(6,zlib.DEFLATED,31) # window size of 31 needed for gzip format + out.write(z.compress(body)) + body = z.flush() + out.write(body) + + self.encoded = struct.pack('!H',propertiesSize) + out.getvalue() + out.close() log.debug("Encoded %s into %u bytes", self,len(self.encoded)) - self.bytesSent = 0 - self.connection._outQueueMessage(self) def _sendNextFrame(self, conn,maxLen): pos = self.bytesSent payload = self.encoded[pos:pos+maxLen] pos += len(payload) - self.moreComing = (pos < len(self.encoded)) + self._moreComing = (pos < len(self.encoded)) + if not self._moreComing: + self.encoded = None log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos) conn.push( struct.pack(kFrameHeaderFormat, kFrameMagicNumber, @@ -377,13 +456,15 @@ conn.push( payload ) self.bytesSent = pos - return self.moreComing + return self._moreComing class Request (object): @property def response(self): "The response object for this request." + if self.noReply: + return None r = self.__dict__.get('_response') if r==None: r = self._response = self._createResponse() @@ -391,7 +472,7 @@ class Response (Message): - def __init__(self, request): + def _setRequest(self, request): assert not request.noReply self.request = request self.requestNo = request.requestNo @@ -402,19 +483,24 @@ return True - class IncomingRequest (IncomingMessage, Request): def _createResponse(self): return OutgoingResponse(self) + class OutgoingRequest (OutgoingMessage, Request): def _createResponse(self): return IncomingResponse(self) + + def send(self): + self._encode() + return self.connection._sendRequest(self) and self.response + class IncomingResponse (IncomingMessage, Response): def __init__(self, request): - IncomingMessage.__init__(self,request.connection,request.requestNo,0) - Response.__init__(self,request) + IncomingMessage.__init__(self,request.connection,None,0) + self._setRequest(request) self.onComplete = None def _finished(self): @@ -424,40 +510,36 @@ self.onComplete(self) except Exception, x: log.error("Exception dispatching response: %s", traceback.format_exc()) - + + class OutgoingResponse (OutgoingMessage, Response): def __init__(self, request): OutgoingMessage.__init__(self,request.connection) - Response.__init__(self,request) + self._setRequest(request) + + def send(self): + self._encode() + return self.connection._sendMessage(self) -### UNIT TESTS: - - -class BLIPTests(unittest.TestCase): - def setUp(self): - def handleRequest(request): - logging.info("Got request!: %r",request) - body = request.body - assert len(body)<32768 - assert request.contentType == 'application/octet-stream' - assert int(request['Size']) == len(body) - assert request['User-Agent'] == 'BLIPConnectionTester' - for i in xrange(0,len(request.body)): - assert ord(body[i]) == i%256 - - response = request.response - response.body = request.body - response['Content-Type'] = request.contentType - response.send() - - listener = Listener(46353) - listener.onRequest = handleRequest - - def testListener(self): - logging.info("Waiting...") - asyncore.loop() - -if __name__ == '__main__': - logging.basicConfig(level=logging.INFO) - unittest.main() \ No newline at end of file +""" + Copyright (c) 2008, Jens Alfke . All rights reserved. + + Redistribution and use in source and binary forms, with or without modification, are permitted + provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this list of conditions + and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright notice, this list of conditions + and the following disclaimer in the documentation and/or other materials provided with the + distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR + IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI- + BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" diff -r 710113961756 -r 84c2d38f924c Python/BLIPConnectionTest.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Python/BLIPConnectionTest.py Wed Jun 04 17:11:20 2008 -0700 @@ -0,0 +1,72 @@ +#!/usr/bin/env python +# encoding: utf-8 +""" +BLIPConnectionTest.py + +Created by Jens Alfke on 2008-06-04. +This source file is test/example code, and is in the public domain. +""" + +from BLIP import Connection, OutgoingRequest, kOpening + +import asyncore +from cStringIO import StringIO +from datetime import datetime +import logging +import random +import unittest + + +kSendInterval = 2.0 + +def randbool(): + return random.randint(0,1) == 1 + + +class BLIPConnectionTest(unittest.TestCase): + + def setUp(self): + self.connection = Connection( ('localhost',46353) ) + + def sendRequest(self): + size = random.randint(0,32767) + io = StringIO() + for i in xrange(0,size): + io.write( chr(i % 256) ) + body = io.getvalue() + io.close + + req = OutgoingRequest(self.connection, body,{'Content-Type': 'application/octet-stream', + 'User-Agent': 'PyBLIP', + 'Date': datetime.now(), + 'Size': size}) + req.compressed = randbool() + req.urgent = randbool() + req.response.onComplete = self.gotResponse + return req.send() + + def gotResponse(self, response): + logging.info("Got response!: %s",response) + request = response.request + assert response.body == request.body + + def testClient(self): + lastReqTime = None + nRequests = 0 + while nRequests < 10: + asyncore.loop(timeout=kSendInterval,count=1) + + now = datetime.now() + if self.connection.status!=kOpening and not lastReqTime or (now-lastReqTime).seconds >= kSendInterval: + lastReqTime = now + if not self.sendRequest(): + logging.warn("Couldn't send request (connection is probably closed)") + break; + nRequests += 1 + + def tearDown(self): + self.connection.close() + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + unittest.main() diff -r 710113961756 -r 84c2d38f924c Python/BLIPListenerTest.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Python/BLIPListenerTest.py Wed Jun 04 17:11:20 2008 -0700 @@ -0,0 +1,46 @@ +#!/usr/bin/env python +# encoding: utf-8 +""" +BLIPListenerTest.py + +Created by Jens Alfke on 2008-06-04. +This source file is test/example code, and is in the public domain. +""" + +from BLIP import Listener + +import asyncore +import logging +import unittest + + +class BLIPListenerTest(unittest.TestCase): + + def testListener(self): + def handleRequest(request): + logging.info("Got request!: %r",request) + body = request.body + assert len(body)<32768 + assert request.contentType == 'application/octet-stream' + assert int(request['Size']) == len(body) + assert request['User-Agent'] != None + for i in xrange(0,len(request.body)): + assert ord(body[i]) == i%256 + + response = request.response + response.body = request.body + response['Content-Type'] = request.contentType + response.send() + + listener = Listener(46353) + listener.onRequest = handleRequest + logging.info("Listener is waiting...") + + try: + asyncore.loop() + except KeyboardInterrupt: + logging.info("KeyboardInterrupt") + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + unittest.main()