jens@11
|
1 |
# encoding: utf-8
"""
BLIP.py

Created by Jens Alfke on 2008-06-03.
Copyright notice and BSD license at end of file.
"""
|
jens@11
|
8 |
|
jens@11
|
9 |
import asynchat
|
jens@11
|
10 |
import asyncore
|
jens@11
|
11 |
from cStringIO import StringIO
|
jens@11
|
12 |
import logging
|
jens@11
|
13 |
import socket
|
jens@11
|
14 |
import struct
|
jens@11
|
15 |
import sys
|
jens@11
|
16 |
import traceback
|
jens@11
|
17 |
import zlib
|
jens@11
|
18 |
|
jens@11
|
19 |
|
jens@13
|
20 |
# Connection status enumeration (the values stored in Connection.status):
kDisconnected = -1
kClosed       =  0
kOpening      =  1
kOpen         =  2
kClosing      =  3


# INTERNAL CONSTANTS -- NO TOUCHIES!

# Each frame begins with a fixed-size header packed in network byte order:
# magic number, request number, flags, frame length.
kFrameMagicNumber   = 0x9B34F205
kFrameHeaderFormat  = '!LLHH'
kFrameHeaderSize    = struct.calcsize(kFrameHeaderFormat)   # 12 bytes

# Frame flags: the low four bits hold the message type, the bits above are
# independent boolean flags.
kMsgFlag_TypeMask   = 0x000F
kMsgFlag_Compressed = 1 << 4
kMsgFlag_Urgent     = 1 << 5
kMsgFlag_NoReply    = 1 << 6
kMsgFlag_MoreComing = 1 << 7

# Message types (values of flags & kMsgFlag_TypeMask):
kMsgType_Request    = 0
kMsgType_Response   = 1
kMsgType_Error      = 2


# Module-wide logger; records are passed on to the parent logger's handlers.
log = logging.getLogger('BLIP')
log.propagate = True
|
jens@11
|
47 |
|
jens@12
|
48 |
|
jens@11
|
49 |
class MessageException(Exception):
    """Raised when an incoming message's encoded form cannot be decoded."""
|
jens@11
|
51 |
|
jens@11
|
52 |
class ConnectionException(Exception):
    """Raised when a received frame header is invalid."""
|
jens@11
|
54 |
|
jens@11
|
55 |
|
jens@13
|
56 |
### LISTENER AND CONNECTION CLASSES:
|
jens@13
|
57 |
|
jens@13
|
58 |
|
jens@11
|
59 |
class Listener (asyncore.dispatcher):
    """BLIP listener/server class.

    Listens on a TCP port and wraps every accepted socket in a Connection.
    Two optional callbacks may be assigned after construction:
      onConnected(connection)  called for each accepted connection
      onRequest(request)       copied onto each new Connection as its
                               onRequest handler
    """

    def __init__(self, port, sslKeyFile=None, sslCertFile=None):
        """Create a listener on a port.

        port         -- TCP port to listen on (bound on all interfaces).
        sslKeyFile   -- optional key file; when set, handle_accept passes it
                        and sslCertFile to socket.ssl() for each new socket.
        sslCertFile  -- optional certificate file, used with sslKeyFile.
        """
        asyncore.dispatcher.__init__(self)
        self.onConnected = self.onRequest = None
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind( ('',port) )
        self.listen(5)
        self.sslKeyFile = sslKeyFile
        self.sslCertFile = sslCertFile
        log.info("Listening on port %u", port)

    def handle_accept( self ):
        """asyncore callback: accept a pending connection and wrap it in a Connection."""
        accepted = self.accept()
        if accepted is None:
            # asyncore's accept() can return None instead of a (socket, address)
            # pair when no connection is actually available; nothing to do.
            return
        # The local must not be called 'socket': that would shadow the socket
        # module and make the socket.ssl() call below fail.
        sock, address = accepted
        if self.sslKeyFile:
            # NOTE(review): as in the original code the object returned by
            # socket.ssl() is discarded and the Connection is given the plain
            # socket -- confirm whether the SSL object should be used instead.
            socket.ssl(sock, self.sslKeyFile, self.sslCertFile)
        conn = Connection(address, sock=sock, listener=self)
        conn.onRequest = self.onRequest
        if self.onConnected:
            self.onConnected(conn)

    def handle_error(self):
        """asyncore callback: log the exception being handled and stop listening."""
        typ, val = sys.exc_info()[:2]
        log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
        self.close()
