Python/BLIP.py
author morrowa
Thu Jul 02 19:58:11 2009 -0700 (2009-07-02)
changeset 56 6c3b5372a307
parent 54 6d1392a3e0a6
child 58 6577813acf12
permissions -rw-r--r--
Removed unnecessary files. Toned down logging. Added null logging handler to BLIP so client code doesn't have to use logging. Modified test drivers to work against Cocoa versions.
     1 # encoding: utf-8
     2 """
     3 BLIP.py
     4 
     5 Created by Jens Alfke on 2008-06-03.
     6 Copyright notice and BSD license at end of file.
     7 """
     8 
     9 import asynchat
    10 import asyncore
    11 from cStringIO import StringIO
    12 import logging
    13 import socket
    14 import struct
    15 import sys
    16 import traceback
    17 import zlib
    18 
    19 
# Connection status enumeration (values stored in Connection.status):
kDisconnected = -1      # socket closed after an error, or closed while not in kClosing
kClosed  = 0            # socket closed while the connection was in kClosing
kOpening = 1            # outgoing socket created, connect not yet reported
kOpen    = 2            # connected (or accepted by a Listener)
kClosing = 3            # a "Bye" close request has been sent to the peer


# INTERNAL CONSTANTS -- NO TOUCHIES!

# Every frame starts with a 12-byte header in network byte order; it is
# unpacked as (magic, requestNo, flags, frameLen), where frameLen includes
# the header itself.
kFrameMagicNumber   = 0x9B34F206
kFrameHeaderFormat  = '!LLHH'
kFrameHeaderSize    = 12

# Layout of the 16-bit 'flags' header field:
kMsgFlag_TypeMask   = 0x000F    # low bits hold one of the kMsgType_* values
kMsgFlag_Compressed = 0x0010    # message body is zlib-compressed (gzip format)
kMsgFlag_Urgent     = 0x0020    # message gets priority in the outgoing queue
kMsgFlag_NoReply    = 0x0040    # request has no response object
kMsgFlag_MoreComing = 0x0080    # more frames of this message will follow
kMsgFlag_Meta       = 0x0100    # request is handled internally, not by onRequest

# Message types stored under kMsgFlag_TypeMask:
kMsgType_Request    = 0
kMsgType_Response   = 1
kMsgType_Error      = 2

# 'Profile' property values of meta requests ("Bye" asks the peer to close):
kMsgProfile_Hi      = "Hi"
kMsgProfile_Bye     = "Bye"
    47 
    48 # Logging Setup
    49 class NullLoggingHandler(logging.Handler):
    50     def emit(self, record):
    51         pass
    52 
