BROKEN COMMIT. Majority of code to handle closing has been added. Listeners do not close correctly.
5 Created by Jens Alfke on 2008-06-03.
6 Copyright notice and BSD license at end of file.
11 from cStringIO import StringIO
20 # Connection status enumeration:
28 # INTERNAL CONSTANTS -- NO TOUCHIES!
30 kFrameMagicNumber = 0x9B34F206
31 kFrameHeaderFormat = '!LLHH'
34 kMsgFlag_TypeMask = 0x000F
35 kMsgFlag_Compressed = 0x0010
36 kMsgFlag_Urgent = 0x0020
37 kMsgFlag_NoReply = 0x0040
38 kMsgFlag_MoreComing = 0x0080
39 kMsgFlag_Meta = 0x0100
46 kMsgProfile_Bye = "Bye"
49 log = logging.getLogger('BLIP')
53 class MessageException(Exception):
56 class ConnectionException(Exception):
60 ### LISTENER AND CONNECTION CLASSES:
63 class Listener (asyncore.dispatcher):
64 "BLIP listener/server class"
66 def __init__(self, port, sslKeyFile=None, sslCertFile=None):
67 "Create a listener on a port"
68 asyncore.dispatcher.__init__(self)
69 self.onConnected = self.onRequest = None
70 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
71 self.bind( ('',port) )
73 self.sslKeyFile=sslKeyFile
74 self.sslCertFile=sslCertFile
75 log.info("Listening on port %u", port)
77 def handle_accept( self ):
78 socket,address = self.accept()
80 socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
81 conn = Connection(address, sock=socket, listener=self)
82 conn.onRequest = self.onRequest
84 self.onConnected(conn)
86 def handle_error(self):
87 (typ,val,trace) = sys.exc_info()
88 log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
93 class Connection (asynchat.async_chat):
94 def __init__( self, address, sock=None, listener=None, ssl=None ):
95 "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
96 "otherwise it'll open a new outgoing socket."
98 asynchat.async_chat.__init__(self,sock)
99 log.info("Accepted connection from %s",address)
102 asynchat.async_chat.__init__(self)
103 log.info("Opening connection to %s",address)
104 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
105 self.status = kOpening
108 self.connect(address)
109 self.address = address
110 self.listener = listener
111 self.onRequest = self.onCloseRequest = self.onCloseRefused = None
112 self.pendingRequests = {}
113 self.pendingResponses = {}
115 self.inMessage = None
116 self.inNumRequests = self.outNumRequests = 0
119 self._closeWhenPossible = False
121 def handle_connect(self):
122 log.info("Connection open!")
125 def handle_error(self):
126 (typ,val,trace) = sys.exc_info()
127 log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
128 self.discard_buffers()
129 self.status = kDisconnected
137 return self.status==kOpening or self.status==kOpen
141 return self.isOpen and not self._closeWhenPossible
143 def _sendMessage(self, msg):
145 self._outQueueMessage(msg,True)
147 log.debug("Waking up the output stream")
149 self.push_with_producer(self)
154 def _sendRequest(self, req):
156 requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
157 response = req.response
159 response.requestNo = requestNo
160 self.pendingResponses[requestNo] = response
161 log.debug("pendingResponses[%i] := %s",requestNo,response)
162 return self._sendMessage(req)
166 def _outQueueMessage(self, msg,isNew=True):
169 if msg.urgent and n>1:
171 otherMsg = self.outBox[index-1]
176 elif isNew and otherMsg.bytesSent==0:
182 self.outBox.insert(index,msg)
184 log.info("Queuing %s at index %i",msg,index)
186 log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
191 msg = self.outBox.pop(0)
193 if msg.urgent or n==1 or not self.outBox[0].urgent:
195 data = msg._sendNextFrame(frameSize)
197 self._outQueueMessage(msg,isNew=False)
199 log.info("Finished sending %s",msg)
202 log.debug("Nothing more to send")
209 def collect_incoming_data(self, data):
210 if self.expectingHeader:
211 if self.inHeader==None:
214 self.inHeader += data
216 self.inMessage._receivedData(data)
218 def found_terminator(self):
219 if self.expectingHeader:
221 (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
223 if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
224 if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
225 frameLen -= kFrameHeaderSize
226 log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
227 (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
228 self.inMessage = self._inMessageForFrame(requestNo,flags)
231 self.expectingHeader = False
232 self.set_terminator(frameLen)
237 # Got the frame's payload:
240 def _inMessageForFrame(self, requestNo,flags):
242 msgType = flags & kMsgFlag_TypeMask
243 if msgType==kMsgType_Request:
244 message = self.pendingRequests.get(requestNo)
245 if message==None and requestNo == self.inNumRequests+1:
246 message = IncomingRequest(self,requestNo,flags)
248 self.pendingRequests[requestNo] = message
249 self.inNumRequests += 1
250 elif msgType==kMsgType_Response or msgType==kMsgType_Error:
251 message = self.pendingResponses.get(requestNo)
252 message._updateFlags(flags)
255 message._beginFrame(flags)
257 log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
260 def _endOfFrame(self):
262 self.inMessage = None
263 self.expectingHeader = True
265 self.set_terminator(kFrameHeaderSize) # wait for binary header
267 log.debug("End of frame of %s",msg)
268 if not msg._moreComing:
269 self._receivedMessage(msg)
271 def _receivedMessage(self, msg):
272 log.info("Received: %s",msg)
273 # Remove from pending:
275 del self.pendingResponses[msg.requestNo]
277 del self.pendingRequests[msg.requestNo]
281 if not msg.isResponse:
283 self._dispatchMetaRequest(msg)
287 log.error("Exception handling incoming message: %s", traceback.format_exc())
288 #FIX: Send an error reply
289 # Check to see if we're done and ready to close:
292 def _dispatchMetaRequest(self, request):
293 """Handles dispatching internal meta requests."""
294 if request['Profile'] == kMsgProfile_Bye:
296 if self.onCloseRequest:
297 shouldClose = self.onCloseRequest()
299 log.debug("Sending resfusal to close...")
300 response = request.response
301 response.isError = True
302 response['Error-Domain'] = "BLIP"
303 response['Error-Code'] = 403
304 response.body = "Close request denied"
307 log.debug("Sending permission to close...")
308 response = request.response
311 response = request.response
312 response.isError = True
313 response['Error-Domain'] = "BLIP"
314 response['Error-Code'] = 404
315 response.body = "Unknown meta profile"
321 """Publicly callable close method. Sends close request to peer."""
322 if self.status != kOpen:
324 log.info("Sending close request...")
325 req = OutgoingRequest(self, None, {'Profile': kMsgProfile_Bye})
327 req.response.onComplete = self._handleCloseResponse
329 log.error("Error sending close request.")
332 self.status = kClosing
335 def _handleCloseResponse(self, response):
336 """Called when we receive a response to a close request."""
337 log.info("Received close response.")
339 # remote refused to close
340 if self.onCloseRefused:
341 self.onCloseRefused(response)
344 # now wait until everything has finished sending, then actually close
345 log.info("No refusal, actually closing...")
346 self._closeWhenPossible = True
348 def _closeIfReady(self):
349 """Checks if all transmissions are complete and then closes the actual socket."""
350 if self._closeWhenPossible and len(self.outBox) == 0 and len(self.pendingRequests) == 0 and len(self.pendingResponses) == 0:
351 # self._closeWhenPossible = False
352 log.debug("_closeIfReady closing.")
353 asynchat.async_chat.close(self)
355 def handle_close(self):
356 """Called when the socket actually closes."""
357 log.info("Connection closed!")
358 self.pendingRequests = self.pendingResponses = None
360 if self.status == kClosing:
361 self.status = kClosed
363 self.status = kDisconnected
369 class Message (object):
370 "Abstract superclass of all request/response objects"
372 def __init__(self, connection, body=None, properties=None):
373 self.connection = connection
375 self.properties = properties or {}
376 self.requestNo = None
382 flags = kMsgType_Error
384 flags = kMsgType_Response
386 flags = kMsgType_Request
387 if self.urgent: flags |= kMsgFlag_Urgent
388 if self.compressed: flags |= kMsgFlag_Compressed
389 if self.noReply: flags |= kMsgFlag_NoReply
390 if self._moreComing:flags |= kMsgFlag_MoreComing
391 if self._meta: flags |= kMsgFlag_Meta
395 s = "%s[" %(type(self).__name__)
396 if self.requestNo != None:
397 s += "#%i" %self.requestNo
398 if self.urgent: s += " URG"
399 if self.compressed: s += " CMP"
400 if self.noReply: s += " NOR"
401 if self._moreComing:s += " MOR"
402 if self._meta: s += " MET"
403 if self.body: s += " %i bytes" %len(self.body)
408 if len(self.properties): s += repr(self.properties)
412 def isResponse(self):
413 "Is this message a response?"
417 def contentType(self):
418 return self.properties.get('Content-Type')
420 def __getitem__(self, key): return self.properties.get(key)
421 def __contains__(self, key): return key in self.properties
422 def __len__(self): return len(self.properties)
423 def __nonzero__(self): return True
424 def __iter__(self): return self.properties.__iter__()
427 class IncomingMessage (Message):
428 "Abstract superclass of incoming messages."
430 def __init__(self, connection, requestNo, flags):
431 super(IncomingMessage,self).__init__(connection)
432 self.requestNo = requestNo
433 self._updateFlags(flags)
436 def _updateFlags(self, flags):
437 self.urgent = (flags & kMsgFlag_Urgent) != 0
438 self.compressed = (flags & kMsgFlag_Compressed) != 0
439 self.noReply = (flags & kMsgFlag_NoReply) != 0
440 self._moreComing= (flags & kMsgFlag_MoreComing) != 0
441 self._meta = (flags & kMsgFlag_Meta) != 0
442 self.isError = (flags & kMsgType_Error) != 0
444 def _beginFrame(self, flags):
445 """Received a frame header."""
446 self._moreComing = (flags & kMsgFlag_MoreComing)!=0
448 def _receivedData(self, data):
449 """Received data from a frame."""
450 self.frames.append(data)
453 """The entire message has been received; now decode it."""
454 encoded = "".join(self.frames)
457 # Decode the properties:
458 if len(encoded) < 2: raise MessageException, "missing properties length"
459 propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
460 if propSize>len(encoded): raise MessageException, "properties too long to fit"
461 if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
464 proplist = encoded[2:propSize-1].split('\000')
466 if len(proplist) & 1: raise MessageException, "odd number of property strings"
467 for i in xrange(0,len(proplist),2):
470 str = IncomingMessage.__expandDict.get(str,str)
472 self.properties[ expand(proplist[i])] = expand(proplist[i+1])
474 encoded = encoded[propSize:]
476 if self.compressed and len(encoded)>0:
478 encoded = zlib.decompress(encoded,31) # window size of 31 needed for gzip format
480 raise MessageException, sys.exc_info()[1]
483 __expandDict= {'\x01' : "Content-Type",
485 '\x03' : "application/octet-stream",
486 '\x04' : "text/plain; charset=UTF-8",
488 '\x06' : "text/yaml",
490 '\x08' : "Error-Code",
491 '\x09' : "Error-Domain"}
494 class OutgoingMessage (Message):
495 "Abstract superclass of outgoing requests/responses."
497 def __init__(self, connection, body=None, properties=None):
498 Message.__init__(self,connection,body,properties)
499 self.urgent = self.compressed = self.noReply = self._meta = self.isError = False
500 self._moreComing = True
502 def __setitem__(self, key,val):
503 self.properties[key] = val
504 def __delitem__(self, key):
505 del self.properties[key]
509 return hasattr(self,'encoded')
512 "Generates the message's encoded form, prior to sending it."
514 for (key,value) in self.properties.iteritems():
515 def _writePropString(s):
516 out.write(str(s)) #FIX: Abbreviate
518 _writePropString(key)
519 _writePropString(value)
520 propertiesSize = out.tell()
521 assert propertiesSize<65536 #FIX: Return an error instead
523 body = self.body or ""
525 z = zlib.compressobj(6,zlib.DEFLATED,31) # window size of 31 needed for gzip format
526 out.write(z.compress(body))
530 self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
532 log.debug("Encoded %s into %u bytes", self,len(self.encoded))
535 def _sendNextFrame(self, maxLen):
537 payload = self.encoded[pos:pos+maxLen]
539 self._moreComing = (pos < len(self.encoded))
540 if not self._moreComing:
542 log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
544 header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
547 kFrameHeaderSize+len(payload))
549 return header + payload
552 class Request (object):
555 "The response object for this request."
558 r = self.__dict__.get('_response')
560 r = self._response = self._createResponse()
564 class Response (Message):
565 def _setRequest(self, request):
566 assert not request.noReply
567 self.request = request
568 self.requestNo = request.requestNo
569 self.urgent = request.urgent
572 def isResponse(self):
576 class IncomingRequest (IncomingMessage, Request):
577 def _createResponse(self):
578 return OutgoingResponse(self)
581 class OutgoingRequest (OutgoingMessage, Request):
582 def _createResponse(self):
583 return IncomingResponse(self)
587 return self.connection._sendRequest(self) and self.response
590 class IncomingResponse (IncomingMessage, Response):
591 def __init__(self, request):
592 IncomingMessage.__init__(self,request.connection,None,0)
593 self._setRequest(request)
594 self.onComplete = None
597 super(IncomingResponse,self)._finished()
600 self.onComplete(self)
602 log.error("Exception dispatching response: %s", traceback.format_exc())
605 class OutgoingResponse (OutgoingMessage, Response):
606 def __init__(self, request):
607 OutgoingMessage.__init__(self,request.connection)
608 self._setRequest(request)
612 return self.connection._sendMessage(self)
616 Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
618 Redistribution and use in source and binary forms, with or without modification, are permitted
619 provided that the following conditions are met:
621 * Redistributions of source code must retain the above copyright notice, this list of conditions
622 and the following disclaimer.
623 * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
624 and the following disclaimer in the documentation and/or other materials provided with the
627 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
628 IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
629 FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
630 BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
631 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
632 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
633 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
634 THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.