Python/BLIP.py
changeset 13 84c2d38f924c
parent 12 710113961756
child 14 bb5faa9995d5
     1.1 --- a/Python/BLIP.py	Tue Jun 03 22:24:21 2008 -0700
     1.2 +++ b/Python/BLIP.py	Wed Jun 04 17:11:20 2008 -0700
     1.3 @@ -1,10 +1,9 @@
     1.4 -#!/usr/bin/env python
     1.5  # encoding: utf-8
     1.6  """
     1.7  BLIP.py
     1.8  
     1.9  Created by Jens Alfke on 2008-06-03.
    1.10 -Copyright (c) 2008 Jens Alfke. All rights reserved.
    1.11 +Copyright notice and BSD license at end of file.
    1.12  """
    1.13  
    1.14  import asynchat
    1.15 @@ -15,10 +14,17 @@
    1.16  import struct
    1.17  import sys
    1.18  import traceback
    1.19 -import unittest
    1.20  import zlib
    1.21  
    1.22  
    1.23 +# Connection status enumeration:
    1.24 +kDisconnected = -1
    1.25 +kClosed  = 0
    1.26 +kOpening = 1
    1.27 +kOpen    = 2
    1.28 +kClosing = 3
    1.29 +
    1.30 +
    1.31  # INTERNAL CONSTANTS -- NO TOUCHIES!
    1.32  
    1.33  kFrameMagicNumber   = 0x9B34F205
    1.34 @@ -47,57 +53,118 @@
    1.35      pass
    1.36  
    1.37  
    1.38 +### LISTENER AND CONNECTION CLASSES:
    1.39 +
    1.40 +
    1.41  class Listener (asyncore.dispatcher):
    1.42      "BLIP listener/server class"
    1.43      
    1.44 -    def __init__(self, port):
    1.45 +    def __init__(self, port, sslKeyFile=None, sslCertFile=None):
    1.46          "Create a listener on a port"
    1.47          asyncore.dispatcher.__init__(self)
    1.48          self.onConnected = self.onRequest = None
    1.49          self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    1.50          self.bind( ('',port) )
    1.51          self.listen(5)
    1.52 +        self.sslKeyFile=sslKeyFile
    1.53 +        self.sslCertFile=sslCertFile
    1.54          log.info("Listening on port %u", port)
    1.55      
    1.56      def handle_accept( self ):
    1.57 -        client,address = self.accept()
    1.58 -        conn = Connection(address,client)
    1.59 +        socket,address = self.accept()
    1.60 +        if self.sslKeyFile:
    1.61 +            socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
    1.62 +        conn = Connection(address, sock=socket, listener=self)
    1.63          conn.onRequest = self.onRequest
    1.64          if self.onConnected:
    1.65              self.onConnected(conn)
    1.66  
    1.67 +    def handle_error(self):
    1.68 +        (typ,val,trace) = sys.exc_info()
    1.69 +        log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
    1.70 +        self.close()
    1.71 +    
    1.72 +
    1.73  
    1.74  class Connection (asynchat.async_chat):
    1.75 -    def __init__( self, address, conn=None ):
    1.76 +    def __init__( self, address, sock=None, listener=None, ssl=None ):
    1.77          "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
    1.78          "otherwise it'll open a new outgoing socket."
    1.79 -        asynchat.async_chat.__init__(self,conn)
    1.80 -        self.address = address
    1.81 -        if conn:
    1.82 +        if sock:
    1.83 +            asynchat.async_chat.__init__(self,sock)
    1.84              log.info("Accepted connection from %s",address)
    1.85 +            self.status = kOpen
    1.86          else:
    1.87 +            asynchat.async_chat.__init__(self)
    1.88              log.info("Opening connection to %s",address)
    1.89              self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    1.90 +            self.status = kOpening
    1.91 +            if ssl:
    1.92 +                ssl(self.socket)
    1.93              self.connect(address)
    1.94 +        self.address = address
    1.95 +        self.listener = listener
    1.96          self.onRequest = None
    1.97          self.pendingRequests = {}
    1.98          self.pendingResponses = {}
    1.99          self.outBox = []
   1.100          self.inMessage = None
   1.101 -        self.inNumRequests = 0
   1.102 +        self.inNumRequests = self.outNumRequests = 0
   1.103          self._endOfFrame()
   1.104      
   1.105 -    #def handle_error(self,x):
   1.106 -    #    log.error("Uncaught exception: %s",x)
   1.107 -    #    self.close()
   1.108 +    def close(self):
   1.109 +        if self.status > kClosed:
   1.110 +            self.status = kClosing
   1.111 +            log.info("Connection closing...")
   1.112 +        asynchat.async_chat.close(self)
   1.113      
   1.114 -    def _fatal(self, error):
   1.115 -        log.error("Fatal BLIP connection error: %s",error)
   1.116 +    def handle_connect(self):
   1.117 +        log.info("Connection open!")
   1.118 +        self.status = kOpen
   1.119 +    
   1.120 +    def handle_error(self):
   1.121 +        (typ,val,trace) = sys.exc_info()
   1.122 +        log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
   1.123 +        self.discard_buffers()
   1.124 +        self.status = kDisconnected
   1.125          self.close()
   1.126      
   1.127 +    def handle_close(self):
   1.128 +        log.info("Connection closed!")
   1.129 +        self.pendingRequests = self.pendingResponses = None
   1.130 +        self.outBox = None
   1.131 +        if self.status == kClosing:
   1.132 +            self.status = kClosed
   1.133 +        else:
   1.134 +            self.status = kDisconnected
   1.135 +        asynchat.async_chat.handle_close(self)
   1.136 +        
   1.137      
   1.138      ### SENDING:
   1.139      
   1.140 +    @property
   1.141 +    def canSend(self):
   1.142 +        return self.status==kOpening or self.status==kOpen
   1.143 +    
   1.144 +    def _sendMessage(self, msg):
   1.145 +        if self.canSend:
   1.146 +            self._outQueueMessage(msg,True)
   1.147 +            return True
   1.148 +        else:
   1.149 +            return False
   1.150 +    
   1.151 +    def _sendRequest(self, req):
   1.152 +        if self.canSend:
   1.153 +            requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
   1.154 +            response = req.response
   1.155 +            if response:
   1.156 +                response.requestNo = requestNo
   1.157 +                self.pendingResponses[requestNo] = response
   1.158 +                log.debug("pendingResponses[%i] := %s",requestNo,response)
   1.159 +            return self._sendMessage(req)
   1.160 +        else:
   1.161 +            return False
   1.162 +    
   1.163      def _outQueueMessage(self, msg,isNew=True):
   1.164          n = len(self.outBox)
   1.165          index = n
   1.166 @@ -116,7 +183,7 @@
   1.167          
   1.168          self.outBox.insert(index,msg)
   1.169          if isNew:
   1.170 -            log.info("Queuing outgoing message at index %i",index)
   1.171 +            log.info("Queuing %s at index %i",msg,index)
   1.172              if n==0:
   1.173                  self._sendNextFrame()
   1.174          else:
   1.175 @@ -144,7 +211,7 @@
   1.176                  self.inHeader = data
   1.177              else:
   1.178                  self.inHeader += data
   1.179 -        else:
   1.180 +        elif self.inMessage:
   1.181              self.inMessage._receivedData(data)
   1.182      
   1.183      def found_terminator(self):
   1.184 @@ -152,8 +219,8 @@
   1.185              # Got a header:
   1.186              (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
   1.187              self.inHeader = None
   1.188 -            if magic!=kFrameMagicNumber: self._fatal("Incorrect frame magic number %x" %magic)
   1.189 -            if frameLen < kFrameHeaderSize: self._fatal("Invalid frame length %u" %frameLen)
   1.190 +            if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
   1.191 +            if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
   1.192              frameLen -= kFrameHeaderSize
   1.193              log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
   1.194                          (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
   1.195 @@ -196,14 +263,14 @@
   1.196          self.set_terminator(kFrameHeaderSize) # wait for binary header
   1.197          if msg:
   1.198              log.debug("End of frame of %s",msg)
   1.199 -            if not msg.moreComing:
   1.200 +            if not msg._moreComing:
   1.201                  self._receivedMessage(msg)
   1.202      
   1.203      def _receivedMessage(self, msg):
   1.204          log.info("Received: %s",msg)
   1.205          # Remove from pending:
   1.206          if msg.isResponse:
   1.207 -            del self.pendingReplies[msg.requestNo]
   1.208 +            del self.pendingResponses[msg.requestNo]
   1.209          else:
   1.210              del self.pendingRequests[msg.requestNo]
   1.211          # Decode:
   1.212 @@ -216,16 +283,17 @@
   1.213              #FIX: Send an error reply
   1.214  
   1.215  
   1.216 -### MESSAGES:
   1.217 +### MESSAGE CLASSES:
   1.218  
   1.219  
   1.220  class Message (object):
   1.221      "Abstract superclass of all request/response objects"
   1.222      
   1.223 -    def __init__(self, connection, properties=None, body=None):
   1.224 +    def __init__(self, connection, body=None, properties=None):
   1.225          self.connection = connection
   1.226 +        self.body = body
   1.227          self.properties = properties or {}
   1.228 -        self.body = body
   1.229 +        self.requestNo = None
   1.230      
   1.231      @property
   1.232      def flags(self):
   1.233 @@ -236,15 +304,17 @@
   1.234          if self.urgent:     flags |= kMsgFlag_Urgent
   1.235          if self.compressed: flags |= kMsgFlag_Compressed
   1.236          if self.noReply:    flags |= kMsgFlag_NoReply
   1.237 -        if self.moreComing: flags |= kMsgFlag_MoreComing
   1.238 +        if self._moreComing:flags |= kMsgFlag_MoreComing
   1.239          return flags
   1.240      
   1.241      def __str__(self):
   1.242 -        s = "%s[#%i" %(type(self).__name__,self.requestNo)
   1.243 +        s = "%s[" %(type(self).__name__)
   1.244 +        if self.requestNo != None:
   1.245 +            s += "#%i" %self.requestNo
   1.246          if self.urgent:     s += " URG"
   1.247          if self.compressed: s += " CMP"
   1.248          if self.noReply:    s += " NOR"
   1.249 -        if self.moreComing: s += " MOR"
   1.250 +        if self._moreComing:s += " MOR"
   1.251          if self.body:       s += " %i bytes" %len(self.body)
   1.252          return s+"]"
   1.253      
   1.254 @@ -253,12 +323,12 @@
   1.255          if len(self.properties): s += repr(self.properties)
   1.256          return s
   1.257      
   1.258 -    @property 
   1.259 +    @property
   1.260      def isResponse(self):
   1.261          "Is this message a response?"
   1.262          return False
   1.263      
   1.264 -    @property 
   1.265 +    @property
   1.266      def contentType(self):
   1.267          return self.properties.get('Content-Type')
   1.268      
   1.269 @@ -278,17 +348,19 @@
   1.270          self.urgent     = (flags & kMsgFlag_Urgent) != 0
   1.271          self.compressed = (flags & kMsgFlag_Compressed) != 0
   1.272          self.noReply    = (flags & kMsgFlag_NoReply) != 0
   1.273 -        self.moreComing = (flags & kMsgFlag_MoreComing) != 0
   1.274 +        self._moreComing= (flags & kMsgFlag_MoreComing) != 0
   1.275          self.frames     = []
   1.276      
   1.277      def _beginFrame(self, flags):
   1.278 -        if (flags & kMsgFlag_MoreComing)==0:
   1.279 -            self.moreComing = False
   1.280 +        """Received a frame header."""
   1.281 +        self._moreComing = (flags & kMsgFlag_MoreComing)!=0
   1.282      
   1.283      def _receivedData(self, data):
   1.284 +        """Received data from a frame."""
   1.285          self.frames.append(data)
   1.286      
   1.287      def _finished(self):
   1.288 +        """The entire message has been received; now decode it."""
   1.289          encoded = "".join(self.frames)
   1.290          self.frames = None
   1.291          
   1.292 @@ -327,47 +399,54 @@
   1.293                     '\x09' : "Error-Domain"}
   1.294  
   1.295  
   1.296 -
   1.297  class OutgoingMessage (Message):
   1.298      "Abstract superclass of outgoing requests/responses."
   1.299      
   1.300 -    def __init__(self, connection, properties=None, body=None):
   1.301 -        Message.__init__(self,connection,properties,body)
   1.302 +    def __init__(self, connection, body=None, properties=None):
   1.303 +        Message.__init__(self,connection,body,properties)
   1.304          self.urgent = self.compressed = self.noReply = False
   1.305 -        self.moreComing = True
   1.306 +        self._moreComing = True
   1.307      
   1.308      def __setitem__(self, key,val):
   1.309          self.properties[key] = val
   1.310      def __delitem__(self, key):
   1.311          del self.properties[key]
   1.312      
   1.313 -    def send(self):
   1.314 -        "Sends this message."
   1.315 -        log.info("Sending %s",self)
   1.316 +    @property
   1.317 +    def sent(self):
   1.318 +        return 'encoded' in self.__dict__
   1.319 +    
   1.320 +    def _encode(self):
   1.321 +        "Generates the message's encoded form, prior to sending it."
   1.322          out = StringIO()
   1.323          for (key,value) in self.properties.iteritems():
   1.324 -            def _writePropString(str):
   1.325 -                out.write(str)    #FIX: Abbreviate
   1.326 +            def _writePropString(s):
   1.327 +                out.write(str(s))    #FIX: Abbreviate
   1.328                  out.write('\000')
   1.329              _writePropString(key)
   1.330              _writePropString(value)
   1.331 -        self.encoded = struct.pack('!H',out.tell()) + out.getvalue()
   1.332 -        out.close()
   1.333 +        propertiesSize = out.tell()
   1.334 +        assert propertiesSize<65536     #FIX: Return an error instead
   1.335          
   1.336          body = self.body
   1.337          if self.compressed:
   1.338 -            body = zlib.compress(body,5)
   1.339 -        self.encoded += body
   1.340 +            z = zlib.compressobj(6,zlib.DEFLATED,31)   # window size of 31 needed for gzip format
   1.341 +            out.write(z.compress(body))
   1.342 +            body = z.flush()
   1.343 +        out.write(body)
   1.344 +        
   1.345 +        self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
   1.346 +        out.close()
   1.347          log.debug("Encoded %s into %u bytes", self,len(self.encoded))
   1.348 -        
   1.349          self.bytesSent = 0
   1.350 -        self.connection._outQueueMessage(self)
   1.351      
   1.352      def _sendNextFrame(self, conn,maxLen):
   1.353          pos = self.bytesSent
   1.354          payload = self.encoded[pos:pos+maxLen]
   1.355          pos += len(payload)
   1.356 -        self.moreComing = (pos < len(self.encoded))
   1.357 +        self._moreComing = (pos < len(self.encoded))
   1.358 +        if not self._moreComing:
   1.359 +            self.encoded = None
   1.360          log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
   1.361          
   1.362          conn.push( struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
   1.363 @@ -377,13 +456,15 @@
   1.364          conn.push( payload )
   1.365          
   1.366          self.bytesSent = pos
   1.367 -        return self.moreComing
   1.368 +        return self._moreComing
   1.369  
   1.370  
   1.371  class Request (object):
   1.372      @property
   1.373      def response(self):
   1.374          "The response object for this request."
   1.375 +        if self.noReply:
   1.376 +            return None
   1.377          r = self.__dict__.get('_response')
   1.378          if r==None:
   1.379              r = self._response = self._createResponse()
   1.380 @@ -391,7 +472,7 @@
   1.381  
   1.382  
   1.383  class Response (Message):
   1.384 -    def __init__(self, request):
   1.385 +    def _setRequest(self, request):
   1.386          assert not request.noReply
   1.387          self.request = request
   1.388          self.requestNo = request.requestNo
   1.389 @@ -402,19 +483,24 @@
   1.390          return True
   1.391  
   1.392  
   1.393 -
   1.394  class IncomingRequest (IncomingMessage, Request):
   1.395      def _createResponse(self):
   1.396          return OutgoingResponse(self)
   1.397  
   1.398 +
   1.399  class OutgoingRequest (OutgoingMessage, Request):
   1.400      def _createResponse(self):
   1.401          return IncomingResponse(self)
   1.402 +    
   1.403 +    def send(self):
   1.404 +        self._encode()
   1.405 +        return self.connection._sendRequest(self) and self.response
   1.406 +
   1.407  
   1.408  class IncomingResponse (IncomingMessage, Response):
   1.409      def __init__(self, request):
   1.410 -        IncomingMessage.__init__(self,request.connection,request.requestNo,0)
   1.411 -        Response.__init__(self,request)
   1.412 +        IncomingMessage.__init__(self,request.connection,None,0)
   1.413 +        self._setRequest(request)
   1.414          self.onComplete = None
   1.415      
   1.416      def _finished(self):
   1.417 @@ -424,40 +510,36 @@
   1.418                  self.onComplete(self)
   1.419              except Exception, x:
   1.420                  log.error("Exception dispatching response: %s", traceback.format_exc())
   1.421 -            
   1.422 +
   1.423 +
   1.424  class OutgoingResponse (OutgoingMessage, Response):
   1.425      def __init__(self, request):
   1.426          OutgoingMessage.__init__(self,request.connection)
   1.427 -        Response.__init__(self,request)
   1.428 +        self._setRequest(request)
   1.429 +    
   1.430 +    def send(self):
   1.431 +        self._encode()
   1.432 +        return self.connection._sendMessage(self)
   1.433  
   1.434  
   1.435 -### UNIT TESTS:
   1.436 -
   1.437 -
   1.438 -class BLIPTests(unittest.TestCase):
   1.439 -    def setUp(self):
   1.440 -        def handleRequest(request):
   1.441 -            logging.info("Got request!: %r",request)
   1.442 -            body = request.body
   1.443 -            assert len(body)<32768
   1.444 -            assert request.contentType == 'application/octet-stream'
   1.445 -            assert int(request['Size']) == len(body)
   1.446 -            assert request['User-Agent'] == 'BLIPConnectionTester'
   1.447 -            for i in xrange(0,len(request.body)):
   1.448 -                assert ord(body[i]) == i%256
   1.449 -            
   1.450 -            response = request.response
   1.451 -            response.body = request.body
   1.452 -            response['Content-Type'] = request.contentType
   1.453 -            response.send()
   1.454 -        
   1.455 -        listener = Listener(46353)
   1.456 -        listener.onRequest = handleRequest
   1.457 -    
   1.458 -    def testListener(self):
   1.459 -        logging.info("Waiting...")
   1.460 -        asyncore.loop()
   1.461 -
   1.462 -if __name__ == '__main__':
   1.463 -    logging.basicConfig(level=logging.INFO)
   1.464 -    unittest.main()
   1.465 \ No newline at end of file
   1.466 +"""
   1.467 + Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
   1.468 + 
   1.469 + Redistribution and use in source and binary forms, with or without modification, are permitted
   1.470 + provided that the following conditions are met:
   1.471 + 
   1.472 + * Redistributions of source code must retain the above copyright notice, this list of conditions
   1.473 + and the following disclaimer.
   1.474 + * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
   1.475 + and the following disclaimer in the documentation and/or other materials provided with the
   1.476 + distribution.
   1.477 + 
   1.478 + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
   1.479 + IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
   1.480 + FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
   1.481 + BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
   1.482 + (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
   1.483 +  PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
   1.484 + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF 
   1.485 + THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
   1.486 +"""