Python/BLIP.py
changeset 12 710113961756
parent 11 29e8b03c05d4
child 13 84c2d38f924c
     1.1 --- a/Python/BLIP.py	Tue Jun 03 16:56:33 2008 -0700
     1.2 +++ b/Python/BLIP.py	Tue Jun 03 22:24:21 2008 -0700
     1.3 @@ -19,6 +19,8 @@
     1.4  import zlib
     1.5  
     1.6  
     1.7 +# INTERNAL CONSTANTS -- NO TOUCHIES!
     1.8 +
     1.9  kFrameMagicNumber   = 0x9B34F205
    1.10  kFrameHeaderFormat  = '!LLHH'
    1.11  kFrameHeaderSize    = 12
    1.12 @@ -37,6 +39,7 @@
    1.13  log = logging.getLogger('BLIP')
    1.14  log.propagate = True
    1.15  
    1.16 +
    1.17  class MessageException(Exception):
    1.18      pass
    1.19  
    1.20 @@ -45,10 +48,12 @@
    1.21  
    1.22  
    1.23  class Listener (asyncore.dispatcher):
    1.24 +    "BLIP listener/server class"
    1.25 +    
    1.26      def __init__(self, port):
    1.27 +        "Create a listener on a port"
    1.28          asyncore.dispatcher.__init__(self)
    1.29 -        self.onConnected = None
    1.30 -        self.onRequest = None
    1.31 +        self.onConnected = self.onRequest = None
    1.32          self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    1.33          self.bind( ('',port) )
    1.34          self.listen(5)
    1.35 @@ -64,6 +69,8 @@
    1.36  
    1.37  class Connection (asynchat.async_chat):
    1.38      def __init__( self, address, conn=None ):
    1.39 +        "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
    1.40 +        "otherwise it'll open a new outgoing socket."
    1.41          asynchat.async_chat.__init__(self,conn)
    1.42          self.address = address
    1.43          if conn:
    1.44 @@ -92,11 +99,11 @@
    1.45      ### SENDING:
    1.46      
    1.47      def _outQueueMessage(self, msg,isNew=True):
    1.48 -        n = self.outBox.length
    1.49 +        n = len(self.outBox)
    1.50          index = n
    1.51          if msg.urgent and n>1:
    1.52              while index > 0:
    1.53 -                otherMsg = self.outBox[index]
    1.54 +                otherMsg = self.outBox[index-1]
    1.55                  if otherMsg.urgent:
    1.56                      if index<n:
    1.57                          index += 1
    1.58 @@ -106,20 +113,27 @@
    1.59                  index -= 1
    1.60              else:
    1.61                  index = 1
    1.62 -                    
    1.63 +        
    1.64          self.outBox.insert(index,msg)
    1.65          if isNew:
    1.66              log.info("Queuing outgoing message at index %i",index)
    1.67 +            if n==0:
    1.68 +                self._sendNextFrame()
    1.69 +        else:
    1.70 +            log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
    1.71      
    1.72      def _sendNextFrame(self):
    1.73 -        n = len(self.outBox)
    1.74 -        if n > 0:
    1.75 -            msg = self.outBox.pop(0)
    1.76 -            frameSize = 4096
    1.77 -            if msg.urgent or n==1 or not self.outBox[0].urgent:
    1.78 -                frameSize *= 4
    1.79 -            if msg._sendNextFrame(self):
    1.80 -                self._outQueueMessage(msg,isNew=False)
    1.81 +        while self.outBox:              #FIX: Don't send everything at once; only as space becomes available!
    1.82 +            n = len(self.outBox)
    1.83 +            if n > 0:
    1.84 +                msg = self.outBox.pop(0)
    1.85 +                frameSize = 4096
    1.86 +                if msg.urgent or n==1 or not self.outBox[0].urgent:
    1.87 +                    frameSize *= 4
    1.88 +                if msg._sendNextFrame(self,frameSize):
    1.89 +                    self._outQueueMessage(msg,isNew=False)
    1.90 +                else:
    1.91 +                    log.info("Finished sending %s",msg)
    1.92      
    1.93      
    1.94      ### RECEIVING:
    1.95 @@ -132,7 +146,7 @@
    1.96                  self.inHeader += data
    1.97          else:
    1.98              self.inMessage._receivedData(data)
    1.99 -        
   1.100 +    
   1.101      def found_terminator(self):
   1.102          if self.expectingHeader:
   1.103              # Got a header:
   1.104 @@ -150,7 +164,7 @@
   1.105                  self.set_terminator(frameLen)
   1.106              else:
   1.107                  self._endOfFrame()
   1.108 -                
   1.109 +        
   1.110          else:
   1.111              # Got the frame's payload:
   1.112              self._endOfFrame()
   1.113 @@ -162,12 +176,13 @@
   1.114              message = self.pendingRequests.get(requestNo)
   1.115              if message==None and requestNo == self.inNumRequests+1:
   1.116                  message = IncomingRequest(self,requestNo,flags)
   1.117 +                assert message!=None
   1.118                  self.pendingRequests[requestNo] = message
   1.119                  self.inNumRequests += 1
   1.120          elif msgType==kMsgType_Response or msgType==kMsgType_Error:
   1.121              message = self.pendingResponses.get(requestNo)
   1.122 -            
   1.123 -        if message:
   1.124 +        
   1.125 +        if message != None:
   1.126              message._beginFrame(flags)
   1.127          else:
   1.128              log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
   1.129 @@ -183,7 +198,7 @@
   1.130              log.debug("End of frame of %s",msg)
   1.131              if not msg.moreComing:
   1.132                  self._receivedMessage(msg)
   1.133 -
   1.134 +    
   1.135      def _receivedMessage(self, msg):
   1.136          log.info("Received: %s",msg)
   1.137          # Remove from pending:
   1.138 @@ -194,20 +209,19 @@
   1.139          # Decode:
   1.140          try:
   1.141              msg._finished()
   1.142 +            if not msg.isResponse:
   1.143 +                self.onRequest(msg)
   1.144          except Exception, x:
   1.145 -            log.error("Exception parsing message: %s", traceback.format_exc())
   1.146 -            return
   1.147 -        # Dispatch:
   1.148 -        try:
   1.149 -            self.onRequest(msg)
   1.150 -        except Exception, x:
   1.151 -            log.error("Exception dispatching message: %s", traceback.format_exc())
   1.152 +            log.error("Exception handling incoming message: %s", traceback.format_exc())
   1.153              #FIX: Send an error reply
   1.154  
   1.155 +
   1.156  ### MESSAGES:
   1.157  
   1.158  
   1.159  class Message (object):
   1.160 +    "Abstract superclass of all request/response objects"
   1.161 +    
   1.162      def __init__(self, connection, properties=None, body=None):
   1.163          self.connection = connection
   1.164          self.properties = properties or {}
   1.165 @@ -215,7 +229,10 @@
   1.166      
   1.167      @property
   1.168      def flags(self):
   1.169 -        flags = kMsgType_Request
   1.170 +        if self.isResponse:
   1.171 +            flags = kMsgType_Response
   1.172 +        else:
   1.173 +            flags = kMsgType_Request
   1.174          if self.urgent:     flags |= kMsgFlag_Urgent
   1.175          if self.compressed: flags |= kMsgFlag_Compressed
   1.176          if self.noReply:    flags |= kMsgFlag_NoReply
   1.177 @@ -235,17 +252,30 @@
   1.178          s = str(self)
   1.179          if len(self.properties): s += repr(self.properties)
   1.180          return s
   1.181 -        
   1.182 -    @property
   1.183 +    
   1.184 +    @property 
   1.185      def isResponse(self):
   1.186 +        "Is this message a response?"
   1.187          return False
   1.188 +    
   1.189 +    @property 
   1.190 +    def contentType(self):
   1.191 +        return self.properties.get('Content-Type')
   1.192 +    
   1.193 +    def __getitem__(self, key):     return self.properties.get(key)
   1.194 +    def __contains__(self, key):    return key in self.properties
   1.195 +    def __len__(self):              return len(self.properties)
   1.196 +    def __nonzero__(self):          return True
   1.197 +    def __iter__(self):             return self.properties.__iter__()
   1.198  
   1.199  
   1.200  class IncomingMessage (Message):
   1.201 +    "Abstract superclass of incoming messages."
   1.202 +    
   1.203      def __init__(self, connection, requestNo, flags):
   1.204          super(IncomingMessage,self).__init__(connection)
   1.205          self.requestNo  = requestNo
   1.206 -        self.urgent     = (flags & kMsgFlag_Urgent) != 0 
   1.207 +        self.urgent     = (flags & kMsgFlag_Urgent) != 0
   1.208          self.compressed = (flags & kMsgFlag_Compressed) != 0
   1.209          self.noReply    = (flags & kMsgFlag_NoReply) != 0
   1.210          self.moreComing = (flags & kMsgFlag_MoreComing) != 0
   1.211 @@ -254,7 +284,7 @@
   1.212      def _beginFrame(self, flags):
   1.213          if (flags & kMsgFlag_MoreComing)==0:
   1.214              self.moreComing = False
   1.215 -
   1.216 +    
   1.217      def _receivedData(self, data):
   1.218          self.frames.append(data)
   1.219      
   1.220 @@ -295,30 +325,40 @@
   1.221                     '\x07' : "Channel",
   1.222                     '\x08' : "Error-Code",
   1.223                     '\x09' : "Error-Domain"}
   1.224 -        
   1.225 +
   1.226  
   1.227  
   1.228  class OutgoingMessage (Message):
   1.229 +    "Abstract superclass of outgoing requests/responses."
   1.230 +    
   1.231 +    def __init__(self, connection, properties=None, body=None):
   1.232 +        Message.__init__(self,connection,properties,body)
   1.233 +        self.urgent = self.compressed = self.noReply = False
   1.234 +        self.moreComing = True
   1.235 +    
   1.236 +    def __setitem__(self, key,val):
   1.237 +        self.properties[key] = val
   1.238 +    def __delitem__(self, key):
   1.239 +        del self.properties[key]
   1.240      
   1.241      def send(self):
   1.242 +        "Sends this message."
   1.243 +        log.info("Sending %s",self)
   1.244          out = StringIO()
   1.245 -        out.write("xx")         # placeholder for properties length (16 bits)
   1.246 -        for (key,value) in self.properties:
   1.247 -            def _writePropString(self, str):
   1.248 -                out.write(str)
   1.249 -                #FIX: Abbreviate
   1.250 +        for (key,value) in self.properties.iteritems():
   1.251 +            def _writePropString(str):
   1.252 +                out.write(str)    #FIX: Abbreviate
   1.253                  out.write('\000')
   1.254 -            self._writePropString(key)
   1.255 -            self._writePropString(value)
   1.256 -        propsLen = out.tell()
   1.257 -        self.encoded = out.stringvalue()
   1.258 +            _writePropString(key)
   1.259 +            _writePropString(value)
   1.260 +        self.encoded = struct.pack('!H',out.tell()) + out.getvalue()
   1.261          out.close()
   1.262 -        self.encoded[0:2] = struct.pack('!H',propsLen)
   1.263          
   1.264          body = self.body
   1.265          if self.compressed:
   1.266              body = zlib.compress(body,5)
   1.267          self.encoded += body
   1.268 +        log.debug("Encoded %s into %u bytes", self,len(self.encoded))
   1.269          
   1.270          self.bytesSent = 0
   1.271          self.connection._outQueueMessage(self)
   1.272 @@ -327,46 +367,68 @@
   1.273          pos = self.bytesSent
   1.274          payload = self.encoded[pos:pos+maxLen]
   1.275          pos += len(payload)
   1.276 -        if pos >= len(self.encoded):
   1.277 -            self.moreComing = False
   1.278 -
   1.279 -        conn.push( struct.pack(kFrameHeaderFormat, 
   1.280 -                               kFrameMagicNumber,
   1.281 -                               self.requestNo,
   1.282 -                               self.flags,
   1.283 -                               kFrameHeaderSize+len(payload)) )
   1.284 +        self.moreComing = (pos < len(self.encoded))
   1.285 +        log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
   1.286 +        
   1.287 +        conn.push( struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
   1.288 +                                                   self.requestNo,
   1.289 +                                                   self.flags,
   1.290 +                                                   kFrameHeaderSize+len(payload)) )
   1.291          conn.push( payload )
   1.292          
   1.293          self.bytesSent = pos
   1.294 +        return self.moreComing
   1.295  
   1.296  
   1.297 -class Request (Message):
   1.298 -    pass
   1.299 +class Request (object):
   1.300 +    @property
   1.301 +    def response(self):
   1.302 +        "The response object for this request."
   1.303 +        r = self.__dict__.get('_response')
   1.304 +        if r==None:
   1.305 +            r = self._response = self._createResponse()
   1.306 +        return r
   1.307 +
   1.308  
   1.309  class Response (Message):
   1.310 +    def __init__(self, request):
   1.311 +        assert not request.noReply
   1.312 +        self.request = request
   1.313 +        self.requestNo = request.requestNo
   1.314 +        self.urgent = request.urgent
   1.315 +    
   1.316      @property
   1.317      def isResponse(self):
   1.318          return True
   1.319  
   1.320 -    @property
   1.321 -    def flags(self):
   1.322 -        flags = super(Response,self).flags() ^ kMsgType_Request
   1.323 -        flags ^= kMsgType_Response
   1.324 -        return flags
   1.325 -
   1.326  
   1.327  
   1.328  class IncomingRequest (IncomingMessage, Request):
   1.329 -    pass
   1.330 +    def _createResponse(self):
   1.331 +        return OutgoingResponse(self)
   1.332  
   1.333  class OutgoingRequest (OutgoingMessage, Request):
   1.334 -    pass
   1.335 +    def _createResponse(self):
   1.336 +        return IncomingResponse(self)
   1.337  
   1.338  class IncomingResponse (IncomingMessage, Response):
   1.339 -    pass
   1.340 -
   1.341 +    def __init__(self, request):
   1.342 +        IncomingMessage.__init__(self,request.connection,request.requestNo,0)
   1.343 +        Response.__init__(self,request)
   1.344 +        self.onComplete = None
   1.345 +    
   1.346 +    def _finished(self):
   1.347 +        super(IncomingResponse,self)._finished()
   1.348 +        if self.onComplete:
   1.349 +            try:
   1.350 +                self.onComplete(self)
   1.351 +            except Exception, x:
   1.352 +                log.error("Exception dispatching response: %s", traceback.format_exc())
   1.353 +            
   1.354  class OutgoingResponse (OutgoingMessage, Response):
   1.355 -    pass
   1.356 +    def __init__(self, request):
   1.357 +        OutgoingMessage.__init__(self,request.connection)
   1.358 +        Response.__init__(self,request)
   1.359  
   1.360  
   1.361  ### UNIT TESTS:
   1.362 @@ -374,8 +436,23 @@
   1.363  
   1.364  class BLIPTests(unittest.TestCase):
   1.365      def setUp(self):
   1.366 +        def handleRequest(request):
   1.367 +            logging.info("Got request!: %r",request)
   1.368 +            body = request.body
   1.369 +            assert len(body)<32768
   1.370 +            assert request.contentType == 'application/octet-stream'
   1.371 +            assert int(request['Size']) == len(body)
   1.372 +            assert request['User-Agent'] == 'BLIPConnectionTester'
   1.373 +            for i in xrange(0,len(request.body)):
   1.374 +                assert ord(body[i]) == i%256
   1.375 +            
   1.376 +            response = request.response
   1.377 +            response.body = request.body
   1.378 +            response['Content-Type'] = request.contentType
   1.379 +            response.send()
   1.380 +        
   1.381          listener = Listener(46353)
   1.382 -        listener.onRequest = lambda req: logging.info("Got request!: %r",req)
   1.383 +        listener.onRequest = handleRequest
   1.384      
   1.385      def testListener(self):
   1.386          logging.info("Waiting...")