# encoding: utf-8
"""
BLIP.py

Created by Jens Alfke on 2008-06-03.
Copyright notice and BSD license at end of file.
"""

import asynchat
import asyncore
from cStringIO import StringIO
import logging
import socket
import struct
import sys
import traceback
import zlib


# Connection status enumeration:
kDisconnected = -1
kClosed  = 0
kOpening = 1
kOpen    = 2
kClosing = 3


# INTERNAL CONSTANTS -- NO TOUCHIES!

kFrameMagicNumber   = 0x9B34F205
kFrameHeaderFormat  = '!LLHH'       # magic, request number, flags, frame length
kFrameHeaderSize    = 12

kMsgFlag_TypeMask   = 0x000F
kMsgFlag_Compressed = 0x0010
kMsgFlag_Urgent     = 0x0020
kMsgFlag_NoReply    = 0x0040
kMsgFlag_MoreComing = 0x0080

kMsgType_Request    = 0
kMsgType_Response   = 1
kMsgType_Error      = 2


log = logging.getLogger('BLIP')
log.propagate = True


class MessageException(Exception):
    """Raised when a message cannot be encoded or decoded."""
    pass

class ConnectionException(Exception):
    """Raised when an incoming frame header is invalid."""
    pass


### LISTENER AND CONNECTION CLASSES:


class Listener (asyncore.dispatcher):
    """BLIP listener/server class.

    Callers may set `onConnected` (called with each new Connection) and
    `onRequest` (copied onto each new Connection as its request handler).
    """

    def __init__(self, port, sslKeyFile=None, sslCertFile=None):
        """Create a listener on a port (all interfaces, backlog of 5)."""
        asyncore.dispatcher.__init__(self)
        self.onConnected = self.onRequest = None
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind( ('',port) )
        self.listen(5)
        self.sslKeyFile = sslKeyFile
        self.sslCertFile = sslCertFile
        log.info("Listening on port %u", port)

    def handle_accept(self):
        """Accept a pending connection and wrap it in a Connection."""
        accepted = self.accept()
        if accepted is None:
            # dispatcher.accept() can return None when no connection is
            # actually ready; unpacking it would raise and handle_error()
            # would then shut the whole listener down.
            return
        # The local must not be called `socket`: that would shadow the socket
        # module and make the `socket.ssl` lookup below fail.
        sock, address = accepted
        if self.sslKeyFile:
            # NOTE(review): the return value of socket.ssl() is discarded, as
            # in the original code -- confirm whether the Connection is
            # supposed to use the SSL object instead of the raw socket.
            socket.ssl(sock, self.sslKeyFile, self.sslCertFile)
        conn = Connection(address, sock=sock, listener=self)
        conn.onRequest = self.onRequest
        if self.onConnected:
            self.onConnected(conn)

    def handle_error(self):
        """Log the exception currently being handled and close the listener."""
        typ, val = sys.exc_info()[:2]
        log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
        self.close()