|
jens@13
|
86 |
|
jens@13
|
87 |
|
jens@11
|
88 |
|
jens@11
|
89 |
class Connection (asynchat.async_chat):
    """One end of a BLIP connection, driven by asyncore/asynchat.

    Outgoing messages are queued in outBox and sent one frame at a time; the
    connection registers itself as an asynchat producer and hands out frames
    from more(). Incoming bytes are parsed as a fixed-size frame header
    followed by that frame's payload, and the payloads are collected into
    message objects until a frame arrives without the "more coming" flag.

    Attributes:
      status            one of the connection status constants
      address           the address passed to the constructor
      listener          the Listener passed to the constructor, if any
      onRequest         callback invoked with each completed incoming request
      pendingRequests   incoming requests still being received, by number
      pendingResponses  responses to our requests not yet complete, by number
      outBox            queue of outgoing messages with frames left to send
    """

    def __init__( self, address, sock=None, listener=None, ssl=None ):
        """Opens a connection with the given address.

        If a connection/socket object is provided it'll use that, otherwise
        it'll open a new outgoing socket.

        address   -- peer address; used for connect() on outgoing connections.
        sock      -- an already-connected socket (e.g. from a Listener).
        listener  -- the Listener that accepted this connection, if any.
        ssl       -- optional callable invoked with the new outgoing socket
                     before connecting; its return value is ignored.
        """
        if sock:
            asynchat.async_chat.__init__(self,sock)
            log.info("Accepted connection from %s",address)
            self.status = kOpen
        else:
            asynchat.async_chat.__init__(self)
            log.info("Opening connection to %s",address)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.status = kOpening
            if ssl:
                ssl(self.socket)
            self.connect(address)
        self.address = address
        self.listener = listener
        self.onRequest = None
        self.pendingRequests = {}
        self.pendingResponses = {}
        self.outBox = []
        self.inMessage = None
        self.inNumRequests = self.outNumRequests = 0
        self.sending = False
        # Puts the parser into its initial "expecting a frame header" state.
        self._endOfFrame()

    def close(self):
        """Close the socket, marking the connection as closing if it was open or opening."""
        if self.status > kClosed:
            self.status = kClosing
            log.info("Connection closing...")
        # Always close the underlying channel: handle_error() and
        # handle_close() set a final status first and rely on this call.
        asynchat.async_chat.close(self)

    def handle_connect(self):
        """asyncore callback: the outgoing connection is now open."""
        log.info("Connection open!")
        self.status = kOpen

    def handle_error(self):
        """asyncore callback: log the exception being handled, drop buffered data and disconnect."""
        typ, val = sys.exc_info()[:2]
        log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
        self.discard_buffers()
        self.status = kDisconnected
        self.close()

    def handle_close(self):
        """asyncore callback: the socket was closed; release queues and record the final status."""
        log.info("Connection closed!")
        self.pendingRequests = self.pendingResponses = None
        self.outBox = None
        # kClosing means close() was called on this side; anything else is
        # recorded as a disconnect.
        if self.status == kClosing:
            self.status = kClosed
        else:
            self.status = kDisconnected
        asynchat.async_chat.handle_close(self)


    ### SENDING:

    @property
    def canSend(self):
        """True while the connection is opening or open."""
        return self.status in (kOpening, kOpen)

    def _sendMessage(self, msg):
        """Queue an already-encoded message for sending.

        Returns True if the message was queued, False if the connection
        cannot send.
        """
        if not self.canSend:
            return False
        self._outQueueMessage(msg,True)
        if not self.sending:
            log.debug("Waking up the output stream")
            self.sending = True
            # The connection is its own producer; asynchat calls more().
            self.push_with_producer(self)
        return True

    def _sendRequest(self, req):
        """Assign the next request number to req, register its response and queue it.

        Returns True if the request was queued, False if the connection
        cannot send.
        """
        if not self.canSend:
            return False
        requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
        response = req.response
        if response is not None:
            response.requestNo = requestNo
            self.pendingResponses[requestNo] = response
            log.debug("pendingResponses[%i] := %s",requestNo,response)
        return self._sendMessage(req)

    def _outQueueMessage(self, msg,isNew=True):
        """Insert msg into outBox.

        Non-urgent messages (and any message when at most one is queued) go
        at the end. An urgent message is placed by scanning backwards from
        the end of the queue, as described in the comments below.
        """
        n = len(self.outBox)
        index = n
        if msg.urgent and n>1:
            while index > 0:
                otherMsg = self.outBox[index-1]
                if otherMsg.urgent:
                    # Queue after the last urgent message, skipping one more
                    # message past it when there is one.
                    if index<n:
                        index += 1
                    break
                elif isNew and otherMsg.bytesSent==0:
                    # A new message is not placed ahead of a message that has
                    # not started sending yet.
                    break
                index -= 1
            else:
                # Scanned the whole queue without stopping: insert right
                # after the message at the head.
                index = 1

        self.outBox.insert(index,msg)
        if isNew:
            log.info("Queuing %s at index %i",msg,index)
        else:
            log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))

    def more(self):
        """asynchat producer hook: return the next frame to send, or None when idle.

        Pops the message at the head of outBox, takes its next frame, and
        re-queues the message if it has more frames left.
        """
        if not self.outBox:
            # Empty queue (or None once handle_close() has run).
            log.debug("Nothing more to send")
            self.sending = False
            return None
        n = len(self.outBox)
        msg = self.outBox.pop(0)
        frameSize = 4096
        # Use a larger frame when this message is urgent, is the only one
        # queued, or is not followed by an urgent message.
        if msg.urgent or n==1 or not self.outBox[0].urgent:
            frameSize *= 4
        data = msg._sendNextFrame(frameSize)
        if msg._moreComing:
            self._outQueueMessage(msg,isNew=False)
        else:
            log.info("Finished sending %s",msg)
        return data


    ### RECEIVING:

    def collect_incoming_data(self, data):
        """asynchat callback: accumulate header bytes, or pass payload bytes to the current message."""
        if self.expectingHeader:
            if self.inHeader is None:
                self.inHeader = data
            else:
                self.inHeader += data
        elif self.inMessage is not None:
            # Payload of a frame whose message was not recognized is dropped.
            self.inMessage._receivedData(data)

    def found_terminator(self):
        """asynchat callback: a complete frame header or a complete frame payload has arrived.

        Raises ConnectionException if a header has the wrong magic number or
        a frame length smaller than the header itself.
        """
        if self.expectingHeader:
            # Got a header:
            (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
            self.inHeader = None
            if magic != kFrameMagicNumber:
                raise ConnectionException("Incorrect frame magic number %x" %magic)
            if frameLen < kFrameHeaderSize:
                raise ConnectionException("Invalid frame length %u" %frameLen)
            # The length in the header includes the header itself.
            frameLen -= kFrameHeaderSize
            log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
                      (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
            self.inMessage = self._inMessageForFrame(requestNo,flags)

            if frameLen > 0:
                self.expectingHeader = False
                self.set_terminator(frameLen)
            else:
                self._endOfFrame()

        else:
            # Got the frame's payload:
            self._endOfFrame()

    def _inMessageForFrame(self, requestNo,flags):
        """Return the message object a frame belongs to, or None if the frame is unexpected.

        A request frame either continues a pending request or, when its
        number is the next one in sequence, starts a new IncomingRequest.
        A response or error frame must match a pending response.
        """
        message = None
        msgType = flags & kMsgFlag_TypeMask
        if msgType==kMsgType_Request:
            message = self.pendingRequests.get(requestNo)
            if message is None and requestNo == self.inNumRequests+1:
                message = IncomingRequest(self,requestNo,flags)
                self.pendingRequests[requestNo] = message
                self.inNumRequests += 1
        elif msgType==kMsgType_Response or msgType==kMsgType_Error:
            message = self.pendingResponses.get(requestNo)

        if message is not None:
            message._beginFrame(flags)
        else:
            log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
        return message

    def _endOfFrame(self):
        """Reset the parser to expect a frame header; deliver the current message if it is complete."""
        msg = self.inMessage
        self.inMessage = None
        self.expectingHeader = True
        self.inHeader = None
        self.set_terminator(kFrameHeaderSize) # wait for binary header
        if msg is not None:
            log.debug("End of frame of %s",msg)
            if not msg._moreComing:
                self._receivedMessage(msg)

    def _receivedMessage(self, msg):
        """Remove a fully received message from the pending table, decode it and dispatch requests.

        Exceptions raised while decoding or by the onRequest handler are
        logged, not propagated.
        """
        log.info("Received: %s",msg)
        # Remove from pending:
        if msg.isResponse:
            del self.pendingResponses[msg.requestNo]
        else:
            del self.pendingRequests[msg.requestNo]
        # Decode:
        try:
            msg._finished()
            if not msg.isResponse:
                if self.onRequest:
                    self.onRequest(msg)
                else:
                    # No handler installed: say so instead of failing on a
                    # call to None.
                    log.warning("No onRequest handler set; ignoring %s",msg)
        except Exception:
            log.error("Exception handling incoming message: %s", traceback.format_exc())
            #FIX: Send an error reply
|
jens@11
|
291 |
|
jens@12
|
292 |
|
jens@13
|
293 |
### MESSAGE CLASSES:
|
jens@11
|
294 |
|
jens@11
|
295 |
|
jens@11
|
296 |
class Message (object):
    """Abstract superclass of all request/response objects.

    Holds the body and a dictionary of properties, and offers read-only
    dictionary-style access to the properties. Subclasses supply the
    urgent, compressed, noReply and _moreComing attributes read by the
    flags property and by __str__.
    """

    def __init__(self, connection, body=None, properties=None):
        """Remember the owning connection, the body and the properties (a new dict if none given)."""
        self.connection = connection
        self.body = body
        self.properties = properties or {}
        self.requestNo = None

    @property
    def flags(self):
        """The frame flags word for this message: its type plus any flag bits that are set."""
        if self.isResponse:
            bits = kMsgType_Response
        else:
            bits = kMsgType_Request
        if self.urgent:
            bits |= kMsgFlag_Urgent
        if self.compressed:
            bits |= kMsgFlag_Compressed
        if self.noReply:
            bits |= kMsgFlag_NoReply
        if self._moreComing:
            bits |= kMsgFlag_MoreComing
        return bits

    def __str__(self):
        """Short description: class name, request number, flag tags and body size."""
        pieces = ["%s[" % (type(self).__name__)]
        if self.requestNo is not None:
            pieces.append("#%i" % self.requestNo)
        if self.urgent:
            pieces.append(" URG")
        if self.compressed:
            pieces.append(" CMP")
        if self.noReply:
            pieces.append(" NOR")
        if self._moreComing:
            pieces.append(" MOR")
        if self.body:
            pieces.append(" %i bytes" % len(self.body))
        pieces.append("]")
        return "".join(pieces)

    def __repr__(self):
        """Like str(), followed by the properties dictionary when it is not empty."""
        text = str(self)
        if self.properties:
            text += repr(self.properties)
        return text

    @property
    def isResponse(self):
        """Is this message a response?"""
        return False

    @property
    def contentType(self):
        """The value of the 'Content-Type' property, or None if absent."""
        return self.properties.get('Content-Type')

    # Read-only dictionary-style access to the properties:

    def __getitem__(self, key):
        return self.properties.get(key)

    def __contains__(self, key):
        return key in self.properties

    def __len__(self):
        return len(self.properties)

    def __nonzero__(self):
        # Without this, a message with no properties would test as false
        # because of __len__.
        return True

    def __iter__(self):
        return iter(self.properties)
|
jens@11
|
347 |
|
jens@11
|
348 |
|
jens@11
|
349 |
class IncomingMessage (Message):
    """Abstract superclass of incoming messages.

    Collects the payload of each received frame and, once the last frame
    has arrived, decodes the properties and body from the joined data.
    """

    def __init__(self, connection, requestNo, flags):
        """Initialize from the request number and flags of the first frame's header."""
        super(IncomingMessage,self).__init__(connection)
        self.requestNo   = requestNo
        self.urgent      = (flags & kMsgFlag_Urgent) != 0
        self.compressed  = (flags & kMsgFlag_Compressed) != 0
        self.noReply     = (flags & kMsgFlag_NoReply) != 0
        self._moreComing = (flags & kMsgFlag_MoreComing) != 0
        self.frames = []

    def _beginFrame(self, flags):
        """Received a frame header."""
        self._moreComing = (flags & kMsgFlag_MoreComing) != 0

    def _receivedData(self, data):
        """Received data from a frame."""
        self.frames.append(data)

    def _finished(self):
        """The entire message has been received; now decode it.

        The encoded form is a 2-byte big-endian properties length, then that
        many bytes of NUL-terminated strings (alternating keys and values),
        then the body, which is gzip-compressed if the compressed flag is set.

        Raises MessageException if the properties are malformed or the body
        cannot be decompressed.
        """
        encoded = "".join(self.frames)
        self.frames = None

        # Decode the properties:
        if len(encoded) < 2:
            raise MessageException("missing properties length")
        propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
        if propSize > len(encoded):
            raise MessageException("properties too long to fit")
        if propSize > 2:
            if encoded[propSize-1] != '\000':
                raise MessageException("properties are not nul-terminated")
            proplist = encoded[2:propSize-1].split('\000')
            if len(proplist) & 1:
                raise MessageException("odd number of property strings")
        else:
            # No properties at all. Splitting the empty string would yield
            # [''] and be rejected as an odd number of strings.
            proplist = []
        encoded = encoded[propSize:]

        abbreviations = IncomingMessage.__expandDict
        def expand(s):
            # A one-character string may be an abbreviation of a common
            # key or value; anything not in the table is kept as-is.
            if len(s) == 1:
                s = abbreviations.get(s,s)
            return s
        for i in range(0,len(proplist),2):
            self.properties[ expand(proplist[i]) ] = expand(proplist[i+1])

        # Decode the body:
        if self.compressed and len(encoded) > 0:
            try:
                encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
            except zlib.error:
                raise MessageException(sys.exc_info()[1])
        self.body = encoded

    # One-character abbreviations for common property keys and values:
    __expandDict= {'\x01' : "Content-Type",
                   '\x02' : "Profile",
                   '\x03' : "application/octet-stream",
                   '\x04' : "text/plain; charset=UTF-8",
                   '\x05' : "text/xml",
                   '\x06' : "text/yaml",
                   '\x07' : "Channel",
                   '\x08' : "Error-Code",
                   '\x09' : "Error-Domain"}
|
jens@12
|
407 |
|
jens@11
|
408 |
|
jens@11
|
409 |
class OutgoingMessage (Message):
    """Abstract superclass of outgoing requests/responses.

    Properties can be set and deleted with dictionary syntax. The message is
    encoded once by _encode() and then handed out one frame at a time by
    _sendNextFrame().
    """

    def __init__(self, connection, body=None, properties=None):
        """Create an unsent message with all flags off."""
        Message.__init__(self,connection,body,properties)
        self.urgent = self.compressed = self.noReply = False
        self._moreComing = True

    def __setitem__(self, key,val):
        self.properties[key] = val

    def __delitem__(self, key):
        del self.properties[key]

    @property
    def sent(self):
        """True once _encode() has been called on this message."""
        return 'encoded' in self.__dict__

    def _encode(self):
        """Generates the message's encoded form, prior to sending it.

        Stores the result in self.encoded (2-byte big-endian properties
        length, NUL-terminated property strings, then the body, which is
        gzip-compressed if self.compressed is set) and resets
        self.bytesSent to 0.

        Raises MessageException if the encoded properties do not fit in the
        16-bit length field.
        """
        out = StringIO()
        for (key,value) in self.properties.items():
            for s in (key,value):
                out.write(str(s))    #FIX: Abbreviate
                out.write('\000')
        propertiesSize = out.tell()
        if propertiesSize >= 65536:
            # An explicit check, because an assert disappears under -O.
            out.close()
            raise MessageException("properties too long to encode (%u bytes)" %propertiesSize)

        body = self.body
        if body is None:
            # The constructor's default; encode it as an empty body.
            body = ''
        if self.compressed:
            z = zlib.compressobj(6,zlib.DEFLATED,31)   # window size of 31 needed for gzip format
            out.write(z.compress(body))
            body = z.flush()
        out.write(body)

        self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
        out.close()
        log.debug("Encoded %s into %u bytes", self,len(self.encoded))
        self.bytesSent = 0

    def _sendNextFrame(self, maxLen):
        """Return the next frame (header plus up to maxLen payload bytes) of the encoded message.

        Updates self.bytesSent and self._moreComing; the encoded data is
        released once the final frame has been produced.
        """
        pos = self.bytesSent
        payload = self.encoded[pos:pos+maxLen]
        pos += len(payload)
        self._moreComing = (pos < len(self.encoded))
        if not self._moreComing:
            self.encoded = None
        log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)

        # self.flags is read after _moreComing has been updated, so the last
        # frame goes out without the "more coming" bit. The frame length
        # field counts the header as well as the payload.
        header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
                                                 self.requestNo,
                                                 self.flags,
                                                 kFrameHeaderSize+len(payload))
        self.bytesSent = pos
        return header + payload
