Fixed two CF memory leaks. (Fixes issue #5)
     5 Created by Jens Alfke on 2008-06-03.
 
     6 Copyright notice and BSD license at end of file.
 
    11 from cStringIO import StringIO
 
    20 # Connection status enumeration:
 
    28 # INTERNAL CONSTANTS -- NO TOUCHIES!
 
    30 kFrameMagicNumber   = 0x9B34F205
 
    31 kFrameHeaderFormat  = '!LLHH'
 
    34 kMsgFlag_TypeMask   = 0x000F
 
    35 kMsgFlag_Compressed = 0x0010
 
    36 kMsgFlag_Urgent     = 0x0020
 
    37 kMsgFlag_NoReply    = 0x0040
 
    38 kMsgFlag_MoreComing = 0x0080
 
    45 log = logging.getLogger('BLIP')
 
    49 class MessageException(Exception):
 
    52 class ConnectionException(Exception):
 
    56 ### LISTENER AND CONNECTION CLASSES:
 
    59 class Listener (asyncore.dispatcher):
 
    60     "BLIP listener/server class"
 
    62     def __init__(self, port, sslKeyFile=None, sslCertFile=None):
 
    63         "Create a listener on a port"
 
    64         asyncore.dispatcher.__init__(self)
 
    65         self.onConnected = self.onRequest = None
 
    66         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
 
    67         self.bind( ('',port) )
 
    69         self.sslKeyFile=sslKeyFile
 
    70         self.sslCertFile=sslCertFile
 
    71         log.info("Listening on port %u", port)
 
    73     def handle_accept( self ):
 
    74         socket,address = self.accept()
 
    76             socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
 
    77         conn = Connection(address, sock=socket, listener=self)
 
    78         conn.onRequest = self.onRequest
 
    80             self.onConnected(conn)
 
    82     def handle_error(self):
 
    83         (typ,val,trace) = sys.exc_info()
 
    84         log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
 
    89 class Connection (asynchat.async_chat):
 
    90     def __init__( self, address, sock=None, listener=None, ssl=None ):
 
    91         "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
 
    92         "otherwise it'll open a new outgoing socket."
 
    94             asynchat.async_chat.__init__(self,sock)
 
    95             log.info("Accepted connection from %s",address)
 
    98             asynchat.async_chat.__init__(self)
 
    99             log.info("Opening connection to %s",address)
 
   100             self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
 
   101             self.status = kOpening
 
   104             self.connect(address)
 
   105         self.address = address
 
   106         self.listener = listener
 
   107         self.onRequest = None
 
   108         self.pendingRequests = {}
 
   109         self.pendingResponses = {}
 
   111         self.inMessage = None
 
   112         self.inNumRequests = self.outNumRequests = 0
 
   117         if self.status > kClosed:
 
   118             self.status = kClosing
 
   119             log.info("Connection closing...")
 
   120         asynchat.async_chat.close(self)
 
   122     def handle_connect(self):
 
   123         log.info("Connection open!")
 
   126     def handle_error(self):
 
   127         (typ,val,trace) = sys.exc_info()
 
   128         log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
 
   129         self.discard_buffers()
 
   130         self.status = kDisconnected
 
   133     def handle_close(self):
 
   134         log.info("Connection closed!")
 
   135         self.pendingRequests = self.pendingResponses = None
 
   137         if self.status == kClosing:
 
   138             self.status = kClosed
 
   140             self.status = kDisconnected
 
   141         asynchat.async_chat.handle_close(self)
 
   148         return self.status==kOpening or self.status==kOpen
 
   150     def _sendMessage(self, msg):
 
   152             self._outQueueMessage(msg,True)
 
   154                 log.debug("Waking up the output stream")
 
   156                 self.push_with_producer(self)
 
   161     def _sendRequest(self, req):
 
   163             requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
 
   164             response = req.response
 
   166                 response.requestNo = requestNo
 
   167                 self.pendingResponses[requestNo] = response
 
   168                 log.debug("pendingResponses[%i] := %s",requestNo,response)
 
   169             return self._sendMessage(req)
 
   173     def _outQueueMessage(self, msg,isNew=True):
 
   176         if msg.urgent and n>1:
 
   178                 otherMsg = self.outBox[index-1]
 
   183                 elif isNew and otherMsg.bytesSent==0:
 
   189         self.outBox.insert(index,msg)
 
   191             log.info("Queuing %s at index %i",msg,index)
 
   193             log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
 
   198             msg = self.outBox.pop(0)
 
   200             if msg.urgent or n==1 or not self.outBox[0].urgent:
 
   202             data = msg._sendNextFrame(frameSize)
 
   204                 self._outQueueMessage(msg,isNew=False)
 
   206                 log.info("Finished sending %s",msg)
 
   209             log.debug("Nothing more to send")
 
   215     def collect_incoming_data(self, data):
 
   216         if self.expectingHeader:
 
   217             if self.inHeader==None:
 
   220                 self.inHeader += data
 
   222             self.inMessage._receivedData(data)
 
   224     def found_terminator(self):
 
   225         if self.expectingHeader:
 
   227             (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
 
   229             if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
 
   230             if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
 
   231             frameLen -= kFrameHeaderSize
 
   232             log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
 
   233                         (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
 
   234             self.inMessage = self._inMessageForFrame(requestNo,flags)
 
   237                 self.expectingHeader = False
 
   238                 self.set_terminator(frameLen)
 
   243             # Got the frame's payload:
 
   246     def _inMessageForFrame(self, requestNo,flags):
 
   248         msgType = flags & kMsgFlag_TypeMask
 
   249         if msgType==kMsgType_Request:
 
   250             message = self.pendingRequests.get(requestNo)
 
   251             if message==None and requestNo == self.inNumRequests+1:
 
   252                 message = IncomingRequest(self,requestNo,flags)
 
   254                 self.pendingRequests[requestNo] = message
 
   255                 self.inNumRequests += 1
 
   256         elif msgType==kMsgType_Response or msgType==kMsgType_Error:
 
   257             message = self.pendingResponses.get(requestNo)
 
   260             message._beginFrame(flags)
 
   262             log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
 
   265     def _endOfFrame(self):
 
   267         self.inMessage = None
 
   268         self.expectingHeader = True
 
   270         self.set_terminator(kFrameHeaderSize) # wait for binary header
 
   272             log.debug("End of frame of %s",msg)
 
   273             if not msg._moreComing:
 
   274                 self._receivedMessage(msg)
 
   276     def _receivedMessage(self, msg):
 
   277         log.info("Received: %s",msg)
 
   278         # Remove from pending:
 
   280             del self.pendingResponses[msg.requestNo]
 
   282             del self.pendingRequests[msg.requestNo]
 
   286             if not msg.isResponse:
 
   289             log.error("Exception handling incoming message: %s", traceback.format_exc())
 
   290             #FIX: Send an error reply
 
   296 class Message (object):
 
   297     "Abstract superclass of all request/response objects"
 
   299     def __init__(self, connection, body=None, properties=None):
 
   300         self.connection = connection
 
   302         self.properties = properties or {}
 
   303         self.requestNo = None
 
   308             flags = kMsgType_Response
 
   310             flags = kMsgType_Request
 
   311         if self.urgent:     flags |= kMsgFlag_Urgent
 
   312         if self.compressed: flags |= kMsgFlag_Compressed
 
   313         if self.noReply:    flags |= kMsgFlag_NoReply
 
   314         if self._moreComing:flags |= kMsgFlag_MoreComing
 
   318         s = "%s[" %(type(self).__name__)
 
   319         if self.requestNo != None:
 
   320             s += "#%i" %self.requestNo
 
   321         if self.urgent:     s += " URG"
 
   322         if self.compressed: s += " CMP"
 
   323         if self.noReply:    s += " NOR"
 
   324         if self._moreComing:s += " MOR"
 
   325         if self.body:       s += " %i bytes" %len(self.body)
 
   330         if len(self.properties): s += repr(self.properties)
 
   334     def isResponse(self):
 
   335         "Is this message a response?"
 
   339     def contentType(self):
 
   340         return self.properties.get('Content-Type')
 
   342     def __getitem__(self, key):     return self.properties.get(key)
 
   343     def __contains__(self, key):    return key in self.properties
 
   344     def __len__(self):              return len(self.properties)
 
   345     def __nonzero__(self):          return True
 
   346     def __iter__(self):             return self.properties.__iter__()
 
   349 class IncomingMessage (Message):
 
   350     "Abstract superclass of incoming messages."
 
   352     def __init__(self, connection, requestNo, flags):
 
   353         super(IncomingMessage,self).__init__(connection)
 
   354         self.requestNo  = requestNo
 
   355         self.urgent     = (flags & kMsgFlag_Urgent) != 0
 
   356         self.compressed = (flags & kMsgFlag_Compressed) != 0
 
   357         self.noReply    = (flags & kMsgFlag_NoReply) != 0
 
   358         self._moreComing= (flags & kMsgFlag_MoreComing) != 0
 
   361     def _beginFrame(self, flags):
 
   362         """Received a frame header."""
 
   363         self._moreComing = (flags & kMsgFlag_MoreComing)!=0
 
   365     def _receivedData(self, data):
 
   366         """Received data from a frame."""
 
   367         self.frames.append(data)
 
   370         """The entire message has been received; now decode it."""
 
   371         encoded = "".join(self.frames)
 
   374         # Decode the properties:
 
   375         if len(encoded) < 2: raise MessageException, "missing properties length"
 
   376         propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
 
   377         if propSize>len(encoded): raise MessageException, "properties too long to fit"
 
   378         if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
 
   380         proplist = encoded[2:propSize-1].split('\000')
 
   381         encoded = encoded[propSize:]
 
   382         if len(proplist) & 1: raise MessageException, "odd number of property strings"
 
   383         for i in xrange(0,len(proplist),2):
 
   386                     str = IncomingMessage.__expandDict.get(str,str)
 
   388             self.properties[ expand(proplist[i])] = expand(proplist[i+1])
 
   391         if self.compressed and len(encoded)>0:
 
   393                 encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
 
   395                 raise MessageException, sys.exc_info()[1]
 
   398     __expandDict= {'\x01' : "Content-Type",
 
   400                    '\x03' : "application/octet-stream",
 
   401                    '\x04' : "text/plain; charset=UTF-8",
 
   403                    '\x06' : "text/yaml",
 
   405                    '\x08' : "Error-Code",
 
   406                    '\x09' : "Error-Domain"}
 
   409 class OutgoingMessage (Message):
 
   410     "Abstract superclass of outgoing requests/responses."
 
   412     def __init__(self, connection, body=None, properties=None):
 
   413         Message.__init__(self,connection,body,properties)
 
   414         self.urgent = self.compressed = self.noReply = False
 
   415         self._moreComing = True
 
   417     def __setitem__(self, key,val):
 
   418         self.properties[key] = val
 
   419     def __delitem__(self, key):
 
   420         del self.properties[key]
 
   424         return hasattr(self,'encoded')
 
   427         "Generates the message's encoded form, prior to sending it."
 
   429         for (key,value) in self.properties.iteritems():
 
   430             def _writePropString(s):
 
   431                 out.write(str(s))    #FIX: Abbreviate
 
   433             _writePropString(key)
 
   434             _writePropString(value)
 
   435         propertiesSize = out.tell()
 
   436         assert propertiesSize<65536     #FIX: Return an error instead
 
   440             z = zlib.compressobj(6,zlib.DEFLATED,31)   # window size of 31 needed for gzip format
 
   441             out.write(z.compress(body))
 
   445         self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
 
   447         log.debug("Encoded %s into %u bytes", self,len(self.encoded))
 
   450     def _sendNextFrame(self, maxLen):
 
   452         payload = self.encoded[pos:pos+maxLen]
 
   454         self._moreComing = (pos < len(self.encoded))
 
   455         if not self._moreComing:
 
   457         log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
 
   459         header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
 
   462                                                    kFrameHeaderSize+len(payload))
 
   464         return header + payload
 
   467 class Request (object):
 
   470         "The response object for this request."
 
   473         r = self.__dict__.get('_response')
 
   475             r = self._response = self._createResponse()
 
   479 class Response (Message):
 
   480     def _setRequest(self, request):
 
   481         assert not request.noReply
 
   482         self.request = request
 
   483         self.requestNo = request.requestNo
 
   484         self.urgent = request.urgent
 
   487     def isResponse(self):
 
   491 class IncomingRequest (IncomingMessage, Request):
 
   492     def _createResponse(self):
 
   493         return OutgoingResponse(self)
 
   496 class OutgoingRequest (OutgoingMessage, Request):
 
   497     def _createResponse(self):
 
   498         return IncomingResponse(self)
 
   502         return self.connection._sendRequest(self) and self.response
 
   505 class IncomingResponse (IncomingMessage, Response):
 
   506     def __init__(self, request):
 
   507         IncomingMessage.__init__(self,request.connection,None,0)
 
   508         self._setRequest(request)
 
   509         self.onComplete = None
 
   512         super(IncomingResponse,self)._finished()
 
   515                 self.onComplete(self)
 
   517                 log.error("Exception dispatching response: %s", traceback.format_exc())
 
   520 class OutgoingResponse (OutgoingMessage, Response):
 
   521     def __init__(self, request):
 
   522         OutgoingMessage.__init__(self,request.connection)
 
   523         self._setRequest(request)
 
   527         return self.connection._sendMessage(self)
 
   531  Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
 
   533  Redistribution and use in source and binary forms, with or without modification, are permitted
 
   534  provided that the following conditions are met:
 
   536  * Redistributions of source code must retain the above copyright notice, this list of conditions
 
   537  and the following disclaimer.
 
   538  * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
 
   539  and the following disclaimer in the documentation and/or other materials provided with the
 
   542  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
 
   543  IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
 
   544  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
 
   545  BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 
   546  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 
   547   PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 
   548  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF 
 
   549  THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.