Python/BLIP.py
changeset 51 de59ce19f42e
parent 16 6f608b552b77
child 53 e9f209a24d53
     1.1 --- a/Python/BLIP.py	Wed Jun 11 14:58:38 2008 -0700
     1.2 +++ b/Python/BLIP.py	Tue Jun 23 11:44:30 2009 -0700
     1.3 @@ -27,7 +27,7 @@
     1.4  
     1.5  # INTERNAL CONSTANTS -- NO TOUCHIES!
     1.6  
     1.7 -kFrameMagicNumber   = 0x9B34F205
     1.8 +kFrameMagicNumber   = 0x9B34F206
     1.9  kFrameHeaderFormat  = '!LLHH'
    1.10  kFrameHeaderSize    = 12
    1.11  
    1.12 @@ -36,11 +36,15 @@
    1.13  kMsgFlag_Urgent     = 0x0020
    1.14  kMsgFlag_NoReply    = 0x0040
    1.15  kMsgFlag_MoreComing = 0x0080
    1.16 +kMsgFlag_Meta       = 0x0100
    1.17  
    1.18  kMsgType_Request    = 0
    1.19  kMsgType_Response   = 1
    1.20  kMsgType_Error      = 2
    1.21  
    1.22 +kMsgProfile_Hi      = "Hi"
    1.23 +kMsgProfile_Bye     = "Bye"
    1.24 +
    1.25  
    1.26  log = logging.getLogger('BLIP')
    1.27  log.propagate = True
    1.28 @@ -104,7 +108,7 @@
    1.29              self.connect(address)
    1.30          self.address = address
    1.31          self.listener = listener
    1.32 -        self.onRequest = None
    1.33 +        self.onRequest = self.onCloseRequest = self.onCloseRefused = None
    1.34          self.pendingRequests = {}
    1.35          self.pendingResponses = {}
    1.36          self.outBox = []
    1.37 @@ -112,12 +116,7 @@
    1.38          self.inNumRequests = self.outNumRequests = 0
    1.39          self.sending = False
    1.40          self._endOfFrame()
    1.41 -    
    1.42 -    def close(self):
    1.43 -        if self.status > kClosed:
    1.44 -            self.status = kClosing
    1.45 -            log.info("Connection closing...")
    1.46 -        asynchat.async_chat.close(self)
    1.47 +        self._closeWhenPossible = False
    1.48      
    1.49      def handle_connect(self):
    1.50          log.info("Connection open!")
    1.51 @@ -130,25 +129,19 @@
    1.52          self.status = kDisconnected
    1.53          self.close()
    1.54      
    1.55 -    def handle_close(self):
    1.56 -        log.info("Connection closed!")
    1.57 -        self.pendingRequests = self.pendingResponses = None
    1.58 -        self.outBox = None
    1.59 -        if self.status == kClosing:
    1.60 -            self.status = kClosed
    1.61 -        else:
    1.62 -            self.status = kDisconnected
    1.63 -        asynchat.async_chat.handle_close(self)
    1.64 -        
    1.65      
    1.66      ### SENDING:
    1.67      
    1.68      @property
    1.69 -    def canSend(self):
    1.70 +    def isOpen(self):
    1.71          return self.status==kOpening or self.status==kOpen
    1.72      
    1.73 +    @property
    1.74 +    def canSend(self):
    1.75 +        return self.isOpen and not self._closeWhenPossible
    1.76 +    
    1.77      def _sendMessage(self, msg):
    1.78 -        if self.canSend:
    1.79 +        if self.isOpen:
    1.80              self._outQueueMessage(msg,True)
    1.81              if not self.sending:
    1.82                  log.debug("Waking up the output stream")
    1.83 @@ -208,6 +201,7 @@
    1.84          else:
    1.85              log.debug("Nothing more to send")
    1.86              self.sending = False
    1.87 +            self._closeIfReady()
    1.88              return None
    1.89      
    1.90      ### RECEIVING:
    1.91 @@ -255,6 +249,7 @@
    1.92                  self.inNumRequests += 1
    1.93          elif msgType==kMsgType_Response or msgType==kMsgType_Error:
    1.94              message = self.pendingResponses.get(requestNo)
    1.95 +            message._updateFlags(flags)
    1.96          
    1.97          if message != None:
    1.98              message._beginFrame(flags)
    1.99 @@ -284,10 +279,88 @@
   1.100          try:
   1.101              msg._finished()
   1.102              if not msg.isResponse:
   1.103 -                self.onRequest(msg)
   1.104 +                if msg._meta:
   1.105 +                    self._dispatchMetaRequest(msg)
   1.106 +                else:
   1.107 +                    self.onRequest(msg)
   1.108          except Exception, x:
   1.109              log.error("Exception handling incoming message: %s", traceback.format_exc())
   1.110              #FIX: Send an error reply
   1.111 +        # Check to see if we're done and ready to close:
   1.112 +        self._closeIfReady()
   1.113 +    
   1.114 +    def _dispatchMetaRequest(self, request):
   1.115 +        """Handles dispatching internal meta requests."""
   1.116 +        if request['Profile'] == kMsgProfile_Bye:
   1.117 +            shouldClose = True
   1.118 +            if self.onCloseRequest:
   1.119 +                shouldClose = self.onCloseRequest()
   1.120 +            if not shouldClose:
   1.121 +                log.debug("Sending resfusal to close...")
   1.122 +                response = request.response
   1.123 +                response.isError = True
   1.124 +                response['Error-Domain'] = "BLIP"
   1.125 +                response['Error-Code'] = 403
   1.126 +                response.body = "Close request denied"
   1.127 +                response.send()
   1.128 +            else:
   1.129 +                log.debug("Sending permission to close...")
   1.130 +                response = request.response
   1.131 +                response.send()
   1.132 +        else:
   1.133 +            response = request.response
   1.134 +            response.isError = True
   1.135 +            response['Error-Domain'] = "BLIP"
   1.136 +            response['Error-Code'] = 404
   1.137 +            response.body = "Unknown meta profile"
   1.138 +            response.send()
   1.139 +    
   1.140 +    ### CLOSING:
   1.141 +    
   1.142 +    def close(self):
   1.143 +        """Publicly callable close method. Sends close request to peer."""
   1.144 +        if self.status != kOpen:
   1.145 +            return False
   1.146 +        log.info("Sending close request...")
   1.147 +        req = OutgoingRequest(self, None, {'Profile': kMsgProfile_Bye})
   1.148 +        req._meta = True
   1.149 +        req.response.onComplete = self._handleCloseResponse
   1.150 +        if not req.send():
   1.151 +            log.error("Error sending close request.")
   1.152 +            return False
   1.153 +        else:
   1.154 +            self.status = kClosing
   1.155 +        return True
   1.156 +    
   1.157 +    def _handleCloseResponse(self, response):
   1.158 +        """Called when we receive a response to a close request."""
   1.159 +        log.info("Received close response.")
   1.160 +        if response.isError:
   1.161 +            # remote refused to close
   1.162 +            if self.onCloseRefused:
   1.163 +                self.onCloseRefused(response)
   1.164 +            self.status = kOpen
   1.165 +        else:
   1.166 +            # now wait until everything has finished sending, then actually close
   1.167 +            log.info("No refusal, actually closing...")
   1.168 +            self._closeWhenPossible = True
   1.169 +    
   1.170 +    def _closeIfReady(self):
   1.171 +        """Checks if all transmissions are complete and then closes the actual socket."""
   1.172 +        if self._closeWhenPossible and len(self.outBox) == 0 and len(self.pendingRequests) == 0 and len(self.pendingResponses) == 0:
   1.173 +            # self._closeWhenPossible = False
   1.174 +            log.debug("_closeIfReady closing.")
   1.175 +            asynchat.async_chat.close(self)
   1.176 +    
   1.177 +    def handle_close(self):
   1.178 +        """Called when the socket actually closes."""
   1.179 +        log.info("Connection closed!")
   1.180 +        self.pendingRequests = self.pendingResponses = None
   1.181 +        self.outBox = None
   1.182 +        if self.status == kClosing:
   1.183 +            self.status = kClosed
   1.184 +        else:
   1.185 +            self.status = kDisconnected
   1.186  
   1.187  
   1.188  ### MESSAGE CLASSES:
   1.189 @@ -305,13 +378,17 @@
   1.190      @property
   1.191      def flags(self):
   1.192          if self.isResponse:
   1.193 -            flags = kMsgType_Response
   1.194 +            if self.isError:
   1.195 +                flags = kMsgType_Error
   1.196 +            else:
   1.197 +                flags = kMsgType_Response
   1.198          else:
   1.199              flags = kMsgType_Request
   1.200          if self.urgent:     flags |= kMsgFlag_Urgent
   1.201          if self.compressed: flags |= kMsgFlag_Compressed
   1.202          if self.noReply:    flags |= kMsgFlag_NoReply
   1.203          if self._moreComing:flags |= kMsgFlag_MoreComing
   1.204 +        if self._meta:      flags |= kMsgFlag_Meta
   1.205          return flags
   1.206      
   1.207      def __str__(self):
   1.208 @@ -322,6 +399,7 @@
   1.209          if self.compressed: s += " CMP"
   1.210          if self.noReply:    s += " NOR"
   1.211          if self._moreComing:s += " MOR"
   1.212 +        if self._meta:      s += " MET"
   1.213          if self.body:       s += " %i bytes" %len(self.body)
   1.214          return s+"]"
   1.215      
   1.216 @@ -352,11 +430,16 @@
   1.217      def __init__(self, connection, requestNo, flags):
   1.218          super(IncomingMessage,self).__init__(connection)
   1.219          self.requestNo  = requestNo
   1.220 +        self._updateFlags(flags)
   1.221 +        self.frames     = []
   1.222 +    
   1.223 +    def _updateFlags(self, flags):
   1.224          self.urgent     = (flags & kMsgFlag_Urgent) != 0
   1.225          self.compressed = (flags & kMsgFlag_Compressed) != 0
   1.226          self.noReply    = (flags & kMsgFlag_NoReply) != 0
   1.227          self._moreComing= (flags & kMsgFlag_MoreComing) != 0
   1.228 -        self.frames     = []
   1.229 +        self._meta      = (flags & kMsgFlag_Meta) != 0
   1.230 +        self.isError    = (flags & kMsgType_Error) != 0
   1.231      
   1.232      def _beginFrame(self, flags):
   1.233          """Received a frame header."""
   1.234 @@ -377,16 +460,18 @@
   1.235          if propSize>len(encoded): raise MessageException, "properties too long to fit"
   1.236          if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
   1.237          
   1.238 -        proplist = encoded[2:propSize-1].split('\000')
   1.239 +        if propSize > 2:
   1.240 +            proplist = encoded[2:propSize-1].split('\000')
   1.241 +        
   1.242 +            if len(proplist) & 1: raise MessageException, "odd number of property strings"
   1.243 +            for i in xrange(0,len(proplist),2):
   1.244 +                def expand(str):
   1.245 +                    if len(str)==1:
   1.246 +                        str = IncomingMessage.__expandDict.get(str,str)
   1.247 +                    return str
   1.248 +                self.properties[ expand(proplist[i])] = expand(proplist[i+1])
   1.249 +        
   1.250          encoded = encoded[propSize:]
   1.251 -        if len(proplist) & 1: raise MessageException, "odd number of property strings"
   1.252 -        for i in xrange(0,len(proplist),2):
   1.253 -            def expand(str):
   1.254 -                if len(str)==1:
   1.255 -                    str = IncomingMessage.__expandDict.get(str,str)
   1.256 -                return str
   1.257 -            self.properties[ expand(proplist[i])] = expand(proplist[i+1])
   1.258 -        
   1.259          # Decode the body:
   1.260          if self.compressed and len(encoded)>0:
   1.261              try:
   1.262 @@ -411,7 +496,7 @@
   1.263      
   1.264      def __init__(self, connection, body=None, properties=None):
   1.265          Message.__init__(self,connection,body,properties)
   1.266 -        self.urgent = self.compressed = self.noReply = False
   1.267 +        self.urgent = self.compressed = self.noReply = self._meta = self.isError = False
   1.268          self._moreComing = True
   1.269      
   1.270      def __setitem__(self, key,val):
   1.271 @@ -435,7 +520,7 @@
   1.272          propertiesSize = out.tell()
   1.273          assert propertiesSize<65536     #FIX: Return an error instead
   1.274          
   1.275 -        body = self.body
   1.276 +        body = self.body or ""
   1.277          if self.compressed:
   1.278              z = zlib.compressobj(6,zlib.DEFLATED,31)   # window size of 31 needed for gzip format
   1.279              out.write(z.compress(body))