1.1 --- a/Python/BLIP.py Tue Jun 03 22:24:21 2008 -0700
1.2 +++ b/Python/BLIP.py Wed Jun 04 17:11:20 2008 -0700
1.3 @@ -1,10 +1,9 @@
1.4 -#!/usr/bin/env python
1.5 # encoding: utf-8
1.6 """
1.7 BLIP.py
1.8
1.9 Created by Jens Alfke on 2008-06-03.
1.10 -Copyright (c) 2008 Jens Alfke. All rights reserved.
1.11 +Copyright notice and BSD license at end of file.
1.12 """
1.13
1.14 import asynchat
1.15 @@ -15,10 +14,17 @@
1.16 import struct
1.17 import sys
1.18 import traceback
1.19 -import unittest
1.20 import zlib
1.21
1.22
1.23 +# Connection status enumeration:
1.24 +kDisconnected = -1
1.25 +kClosed = 0
1.26 +kOpening = 1
1.27 +kOpen = 2
1.28 +kClosing = 3
1.29 +
1.30 +
1.31 # INTERNAL CONSTANTS -- NO TOUCHIES!
1.32
1.33 kFrameMagicNumber = 0x9B34F205
1.34 @@ -47,57 +53,118 @@
1.35 pass
1.36
1.37
1.38 +### LISTENER AND CONNECTION CLASSES:
1.39 +
1.40 +
1.41 class Listener (asyncore.dispatcher):
1.42 "BLIP listener/server class"
1.43
1.44 - def __init__(self, port):
1.45 + def __init__(self, port, sslKeyFile=None, sslCertFile=None):
1.46 "Create a listener on a port"
1.47 asyncore.dispatcher.__init__(self)
1.48 self.onConnected = self.onRequest = None
1.49 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1.50 self.bind( ('',port) )
1.51 self.listen(5)
1.52 + self.sslKeyFile=sslKeyFile
1.53 + self.sslCertFile=sslCertFile
1.54 log.info("Listening on port %u", port)
1.55
1.56 def handle_accept( self ):
1.57 - client,address = self.accept()
1.58 - conn = Connection(address,client)
1.59 + socket,address = self.accept()
1.60 + if self.sslKeyFile:
1.61 + socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
1.62 + conn = Connection(address, sock=socket, listener=self)
1.63 conn.onRequest = self.onRequest
1.64 if self.onConnected:
1.65 self.onConnected(conn)
1.66
1.67 + def handle_error(self):
1.68 + (typ,val,trace) = sys.exc_info()
1.69 + log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
1.70 + self.close()
1.71 +
1.72 +
1.73
1.74 class Connection (asynchat.async_chat):
1.75 - def __init__( self, address, conn=None ):
1.76 + def __init__( self, address, sock=None, listener=None, ssl=None ):
1.77 "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
1.78 "otherwise it'll open a new outgoing socket."
1.79 - asynchat.async_chat.__init__(self,conn)
1.80 - self.address = address
1.81 - if conn:
1.82 + if sock:
1.83 + asynchat.async_chat.__init__(self,sock)
1.84 log.info("Accepted connection from %s",address)
1.85 + self.status = kOpen
1.86 else:
1.87 + asynchat.async_chat.__init__(self)
1.88 log.info("Opening connection to %s",address)
1.89 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1.90 + self.status = kOpening
1.91 + if ssl:
1.92 + ssl(self.socket)
1.93 self.connect(address)
1.94 + self.address = address
1.95 + self.listener = listener
1.96 self.onRequest = None
1.97 self.pendingRequests = {}
1.98 self.pendingResponses = {}
1.99 self.outBox = []
1.100 self.inMessage = None
1.101 - self.inNumRequests = 0
1.102 + self.inNumRequests = self.outNumRequests = 0
1.103 self._endOfFrame()
1.104
1.105 - #def handle_error(self,x):
1.106 - # log.error("Uncaught exception: %s",x)
1.107 - # self.close()
1.108 + def close(self):
1.109 + if self.status > kClosed:
1.110 + self.status = kClosing
1.111 + log.info("Connection closing...")
1.112 + asynchat.async_chat.close(self)
1.113
1.114 - def _fatal(self, error):
1.115 - log.error("Fatal BLIP connection error: %s",error)
1.116 + def handle_connect(self):
1.117 + log.info("Connection open!")
1.118 + self.status = kOpen
1.119 +
1.120 + def handle_error(self):
1.121 + (typ,val,trace) = sys.exc_info()
1.122 + log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
1.123 + self.discard_buffers()
1.124 + self.status = kDisconnected
1.125 self.close()
1.126
1.127 + def handle_close(self):
1.128 + log.info("Connection closed!")
1.129 + self.pendingRequests = self.pendingResponses = None
1.130 + self.outBox = None
1.131 + if self.status == kClosing:
1.132 + self.status = kClosed
1.133 + else:
1.134 + self.status = kDisconnected
1.135 + asynchat.async_chat.handle_close(self)
1.136 +
1.137
1.138 ### SENDING:
1.139
1.140 + @property
1.141 + def canSend(self):
1.142 + return self.status==kOpening or self.status==kOpen
1.143 +
1.144 + def _sendMessage(self, msg):
1.145 + if self.canSend:
1.146 + self._outQueueMessage(msg,True)
1.147 + return True
1.148 + else:
1.149 + return False
1.150 +
1.151 + def _sendRequest(self, req):
1.152 + if self.canSend:
1.153 + requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
1.154 + response = req.response
1.155 + if response:
1.156 + response.requestNo = requestNo
1.157 + self.pendingResponses[requestNo] = response
1.158 + log.debug("pendingResponses[%i] := %s",requestNo,response)
1.159 + return self._sendMessage(req)
1.160 + else:
1.161 + return False
1.162 +
1.163 def _outQueueMessage(self, msg,isNew=True):
1.164 n = len(self.outBox)
1.165 index = n
1.166 @@ -116,7 +183,7 @@
1.167
1.168 self.outBox.insert(index,msg)
1.169 if isNew:
1.170 - log.info("Queuing outgoing message at index %i",index)
1.171 + log.info("Queuing %s at index %i",msg,index)
1.172 if n==0:
1.173 self._sendNextFrame()
1.174 else:
1.175 @@ -144,7 +211,7 @@
1.176 self.inHeader = data
1.177 else:
1.178 self.inHeader += data
1.179 - else:
1.180 + elif self.inMessage:
1.181 self.inMessage._receivedData(data)
1.182
1.183 def found_terminator(self):
1.184 @@ -152,8 +219,8 @@
1.185 # Got a header:
1.186 (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
1.187 self.inHeader = None
1.188 - if magic!=kFrameMagicNumber: self._fatal("Incorrect frame magic number %x" %magic)
1.189 - if frameLen < kFrameHeaderSize: self._fatal("Invalid frame length %u" %frameLen)
1.190 + if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
1.191 + if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
1.192 frameLen -= kFrameHeaderSize
1.193 log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
1.194 (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
1.195 @@ -196,14 +263,14 @@
1.196 self.set_terminator(kFrameHeaderSize) # wait for binary header
1.197 if msg:
1.198 log.debug("End of frame of %s",msg)
1.199 - if not msg.moreComing:
1.200 + if not msg._moreComing:
1.201 self._receivedMessage(msg)
1.202
1.203 def _receivedMessage(self, msg):
1.204 log.info("Received: %s",msg)
1.205 # Remove from pending:
1.206 if msg.isResponse:
1.207 - del self.pendingReplies[msg.requestNo]
1.208 + del self.pendingResponses[msg.requestNo]
1.209 else:
1.210 del self.pendingRequests[msg.requestNo]
1.211 # Decode:
1.212 @@ -216,16 +283,17 @@
1.213 #FIX: Send an error reply
1.214
1.215
1.216 -### MESSAGES:
1.217 +### MESSAGE CLASSES:
1.218
1.219
1.220 class Message (object):
1.221 "Abstract superclass of all request/response objects"
1.222
1.223 - def __init__(self, connection, properties=None, body=None):
1.224 + def __init__(self, connection, body=None, properties=None):
1.225 self.connection = connection
1.226 + self.body = body
1.227 self.properties = properties or {}
1.228 - self.body = body
1.229 + self.requestNo = None
1.230
1.231 @property
1.232 def flags(self):
1.233 @@ -236,15 +304,17 @@
1.234 if self.urgent: flags |= kMsgFlag_Urgent
1.235 if self.compressed: flags |= kMsgFlag_Compressed
1.236 if self.noReply: flags |= kMsgFlag_NoReply
1.237 - if self.moreComing: flags |= kMsgFlag_MoreComing
1.238 + if self._moreComing:flags |= kMsgFlag_MoreComing
1.239 return flags
1.240
1.241 def __str__(self):
1.242 - s = "%s[#%i" %(type(self).__name__,self.requestNo)
1.243 + s = "%s[" %(type(self).__name__)
1.244 + if self.requestNo != None:
1.245 + s += "#%i" %self.requestNo
1.246 if self.urgent: s += " URG"
1.247 if self.compressed: s += " CMP"
1.248 if self.noReply: s += " NOR"
1.249 - if self.moreComing: s += " MOR"
1.250 + if self._moreComing:s += " MOR"
1.251 if self.body: s += " %i bytes" %len(self.body)
1.252 return s+"]"
1.253
1.254 @@ -253,12 +323,12 @@
1.255 if len(self.properties): s += repr(self.properties)
1.256 return s
1.257
1.258 - @property
1.259 + @property
1.260 def isResponse(self):
1.261 "Is this message a response?"
1.262 return False
1.263
1.264 - @property
1.265 + @property
1.266 def contentType(self):
1.267 return self.properties.get('Content-Type')
1.268
1.269 @@ -278,17 +348,19 @@
1.270 self.urgent = (flags & kMsgFlag_Urgent) != 0
1.271 self.compressed = (flags & kMsgFlag_Compressed) != 0
1.272 self.noReply = (flags & kMsgFlag_NoReply) != 0
1.273 - self.moreComing = (flags & kMsgFlag_MoreComing) != 0
1.274 + self._moreComing= (flags & kMsgFlag_MoreComing) != 0
1.275 self.frames = []
1.276
1.277 def _beginFrame(self, flags):
1.278 - if (flags & kMsgFlag_MoreComing)==0:
1.279 - self.moreComing = False
1.280 + """Received a frame header."""
1.281 + self._moreComing = (flags & kMsgFlag_MoreComing)!=0
1.282
1.283 def _receivedData(self, data):
1.284 + """Received data from a frame."""
1.285 self.frames.append(data)
1.286
1.287 def _finished(self):
1.288 + """The entire message has been received; now decode it."""
1.289 encoded = "".join(self.frames)
1.290 self.frames = None
1.291
1.292 @@ -327,47 +399,54 @@
1.293 '\x09' : "Error-Domain"}
1.294
1.295
1.296 -
1.297 class OutgoingMessage (Message):
1.298 "Abstract superclass of outgoing requests/responses."
1.299
1.300 - def __init__(self, connection, properties=None, body=None):
1.301 - Message.__init__(self,connection,properties,body)
1.302 + def __init__(self, connection, body=None, properties=None):
1.303 + Message.__init__(self,connection,body,properties)
1.304 self.urgent = self.compressed = self.noReply = False
1.305 - self.moreComing = True
1.306 + self._moreComing = True
1.307
1.308 def __setitem__(self, key,val):
1.309 self.properties[key] = val
1.310 def __delitem__(self, key):
1.311 del self.properties[key]
1.312
1.313 - def send(self):
1.314 - "Sends this message."
1.315 - log.info("Sending %s",self)
1.316 + @property
1.317 + def sent(self):
1.318 + return 'encoded' in self.__dict__
1.319 +
1.320 + def _encode(self):
1.321 + "Generates the message's encoded form, prior to sending it."
1.322 out = StringIO()
1.323 for (key,value) in self.properties.iteritems():
1.324 - def _writePropString(str):
1.325 - out.write(str) #FIX: Abbreviate
1.326 + def _writePropString(s):
1.327 + out.write(str(s)) #FIX: Abbreviate
1.328 out.write('\000')
1.329 _writePropString(key)
1.330 _writePropString(value)
1.331 - self.encoded = struct.pack('!H',out.tell()) + out.getvalue()
1.332 - out.close()
1.333 + propertiesSize = out.tell()
1.334 + assert propertiesSize<65536 #FIX: Return an error instead
1.335
1.336 body = self.body
1.337 if self.compressed:
1.338 - body = zlib.compress(body,5)
1.339 - self.encoded += body
1.340 + z = zlib.compressobj(6,zlib.DEFLATED,31) # window size of 31 needed for gzip format
1.341 + out.write(z.compress(body))
1.342 + body = z.flush()
1.343 + out.write(body)
1.344 +
1.345 + self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
1.346 + out.close()
1.347 log.debug("Encoded %s into %u bytes", self,len(self.encoded))
1.348 -
1.349 self.bytesSent = 0
1.350 - self.connection._outQueueMessage(self)
1.351
1.352 def _sendNextFrame(self, conn,maxLen):
1.353 pos = self.bytesSent
1.354 payload = self.encoded[pos:pos+maxLen]
1.355 pos += len(payload)
1.356 - self.moreComing = (pos < len(self.encoded))
1.357 + self._moreComing = (pos < len(self.encoded))
1.358 + if not self._moreComing:
1.359 + self.encoded = None
1.360 log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
1.361
1.362 conn.push( struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
1.363 @@ -377,13 +456,15 @@
1.364 conn.push( payload )
1.365
1.366 self.bytesSent = pos
1.367 - return self.moreComing
1.368 + return self._moreComing
1.369
1.370
1.371 class Request (object):
1.372 @property
1.373 def response(self):
1.374 "The response object for this request."
1.375 + if self.noReply:
1.376 + return None
1.377 r = self.__dict__.get('_response')
1.378 if r==None:
1.379 r = self._response = self._createResponse()
1.380 @@ -391,7 +472,7 @@
1.381
1.382
1.383 class Response (Message):
1.384 - def __init__(self, request):
1.385 + def _setRequest(self, request):
1.386 assert not request.noReply
1.387 self.request = request
1.388 self.requestNo = request.requestNo
1.389 @@ -402,19 +483,24 @@
1.390 return True
1.391
1.392
1.393 -
1.394 class IncomingRequest (IncomingMessage, Request):
1.395 def _createResponse(self):
1.396 return OutgoingResponse(self)
1.397
1.398 +
1.399 class OutgoingRequest (OutgoingMessage, Request):
1.400 def _createResponse(self):
1.401 return IncomingResponse(self)
1.402 +
1.403 + def send(self):
1.404 + self._encode()
1.405 + return self.connection._sendRequest(self) and self.response
1.406 +
1.407
1.408 class IncomingResponse (IncomingMessage, Response):
1.409 def __init__(self, request):
1.410 - IncomingMessage.__init__(self,request.connection,request.requestNo,0)
1.411 - Response.__init__(self,request)
1.412 + IncomingMessage.__init__(self,request.connection,None,0)
1.413 + self._setRequest(request)
1.414 self.onComplete = None
1.415
1.416 def _finished(self):
1.417 @@ -424,40 +510,36 @@
1.418 self.onComplete(self)
1.419 except Exception, x:
1.420 log.error("Exception dispatching response: %s", traceback.format_exc())
1.421 -
1.422 +
1.423 +
1.424 class OutgoingResponse (OutgoingMessage, Response):
1.425 def __init__(self, request):
1.426 OutgoingMessage.__init__(self,request.connection)
1.427 - Response.__init__(self,request)
1.428 + self._setRequest(request)
1.429 +
1.430 + def send(self):
1.431 + self._encode()
1.432 + return self.connection._sendMessage(self)
1.433
1.434
1.435 -### UNIT TESTS:
1.436 -
1.437 -
1.438 -class BLIPTests(unittest.TestCase):
1.439 - def setUp(self):
1.440 - def handleRequest(request):
1.441 - logging.info("Got request!: %r",request)
1.442 - body = request.body
1.443 - assert len(body)<32768
1.444 - assert request.contentType == 'application/octet-stream'
1.445 - assert int(request['Size']) == len(body)
1.446 - assert request['User-Agent'] == 'BLIPConnectionTester'
1.447 - for i in xrange(0,len(request.body)):
1.448 - assert ord(body[i]) == i%256
1.449 -
1.450 - response = request.response
1.451 - response.body = request.body
1.452 - response['Content-Type'] = request.contentType
1.453 - response.send()
1.454 -
1.455 - listener = Listener(46353)
1.456 - listener.onRequest = handleRequest
1.457 -
1.458 - def testListener(self):
1.459 - logging.info("Waiting...")
1.460 - asyncore.loop()
1.461 -
1.462 -if __name__ == '__main__':
1.463 - logging.basicConfig(level=logging.INFO)
1.464 - unittest.main()
1.465 \ No newline at end of file
1.466 +"""
1.467 + Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
1.468 +
1.469 + Redistribution and use in source and binary forms, with or without modification, are permitted
1.470 + provided that the following conditions are met:
1.471 +
1.472 + * Redistributions of source code must retain the above copyright notice, this list of conditions
1.473 + and the following disclaimer.
1.474 + * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
1.475 + and the following disclaimer in the documentation and/or other materials provided with the
1.476 + distribution.
1.477 +
1.478 + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
1.479 + IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
1.480 + FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
1.481 + BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
1.482 + (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
1.483 + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
1.484 + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
1.485 + THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
1.486 +"""