|
jens@11
|
465 |
|
jens@11
|
466 |
|
jens@12
|
467 |
class Request (object):
    """Mixin for request messages: provides the lazily created response object."""

    @property
    def response(self):
        """The response object for this request.

        None if the request has noReply set; otherwise the object returned
        by _createResponse(), created on first access and then reused.
        """
        if self.noReply:
            return None
        existing = self.__dict__.get('_response')
        if existing is not None:
            return existing
        self._response = created = self._createResponse()
        return created
|
jens@12
|
477 |
|
jens@11
|
478 |
|
jens@11
|
479 |
class Response (Message):
    """Common behavior of response messages, incoming or outgoing."""

    def _setRequest(self, request):
        """Attach this response to its request, copying the request number and urgency."""
        assert not request.noReply
        self.request = request
        self.urgent = request.urgent
        self.requestNo = request.requestNo

    @property
    def isResponse(self):
        """Is this message a response? Always True here."""
        return True
|
jens@11
|
489 |
|
jens@11
|
490 |
|
jens@11
|
491 |
class IncomingRequest (IncomingMessage, Request):
    """A request received from the peer; its response is an OutgoingResponse."""

    def _createResponse(self):
        """Build the response object that will answer this request."""
        reply = OutgoingResponse(self)
        return reply
