Python/BLIP.py
author Jens Alfke <jens@mooseyard.com>
Tue Jun 03 22:24:21 2008 -0700 (2008-06-03)
changeset 12 710113961756
parent 11 29e8b03c05d4
child 13 84c2d38f924c
permissions -rw-r--r--
BLIP.py working for listener side (it talks to the Obj-C BLIPConnectionTester.)
     1 #!/usr/bin/env python
     2 # encoding: utf-8
     3 """
     4 BLIP.py
     5 
     6 Created by Jens Alfke on 2008-06-03.
     7 Copyright (c) 2008 Jens Alfke. All rights reserved.
     8 """
     9 
    10 import asynchat
    11 import asyncore
    12 from cStringIO import StringIO
    13 import logging
    14 import socket
    15 import struct
    16 import sys
    17 import traceback
    18 import unittest
    19 import zlib
    20 
    21 
    22 # INTERNAL CONSTANTS -- NO TOUCHIES!
    23 
    24 kFrameMagicNumber   = 0x9B34F205
    25 kFrameHeaderFormat  = '!LLHH'
    26 kFrameHeaderSize    = 12
    27 
    28 kMsgFlag_TypeMask   = 0x000F
    29 kMsgFlag_Compressed = 0x0010
    30 kMsgFlag_Urgent     = 0x0020
    31 kMsgFlag_NoReply    = 0x0040
    32 kMsgFlag_MoreComing = 0x0080
    33 
    34 kMsgType_Request    = 0
    35 kMsgType_Response   = 1
    36 kMsgType_Error      = 2
    37 
    38 
    39 log = logging.getLogger('BLIP')
    40 log.propagate = True
    41 
    42 
    43 class MessageException(Exception):
    44     pass
    45 
    46 class ConnectionException(Exception):
    47     pass
    48 
    49 
    50 class Listener (asyncore.dispatcher):
    51     "BLIP listener/server class"
    52     
    53     def __init__(self, port):
    54         "Create a listener on a port"
    55         asyncore.dispatcher.__init__(self)
    56         self.onConnected = self.onRequest = None
    57         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    58         self.bind( ('',port) )
    59         self.listen(5)
    60         log.info("Listening on port %u", port)
    61     
    62     def handle_accept( self ):
    63         client,address = self.accept()
    64         conn = Connection(address,client)
    65         conn.onRequest = self.onRequest
    66         if self.onConnected:
    67             self.onConnected(conn)
    68 
    69 
    70 class Connection (asynchat.async_chat):
    71     def __init__( self, address, conn=None ):
    72         "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
    73         "otherwise it'll open a new outgoing socket."
    74         asynchat.async_chat.__init__(self,conn)
    75         self.address = address
    76         if conn:
    77             log.info("Accepted connection from %s",address)
    78         else:
    79             log.info("Opening connection to %s",address)
    80             self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    81             self.connect(address)
    82         self.onRequest = None
    83         self.pendingRequests = {}
    84         self.pendingResponses = {}
    85         self.outBox = []
    86         self.inMessage = None
    87         self.inNumRequests = 0
    88         self._endOfFrame()
    89     
    90     #def handle_error(self,x):
    91     #    log.error("Uncaught exception: %s",x)
    92     #    self.close()
    93     
    94     def _fatal(self, error):
    95         log.error("Fatal BLIP connection error: %s",error)
    96         self.close()
    97     
    98     
    99     ### SENDING:
   100     
   101     def _outQueueMessage(self, msg,isNew=True):
   102         n = len(self.outBox)
   103         index = n
   104         if msg.urgent and n>1:
   105             while index > 0:
   106                 otherMsg = self.outBox[index-1]
   107                 if otherMsg.urgent:
   108                     if index<n:
   109                         index += 1
   110                     break
   111                 elif isNew and otherMsg._bytesWritten==0:
   112                     break
   113                 index -= 1
   114             else:
   115                 index = 1
   116         
   117         self.outBox.insert(index,msg)
   118         if isNew:
   119             log.info("Queuing outgoing message at index %i",index)
   120             if n==0:
   121                 self._sendNextFrame()
   122         else:
   123             log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
   124     
   125     def _sendNextFrame(self):
   126         while self.outBox:              #FIX: Don't send everything at once; only as space becomes available!
   127             n = len(self.outBox)
   128             if n > 0:
   129                 msg = self.outBox.pop(0)
   130                 frameSize = 4096
   131                 if msg.urgent or n==1 or not self.outBox[0].urgent:
   132                     frameSize *= 4
   133                 if msg._sendNextFrame(self,frameSize):
   134                     self._outQueueMessage(msg,isNew=False)
   135                 else:
   136                     log.info("Finished sending %s",msg)
   137     
   138     
   139     ### RECEIVING:
   140     
   141     def collect_incoming_data(self, data):
   142         if self.expectingHeader:
   143             if self.inHeader==None:
   144                 self.inHeader = data
   145             else:
   146                 self.inHeader += data
   147         else:
   148             self.inMessage._receivedData(data)
   149     
   150     def found_terminator(self):
   151         if self.expectingHeader:
   152             # Got a header:
   153             (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
   154             self.inHeader = None
   155             if magic!=kFrameMagicNumber: self._fatal("Incorrect frame magic number %x" %magic)
   156             if frameLen < kFrameHeaderSize: self._fatal("Invalid frame length %u" %frameLen)
   157             frameLen -= kFrameHeaderSize
   158             log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
   159                         (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
   160             self.inMessage = self._inMessageForFrame(requestNo,flags)
   161             
   162             if frameLen > 0:
   163                 self.expectingHeader = False
   164                 self.set_terminator(frameLen)
   165             else:
   166                 self._endOfFrame()
   167         
   168         else:
   169             # Got the frame's payload:
   170             self._endOfFrame()
   171     
   172     def _inMessageForFrame(self, requestNo,flags):
   173         message = None
   174         msgType = flags & kMsgFlag_TypeMask
   175         if msgType==kMsgType_Request:
   176             message = self.pendingRequests.get(requestNo)
   177             if message==None and requestNo == self.inNumRequests+1:
   178                 message = IncomingRequest(self,requestNo,flags)
   179                 assert message!=None
   180                 self.pendingRequests[requestNo] = message
   181                 self.inNumRequests += 1
   182         elif msgType==kMsgType_Response or msgType==kMsgType_Error:
   183             message = self.pendingResponses.get(requestNo)
   184         
   185         if message != None:
   186             message._beginFrame(flags)
   187         else:
   188             log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
   189         return message
   190     
   191     def _endOfFrame(self):
   192         msg = self.inMessage
   193         self.inMessage = None
   194         self.expectingHeader = True
   195         self.inHeader = None
   196         self.set_terminator(kFrameHeaderSize) # wait for binary header
   197         if msg:
   198             log.debug("End of frame of %s",msg)
   199             if not msg.moreComing:
   200                 self._receivedMessage(msg)
   201     
   202     def _receivedMessage(self, msg):
   203         log.info("Received: %s",msg)
   204         # Remove from pending:
   205         if msg.isResponse:
   206             del self.pendingReplies[msg.requestNo]
   207         else:
   208             del self.pendingRequests[msg.requestNo]
   209         # Decode:
   210         try:
   211             msg._finished()
   212             if not msg.isResponse:
   213                 self.onRequest(msg)
   214         except Exception, x:
   215             log.error("Exception handling incoming message: %s", traceback.format_exc())
   216             #FIX: Send an error reply
   217 
   218 
   219 ### MESSAGES:
   220 
   221 
   222 class Message (object):
   223     "Abstract superclass of all request/response objects"
   224     
   225     def __init__(self, connection, properties=None, body=None):
   226         self.connection = connection
   227         self.properties = properties or {}
   228         self.body = body
   229     
   230     @property
   231     def flags(self):
   232         if self.isResponse:
   233             flags = kMsgType_Response
   234         else:
   235             flags = kMsgType_Request
   236         if self.urgent:     flags |= kMsgFlag_Urgent
   237         if self.compressed: flags |= kMsgFlag_Compressed
   238         if self.noReply:    flags |= kMsgFlag_NoReply
   239         if self.moreComing: flags |= kMsgFlag_MoreComing
   240         return flags
   241     
   242     def __str__(self):
   243         s = "%s[#%i" %(type(self).__name__,self.requestNo)
   244         if self.urgent:     s += " URG"
   245         if self.compressed: s += " CMP"
   246         if self.noReply:    s += " NOR"
   247         if self.moreComing: s += " MOR"
   248         if self.body:       s += " %i bytes" %len(self.body)
   249         return s+"]"
   250     
   251     def __repr__(self):
   252         s = str(self)
   253         if len(self.properties): s += repr(self.properties)
   254         return s
   255     
   256     @property 
   257     def isResponse(self):
   258         "Is this message a response?"
   259         return False
   260     
   261     @property 
   262     def contentType(self):
   263         return self.properties.get('Content-Type')
   264     
   265     def __getitem__(self, key):     return self.properties.get(key)
   266     def __contains__(self, key):    return key in self.properties
   267     def __len__(self):              return len(self.properties)
   268     def __nonzero__(self):          return True
   269     def __iter__(self):             return self.properties.__iter__()
   270 
   271 
   272 class IncomingMessage (Message):
   273     "Abstract superclass of incoming messages."
   274     
   275     def __init__(self, connection, requestNo, flags):
   276         super(IncomingMessage,self).__init__(connection)
   277         self.requestNo  = requestNo
   278         self.urgent     = (flags & kMsgFlag_Urgent) != 0
   279         self.compressed = (flags & kMsgFlag_Compressed) != 0
   280         self.noReply    = (flags & kMsgFlag_NoReply) != 0
   281         self.moreComing = (flags & kMsgFlag_MoreComing) != 0
   282         self.frames     = []
   283     
   284     def _beginFrame(self, flags):
   285         if (flags & kMsgFlag_MoreComing)==0:
   286             self.moreComing = False
   287     
   288     def _receivedData(self, data):
   289         self.frames.append(data)
   290     
   291     def _finished(self):
   292         encoded = "".join(self.frames)
   293         self.frames = None
   294         
   295         # Decode the properties:
   296         if len(encoded) < 2: raise MessageException, "missing properties length"
   297         propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
   298         if propSize>len(encoded): raise MessageException, "properties too long to fit"
   299         if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
   300         
   301         proplist = encoded[2:propSize-1].split('\000')
   302         encoded = encoded[propSize:]
   303         if len(proplist) & 1: raise MessageException, "odd number of property strings"
   304         for i in xrange(0,len(proplist),2):
   305             def expand(str):
   306                 if len(str)==1:
   307                     str = IncomingMessage.__expandDict.get(str,str)
   308                 return str
   309             self.properties[ expand(proplist[i])] = expand(proplist[i+1])
   310         
   311         # Decode the body:
   312         if self.compressed and len(encoded)>0:
   313             try:
   314                 encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
   315             except zlib.error:
   316                 raise MessageException, sys.exc_info()[1]
   317         self.body = encoded
   318     
   319     __expandDict= {'\x01' : "Content-Type",
   320                    '\x02' : "Profile",
   321                    '\x03' : "application/octet-stream",
   322                    '\x04' : "text/plain; charset=UTF-8",
   323                    '\x05' : "text/xml",
   324                    '\x06' : "text/yaml",
   325                    '\x07' : "Channel",
   326                    '\x08' : "Error-Code",
   327                    '\x09' : "Error-Domain"}
   328 
   329 
   330 
   331 class OutgoingMessage (Message):
   332     "Abstract superclass of outgoing requests/responses."
   333     
   334     def __init__(self, connection, properties=None, body=None):
   335         Message.__init__(self,connection,properties,body)
   336         self.urgent = self.compressed = self.noReply = False
   337         self.moreComing = True
   338     
   339     def __setitem__(self, key,val):
   340         self.properties[key] = val
   341     def __delitem__(self, key):
   342         del self.properties[key]
   343     
   344     def send(self):
   345         "Sends this message."
   346         log.info("Sending %s",self)
   347         out = StringIO()
   348         for (key,value) in self.properties.iteritems():
   349             def _writePropString(str):
   350                 out.write(str)    #FIX: Abbreviate
   351                 out.write('\000')
   352             _writePropString(key)
   353             _writePropString(value)
   354         self.encoded = struct.pack('!H',out.tell()) + out.getvalue()
   355         out.close()
   356         
   357         body = self.body
   358         if self.compressed:
   359             body = zlib.compress(body,5)
   360         self.encoded += body
   361         log.debug("Encoded %s into %u bytes", self,len(self.encoded))
   362         
   363         self.bytesSent = 0
   364         self.connection._outQueueMessage(self)
   365     
   366     def _sendNextFrame(self, conn,maxLen):
   367         pos = self.bytesSent
   368         payload = self.encoded[pos:pos+maxLen]
   369         pos += len(payload)
   370         self.moreComing = (pos < len(self.encoded))
   371         log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
   372         
   373         conn.push( struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
   374                                                    self.requestNo,
   375                                                    self.flags,
   376                                                    kFrameHeaderSize+len(payload)) )
   377         conn.push( payload )
   378         
   379         self.bytesSent = pos
   380         return self.moreComing
   381 
   382 
   383 class Request (object):
   384     @property
   385     def response(self):
   386         "The response object for this request."
   387         r = self.__dict__.get('_response')
   388         if r==None:
   389             r = self._response = self._createResponse()
   390         return r
   391 
   392 
   393 class Response (Message):
   394     def __init__(self, request):
   395         assert not request.noReply
   396         self.request = request
   397         self.requestNo = request.requestNo
   398         self.urgent = request.urgent
   399     
   400     @property
   401     def isResponse(self):
   402         return True
   403 
   404 
   405 
   406 class IncomingRequest (IncomingMessage, Request):
   407     def _createResponse(self):
   408         return OutgoingResponse(self)
   409 
   410 class OutgoingRequest (OutgoingMessage, Request):
   411     def _createResponse(self):
   412         return IncomingResponse(self)
   413 
   414 class IncomingResponse (IncomingMessage, Response):
   415     def __init__(self, request):
   416         IncomingMessage.__init__(self,request.connection,request.requestNo,0)
   417         Response.__init__(self,request)
   418         self.onComplete = None
   419     
   420     def _finished(self):
   421         super(IncomingResponse,self)._finished()
   422         if self.onComplete:
   423             try:
   424                 self.onComplete(self)
   425             except Exception, x:
   426                 log.error("Exception dispatching response: %s", traceback.format_exc())
   427             
   428 class OutgoingResponse (OutgoingMessage, Response):
   429     def __init__(self, request):
   430         OutgoingMessage.__init__(self,request.connection)
   431         Response.__init__(self,request)
   432 
   433 
   434 ### UNIT TESTS:
   435 
   436 
   437 class BLIPTests(unittest.TestCase):
   438     def setUp(self):
   439         def handleRequest(request):
   440             logging.info("Got request!: %r",request)
   441             body = request.body
   442             assert len(body)<32768
   443             assert request.contentType == 'application/octet-stream'
   444             assert int(request['Size']) == len(body)
   445             assert request['User-Agent'] == 'BLIPConnectionTester'
   446             for i in xrange(0,len(request.body)):
   447                 assert ord(body[i]) == i%256
   448             
   449             response = request.response
   450             response.body = request.body
   451             response['Content-Type'] = request.contentType
   452             response.send()
   453         
   454         listener = Listener(46353)
   455         listener.onRequest = handleRequest
   456     
   457     def testListener(self):
   458         logging.info("Waiting...")
   459         asyncore.loop()
   460 
# When run as a script: enable INFO-level logging, then run the unit tests.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    unittest.main()