Removed unnecessary files. Toned down logging. Added null logging handler to BLIP so client code doesn't have to use logging. Modified test drivers to work against Cocoa versions.
5 Created by Jens Alfke on 2008-06-03.
6 Copyright notice and BSD license at end of file.
11 from cStringIO import StringIO
20 # Connection status enumeration:
28 # INTERNAL CONSTANTS -- NO TOUCHIES!
30 kFrameMagicNumber = 0x9B34F206
31 kFrameHeaderFormat = '!LLHH'
34 kMsgFlag_TypeMask = 0x000F
35 kMsgFlag_Compressed = 0x0010
36 kMsgFlag_Urgent = 0x0020
37 kMsgFlag_NoReply = 0x0040
38 kMsgFlag_MoreComing = 0x0080
39 kMsgFlag_Meta = 0x0100
46 kMsgProfile_Bye = "Bye"
49 class NullLoggingHandler(logging.Handler):
50 def emit(self, record):
53 log = logging.getLogger('BLIP')
54 # This line prevents the "No handlers found" warning if the calling code does not use logging.
55 log.addHandler(NullLoggingHandler())
59 class MessageException(Exception):
62 class ConnectionException(Exception):
66 ### LISTENER AND CONNECTION CLASSES:
69 class Listener (asyncore.dispatcher):
70 "BLIP listener/server class"
72 def __init__(self, port, sslKeyFile=None, sslCertFile=None):
73 "Create a listener on a port"
74 asyncore.dispatcher.__init__(self)
75 self.onConnected = self.onRequest = None
76 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
77 self.bind( ('',port) )
79 self.sslKeyFile=sslKeyFile
80 self.sslCertFile=sslCertFile
81 log.info("Listening on port %u", port)
83 def handle_accept( self ):
84 socket,address = self.accept()
86 socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
87 conn = Connection(address, sock=socket, listener=self)
88 conn.onRequest = self.onRequest
90 self.onConnected(conn)
92 def handle_error(self):
93 (typ,val,trace) = sys.exc_info()
94 log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
99 class Connection (asynchat.async_chat):
100 def __init__( self, address, sock=None, listener=None, ssl=None ):
101 "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
102 "otherwise it'll open a new outgoing socket."
104 asynchat.async_chat.__init__(self,sock)
105 log.info("Accepted connection from %s",address)
108 asynchat.async_chat.__init__(self)
109 log.info("Opening connection to %s",address)
110 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
111 self.status = kOpening
114 self.connect(address)
115 self.address = address
116 self.listener = listener
117 self.onRequest = self.onCloseRequest = self.onCloseRefused = None
118 self.pendingRequests = {}
119 self.pendingResponses = {}
121 self.inMessage = None
122 self.inNumRequests = self.outNumRequests = 0
125 self._closeWhenPossible = False
127 def handle_connect(self):
128 log.info("Connection open!")
131 def handle_error(self):
132 (typ,val,trace) = sys.exc_info()
133 log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
134 self.discard_buffers()
135 self.status = kDisconnected
143 return self.status==kOpening or self.status==kOpen
147 return self.isOpen and not self._closeWhenPossible
149 def _sendMessage(self, msg):
151 self._outQueueMessage(msg,True)
153 log.debug("Waking up the output stream")
155 self.push_with_producer(self)
160 def _sendRequest(self, req):
162 requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
163 response = req.response
165 response.requestNo = requestNo
166 self.pendingResponses[requestNo] = response
167 log.debug("pendingResponses[%i] := %s",requestNo,response)
168 return self._sendMessage(req)
170 log.warning("%s: Attempt to send a request after the connection has started closing: %s" % (self, req))
173 def _outQueueMessage(self, msg,isNew=True):
176 if msg.urgent and n>1:
178 otherMsg = self.outBox[index-1]
183 elif isNew and otherMsg.bytesSent==0:
189 self.outBox.insert(index,msg)
191 log.info("Queuing %s at index %i",msg,index)
193 log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
198 msg = self.outBox.pop(0)
200 if msg.urgent or n==1 or not self.outBox[0].urgent:
202 data = msg._sendNextFrame(frameSize)
204 self._outQueueMessage(msg,isNew=False)
206 log.info("Finished sending %s",msg)
209 log.debug("Nothing more to send")
216 def collect_incoming_data(self, data):
217 if self.expectingHeader:
218 if self.inHeader==None:
221 self.inHeader += data
223 self.inMessage._receivedData(data)
225 def found_terminator(self):
226 if self.expectingHeader:
228 (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
230 if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
231 if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
232 frameLen -= kFrameHeaderSize
233 log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
234 (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
235 self.inMessage = self._inMessageForFrame(requestNo,flags)
238 self.expectingHeader = False
239 self.set_terminator(frameLen)
244 # Got the frame's payload:
247 def _inMessageForFrame(self, requestNo,flags):
249 msgType = flags & kMsgFlag_TypeMask
250 if msgType==kMsgType_Request:
251 message = self.pendingRequests.get(requestNo)
252 if message==None and requestNo == self.inNumRequests+1:
253 message = IncomingRequest(self,requestNo,flags)
255 self.pendingRequests[requestNo] = message
256 self.inNumRequests += 1
257 elif msgType==kMsgType_Response or msgType==kMsgType_Error:
258 message = self.pendingResponses.get(requestNo)
259 message._updateFlags(flags)
262 message._beginFrame(flags)
264 log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
267 def _endOfFrame(self):
269 self.inMessage = None
270 self.expectingHeader = True
272 self.set_terminator(kFrameHeaderSize) # wait for binary header
274 log.debug("End of frame of %s",msg)
275 if not msg._moreComing:
276 self._receivedMessage(msg)
278 def _receivedMessage(self, msg):
279 log.info("Received: %s",msg)
280 # Remove from pending:
282 del self.pendingResponses[msg.requestNo]
284 del self.pendingRequests[msg.requestNo]
288 if not msg.isResponse:
290 self._dispatchMetaRequest(msg)
294 log.error("Exception handling incoming message: %s", traceback.format_exc())
295 #FIX: Send an error reply
296 # Check to see if we're done and ready to close:
299 def _dispatchMetaRequest(self, request):
300 """Handles dispatching internal meta requests."""
301 if request['Profile'] == kMsgProfile_Bye:
302 self._handleCloseRequest(request)
304 response = request.response
305 response.isError = True
306 response['Error-Domain'] = "BLIP"
307 response['Error-Code'] = 404
308 response.body = "Unknown meta profile"
313 def _handleCloseRequest(self, request):
314 """Handles requests from a peer to close."""
316 if self.onCloseRequest:
317 shouldClose = self.onCloseRequest()
319 log.debug("Sending resfusal to close...")
320 response = request.response
321 response.isError = True
322 response['Error-Domain'] = "BLIP"
323 response['Error-Code'] = 403
324 response.body = "Close request denied"
327 log.debug("Sending permission to close...")
328 response = request.response
332 """Publicly callable close method. Sends close request to peer."""
333 if self.status != kOpen:
335 log.info("Sending close request...")
336 req = OutgoingRequest(self, None, {'Profile': kMsgProfile_Bye})
338 req.response.onComplete = self._handleCloseResponse
340 log.error("Error sending close request.")
343 self.status = kClosing
346 def _handleCloseResponse(self, response):
347 """Called when we receive a response to a close request."""
348 log.info("Received close response.")
350 # remote refused to close
351 if self.onCloseRefused:
352 self.onCloseRefused(response)
355 # now wait until everything has finished sending, then actually close
356 log.info("No refusal, actually closing...")
357 self._closeWhenPossible = True
# Close the socket once a close has been requested and nothing is left in flight.
# The test below requires _closeWhenPossible to be set and outBox, pendingRequests
# and pendingResponses to all be empty; otherwise this call does nothing.
359 def _closeIfReady(self):
360 """Checks if all transmissions are complete and then closes the actual socket."""
361 if self._closeWhenPossible and len(self.outBox) == 0 and len(self.pendingRequests) == 0 and len(self.pendingResponses) == 0:
362 # self._closeWhenPossible = False
# NOTE(review): the reset above is disabled, so this method leaves the flag set -- confirm that is intended.
363 log.debug("_closeIfReady closing.")
# Invokes the async_chat base-class close directly on this connection.
364 asynchat.async_chat.close(self)
366 def handle_close(self):
367 """Called when the socket actually closes."""
368 log.info("Connection closed!")
369 self.pendingRequests = self.pendingResponses = None
371 if self.status == kClosing:
372 self.status = kClosed
374 self.status = kDisconnected
375 asyncore.dispatcher.close(self)
381 class Message (object):
382 "Abstract superclass of all request/response objects"
384 def __init__(self, connection, body=None, properties=None):
385 self.connection = connection
387 self.properties = properties or {}
388 self.requestNo = None
394 flags = kMsgType_Error
396 flags = kMsgType_Response
398 flags = kMsgType_Request
399 if self.urgent: flags |= kMsgFlag_Urgent
400 if self.compressed: flags |= kMsgFlag_Compressed
401 if self.noReply: flags |= kMsgFlag_NoReply
402 if self._moreComing:flags |= kMsgFlag_MoreComing
403 if self._meta: flags |= kMsgFlag_Meta
407 s = "%s[" %(type(self).__name__)
408 if self.requestNo != None:
409 s += "#%i" %self.requestNo
410 if self.urgent: s += " URG"
411 if self.compressed: s += " CMP"
412 if self.noReply: s += " NOR"
413 if self._moreComing:s += " MOR"
414 if self._meta: s += " MET"
415 if self.body: s += " %i bytes" %len(self.body)
420 if len(self.properties): s += repr(self.properties)
424 def isResponse(self):
425 "Is this message a response?"
429 def contentType(self):
430 return self.properties.get('Content-Type')
# msg[key]: look up a property by name. Uses properties.get(), so a missing key
# yields None instead of raising KeyError.
432 def __getitem__(self, key): return self.properties.get(key)
# `key in msg`: true when the message has a property with that name.
433 def __contains__(self, key): return key in self.properties
# len(msg): the number of properties (not the body size).
434 def __len__(self): return len(self.properties)
# Python 2 truth-value hook: a message is always truthy. Without it, truth testing
# would fall back to __len__, making a message with no properties evaluate as false.
435 def __nonzero__(self): return True
# iter(msg): delegates to the properties container's own iterator.
436 def __iter__(self): return self.properties.__iter__()
439 class IncomingMessage (Message):
440 "Abstract superclass of incoming messages."
442 def __init__(self, connection, requestNo, flags):
443 super(IncomingMessage,self).__init__(connection)
444 self.requestNo = requestNo
445 self._updateFlags(flags)
# Decode a frame's flags word into boolean attributes on this message.
# Each attribute is True when the corresponding bit(s) are set in `flags`.
448 def _updateFlags(self, flags):
449 self.urgent = (flags & kMsgFlag_Urgent) != 0
450 self.compressed = (flags & kMsgFlag_Compressed) != 0
451 self.noReply = (flags & kMsgFlag_NoReply) != 0
452 self._moreComing= (flags & kMsgFlag_MoreComing) != 0
453 self._meta = (flags & kMsgFlag_Meta) != 0
# NOTE(review): this is a bit test against kMsgType_Error, whereas the connection
# code compares (flags & kMsgFlag_TypeMask) for equality with the kMsgType_* values.
# Confirm the two agree for every type value that can appear in the type field.
454 self.isError = (flags & kMsgType_Error) != 0
# Called with the flags of a newly received frame header; refreshes _moreComing
# from the MoreComing bit of those flags.
456 def _beginFrame(self, flags):
457 """Received a frame header."""
458 self._moreComing = (flags & kMsgFlag_MoreComing)!=0
# Accumulate one chunk of frame payload. Chunks are only appended to self.frames
# here; the decode step joins them into a single string later.
460 def _receivedData(self, data):
461 """Received data from a frame."""
462 self.frames.append(data)
465 """The entire message has been received; now decode it."""
466 encoded = "".join(self.frames)
469 # Decode the properties:
470 if len(encoded) < 2: raise MessageException, "missing properties length"
471 propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
472 if propSize>len(encoded): raise MessageException, "properties too long to fit"
473 if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
476 proplist = encoded[2:propSize-1].split('\000')
478 if len(proplist) & 1: raise MessageException, "odd number of property strings"
479 for i in xrange(0,len(proplist),2):
482 str = IncomingMessage.__expandDict.get(str,str)
484 self.properties[ expand(proplist[i])] = expand(proplist[i+1])
486 encoded = encoded[propSize:]
488 if self.compressed and len(encoded)>0:
490 encoded = zlib.decompress(encoded,31) # window size of 31 needed for gzip format
492 raise MessageException, sys.exc_info()[1]
495 __expandDict= {'\x01' : "Content-Type",
497 '\x03' : "application/octet-stream",
498 '\x04' : "text/plain; charset=UTF-8",
500 '\x06' : "text/yaml",
502 '\x08' : "Error-Code",
503 '\x09' : "Error-Domain"}
506 class OutgoingMessage (Message):
507 "Abstract superclass of outgoing requests/responses."
# Initialise an outgoing message: delegate connection/body/properties to Message,
# then start with every flag attribute cleared.
509 def __init__(self, connection, body=None, properties=None):
510 Message.__init__(self,connection,body,properties)
511 self.urgent = self.compressed = self.noReply = self._meta = self.isError = False
# Starts as True; _sendNextFrame recomputes it from the current send position.
512 self._moreComing = True
# msg[key] = val: set (or overwrite) a property on this outgoing message.
514 def __setitem__(self, key,val):
515 self.properties[key] = val
# del msg[key]: remove a property. Uses a plain `del` on the properties container,
# so a missing key propagates whatever that container raises (KeyError for a dict).
516 def __delitem__(self, key):
517 del self.properties[key]
521 return hasattr(self,'encoded')
524 "Generates the message's encoded form, prior to sending it."
526 for (key,value) in self.properties.iteritems():
527 def _writePropString(s):
528 out.write(str(s)) #FIX: Abbreviate
530 _writePropString(key)
531 _writePropString(value)
532 propertiesSize = out.tell()
533 assert propertiesSize<65536 #FIX: Return an error instead
535 body = self.body or ""
537 z = zlib.compressobj(6,zlib.DEFLATED,31) # window size of 31 needed for gzip format
538 out.write(z.compress(body))
542 self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
544 log.debug("Encoded %s into %u bytes", self,len(self.encoded))
547 def _sendNextFrame(self, maxLen):
549 payload = self.encoded[pos:pos+maxLen]
551 self._moreComing = (pos < len(self.encoded))
552 if not self._moreComing:
554 log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
556 header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
559 kFrameHeaderSize+len(payload))
561 return header + payload
564 class Request (object):
567 "The response object for this request."
570 r = self.__dict__.get('_response')
572 r = self._response = self._createResponse()
576 class Response (Message):
# Bind this response to the request it answers, copying the request's number and
# urgency onto the response.
577 def _setRequest(self, request):
# A request flagged noReply must not get a response. Note that `assert` is skipped
# when Python runs with -O, so this is a debugging check only.
578 assert not request.noReply
579 self.request = request
580 self.requestNo = request.requestNo
581 self.urgent = request.urgent
584 def isResponse(self):
588 class IncomingRequest (IncomingMessage, Request):
# Factory hook used by Request's response accessor: an incoming request is answered
# with an OutgoingResponse built from this request.
589 def _createResponse(self):
590 return OutgoingResponse(self)
593 class OutgoingRequest (OutgoingMessage, Request):
# Factory hook used by Request's response accessor: an outgoing request expects an
# IncomingResponse built from this request.
594 def _createResponse(self):
595 return IncomingResponse(self)
599 return self.connection._sendRequest(self) and self.response
602 class IncomingResponse (IncomingMessage, Response):
# Create the placeholder for the response to `request`, on the request's connection.
603 def __init__(self, request):
# requestNo=None and flags=0 are placeholders; _setRequest below copies the real
# request number (and urgency) from the request.
604 IncomingMessage.__init__(self,request.connection,None,0)
605 self._setRequest(request)
# Optional callback, invoked elsewhere in this class as onComplete(self); unset by default.
606 self.onComplete = None
609 super(IncomingResponse,self)._finished()
612 self.onComplete(self)
614 log.error("Exception dispatching response: %s", traceback.format_exc())
617 class OutgoingResponse (OutgoingMessage, Response):
# Create the reply to `request` on the request's connection, with no body or
# properties yet, then link it to the request (copies requestNo and urgency).
618 def __init__(self, request):
619 OutgoingMessage.__init__(self,request.connection)
620 self._setRequest(request)
624 return self.connection._sendMessage(self)
628 Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
630 Redistribution and use in source and binary forms, with or without modification, are permitted
631 provided that the following conditions are met:
633 * Redistributions of source code must retain the above copyright notice, this list of conditions
634 and the following disclaimer.
635 * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
636 and the following disclaimer in the documentation and/or other materials provided with the
639 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
640 IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
641 FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
642 BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
643 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
644 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
645 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
646 THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.