# Module-wide logger; every class in this file logs through it.
log = logging.getLogger('BLIP')
# This line prevents the "No handlers found" warning if the calling code does not use logging.
log.addHandler(NullLoggingHandler())
# Records are still passed up to ancestor loggers, so handlers configured by
# the calling code (e.g. on the root logger) continue to receive BLIP output.
log.propagate = True
    57 
    58 
    59 class MessageException(Exception):
    60     pass
    61 
    62 class ConnectionException(Exception):
    63     pass
    64 
    65 
    66 ### LISTENER AND CONNECTION CLASSES:
    67 
    68 
    69 class Listener (asyncore.dispatcher):
    70     "BLIP listener/server class"
    71     
    72     def __init__(self, port, sslKeyFile=None, sslCertFile=None):
    73         "Create a listener on a port"
    74         asyncore.dispatcher.__init__(self)
    75         self.onConnected = self.onRequest = None
    76         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    77         self.bind( ('',port) )
    78         self.listen(5)
    79         self.sslKeyFile=sslKeyFile
    80         self.sslCertFile=sslCertFile
    81         log.info("Listening on port %u", port)
    82     
    83     def handle_accept( self ):
    84         socket,address = self.accept()
    85         if self.sslKeyFile:
    86             socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
    87         conn = Connection(address, sock=socket, listener=self)
    88         conn.onRequest = self.onRequest
    89         if self.onConnected:
    90             self.onConnected(conn)
    91 
    92     def handle_error(self):
    93         (typ,val,trace) = sys.exc_info()
    94         log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
    95         self.close()
    96     
    97 
    98 
    99 class Connection (asynchat.async_chat):
   100     def __init__( self, address, sock=None, listener=None, ssl=None ):
   101         "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
   102         "otherwise it'll open a new outgoing socket."
   103         if sock:
   104             asynchat.async_chat.__init__(self,sock)
   105             log.info("Accepted connection from %s",address)
   106             self.status = kOpen
   107         else:
   108             asynchat.async_chat.__init__(self)
   109             log.info("Opening connection to %s",address)
   110             self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
   111             self.status = kOpening
   112             if ssl:
   113                 ssl(self.socket)
   114             self.connect(address)
   115         self.address = address
   116         self.listener = listener
   117         self.onRequest = self.onCloseRequest = self.onCloseRefused = None
   118         self.pendingRequests = {}
   119         self.pendingResponses = {}
   120         self.outBox = []
   121         self.inMessage = None
   122         self.inNumRequests = self.outNumRequests = 0
   123         self.sending = False
   124         self._endOfFrame()
   125         self._closeWhenPossible = False
   126     
   127     def handle_connect(self):
   128         log.info("Connection open!")
   129         self.status = kOpen
   130     
   131     def handle_error(self):
   132         (typ,val,trace) = sys.exc_info()
   133         log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
   134         self.discard_buffers()
   135         self.status = kDisconnected
   136         self.close()
   137     
   138     
   139     ### SENDING:
   140     
   141     @property
   142     def isOpen(self):
   143         return self.status==kOpening or self.status==kOpen
   144     
   145     @property
   146     def canSend(self):
   147         return self.isOpen and not self._closeWhenPossible
   148     
   149     def _sendMessage(self, msg):
   150         if self.isOpen:
   151             self._outQueueMessage(msg,True)
   152             if not self.sending:
   153                 log.debug("Waking up the output stream")
   154                 self.sending = True
   155                 self.push_with_producer(self)
   156             return True
   157         else:
   158             return False
   159     
   160     def _sendRequest(self, req):
   161         if self.canSend:
   162             requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
   163             response = req.response
   164             if response:
   165                 response.requestNo = requestNo
   166                 self.pendingResponses[requestNo] = response
   167                 log.debug("pendingResponses[%i] := %s",requestNo,response)
   168             return self._sendMessage(req)
   169         else:
   170             log.warning("%s: Attempt to send a request after the connection has started closing: %s" % (self, req))
   171             return False
   172     
   173     def _outQueueMessage(self, msg,isNew=True):
   174         n = len(self.outBox)
   175         index = n
   176         if msg.urgent and n>1:
   177             while index > 0:
   178                 otherMsg = self.outBox[index-1]
   179                 if otherMsg.urgent:
   180                     if index<n:
   181                         index += 1
   182                     break
   183                 elif isNew and otherMsg.bytesSent==0:
   184                     break
   185                 index -= 1
   186             else:
   187                 index = 1
   188         
   189         self.outBox.insert(index,msg)
   190         if isNew:
   191             log.info("Queuing %s at index %i",msg,index)
   192         else:
   193             log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
   194     
   195     def more(self):
   196         n = len(self.outBox)
   197         if n > 0:
   198             msg = self.outBox.pop(0)
   199             frameSize = 4096
   200             if msg.urgent or n==1 or not self.outBox[0].urgent:
   201                 frameSize *= 4
   202             data = msg._sendNextFrame(frameSize)
   203             if msg._moreComing:
   204                 self._outQueueMessage(msg,isNew=False)
   205             else:
   206                 log.info("Finished sending %s",msg)
   207             return data
   208         else:
   209             log.debug("Nothing more to send")
   210             self.sending = False
   211             self._closeIfReady()
   212             return None
   213     
   214     ### RECEIVING:
   215     
   216     def collect_incoming_data(self, data):
   217         if self.expectingHeader:
   218             if self.inHeader==None:
   219                 self.inHeader = data
   220             else:
   221                 self.inHeader += data
   222         elif self.inMessage:
   223             self.inMessage._receivedData(data)
   224     
   225     def found_terminator(self):
   226         if self.expectingHeader:
   227             # Got a header:
   228             (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
   229             self.inHeader = None
   230             if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
   231             if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
   232             frameLen -= kFrameHeaderSize
   233             log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
   234                         (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
   235             self.inMessage = self._inMessageForFrame(requestNo,flags)
   236             
   237             if frameLen > 0:
   238                 self.expectingHeader = False
   239                 self.set_terminator(frameLen)
   240             else:
   241                 self._endOfFrame()
   242         
   243         else:
   244             # Got the frame's payload:
   245             self._endOfFrame()
   246     
   247     def _inMessageForFrame(self, requestNo,flags):
   248         message = None
   249         msgType = flags & kMsgFlag_TypeMask
   250         if msgType==kMsgType_Request:
   251             message = self.pendingRequests.get(requestNo)
   252             if message==None and requestNo == self.inNumRequests+1:
   253                 message = IncomingRequest(self,requestNo,flags)
   254                 assert message!=None
   255                 self.pendingRequests[requestNo] = message
   256                 self.inNumRequests += 1
   257         elif msgType==kMsgType_Response or msgType==kMsgType_Error:
   258             message = self.pendingResponses.get(requestNo)
   259             message._updateFlags(flags)
   260         
   261         if message != None:
   262             message._beginFrame(flags)
   263         else:
   264             log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
   265         return message
   266     
   267     def _endOfFrame(self):
   268         msg = self.inMessage
   269         self.inMessage = None
   270         self.expectingHeader = True
   271         self.inHeader = None
   272         self.set_terminator(kFrameHeaderSize) # wait for binary header
   273         if msg:
   274             log.debug("End of frame of %s",msg)
   275             if not msg._moreComing:
   276                 self._receivedMessage(msg)
   277     
   278     def _receivedMessage(self, msg):
   279         log.info("Received: %s",msg)
   280         # Remove from pending:
   281         if msg.isResponse:
   282             del self.pendingResponses[msg.requestNo]
   283         else:
   284             del self.pendingRequests[msg.requestNo]
   285         # Decode:
   286         try:
   287             msg._finished()
   288             if not msg.isResponse:
   289                 if msg._meta:
   290                     self._dispatchMetaRequest(msg)
   291                 else:
   292                     self.onRequest(msg)
   293         except Exception, x:
   294             log.error("Exception handling incoming message: %s", traceback.format_exc())
   295             #FIX: Send an error reply
   296         # Check to see if we're done and ready to close:
   297         self._closeIfReady()
   298     
   299     def _dispatchMetaRequest(self, request):
   300         """Handles dispatching internal meta requests."""
   301         if request['Profile'] == kMsgProfile_Bye:
   302             self._handleCloseRequest(request)
   303         else:
   304             response = request.response
   305             response.isError = True
   306             response['Error-Domain'] = "BLIP"
   307             response['Error-Code'] = 404
   308             response.body = "Unknown meta profile"
   309             response.send()
   310     
   311     ### CLOSING:
   312     
   313     def _handleCloseRequest(self, request):
   314         """Handles requests from a peer to close."""
   315         shouldClose = True
   316         if self.onCloseRequest:
   317             shouldClose = self.onCloseRequest()
   318         if not shouldClose:
   319             log.debug("Sending resfusal to close...")
   320             response = request.response
   321             response.isError = True
   322             response['Error-Domain'] = "BLIP"
   323             response['Error-Code'] = 403
   324             response.body = "Close request denied"
   325             response.send()
   326         else:
   327             log.debug("Sending permission to close...")
   328             response = request.response
   329             response.send()
   330     
   331     def close(self):
   332         """Publicly callable close method. Sends close request to peer."""
   333         if self.status != kOpen:
   334             return False
   335         log.info("Sending close request...")
   336         req = OutgoingRequest(self, None, {'Profile': kMsgProfile_Bye})
   337         req._meta = True
   338         req.response.onComplete = self._handleCloseResponse
   339         if not req.send():
   340             log.error("Error sending close request.")
   341             return False
   342         else:
   343             self.status = kClosing
   344         return True
   345     
   346     def _handleCloseResponse(self, response):
   347         """Called when we receive a response to a close request."""
   348         log.info("Received close response.")
   349         if response.isError:
   350             # remote refused to close
   351             if self.onCloseRefused:
   352                 self.onCloseRefused(response)
   353             self.status = kOpen
   354         else:
   355             # now wait until everything has finished sending, then actually close
   356             log.info("No refusal, actually closing...")
   357             self._closeWhenPossible = True
   358     
   359     def _closeIfReady(self):
   360         """Checks if all transmissions are complete and then closes the actual socket."""
   361         if self._closeWhenPossible and len(self.outBox) == 0 and len(self.pendingRequests) == 0 and len(self.pendingResponses) == 0:
   362             # self._closeWhenPossible = False
   363             log.debug("_closeIfReady closing.")
   364             asynchat.async_chat.close(self)
   365     
   366     def handle_close(self):
   367         """Called when the socket actually closes."""
   368         log.info("Connection closed!")
   369         self.pendingRequests = self.pendingResponses = None
   370         self.outBox = None
   371         if self.status == kClosing:
   372             self.status = kClosed
   373         else:
   374             self.status = kDisconnected
   375         asyncore.dispatcher.close(self)
   376 
   377 
   378 ### MESSAGE CLASSES:
   379 
   380 
   381 class Message (object):
   382     "Abstract superclass of all request/response objects"
   383     
   384     def __init__(self, connection, body=None, properties=None):
   385         self.connection = connection
   386         self.body = body
   387         self.properties = properties or {}
   388         self.requestNo = None
   389     
   390     @property
   391     def flags(self):
   392         if self.isResponse:
   393             if self.isError:
   394                 flags = kMsgType_Error
   395             else:
   396                 flags = kMsgType_Response
   397         else:
   398             flags = kMsgType_Request
   399         if self.urgent:     flags |= kMsgFlag_Urgent
   400         if self.compressed: flags |= kMsgFlag_Compressed
   401         if self.noReply:    flags |= kMsgFlag_NoReply
   402         if self._moreComing:flags |= kMsgFlag_MoreComing
   403         if self._meta:      flags |= kMsgFlag_Meta
   404         return flags
   405     
   406     def __str__(self):
   407         s = "%s[" %(type(self).__name__)
   408         if self.requestNo != None:
   409             s += "#%i" %self.requestNo
   410         if self.urgent:     s += " URG"
   411         if self.compressed: s += " CMP"
   412         if self.noReply:    s += " NOR"
   413         if self._moreComing:s += " MOR"
   414         if self._meta:      s += " MET"
   415         if self.body:       s += " %i bytes" %len(self.body)
   416         return s+"]"
   417     
   418     def __repr__(self):
   419         s = str(self)
   420         if len(self.properties): s += repr(self.properties)
   421         return s
   422     
   423     @property
   424     def isResponse(self):
   425         "Is this message a response?"
   426         return False
   427     
   428     @property
   429     def contentType(self):
   430         return self.properties.get('Content-Type')
   431     
   432     def __getitem__(self, key):     return self.properties.get(key)
   433     def __contains__(self, key):    return key in self.properties
   434     def __len__(self):              return len(self.properties)
   435     def __nonzero__(self):          return True
   436     def __iter__(self):             return self.properties.__iter__()
   437 
   438 
   439 class IncomingMessage (Message):
   440     "Abstract superclass of incoming messages."
   441     
   442     def __init__(self, connection, requestNo, flags):
   443         super(IncomingMessage,self).__init__(connection)
   444         self.requestNo  = requestNo
   445         self._updateFlags(flags)
   446         self.frames     = []
   447     
   448     def _updateFlags(self, flags):
   449         self.urgent     = (flags & kMsgFlag_Urgent) != 0
   450         self.compressed = (flags & kMsgFlag_Compressed) != 0
   451         self.noReply    = (flags & kMsgFlag_NoReply) != 0
   452         self._moreComing= (flags & kMsgFlag_MoreComing) != 0
   453         self._meta      = (flags & kMsgFlag_Meta) != 0
   454         self.isError    = (flags & kMsgType_Error) != 0
   455     
   456     def _beginFrame(self, flags):
   457         """Received a frame header."""
   458         self._moreComing = (flags & kMsgFlag_MoreComing)!=0
   459     
   460     def _receivedData(self, data):
   461         """Received data from a frame."""
   462         self.frames.append(data)
   463     
   464     def _finished(self):
   465         """The entire message has been received; now decode it."""
   466         encoded = "".join(self.frames)
   467         self.frames = None
   468         
   469         # Decode the properties:
   470         if len(encoded) < 2: raise MessageException, "missing properties length"
   471         propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
   472         if propSize>len(encoded): raise MessageException, "properties too long to fit"
   473         if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
   474         
   475         if propSize > 2:
   476             proplist = encoded[2:propSize-1].split('\000')
   477         
   478             if len(proplist) & 1: raise MessageException, "odd number of property strings"
   479             for i in xrange(0,len(proplist),2):
   480                 def expand(str):
   481                     if len(str)==1:
   482                         str = IncomingMessage.__expandDict.get(str,str)
   483                     return str
   484                 self.properties[ expand(proplist[i])] = expand(proplist[i+1])
   485         
   486         encoded = encoded[propSize:]
   487         # Decode the body:
   488         if self.compressed and len(encoded)>0:
   489             try:
   490                 encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
   491             except zlib.error:
   492                 raise MessageException, sys.exc_info()[1]
   493         self.body = encoded
   494     
   495     __expandDict= {'\x01' : "Content-Type",
   496                    '\x02' : "Profile",
   497                    '\x03' : "application/octet-stream",
   498                    '\x04' : "text/plain; charset=UTF-8",
   499                    '\x05' : "text/xml",
   500                    '\x06' : "text/yaml",
   501                    '\x07' : "Channel",
   502                    '\x08' : "Error-Code",
   503                    '\x09' : "Error-Domain"}
   504 
   505 
   506 class OutgoingMessage (Message):
   507     "Abstract superclass of outgoing requests/responses."
   508     
   509     def __init__(self, connection, body=None, properties=None):
   510         Message.__init__(self,connection,body,properties)
   511         self.urgent = self.compressed = self.noReply = self._meta = self.isError = False
   512         self._moreComing = True
   513     
   514     def __setitem__(self, key,val):
   515         self.properties[key] = val
   516     def __delitem__(self, key):
   517         del self.properties[key]
   518     
   519     @property
   520     def sent(self):
   521         return hasattr(self,'encoded')
   522     
   523     def _encode(self):
   524         "Generates the message's encoded form, prior to sending it."
   525         out = StringIO()
   526         for (key,value) in self.properties.iteritems():
   527             def _writePropString(s):
   528                 out.write(str(s))    #FIX: Abbreviate
   529                 out.write('\000')
   530             _writePropString(key)
   531             _writePropString(value)
   532         propertiesSize = out.tell()
   533         assert propertiesSize<65536     #FIX: Return an error instead
   534         
   535         body = self.body or ""
   536         if self.compressed:
   537             z = zlib.compressobj(6,zlib.DEFLATED,31)   # window size of 31 needed for gzip format
   538             out.write(z.compress(body))
   539             body = z.flush()
   540         out.write(body)
   541         
   542         self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
   543         out.close()
   544         log.debug("Encoded %s into %u bytes", self,len(self.encoded))
   545         self.bytesSent = 0
   546     
   547     def _sendNextFrame(self, maxLen):
   548         pos = self.bytesSent
   549         payload = self.encoded[pos:pos+maxLen]
   550         pos += len(payload)
   551         self._moreComing = (pos < len(self.encoded))
   552         if not self._moreComing:
   553             self.encoded = None
   554         log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
   555         
   556         header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
   557                                                    self.requestNo,
   558                                                    self.flags,
   559                                                    kFrameHeaderSize+len(payload))
   560         self.bytesSent = pos
   561         return header + payload
   562 
   563 
   564 class Request (object):
   565     @property
   566     def response(self):
   567         "The response object for this request."
   568         if self.noReply:
   569             return None
   570         r = self.__dict__.get('_response')
   571         if r==None:
   572             r = self._response = self._createResponse()
   573         return r
   574 
   575 
   576 class Response (Message):
   577     def _setRequest(self, request):
   578         assert not request.noReply
   579         self.request = request
   580         self.requestNo = request.requestNo
   581         self.urgent = request.urgent
   582     
   583     @property
   584     def isResponse(self):
   585         return True
   586 
   587 
   588 class IncomingRequest (IncomingMessage, Request):
   589     def _createResponse(self):
   590         return OutgoingResponse(self)
   591 
   592 
   593 class OutgoingRequest (OutgoingMessage, Request):
   594     def _createResponse(self):
   595         return IncomingResponse(self)
   596     
   597     def send(self):
   598         self._encode()
   599         return self.connection._sendRequest(self) and self.response
   600 
   601 
   602 class IncomingResponse (IncomingMessage, Response):
   603     def __init__(self, request):
   604         IncomingMessage.__init__(self,request.connection,None,0)
   605         self._setRequest(request)
   606         self.onComplete = None
   607     
   608     def _finished(self):
   609         super(IncomingResponse,self)._finished()
   610         if self.onComplete:
   611             try:
   612                 self.onComplete(self)
   613             except Exception, x:
   614                 log.error("Exception dispatching response: %s", traceback.format_exc())
   615 
   616 
   617 class OutgoingResponse (OutgoingMessage, Response):
   618     def __init__(self, request):
   619         OutgoingMessage.__init__(self,request.connection)
   620         self._setRequest(request)
   621     
   622     def send(self):
   623         self._encode()
   624         return self.connection._sendMessage(self)
   625 
   626 
   627 """
   628  Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
   629  
   630  Redistribution and use in source and binary forms, with or without modification, are permitted
   631  provided that the following conditions are met:
   632  
   633  * Redistributions of source code must retain the above copyright notice, this list of conditions
   634  and the following disclaimer.
   635  * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
   636  and the following disclaimer in the documentation and/or other materials provided with the
   637  distribution.
   638  
   639  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
   640  IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
   641  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
   642  BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
   643  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
   644   PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
   645  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF 
   646  THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
   647 """