Python/BLIP.py
author Jens Alfke <jens@mooseyard.com>
Tue Apr 28 10:36:28 2009 -0700 (2009-04-28)
changeset 29 59689fbdcf77
parent 14 bb5faa9995d5
child 51 de59ce19f42e
permissions -rw-r--r--
Fixed two CF memory leaks. (Fixes issue #5)
     1 # encoding: utf-8
     2 """
     3 BLIP.py
     4 
     5 Created by Jens Alfke on 2008-06-03.
     6 Copyright notice and BSD license at end of file.
     7 """
     8 
     9 import asynchat
    10 import asyncore
    11 from cStringIO import StringIO
    12 import logging
    13 import socket
    14 import struct
    15 import sys
    16 import traceback
    17 import zlib
    18 
    19 
# Connection status enumeration (values stored in Connection.status):
kDisconnected = -1  # an error occurred, or the socket closed without close() being called
kClosed  = 0        # the socket closed after close() had been called
kOpening = 1        # outgoing socket created, connect still in progress
kOpen    = 2        # connected; ready to send and receive
kClosing = 3        # close() has been called


# INTERNAL CONSTANTS -- NO TOUCHIES!

# Every frame starts with a fixed-size binary header that is unpacked as
# (magic, requestNo, flags, frameLen); frameLen counts the header itself.
kFrameMagicNumber   = 0x9B34F205
kFrameHeaderFormat  = '!LLHH'   # network byte order: two 32-bit, then two 16-bit unsigned ints
kFrameHeaderSize    = 12        # packed size in bytes of kFrameHeaderFormat

# Bits of the 16-bit flags field in the frame header:
kMsgFlag_TypeMask   = 0x000F    # low four bits hold one of the kMsgType_ values below
kMsgFlag_Compressed = 0x0010
kMsgFlag_Urgent     = 0x0020
kMsgFlag_NoReply    = 0x0040
kMsgFlag_MoreComing = 0x0080    # more frames of the same message will follow

# Message types, stored in the kMsgFlag_TypeMask bits:
kMsgType_Request    = 0
kMsgType_Response   = 1
kMsgType_Error      = 2


# Module-wide logger; records are also passed on to the handlers of ancestor loggers.
log = logging.getLogger('BLIP')
log.propagate = True
    47 
    48 
    49 class MessageException(Exception):
    50     pass
    51 
    52 class ConnectionException(Exception):
    53     pass
    54 
    55 
    56 ### LISTENER AND CONNECTION CLASSES:
    57 
    58 
    59 class Listener (asyncore.dispatcher):
    60     "BLIP listener/server class"
    61     
    62     def __init__(self, port, sslKeyFile=None, sslCertFile=None):
    63         "Create a listener on a port"
    64         asyncore.dispatcher.__init__(self)
    65         self.onConnected = self.onRequest = None
    66         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    67         self.bind( ('',port) )
    68         self.listen(5)
    69         self.sslKeyFile=sslKeyFile
    70         self.sslCertFile=sslCertFile
    71         log.info("Listening on port %u", port)
    72     
    73     def handle_accept( self ):
    74         socket,address = self.accept()
    75         if self.sslKeyFile:
    76             socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
    77         conn = Connection(address, sock=socket, listener=self)
    78         conn.onRequest = self.onRequest
    79         if self.onConnected:
    80             self.onConnected(conn)
    81 
    82     def handle_error(self):
    83         (typ,val,trace) = sys.exc_info()
    84         log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
    85         self.close()
    86     
    87 
    88 
    89 class Connection (asynchat.async_chat):
    90     def __init__( self, address, sock=None, listener=None, ssl=None ):
    91         "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
    92         "otherwise it'll open a new outgoing socket."
    93         if sock:
    94             asynchat.async_chat.__init__(self,sock)
    95             log.info("Accepted connection from %s",address)
    96             self.status = kOpen
    97         else:
    98             asynchat.async_chat.__init__(self)
    99             log.info("Opening connection to %s",address)
   100             self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
   101             self.status = kOpening
   102             if ssl:
   103                 ssl(self.socket)
   104             self.connect(address)
   105         self.address = address
   106         self.listener = listener
   107         self.onRequest = None
   108         self.pendingRequests = {}
   109         self.pendingResponses = {}
   110         self.outBox = []
   111         self.inMessage = None
   112         self.inNumRequests = self.outNumRequests = 0
   113         self.sending = False
   114         self._endOfFrame()
   115     
   116     def close(self):
   117         if self.status > kClosed:
   118             self.status = kClosing
   119             log.info("Connection closing...")
   120         asynchat.async_chat.close(self)
   121     
   122     def handle_connect(self):
   123         log.info("Connection open!")
   124         self.status = kOpen
   125     
   126     def handle_error(self):
   127         (typ,val,trace) = sys.exc_info()
   128         log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
   129         self.discard_buffers()
   130         self.status = kDisconnected
   131         self.close()
   132     
   133     def handle_close(self):
   134         log.info("Connection closed!")
   135         self.pendingRequests = self.pendingResponses = None
   136         self.outBox = None
   137         if self.status == kClosing:
   138             self.status = kClosed
   139         else:
   140             self.status = kDisconnected
   141         asynchat.async_chat.handle_close(self)
   142         
   143     
   144     ### SENDING:
   145     
   146     @property
   147     def canSend(self):
   148         return self.status==kOpening or self.status==kOpen
   149     
   150     def _sendMessage(self, msg):
   151         if self.canSend:
   152             self._outQueueMessage(msg,True)
   153             if not self.sending:
   154                 log.debug("Waking up the output stream")
   155                 self.sending = True
   156                 self.push_with_producer(self)
   157             return True
   158         else:
   159             return False
   160     
   161     def _sendRequest(self, req):
   162         if self.canSend:
   163             requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
   164             response = req.response
   165             if response:
   166                 response.requestNo = requestNo
   167                 self.pendingResponses[requestNo] = response
   168                 log.debug("pendingResponses[%i] := %s",requestNo,response)
   169             return self._sendMessage(req)
   170         else:
   171             return False
   172     
   173     def _outQueueMessage(self, msg,isNew=True):
   174         n = len(self.outBox)
   175         index = n
   176         if msg.urgent and n>1:
   177             while index > 0:
   178                 otherMsg = self.outBox[index-1]
   179                 if otherMsg.urgent:
   180                     if index<n:
   181                         index += 1
   182                     break
   183                 elif isNew and otherMsg.bytesSent==0:
   184                     break
   185                 index -= 1
   186             else:
   187                 index = 1
   188         
   189         self.outBox.insert(index,msg)
   190         if isNew:
   191             log.info("Queuing %s at index %i",msg,index)
   192         else:
   193             log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
   194     
   195     def more(self):
   196         n = len(self.outBox)
   197         if n > 0:
   198             msg = self.outBox.pop(0)
   199             frameSize = 4096
   200             if msg.urgent or n==1 or not self.outBox[0].urgent:
   201                 frameSize *= 4
   202             data = msg._sendNextFrame(frameSize)
   203             if msg._moreComing:
   204                 self._outQueueMessage(msg,isNew=False)
   205             else:
   206                 log.info("Finished sending %s",msg)
   207             return data
   208         else:
   209             log.debug("Nothing more to send")
   210             self.sending = False
   211             return None
   212     
   213     ### RECEIVING:
   214     
   215     def collect_incoming_data(self, data):
   216         if self.expectingHeader:
   217             if self.inHeader==None:
   218                 self.inHeader = data
   219             else:
   220                 self.inHeader += data
   221         elif self.inMessage:
   222             self.inMessage._receivedData(data)
   223     
   224     def found_terminator(self):
   225         if self.expectingHeader:
   226             # Got a header:
   227             (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
   228             self.inHeader = None
   229             if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
   230             if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
   231             frameLen -= kFrameHeaderSize
   232             log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
   233                         (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
   234             self.inMessage = self._inMessageForFrame(requestNo,flags)
   235             
   236             if frameLen > 0:
   237                 self.expectingHeader = False
   238                 self.set_terminator(frameLen)
   239             else:
   240                 self._endOfFrame()
   241         
   242         else:
   243             # Got the frame's payload:
   244             self._endOfFrame()
   245     
   246     def _inMessageForFrame(self, requestNo,flags):
   247         message = None
   248         msgType = flags & kMsgFlag_TypeMask
   249         if msgType==kMsgType_Request:
   250             message = self.pendingRequests.get(requestNo)
   251             if message==None and requestNo == self.inNumRequests+1:
   252                 message = IncomingRequest(self,requestNo,flags)
   253                 assert message!=None
   254                 self.pendingRequests[requestNo] = message
   255                 self.inNumRequests += 1
   256         elif msgType==kMsgType_Response or msgType==kMsgType_Error:
   257             message = self.pendingResponses.get(requestNo)
   258         
   259         if message != None:
   260             message._beginFrame(flags)
   261         else:
   262             log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
   263         return message
   264     
   265     def _endOfFrame(self):
   266         msg = self.inMessage
   267         self.inMessage = None
   268         self.expectingHeader = True
   269         self.inHeader = None
   270         self.set_terminator(kFrameHeaderSize) # wait for binary header
   271         if msg:
   272             log.debug("End of frame of %s",msg)
   273             if not msg._moreComing:
   274                 self._receivedMessage(msg)
   275     
   276     def _receivedMessage(self, msg):
   277         log.info("Received: %s",msg)
   278         # Remove from pending:
   279         if msg.isResponse:
   280             del self.pendingResponses[msg.requestNo]
   281         else:
   282             del self.pendingRequests[msg.requestNo]
   283         # Decode:
   284         try:
   285             msg._finished()
   286             if not msg.isResponse:
   287                 self.onRequest(msg)
   288         except Exception, x:
   289             log.error("Exception handling incoming message: %s", traceback.format_exc())
   290             #FIX: Send an error reply
   291 
   292 
   293 ### MESSAGE CLASSES:
   294 
   295 
   296 class Message (object):
   297     "Abstract superclass of all request/response objects"
   298     
   299     def __init__(self, connection, body=None, properties=None):
   300         self.connection = connection
   301         self.body = body
   302         self.properties = properties or {}
   303         self.requestNo = None
   304     
   305     @property
   306     def flags(self):
   307         if self.isResponse:
   308             flags = kMsgType_Response
   309         else:
   310             flags = kMsgType_Request
   311         if self.urgent:     flags |= kMsgFlag_Urgent
   312         if self.compressed: flags |= kMsgFlag_Compressed
   313         if self.noReply:    flags |= kMsgFlag_NoReply
   314         if self._moreComing:flags |= kMsgFlag_MoreComing
   315         return flags
   316     
   317     def __str__(self):
   318         s = "%s[" %(type(self).__name__)
   319         if self.requestNo != None:
   320             s += "#%i" %self.requestNo
   321         if self.urgent:     s += " URG"
   322         if self.compressed: s += " CMP"
   323         if self.noReply:    s += " NOR"
   324         if self._moreComing:s += " MOR"
   325         if self.body:       s += " %i bytes" %len(self.body)
   326         return s+"]"
   327     
   328     def __repr__(self):
   329         s = str(self)
   330         if len(self.properties): s += repr(self.properties)
   331         return s
   332     
   333     @property
   334     def isResponse(self):
   335         "Is this message a response?"
   336         return False
   337     
   338     @property
   339     def contentType(self):
   340         return self.properties.get('Content-Type')
   341     
   342     def __getitem__(self, key):     return self.properties.get(key)
   343     def __contains__(self, key):    return key in self.properties
   344     def __len__(self):              return len(self.properties)
   345     def __nonzero__(self):          return True
   346     def __iter__(self):             return self.properties.__iter__()
   347 
   348 
   349 class IncomingMessage (Message):
   350     "Abstract superclass of incoming messages."
   351     
   352     def __init__(self, connection, requestNo, flags):
   353         super(IncomingMessage,self).__init__(connection)
   354         self.requestNo  = requestNo
   355         self.urgent     = (flags & kMsgFlag_Urgent) != 0
   356         self.compressed = (flags & kMsgFlag_Compressed) != 0
   357         self.noReply    = (flags & kMsgFlag_NoReply) != 0
   358         self._moreComing= (flags & kMsgFlag_MoreComing) != 0
   359         self.frames     = []
   360     
   361     def _beginFrame(self, flags):
   362         """Received a frame header."""
   363         self._moreComing = (flags & kMsgFlag_MoreComing)!=0
   364     
   365     def _receivedData(self, data):
   366         """Received data from a frame."""
   367         self.frames.append(data)
   368     
   369     def _finished(self):
   370         """The entire message has been received; now decode it."""
   371         encoded = "".join(self.frames)
   372         self.frames = None
   373         
   374         # Decode the properties:
   375         if len(encoded) < 2: raise MessageException, "missing properties length"
   376         propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
   377         if propSize>len(encoded): raise MessageException, "properties too long to fit"
   378         if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
   379         
   380         proplist = encoded[2:propSize-1].split('\000')
   381         encoded = encoded[propSize:]
   382         if len(proplist) & 1: raise MessageException, "odd number of property strings"
   383         for i in xrange(0,len(proplist),2):
   384             def expand(str):
   385                 if len(str)==1:
   386                     str = IncomingMessage.__expandDict.get(str,str)
   387                 return str
   388             self.properties[ expand(proplist[i])] = expand(proplist[i+1])
   389         
   390         # Decode the body:
   391         if self.compressed and len(encoded)>0:
   392             try:
   393                 encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
   394             except zlib.error:
   395                 raise MessageException, sys.exc_info()[1]
   396         self.body = encoded
   397     
   398     __expandDict= {'\x01' : "Content-Type",
   399                    '\x02' : "Profile",
   400                    '\x03' : "application/octet-stream",
   401                    '\x04' : "text/plain; charset=UTF-8",
   402                    '\x05' : "text/xml",
   403                    '\x06' : "text/yaml",
   404                    '\x07' : "Channel",
   405                    '\x08' : "Error-Code",
   406                    '\x09' : "Error-Domain"}
   407 
   408 
   409 class OutgoingMessage (Message):
   410     "Abstract superclass of outgoing requests/responses."
   411     
   412     def __init__(self, connection, body=None, properties=None):
   413         Message.__init__(self,connection,body,properties)
   414         self.urgent = self.compressed = self.noReply = False
   415         self._moreComing = True
   416     
   417     def __setitem__(self, key,val):
   418         self.properties[key] = val
   419     def __delitem__(self, key):
   420         del self.properties[key]
   421     
   422     @property
   423     def sent(self):
   424         return hasattr(self,'encoded')
   425     
   426     def _encode(self):
   427         "Generates the message's encoded form, prior to sending it."
   428         out = StringIO()
   429         for (key,value) in self.properties.iteritems():
   430             def _writePropString(s):
   431                 out.write(str(s))    #FIX: Abbreviate
   432                 out.write('\000')
   433             _writePropString(key)
   434             _writePropString(value)
   435         propertiesSize = out.tell()
   436         assert propertiesSize<65536     #FIX: Return an error instead
   437         
   438         body = self.body
   439         if self.compressed:
   440             z = zlib.compressobj(6,zlib.DEFLATED,31)   # window size of 31 needed for gzip format
   441             out.write(z.compress(body))
   442             body = z.flush()
   443         out.write(body)
   444         
   445         self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
   446         out.close()
   447         log.debug("Encoded %s into %u bytes", self,len(self.encoded))
   448         self.bytesSent = 0
   449     
   450     def _sendNextFrame(self, maxLen):
   451         pos = self.bytesSent
   452         payload = self.encoded[pos:pos+maxLen]
   453         pos += len(payload)
   454         self._moreComing = (pos < len(self.encoded))
   455         if not self._moreComing:
   456             self.encoded = None
   457         log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
   458         
   459         header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
   460                                                    self.requestNo,
   461                                                    self.flags,
   462                                                    kFrameHeaderSize+len(payload))
   463         self.bytesSent = pos
   464         return header + payload
   465 
   466 
   467 class Request (object):
   468     @property
   469     def response(self):
   470         "The response object for this request."
   471         if self.noReply:
   472             return None
   473         r = self.__dict__.get('_response')
   474         if r==None:
   475             r = self._response = self._createResponse()
   476         return r
   477 
   478 
   479 class Response (Message):
   480     def _setRequest(self, request):
   481         assert not request.noReply
   482         self.request = request
   483         self.requestNo = request.requestNo
   484         self.urgent = request.urgent
   485     
   486     @property
   487     def isResponse(self):
   488         return True
   489 
   490 
   491 class IncomingRequest (IncomingMessage, Request):
   492     def _createResponse(self):
   493         return OutgoingResponse(self)
   494 
   495 
   496 class OutgoingRequest (OutgoingMessage, Request):
   497     def _createResponse(self):
   498         return IncomingResponse(self)
   499     
   500     def send(self):
   501         self._encode()
   502         return self.connection._sendRequest(self) and self.response
   503 
   504 
   505 class IncomingResponse (IncomingMessage, Response):
   506     def __init__(self, request):
   507         IncomingMessage.__init__(self,request.connection,None,0)
   508         self._setRequest(request)
   509         self.onComplete = None
   510     
   511     def _finished(self):
   512         super(IncomingResponse,self)._finished()
   513         if self.onComplete:
   514             try:
   515                 self.onComplete(self)
   516             except Exception, x:
   517                 log.error("Exception dispatching response: %s", traceback.format_exc())
   518 
   519 
   520 class OutgoingResponse (OutgoingMessage, Response):
   521     def __init__(self, request):
   522         OutgoingMessage.__init__(self,request.connection)
   523         self._setRequest(request)
   524     
   525     def send(self):
   526         self._encode()
   527         return self.connection._sendMessage(self)
   528 
   529 
   530 """
   531  Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
   532  
   533  Redistribution and use in source and binary forms, with or without modification, are permitted
   534  provided that the following conditions are met:
   535  
   536  * Redistributions of source code must retain the above copyright notice, this list of conditions
   537  and the following disclaimer.
   538  * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
   539  and the following disclaimer in the documentation and/or other materials provided with the
   540  distribution.
   541  
   542  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
   543  IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
   544  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
   545  BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
   546  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
   547   PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
   548  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF 
   549  THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
   550 """