Python/BLIP.py
author morrowa
Tue Jun 23 12:46:40 2009 -0700 (2009-06-23)
changeset 53 e9f209a24d53
parent 51 de59ce19f42e
child 54 6d1392a3e0a6
permissions -rw-r--r--
Connections opened by listeners now close correctly.
     1 # encoding: utf-8
     2 """
     3 BLIP.py
     4 
     5 Created by Jens Alfke on 2008-06-03.
     6 Copyright notice and BSD license at end of file.
     7 """
     8 
     9 import asynchat
    10 import asyncore
    11 from cStringIO import StringIO
    12 import logging
    13 import socket
    14 import struct
    15 import sys
    16 import traceback
    17 import zlib
    18 
    19 
    20 # Connection status enumeration:
    21 kDisconnected = -1
    22 kClosed  = 0
    23 kOpening = 1
    24 kOpen    = 2
    25 kClosing = 3
    26 
    27 
    28 # INTERNAL CONSTANTS -- NO TOUCHIES!
    29 
    30 kFrameMagicNumber   = 0x9B34F206
    31 kFrameHeaderFormat  = '!LLHH'
    32 kFrameHeaderSize    = 12
    33 
    34 kMsgFlag_TypeMask   = 0x000F
    35 kMsgFlag_Compressed = 0x0010
    36 kMsgFlag_Urgent     = 0x0020
    37 kMsgFlag_NoReply    = 0x0040
    38 kMsgFlag_MoreComing = 0x0080
    39 kMsgFlag_Meta       = 0x0100
    40 
    41 kMsgType_Request    = 0
    42 kMsgType_Response   = 1
    43 kMsgType_Error      = 2
    44 
    45 kMsgProfile_Hi      = "Hi"
    46 kMsgProfile_Bye     = "Bye"
    47 
    48 
    49 log = logging.getLogger('BLIP')
    50 log.propagate = True
    51 
    52 
    53 class MessageException(Exception):
    54     pass
    55 
    56 class ConnectionException(Exception):
    57     pass
    58 
    59 
    60 ### LISTENER AND CONNECTION CLASSES:
    61 
    62 
    63 class Listener (asyncore.dispatcher):
    64     "BLIP listener/server class"
    65     
    66     def __init__(self, port, sslKeyFile=None, sslCertFile=None):
    67         "Create a listener on a port"
    68         asyncore.dispatcher.__init__(self)
    69         self.onConnected = self.onRequest = None
    70         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    71         self.bind( ('',port) )
    72         self.listen(5)
    73         self.sslKeyFile=sslKeyFile
    74         self.sslCertFile=sslCertFile
    75         log.info("Listening on port %u", port)
    76     
    77     def handle_accept( self ):
    78         socket,address = self.accept()
    79         if self.sslKeyFile:
    80             socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
    81         conn = Connection(address, sock=socket, listener=self)
    82         conn.onRequest = self.onRequest
    83         if self.onConnected:
    84             self.onConnected(conn)
    85 
    86     def handle_error(self):
    87         (typ,val,trace) = sys.exc_info()
    88         log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
    89         self.close()
    90     
    91 
    92 
    93 class Connection (asynchat.async_chat):
    94     def __init__( self, address, sock=None, listener=None, ssl=None ):
    95         "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
    96         "otherwise it'll open a new outgoing socket."
    97         if sock:
    98             asynchat.async_chat.__init__(self,sock)
    99             log.info("Accepted connection from %s",address)
   100             self.status = kOpen
   101         else:
   102             asynchat.async_chat.__init__(self)
   103             log.info("Opening connection to %s",address)
   104             self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
   105             self.status = kOpening
   106             if ssl:
   107                 ssl(self.socket)
   108             self.connect(address)
   109         self.address = address
   110         self.listener = listener
   111         self.onRequest = self.onCloseRequest = self.onCloseRefused = None
   112         self.pendingRequests = {}
   113         self.pendingResponses = {}
   114         self.outBox = []
   115         self.inMessage = None
   116         self.inNumRequests = self.outNumRequests = 0
   117         self.sending = False
   118         self._endOfFrame()
   119         self._closeWhenPossible = False
   120     
   121     def handle_connect(self):
   122         log.info("Connection open!")
   123         self.status = kOpen
   124     
   125     def handle_error(self):
   126         (typ,val,trace) = sys.exc_info()
   127         log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
   128         self.discard_buffers()
   129         self.status = kDisconnected
   130         self.close()
   131     
   132     
   133     ### SENDING:
   134     
   135     @property
   136     def isOpen(self):
   137         return self.status==kOpening or self.status==kOpen
   138     
   139     @property
   140     def canSend(self):
   141         return self.isOpen and not self._closeWhenPossible
   142     
   143     def _sendMessage(self, msg):
   144         if self.isOpen:
   145             self._outQueueMessage(msg,True)
   146             if not self.sending:
   147                 log.debug("Waking up the output stream")
   148                 self.sending = True
   149                 self.push_with_producer(self)
   150             return True
   151         else:
   152             return False
   153     
   154     def _sendRequest(self, req):
   155         if self.canSend:
   156             requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
   157             response = req.response
   158             if response:
   159                 response.requestNo = requestNo
   160                 self.pendingResponses[requestNo] = response
   161                 log.debug("pendingResponses[%i] := %s",requestNo,response)
   162             return self._sendMessage(req)
   163         else:
   164             return False
   165     
   166     def _outQueueMessage(self, msg,isNew=True):
   167         n = len(self.outBox)
   168         index = n
   169         if msg.urgent and n>1:
   170             while index > 0:
   171                 otherMsg = self.outBox[index-1]
   172                 if otherMsg.urgent:
   173                     if index<n:
   174                         index += 1
   175                     break
   176                 elif isNew and otherMsg.bytesSent==0:
   177                     break
   178                 index -= 1
   179             else:
   180                 index = 1
   181         
   182         self.outBox.insert(index,msg)
   183         if isNew:
   184             log.info("Queuing %s at index %i",msg,index)
   185         else:
   186             log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
   187     
   188     def more(self):
   189         n = len(self.outBox)
   190         if n > 0:
   191             msg = self.outBox.pop(0)
   192             frameSize = 4096
   193             if msg.urgent or n==1 or not self.outBox[0].urgent:
   194                 frameSize *= 4
   195             data = msg._sendNextFrame(frameSize)
   196             if msg._moreComing:
   197                 self._outQueueMessage(msg,isNew=False)
   198             else:
   199                 log.info("Finished sending %s",msg)
   200             return data
   201         else:
   202             log.debug("Nothing more to send")
   203             self.sending = False
   204             self._closeIfReady()
   205             return None
   206     
   207     ### RECEIVING:
   208     
   209     def collect_incoming_data(self, data):
   210         if self.expectingHeader:
   211             if self.inHeader==None:
   212                 self.inHeader = data
   213             else:
   214                 self.inHeader += data
   215         elif self.inMessage:
   216             self.inMessage._receivedData(data)
   217     
   218     def found_terminator(self):
   219         if self.expectingHeader:
   220             # Got a header:
   221             (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
   222             self.inHeader = None
   223             if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
   224             if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
   225             frameLen -= kFrameHeaderSize
   226             log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
   227                         (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
   228             self.inMessage = self._inMessageForFrame(requestNo,flags)
   229             
   230             if frameLen > 0:
   231                 self.expectingHeader = False
   232                 self.set_terminator(frameLen)
   233             else:
   234                 self._endOfFrame()
   235         
   236         else:
   237             # Got the frame's payload:
   238             self._endOfFrame()
   239     
   240     def _inMessageForFrame(self, requestNo,flags):
   241         message = None
   242         msgType = flags & kMsgFlag_TypeMask
   243         if msgType==kMsgType_Request:
   244             message = self.pendingRequests.get(requestNo)
   245             if message==None and requestNo == self.inNumRequests+1:
   246                 message = IncomingRequest(self,requestNo,flags)
   247                 assert message!=None
   248                 self.pendingRequests[requestNo] = message
   249                 self.inNumRequests += 1
   250         elif msgType==kMsgType_Response or msgType==kMsgType_Error:
   251             message = self.pendingResponses.get(requestNo)
   252             message._updateFlags(flags)
   253         
   254         if message != None:
   255             message._beginFrame(flags)
   256         else:
   257             log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
   258         return message
   259     
   260     def _endOfFrame(self):
   261         msg = self.inMessage
   262         self.inMessage = None
   263         self.expectingHeader = True
   264         self.inHeader = None
   265         self.set_terminator(kFrameHeaderSize) # wait for binary header
   266         if msg:
   267             log.debug("End of frame of %s",msg)
   268             if not msg._moreComing:
   269                 self._receivedMessage(msg)
   270     
   271     def _receivedMessage(self, msg):
   272         log.info("Received: %s",msg)
   273         # Remove from pending:
   274         if msg.isResponse:
   275             del self.pendingResponses[msg.requestNo]
   276         else:
   277             del self.pendingRequests[msg.requestNo]
   278         # Decode:
   279         try:
   280             msg._finished()
   281             if not msg.isResponse:
   282                 if msg._meta:
   283                     self._dispatchMetaRequest(msg)
   284                 else:
   285                     self.onRequest(msg)
   286         except Exception, x:
   287             log.error("Exception handling incoming message: %s", traceback.format_exc())
   288             #FIX: Send an error reply
   289         # Check to see if we're done and ready to close:
   290         self._closeIfReady()
   291     
   292     def _dispatchMetaRequest(self, request):
   293         """Handles dispatching internal meta requests."""
   294         if request['Profile'] == kMsgProfile_Bye:
   295             shouldClose = True
   296             if self.onCloseRequest:
   297                 shouldClose = self.onCloseRequest()
   298             if not shouldClose:
   299                 log.debug("Sending resfusal to close...")
   300                 response = request.response
   301                 response.isError = True
   302                 response['Error-Domain'] = "BLIP"
   303                 response['Error-Code'] = 403
   304                 response.body = "Close request denied"
   305                 response.send()
   306             else:
   307                 log.debug("Sending permission to close...")
   308                 response = request.response
   309                 response.send()
   310         else:
   311             response = request.response
   312             response.isError = True
   313             response['Error-Domain'] = "BLIP"
   314             response['Error-Code'] = 404
   315             response.body = "Unknown meta profile"
   316             response.send()
   317     
   318     ### CLOSING:
   319     
   320     def close(self):
   321         """Publicly callable close method. Sends close request to peer."""
   322         if self.status != kOpen:
   323             return False
   324         log.info("Sending close request...")
   325         req = OutgoingRequest(self, None, {'Profile': kMsgProfile_Bye})
   326         req._meta = True
   327         req.response.onComplete = self._handleCloseResponse
   328         if not req.send():
   329             log.error("Error sending close request.")
   330             return False
   331         else:
   332             self.status = kClosing
   333         return True
   334     
   335     def _handleCloseResponse(self, response):
   336         """Called when we receive a response to a close request."""
   337         log.info("Received close response.")
   338         if response.isError:
   339             # remote refused to close
   340             if self.onCloseRefused:
   341                 self.onCloseRefused(response)
   342             self.status = kOpen
   343         else:
   344             # now wait until everything has finished sending, then actually close
   345             log.info("No refusal, actually closing...")
   346             self._closeWhenPossible = True
   347     
   348     def _closeIfReady(self):
   349         """Checks if all transmissions are complete and then closes the actual socket."""
   350         if self._closeWhenPossible and len(self.outBox) == 0 and len(self.pendingRequests) == 0 and len(self.pendingResponses) == 0:
   351             # self._closeWhenPossible = False
   352             log.debug("_closeIfReady closing.")
   353             asynchat.async_chat.close(self)
   354     
   355     def handle_close(self):
   356         """Called when the socket actually closes."""
   357         log.info("Connection closed!")
   358         self.pendingRequests = self.pendingResponses = None
   359         self.outBox = None
   360         if self.status == kClosing:
   361             self.status = kClosed
   362         else:
   363             self.status = kDisconnected
   364         asyncore.dispatcher.close(self)
   365 
   366 
   367 ### MESSAGE CLASSES:
   368 
   369 
   370 class Message (object):
   371     "Abstract superclass of all request/response objects"
   372     
   373     def __init__(self, connection, body=None, properties=None):
   374         self.connection = connection
   375         self.body = body
   376         self.properties = properties or {}
   377         self.requestNo = None
   378     
   379     @property
   380     def flags(self):
   381         if self.isResponse:
   382             if self.isError:
   383                 flags = kMsgType_Error
   384             else:
   385                 flags = kMsgType_Response
   386         else:
   387             flags = kMsgType_Request
   388         if self.urgent:     flags |= kMsgFlag_Urgent
   389         if self.compressed: flags |= kMsgFlag_Compressed
   390         if self.noReply:    flags |= kMsgFlag_NoReply
   391         if self._moreComing:flags |= kMsgFlag_MoreComing
   392         if self._meta:      flags |= kMsgFlag_Meta
   393         return flags
   394     
   395     def __str__(self):
   396         s = "%s[" %(type(self).__name__)
   397         if self.requestNo != None:
   398             s += "#%i" %self.requestNo
   399         if self.urgent:     s += " URG"
   400         if self.compressed: s += " CMP"
   401         if self.noReply:    s += " NOR"
   402         if self._moreComing:s += " MOR"
   403         if self._meta:      s += " MET"
   404         if self.body:       s += " %i bytes" %len(self.body)
   405         return s+"]"
   406     
   407     def __repr__(self):
   408         s = str(self)
   409         if len(self.properties): s += repr(self.properties)
   410         return s
   411     
   412     @property
   413     def isResponse(self):
   414         "Is this message a response?"
   415         return False
   416     
   417     @property
   418     def contentType(self):
   419         return self.properties.get('Content-Type')
   420     
   421     def __getitem__(self, key):     return self.properties.get(key)
   422     def __contains__(self, key):    return key in self.properties
   423     def __len__(self):              return len(self.properties)
   424     def __nonzero__(self):          return True
   425     def __iter__(self):             return self.properties.__iter__()
   426 
   427 
   428 class IncomingMessage (Message):
   429     "Abstract superclass of incoming messages."
   430     
   431     def __init__(self, connection, requestNo, flags):
   432         super(IncomingMessage,self).__init__(connection)
   433         self.requestNo  = requestNo
   434         self._updateFlags(flags)
   435         self.frames     = []
   436     
   437     def _updateFlags(self, flags):
   438         self.urgent     = (flags & kMsgFlag_Urgent) != 0
   439         self.compressed = (flags & kMsgFlag_Compressed) != 0
   440         self.noReply    = (flags & kMsgFlag_NoReply) != 0
   441         self._moreComing= (flags & kMsgFlag_MoreComing) != 0
   442         self._meta      = (flags & kMsgFlag_Meta) != 0
   443         self.isError    = (flags & kMsgType_Error) != 0
   444     
   445     def _beginFrame(self, flags):
   446         """Received a frame header."""
   447         self._moreComing = (flags & kMsgFlag_MoreComing)!=0
   448     
   449     def _receivedData(self, data):
   450         """Received data from a frame."""
   451         self.frames.append(data)
   452     
   453     def _finished(self):
   454         """The entire message has been received; now decode it."""
   455         encoded = "".join(self.frames)
   456         self.frames = None
   457         
   458         # Decode the properties:
   459         if len(encoded) < 2: raise MessageException, "missing properties length"
   460         propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
   461         if propSize>len(encoded): raise MessageException, "properties too long to fit"
   462         if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
   463         
   464         if propSize > 2:
   465             proplist = encoded[2:propSize-1].split('\000')
   466         
   467             if len(proplist) & 1: raise MessageException, "odd number of property strings"
   468             for i in xrange(0,len(proplist),2):
   469                 def expand(str):
   470                     if len(str)==1:
   471                         str = IncomingMessage.__expandDict.get(str,str)
   472                     return str
   473                 self.properties[ expand(proplist[i])] = expand(proplist[i+1])
   474         
   475         encoded = encoded[propSize:]
   476         # Decode the body:
   477         if self.compressed and len(encoded)>0:
   478             try:
   479                 encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
   480             except zlib.error:
   481                 raise MessageException, sys.exc_info()[1]
   482         self.body = encoded
   483     
   484     __expandDict= {'\x01' : "Content-Type",
   485                    '\x02' : "Profile",
   486                    '\x03' : "application/octet-stream",
   487                    '\x04' : "text/plain; charset=UTF-8",
   488                    '\x05' : "text/xml",
   489                    '\x06' : "text/yaml",
   490                    '\x07' : "Channel",
   491                    '\x08' : "Error-Code",
   492                    '\x09' : "Error-Domain"}
   493 
   494 
   495 class OutgoingMessage (Message):
   496     "Abstract superclass of outgoing requests/responses."
   497     
   498     def __init__(self, connection, body=None, properties=None):
   499         Message.__init__(self,connection,body,properties)
   500         self.urgent = self.compressed = self.noReply = self._meta = self.isError = False
   501         self._moreComing = True
   502     
   503     def __setitem__(self, key,val):
   504         self.properties[key] = val
   505     def __delitem__(self, key):
   506         del self.properties[key]
   507     
   508     @property
   509     def sent(self):
   510         return hasattr(self,'encoded')
   511     
   512     def _encode(self):
   513         "Generates the message's encoded form, prior to sending it."
   514         out = StringIO()
   515         for (key,value) in self.properties.iteritems():
   516             def _writePropString(s):
   517                 out.write(str(s))    #FIX: Abbreviate
   518                 out.write('\000')
   519             _writePropString(key)
   520             _writePropString(value)
   521         propertiesSize = out.tell()
   522         assert propertiesSize<65536     #FIX: Return an error instead
   523         
   524         body = self.body or ""
   525         if self.compressed:
   526             z = zlib.compressobj(6,zlib.DEFLATED,31)   # window size of 31 needed for gzip format
   527             out.write(z.compress(body))
   528             body = z.flush()
   529         out.write(body)
   530         
   531         self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
   532         out.close()
   533         log.debug("Encoded %s into %u bytes", self,len(self.encoded))
   534         self.bytesSent = 0
   535     
   536     def _sendNextFrame(self, maxLen):
   537         pos = self.bytesSent
   538         payload = self.encoded[pos:pos+maxLen]
   539         pos += len(payload)
   540         self._moreComing = (pos < len(self.encoded))
   541         if not self._moreComing:
   542             self.encoded = None
   543         log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
   544         
   545         header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
   546                                                    self.requestNo,
   547                                                    self.flags,
   548                                                    kFrameHeaderSize+len(payload))
   549         self.bytesSent = pos
   550         return header + payload
   551 
   552 
   553 class Request (object):
   554     @property
   555     def response(self):
   556         "The response object for this request."
   557         if self.noReply:
   558             return None
   559         r = self.__dict__.get('_response')
   560         if r==None:
   561             r = self._response = self._createResponse()
   562         return r
   563 
   564 
   565 class Response (Message):
   566     def _setRequest(self, request):
   567         assert not request.noReply
   568         self.request = request
   569         self.requestNo = request.requestNo
   570         self.urgent = request.urgent
   571     
   572     @property
   573     def isResponse(self):
   574         return True
   575 
   576 
   577 class IncomingRequest (IncomingMessage, Request):
   578     def _createResponse(self):
   579         return OutgoingResponse(self)
   580 
   581 
   582 class OutgoingRequest (OutgoingMessage, Request):
   583     def _createResponse(self):
   584         return IncomingResponse(self)
   585     
   586     def send(self):
   587         self._encode()
   588         return self.connection._sendRequest(self) and self.response
   589 
   590 
   591 class IncomingResponse (IncomingMessage, Response):
   592     def __init__(self, request):
   593         IncomingMessage.__init__(self,request.connection,None,0)
   594         self._setRequest(request)
   595         self.onComplete = None
   596     
   597     def _finished(self):
   598         super(IncomingResponse,self)._finished()
   599         if self.onComplete:
   600             try:
   601                 self.onComplete(self)
   602             except Exception, x:
   603                 log.error("Exception dispatching response: %s", traceback.format_exc())
   604 
   605 
   606 class OutgoingResponse (OutgoingMessage, Response):
   607     def __init__(self, request):
   608         OutgoingMessage.__init__(self,request.connection)
   609         self._setRequest(request)
   610     
   611     def send(self):
   612         self._encode()
   613         return self.connection._sendMessage(self)
   614 
   615 
   616 """
   617  Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
   618  
   619  Redistribution and use in source and binary forms, with or without modification, are permitted
   620  provided that the following conditions are met:
   621  
   622  * Redistributions of source code must retain the above copyright notice, this list of conditions
   623  and the following disclaimer.
   624  * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
   625  and the following disclaimer in the documentation and/or other materials provided with the
   626  distribution.
   627  
   628  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
   629  IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
   630  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
   631  BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
   632  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
   633   PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
   634  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF 
   635  THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
   636 """