|
jens@11
|
494 |
|
jens@13
|
495 |
|
jens@11
|
496 |
class OutgoingRequest (OutgoingMessage, Request):
    """A request to be sent to the peer; its response is an IncomingResponse."""

    def _createResponse(self):
        """Build the object that will receive the peer's reply."""
        reply = IncomingResponse(self)
        return reply

    def send(self):
        """Encode the request and queue it on its connection.

        Returns the connection's falsy result if the request could not be
        queued; otherwise the response object (None for a noReply request).
        """
        self._encode()
        queued = self.connection._sendRequest(self)
        if not queued:
            return queued
        return self.response
|
jens@13
|
503 |
|
jens@11
|
504 |
|
jens@11
|
505 |
class IncomingResponse (IncomingMessage, Response):
    """The peer's reply to an OutgoingRequest.

    Assign a callable to onComplete to be called with this response once it
    has been fully received and decoded.
    """

    def __init__(self, request):
        """Create the pending response for request; its request number and flags are filled in later."""
        IncomingMessage.__init__(self, request.connection, None, 0)
        self._setRequest(request)
        self.onComplete = None

    def _finished(self):
        """Decode the message, then call onComplete if set, logging any exception it raises."""
        super(IncomingResponse,self)._finished()
        callback = self.onComplete
        if not callback:
            return
        try:
            callback(self)
        except Exception:
            log.error("Exception dispatching response: %s", traceback.format_exc())
|
jens@13
|
518 |
|
jens@13
|
519 |
|
jens@11
|
520 |
class OutgoingResponse (OutgoingMessage, Response):
    """Our reply to an IncomingRequest."""

    def __init__(self, request):
        """Create an empty response on the request's connection, attached to that request."""
        OutgoingMessage.__init__(self, request.connection)
        self._setRequest(request)

    def send(self):
        """Encode the response and queue it; returns True if queued, False if the connection can't send."""
        self._encode()
        queued = self.connection._sendMessage(self)
        return queued
|
jens@11
|
528 |
|
jens@11
|
529 |
|
jens@13
|
530 |
"""
Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted
provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this list of conditions
and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions
and the following disclaimer in the documentation and/or other materials provided with the
distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
|