class Connection (asynchat.async_chat):
    """A BLIP connection: frames outgoing messages and reassembles incoming ones."""

    def __init__(self, address, sock=None, listener=None, ssl=None):
        """Opens a connection with the given address.

        If a connection/socket object is provided it'll use that, otherwise
        it'll open a new outgoing socket. `ssl`, if given, is called with the
        newly created outgoing socket before connecting.
        """
        if sock:
            asynchat.async_chat.__init__(self,sock)
            log.info("Accepted connection from %s",address)
            self.status = kOpen
        else:
            asynchat.async_chat.__init__(self)
            log.info("Opening connection to %s",address)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.status = kOpening
            if ssl:
                ssl(self.socket)
            self.connect(address)
        self.address = address
        self.listener = listener
        self.onRequest = None
        self.pendingRequests = {}       # requestNo -> partially received IncomingRequest
        self.pendingResponses = {}      # requestNo -> IncomingResponse awaiting frames
        self.outBox = []                # outgoing messages that still have frames to send
        self.inMessage = None
        self.inNumRequests = self.outNumRequests = 0
        self.sending = False            # True while self is pushed as an asynchat producer
        self._endOfFrame()              # primes the reader to expect a frame header

    def close(self):
        """Close the connection, marking it kClosing if it was opening/open."""
        if self.status > kClosed:
            self.status = kClosing
            log.info("Connection closing...")
        asynchat.async_chat.close(self)

    def handle_connect(self):
        log.info("Connection open!")
        self.status = kOpen

    def handle_error(self):
        """Log the exception currently being handled and drop the connection."""
        typ, val = sys.exc_info()[:2]
        log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
        self.discard_buffers()
        self.status = kDisconnected
        self.close()

    def handle_close(self):
        """Release pending state and record whether the close was expected."""
        log.info("Connection closed!")
        self.pendingRequests = self.pendingResponses = None
        self.outBox = None
        if self.status == kClosing:
            self.status = kClosed
        else:
            self.status = kDisconnected
        asynchat.async_chat.handle_close(self)


    ### SENDING:

    @property
    def canSend(self):
        """True while the connection is opening or open."""
        return self.status==kOpening or self.status==kOpen

    def _sendMessage(self, msg):
        """Queue an encoded message for sending; returns False if not sendable."""
        if not self.canSend:
            return False
        self._outQueueMessage(msg,True)
        if not self.sending:
            log.debug("Waking up the output stream")
            self.sending = True
            # This object is its own producer: asynchat calls more() for data.
            self.push_with_producer(self)
        return True

    def _sendRequest(self, req):
        """Number a request, register its response as pending, and send it."""
        if not self.canSend:
            return False
        requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
        response = req.response
        if response:
            response.requestNo = requestNo
            self.pendingResponses[requestNo] = response
            log.debug("pendingResponses[%i] := %s",requestNo,response)
        return self._sendMessage(req)

    def _outQueueMessage(self, msg,isNew=True):
        """Insert a message into the outBox; urgent messages may jump ahead."""
        # NOTE(review): the source text of this method was damaged between
        # "if index" and the "n > 0" test in more(). The loop tail, the insert
        # and the two log calls below are a reconstruction -- verify against
        # version control before relying on the exact queueing policy.
        n = len(self.outBox)
        index = n
        if msg.urgent and n>1:
            while index > 0:
                otherMsg = self.outBox[index-1]
                if otherMsg.urgent:
                    if index<n:
                        index += 1
                    break
                elif isNew and otherMsg.bytesSent==0:
                    break
                index -= 1
            else:
                index = 1

        self.outBox.insert(index,msg)
        if isNew:
            log.info("Queuing %s at index %i",msg,index)
        else:
            log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))

    def more(self):
        """asynchat producer hook: return the next outgoing frame, or None."""
        # NOTE(review): the first two statements of this method were lost in
        # the damaged source and are reconstructed from the surviving body.
        n = len(self.outBox)
        if n > 0:
            msg = self.outBox.pop(0)
            frameSize = 4096
            # Use a 4x larger frame unless an urgent message is waiting
            # right behind a non-urgent one.
            if msg.urgent or n==1 or not self.outBox[0].urgent:
                frameSize *= 4
            data = msg._sendNextFrame(frameSize)
            if msg._moreComing:
                self._outQueueMessage(msg,isNew=False)
            else:
                log.info("Finished sending %s",msg)
            return data
        else:
            log.debug("Nothing more to send")
            self.sending = False
            return None

    ### RECEIVING:

    def collect_incoming_data(self, data):
        """asynchat hook: accumulate header bytes or hand payload to the message."""
        if self.expectingHeader:
            if self.inHeader is None:
                self.inHeader = data
            else:
                self.inHeader += data
        elif self.inMessage:
            # Frames of unknown messages have inMessage == None and are dropped.
            self.inMessage._receivedData(data)

    def found_terminator(self):
        """asynchat hook: a complete header or a complete frame payload arrived.

        Raises ConnectionException on a bad magic number or frame length.
        """
        if self.expectingHeader:
            # Got a header:
            (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
            self.inHeader = None
            if magic!=kFrameMagicNumber:
                raise ConnectionException("Incorrect frame magic number %x" %magic)
            if frameLen < kFrameHeaderSize:
                raise ConnectionException("Invalid frame length %u" %frameLen)
            frameLen -= kFrameHeaderSize    # frame length on the wire includes the header
            log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
                      (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
            self.inMessage = self._inMessageForFrame(requestNo,flags)

            if frameLen > 0:
                self.expectingHeader = False
                self.set_terminator(frameLen)
            else:
                self._endOfFrame()

        else:
            # Got the frame's payload:
            self._endOfFrame()

    def _inMessageForFrame(self, requestNo,flags):
        """Find (or create) the incoming message a frame belongs to, else None."""
        message = None
        msgType = flags & kMsgFlag_TypeMask
        if msgType==kMsgType_Request:
            message = self.pendingRequests.get(requestNo)
            # A new request is only accepted if it carries the next number in sequence.
            if message is None and requestNo == self.inNumRequests+1:
                message = IncomingRequest(self,requestNo,flags)
                self.pendingRequests[requestNo] = message
                self.inNumRequests += 1
        elif msgType==kMsgType_Response or msgType==kMsgType_Error:
            message = self.pendingResponses.get(requestNo)

        if message is not None:
            message._beginFrame(flags)
        else:
            log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
        return message

    def _endOfFrame(self):
        """Reset the reader to expect a header; dispatch the message if complete."""
        msg = self.inMessage
        self.inMessage = None
        self.expectingHeader = True
        self.inHeader = None
        self.set_terminator(kFrameHeaderSize) # wait for binary header
        if msg:
            log.debug("End of frame of %s",msg)
            if not msg._moreComing:
                self._receivedMessage(msg)

    def _receivedMessage(self, msg):
        """Remove a completed message from the pending tables, decode and dispatch it."""
        log.info("Received: %s",msg)
        # Remove from pending:
        if msg.isResponse:
            del self.pendingResponses[msg.requestNo]
        else:
            del self.pendingRequests[msg.requestNo]
        # Decode:
        try:
            msg._finished()
            if not msg.isResponse:
                if self.onRequest:
                    self.onRequest(msg)
                else:
                    log.warning("No onRequest handler set; dropping %s", msg)
        except Exception:
            log.error("Exception handling incoming message: %s", traceback.format_exc())
            #FIX: Send an error reply


### MESSAGE CLASSES:


class Message (object):
    """Abstract superclass of all request/response objects.

    Subclasses are expected to provide the `urgent`, `compressed`, `noReply`
    and `_moreComing` attributes that `flags` and `__str__` read.
    """

    def __init__(self, connection, body=None, properties=None):
        self.connection = connection
        self.body = body
        self.properties = properties or {}
        self.requestNo = None

    @property
    def flags(self):
        """The frame-header flag word for this message's current state."""
        if self.isResponse:
            flags = kMsgType_Response
        else:
            flags = kMsgType_Request
        if self.urgent:     flags |= kMsgFlag_Urgent
        if self.compressed: flags |= kMsgFlag_Compressed
        if self.noReply:    flags |= kMsgFlag_NoReply
        if self._moreComing:flags |= kMsgFlag_MoreComing
        return flags

    def __str__(self):
        s = "%s[" %(type(self).__name__)
        if self.requestNo is not None:
            s += "#%i" %self.requestNo
        if self.urgent:     s += " URG"
        if self.compressed: s += " CMP"
        if self.noReply:    s += " NOR"
        if self._moreComing:s += " MOR"
        if self.body:       s += " %i bytes" %len(self.body)
        return s+"]"

    def __repr__(self):
        s = str(self)
        if len(self.properties): s += repr(self.properties)
        return s

    @property
    def isResponse(self):
        "Is this message a response?"
        return False

    @property
    def contentType(self):
        return self.properties.get('Content-Type')

    # Dictionary-style access to the properties:
    def __getitem__(self, key):     return self.properties.get(key)
    def __contains__(self, key):    return key in self.properties
    def __len__(self):              return len(self.properties)
    # Without this, __len__ would make a property-less message falsy.
    def __nonzero__(self):          return True
    def __iter__(self):             return self.properties.__iter__()


class IncomingMessage (Message):
    "Abstract superclass of incoming messages."

    def __init__(self, connection, requestNo, flags):
        super(IncomingMessage,self).__init__(connection)
        self.requestNo  = requestNo
        self.urgent     = (flags & kMsgFlag_Urgent) != 0
        self.compressed = (flags & kMsgFlag_Compressed) != 0
        self.noReply    = (flags & kMsgFlag_NoReply) != 0
        self._moreComing= (flags & kMsgFlag_MoreComing) != 0
        self.frames     = []

    def _beginFrame(self, flags):
        """Received a frame header."""
        self._moreComing = (flags & kMsgFlag_MoreComing)!=0

    def _receivedData(self, data):
        """Received data from a frame."""
        self.frames.append(data)

    def _expandAbbreviation(self, s):
        """Map a one-character property abbreviation to its full string."""
        if len(s)==1:
            return self.__expandDict.get(s,s)
        return s

    def _finished(self):
        """The entire message has been received; now decode it.

        Fills in `properties` and `body`. Raises MessageException if the
        properties block is malformed or the body cannot be decompressed.
        """
        encoded = "".join(self.frames)
        self.frames = None

        # Decode the properties:
        if len(encoded) < 2:
            raise MessageException("missing properties length")
        propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
        if propSize>len(encoded):
            raise MessageException("properties too long to fit")
        if propSize>2:
            # Only parse a non-empty block: ''.split('\000') yields [''],
            # which would be misread as an odd number of strings.
            if encoded[propSize-1] != '\000':
                raise MessageException("properties are not nul-terminated")
            proplist = encoded[2:propSize-1].split('\000')
            if len(proplist) & 1:
                raise MessageException("odd number of property strings")
            expand = self._expandAbbreviation
            for i in range(0,len(proplist),2):
                self.properties[ expand(proplist[i]) ] = expand(proplist[i+1])
        encoded = encoded[propSize:]

        # Decode the body:
        if self.compressed and len(encoded)>0:
            try:
                encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
            except zlib.error:
                raise MessageException(sys.exc_info()[1])
        self.body = encoded

    __expandDict= {'\x01' : "Content-Type",
                   '\x02' : "Profile",
                   '\x03' : "application/octet-stream",
                   '\x04' : "text/plain; charset=UTF-8",
                   '\x05' : "text/xml",
                   '\x06' : "text/yaml",
                   '\x07' : "Channel",
                   '\x08' : "Error-Code",
                   '\x09' : "Error-Domain"}


class OutgoingMessage (Message):
    "Abstract superclass of outgoing requests/responses."

    def __init__(self, connection, body=None, properties=None):
        Message.__init__(self,connection,body,properties)
        self.urgent = self.compressed = self.noReply = False
        self._moreComing = True

    def __setitem__(self, key,val):
        self.properties[key] = val
    def __delitem__(self, key):
        del self.properties[key]

    @property
    def sent(self):
        """True once _encode() has been called on this message."""
        return 'encoded' in self.__dict__

    def _encode(self):
        """Generates the message's encoded form, prior to sending it.

        Raises MessageException if the encoded properties do not fit in the
        16-bit length field.
        """
        out = StringIO()

        def _writePropString(s):
            out.write(str(s))    #FIX: Abbreviate
            out.write('\000')

        for (key,value) in self.properties.items():
            _writePropString(key)
            _writePropString(value)
        propertiesSize = out.tell()
        if propertiesSize >= 65536:
            # An assert here would vanish under -O and surface later as a
            # struct packing problem instead.
            raise MessageException("properties too long to encode (%u bytes)" %propertiesSize)

        body = self.body
        if body is None:
            body = ''           # a message created without a body is sent with an empty one
        if self.compressed:
            z = zlib.compressobj(6,zlib.DEFLATED,31)   # window size of 31 needed for gzip format
            out.write(z.compress(body))
            body = z.flush()
        out.write(body)

        self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
        out.close()
        log.debug("Encoded %s into %u bytes", self,len(self.encoded))
        self.bytesSent = 0

    def _sendNextFrame(self, maxLen):
        """Return the next frame (header + up to maxLen payload bytes)."""
        pos = self.bytesSent
        payload = self.encoded[pos:pos+maxLen]
        pos += len(payload)
        # Must be updated before self.flags is read for the header below.
        self._moreComing = (pos < len(self.encoded))
        if not self._moreComing:
            self.encoded = None
        log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)

        header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
                                                 self.requestNo,
                                                 self.flags,
                                                 kFrameHeaderSize+len(payload))
        self.bytesSent = pos
        return header + payload


