Connections opened by listeners now close correctly.
5 Created by Jens Alfke on 2008-06-03.
6 Copyright notice and BSD license at end of file.
11 from cStringIO import StringIO
20 # Connection status enumeration:
28 # INTERNAL CONSTANTS -- NO TOUCHIES!
30 kFrameMagicNumber = 0x9B34F206
31 kFrameHeaderFormat = '!LLHH'
34 kMsgFlag_TypeMask = 0x000F
35 kMsgFlag_Compressed = 0x0010
36 kMsgFlag_Urgent = 0x0020
37 kMsgFlag_NoReply = 0x0040
38 kMsgFlag_MoreComing = 0x0080
39 kMsgFlag_Meta = 0x0100
46 kMsgProfile_Bye = "Bye"
49 log = logging.getLogger('BLIP')
53 class MessageException(Exception):
56 class ConnectionException(Exception):
60 ### LISTENER AND CONNECTION CLASSES:
63 class Listener (asyncore.dispatcher):
64 "BLIP listener/server class"
66 def __init__(self, port, sslKeyFile=None, sslCertFile=None):
67 "Create a listener on a port"
68 asyncore.dispatcher.__init__(self)
69 self.onConnected = self.onRequest = None
70 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
71 self.bind( ('',port) )
73 self.sslKeyFile=sslKeyFile
74 self.sslCertFile=sslCertFile
75 log.info("Listening on port %u", port)
77 def handle_accept( self ):
78 socket,address = self.accept()
80 socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
81 conn = Connection(address, sock=socket, listener=self)
82 conn.onRequest = self.onRequest
84 self.onConnected(conn)
86 def handle_error(self):
87 (typ,val,trace) = sys.exc_info()
88 log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
93 class Connection (asynchat.async_chat):
94 def __init__( self, address, sock=None, listener=None, ssl=None ):
95 "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
96 "otherwise it'll open a new outgoing socket."
98 asynchat.async_chat.__init__(self,sock)
99 log.info("Accepted connection from %s",address)
102 asynchat.async_chat.__init__(self)
103 log.info("Opening connection to %s",address)
104 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
105 self.status = kOpening
108 self.connect(address)
109 self.address = address
110 self.listener = listener
111 self.onRequest = self.onCloseRequest = self.onCloseRefused = None
112 self.pendingRequests = {}
113 self.pendingResponses = {}
115 self.inMessage = None
116 self.inNumRequests = self.outNumRequests = 0
119 self._closeWhenPossible = False
121 def handle_connect(self):
122 log.info("Connection open!")
125 def handle_error(self):
126 (typ,val,trace) = sys.exc_info()
127 log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
128 self.discard_buffers()
129 self.status = kDisconnected
137 return self.status==kOpening or self.status==kOpen
141 return self.isOpen and not self._closeWhenPossible
143 def _sendMessage(self, msg):
145 self._outQueueMessage(msg,True)
147 log.debug("Waking up the output stream")
149 self.push_with_producer(self)
154 def _sendRequest(self, req):
156 requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
157 response = req.response
159 response.requestNo = requestNo
160 self.pendingResponses[requestNo] = response
161 log.debug("pendingResponses[%i] := %s",requestNo,response)
162 return self._sendMessage(req)
166 def _outQueueMessage(self, msg,isNew=True):
169 if msg.urgent and n>1:
171 otherMsg = self.outBox[index-1]
176 elif isNew and otherMsg.bytesSent==0:
182 self.outBox.insert(index,msg)
184 log.info("Queuing %s at index %i",msg,index)
186 log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
191 msg = self.outBox.pop(0)
193 if msg.urgent or n==1 or not self.outBox[0].urgent:
195 data = msg._sendNextFrame(frameSize)
197 self._outQueueMessage(msg,isNew=False)
199 log.info("Finished sending %s",msg)
202 log.debug("Nothing more to send")
209 def collect_incoming_data(self, data):
210 if self.expectingHeader:
211 if self.inHeader==None:
214 self.inHeader += data
216 self.inMessage._receivedData(data)
218 def found_terminator(self):
219 if self.expectingHeader:
221 (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
223 if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
224 if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
225 frameLen -= kFrameHeaderSize
226 log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
227 (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
228 self.inMessage = self._inMessageForFrame(requestNo,flags)
231 self.expectingHeader = False
232 self.set_terminator(frameLen)
237 # Got the frame's payload:
240 def _inMessageForFrame(self, requestNo,flags):
242 msgType = flags & kMsgFlag_TypeMask
243 if msgType==kMsgType_Request:
244 message = self.pendingRequests.get(requestNo)
245 if message==None and requestNo == self.inNumRequests+1:
246 message = IncomingRequest(self,requestNo,flags)
248 self.pendingRequests[requestNo] = message
249 self.inNumRequests += 1
250 elif msgType==kMsgType_Response or msgType==kMsgType_Error:
251 message = self.pendingResponses.get(requestNo)
252 message._updateFlags(flags)
255 message._beginFrame(flags)
257 log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
260 def _endOfFrame(self):
262 self.inMessage = None
263 self.expectingHeader = True
265 self.set_terminator(kFrameHeaderSize) # wait for binary header
267 log.debug("End of frame of %s",msg)
268 if not msg._moreComing:
269 self._receivedMessage(msg)
271 def _receivedMessage(self, msg):
272 log.info("Received: %s",msg)
273 # Remove from pending:
275 del self.pendingResponses[msg.requestNo]
277 del self.pendingRequests[msg.requestNo]
281 if not msg.isResponse:
283 self._dispatchMetaRequest(msg)
287 log.error("Exception handling incoming message: %s", traceback.format_exc())
288 #FIX: Send an error reply
289 # Check to see if we're done and ready to close:
292 def _dispatchMetaRequest(self, request):
293 """Handles dispatching internal meta requests."""
294 if request['Profile'] == kMsgProfile_Bye:
296 if self.onCloseRequest:
297 shouldClose = self.onCloseRequest()
299 log.debug("Sending resfusal to close...")
300 response = request.response
301 response.isError = True
302 response['Error-Domain'] = "BLIP"
303 response['Error-Code'] = 403
304 response.body = "Close request denied"
307 log.debug("Sending permission to close...")
308 response = request.response
311 response = request.response
312 response.isError = True
313 response['Error-Domain'] = "BLIP"
314 response['Error-Code'] = 404
315 response.body = "Unknown meta profile"
321 """Publicly callable close method. Sends close request to peer."""
322 if self.status != kOpen:
324 log.info("Sending close request...")
325 req = OutgoingRequest(self, None, {'Profile': kMsgProfile_Bye})
327 req.response.onComplete = self._handleCloseResponse
329 log.error("Error sending close request.")
332 self.status = kClosing
335 def _handleCloseResponse(self, response):
336 """Called when we receive a response to a close request."""
337 log.info("Received close response.")
339 # remote refused to close
340 if self.onCloseRefused:
341 self.onCloseRefused(response)
344 # now wait until everything has finished sending, then actually close
345 log.info("No refusal, actually closing...")
346 self._closeWhenPossible = True
348 def _closeIfReady(self):
349 """Checks if all transmissions are complete and then closes the actual socket."""
350 if self._closeWhenPossible and len(self.outBox) == 0 and len(self.pendingRequests) == 0 and len(self.pendingResponses) == 0:
351 # self._closeWhenPossible = False
352 log.debug("_closeIfReady closing.")
353 asynchat.async_chat.close(self)
355 def handle_close(self):
356 """Called when the socket actually closes."""
357 log.info("Connection closed!")
358 self.pendingRequests = self.pendingResponses = None
360 if self.status == kClosing:
361 self.status = kClosed
363 self.status = kDisconnected
364 asyncore.dispatcher.close(self)
370 class Message (object):
371 "Abstract superclass of all request/response objects"
373 def __init__(self, connection, body=None, properties=None):
374 self.connection = connection
376 self.properties = properties or {}
377 self.requestNo = None
383 flags = kMsgType_Error
385 flags = kMsgType_Response
387 flags = kMsgType_Request
388 if self.urgent: flags |= kMsgFlag_Urgent
389 if self.compressed: flags |= kMsgFlag_Compressed
390 if self.noReply: flags |= kMsgFlag_NoReply
391 if self._moreComing:flags |= kMsgFlag_MoreComing
392 if self._meta: flags |= kMsgFlag_Meta
396 s = "%s[" %(type(self).__name__)
397 if self.requestNo != None:
398 s += "#%i" %self.requestNo
399 if self.urgent: s += " URG"
400 if self.compressed: s += " CMP"
401 if self.noReply: s += " NOR"
402 if self._moreComing:s += " MOR"
403 if self._meta: s += " MET"
404 if self.body: s += " %i bytes" %len(self.body)
409 if len(self.properties): s += repr(self.properties)
413 def isResponse(self):
414 "Is this message a response?"
418 def contentType(self):
419 return self.properties.get('Content-Type')
421 def __getitem__(self, key): return self.properties.get(key)
422 def __contains__(self, key): return key in self.properties
423 def __len__(self): return len(self.properties)
424 def __nonzero__(self): return True
425 def __iter__(self): return self.properties.__iter__()
428 class IncomingMessage (Message):
429 "Abstract superclass of incoming messages."
431 def __init__(self, connection, requestNo, flags):
432 super(IncomingMessage,self).__init__(connection)
433 self.requestNo = requestNo
434 self._updateFlags(flags)
437 def _updateFlags(self, flags):
438 self.urgent = (flags & kMsgFlag_Urgent) != 0
439 self.compressed = (flags & kMsgFlag_Compressed) != 0
440 self.noReply = (flags & kMsgFlag_NoReply) != 0
441 self._moreComing= (flags & kMsgFlag_MoreComing) != 0
442 self._meta = (flags & kMsgFlag_Meta) != 0
443 self.isError = (flags & kMsgType_Error) != 0
445 def _beginFrame(self, flags):
446 """Received a frame header."""
447 self._moreComing = (flags & kMsgFlag_MoreComing)!=0
449 def _receivedData(self, data):
450 """Received data from a frame."""
451 self.frames.append(data)
454 """The entire message has been received; now decode it."""
455 encoded = "".join(self.frames)
458 # Decode the properties:
459 if len(encoded) < 2: raise MessageException, "missing properties length"
460 propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
461 if propSize>len(encoded): raise MessageException, "properties too long to fit"
462 if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
465 proplist = encoded[2:propSize-1].split('\000')
467 if len(proplist) & 1: raise MessageException, "odd number of property strings"
468 for i in xrange(0,len(proplist),2):
471 str = IncomingMessage.__expandDict.get(str,str)
473 self.properties[ expand(proplist[i])] = expand(proplist[i+1])
475 encoded = encoded[propSize:]
477 if self.compressed and len(encoded)>0:
479 encoded = zlib.decompress(encoded,31) # window size of 31 needed for gzip format
481 raise MessageException, sys.exc_info()[1]
484 __expandDict= {'\x01' : "Content-Type",
486 '\x03' : "application/octet-stream",
487 '\x04' : "text/plain; charset=UTF-8",
489 '\x06' : "text/yaml",
491 '\x08' : "Error-Code",
492 '\x09' : "Error-Domain"}
495 class OutgoingMessage (Message):
496 "Abstract superclass of outgoing requests/responses."
498 def __init__(self, connection, body=None, properties=None):
499 Message.__init__(self,connection,body,properties)
500 self.urgent = self.compressed = self.noReply = self._meta = self.isError = False
501 self._moreComing = True
503 def __setitem__(self, key,val):
504 self.properties[key] = val
505 def __delitem__(self, key):
506 del self.properties[key]
510 return hasattr(self,'encoded')
513 "Generates the message's encoded form, prior to sending it."
515 for (key,value) in self.properties.iteritems():
516 def _writePropString(s):
517 out.write(str(s)) #FIX: Abbreviate
519 _writePropString(key)
520 _writePropString(value)
521 propertiesSize = out.tell()
522 assert propertiesSize<65536 #FIX: Return an error instead
524 body = self.body or ""
526 z = zlib.compressobj(6,zlib.DEFLATED,31) # window size of 31 needed for gzip format
527 out.write(z.compress(body))
531 self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
533 log.debug("Encoded %s into %u bytes", self,len(self.encoded))
536 def _sendNextFrame(self, maxLen):
538 payload = self.encoded[pos:pos+maxLen]
540 self._moreComing = (pos < len(self.encoded))
541 if not self._moreComing:
543 log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
545 header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
548 kFrameHeaderSize+len(payload))
550 return header + payload
553 class Request (object):
556 "The response object for this request."
559 r = self.__dict__.get('_response')
561 r = self._response = self._createResponse()
565 class Response (Message):
566 def _setRequest(self, request):
567 assert not request.noReply
568 self.request = request
569 self.requestNo = request.requestNo
570 self.urgent = request.urgent
573 def isResponse(self):
577 class IncomingRequest (IncomingMessage, Request):
578 def _createResponse(self):
579 return OutgoingResponse(self)
582 class OutgoingRequest (OutgoingMessage, Request):
583 def _createResponse(self):
584 return IncomingResponse(self)
588 return self.connection._sendRequest(self) and self.response
591 class IncomingResponse (IncomingMessage, Response):
592 def __init__(self, request):
593 IncomingMessage.__init__(self,request.connection,None,0)
594 self._setRequest(request)
595 self.onComplete = None
598 super(IncomingResponse,self)._finished()
601 self.onComplete(self)
603 log.error("Exception dispatching response: %s", traceback.format_exc())
606 class OutgoingResponse (OutgoingMessage, Response):
607 def __init__(self, request):
608 OutgoingMessage.__init__(self,request.connection)
609 self._setRequest(request)
613 return self.connection._sendMessage(self)
617 Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
619 Redistribution and use in source and binary forms, with or without modification, are permitted
620 provided that the following conditions are met:
622 * Redistributions of source code must retain the above copyright notice, this list of conditions
623 and the following disclaimer.
624 * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
625 and the following disclaimer in the documentation and/or other materials provided with the
628 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
629 IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
630 FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
631 BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
632 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
633 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
634 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
635 THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.