Python/BLIP.py
author morrowa
Fri Jul 03 17:50:28 2009 -0700 (2009-07-03)
changeset 58 6577813acf12
parent 56 6c3b5372a307
permissions -rw-r--r--
Fixed bug which caused PyBLIP to stop sending responses while the connection was closing.
     1 # encoding: utf-8
     2 """
     3 BLIP.py
     4 
     5 Created by Jens Alfke on 2008-06-03.
     6 Copyright notice and BSD license at end of file.
     7 """
     8 
     9 import asynchat
    10 import asyncore
    11 from cStringIO import StringIO
    12 import logging
    13 import socket
    14 import struct
    15 import sys
    16 import traceback
    17 import zlib
    18 
    19 
    20 # Connection status enumeration:
    21 kDisconnected = -1
    22 kClosed  = 0
    23 kOpening = 1
    24 kOpen    = 2
    25 kClosing = 3
    26 
    27 
    28 # INTERNAL CONSTANTS -- NO TOUCHIES!
    29 
    30 kFrameMagicNumber   = 0x9B34F206
    31 kFrameHeaderFormat  = '!LLHH'
    32 kFrameHeaderSize    = 12
    33 
    34 kMsgFlag_TypeMask   = 0x000F
    35 kMsgFlag_Compressed = 0x0010
    36 kMsgFlag_Urgent     = 0x0020
    37 kMsgFlag_NoReply    = 0x0040
    38 kMsgFlag_MoreComing = 0x0080
    39 kMsgFlag_Meta       = 0x0100
    40 
    41 kMsgType_Request    = 0
    42 kMsgType_Response   = 1
    43 kMsgType_Error      = 2
    44 
    45 kMsgProfile_Hi      = "Hi"
    46 kMsgProfile_Bye     = "Bye"
    47 
    48 # Logging Setup
    49 class NullLoggingHandler(logging.Handler):
    50     def emit(self, record):
    51         pass
    52 
    53 log = logging.getLogger('BLIP')
    54 # This line prevents the "No handlers found" warning if the calling code does not use logging.
    55 log.addHandler(NullLoggingHandler())
    56 log.propagate = True
    57 
    58 
    59 class MessageException(Exception):
    60     pass
    61 
    62 class ConnectionException(Exception):
    63     pass
    64 
    65 
    66 ### LISTENER AND CONNECTION CLASSES:
    67 
    68 
    69 class Listener (asyncore.dispatcher):
    70     "BLIP listener/server class"
    71     
    72     def __init__(self, port, sslKeyFile=None, sslCertFile=None):
    73         "Create a listener on a port"
    74         asyncore.dispatcher.__init__(self)
    75         self.onConnected = self.onRequest = None
    76         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    77         self.bind( ('',port) )
    78         self.listen(5)
    79         self.sslKeyFile=sslKeyFile
    80         self.sslCertFile=sslCertFile
    81         log.info("Listening on port %u", port)
    82     
    83     def handle_accept( self ):
    84         socket,address = self.accept()
    85         if self.sslKeyFile:
    86             socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
    87         conn = Connection(address, sock=socket, listener=self)
    88         conn.onRequest = self.onRequest
    89         if self.onConnected:
    90             self.onConnected(conn)
    91 
    92     def handle_error(self):
    93         (typ,val,trace) = sys.exc_info()
    94         log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
    95         self.close()
    96     
    97 
    98 
    99 class Connection (asynchat.async_chat):
   100     def __init__( self, address, sock=None, listener=None, ssl=None ):
   101         "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
   102         "otherwise it'll open a new outgoing socket."
   103         if sock:
   104             asynchat.async_chat.__init__(self,sock)
   105             log.info("Accepted connection from %s",address)
   106             self.status = kOpen
   107         else:
   108             asynchat.async_chat.__init__(self)
   109             log.info("Opening connection to %s",address)
   110             self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
   111             self.status = kOpening
   112             if ssl:
   113                 ssl(self.socket)
   114             self.connect(address)
   115         self.address = address
   116         self.listener = listener
   117         self.onRequest = self.onCloseRequest = self.onCloseRefused = None
   118         self.pendingRequests = {}
   119         self.pendingResponses = {}
   120         self.outBox = []
   121         self.inMessage = None
   122         self.inNumRequests = self.outNumRequests = 0
   123         self.sending = False
   124         self._endOfFrame()
   125         self._closeWhenPossible = False
   126     
   127     def handle_connect(self):
   128         log.info("Connection open!")
   129         self.status = kOpen
   130     
   131     def handle_error(self):
   132         (typ,val,trace) = sys.exc_info()
   133         log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
   134         self.discard_buffers()
   135         self.status = kDisconnected
   136         self.close()
   137     
   138     
   139     ### SENDING:
   140     
   141     @property
   142     def isOpen(self):
   143         return self.status==kOpening or self.status==kOpen or self.status==kClosing
   144     
   145     @property
   146     def canSend(self):
   147         return self.isOpen and not self._closeWhenPossible
   148     
   149     def _sendMessage(self, msg):
   150         if self.isOpen:
   151             self._outQueueMessage(msg,True)
   152             if not self.sending:
   153                 log.debug("Waking up the output stream")
   154                 self.sending = True
   155                 self.push_with_producer(self)
   156             return True
   157         else:
   158             return False
   159     
   160     def _sendRequest(self, req):
   161         if self.canSend:
   162             requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
   163             response = req.response
   164             if response:
   165                 response.requestNo = requestNo
   166                 self.pendingResponses[requestNo] = response
   167                 log.debug("pendingResponses[%i] := %s",requestNo,response)
   168             return self._sendMessage(req)
   169         else:
   170             log.warning("%s: Attempt to send a request after the connection has started closing: %s" % (self, req))
   171             return False
   172     
   173     def _outQueueMessage(self, msg,isNew=True):
   174         n = len(self.outBox)
   175         index = n
   176         if msg.urgent and n>1:
   177             while index > 0:
   178                 otherMsg = self.outBox[index-1]
   179                 if otherMsg.urgent:
   180                     if index<n:
   181                         index += 1
   182                     break
   183                 elif isNew and otherMsg.bytesSent==0:
   184                     break
   185                 index -= 1
   186             else:
   187                 index = 1
   188         
   189         self.outBox.insert(index,msg)
   190         if isNew:
   191             log.info("Queuing %s at index %i",msg,index)
   192         else:
   193             log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
   194     
   195     def more(self):
   196         n = len(self.outBox)
   197         if n > 0:
   198             msg = self.outBox.pop(0)
   199             frameSize = 4096
   200             if msg.urgent or n==1 or not self.outBox[0].urgent:
   201                 frameSize *= 4
   202             data = msg._sendNextFrame(frameSize)
   203             if msg._moreComing:
   204                 self._outQueueMessage(msg,isNew=False)
   205             else:
   206                 log.info("Finished sending %s",msg)
   207             return data
   208         else:
   209             log.debug("Nothing more to send")
   210             self.sending = False
   211             self._closeIfReady()
   212             return None
   213     
   214     ### RECEIVING:
   215     
   216     def collect_incoming_data(self, data):
   217         if self.expectingHeader:
   218             if self.inHeader==None:
   219                 self.inHeader = data
   220             else:
   221                 self.inHeader += data
   222         elif self.inMessage:
   223             self.inMessage._receivedData(data)
   224     
   225     def found_terminator(self):
   226         if self.expectingHeader:
   227             # Got a header:
   228             (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
   229             self.inHeader = None
   230             if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
   231             if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
   232             frameLen -= kFrameHeaderSize
   233             log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
   234                         (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
   235             self.inMessage = self._inMessageForFrame(requestNo,flags)
   236             
   237             if frameLen > 0:
   238                 self.expectingHeader = False
   239                 self.set_terminator(frameLen)
   240             else:
   241                 self._endOfFrame()
   242         
   243         else:
   244             # Got the frame's payload:
   245             self._endOfFrame()
   246     
   247     def _inMessageForFrame(self, requestNo,flags):
   248         message = None
   249         msgType = flags & kMsgFlag_TypeMask
   250         if msgType==kMsgType_Request:
   251             message = self.pendingRequests.get(requestNo)
   252             if message==None and requestNo == self.inNumRequests+1:
   253                 message = IncomingRequest(self,requestNo,flags)
   254                 assert message!=None
   255                 self.pendingRequests[requestNo] = message
   256                 self.inNumRequests += 1
   257         elif msgType==kMsgType_Response or msgType==kMsgType_Error:
   258             message = self.pendingResponses.get(requestNo)
   259             message._updateFlags(flags)
   260         
   261         if message != None:
   262             message._beginFrame(flags)
   263         else:
   264             log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
   265         return message
   266     
   267     def _endOfFrame(self):
   268         msg = self.inMessage
   269         self.inMessage = None
   270         self.expectingHeader = True
   271         self.inHeader = None
   272         self.set_terminator(kFrameHeaderSize) # wait for binary header
   273         if msg:
   274             log.debug("End of frame of %s",msg)
   275             if not msg._moreComing:
   276                 self._receivedMessage(msg)
   277     
   278     def _receivedMessage(self, msg):
   279         log.info("Received: %s",msg)
   280         # Remove from pending:
   281         if msg.isResponse:
   282             del self.pendingResponses[msg.requestNo]
   283         else:
   284             del self.pendingRequests[msg.requestNo]
   285         # Decode:
   286         try:
   287             msg._finished()
   288             if not msg.isResponse:
   289                 if msg._meta:
   290                     self._dispatchMetaRequest(msg)
   291                 else:
   292                     self.onRequest(msg)
   293                     if not msg.response.sent:
   294                         log.error("**** Request received, but a response was never sent! Request: %r", msg)
   295         except Exception, x:
   296             log.error("Exception handling incoming message: %s", traceback.format_exc())
   297             #FIX: Send an error reply
   298         # Check to see if we're done and ready to close:
   299         self._closeIfReady()
   300     
   301     def _dispatchMetaRequest(self, request):
   302         """Handles dispatching internal meta requests."""
   303         if request['Profile'] == kMsgProfile_Bye:
   304             self._handleCloseRequest(request)
   305         else:
   306             response = request.response
   307             response.isError = True
   308             response['Error-Domain'] = "BLIP"
   309             response['Error-Code'] = 404
   310             response.body = "Unknown meta profile"
   311             response.send()
   312     
   313     ### CLOSING:
   314     
   315     def _handleCloseRequest(self, request):
   316         """Handles requests from a peer to close."""
   317         shouldClose = True
   318         if self.onCloseRequest:
   319             shouldClose = self.onCloseRequest()
   320         if not shouldClose:
   321             log.debug("Sending resfusal to close...")
   322             response = request.response
   323             response.isError = True
   324             response['Error-Domain'] = "BLIP"
   325             response['Error-Code'] = 403
   326             response.body = "Close request denied"
   327             response.send()
   328         else:
   329             log.debug("Sending permission to close...")
   330             response = request.response
   331             response.send()
   332     
   333     def close(self):
   334         """Publicly callable close method. Sends close request to peer."""
   335         if self.status != kOpen:
   336             return False
   337         log.info("Sending close request...")
   338         req = OutgoingRequest(self, None, {'Profile': kMsgProfile_Bye})
   339         req._meta = True
   340         req.response.onComplete = self._handleCloseResponse
   341         if not req.send():
   342             log.error("Error sending close request.")
   343             return False
   344         else:
   345             self.status = kClosing
   346         return True
   347     
   348     def _handleCloseResponse(self, response):
   349         """Called when we receive a response to a close request."""
   350         log.info("Received close response.")
   351         if response.isError:
   352             # remote refused to close
   353             if self.onCloseRefused:
   354                 self.onCloseRefused(response)
   355             self.status = kOpen
   356         else:
   357             # now wait until everything has finished sending, then actually close
   358             log.info("No refusal, actually closing...")
   359             self._closeWhenPossible = True
   360     
   361     def _closeIfReady(self):
   362         """Checks if all transmissions are complete and then closes the actual socket."""
   363         if self._closeWhenPossible and len(self.outBox) == 0 and len(self.pendingRequests) == 0 and len(self.pendingResponses) == 0:
   364             # self._closeWhenPossible = False
   365             log.debug("_closeIfReady closing.")
   366             asynchat.async_chat.close(self)
   367     
   368     def handle_close(self):
   369         """Called when the socket actually closes."""
   370         log.info("Connection closed!")
   371         self.pendingRequests = self.pendingResponses = None
   372         self.outBox = None
   373         if self.status == kClosing:
   374             self.status = kClosed
   375         else:
   376             self.status = kDisconnected
   377         asyncore.dispatcher.close(self)
   378 
   379 
   380 ### MESSAGE CLASSES:
   381 
   382 
   383 class Message (object):
   384     "Abstract superclass of all request/response objects"
   385     
   386     def __init__(self, connection, body=None, properties=None):
   387         self.connection = connection
   388         self.body = body
   389         self.properties = properties or {}
   390         self.requestNo = None
   391     
   392     @property
   393     def flags(self):
   394         if self.isResponse:
   395             if self.isError:
   396                 flags = kMsgType_Error
   397             else:
   398                 flags = kMsgType_Response
   399         else:
   400             flags = kMsgType_Request
   401         if self.urgent:     flags |= kMsgFlag_Urgent
   402         if self.compressed: flags |= kMsgFlag_Compressed
   403         if self.noReply:    flags |= kMsgFlag_NoReply
   404         if self._moreComing:flags |= kMsgFlag_MoreComing
   405         if self._meta:      flags |= kMsgFlag_Meta
   406         return flags
   407     
   408     def __str__(self):
   409         s = "%s[" %(type(self).__name__)
   410         if self.requestNo != None:
   411             s += "#%i" %self.requestNo
   412         if self.urgent:     s += " URG"
   413         if self.compressed: s += " CMP"
   414         if self.noReply:    s += " NOR"
   415         if self._moreComing:s += " MOR"
   416         if self._meta:      s += " MET"
   417         if self.body:       s += " %i bytes" %len(self.body)
   418         return s+"]"
   419     
   420     def __repr__(self):
   421         s = str(self)
   422         if len(self.properties): s += repr(self.properties)
   423         return s
   424     
   425     @property
   426     def isResponse(self):
   427         "Is this message a response?"
   428         return False
   429     
   430     @property
   431     def contentType(self):
   432         return self.properties.get('Content-Type')
   433     
   434     def __getitem__(self, key):     return self.properties.get(key)
   435     def __contains__(self, key):    return key in self.properties
   436     def __len__(self):              return len(self.properties)
   437     def __nonzero__(self):          return True
   438     def __iter__(self):             return self.properties.__iter__()
   439 
   440 
   441 class IncomingMessage (Message):
   442     "Abstract superclass of incoming messages."
   443     
   444     def __init__(self, connection, requestNo, flags):
   445         super(IncomingMessage,self).__init__(connection)
   446         self.requestNo  = requestNo
   447         self._updateFlags(flags)
   448         self.frames     = []
   449     
   450     def _updateFlags(self, flags):
   451         self.urgent     = (flags & kMsgFlag_Urgent) != 0
   452         self.compressed = (flags & kMsgFlag_Compressed) != 0
   453         self.noReply    = (flags & kMsgFlag_NoReply) != 0
   454         self._moreComing= (flags & kMsgFlag_MoreComing) != 0
   455         self._meta      = (flags & kMsgFlag_Meta) != 0
   456         self.isError    = (flags & kMsgType_Error) != 0
   457     
   458     def _beginFrame(self, flags):
   459         """Received a frame header."""
   460         self._moreComing = (flags & kMsgFlag_MoreComing)!=0
   461     
   462     def _receivedData(self, data):
   463         """Received data from a frame."""
   464         self.frames.append(data)
   465     
   466     def _finished(self):
   467         """The entire message has been received; now decode it."""
   468         encoded = "".join(self.frames)
   469         self.frames = None
   470         
   471         # Decode the properties:
   472         if len(encoded) < 2: raise MessageException, "missing properties length"
   473         propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
   474         if propSize>len(encoded): raise MessageException, "properties too long to fit"
   475         if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
   476         
   477         if propSize > 2:
   478             proplist = encoded[2:propSize-1].split('\000')
   479         
   480             if len(proplist) & 1: raise MessageException, "odd number of property strings"
   481             for i in xrange(0,len(proplist),2):
   482                 def expand(str):
   483                     if len(str)==1:
   484                         str = IncomingMessage.__expandDict.get(str,str)
   485                     return str
   486                 self.properties[ expand(proplist[i])] = expand(proplist[i+1])
   487         
   488         encoded = encoded[propSize:]
   489         # Decode the body:
   490         if self.compressed and len(encoded)>0:
   491             try:
   492                 encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
   493             except zlib.error:
   494                 raise MessageException, sys.exc_info()[1]
   495         self.body = encoded
   496     
   497     __expandDict= {'\x01' : "Content-Type",
   498                    '\x02' : "Profile",
   499                    '\x03' : "application/octet-stream",
   500                    '\x04' : "text/plain; charset=UTF-8",
   501                    '\x05' : "text/xml",
   502                    '\x06' : "text/yaml",
   503                    '\x07' : "Channel",
   504                    '\x08' : "Error-Code",
   505                    '\x09' : "Error-Domain"}
   506 
   507 
   508 class OutgoingMessage (Message):
   509     "Abstract superclass of outgoing requests/responses."
   510     
   511     def __init__(self, connection, body=None, properties=None):
   512         Message.__init__(self,connection,body,properties)
   513         self.urgent = self.compressed = self.noReply = self._meta = self.isError = False
   514         self._moreComing = True
   515     
   516     def __setitem__(self, key,val):
   517         self.properties[key] = val
   518     def __delitem__(self, key):
   519         del self.properties[key]
   520     
   521     @property
   522     def sent(self):
   523         return hasattr(self,'encoded')
   524     
   525     def _encode(self):
   526         "Generates the message's encoded form, prior to sending it."
   527         out = StringIO()
   528         for (key,value) in self.properties.iteritems():
   529             def _writePropString(s):
   530                 out.write(str(s))    #FIX: Abbreviate
   531                 out.write('\000')
   532             _writePropString(key)
   533             _writePropString(value)
   534         propertiesSize = out.tell()
   535         assert propertiesSize<65536     #FIX: Return an error instead
   536         
   537         body = self.body or ""
   538         if self.compressed:
   539             z = zlib.compressobj(6,zlib.DEFLATED,31)   # window size of 31 needed for gzip format
   540             out.write(z.compress(body))
   541             body = z.flush()
   542         out.write(body)
   543         
   544         self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
   545         out.close()
   546         log.debug("Encoded %s into %u bytes", self,len(self.encoded))
   547         self.bytesSent = 0
   548     
   549     def _sendNextFrame(self, maxLen):
   550         pos = self.bytesSent
   551         payload = self.encoded[pos:pos+maxLen]
   552         pos += len(payload)
   553         self._moreComing = (pos < len(self.encoded))
   554         if not self._moreComing:
   555             self.encoded = None
   556         log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
   557         
   558         header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
   559                                                    self.requestNo,
   560                                                    self.flags,
   561                                                    kFrameHeaderSize+len(payload))
   562         self.bytesSent = pos
   563         return header + payload
   564 
   565 
   566 class Request (object):
   567     @property
   568     def response(self):
   569         "The response object for this request."
   570         if self.noReply:
   571             return None
   572         r = self.__dict__.get('_response')
   573         if r==None:
   574             r = self._response = self._createResponse()
   575         return r
   576 
   577 
   578 class Response (Message):
   579     def _setRequest(self, request):
   580         assert not request.noReply
   581         self.request = request
   582         self.requestNo = request.requestNo
   583         self.urgent = request.urgent
   584     
   585     @property
   586     def isResponse(self):
   587         return True
   588 
   589 
   590 class IncomingRequest (IncomingMessage, Request):
   591     def _createResponse(self):
   592         return OutgoingResponse(self)
   593 
   594 
   595 class OutgoingRequest (OutgoingMessage, Request):
   596     def _createResponse(self):
   597         return IncomingResponse(self)
   598     
   599     def send(self):
   600         self._encode()
   601         return self.connection._sendRequest(self) and self.response
   602 
   603 
   604 class IncomingResponse (IncomingMessage, Response):
   605     def __init__(self, request):
   606         IncomingMessage.__init__(self,request.connection,None,0)
   607         self._setRequest(request)
   608         self.onComplete = None
   609     
   610     def _finished(self):
   611         super(IncomingResponse,self)._finished()
   612         if self.onComplete:
   613             try:
   614                 self.onComplete(self)
   615             except Exception, x:
   616                 log.error("Exception dispatching response: %s", traceback.format_exc())
   617 
   618 
   619 class OutgoingResponse (OutgoingMessage, Response):
   620     def __init__(self, request):
   621         OutgoingMessage.__init__(self,request.connection)
   622         self._setRequest(request)
   623     
   624     def send(self):
   625         self._encode()
   626         return self.connection._sendMessage(self)
   627 
   628 
   629 """
   630  Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
   631  
   632  Redistribution and use in source and binary forms, with or without modification, are permitted
   633  provided that the following conditions are met:
   634  
   635  * Redistributions of source code must retain the above copyright notice, this list of conditions
   636  and the following disclaimer.
   637  * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
   638  and the following disclaimer in the documentation and/or other materials provided with the
   639  distribution.
   640  
   641  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
   642  IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
   643  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
   644  BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
   645  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
   646   PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
   647  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF 
   648  THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
   649 """