class Request (object):
    """Mixin for requests; subclasses supply _createResponse()."""

    @property
    def response(self):
        "The response object for this request."
        if self.noReply:
            return None
        r = self.__dict__.get('_response')
        if r is None:
            # Created on first access and cached on the instance.
            r = self._response = self._createResponse()
        return r


class Response (Message):
    """Mixin-style base for responses; ties a response to its request."""

    def _setRequest(self, request):
        assert not request.noReply
        self.request = request
        self.requestNo = request.requestNo
        self.urgent = request.urgent

    @property
    def isResponse(self):
        return True


class IncomingRequest (IncomingMessage, Request):
    def _createResponse(self):
        return OutgoingResponse(self)


class OutgoingRequest (OutgoingMessage, Request):
    def _createResponse(self):
        return IncomingResponse(self)

    def send(self):
        """Encode and send; returns the response object, None, or False."""
        self._encode()
        return self.connection._sendRequest(self) and self.response


class IncomingResponse (IncomingMessage, Response):
    def __init__(self, request):
        IncomingMessage.__init__(self,request.connection,None,0)
        self._setRequest(request)
        self.onComplete = None      # optional callback, called with this response

    def _finished(self):
        """Decode the response, then invoke onComplete (exceptions are logged)."""
        super(IncomingResponse,self)._finished()
        if self.onComplete:
            try:
                self.onComplete(self)
            except Exception:
                log.error("Exception dispatching response: %s", traceback.format_exc())


class OutgoingResponse (OutgoingMessage, Response):
    def __init__(self, request):
        OutgoingMessage.__init__(self,request.connection)
        self._setRequest(request)

    def send(self):
        """Encode and send; returns True if the message was queued."""
        self._encode()
        return self.connection._sendMessage(self)


"""
Copyright (c) 2008, Jens Alfke. All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted
provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this list of conditions
  and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions
  and the following disclaimer in the documentation and/or other materials provided with the
  distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""