Python/BLIP.py
author morrowa@betelgeuse.local
Tue Jun 23 11:44:30 2009 -0700 (2009-06-23)
changeset 51 de59ce19f42e
parent 16 6f608b552b77
child 53 e9f209a24d53
permissions -rw-r--r--
BROKEN COMMIT. Majority of code to handle closing has been added. Listeners do not close correctly.
     1 # encoding: utf-8
     2 """
     3 BLIP.py
     4 
     5 Created by Jens Alfke on 2008-06-03.
     6 Copyright notice and BSD license at end of file.
     7 """
     8 
     9 import asynchat
    10 import asyncore
    11 from cStringIO import StringIO
    12 import logging
    13 import socket
    14 import struct
    15 import sys
    16 import traceback
    17 import zlib
    18 
    19 
    20 # Connection status enumeration:
    21 kDisconnected = -1
    22 kClosed  = 0
    23 kOpening = 1
    24 kOpen    = 2
    25 kClosing = 3
    26 
    27 
    28 # INTERNAL CONSTANTS -- NO TOUCHIES!
    29 
    30 kFrameMagicNumber   = 0x9B34F206
    31 kFrameHeaderFormat  = '!LLHH'
    32 kFrameHeaderSize    = 12
    33 
    34 kMsgFlag_TypeMask   = 0x000F
    35 kMsgFlag_Compressed = 0x0010
    36 kMsgFlag_Urgent     = 0x0020
    37 kMsgFlag_NoReply    = 0x0040
    38 kMsgFlag_MoreComing = 0x0080
    39 kMsgFlag_Meta       = 0x0100
    40 
    41 kMsgType_Request    = 0
    42 kMsgType_Response   = 1
    43 kMsgType_Error      = 2
    44 
    45 kMsgProfile_Hi      = "Hi"
    46 kMsgProfile_Bye     = "Bye"
    47 
    48 
    49 log = logging.getLogger('BLIP')
    50 log.propagate = True
    51 
    52 
    53 class MessageException(Exception):
    54     pass
    55 
    56 class ConnectionException(Exception):
    57     pass
    58 
    59 
    60 ### LISTENER AND CONNECTION CLASSES:
    61 
    62 
    63 class Listener (asyncore.dispatcher):
    64     "BLIP listener/server class"
    65     
    66     def __init__(self, port, sslKeyFile=None, sslCertFile=None):
    67         "Create a listener on a port"
    68         asyncore.dispatcher.__init__(self)
    69         self.onConnected = self.onRequest = None
    70         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    71         self.bind( ('',port) )
    72         self.listen(5)
    73         self.sslKeyFile=sslKeyFile
    74         self.sslCertFile=sslCertFile
    75         log.info("Listening on port %u", port)
    76     
    77     def handle_accept( self ):
    78         socket,address = self.accept()
    79         if self.sslKeyFile:
    80             socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
    81         conn = Connection(address, sock=socket, listener=self)
    82         conn.onRequest = self.onRequest
    83         if self.onConnected:
    84             self.onConnected(conn)
    85 
    86     def handle_error(self):
    87         (typ,val,trace) = sys.exc_info()
    88         log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
    89         self.close()
    90     
    91 
    92 
    93 class Connection (asynchat.async_chat):
    94     def __init__( self, address, sock=None, listener=None, ssl=None ):
    95         "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
    96         "otherwise it'll open a new outgoing socket."
    97         if sock:
    98             asynchat.async_chat.__init__(self,sock)
    99             log.info("Accepted connection from %s",address)
   100             self.status = kOpen
   101         else:
   102             asynchat.async_chat.__init__(self)
   103             log.info("Opening connection to %s",address)
   104             self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
   105             self.status = kOpening
   106             if ssl:
   107                 ssl(self.socket)
   108             self.connect(address)
   109         self.address = address
   110         self.listener = listener
   111         self.onRequest = self.onCloseRequest = self.onCloseRefused = None
   112         self.pendingRequests = {}
   113         self.pendingResponses = {}
   114         self.outBox = []
   115         self.inMessage = None
   116         self.inNumRequests = self.outNumRequests = 0
   117         self.sending = False
   118         self._endOfFrame()
   119         self._closeWhenPossible = False
   120     
   121     def handle_connect(self):
   122         log.info("Connection open!")
   123         self.status = kOpen
   124     
   125     def handle_error(self):
   126         (typ,val,trace) = sys.exc_info()
   127         log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
   128         self.discard_buffers()
   129         self.status = kDisconnected
   130         self.close()
   131     
   132     
   133     ### SENDING:
   134     
   135     @property
   136     def isOpen(self):
   137         return self.status==kOpening or self.status==kOpen
   138     
   139     @property
   140     def canSend(self):
   141         return self.isOpen and not self._closeWhenPossible
   142     
   143     def _sendMessage(self, msg):
   144         if self.isOpen:
   145             self._outQueueMessage(msg,True)
   146             if not self.sending:
   147                 log.debug("Waking up the output stream")
   148                 self.sending = True
   149                 self.push_with_producer(self)
   150             return True
   151         else:
   152             return False
   153     
   154     def _sendRequest(self, req):
   155         if self.canSend:
   156             requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
   157             response = req.response
   158             if response:
   159                 response.requestNo = requestNo
   160                 self.pendingResponses[requestNo] = response
   161                 log.debug("pendingResponses[%i] := %s",requestNo,response)
   162             return self._sendMessage(req)
   163         else:
   164             return False
   165     
   166     def _outQueueMessage(self, msg,isNew=True):
   167         n = len(self.outBox)
   168         index = n
   169         if msg.urgent and n>1:
   170             while index > 0:
   171                 otherMsg = self.outBox[index-1]
   172                 if otherMsg.urgent:
   173                     if index<n:
   174                         index += 1
   175                     break
   176                 elif isNew and otherMsg.bytesSent==0:
   177                     break
   178                 index -= 1
   179             else:
   180                 index = 1
   181         
   182         self.outBox.insert(index,msg)
   183         if isNew:
   184             log.info("Queuing %s at index %i",msg,index)
   185         else:
   186             log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
   187     
   188     def more(self):
   189         n = len(self.outBox)
   190         if n > 0:
   191             msg = self.outBox.pop(0)
   192             frameSize = 4096
   193             if msg.urgent or n==1 or not self.outBox[0].urgent:
   194                 frameSize *= 4
   195             data = msg._sendNextFrame(frameSize)
   196             if msg._moreComing:
   197                 self._outQueueMessage(msg,isNew=False)
   198             else:
   199                 log.info("Finished sending %s",msg)
   200             return data
   201         else:
   202             log.debug("Nothing more to send")
   203             self.sending = False
   204             self._closeIfReady()
   205             return None
   206     
   207     ### RECEIVING:
   208     
   209     def collect_incoming_data(self, data):
   210         if self.expectingHeader:
   211             if self.inHeader==None:
   212                 self.inHeader = data
   213             else:
   214                 self.inHeader += data
   215         elif self.inMessage:
   216             self.inMessage._receivedData(data)
   217     
   218     def found_terminator(self):
   219         if self.expectingHeader:
   220             # Got a header:
   221             (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
   222             self.inHeader = None
   223             if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
   224             if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
   225             frameLen -= kFrameHeaderSize
   226             log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
   227                         (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
   228             self.inMessage = self._inMessageForFrame(requestNo,flags)
   229             
   230             if frameLen > 0:
   231                 self.expectingHeader = False
   232                 self.set_terminator(frameLen)
   233             else:
   234                 self._endOfFrame()
   235         
   236         else:
   237             # Got the frame's payload:
   238             self._endOfFrame()
   239     
   240     def _inMessageForFrame(self, requestNo,flags):
   241         message = None
   242         msgType = flags & kMsgFlag_TypeMask
   243         if msgType==kMsgType_Request:
   244             message = self.pendingRequests.get(requestNo)
   245             if message==None and requestNo == self.inNumRequests+1:
   246                 message = IncomingRequest(self,requestNo,flags)
   247                 assert message!=None
   248                 self.pendingRequests[requestNo] = message
   249                 self.inNumRequests += 1
   250         elif msgType==kMsgType_Response or msgType==kMsgType_Error:
   251             message = self.pendingResponses.get(requestNo)
   252             message._updateFlags(flags)
   253         
   254         if message != None:
   255             message._beginFrame(flags)
   256         else:
   257             log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
   258         return message
   259     
   260     def _endOfFrame(self):
   261         msg = self.inMessage
   262         self.inMessage = None
   263         self.expectingHeader = True
   264         self.inHeader = None
   265         self.set_terminator(kFrameHeaderSize) # wait for binary header
   266         if msg:
   267             log.debug("End of frame of %s",msg)
   268             if not msg._moreComing:
   269                 self._receivedMessage(msg)
   270     
   271     def _receivedMessage(self, msg):
   272         log.info("Received: %s",msg)
   273         # Remove from pending:
   274         if msg.isResponse:
   275             del self.pendingResponses[msg.requestNo]
   276         else:
   277             del self.pendingRequests[msg.requestNo]
   278         # Decode:
   279         try:
   280             msg._finished()
   281             if not msg.isResponse:
   282                 if msg._meta:
   283                     self._dispatchMetaRequest(msg)
   284                 else:
   285                     self.onRequest(msg)
   286         except Exception, x:
   287             log.error("Exception handling incoming message: %s", traceback.format_exc())
   288             #FIX: Send an error reply
   289         # Check to see if we're done and ready to close:
   290         self._closeIfReady()
   291     
   292     def _dispatchMetaRequest(self, request):
   293         """Handles dispatching internal meta requests."""
   294         if request['Profile'] == kMsgProfile_Bye:
   295             shouldClose = True
   296             if self.onCloseRequest:
   297                 shouldClose = self.onCloseRequest()
   298             if not shouldClose:
   299                 log.debug("Sending resfusal to close...")
   300                 response = request.response
   301                 response.isError = True
   302                 response['Error-Domain'] = "BLIP"
   303                 response['Error-Code'] = 403
   304                 response.body = "Close request denied"
   305                 response.send()
   306             else:
   307                 log.debug("Sending permission to close...")
   308                 response = request.response
   309                 response.send()
   310         else:
   311             response = request.response
   312             response.isError = True
   313             response['Error-Domain'] = "BLIP"
   314             response['Error-Code'] = 404
   315             response.body = "Unknown meta profile"
   316             response.send()
   317     
   318     ### CLOSING:
   319     
   320     def close(self):
   321         """Publicly callable close method. Sends close request to peer."""
   322         if self.status != kOpen:
   323             return False
   324         log.info("Sending close request...")
   325         req = OutgoingRequest(self, None, {'Profile': kMsgProfile_Bye})
   326         req._meta = True
   327         req.response.onComplete = self._handleCloseResponse
   328         if not req.send():
   329             log.error("Error sending close request.")
   330             return False
   331         else:
   332             self.status = kClosing
   333         return True
   334     
   335     def _handleCloseResponse(self, response):
   336         """Called when we receive a response to a close request."""
   337         log.info("Received close response.")
   338         if response.isError:
   339             # remote refused to close
   340             if self.onCloseRefused:
   341                 self.onCloseRefused(response)
   342             self.status = kOpen
   343         else:
   344             # now wait until everything has finished sending, then actually close
   345             log.info("No refusal, actually closing...")
   346             self._closeWhenPossible = True
   347     
   348     def _closeIfReady(self):
   349         """Checks if all transmissions are complete and then closes the actual socket."""
   350         if self._closeWhenPossible and len(self.outBox) == 0 and len(self.pendingRequests) == 0 and len(self.pendingResponses) == 0:
   351             # self._closeWhenPossible = False
   352             log.debug("_closeIfReady closing.")
   353             asynchat.async_chat.close(self)
   354     
   355     def handle_close(self):
   356         """Called when the socket actually closes."""
   357         log.info("Connection closed!")
   358         self.pendingRequests = self.pendingResponses = None
   359         self.outBox = None
   360         if self.status == kClosing:
   361             self.status = kClosed
   362         else:
   363             self.status = kDisconnected
   364 
   365 
   366 ### MESSAGE CLASSES:
   367 
   368 
   369 class Message (object):
   370     "Abstract superclass of all request/response objects"
   371     
   372     def __init__(self, connection, body=None, properties=None):
   373         self.connection = connection
   374         self.body = body
   375         self.properties = properties or {}
   376         self.requestNo = None
   377     
   378     @property
   379     def flags(self):
   380         if self.isResponse:
   381             if self.isError:
   382                 flags = kMsgType_Error
   383             else:
   384                 flags = kMsgType_Response
   385         else:
   386             flags = kMsgType_Request
   387         if self.urgent:     flags |= kMsgFlag_Urgent
   388         if self.compressed: flags |= kMsgFlag_Compressed
   389         if self.noReply:    flags |= kMsgFlag_NoReply
   390         if self._moreComing:flags |= kMsgFlag_MoreComing
   391         if self._meta:      flags |= kMsgFlag_Meta
   392         return flags
   393     
   394     def __str__(self):
   395         s = "%s[" %(type(self).__name__)
   396         if self.requestNo != None:
   397             s += "#%i" %self.requestNo
   398         if self.urgent:     s += " URG"
   399         if self.compressed: s += " CMP"
   400         if self.noReply:    s += " NOR"
   401         if self._moreComing:s += " MOR"
   402         if self._meta:      s += " MET"
   403         if self.body:       s += " %i bytes" %len(self.body)
   404         return s+"]"
   405     
   406     def __repr__(self):
   407         s = str(self)
   408         if len(self.properties): s += repr(self.properties)
   409         return s
   410     
   411     @property
   412     def isResponse(self):
   413         "Is this message a response?"
   414         return False
   415     
   416     @property
   417     def contentType(self):
   418         return self.properties.get('Content-Type')
   419     
   420     def __getitem__(self, key):     return self.properties.get(key)
   421     def __contains__(self, key):    return key in self.properties
   422     def __len__(self):              return len(self.properties)
   423     def __nonzero__(self):          return True
   424     def __iter__(self):             return self.properties.__iter__()
   425 
   426 
   427 class IncomingMessage (Message):
   428     "Abstract superclass of incoming messages."
   429     
   430     def __init__(self, connection, requestNo, flags):
   431         super(IncomingMessage,self).__init__(connection)
   432         self.requestNo  = requestNo
   433         self._updateFlags(flags)
   434         self.frames     = []
   435     
   436     def _updateFlags(self, flags):
   437         self.urgent     = (flags & kMsgFlag_Urgent) != 0
   438         self.compressed = (flags & kMsgFlag_Compressed) != 0
   439         self.noReply    = (flags & kMsgFlag_NoReply) != 0
   440         self._moreComing= (flags & kMsgFlag_MoreComing) != 0
   441         self._meta      = (flags & kMsgFlag_Meta) != 0
   442         self.isError    = (flags & kMsgType_Error) != 0
   443     
   444     def _beginFrame(self, flags):
   445         """Received a frame header."""
   446         self._moreComing = (flags & kMsgFlag_MoreComing)!=0
   447     
   448     def _receivedData(self, data):
   449         """Received data from a frame."""
   450         self.frames.append(data)
   451     
   452     def _finished(self):
   453         """The entire message has been received; now decode it."""
   454         encoded = "".join(self.frames)
   455         self.frames = None
   456         
   457         # Decode the properties:
   458         if len(encoded) < 2: raise MessageException, "missing properties length"
   459         propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
   460         if propSize>len(encoded): raise MessageException, "properties too long to fit"
   461         if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
   462         
   463         if propSize > 2:
   464             proplist = encoded[2:propSize-1].split('\000')
   465         
   466             if len(proplist) & 1: raise MessageException, "odd number of property strings"
   467             for i in xrange(0,len(proplist),2):
   468                 def expand(str):
   469                     if len(str)==1:
   470                         str = IncomingMessage.__expandDict.get(str,str)
   471                     return str
   472                 self.properties[ expand(proplist[i])] = expand(proplist[i+1])
   473         
   474         encoded = encoded[propSize:]
   475         # Decode the body:
   476         if self.compressed and len(encoded)>0:
   477             try:
   478                 encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
   479             except zlib.error:
   480                 raise MessageException, sys.exc_info()[1]
   481         self.body = encoded
   482     
   483     __expandDict= {'\x01' : "Content-Type",
   484                    '\x02' : "Profile",
   485                    '\x03' : "application/octet-stream",
   486                    '\x04' : "text/plain; charset=UTF-8",
   487                    '\x05' : "text/xml",
   488                    '\x06' : "text/yaml",
   489                    '\x07' : "Channel",
   490                    '\x08' : "Error-Code",
   491                    '\x09' : "Error-Domain"}
   492 
   493 
   494 class OutgoingMessage (Message):
   495     "Abstract superclass of outgoing requests/responses."
   496     
   497     def __init__(self, connection, body=None, properties=None):
   498         Message.__init__(self,connection,body,properties)
   499         self.urgent = self.compressed = self.noReply = self._meta = self.isError = False
   500         self._moreComing = True
   501     
   502     def __setitem__(self, key,val):
   503         self.properties[key] = val
   504     def __delitem__(self, key):
   505         del self.properties[key]
   506     
   507     @property
   508     def sent(self):
   509         return hasattr(self,'encoded')
   510     
   511     def _encode(self):
   512         "Generates the message's encoded form, prior to sending it."
   513         out = StringIO()
   514         for (key,value) in self.properties.iteritems():
   515             def _writePropString(s):
   516                 out.write(str(s))    #FIX: Abbreviate
   517                 out.write('\000')
   518             _writePropString(key)
   519             _writePropString(value)
   520         propertiesSize = out.tell()
   521         assert propertiesSize<65536     #FIX: Return an error instead
   522         
   523         body = self.body or ""
   524         if self.compressed:
   525             z = zlib.compressobj(6,zlib.DEFLATED,31)   # window size of 31 needed for gzip format
   526             out.write(z.compress(body))
   527             body = z.flush()
   528         out.write(body)
   529         
   530         self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
   531         out.close()
   532         log.debug("Encoded %s into %u bytes", self,len(self.encoded))
   533         self.bytesSent = 0
   534     
   535     def _sendNextFrame(self, maxLen):
   536         pos = self.bytesSent
   537         payload = self.encoded[pos:pos+maxLen]
   538         pos += len(payload)
   539         self._moreComing = (pos < len(self.encoded))
   540         if not self._moreComing:
   541             self.encoded = None
   542         log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
   543         
   544         header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
   545                                                    self.requestNo,
   546                                                    self.flags,
   547                                                    kFrameHeaderSize+len(payload))
   548         self.bytesSent = pos
   549         return header + payload
   550 
   551 
   552 class Request (object):
   553     @property
   554     def response(self):
   555         "The response object for this request."
   556         if self.noReply:
   557             return None
   558         r = self.__dict__.get('_response')
   559         if r==None:
   560             r = self._response = self._createResponse()
   561         return r
   562 
   563 
   564 class Response (Message):
   565     def _setRequest(self, request):
   566         assert not request.noReply
   567         self.request = request
   568         self.requestNo = request.requestNo
   569         self.urgent = request.urgent
   570     
   571     @property
   572     def isResponse(self):
   573         return True
   574 
   575 
   576 class IncomingRequest (IncomingMessage, Request):
   577     def _createResponse(self):
   578         return OutgoingResponse(self)
   579 
   580 
   581 class OutgoingRequest (OutgoingMessage, Request):
   582     def _createResponse(self):
   583         return IncomingResponse(self)
   584     
   585     def send(self):
   586         self._encode()
   587         return self.connection._sendRequest(self) and self.response
   588 
   589 
   590 class IncomingResponse (IncomingMessage, Response):
   591     def __init__(self, request):
   592         IncomingMessage.__init__(self,request.connection,None,0)
   593         self._setRequest(request)
   594         self.onComplete = None
   595     
   596     def _finished(self):
   597         super(IncomingResponse,self)._finished()
   598         if self.onComplete:
   599             try:
   600                 self.onComplete(self)
   601             except Exception, x:
   602                 log.error("Exception dispatching response: %s", traceback.format_exc())
   603 
   604 
   605 class OutgoingResponse (OutgoingMessage, Response):
   606     def __init__(self, request):
   607         OutgoingMessage.__init__(self,request.connection)
   608         self._setRequest(request)
   609     
   610     def send(self):
   611         self._encode()
   612         return self.connection._sendMessage(self)
   613 
   614 
   615 """
   616  Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
   617  
   618  Redistribution and use in source and binary forms, with or without modification, are permitted
   619  provided that the following conditions are met:
   620  
   621  * Redistributions of source code must retain the above copyright notice, this list of conditions
   622  and the following disclaimer.
   623  * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
   624  and the following disclaimer in the documentation and/or other materials provided with the
   625  distribution.
   626  
   627  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
   628  IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
   629  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
   630  BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
   631  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
   632   PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
   633  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF 
   634  THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
   635 """