Python/BLIP.py
author morrowa
Thu Jul 02 17:51:35 2009 -0700 (2009-07-02)
changeset 55 f240f9023393
parent 53 e9f209a24d53
child 56 6c3b5372a307
permissions -rw-r--r--
Made C99 project default.
     1 # encoding: utf-8
     2 """
     3 BLIP.py
     4 
     5 Created by Jens Alfke on 2008-06-03.
     6 Copyright notice and BSD license at end of file.
     7 """
     8 
     9 import asynchat
    10 import asyncore
    11 from cStringIO import StringIO
    12 import logging
    13 import socket
    14 import struct
    15 import sys
    16 import traceback
    17 import zlib
    18 
    19 
    20 # Connection status enumeration:
    21 kDisconnected = -1
    22 kClosed  = 0
    23 kOpening = 1
    24 kOpen    = 2
    25 kClosing = 3
    26 
    27 
    28 # INTERNAL CONSTANTS -- NO TOUCHIES!
    29 
    30 kFrameMagicNumber   = 0x9B34F206
    31 kFrameHeaderFormat  = '!LLHH'
    32 kFrameHeaderSize    = 12
    33 
    34 kMsgFlag_TypeMask   = 0x000F
    35 kMsgFlag_Compressed = 0x0010
    36 kMsgFlag_Urgent     = 0x0020
    37 kMsgFlag_NoReply    = 0x0040
    38 kMsgFlag_MoreComing = 0x0080
    39 kMsgFlag_Meta       = 0x0100
    40 
    41 kMsgType_Request    = 0
    42 kMsgType_Response   = 1
    43 kMsgType_Error      = 2
    44 
    45 kMsgProfile_Hi      = "Hi"
    46 kMsgProfile_Bye     = "Bye"
    47 
    48 
    49 log = logging.getLogger('BLIP')
    50 log.propagate = True
    51 
    52 
    53 class MessageException(Exception):
    54     pass
    55 
    56 class ConnectionException(Exception):
    57     pass
    58 
    59 
    60 ### LISTENER AND CONNECTION CLASSES:
    61 
    62 
    63 class Listener (asyncore.dispatcher):
    64     "BLIP listener/server class"
    65     
    66     def __init__(self, port, sslKeyFile=None, sslCertFile=None):
    67         "Create a listener on a port"
    68         asyncore.dispatcher.__init__(self)
    69         self.onConnected = self.onRequest = None
    70         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    71         self.bind( ('',port) )
    72         self.listen(5)
    73         self.sslKeyFile=sslKeyFile
    74         self.sslCertFile=sslCertFile
    75         log.info("Listening on port %u", port)
    76     
    77     def handle_accept( self ):
    78         socket,address = self.accept()
    79         if self.sslKeyFile:
    80             socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
    81         conn = Connection(address, sock=socket, listener=self)
    82         conn.onRequest = self.onRequest
    83         if self.onConnected:
    84             self.onConnected(conn)
    85 
    86     def handle_error(self):
    87         (typ,val,trace) = sys.exc_info()
    88         log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
    89         self.close()
    90     
    91 
    92 
    93 class Connection (asynchat.async_chat):
    94     def __init__( self, address, sock=None, listener=None, ssl=None ):
    95         "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
    96         "otherwise it'll open a new outgoing socket."
    97         if sock:
    98             asynchat.async_chat.__init__(self,sock)
    99             log.info("Accepted connection from %s",address)
   100             self.status = kOpen
   101         else:
   102             asynchat.async_chat.__init__(self)
   103             log.info("Opening connection to %s",address)
   104             self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
   105             self.status = kOpening
   106             if ssl:
   107                 ssl(self.socket)
   108             self.connect(address)
   109         self.address = address
   110         self.listener = listener
   111         self.onRequest = self.onCloseRequest = self.onCloseRefused = None
   112         self.pendingRequests = {}
   113         self.pendingResponses = {}
   114         self.outBox = []
   115         self.inMessage = None
   116         self.inNumRequests = self.outNumRequests = 0
   117         self.sending = False
   118         self._endOfFrame()
   119         self._closeWhenPossible = False
   120     
   121     def handle_connect(self):
   122         log.info("Connection open!")
   123         self.status = kOpen
   124     
   125     def handle_error(self):
   126         (typ,val,trace) = sys.exc_info()
   127         log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
   128         self.discard_buffers()
   129         self.status = kDisconnected
   130         self.close()
   131     
   132     
   133     ### SENDING:
   134     
   135     @property
   136     def isOpen(self):
   137         return self.status==kOpening or self.status==kOpen
   138     
   139     @property
   140     def canSend(self):
   141         return self.isOpen and not self._closeWhenPossible
   142     
   143     def _sendMessage(self, msg):
   144         if self.isOpen:
   145             self._outQueueMessage(msg,True)
   146             if not self.sending:
   147                 log.debug("Waking up the output stream")
   148                 self.sending = True
   149                 self.push_with_producer(self)
   150             return True
   151         else:
   152             return False
   153     
   154     def _sendRequest(self, req):
   155         if self.canSend:
   156             requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
   157             response = req.response
   158             if response:
   159                 response.requestNo = requestNo
   160                 self.pendingResponses[requestNo] = response
   161                 log.debug("pendingResponses[%i] := %s",requestNo,response)
   162             return self._sendMessage(req)
   163         else:
   164             log.warning("%s: Attempt to send a request after the connection has started closing: %s" % (self, req))
   165             return False
   166     
   167     def _outQueueMessage(self, msg,isNew=True):
   168         n = len(self.outBox)
   169         index = n
   170         if msg.urgent and n>1:
   171             while index > 0:
   172                 otherMsg = self.outBox[index-1]
   173                 if otherMsg.urgent:
   174                     if index<n:
   175                         index += 1
   176                     break
   177                 elif isNew and otherMsg.bytesSent==0:
   178                     break
   179                 index -= 1
   180             else:
   181                 index = 1
   182         
   183         self.outBox.insert(index,msg)
   184         if isNew:
   185             log.info("Queuing %s at index %i",msg,index)
   186         else:
   187             log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
   188     
   189     def more(self):
   190         n = len(self.outBox)
   191         if n > 0:
   192             msg = self.outBox.pop(0)
   193             frameSize = 4096
   194             if msg.urgent or n==1 or not self.outBox[0].urgent:
   195                 frameSize *= 4
   196             data = msg._sendNextFrame(frameSize)
   197             if msg._moreComing:
   198                 self._outQueueMessage(msg,isNew=False)
   199             else:
   200                 log.info("Finished sending %s",msg)
   201             return data
   202         else:
   203             log.debug("Nothing more to send")
   204             self.sending = False
   205             self._closeIfReady()
   206             return None
   207     
   208     ### RECEIVING:
   209     
   210     def collect_incoming_data(self, data):
   211         if self.expectingHeader:
   212             if self.inHeader==None:
   213                 self.inHeader = data
   214             else:
   215                 self.inHeader += data
   216         elif self.inMessage:
   217             self.inMessage._receivedData(data)
   218     
   219     def found_terminator(self):
   220         if self.expectingHeader:
   221             # Got a header:
   222             (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
   223             self.inHeader = None
   224             if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
   225             if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
   226             frameLen -= kFrameHeaderSize
   227             log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
   228                         (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
   229             self.inMessage = self._inMessageForFrame(requestNo,flags)
   230             
   231             if frameLen > 0:
   232                 self.expectingHeader = False
   233                 self.set_terminator(frameLen)
   234             else:
   235                 self._endOfFrame()
   236         
   237         else:
   238             # Got the frame's payload:
   239             self._endOfFrame()
   240     
   241     def _inMessageForFrame(self, requestNo,flags):
   242         message = None
   243         msgType = flags & kMsgFlag_TypeMask
   244         if msgType==kMsgType_Request:
   245             message = self.pendingRequests.get(requestNo)
   246             if message==None and requestNo == self.inNumRequests+1:
   247                 message = IncomingRequest(self,requestNo,flags)
   248                 assert message!=None
   249                 self.pendingRequests[requestNo] = message
   250                 self.inNumRequests += 1
   251         elif msgType==kMsgType_Response or msgType==kMsgType_Error:
   252             message = self.pendingResponses.get(requestNo)
   253             message._updateFlags(flags)
   254         
   255         if message != None:
   256             message._beginFrame(flags)
   257         else:
   258             log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
   259         return message
   260     
   261     def _endOfFrame(self):
   262         msg = self.inMessage
   263         self.inMessage = None
   264         self.expectingHeader = True
   265         self.inHeader = None
   266         self.set_terminator(kFrameHeaderSize) # wait for binary header
   267         if msg:
   268             log.debug("End of frame of %s",msg)
   269             if not msg._moreComing:
   270                 self._receivedMessage(msg)
   271     
   272     def _receivedMessage(self, msg):
   273         log.info("Received: %s",msg)
   274         # Remove from pending:
   275         if msg.isResponse:
   276             del self.pendingResponses[msg.requestNo]
   277         else:
   278             del self.pendingRequests[msg.requestNo]
   279         # Decode:
   280         try:
   281             msg._finished()
   282             if not msg.isResponse:
   283                 if msg._meta:
   284                     self._dispatchMetaRequest(msg)
   285                 else:
   286                     self.onRequest(msg)
   287         except Exception, x:
   288             log.error("Exception handling incoming message: %s", traceback.format_exc())
   289             #FIX: Send an error reply
   290         # Check to see if we're done and ready to close:
   291         self._closeIfReady()
   292     
   293     def _dispatchMetaRequest(self, request):
   294         """Handles dispatching internal meta requests."""
   295         if request['Profile'] == kMsgProfile_Bye:
   296             self._handleCloseRequest(request)
   297         else:
   298             response = request.response
   299             response.isError = True
   300             response['Error-Domain'] = "BLIP"
   301             response['Error-Code'] = 404
   302             response.body = "Unknown meta profile"
   303             response.send()
   304     
   305     ### CLOSING:
   306     
   307     def _handleCloseRequest(self, request):
   308         """Handles requests from a peer to close."""
   309         shouldClose = True
   310         if self.onCloseRequest:
   311             shouldClose = self.onCloseRequest()
   312         if not shouldClose:
   313             log.debug("Sending resfusal to close...")
   314             response = request.response
   315             response.isError = True
   316             response['Error-Domain'] = "BLIP"
   317             response['Error-Code'] = 403
   318             response.body = "Close request denied"
   319             response.send()
   320         else:
   321             log.debug("Sending permission to close...")
   322             response = request.response
   323             response.send()
   324     
   325     def close(self):
   326         """Publicly callable close method. Sends close request to peer."""
   327         if self.status != kOpen:
   328             return False
   329         log.info("Sending close request...")
   330         req = OutgoingRequest(self, None, {'Profile': kMsgProfile_Bye})
   331         req._meta = True
   332         req.response.onComplete = self._handleCloseResponse
   333         if not req.send():
   334             log.error("Error sending close request.")
   335             return False
   336         else:
   337             self.status = kClosing
   338         return True
   339     
   340     def _handleCloseResponse(self, response):
   341         """Called when we receive a response to a close request."""
   342         log.info("Received close response.")
   343         if response.isError:
   344             # remote refused to close
   345             if self.onCloseRefused:
   346                 self.onCloseRefused(response)
   347             self.status = kOpen
   348         else:
   349             # now wait until everything has finished sending, then actually close
   350             log.info("No refusal, actually closing...")
   351             self._closeWhenPossible = True
   352     
   353     def _closeIfReady(self):
   354         """Checks if all transmissions are complete and then closes the actual socket."""
   355         if self._closeWhenPossible and len(self.outBox) == 0 and len(self.pendingRequests) == 0 and len(self.pendingResponses) == 0:
   356             # self._closeWhenPossible = False
   357             log.debug("_closeIfReady closing.")
   358             asynchat.async_chat.close(self)
   359     
   360     def handle_close(self):
   361         """Called when the socket actually closes."""
   362         log.info("Connection closed!")
   363         self.pendingRequests = self.pendingResponses = None
   364         self.outBox = None
   365         if self.status == kClosing:
   366             self.status = kClosed
   367         else:
   368             self.status = kDisconnected
   369         asyncore.dispatcher.close(self)
   370 
   371 
   372 ### MESSAGE CLASSES:
   373 
   374 
   375 class Message (object):
   376     "Abstract superclass of all request/response objects"
   377     
   378     def __init__(self, connection, body=None, properties=None):
   379         self.connection = connection
   380         self.body = body
   381         self.properties = properties or {}
   382         self.requestNo = None
   383     
   384     @property
   385     def flags(self):
   386         if self.isResponse:
   387             if self.isError:
   388                 flags = kMsgType_Error
   389             else:
   390                 flags = kMsgType_Response
   391         else:
   392             flags = kMsgType_Request
   393         if self.urgent:     flags |= kMsgFlag_Urgent
   394         if self.compressed: flags |= kMsgFlag_Compressed
   395         if self.noReply:    flags |= kMsgFlag_NoReply
   396         if self._moreComing:flags |= kMsgFlag_MoreComing
   397         if self._meta:      flags |= kMsgFlag_Meta
   398         return flags
   399     
   400     def __str__(self):
   401         s = "%s[" %(type(self).__name__)
   402         if self.requestNo != None:
   403             s += "#%i" %self.requestNo
   404         if self.urgent:     s += " URG"
   405         if self.compressed: s += " CMP"
   406         if self.noReply:    s += " NOR"
   407         if self._moreComing:s += " MOR"
   408         if self._meta:      s += " MET"
   409         if self.body:       s += " %i bytes" %len(self.body)
   410         return s+"]"
   411     
   412     def __repr__(self):
   413         s = str(self)
   414         if len(self.properties): s += repr(self.properties)
   415         return s
   416     
   417     @property
   418     def isResponse(self):
   419         "Is this message a response?"
   420         return False
   421     
   422     @property
   423     def contentType(self):
   424         return self.properties.get('Content-Type')
   425     
   426     def __getitem__(self, key):     return self.properties.get(key)
   427     def __contains__(self, key):    return key in self.properties
   428     def __len__(self):              return len(self.properties)
   429     def __nonzero__(self):          return True
   430     def __iter__(self):             return self.properties.__iter__()
   431 
   432 
   433 class IncomingMessage (Message):
   434     "Abstract superclass of incoming messages."
   435     
   436     def __init__(self, connection, requestNo, flags):
   437         super(IncomingMessage,self).__init__(connection)
   438         self.requestNo  = requestNo
   439         self._updateFlags(flags)
   440         self.frames     = []
   441     
   442     def _updateFlags(self, flags):
   443         self.urgent     = (flags & kMsgFlag_Urgent) != 0
   444         self.compressed = (flags & kMsgFlag_Compressed) != 0
   445         self.noReply    = (flags & kMsgFlag_NoReply) != 0
   446         self._moreComing= (flags & kMsgFlag_MoreComing) != 0
   447         self._meta      = (flags & kMsgFlag_Meta) != 0
   448         self.isError    = (flags & kMsgType_Error) != 0
   449     
   450     def _beginFrame(self, flags):
   451         """Received a frame header."""
   452         self._moreComing = (flags & kMsgFlag_MoreComing)!=0
   453     
   454     def _receivedData(self, data):
   455         """Received data from a frame."""
   456         self.frames.append(data)
   457     
   458     def _finished(self):
   459         """The entire message has been received; now decode it."""
   460         encoded = "".join(self.frames)
   461         self.frames = None
   462         
   463         # Decode the properties:
   464         if len(encoded) < 2: raise MessageException, "missing properties length"
   465         propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
   466         if propSize>len(encoded): raise MessageException, "properties too long to fit"
   467         if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
   468         
   469         if propSize > 2:
   470             proplist = encoded[2:propSize-1].split('\000')
   471         
   472             if len(proplist) & 1: raise MessageException, "odd number of property strings"
   473             for i in xrange(0,len(proplist),2):
   474                 def expand(str):
   475                     if len(str)==1:
   476                         str = IncomingMessage.__expandDict.get(str,str)
   477                     return str
   478                 self.properties[ expand(proplist[i])] = expand(proplist[i+1])
   479         
   480         encoded = encoded[propSize:]
   481         # Decode the body:
   482         if self.compressed and len(encoded)>0:
   483             try:
   484                 encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
   485             except zlib.error:
   486                 raise MessageException, sys.exc_info()[1]
   487         self.body = encoded
   488     
   489     __expandDict= {'\x01' : "Content-Type",
   490                    '\x02' : "Profile",
   491                    '\x03' : "application/octet-stream",
   492                    '\x04' : "text/plain; charset=UTF-8",
   493                    '\x05' : "text/xml",
   494                    '\x06' : "text/yaml",
   495                    '\x07' : "Channel",
   496                    '\x08' : "Error-Code",
   497                    '\x09' : "Error-Domain"}
   498 
   499 
   500 class OutgoingMessage (Message):
   501     "Abstract superclass of outgoing requests/responses."
   502     
   503     def __init__(self, connection, body=None, properties=None):
   504         Message.__init__(self,connection,body,properties)
   505         self.urgent = self.compressed = self.noReply = self._meta = self.isError = False
   506         self._moreComing = True
   507     
   508     def __setitem__(self, key,val):
   509         self.properties[key] = val
   510     def __delitem__(self, key):
   511         del self.properties[key]
   512     
   513     @property
   514     def sent(self):
   515         return hasattr(self,'encoded')
   516     
   517     def _encode(self):
   518         "Generates the message's encoded form, prior to sending it."
   519         out = StringIO()
   520         for (key,value) in self.properties.iteritems():
   521             def _writePropString(s):
   522                 out.write(str(s))    #FIX: Abbreviate
   523                 out.write('\000')
   524             _writePropString(key)
   525             _writePropString(value)
   526         propertiesSize = out.tell()
   527         assert propertiesSize<65536     #FIX: Return an error instead
   528         
   529         body = self.body or ""
   530         if self.compressed:
   531             z = zlib.compressobj(6,zlib.DEFLATED,31)   # window size of 31 needed for gzip format
   532             out.write(z.compress(body))
   533             body = z.flush()
   534         out.write(body)
   535         
   536         self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
   537         out.close()
   538         log.debug("Encoded %s into %u bytes", self,len(self.encoded))
   539         self.bytesSent = 0
   540     
   541     def _sendNextFrame(self, maxLen):
   542         pos = self.bytesSent
   543         payload = self.encoded[pos:pos+maxLen]
   544         pos += len(payload)
   545         self._moreComing = (pos < len(self.encoded))
   546         if not self._moreComing:
   547             self.encoded = None
   548         log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
   549         
   550         header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
   551                                                    self.requestNo,
   552                                                    self.flags,
   553                                                    kFrameHeaderSize+len(payload))
   554         self.bytesSent = pos
   555         return header + payload
   556 
   557 
   558 class Request (object):
   559     @property
   560     def response(self):
   561         "The response object for this request."
   562         if self.noReply:
   563             return None
   564         r = self.__dict__.get('_response')
   565         if r==None:
   566             r = self._response = self._createResponse()
   567         return r
   568 
   569 
   570 class Response (Message):
   571     def _setRequest(self, request):
   572         assert not request.noReply
   573         self.request = request
   574         self.requestNo = request.requestNo
   575         self.urgent = request.urgent
   576     
   577     @property
   578     def isResponse(self):
   579         return True
   580 
   581 
   582 class IncomingRequest (IncomingMessage, Request):
   583     def _createResponse(self):
   584         return OutgoingResponse(self)
   585 
   586 
   587 class OutgoingRequest (OutgoingMessage, Request):
   588     def _createResponse(self):
   589         return IncomingResponse(self)
   590     
   591     def send(self):
   592         self._encode()
   593         return self.connection._sendRequest(self) and self.response
   594 
   595 
   596 class IncomingResponse (IncomingMessage, Response):
   597     def __init__(self, request):
   598         IncomingMessage.__init__(self,request.connection,None,0)
   599         self._setRequest(request)
   600         self.onComplete = None
   601     
   602     def _finished(self):
   603         super(IncomingResponse,self)._finished()
   604         if self.onComplete:
   605             try:
   606                 self.onComplete(self)
   607             except Exception, x:
   608                 log.error("Exception dispatching response: %s", traceback.format_exc())
   609 
   610 
   611 class OutgoingResponse (OutgoingMessage, Response):
   612     def __init__(self, request):
   613         OutgoingMessage.__init__(self,request.connection)
   614         self._setRequest(request)
   615     
   616     def send(self):
   617         self._encode()
   618         return self.connection._sendMessage(self)
   619 
   620 
   621 """
   622  Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
   623  
   624  Redistribution and use in source and binary forms, with or without modification, are permitted
   625  provided that the following conditions are met:
   626  
   627  * Redistributions of source code must retain the above copyright notice, this list of conditions
   628  and the following disclaimer.
   629  * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
   630  and the following disclaimer in the documentation and/or other materials provided with the
   631  distribution.
   632  
   633  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
   634  IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
   635  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
   636  BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
   637  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
   638   PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
   639  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF 
   640  THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
   641 """