1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/Python/BLIP.py Tue Jun 03 16:56:33 2008 -0700
1.3 @@ -0,0 +1,386 @@
1.4 +#!/usr/bin/env python
1.5 +# encoding: utf-8
1.6 +"""
1.7 +BLIP.py
1.8 +
1.9 +Created by Jens Alfke on 2008-06-03.
1.10 +Copyright (c) 2008 Jens Alfke. All rights reserved.
1.11 +"""
1.12 +
1.13 +import asynchat
1.14 +import asyncore
1.15 +from cStringIO import StringIO
1.16 +import logging
1.17 +import socket
1.18 +import struct
1.19 +import sys
1.20 +import traceback
1.21 +import unittest
1.22 +import zlib
1.23 +
1.24 +
1.25 +kFrameMagicNumber = 0x9B34F205
1.26 +kFrameHeaderFormat = '!LLHH'
1.27 +kFrameHeaderSize = 12
1.28 +
1.29 +kMsgFlag_TypeMask = 0x000F
1.30 +kMsgFlag_Compressed = 0x0010
1.31 +kMsgFlag_Urgent = 0x0020
1.32 +kMsgFlag_NoReply = 0x0040
1.33 +kMsgFlag_MoreComing = 0x0080
1.34 +
1.35 +kMsgType_Request = 0
1.36 +kMsgType_Response = 1
1.37 +kMsgType_Error = 2
1.38 +
1.39 +
1.40 +log = logging.getLogger('BLIP')
1.41 +log.propagate = True
1.42 +
1.43 +class MessageException(Exception):
1.44 + pass
1.45 +
1.46 +class ConnectionException(Exception):
1.47 + pass
1.48 +
1.49 +
1.50 +class Listener (asyncore.dispatcher):
1.51 + def __init__(self, port):
1.52 + asyncore.dispatcher.__init__(self)
1.53 + self.onConnected = None
1.54 + self.onRequest = None
1.55 + self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1.56 + self.bind( ('',port) )
1.57 + self.listen(5)
1.58 + log.info("Listening on port %u", port)
1.59 +
1.60 + def handle_accept( self ):
1.61 + client,address = self.accept()
1.62 + conn = Connection(address,client)
1.63 + conn.onRequest = self.onRequest
1.64 + if self.onConnected:
1.65 + self.onConnected(conn)
1.66 +
1.67 +
1.68 +class Connection (asynchat.async_chat):
1.69 + def __init__( self, address, conn=None ):
1.70 + asynchat.async_chat.__init__(self,conn)
1.71 + self.address = address
1.72 + if conn:
1.73 + log.info("Accepted connection from %s",address)
1.74 + else:
1.75 + log.info("Opening connection to %s",address)
1.76 + self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1.77 + self.connect(address)
1.78 + self.onRequest = None
1.79 + self.pendingRequests = {}
1.80 + self.pendingResponses = {}
1.81 + self.outBox = []
1.82 + self.inMessage = None
1.83 + self.inNumRequests = 0
1.84 + self._endOfFrame()
1.85 +
1.86 + #def handle_error(self,x):
1.87 + # log.error("Uncaught exception: %s",x)
1.88 + # self.close()
1.89 +
1.90 + def _fatal(self, error):
1.91 + log.error("Fatal BLIP connection error: %s",error)
1.92 + self.close()
1.93 +
1.94 +
1.95 + ### SENDING:
1.96 +
1.97 + def _outQueueMessage(self, msg,isNew=True):
1.98 + n = self.outBox.length
1.99 + index = n
1.100 + if msg.urgent and n>1:
1.101 + while index > 0:
1.102 + otherMsg = self.outBox[index]
1.103 + if otherMsg.urgent:
1.104 + if index<n:
1.105 + index += 1
1.106 + break
1.107 + elif isNew and otherMsg._bytesWritten==0:
1.108 + break
1.109 + index -= 1
1.110 + else:
1.111 + index = 1
1.112 +
1.113 + self.outBox.insert(index,msg)
1.114 + if isNew:
1.115 + log.info("Queuing outgoing message at index %i",index)
1.116 +
1.117 + def _sendNextFrame(self):
1.118 + n = len(self.outBox)
1.119 + if n > 0:
1.120 + msg = self.outBox.pop(0)
1.121 + frameSize = 4096
1.122 + if msg.urgent or n==1 or not self.outBox[0].urgent:
1.123 + frameSize *= 4
1.124 + if msg._sendNextFrame(self):
1.125 + self._outQueueMessage(msg,isNew=False)
1.126 +
1.127 +
1.128 + ### RECEIVING:
1.129 +
1.130 + def collect_incoming_data(self, data):
1.131 + if self.expectingHeader:
1.132 + if self.inHeader==None:
1.133 + self.inHeader = data
1.134 + else:
1.135 + self.inHeader += data
1.136 + else:
1.137 + self.inMessage._receivedData(data)
1.138 +
1.139 + def found_terminator(self):
1.140 + if self.expectingHeader:
1.141 + # Got a header:
1.142 + (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
1.143 + self.inHeader = None
1.144 + if magic!=kFrameMagicNumber: self._fatal("Incorrect frame magic number %x" %magic)
1.145 + if frameLen < kFrameHeaderSize: self._fatal("Invalid frame length %u" %frameLen)
1.146 + frameLen -= kFrameHeaderSize
1.147 + log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
1.148 + (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
1.149 + self.inMessage = self._inMessageForFrame(requestNo,flags)
1.150 +
1.151 + if frameLen > 0:
1.152 + self.expectingHeader = False
1.153 + self.set_terminator(frameLen)
1.154 + else:
1.155 + self._endOfFrame()
1.156 +
1.157 + else:
1.158 + # Got the frame's payload:
1.159 + self._endOfFrame()
1.160 +
1.161 + def _inMessageForFrame(self, requestNo,flags):
1.162 + message = None
1.163 + msgType = flags & kMsgFlag_TypeMask
1.164 + if msgType==kMsgType_Request:
1.165 + message = self.pendingRequests.get(requestNo)
1.166 + if message==None and requestNo == self.inNumRequests+1:
1.167 + message = IncomingRequest(self,requestNo,flags)
1.168 + self.pendingRequests[requestNo] = message
1.169 + self.inNumRequests += 1
1.170 + elif msgType==kMsgType_Response or msgType==kMsgType_Error:
1.171 + message = self.pendingResponses.get(requestNo)
1.172 +
1.173 + if message:
1.174 + message._beginFrame(flags)
1.175 + else:
1.176 + log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
1.177 + return message
1.178 +
1.179 + def _endOfFrame(self):
1.180 + msg = self.inMessage
1.181 + self.inMessage = None
1.182 + self.expectingHeader = True
1.183 + self.inHeader = None
1.184 + self.set_terminator(kFrameHeaderSize) # wait for binary header
1.185 + if msg:
1.186 + log.debug("End of frame of %s",msg)
1.187 + if not msg.moreComing:
1.188 + self._receivedMessage(msg)
1.189 +
1.190 + def _receivedMessage(self, msg):
1.191 + log.info("Received: %s",msg)
1.192 + # Remove from pending:
1.193 + if msg.isResponse:
1.194 + del self.pendingReplies[msg.requestNo]
1.195 + else:
1.196 + del self.pendingRequests[msg.requestNo]
1.197 + # Decode:
1.198 + try:
1.199 + msg._finished()
1.200 + except Exception, x:
1.201 + log.error("Exception parsing message: %s", traceback.format_exc())
1.202 + return
1.203 + # Dispatch:
1.204 + try:
1.205 + self.onRequest(msg)
1.206 + except Exception, x:
1.207 + log.error("Exception dispatching message: %s", traceback.format_exc())
1.208 + #FIX: Send an error reply
1.209 +
1.210 +### MESSAGES:
1.211 +
1.212 +
1.213 +class Message (object):
1.214 + def __init__(self, connection, properties=None, body=None):
1.215 + self.connection = connection
1.216 + self.properties = properties or {}
1.217 + self.body = body
1.218 +
1.219 + @property
1.220 + def flags(self):
1.221 + flags = kMsgType_Request
1.222 + if self.urgent: flags |= kMsgFlag_Urgent
1.223 + if self.compressed: flags |= kMsgFlag_Compressed
1.224 + if self.noReply: flags |= kMsgFlag_NoReply
1.225 + if self.moreComing: flags |= kMsgFlag_MoreComing
1.226 + return flags
1.227 +
1.228 + def __str__(self):
1.229 + s = "%s[#%i" %(type(self).__name__,self.requestNo)
1.230 + if self.urgent: s += " URG"
1.231 + if self.compressed: s += " CMP"
1.232 + if self.noReply: s += " NOR"
1.233 + if self.moreComing: s += " MOR"
1.234 + if self.body: s += " %i bytes" %len(self.body)
1.235 + return s+"]"
1.236 +
1.237 + def __repr__(self):
1.238 + s = str(self)
1.239 + if len(self.properties): s += repr(self.properties)
1.240 + return s
1.241 +
1.242 + @property
1.243 + def isResponse(self):
1.244 + return False
1.245 +
1.246 +
1.247 +class IncomingMessage (Message):
1.248 + def __init__(self, connection, requestNo, flags):
1.249 + super(IncomingMessage,self).__init__(connection)
1.250 + self.requestNo = requestNo
1.251 + self.urgent = (flags & kMsgFlag_Urgent) != 0
1.252 + self.compressed = (flags & kMsgFlag_Compressed) != 0
1.253 + self.noReply = (flags & kMsgFlag_NoReply) != 0
1.254 + self.moreComing = (flags & kMsgFlag_MoreComing) != 0
1.255 + self.frames = []
1.256 +
1.257 + def _beginFrame(self, flags):
1.258 + if (flags & kMsgFlag_MoreComing)==0:
1.259 + self.moreComing = False
1.260 +
1.261 + def _receivedData(self, data):
1.262 + self.frames.append(data)
1.263 +
1.264 + def _finished(self):
1.265 + encoded = "".join(self.frames)
1.266 + self.frames = None
1.267 +
1.268 + # Decode the properties:
1.269 + if len(encoded) < 2: raise MessageException, "missing properties length"
1.270 + propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
1.271 + if propSize>len(encoded): raise MessageException, "properties too long to fit"
1.272 + if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
1.273 +
1.274 + proplist = encoded[2:propSize-1].split('\000')
1.275 + encoded = encoded[propSize:]
1.276 + if len(proplist) & 1: raise MessageException, "odd number of property strings"
1.277 + for i in xrange(0,len(proplist),2):
1.278 + def expand(str):
1.279 + if len(str)==1:
1.280 + str = IncomingMessage.__expandDict.get(str,str)
1.281 + return str
1.282 + self.properties[ expand(proplist[i])] = expand(proplist[i+1])
1.283 +
1.284 + # Decode the body:
1.285 + if self.compressed and len(encoded)>0:
1.286 + try:
1.287 + encoded = zlib.decompress(encoded,31) # window size of 31 needed for gzip format
1.288 + except zlib.error:
1.289 + raise MessageException, sys.exc_info()[1]
1.290 + self.body = encoded
1.291 +
1.292 + __expandDict= {'\x01' : "Content-Type",
1.293 + '\x02' : "Profile",
1.294 + '\x03' : "application/octet-stream",
1.295 + '\x04' : "text/plain; charset=UTF-8",
1.296 + '\x05' : "text/xml",
1.297 + '\x06' : "text/yaml",
1.298 + '\x07' : "Channel",
1.299 + '\x08' : "Error-Code",
1.300 + '\x09' : "Error-Domain"}
1.301 +
1.302 +
1.303 +
1.304 +class OutgoingMessage (Message):
1.305 +
1.306 + def send(self):
1.307 + out = StringIO()
1.308 + out.write("xx") # placeholder for properties length (16 bits)
1.309 + for (key,value) in self.properties:
1.310 + def _writePropString(self, str):
1.311 + out.write(str)
1.312 + #FIX: Abbreviate
1.313 + out.write('\000')
1.314 + self._writePropString(key)
1.315 + self._writePropString(value)
1.316 + propsLen = out.tell()
1.317 + self.encoded = out.stringvalue()
1.318 + out.close()
1.319 + self.encoded[0:2] = struct.pack('!H',propsLen)
1.320 +
1.321 + body = self.body
1.322 + if self.compressed:
1.323 + body = zlib.compress(body,5)
1.324 + self.encoded += body
1.325 +
1.326 + self.bytesSent = 0
1.327 + self.connection._outQueueMessage(self)
1.328 +
1.329 + def _sendNextFrame(self, conn,maxLen):
1.330 + pos = self.bytesSent
1.331 + payload = self.encoded[pos:pos+maxLen]
1.332 + pos += len(payload)
1.333 + if pos >= len(self.encoded):
1.334 + self.moreComing = False
1.335 +
1.336 + conn.push( struct.pack(kFrameHeaderFormat,
1.337 + kFrameMagicNumber,
1.338 + self.requestNo,
1.339 + self.flags,
1.340 + kFrameHeaderSize+len(payload)) )
1.341 + conn.push( payload )
1.342 +
1.343 + self.bytesSent = pos
1.344 +
1.345 +
1.346 +class Request (Message):
1.347 + pass
1.348 +
1.349 +class Response (Message):
1.350 + @property
1.351 + def isResponse(self):
1.352 + return True
1.353 +
1.354 + @property
1.355 + def flags(self):
1.356 + flags = super(Response,self).flags() ^ kMsgType_Request
1.357 + flags ^= kMsgType_Response
1.358 + return flags
1.359 +
1.360 +
1.361 +
1.362 +class IncomingRequest (IncomingMessage, Request):
1.363 + pass
1.364 +
1.365 +class OutgoingRequest (OutgoingMessage, Request):
1.366 + pass
1.367 +
1.368 +class IncomingResponse (IncomingMessage, Response):
1.369 + pass
1.370 +
1.371 +class OutgoingResponse (OutgoingMessage, Response):
1.372 + pass
1.373 +
1.374 +
1.375 +### UNIT TESTS:
1.376 +
1.377 +
1.378 +class BLIPTests(unittest.TestCase):
1.379 + def setUp(self):
1.380 + listener = Listener(46353)
1.381 + listener.onRequest = lambda req: logging.info("Got request!: %r",req)
1.382 +
1.383 + def testListener(self):
1.384 + logging.info("Waiting...")
1.385 + asyncore.loop()
1.386 +
1.387 +if __name__ == '__main__':
1.388 + logging.basicConfig(level=logging.INFO)
1.389 + unittest.main()
1.390 \ No newline at end of file