1.1 --- a/Python/BLIP.py Tue Jun 03 16:56:33 2008 -0700
1.2 +++ b/Python/BLIP.py Tue Jun 03 22:24:21 2008 -0700
1.3 @@ -19,6 +19,8 @@
1.4 import zlib
1.5
1.6
1.7 +# INTERNAL CONSTANTS -- NO TOUCHIES!
1.8 +
1.9 kFrameMagicNumber = 0x9B34F205
1.10 kFrameHeaderFormat = '!LLHH'
1.11 kFrameHeaderSize = 12
1.12 @@ -37,6 +39,7 @@
1.13 log = logging.getLogger('BLIP')
1.14 log.propagate = True
1.15
1.16 +
1.17 class MessageException(Exception):
1.18 pass
1.19
1.20 @@ -45,10 +48,12 @@
1.21
1.22
1.23 class Listener (asyncore.dispatcher):
1.24 + "BLIP listener/server class"
1.25 +
1.26 def __init__(self, port):
1.27 + "Create a listener on a port"
1.28 asyncore.dispatcher.__init__(self)
1.29 - self.onConnected = None
1.30 - self.onRequest = None
1.31 + self.onConnected = self.onRequest = None
1.32 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1.33 self.bind( ('',port) )
1.34 self.listen(5)
1.35 @@ -64,6 +69,8 @@
1.36
1.37 class Connection (asynchat.async_chat):
1.38 def __init__( self, address, conn=None ):
1.39 + "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
1.40 + "otherwise it'll open a new outgoing socket."
1.41 asynchat.async_chat.__init__(self,conn)
1.42 self.address = address
1.43 if conn:
1.44 @@ -92,11 +99,11 @@
1.45 ### SENDING:
1.46
1.47 def _outQueueMessage(self, msg,isNew=True):
1.48 - n = self.outBox.length
1.49 + n = len(self.outBox)
1.50 index = n
1.51 if msg.urgent and n>1:
1.52 while index > 0:
1.53 - otherMsg = self.outBox[index]
1.54 + otherMsg = self.outBox[index-1]
1.55 if otherMsg.urgent:
1.56 if index<n:
1.57 index += 1
1.58 @@ -106,20 +113,27 @@
1.59 index -= 1
1.60 else:
1.61 index = 1
1.62 -
1.63 +
1.64 self.outBox.insert(index,msg)
1.65 if isNew:
1.66 log.info("Queuing outgoing message at index %i",index)
1.67 + if n==0:
1.68 + self._sendNextFrame()
1.69 + else:
1.70 + log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
1.71
1.72 def _sendNextFrame(self):
1.73 - n = len(self.outBox)
1.74 - if n > 0:
1.75 - msg = self.outBox.pop(0)
1.76 - frameSize = 4096
1.77 - if msg.urgent or n==1 or not self.outBox[0].urgent:
1.78 - frameSize *= 4
1.79 - if msg._sendNextFrame(self):
1.80 - self._outQueueMessage(msg,isNew=False)
1.81 + while self.outBox: #FIX: Don't send everything at once; only as space becomes available!
1.82 + n = len(self.outBox)
1.83 + if n > 0:
1.84 + msg = self.outBox.pop(0)
1.85 + frameSize = 4096
1.86 + if msg.urgent or n==1 or not self.outBox[0].urgent:
1.87 + frameSize *= 4
1.88 + if msg._sendNextFrame(self,frameSize):
1.89 + self._outQueueMessage(msg,isNew=False)
1.90 + else:
1.91 + log.info("Finished sending %s",msg)
1.92
1.93
1.94 ### RECEIVING:
1.95 @@ -132,7 +146,7 @@
1.96 self.inHeader += data
1.97 else:
1.98 self.inMessage._receivedData(data)
1.99 -
1.100 +
1.101 def found_terminator(self):
1.102 if self.expectingHeader:
1.103 # Got a header:
1.104 @@ -150,7 +164,7 @@
1.105 self.set_terminator(frameLen)
1.106 else:
1.107 self._endOfFrame()
1.108 -
1.109 +
1.110 else:
1.111 # Got the frame's payload:
1.112 self._endOfFrame()
1.113 @@ -162,12 +176,13 @@
1.114 message = self.pendingRequests.get(requestNo)
1.115 if message==None and requestNo == self.inNumRequests+1:
1.116 message = IncomingRequest(self,requestNo,flags)
1.117 + assert message!=None
1.118 self.pendingRequests[requestNo] = message
1.119 self.inNumRequests += 1
1.120 elif msgType==kMsgType_Response or msgType==kMsgType_Error:
1.121 message = self.pendingResponses.get(requestNo)
1.122 -
1.123 - if message:
1.124 +
1.125 + if message != None:
1.126 message._beginFrame(flags)
1.127 else:
1.128 log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
1.129 @@ -183,7 +198,7 @@
1.130 log.debug("End of frame of %s",msg)
1.131 if not msg.moreComing:
1.132 self._receivedMessage(msg)
1.133 -
1.134 +
1.135 def _receivedMessage(self, msg):
1.136 log.info("Received: %s",msg)
1.137 # Remove from pending:
1.138 @@ -194,20 +209,19 @@
1.139 # Decode:
1.140 try:
1.141 msg._finished()
1.142 + if not msg.isResponse:
1.143 + self.onRequest(msg)
1.144 except Exception, x:
1.145 - log.error("Exception parsing message: %s", traceback.format_exc())
1.146 - return
1.147 - # Dispatch:
1.148 - try:
1.149 - self.onRequest(msg)
1.150 - except Exception, x:
1.151 - log.error("Exception dispatching message: %s", traceback.format_exc())
1.152 + log.error("Exception handling incoming message: %s", traceback.format_exc())
1.153 #FIX: Send an error reply
1.154
1.155 +
1.156 ### MESSAGES:
1.157
1.158
1.159 class Message (object):
1.160 + "Abstract superclass of all request/response objects"
1.161 +
1.162 def __init__(self, connection, properties=None, body=None):
1.163 self.connection = connection
1.164 self.properties = properties or {}
1.165 @@ -215,7 +229,10 @@
1.166
1.167 @property
1.168 def flags(self):
1.169 - flags = kMsgType_Request
1.170 + if self.isResponse:
1.171 + flags = kMsgType_Response
1.172 + else:
1.173 + flags = kMsgType_Request
1.174 if self.urgent: flags |= kMsgFlag_Urgent
1.175 if self.compressed: flags |= kMsgFlag_Compressed
1.176 if self.noReply: flags |= kMsgFlag_NoReply
1.177 @@ -235,17 +252,30 @@
1.178 s = str(self)
1.179 if len(self.properties): s += repr(self.properties)
1.180 return s
1.181 -
1.182 - @property
1.183 +
1.184 + @property
1.185 def isResponse(self):
1.186 + "Is this message a response?"
1.187 return False
1.188 +
1.189 + @property
1.190 + def contentType(self):
1.191 + return self.properties.get('Content-Type')
1.192 +
1.193 + def __getitem__(self, key): return self.properties.get(key)
1.194 + def __contains__(self, key): return key in self.properties
1.195 + def __len__(self): return len(self.properties)
1.196 + def __nonzero__(self): return True
1.197 + def __iter__(self): return self.properties.__iter__()
1.198
1.199
1.200 class IncomingMessage (Message):
1.201 + "Abstract superclass of incoming messages."
1.202 +
1.203 def __init__(self, connection, requestNo, flags):
1.204 super(IncomingMessage,self).__init__(connection)
1.205 self.requestNo = requestNo
1.206 - self.urgent = (flags & kMsgFlag_Urgent) != 0
1.207 + self.urgent = (flags & kMsgFlag_Urgent) != 0
1.208 self.compressed = (flags & kMsgFlag_Compressed) != 0
1.209 self.noReply = (flags & kMsgFlag_NoReply) != 0
1.210 self.moreComing = (flags & kMsgFlag_MoreComing) != 0
1.211 @@ -254,7 +284,7 @@
1.212 def _beginFrame(self, flags):
1.213 if (flags & kMsgFlag_MoreComing)==0:
1.214 self.moreComing = False
1.215 -
1.216 +
1.217 def _receivedData(self, data):
1.218 self.frames.append(data)
1.219
1.220 @@ -295,30 +325,40 @@
1.221 '\x07' : "Channel",
1.222 '\x08' : "Error-Code",
1.223 '\x09' : "Error-Domain"}
1.224 -
1.225 +
1.226
1.227
1.228 class OutgoingMessage (Message):
1.229 + "Abstract superclass of outgoing requests/responses."
1.230 +
1.231 + def __init__(self, connection, properties=None, body=None):
1.232 + Message.__init__(self,connection,properties,body)
1.233 + self.urgent = self.compressed = self.noReply = False
1.234 + self.moreComing = True
1.235 +
1.236 + def __setitem__(self, key,val):
1.237 + self.properties[key] = val
1.238 + def __delitem__(self, key):
1.239 + del self.properties[key]
1.240
1.241 def send(self):
1.242 + "Sends this message."
1.243 + log.info("Sending %s",self)
1.244 out = StringIO()
1.245 - out.write("xx") # placeholder for properties length (16 bits)
1.246 - for (key,value) in self.properties:
1.247 - def _writePropString(self, str):
1.248 - out.write(str)
1.249 - #FIX: Abbreviate
1.250 + for (key,value) in self.properties.iteritems():
1.251 + def _writePropString(str):
1.252 + out.write(str) #FIX: Abbreviate
1.253 out.write('\000')
1.254 - self._writePropString(key)
1.255 - self._writePropString(value)
1.256 - propsLen = out.tell()
1.257 - self.encoded = out.stringvalue()
1.258 + _writePropString(key)
1.259 + _writePropString(value)
1.260 + self.encoded = struct.pack('!H',out.tell()) + out.getvalue()
1.261 out.close()
1.262 - self.encoded[0:2] = struct.pack('!H',propsLen)
1.263
1.264 body = self.body
1.265 if self.compressed:
1.266 body = zlib.compress(body,5)
1.267 self.encoded += body
1.268 + log.debug("Encoded %s into %u bytes", self,len(self.encoded))
1.269
1.270 self.bytesSent = 0
1.271 self.connection._outQueueMessage(self)
1.272 @@ -327,46 +367,68 @@
1.273 pos = self.bytesSent
1.274 payload = self.encoded[pos:pos+maxLen]
1.275 pos += len(payload)
1.276 - if pos >= len(self.encoded):
1.277 - self.moreComing = False
1.278 -
1.279 - conn.push( struct.pack(kFrameHeaderFormat,
1.280 - kFrameMagicNumber,
1.281 - self.requestNo,
1.282 - self.flags,
1.283 - kFrameHeaderSize+len(payload)) )
1.284 + self.moreComing = (pos < len(self.encoded))
1.285 + log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
1.286 +
1.287 + conn.push( struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
1.288 + self.requestNo,
1.289 + self.flags,
1.290 + kFrameHeaderSize+len(payload)) )
1.291 conn.push( payload )
1.292
1.293 self.bytesSent = pos
1.294 + return self.moreComing
1.295
1.296
1.297 -class Request (Message):
1.298 - pass
1.299 +class Request (object):
1.300 + @property
1.301 + def response(self):
1.302 + "The response object for this request."
1.303 + r = self.__dict__.get('_response')
1.304 + if r==None:
1.305 + r = self._response = self._createResponse()
1.306 + return r
1.307 +
1.308
1.309 class Response (Message):
1.310 + def __init__(self, request):
1.311 + assert not request.noReply
1.312 + self.request = request
1.313 + self.requestNo = request.requestNo
1.314 + self.urgent = request.urgent
1.315 +
1.316 @property
1.317 def isResponse(self):
1.318 return True
1.319
1.320 - @property
1.321 - def flags(self):
1.322 - flags = super(Response,self).flags() ^ kMsgType_Request
1.323 - flags ^= kMsgType_Response
1.324 - return flags
1.325 -
1.326
1.327
1.328 class IncomingRequest (IncomingMessage, Request):
1.329 - pass
1.330 + def _createResponse(self):
1.331 + return OutgoingResponse(self)
1.332
1.333 class OutgoingRequest (OutgoingMessage, Request):
1.334 - pass
1.335 + def _createResponse(self):
1.336 + return IncomingResponse(self)
1.337
1.338 class IncomingResponse (IncomingMessage, Response):
1.339 - pass
1.340 -
1.341 + def __init__(self, request):
1.342 + IncomingMessage.__init__(self,request.connection,request.requestNo,0)
1.343 + Response.__init__(self,request)
1.344 + self.onComplete = None
1.345 +
1.346 + def _finished(self):
1.347 + super(IncomingResponse,self)._finished()
1.348 + if self.onComplete:
1.349 + try:
1.350 + self.onComplete(self)
1.351 + except Exception, x:
1.352 + log.error("Exception dispatching response: %s", traceback.format_exc())
1.353 +
1.354 class OutgoingResponse (OutgoingMessage, Response):
1.355 - pass
1.356 + def __init__(self, request):
1.357 + OutgoingMessage.__init__(self,request.connection)
1.358 + Response.__init__(self,request)
1.359
1.360
1.361 ### UNIT TESTS:
1.362 @@ -374,8 +436,23 @@
1.363
1.364 class BLIPTests(unittest.TestCase):
1.365 def setUp(self):
1.366 + def handleRequest(request):
1.367 + logging.info("Got request!: %r",request)
1.368 + body = request.body
1.369 + assert len(body)<32768
1.370 + assert request.contentType == 'application/octet-stream'
1.371 + assert int(request['Size']) == len(body)
1.372 + assert request['User-Agent'] == 'BLIPConnectionTester'
1.373 + for i in xrange(0,len(request.body)):
1.374 + assert ord(body[i]) == i%256
1.375 +
1.376 + response = request.response
1.377 + response.body = request.body
1.378 + response['Content-Type'] = request.contentType
1.379 + response.send()
1.380 +
1.381 listener = Listener(46353)
1.382 - listener.onRequest = lambda req: logging.info("Got request!: %r",req)
1.383 + listener.onRequest = handleRequest
1.384
1.385 def testListener(self):
1.386 logging.info("Waiting...")