Made C99 project default.
5 Created by Jens Alfke on 2008-06-03.
6 Copyright notice and BSD license at end of file.
11 from cStringIO import StringIO
20 # Connection status enumeration:
28 # INTERNAL CONSTANTS -- NO TOUCHIES!
30 kFrameMagicNumber = 0x9B34F206
31 kFrameHeaderFormat = '!LLHH'
34 kMsgFlag_TypeMask = 0x000F
35 kMsgFlag_Compressed = 0x0010
36 kMsgFlag_Urgent = 0x0020
37 kMsgFlag_NoReply = 0x0040
38 kMsgFlag_MoreComing = 0x0080
39 kMsgFlag_Meta = 0x0100
46 kMsgProfile_Bye = "Bye"
49 log = logging.getLogger('BLIP')
53 class MessageException(Exception):
56 class ConnectionException(Exception):
60 ### LISTENER AND CONNECTION CLASSES:
63 class Listener (asyncore.dispatcher):
64 "BLIP listener/server class"
66 def __init__(self, port, sslKeyFile=None, sslCertFile=None):
67 "Create a listener on a port"
68 asyncore.dispatcher.__init__(self)
69 self.onConnected = self.onRequest = None
70 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
71 self.bind( ('',port) )
73 self.sslKeyFile=sslKeyFile
74 self.sslCertFile=sslCertFile
75 log.info("Listening on port %u", port)
77 def handle_accept( self ):
78 socket,address = self.accept()
80 socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
81 conn = Connection(address, sock=socket, listener=self)
82 conn.onRequest = self.onRequest
84 self.onConnected(conn)
86 def handle_error(self):
87 (typ,val,trace) = sys.exc_info()
88 log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
93 class Connection (asynchat.async_chat):
94 def __init__( self, address, sock=None, listener=None, ssl=None ):
95 "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
96 "otherwise it'll open a new outgoing socket."
98 asynchat.async_chat.__init__(self,sock)
99 log.info("Accepted connection from %s",address)
102 asynchat.async_chat.__init__(self)
103 log.info("Opening connection to %s",address)
104 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
105 self.status = kOpening
108 self.connect(address)
109 self.address = address
110 self.listener = listener
111 self.onRequest = self.onCloseRequest = self.onCloseRefused = None
112 self.pendingRequests = {}
113 self.pendingResponses = {}
115 self.inMessage = None
116 self.inNumRequests = self.outNumRequests = 0
119 self._closeWhenPossible = False
121 def handle_connect(self):
122 log.info("Connection open!")
125 def handle_error(self):
126 (typ,val,trace) = sys.exc_info()
127 log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
128 self.discard_buffers()
129 self.status = kDisconnected
137 return self.status==kOpening or self.status==kOpen
141 return self.isOpen and not self._closeWhenPossible
143 def _sendMessage(self, msg):
145 self._outQueueMessage(msg,True)
147 log.debug("Waking up the output stream")
149 self.push_with_producer(self)
154 def _sendRequest(self, req):
156 requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
157 response = req.response
159 response.requestNo = requestNo
160 self.pendingResponses[requestNo] = response
161 log.debug("pendingResponses[%i] := %s",requestNo,response)
162 return self._sendMessage(req)
164 log.warning("%s: Attempt to send a request after the connection has started closing: %s" % (self, req))
167 def _outQueueMessage(self, msg,isNew=True):
170 if msg.urgent and n>1:
172 otherMsg = self.outBox[index-1]
177 elif isNew and otherMsg.bytesSent==0:
183 self.outBox.insert(index,msg)
185 log.info("Queuing %s at index %i",msg,index)
187 log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
192 msg = self.outBox.pop(0)
194 if msg.urgent or n==1 or not self.outBox[0].urgent:
196 data = msg._sendNextFrame(frameSize)
198 self._outQueueMessage(msg,isNew=False)
200 log.info("Finished sending %s",msg)
203 log.debug("Nothing more to send")
210 def collect_incoming_data(self, data):
211 if self.expectingHeader:
212 if self.inHeader==None:
215 self.inHeader += data
217 self.inMessage._receivedData(data)
219 def found_terminator(self):
220 if self.expectingHeader:
222 (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
224 if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
225 if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
226 frameLen -= kFrameHeaderSize
227 log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
228 (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
229 self.inMessage = self._inMessageForFrame(requestNo,flags)
232 self.expectingHeader = False
233 self.set_terminator(frameLen)
238 # Got the frame's payload:
241 def _inMessageForFrame(self, requestNo,flags):
243 msgType = flags & kMsgFlag_TypeMask
244 if msgType==kMsgType_Request:
245 message = self.pendingRequests.get(requestNo)
246 if message==None and requestNo == self.inNumRequests+1:
247 message = IncomingRequest(self,requestNo,flags)
249 self.pendingRequests[requestNo] = message
250 self.inNumRequests += 1
251 elif msgType==kMsgType_Response or msgType==kMsgType_Error:
252 message = self.pendingResponses.get(requestNo)
253 message._updateFlags(flags)
256 message._beginFrame(flags)
258 log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
261 def _endOfFrame(self):
263 self.inMessage = None
264 self.expectingHeader = True
266 self.set_terminator(kFrameHeaderSize) # wait for binary header
268 log.debug("End of frame of %s",msg)
269 if not msg._moreComing:
270 self._receivedMessage(msg)
272 def _receivedMessage(self, msg):
273 log.info("Received: %s",msg)
274 # Remove from pending:
276 del self.pendingResponses[msg.requestNo]
278 del self.pendingRequests[msg.requestNo]
282 if not msg.isResponse:
284 self._dispatchMetaRequest(msg)
288 log.error("Exception handling incoming message: %s", traceback.format_exc())
289 #FIX: Send an error reply
290 # Check to see if we're done and ready to close:
293 def _dispatchMetaRequest(self, request):
294 """Handles dispatching internal meta requests."""
295 if request['Profile'] == kMsgProfile_Bye:
296 self._handleCloseRequest(request)
298 response = request.response
299 response.isError = True
300 response['Error-Domain'] = "BLIP"
301 response['Error-Code'] = 404
302 response.body = "Unknown meta profile"
307 def _handleCloseRequest(self, request):
308 """Handles requests from a peer to close."""
310 if self.onCloseRequest:
311 shouldClose = self.onCloseRequest()
313 log.debug("Sending resfusal to close...")
314 response = request.response
315 response.isError = True
316 response['Error-Domain'] = "BLIP"
317 response['Error-Code'] = 403
318 response.body = "Close request denied"
321 log.debug("Sending permission to close...")
322 response = request.response
326 """Publicly callable close method. Sends close request to peer."""
327 if self.status != kOpen:
329 log.info("Sending close request...")
330 req = OutgoingRequest(self, None, {'Profile': kMsgProfile_Bye})
332 req.response.onComplete = self._handleCloseResponse
334 log.error("Error sending close request.")
337 self.status = kClosing
340 def _handleCloseResponse(self, response):
341 """Called when we receive a response to a close request."""
342 log.info("Received close response.")
344 # remote refused to close
345 if self.onCloseRefused:
346 self.onCloseRefused(response)
349 # now wait until everything has finished sending, then actually close
350 log.info("No refusal, actually closing...")
351 self._closeWhenPossible = True
353 def _closeIfReady(self):
354 """Checks if all transmissions are complete and then closes the actual socket."""
355 if self._closeWhenPossible and len(self.outBox) == 0 and len(self.pendingRequests) == 0 and len(self.pendingResponses) == 0:
356 # self._closeWhenPossible = False
357 log.debug("_closeIfReady closing.")
358 asynchat.async_chat.close(self)
360 def handle_close(self):
361 """Called when the socket actually closes."""
362 log.info("Connection closed!")
363 self.pendingRequests = self.pendingResponses = None
365 if self.status == kClosing:
366 self.status = kClosed
368 self.status = kDisconnected
369 asyncore.dispatcher.close(self)
375 class Message (object):
376 "Abstract superclass of all request/response objects"
378 def __init__(self, connection, body=None, properties=None):
379 self.connection = connection
381 self.properties = properties or {}
382 self.requestNo = None
388 flags = kMsgType_Error
390 flags = kMsgType_Response
392 flags = kMsgType_Request
393 if self.urgent: flags |= kMsgFlag_Urgent
394 if self.compressed: flags |= kMsgFlag_Compressed
395 if self.noReply: flags |= kMsgFlag_NoReply
396 if self._moreComing:flags |= kMsgFlag_MoreComing
397 if self._meta: flags |= kMsgFlag_Meta
401 s = "%s[" %(type(self).__name__)
402 if self.requestNo != None:
403 s += "#%i" %self.requestNo
404 if self.urgent: s += " URG"
405 if self.compressed: s += " CMP"
406 if self.noReply: s += " NOR"
407 if self._moreComing:s += " MOR"
408 if self._meta: s += " MET"
409 if self.body: s += " %i bytes" %len(self.body)
414 if len(self.properties): s += repr(self.properties)
418 def isResponse(self):
419 "Is this message a response?"
423 def contentType(self):
424 return self.properties.get('Content-Type')
426 def __getitem__(self, key): return self.properties.get(key)
427 def __contains__(self, key): return key in self.properties
428 def __len__(self): return len(self.properties)
429 def __nonzero__(self): return True
430 def __iter__(self): return self.properties.__iter__()
433 class IncomingMessage (Message):
434 "Abstract superclass of incoming messages."
436 def __init__(self, connection, requestNo, flags):
437 super(IncomingMessage,self).__init__(connection)
438 self.requestNo = requestNo
439 self._updateFlags(flags)
442 def _updateFlags(self, flags):
443 self.urgent = (flags & kMsgFlag_Urgent) != 0
444 self.compressed = (flags & kMsgFlag_Compressed) != 0
445 self.noReply = (flags & kMsgFlag_NoReply) != 0
446 self._moreComing= (flags & kMsgFlag_MoreComing) != 0
447 self._meta = (flags & kMsgFlag_Meta) != 0
448 self.isError = (flags & kMsgType_Error) != 0
450 def _beginFrame(self, flags):
451 """Received a frame header."""
452 self._moreComing = (flags & kMsgFlag_MoreComing)!=0
454 def _receivedData(self, data):
455 """Received data from a frame."""
456 self.frames.append(data)
459 """The entire message has been received; now decode it."""
460 encoded = "".join(self.frames)
463 # Decode the properties:
464 if len(encoded) < 2: raise MessageException, "missing properties length"
465 propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
466 if propSize>len(encoded): raise MessageException, "properties too long to fit"
467 if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
470 proplist = encoded[2:propSize-1].split('\000')
472 if len(proplist) & 1: raise MessageException, "odd number of property strings"
473 for i in xrange(0,len(proplist),2):
476 str = IncomingMessage.__expandDict.get(str,str)
478 self.properties[ expand(proplist[i])] = expand(proplist[i+1])
480 encoded = encoded[propSize:]
482 if self.compressed and len(encoded)>0:
484 encoded = zlib.decompress(encoded,31) # window size of 31 needed for gzip format
486 raise MessageException, sys.exc_info()[1]
489 __expandDict= {'\x01' : "Content-Type",
491 '\x03' : "application/octet-stream",
492 '\x04' : "text/plain; charset=UTF-8",
494 '\x06' : "text/yaml",
496 '\x08' : "Error-Code",
497 '\x09' : "Error-Domain"}
500 class OutgoingMessage (Message):
501 "Abstract superclass of outgoing requests/responses."
503 def __init__(self, connection, body=None, properties=None):
504 Message.__init__(self,connection,body,properties)
505 self.urgent = self.compressed = self.noReply = self._meta = self.isError = False
506 self._moreComing = True
508 def __setitem__(self, key,val):
509 self.properties[key] = val
510 def __delitem__(self, key):
511 del self.properties[key]
515 return hasattr(self,'encoded')
518 "Generates the message's encoded form, prior to sending it."
520 for (key,value) in self.properties.iteritems():
521 def _writePropString(s):
522 out.write(str(s)) #FIX: Abbreviate
524 _writePropString(key)
525 _writePropString(value)
526 propertiesSize = out.tell()
527 assert propertiesSize<65536 #FIX: Return an error instead
529 body = self.body or ""
531 z = zlib.compressobj(6,zlib.DEFLATED,31) # window size of 31 needed for gzip format
532 out.write(z.compress(body))
536 self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
538 log.debug("Encoded %s into %u bytes", self,len(self.encoded))
541 def _sendNextFrame(self, maxLen):
543 payload = self.encoded[pos:pos+maxLen]
545 self._moreComing = (pos < len(self.encoded))
546 if not self._moreComing:
548 log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
550 header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
553 kFrameHeaderSize+len(payload))
555 return header + payload
558 class Request (object):
561 "The response object for this request."
564 r = self.__dict__.get('_response')
566 r = self._response = self._createResponse()
570 class Response (Message):
571 def _setRequest(self, request):
572 assert not request.noReply
573 self.request = request
574 self.requestNo = request.requestNo
575 self.urgent = request.urgent
578 def isResponse(self):
582 class IncomingRequest (IncomingMessage, Request):
583 def _createResponse(self):
584 return OutgoingResponse(self)
587 class OutgoingRequest (OutgoingMessage, Request):
588 def _createResponse(self):
589 return IncomingResponse(self)
593 return self.connection._sendRequest(self) and self.response
596 class IncomingResponse (IncomingMessage, Response):
597 def __init__(self, request):
598 IncomingMessage.__init__(self,request.connection,None,0)
599 self._setRequest(request)
600 self.onComplete = None
603 super(IncomingResponse,self)._finished()
606 self.onComplete(self)
608 log.error("Exception dispatching response: %s", traceback.format_exc())
611 class OutgoingResponse (OutgoingMessage, Response):
612 def __init__(self, request):
613 OutgoingMessage.__init__(self,request.connection)
614 self._setRequest(request)
618 return self.connection._sendMessage(self)
622 Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
624 Redistribution and use in source and binary forms, with or without modification, are permitted
625 provided that the following conditions are met:
627 * Redistributions of source code must retain the above copyright notice, this list of conditions
628 and the following disclaimer.
629 * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
630 and the following disclaimer in the documentation and/or other materials provided with the
633 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
634 IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
635 FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
636 BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
637 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
638 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
639 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
640 THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.