Fixed two CF memory leaks. (Fixes issue #5)
5 Created by Jens Alfke on 2008-06-03.
6 Copyright notice and BSD license at end of file.
11 from cStringIO import StringIO
20 # Connection status enumeration:
28 # INTERNAL CONSTANTS -- NO TOUCHIES!
30 kFrameMagicNumber = 0x9B34F205
31 kFrameHeaderFormat = '!LLHH'
34 kMsgFlag_TypeMask = 0x000F
35 kMsgFlag_Compressed = 0x0010
36 kMsgFlag_Urgent = 0x0020
37 kMsgFlag_NoReply = 0x0040
38 kMsgFlag_MoreComing = 0x0080
45 log = logging.getLogger('BLIP')
49 class MessageException(Exception):
52 class ConnectionException(Exception):
56 ### LISTENER AND CONNECTION CLASSES:
59 class Listener (asyncore.dispatcher):
60 "BLIP listener/server class"
62 def __init__(self, port, sslKeyFile=None, sslCertFile=None):
63 "Create a listener on a port"
64 asyncore.dispatcher.__init__(self)
65 self.onConnected = self.onRequest = None
66 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
67 self.bind( ('',port) )
69 self.sslKeyFile=sslKeyFile
70 self.sslCertFile=sslCertFile
71 log.info("Listening on port %u", port)
73 def handle_accept( self ):
74 socket,address = self.accept()
76 socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
77 conn = Connection(address, sock=socket, listener=self)
78 conn.onRequest = self.onRequest
80 self.onConnected(conn)
82 def handle_error(self):
83 (typ,val,trace) = sys.exc_info()
84 log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
89 class Connection (asynchat.async_chat):
90 def __init__( self, address, sock=None, listener=None, ssl=None ):
91 "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
92 "otherwise it'll open a new outgoing socket."
94 asynchat.async_chat.__init__(self,sock)
95 log.info("Accepted connection from %s",address)
98 asynchat.async_chat.__init__(self)
99 log.info("Opening connection to %s",address)
100 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
101 self.status = kOpening
104 self.connect(address)
105 self.address = address
106 self.listener = listener
107 self.onRequest = None
108 self.pendingRequests = {}
109 self.pendingResponses = {}
111 self.inMessage = None
112 self.inNumRequests = self.outNumRequests = 0
117 if self.status > kClosed:
118 self.status = kClosing
119 log.info("Connection closing...")
120 asynchat.async_chat.close(self)
122 def handle_connect(self):
123 log.info("Connection open!")
126 def handle_error(self):
127 (typ,val,trace) = sys.exc_info()
128 log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
129 self.discard_buffers()
130 self.status = kDisconnected
133 def handle_close(self):
134 log.info("Connection closed!")
135 self.pendingRequests = self.pendingResponses = None
137 if self.status == kClosing:
138 self.status = kClosed
140 self.status = kDisconnected
141 asynchat.async_chat.handle_close(self)
148 return self.status==kOpening or self.status==kOpen
150 def _sendMessage(self, msg):
152 self._outQueueMessage(msg,True)
154 log.debug("Waking up the output stream")
156 self.push_with_producer(self)
161 def _sendRequest(self, req):
163 requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
164 response = req.response
166 response.requestNo = requestNo
167 self.pendingResponses[requestNo] = response
168 log.debug("pendingResponses[%i] := %s",requestNo,response)
169 return self._sendMessage(req)
173 def _outQueueMessage(self, msg,isNew=True):
176 if msg.urgent and n>1:
178 otherMsg = self.outBox[index-1]
183 elif isNew and otherMsg.bytesSent==0:
189 self.outBox.insert(index,msg)
191 log.info("Queuing %s at index %i",msg,index)
193 log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
198 msg = self.outBox.pop(0)
200 if msg.urgent or n==1 or not self.outBox[0].urgent:
202 data = msg._sendNextFrame(frameSize)
204 self._outQueueMessage(msg,isNew=False)
206 log.info("Finished sending %s",msg)
209 log.debug("Nothing more to send")
215 def collect_incoming_data(self, data):
216 if self.expectingHeader:
217 if self.inHeader==None:
220 self.inHeader += data
222 self.inMessage._receivedData(data)
224 def found_terminator(self):
225 if self.expectingHeader:
227 (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
229 if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
230 if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
231 frameLen -= kFrameHeaderSize
232 log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
233 (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
234 self.inMessage = self._inMessageForFrame(requestNo,flags)
237 self.expectingHeader = False
238 self.set_terminator(frameLen)
243 # Got the frame's payload:
246 def _inMessageForFrame(self, requestNo,flags):
248 msgType = flags & kMsgFlag_TypeMask
249 if msgType==kMsgType_Request:
250 message = self.pendingRequests.get(requestNo)
251 if message==None and requestNo == self.inNumRequests+1:
252 message = IncomingRequest(self,requestNo,flags)
254 self.pendingRequests[requestNo] = message
255 self.inNumRequests += 1
256 elif msgType==kMsgType_Response or msgType==kMsgType_Error:
257 message = self.pendingResponses.get(requestNo)
260 message._beginFrame(flags)
262 log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
265 def _endOfFrame(self):
267 self.inMessage = None
268 self.expectingHeader = True
270 self.set_terminator(kFrameHeaderSize) # wait for binary header
272 log.debug("End of frame of %s",msg)
273 if not msg._moreComing:
274 self._receivedMessage(msg)
276 def _receivedMessage(self, msg):
277 log.info("Received: %s",msg)
278 # Remove from pending:
280 del self.pendingResponses[msg.requestNo]
282 del self.pendingRequests[msg.requestNo]
286 if not msg.isResponse:
289 log.error("Exception handling incoming message: %s", traceback.format_exc())
290 #FIX: Send an error reply
296 class Message (object):
297 "Abstract superclass of all request/response objects"
299 def __init__(self, connection, body=None, properties=None):
300 self.connection = connection
302 self.properties = properties or {}
303 self.requestNo = None
308 flags = kMsgType_Response
310 flags = kMsgType_Request
311 if self.urgent: flags |= kMsgFlag_Urgent
312 if self.compressed: flags |= kMsgFlag_Compressed
313 if self.noReply: flags |= kMsgFlag_NoReply
314 if self._moreComing:flags |= kMsgFlag_MoreComing
318 s = "%s[" %(type(self).__name__)
319 if self.requestNo != None:
320 s += "#%i" %self.requestNo
321 if self.urgent: s += " URG"
322 if self.compressed: s += " CMP"
323 if self.noReply: s += " NOR"
324 if self._moreComing:s += " MOR"
325 if self.body: s += " %i bytes" %len(self.body)
330 if len(self.properties): s += repr(self.properties)
334 def isResponse(self):
335 "Is this message a response?"
339 def contentType(self):
340 return self.properties.get('Content-Type')
342 def __getitem__(self, key): return self.properties.get(key)
343 def __contains__(self, key): return key in self.properties
344 def __len__(self): return len(self.properties)
345 def __nonzero__(self): return True
346 def __iter__(self): return self.properties.__iter__()
349 class IncomingMessage (Message):
350 "Abstract superclass of incoming messages."
352 def __init__(self, connection, requestNo, flags):
353 super(IncomingMessage,self).__init__(connection)
354 self.requestNo = requestNo
355 self.urgent = (flags & kMsgFlag_Urgent) != 0
356 self.compressed = (flags & kMsgFlag_Compressed) != 0
357 self.noReply = (flags & kMsgFlag_NoReply) != 0
358 self._moreComing= (flags & kMsgFlag_MoreComing) != 0
361 def _beginFrame(self, flags):
362 """Received a frame header."""
363 self._moreComing = (flags & kMsgFlag_MoreComing)!=0
365 def _receivedData(self, data):
366 """Received data from a frame."""
367 self.frames.append(data)
370 """The entire message has been received; now decode it."""
371 encoded = "".join(self.frames)
374 # Decode the properties:
375 if len(encoded) < 2: raise MessageException, "missing properties length"
376 propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
377 if propSize>len(encoded): raise MessageException, "properties too long to fit"
378 if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
380 proplist = encoded[2:propSize-1].split('\000')
381 encoded = encoded[propSize:]
382 if len(proplist) & 1: raise MessageException, "odd number of property strings"
383 for i in xrange(0,len(proplist),2):
386 str = IncomingMessage.__expandDict.get(str,str)
388 self.properties[ expand(proplist[i])] = expand(proplist[i+1])
391 if self.compressed and len(encoded)>0:
393 encoded = zlib.decompress(encoded,31) # window size of 31 needed for gzip format
395 raise MessageException, sys.exc_info()[1]
398 __expandDict= {'\x01' : "Content-Type",
400 '\x03' : "application/octet-stream",
401 '\x04' : "text/plain; charset=UTF-8",
403 '\x06' : "text/yaml",
405 '\x08' : "Error-Code",
406 '\x09' : "Error-Domain"}
409 class OutgoingMessage (Message):
410 "Abstract superclass of outgoing requests/responses."
412 def __init__(self, connection, body=None, properties=None):
413 Message.__init__(self,connection,body,properties)
414 self.urgent = self.compressed = self.noReply = False
415 self._moreComing = True
417 def __setitem__(self, key,val):
418 self.properties[key] = val
419 def __delitem__(self, key):
420 del self.properties[key]
424 return hasattr(self,'encoded')
427 "Generates the message's encoded form, prior to sending it."
429 for (key,value) in self.properties.iteritems():
430 def _writePropString(s):
431 out.write(str(s)) #FIX: Abbreviate
433 _writePropString(key)
434 _writePropString(value)
435 propertiesSize = out.tell()
436 assert propertiesSize<65536 #FIX: Return an error instead
440 z = zlib.compressobj(6,zlib.DEFLATED,31) # window size of 31 needed for gzip format
441 out.write(z.compress(body))
445 self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
447 log.debug("Encoded %s into %u bytes", self,len(self.encoded))
450 def _sendNextFrame(self, maxLen):
452 payload = self.encoded[pos:pos+maxLen]
454 self._moreComing = (pos < len(self.encoded))
455 if not self._moreComing:
457 log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
459 header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
462 kFrameHeaderSize+len(payload))
464 return header + payload
467 class Request (object):
470 "The response object for this request."
473 r = self.__dict__.get('_response')
475 r = self._response = self._createResponse()
479 class Response (Message):
480 def _setRequest(self, request):
481 assert not request.noReply
482 self.request = request
483 self.requestNo = request.requestNo
484 self.urgent = request.urgent
487 def isResponse(self):
491 class IncomingRequest (IncomingMessage, Request):
492 def _createResponse(self):
493 return OutgoingResponse(self)
496 class OutgoingRequest (OutgoingMessage, Request):
497 def _createResponse(self):
498 return IncomingResponse(self)
502 return self.connection._sendRequest(self) and self.response
505 class IncomingResponse (IncomingMessage, Response):
506 def __init__(self, request):
507 IncomingMessage.__init__(self,request.connection,None,0)
508 self._setRequest(request)
509 self.onComplete = None
512 super(IncomingResponse,self)._finished()
515 self.onComplete(self)
517 log.error("Exception dispatching response: %s", traceback.format_exc())
520 class OutgoingResponse (OutgoingMessage, Response):
521 def __init__(self, request):
522 OutgoingMessage.__init__(self,request.connection)
523 self._setRequest(request)
527 return self.connection._sendMessage(self)
531 Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
533 Redistribution and use in source and binary forms, with or without modification, are permitted
534 provided that the following conditions are met:
536 * Redistributions of source code must retain the above copyright notice, this list of conditions
537 and the following disclaimer.
538 * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
539 and the following disclaimer in the documentation and/or other materials provided with the
542 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
543 IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
544 FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
545 BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
546 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
547 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
548 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
549 THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.