Python/BLIP.py
changeset 11 29e8b03c05d4
child 12 710113961756
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/Python/BLIP.py	Tue Jun 03 16:56:33 2008 -0700
     1.3 @@ -0,0 +1,386 @@
     1.4 +#!/usr/bin/env python
     1.5 +# encoding: utf-8
     1.6 +"""
     1.7 +BLIP.py
     1.8 +
     1.9 +Created by Jens Alfke on 2008-06-03.
    1.10 +Copyright (c) 2008 Jens Alfke. All rights reserved.
    1.11 +"""
    1.12 +
    1.13 +import asynchat
    1.14 +import asyncore
    1.15 +from cStringIO import StringIO
    1.16 +import logging
    1.17 +import socket
    1.18 +import struct
    1.19 +import sys
    1.20 +import traceback
    1.21 +import unittest
    1.22 +import zlib
    1.23 +
    1.24 +
    1.25 +kFrameMagicNumber   = 0x9B34F205
    1.26 +kFrameHeaderFormat  = '!LLHH'
    1.27 +kFrameHeaderSize    = 12
    1.28 +
    1.29 +kMsgFlag_TypeMask   = 0x000F
    1.30 +kMsgFlag_Compressed = 0x0010
    1.31 +kMsgFlag_Urgent     = 0x0020
    1.32 +kMsgFlag_NoReply    = 0x0040
    1.33 +kMsgFlag_MoreComing = 0x0080
    1.34 +
    1.35 +kMsgType_Request    = 0
    1.36 +kMsgType_Response   = 1
    1.37 +kMsgType_Error      = 2
    1.38 +
    1.39 +
    1.40 +log = logging.getLogger('BLIP')
    1.41 +log.propagate = True
    1.42 +
    1.43 +class MessageException(Exception):
    1.44 +    pass
    1.45 +
    1.46 +class ConnectionException(Exception):
    1.47 +    pass
    1.48 +
    1.49 +
    1.50 +class Listener (asyncore.dispatcher):
    1.51 +    def __init__(self, port):
    1.52 +        asyncore.dispatcher.__init__(self)
    1.53 +        self.onConnected = None
    1.54 +        self.onRequest = None
    1.55 +        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    1.56 +        self.bind( ('',port) )
    1.57 +        self.listen(5)
    1.58 +        log.info("Listening on port %u", port)
    1.59 +    
    1.60 +    def handle_accept( self ):
    1.61 +        client,address = self.accept()
    1.62 +        conn = Connection(address,client)
    1.63 +        conn.onRequest = self.onRequest
    1.64 +        if self.onConnected:
    1.65 +            self.onConnected(conn)
    1.66 +
    1.67 +
    1.68 +class Connection (asynchat.async_chat):
    1.69 +    def __init__( self, address, conn=None ):
    1.70 +        asynchat.async_chat.__init__(self,conn)
    1.71 +        self.address = address
    1.72 +        if conn:
    1.73 +            log.info("Accepted connection from %s",address)
    1.74 +        else:
    1.75 +            log.info("Opening connection to %s",address)
    1.76 +            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    1.77 +            self.connect(address)
    1.78 +        self.onRequest = None
    1.79 +        self.pendingRequests = {}
    1.80 +        self.pendingResponses = {}
    1.81 +        self.outBox = []
    1.82 +        self.inMessage = None
    1.83 +        self.inNumRequests = 0
    1.84 +        self._endOfFrame()
    1.85 +    
    1.86 +    #def handle_error(self,x):
    1.87 +    #    log.error("Uncaught exception: %s",x)
    1.88 +    #    self.close()
    1.89 +    
    1.90 +    def _fatal(self, error):
    1.91 +        log.error("Fatal BLIP connection error: %s",error)
    1.92 +        self.close()
    1.93 +    
    1.94 +    
    1.95 +    ### SENDING:
    1.96 +    
    1.97 +    def _outQueueMessage(self, msg,isNew=True):
    1.98 +        n = self.outBox.length
    1.99 +        index = n
   1.100 +        if msg.urgent and n>1:
   1.101 +            while index > 0:
   1.102 +                otherMsg = self.outBox[index]
   1.103 +                if otherMsg.urgent:
   1.104 +                    if index<n:
   1.105 +                        index += 1
   1.106 +                    break
   1.107 +                elif isNew and otherMsg._bytesWritten==0:
   1.108 +                    break
   1.109 +                index -= 1
   1.110 +            else:
   1.111 +                index = 1
   1.112 +                    
   1.113 +        self.outBox.insert(index,msg)
   1.114 +        if isNew:
   1.115 +            log.info("Queuing outgoing message at index %i",index)
   1.116 +    
   1.117 +    def _sendNextFrame(self):
   1.118 +        n = len(self.outBox)
   1.119 +        if n > 0:
   1.120 +            msg = self.outBox.pop(0)
   1.121 +            frameSize = 4096
   1.122 +            if msg.urgent or n==1 or not self.outBox[0].urgent:
   1.123 +                frameSize *= 4
   1.124 +            if msg._sendNextFrame(self):
   1.125 +                self._outQueueMessage(msg,isNew=False)
   1.126 +    
   1.127 +    
   1.128 +    ### RECEIVING:
   1.129 +    
   1.130 +    def collect_incoming_data(self, data):
   1.131 +        if self.expectingHeader:
   1.132 +            if self.inHeader==None:
   1.133 +                self.inHeader = data
   1.134 +            else:
   1.135 +                self.inHeader += data
   1.136 +        else:
   1.137 +            self.inMessage._receivedData(data)
   1.138 +        
   1.139 +    def found_terminator(self):
   1.140 +        if self.expectingHeader:
   1.141 +            # Got a header:
   1.142 +            (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
   1.143 +            self.inHeader = None
   1.144 +            if magic!=kFrameMagicNumber: self._fatal("Incorrect frame magic number %x" %magic)
   1.145 +            if frameLen < kFrameHeaderSize: self._fatal("Invalid frame length %u" %frameLen)
   1.146 +            frameLen -= kFrameHeaderSize
   1.147 +            log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
   1.148 +                        (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
   1.149 +            self.inMessage = self._inMessageForFrame(requestNo,flags)
   1.150 +            
   1.151 +            if frameLen > 0:
   1.152 +                self.expectingHeader = False
   1.153 +                self.set_terminator(frameLen)
   1.154 +            else:
   1.155 +                self._endOfFrame()
   1.156 +                
   1.157 +        else:
   1.158 +            # Got the frame's payload:
   1.159 +            self._endOfFrame()
   1.160 +    
   1.161 +    def _inMessageForFrame(self, requestNo,flags):
   1.162 +        message = None
   1.163 +        msgType = flags & kMsgFlag_TypeMask
   1.164 +        if msgType==kMsgType_Request:
   1.165 +            message = self.pendingRequests.get(requestNo)
   1.166 +            if message==None and requestNo == self.inNumRequests+1:
   1.167 +                message = IncomingRequest(self,requestNo,flags)
   1.168 +                self.pendingRequests[requestNo] = message
   1.169 +                self.inNumRequests += 1
   1.170 +        elif msgType==kMsgType_Response or msgType==kMsgType_Error:
   1.171 +            message = self.pendingResponses.get(requestNo)
   1.172 +            
   1.173 +        if message:
   1.174 +            message._beginFrame(flags)
   1.175 +        else:
   1.176 +            log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
   1.177 +        return message
   1.178 +    
   1.179 +    def _endOfFrame(self):
   1.180 +        msg = self.inMessage
   1.181 +        self.inMessage = None
   1.182 +        self.expectingHeader = True
   1.183 +        self.inHeader = None
   1.184 +        self.set_terminator(kFrameHeaderSize) # wait for binary header
   1.185 +        if msg:
   1.186 +            log.debug("End of frame of %s",msg)
   1.187 +            if not msg.moreComing:
   1.188 +                self._receivedMessage(msg)
   1.189 +
   1.190 +    def _receivedMessage(self, msg):
   1.191 +        log.info("Received: %s",msg)
   1.192 +        # Remove from pending:
   1.193 +        if msg.isResponse:
   1.194 +            del self.pendingReplies[msg.requestNo]
   1.195 +        else:
   1.196 +            del self.pendingRequests[msg.requestNo]
   1.197 +        # Decode:
   1.198 +        try:
   1.199 +            msg._finished()
   1.200 +        except Exception, x:
   1.201 +            log.error("Exception parsing message: %s", traceback.format_exc())
   1.202 +            return
   1.203 +        # Dispatch:
   1.204 +        try:
   1.205 +            self.onRequest(msg)
   1.206 +        except Exception, x:
   1.207 +            log.error("Exception dispatching message: %s", traceback.format_exc())
   1.208 +            #FIX: Send an error reply
   1.209 +
   1.210 +### MESSAGES:
   1.211 +
   1.212 +
   1.213 +class Message (object):
   1.214 +    def __init__(self, connection, properties=None, body=None):
   1.215 +        self.connection = connection
   1.216 +        self.properties = properties or {}
   1.217 +        self.body = body
   1.218 +    
   1.219 +    @property
   1.220 +    def flags(self):
   1.221 +        flags = kMsgType_Request
   1.222 +        if self.urgent:     flags |= kMsgFlag_Urgent
   1.223 +        if self.compressed: flags |= kMsgFlag_Compressed
   1.224 +        if self.noReply:    flags |= kMsgFlag_NoReply
   1.225 +        if self.moreComing: flags |= kMsgFlag_MoreComing
   1.226 +        return flags
   1.227 +    
   1.228 +    def __str__(self):
   1.229 +        s = "%s[#%i" %(type(self).__name__,self.requestNo)
   1.230 +        if self.urgent:     s += " URG"
   1.231 +        if self.compressed: s += " CMP"
   1.232 +        if self.noReply:    s += " NOR"
   1.233 +        if self.moreComing: s += " MOR"
   1.234 +        if self.body:       s += " %i bytes" %len(self.body)
   1.235 +        return s+"]"
   1.236 +    
   1.237 +    def __repr__(self):
   1.238 +        s = str(self)
   1.239 +        if len(self.properties): s += repr(self.properties)
   1.240 +        return s
   1.241 +        
   1.242 +    @property
   1.243 +    def isResponse(self):
   1.244 +        return False
   1.245 +
   1.246 +
   1.247 +class IncomingMessage (Message):
   1.248 +    def __init__(self, connection, requestNo, flags):
   1.249 +        super(IncomingMessage,self).__init__(connection)
   1.250 +        self.requestNo  = requestNo
   1.251 +        self.urgent     = (flags & kMsgFlag_Urgent) != 0 
   1.252 +        self.compressed = (flags & kMsgFlag_Compressed) != 0
   1.253 +        self.noReply    = (flags & kMsgFlag_NoReply) != 0
   1.254 +        self.moreComing = (flags & kMsgFlag_MoreComing) != 0
   1.255 +        self.frames     = []
   1.256 +    
   1.257 +    def _beginFrame(self, flags):
   1.258 +        if (flags & kMsgFlag_MoreComing)==0:
   1.259 +            self.moreComing = False
   1.260 +
   1.261 +    def _receivedData(self, data):
   1.262 +        self.frames.append(data)
   1.263 +    
   1.264 +    def _finished(self):
   1.265 +        encoded = "".join(self.frames)
   1.266 +        self.frames = None
   1.267 +        
   1.268 +        # Decode the properties:
   1.269 +        if len(encoded) < 2: raise MessageException, "missing properties length"
   1.270 +        propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
   1.271 +        if propSize>len(encoded): raise MessageException, "properties too long to fit"
   1.272 +        if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
   1.273 +        
   1.274 +        proplist = encoded[2:propSize-1].split('\000')
   1.275 +        encoded = encoded[propSize:]
   1.276 +        if len(proplist) & 1: raise MessageException, "odd number of property strings"
   1.277 +        for i in xrange(0,len(proplist),2):
   1.278 +            def expand(str):
   1.279 +                if len(str)==1:
   1.280 +                    str = IncomingMessage.__expandDict.get(str,str)
   1.281 +                return str
   1.282 +            self.properties[ expand(proplist[i])] = expand(proplist[i+1])
   1.283 +        
   1.284 +        # Decode the body:
   1.285 +        if self.compressed and len(encoded)>0:
   1.286 +            try:
   1.287 +                encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
   1.288 +            except zlib.error:
   1.289 +                raise MessageException, sys.exc_info()[1]
   1.290 +        self.body = encoded
   1.291 +    
   1.292 +    __expandDict= {'\x01' : "Content-Type",
   1.293 +                   '\x02' : "Profile",
   1.294 +                   '\x03' : "application/octet-stream",
   1.295 +                   '\x04' : "text/plain; charset=UTF-8",
   1.296 +                   '\x05' : "text/xml",
   1.297 +                   '\x06' : "text/yaml",
   1.298 +                   '\x07' : "Channel",
   1.299 +                   '\x08' : "Error-Code",
   1.300 +                   '\x09' : "Error-Domain"}
   1.301 +        
   1.302 +
   1.303 +
   1.304 +class OutgoingMessage (Message):
   1.305 +    
   1.306 +    def send(self):
   1.307 +        out = StringIO()
   1.308 +        out.write("xx")         # placeholder for properties length (16 bits)
   1.309 +        for (key,value) in self.properties:
   1.310 +            def _writePropString(self, str):
   1.311 +                out.write(str)
   1.312 +                #FIX: Abbreviate
   1.313 +                out.write('\000')
   1.314 +            self._writePropString(key)
   1.315 +            self._writePropString(value)
   1.316 +        propsLen = out.tell()
   1.317 +        self.encoded = out.stringvalue()
   1.318 +        out.close()
   1.319 +        self.encoded[0:2] = struct.pack('!H',propsLen)
   1.320 +        
   1.321 +        body = self.body
   1.322 +        if self.compressed:
   1.323 +            body = zlib.compress(body,5)
   1.324 +        self.encoded += body
   1.325 +        
   1.326 +        self.bytesSent = 0
   1.327 +        self.connection._outQueueMessage(self)
   1.328 +    
   1.329 +    def _sendNextFrame(self, conn,maxLen):
   1.330 +        pos = self.bytesSent
   1.331 +        payload = self.encoded[pos:pos+maxLen]
   1.332 +        pos += len(payload)
   1.333 +        if pos >= len(self.encoded):
   1.334 +            self.moreComing = False
   1.335 +
   1.336 +        conn.push( struct.pack(kFrameHeaderFormat, 
   1.337 +                               kFrameMagicNumber,
   1.338 +                               self.requestNo,
   1.339 +                               self.flags,
   1.340 +                               kFrameHeaderSize+len(payload)) )
   1.341 +        conn.push( payload )
   1.342 +        
   1.343 +        self.bytesSent = pos
   1.344 +
   1.345 +
   1.346 +class Request (Message):
   1.347 +    pass
   1.348 +
   1.349 +class Response (Message):
   1.350 +    @property
   1.351 +    def isResponse(self):
   1.352 +        return True
   1.353 +
   1.354 +    @property
   1.355 +    def flags(self):
   1.356 +        flags = super(Response,self).flags() ^ kMsgType_Request
   1.357 +        flags ^= kMsgType_Response
   1.358 +        return flags
   1.359 +
   1.360 +
   1.361 +
   1.362 +class IncomingRequest (IncomingMessage, Request):
   1.363 +    pass
   1.364 +
   1.365 +class OutgoingRequest (OutgoingMessage, Request):
   1.366 +    pass
   1.367 +
   1.368 +class IncomingResponse (IncomingMessage, Response):
   1.369 +    pass
   1.370 +
   1.371 +class OutgoingResponse (OutgoingMessage, Response):
   1.372 +    pass
   1.373 +
   1.374 +
   1.375 +### UNIT TESTS:
   1.376 +
   1.377 +
   1.378 +class BLIPTests(unittest.TestCase):
   1.379 +    def setUp(self):
   1.380 +        listener = Listener(46353)
   1.381 +        listener.onRequest = lambda req: logging.info("Got request!: %r",req)
   1.382 +    
   1.383 +    def testListener(self):
   1.384 +        logging.info("Waiting...")
   1.385 +        asyncore.loop()
   1.386 +
   1.387 +if __name__ == '__main__':
   1.388 +    logging.basicConfig(level=logging.INFO)
   1.389 +    unittest.main()
   1.390 \ No newline at end of file