Fixed bug which caused PyBLIP to stop sending responses while the connection was closing.
5 Created by Jens Alfke on 2008-06-03.
6 Copyright notice and BSD license at end of file.
11 from cStringIO import StringIO
20 # Connection status enumeration:
28 # INTERNAL CONSTANTS -- NO TOUCHIES!
30 kFrameMagicNumber = 0x9B34F206
31 kFrameHeaderFormat = '!LLHH'
34 kMsgFlag_TypeMask = 0x000F
35 kMsgFlag_Compressed = 0x0010
36 kMsgFlag_Urgent = 0x0020
37 kMsgFlag_NoReply = 0x0040
38 kMsgFlag_MoreComing = 0x0080
39 kMsgFlag_Meta = 0x0100
46 kMsgProfile_Bye = "Bye"
49 class NullLoggingHandler(logging.Handler):
50 def emit(self, record):
53 log = logging.getLogger('BLIP')
54 # This line prevents the "No handlers found" warning if the calling code does not use logging.
55 log.addHandler(NullLoggingHandler())
59 class MessageException(Exception):
62 class ConnectionException(Exception):
66 ### LISTENER AND CONNECTION CLASSES:
69 class Listener (asyncore.dispatcher):
70 "BLIP listener/server class"
72 def __init__(self, port, sslKeyFile=None, sslCertFile=None):
73 "Create a listener on a port"
74 asyncore.dispatcher.__init__(self)
75 self.onConnected = self.onRequest = None
76 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
77 self.bind( ('',port) )
79 self.sslKeyFile=sslKeyFile
80 self.sslCertFile=sslCertFile
81 log.info("Listening on port %u", port)
83 def handle_accept( self ):
84 socket,address = self.accept()
86 socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
87 conn = Connection(address, sock=socket, listener=self)
88 conn.onRequest = self.onRequest
90 self.onConnected(conn)
92 def handle_error(self):
93 (typ,val,trace) = sys.exc_info()
94 log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
99 class Connection (asynchat.async_chat):
100 def __init__( self, address, sock=None, listener=None, ssl=None ):
101 "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
102 "otherwise it'll open a new outgoing socket."
104 asynchat.async_chat.__init__(self,sock)
105 log.info("Accepted connection from %s",address)
108 asynchat.async_chat.__init__(self)
109 log.info("Opening connection to %s",address)
110 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
111 self.status = kOpening
114 self.connect(address)
115 self.address = address
116 self.listener = listener
117 self.onRequest = self.onCloseRequest = self.onCloseRefused = None
118 self.pendingRequests = {}
119 self.pendingResponses = {}
121 self.inMessage = None
122 self.inNumRequests = self.outNumRequests = 0
125 self._closeWhenPossible = False
127 def handle_connect(self):
128 log.info("Connection open!")
131 def handle_error(self):
132 (typ,val,trace) = sys.exc_info()
133 log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
134 self.discard_buffers()
135 self.status = kDisconnected
143 return self.status==kOpening or self.status==kOpen or self.status==kClosing
147 return self.isOpen and not self._closeWhenPossible
149 def _sendMessage(self, msg):
151 self._outQueueMessage(msg,True)
153 log.debug("Waking up the output stream")
155 self.push_with_producer(self)
160 def _sendRequest(self, req):
162 requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
163 response = req.response
165 response.requestNo = requestNo
166 self.pendingResponses[requestNo] = response
167 log.debug("pendingResponses[%i] := %s",requestNo,response)
168 return self._sendMessage(req)
170 log.warning("%s: Attempt to send a request after the connection has started closing: %s" % (self, req))
173 def _outQueueMessage(self, msg,isNew=True):
176 if msg.urgent and n>1:
178 otherMsg = self.outBox[index-1]
183 elif isNew and otherMsg.bytesSent==0:
189 self.outBox.insert(index,msg)
191 log.info("Queuing %s at index %i",msg,index)
193 log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
198 msg = self.outBox.pop(0)
200 if msg.urgent or n==1 or not self.outBox[0].urgent:
202 data = msg._sendNextFrame(frameSize)
204 self._outQueueMessage(msg,isNew=False)
206 log.info("Finished sending %s",msg)
209 log.debug("Nothing more to send")
216 def collect_incoming_data(self, data):
217 if self.expectingHeader:
218 if self.inHeader==None:
221 self.inHeader += data
223 self.inMessage._receivedData(data)
225 def found_terminator(self):
226 if self.expectingHeader:
228 (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
230 if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
231 if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
232 frameLen -= kFrameHeaderSize
233 log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
234 (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
235 self.inMessage = self._inMessageForFrame(requestNo,flags)
238 self.expectingHeader = False
239 self.set_terminator(frameLen)
244 # Got the frame's payload:
247 def _inMessageForFrame(self, requestNo,flags):
249 msgType = flags & kMsgFlag_TypeMask
250 if msgType==kMsgType_Request:
251 message = self.pendingRequests.get(requestNo)
252 if message==None and requestNo == self.inNumRequests+1:
253 message = IncomingRequest(self,requestNo,flags)
255 self.pendingRequests[requestNo] = message
256 self.inNumRequests += 1
257 elif msgType==kMsgType_Response or msgType==kMsgType_Error:
258 message = self.pendingResponses.get(requestNo)
259 message._updateFlags(flags)
262 message._beginFrame(flags)
264 log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
267 def _endOfFrame(self):
269 self.inMessage = None
270 self.expectingHeader = True
272 self.set_terminator(kFrameHeaderSize) # wait for binary header
274 log.debug("End of frame of %s",msg)
275 if not msg._moreComing:
276 self._receivedMessage(msg)
278 def _receivedMessage(self, msg):
279 log.info("Received: %s",msg)
280 # Remove from pending:
282 del self.pendingResponses[msg.requestNo]
284 del self.pendingRequests[msg.requestNo]
288 if not msg.isResponse:
290 self._dispatchMetaRequest(msg)
293 if not msg.response.sent:
294 log.error("**** Request received, but a response was never sent! Request: %r", msg)
296 log.error("Exception handling incoming message: %s", traceback.format_exc())
297 #FIX: Send an error reply
298 # Check to see if we're done and ready to close:
301 def _dispatchMetaRequest(self, request):
302 """Handles dispatching internal meta requests."""
303 if request['Profile'] == kMsgProfile_Bye:
304 self._handleCloseRequest(request)
306 response = request.response
307 response.isError = True
308 response['Error-Domain'] = "BLIP"
309 response['Error-Code'] = 404
310 response.body = "Unknown meta profile"
315 def _handleCloseRequest(self, request):
316 """Handles requests from a peer to close."""
318 if self.onCloseRequest:
319 shouldClose = self.onCloseRequest()
321 log.debug("Sending resfusal to close...")
322 response = request.response
323 response.isError = True
324 response['Error-Domain'] = "BLIP"
325 response['Error-Code'] = 403
326 response.body = "Close request denied"
329 log.debug("Sending permission to close...")
330 response = request.response
334 """Publicly callable close method. Sends close request to peer."""
335 if self.status != kOpen:
337 log.info("Sending close request...")
338 req = OutgoingRequest(self, None, {'Profile': kMsgProfile_Bye})
340 req.response.onComplete = self._handleCloseResponse
342 log.error("Error sending close request.")
345 self.status = kClosing
348 def _handleCloseResponse(self, response):
349 """Called when we receive a response to a close request."""
350 log.info("Received close response.")
352 # remote refused to close
353 if self.onCloseRefused:
354 self.onCloseRefused(response)
357 # now wait until everything has finished sending, then actually close
358 log.info("No refusal, actually closing...")
359 self._closeWhenPossible = True
361 def _closeIfReady(self):
362 """Checks if all transmissions are complete and then closes the actual socket."""
363 if self._closeWhenPossible and len(self.outBox) == 0 and len(self.pendingRequests) == 0 and len(self.pendingResponses) == 0:
364 # self._closeWhenPossible = False
365 log.debug("_closeIfReady closing.")
366 asynchat.async_chat.close(self)
368 def handle_close(self):
369 """Called when the socket actually closes."""
370 log.info("Connection closed!")
371 self.pendingRequests = self.pendingResponses = None
373 if self.status == kClosing:
374 self.status = kClosed
376 self.status = kDisconnected
377 asyncore.dispatcher.close(self)
383 class Message (object):
384 "Abstract superclass of all request/response objects"
386 def __init__(self, connection, body=None, properties=None):
387 self.connection = connection
389 self.properties = properties or {}
390 self.requestNo = None
396 flags = kMsgType_Error
398 flags = kMsgType_Response
400 flags = kMsgType_Request
401 if self.urgent: flags |= kMsgFlag_Urgent
402 if self.compressed: flags |= kMsgFlag_Compressed
403 if self.noReply: flags |= kMsgFlag_NoReply
404 if self._moreComing:flags |= kMsgFlag_MoreComing
405 if self._meta: flags |= kMsgFlag_Meta
409 s = "%s[" %(type(self).__name__)
410 if self.requestNo != None:
411 s += "#%i" %self.requestNo
412 if self.urgent: s += " URG"
413 if self.compressed: s += " CMP"
414 if self.noReply: s += " NOR"
415 if self._moreComing:s += " MOR"
416 if self._meta: s += " MET"
417 if self.body: s += " %i bytes" %len(self.body)
422 if len(self.properties): s += repr(self.properties)
426 def isResponse(self):
427 "Is this message a response?"
431 def contentType(self):
432 return self.properties.get('Content-Type')
434 def __getitem__(self, key): return self.properties.get(key)
435 def __contains__(self, key): return key in self.properties
436 def __len__(self): return len(self.properties)
437 def __nonzero__(self): return True
438 def __iter__(self): return self.properties.__iter__()
441 class IncomingMessage (Message):
442 "Abstract superclass of incoming messages."
444 def __init__(self, connection, requestNo, flags):
445 super(IncomingMessage,self).__init__(connection)
446 self.requestNo = requestNo
447 self._updateFlags(flags)
450 def _updateFlags(self, flags):
451 self.urgent = (flags & kMsgFlag_Urgent) != 0
452 self.compressed = (flags & kMsgFlag_Compressed) != 0
453 self.noReply = (flags & kMsgFlag_NoReply) != 0
454 self._moreComing= (flags & kMsgFlag_MoreComing) != 0
455 self._meta = (flags & kMsgFlag_Meta) != 0
456 self.isError = (flags & kMsgType_Error) != 0
458 def _beginFrame(self, flags):
459 """Received a frame header."""
460 self._moreComing = (flags & kMsgFlag_MoreComing)!=0
462 def _receivedData(self, data):
463 """Received data from a frame."""
464 self.frames.append(data)
467 """The entire message has been received; now decode it."""
468 encoded = "".join(self.frames)
471 # Decode the properties:
472 if len(encoded) < 2: raise MessageException, "missing properties length"
473 propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
474 if propSize>len(encoded): raise MessageException, "properties too long to fit"
475 if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
478 proplist = encoded[2:propSize-1].split('\000')
480 if len(proplist) & 1: raise MessageException, "odd number of property strings"
481 for i in xrange(0,len(proplist),2):
484 str = IncomingMessage.__expandDict.get(str,str)
486 self.properties[ expand(proplist[i])] = expand(proplist[i+1])
488 encoded = encoded[propSize:]
490 if self.compressed and len(encoded)>0:
492 encoded = zlib.decompress(encoded,31) # window size of 31 needed for gzip format
494 raise MessageException, sys.exc_info()[1]
497 __expandDict= {'\x01' : "Content-Type",
499 '\x03' : "application/octet-stream",
500 '\x04' : "text/plain; charset=UTF-8",
502 '\x06' : "text/yaml",
504 '\x08' : "Error-Code",
505 '\x09' : "Error-Domain"}
508 class OutgoingMessage (Message):
509 "Abstract superclass of outgoing requests/responses."
511 def __init__(self, connection, body=None, properties=None):
512 Message.__init__(self,connection,body,properties)
513 self.urgent = self.compressed = self.noReply = self._meta = self.isError = False
514 self._moreComing = True
516 def __setitem__(self, key,val):
517 self.properties[key] = val
518 def __delitem__(self, key):
519 del self.properties[key]
523 return hasattr(self,'encoded')
526 "Generates the message's encoded form, prior to sending it."
528 for (key,value) in self.properties.iteritems():
529 def _writePropString(s):
530 out.write(str(s)) #FIX: Abbreviate
532 _writePropString(key)
533 _writePropString(value)
534 propertiesSize = out.tell()
535 assert propertiesSize<65536 #FIX: Return an error instead
537 body = self.body or ""
539 z = zlib.compressobj(6,zlib.DEFLATED,31) # window size of 31 needed for gzip format
540 out.write(z.compress(body))
544 self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
546 log.debug("Encoded %s into %u bytes", self,len(self.encoded))
549 def _sendNextFrame(self, maxLen):
551 payload = self.encoded[pos:pos+maxLen]
553 self._moreComing = (pos < len(self.encoded))
554 if not self._moreComing:
556 log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
558 header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
561 kFrameHeaderSize+len(payload))
563 return header + payload
566 class Request (object):
569 "The response object for this request."
572 r = self.__dict__.get('_response')
574 r = self._response = self._createResponse()
578 class Response (Message):
579 def _setRequest(self, request):
580 assert not request.noReply
581 self.request = request
582 self.requestNo = request.requestNo
583 self.urgent = request.urgent
586 def isResponse(self):
590 class IncomingRequest (IncomingMessage, Request):
591 def _createResponse(self):
592 return OutgoingResponse(self)
595 class OutgoingRequest (OutgoingMessage, Request):
596 def _createResponse(self):
597 return IncomingResponse(self)
601 return self.connection._sendRequest(self) and self.response
604 class IncomingResponse (IncomingMessage, Response):
605 def __init__(self, request):
606 IncomingMessage.__init__(self,request.connection,None,0)
607 self._setRequest(request)
608 self.onComplete = None
611 super(IncomingResponse,self)._finished()
614 self.onComplete(self)
616 log.error("Exception dispatching response: %s", traceback.format_exc())
619 class OutgoingResponse (OutgoingMessage, Response):
620 def __init__(self, request):
621 OutgoingMessage.__init__(self,request.connection)
622 self._setRequest(request)
626 return self.connection._sendMessage(self)
630 Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
632 Redistribution and use in source and binary forms, with or without modification, are permitted
633 provided that the following conditions are met:
635 * Redistributions of source code must retain the above copyright notice, this list of conditions
636 and the following disclaimer.
637 * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
638 and the following disclaimer in the documentation and/or other materials provided with the
641 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
642 IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
643 FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
644 BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
645 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
646 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
647 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
648 THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.