1.1 --- a/Python/BLIP.py Wed Jun 11 14:58:38 2008 -0700
1.2 +++ b/Python/BLIP.py Tue Jun 23 11:44:30 2009 -0700
1.3 @@ -27,7 +27,7 @@
1.4
1.5 # INTERNAL CONSTANTS -- NO TOUCHIES!
1.6
1.7 -kFrameMagicNumber = 0x9B34F205
1.8 +kFrameMagicNumber = 0x9B34F206
1.9 kFrameHeaderFormat = '!LLHH'
1.10 kFrameHeaderSize = 12
1.11
1.12 @@ -36,11 +36,15 @@
1.13 kMsgFlag_Urgent = 0x0020
1.14 kMsgFlag_NoReply = 0x0040
1.15 kMsgFlag_MoreComing = 0x0080
1.16 +kMsgFlag_Meta = 0x0100
1.17
1.18 kMsgType_Request = 0
1.19 kMsgType_Response = 1
1.20 kMsgType_Error = 2
1.21
1.22 +kMsgProfile_Hi = "Hi"
1.23 +kMsgProfile_Bye = "Bye"
1.24 +
1.25
1.26 log = logging.getLogger('BLIP')
1.27 log.propagate = True
1.28 @@ -104,7 +108,7 @@
1.29 self.connect(address)
1.30 self.address = address
1.31 self.listener = listener
1.32 - self.onRequest = None
1.33 + self.onRequest = self.onCloseRequest = self.onCloseRefused = None
1.34 self.pendingRequests = {}
1.35 self.pendingResponses = {}
1.36 self.outBox = []
1.37 @@ -112,12 +116,7 @@
1.38 self.inNumRequests = self.outNumRequests = 0
1.39 self.sending = False
1.40 self._endOfFrame()
1.41 -
1.42 - def close(self):
1.43 - if self.status > kClosed:
1.44 - self.status = kClosing
1.45 - log.info("Connection closing...")
1.46 - asynchat.async_chat.close(self)
1.47 + self._closeWhenPossible = False
1.48
1.49 def handle_connect(self):
1.50 log.info("Connection open!")
1.51 @@ -130,25 +129,19 @@
1.52 self.status = kDisconnected
1.53 self.close()
1.54
1.55 - def handle_close(self):
1.56 - log.info("Connection closed!")
1.57 - self.pendingRequests = self.pendingResponses = None
1.58 - self.outBox = None
1.59 - if self.status == kClosing:
1.60 - self.status = kClosed
1.61 - else:
1.62 - self.status = kDisconnected
1.63 - asynchat.async_chat.handle_close(self)
1.64 -
1.65
1.66 ### SENDING:
1.67
1.68 @property
1.69 - def canSend(self):
1.70 + def isOpen(self):
1.71 return self.status==kOpening or self.status==kOpen
1.72
1.73 + @property
1.74 + def canSend(self):
1.75 + return self.isOpen and not self._closeWhenPossible
1.76 +
1.77 def _sendMessage(self, msg):
1.78 - if self.canSend:
1.79 + if self.isOpen:
1.80 self._outQueueMessage(msg,True)
1.81 if not self.sending:
1.82 log.debug("Waking up the output stream")
1.83 @@ -208,6 +201,7 @@
1.84 else:
1.85 log.debug("Nothing more to send")
1.86 self.sending = False
1.87 + self._closeIfReady()
1.88 return None
1.89
1.90 ### RECEIVING:
1.91 @@ -255,6 +249,7 @@
1.92 self.inNumRequests += 1
1.93 elif msgType==kMsgType_Response or msgType==kMsgType_Error:
1.94 message = self.pendingResponses.get(requestNo)
1.95 + message._updateFlags(flags)
1.96
1.97 if message != None:
1.98 message._beginFrame(flags)
1.99 @@ -284,10 +279,88 @@
1.100 try:
1.101 msg._finished()
1.102 if not msg.isResponse:
1.103 - self.onRequest(msg)
1.104 + if msg._meta:
1.105 + self._dispatchMetaRequest(msg)
1.106 + else:
1.107 + self.onRequest(msg)
1.108 except Exception, x:
1.109 log.error("Exception handling incoming message: %s", traceback.format_exc())
1.110 #FIX: Send an error reply
1.111 + # Check to see if we're done and ready to close:
1.112 + self._closeIfReady()
1.113 +
1.114 + def _dispatchMetaRequest(self, request):
1.115 + """Handles dispatching internal meta requests."""
1.116 + if request['Profile'] == kMsgProfile_Bye:
1.117 + shouldClose = True
1.118 + if self.onCloseRequest:
1.119 + shouldClose = self.onCloseRequest()
1.120 + if not shouldClose:
1.121 + log.debug("Sending resfusal to close...")
1.122 + response = request.response
1.123 + response.isError = True
1.124 + response['Error-Domain'] = "BLIP"
1.125 + response['Error-Code'] = 403
1.126 + response.body = "Close request denied"
1.127 + response.send()
1.128 + else:
1.129 + log.debug("Sending permission to close...")
1.130 + response = request.response
1.131 + response.send()
1.132 + else:
1.133 + response = request.response
1.134 + response.isError = True
1.135 + response['Error-Domain'] = "BLIP"
1.136 + response['Error-Code'] = 404
1.137 + response.body = "Unknown meta profile"
1.138 + response.send()
1.139 +
1.140 + ### CLOSING:
1.141 +
1.142 + def close(self):
1.143 + """Publicly callable close method. Sends close request to peer."""
1.144 + if self.status != kOpen:
1.145 + return False
1.146 + log.info("Sending close request...")
1.147 + req = OutgoingRequest(self, None, {'Profile': kMsgProfile_Bye})
1.148 + req._meta = True
1.149 + req.response.onComplete = self._handleCloseResponse
1.150 + if not req.send():
1.151 + log.error("Error sending close request.")
1.152 + return False
1.153 + else:
1.154 + self.status = kClosing
1.155 + return True
1.156 +
1.157 + def _handleCloseResponse(self, response):
1.158 + """Called when we receive a response to a close request."""
1.159 + log.info("Received close response.")
1.160 + if response.isError:
1.161 + # remote refused to close
1.162 + if self.onCloseRefused:
1.163 + self.onCloseRefused(response)
1.164 + self.status = kOpen
1.165 + else:
1.166 + # now wait until everything has finished sending, then actually close
1.167 + log.info("No refusal, actually closing...")
1.168 + self._closeWhenPossible = True
1.169 +
1.170 + def _closeIfReady(self):
1.171 + """Checks if all transmissions are complete and then closes the actual socket."""
1.172 + if self._closeWhenPossible and len(self.outBox) == 0 and len(self.pendingRequests) == 0 and len(self.pendingResponses) == 0:
1.173 + # self._closeWhenPossible = False
1.174 + log.debug("_closeIfReady closing.")
1.175 + asynchat.async_chat.close(self)
1.176 +
1.177 + def handle_close(self):
1.178 + """Called when the socket actually closes."""
1.179 + log.info("Connection closed!")
1.180 + self.pendingRequests = self.pendingResponses = None
1.181 + self.outBox = None
1.182 + if self.status == kClosing:
1.183 + self.status = kClosed
1.184 + else:
1.185 + self.status = kDisconnected
1.186
1.187
1.188 ### MESSAGE CLASSES:
1.189 @@ -305,13 +378,17 @@
1.190 @property
1.191 def flags(self):
1.192 if self.isResponse:
1.193 - flags = kMsgType_Response
1.194 + if self.isError:
1.195 + flags = kMsgType_Error
1.196 + else:
1.197 + flags = kMsgType_Response
1.198 else:
1.199 flags = kMsgType_Request
1.200 if self.urgent: flags |= kMsgFlag_Urgent
1.201 if self.compressed: flags |= kMsgFlag_Compressed
1.202 if self.noReply: flags |= kMsgFlag_NoReply
1.203 if self._moreComing:flags |= kMsgFlag_MoreComing
1.204 + if self._meta: flags |= kMsgFlag_Meta
1.205 return flags
1.206
1.207 def __str__(self):
1.208 @@ -322,6 +399,7 @@
1.209 if self.compressed: s += " CMP"
1.210 if self.noReply: s += " NOR"
1.211 if self._moreComing:s += " MOR"
1.212 + if self._meta: s += " MET"
1.213 if self.body: s += " %i bytes" %len(self.body)
1.214 return s+"]"
1.215
1.216 @@ -352,11 +430,16 @@
1.217 def __init__(self, connection, requestNo, flags):
1.218 super(IncomingMessage,self).__init__(connection)
1.219 self.requestNo = requestNo
1.220 + self._updateFlags(flags)
1.221 + self.frames = []
1.222 +
1.223 + def _updateFlags(self, flags):
1.224 self.urgent = (flags & kMsgFlag_Urgent) != 0
1.225 self.compressed = (flags & kMsgFlag_Compressed) != 0
1.226 self.noReply = (flags & kMsgFlag_NoReply) != 0
1.227 self._moreComing= (flags & kMsgFlag_MoreComing) != 0
1.228 - self.frames = []
1.229 + self._meta = (flags & kMsgFlag_Meta) != 0
1.230 + self.isError = (flags & kMsgType_Error) != 0
1.231
1.232 def _beginFrame(self, flags):
1.233 """Received a frame header."""
1.234 @@ -377,16 +460,18 @@
1.235 if propSize>len(encoded): raise MessageException, "properties too long to fit"
1.236 if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
1.237
1.238 - proplist = encoded[2:propSize-1].split('\000')
1.239 + if propSize > 2:
1.240 + proplist = encoded[2:propSize-1].split('\000')
1.241 +
1.242 + if len(proplist) & 1: raise MessageException, "odd number of property strings"
1.243 + for i in xrange(0,len(proplist),2):
1.244 + def expand(str):
1.245 + if len(str)==1:
1.246 + str = IncomingMessage.__expandDict.get(str,str)
1.247 + return str
1.248 + self.properties[ expand(proplist[i])] = expand(proplist[i+1])
1.249 +
1.250 encoded = encoded[propSize:]
1.251 - if len(proplist) & 1: raise MessageException, "odd number of property strings"
1.252 - for i in xrange(0,len(proplist),2):
1.253 - def expand(str):
1.254 - if len(str)==1:
1.255 - str = IncomingMessage.__expandDict.get(str,str)
1.256 - return str
1.257 - self.properties[ expand(proplist[i])] = expand(proplist[i+1])
1.258 -
1.259 # Decode the body:
1.260 if self.compressed and len(encoded)>0:
1.261 try:
1.262 @@ -411,7 +496,7 @@
1.263
1.264 def __init__(self, connection, body=None, properties=None):
1.265 Message.__init__(self,connection,body,properties)
1.266 - self.urgent = self.compressed = self.noReply = False
1.267 + self.urgent = self.compressed = self.noReply = self._meta = self.isError = False
1.268 self._moreComing = True
1.269
1.270 def __setitem__(self, key,val):
1.271 @@ -435,7 +520,7 @@
1.272 propertiesSize = out.tell()
1.273 assert propertiesSize<65536 #FIX: Return an error instead
1.274
1.275 - body = self.body
1.276 + body = self.body or ""
1.277 if self.compressed:
1.278 z = zlib.compressobj(6,zlib.DEFLATED,31) # window size of 31 needed for gzip format
1.279 out.write(z.compress(body))