Python: Optimized frame sending somewhat (frame buffers are generated on the fly as the socket has room.)
1.1 --- a/Python/BLIP.py Wed Jun 04 17:11:20 2008 -0700
1.2 +++ b/Python/BLIP.py Wed Jun 04 21:09:45 2008 -0700
1.3 @@ -110,6 +110,7 @@
1.4 self.outBox = []
1.5 self.inMessage = None
1.6 self.inNumRequests = self.outNumRequests = 0
1.7 + self.sending = False
1.8 self._endOfFrame()
1.9
1.10 def close(self):
1.11 @@ -149,6 +150,10 @@
1.12 def _sendMessage(self, msg):
1.13 if self.canSend:
1.14 self._outQueueMessage(msg,True)
1.15 + if not self.sending:
1.16 + log.debug("Waking up the output stream")
1.17 + self.sending = True
1.18 + self.push_with_producer(self)
1.19 return True
1.20 else:
1.21 return False
1.22 @@ -175,7 +180,7 @@
1.23 if index<n:
1.24 index += 1
1.25 break
1.26 - elif isNew and otherMsg._bytesWritten==0:
1.27 + elif isNew and otherMsg.bytesSent==0:
1.28 break
1.29 index -= 1
1.30 else:
1.31 @@ -184,24 +189,26 @@
1.32 self.outBox.insert(index,msg)
1.33 if isNew:
1.34 log.info("Queuing %s at index %i",msg,index)
1.35 - if n==0:
1.36 - self._sendNextFrame()
1.37 else:
1.38 log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
1.39
1.40 - def _sendNextFrame(self):
1.41 - while self.outBox: #FIX: Don't send everything at once; only as space becomes available!
1.42 - n = len(self.outBox)
1.43 - if n > 0:
1.44 - msg = self.outBox.pop(0)
1.45 - frameSize = 4096
1.46 - if msg.urgent or n==1 or not self.outBox[0].urgent:
1.47 - frameSize *= 4
1.48 - if msg._sendNextFrame(self,frameSize):
1.49 - self._outQueueMessage(msg,isNew=False)
1.50 - else:
1.51 - log.info("Finished sending %s",msg)
1.52 -
1.53 + def more(self):
1.54 + n = len(self.outBox)
1.55 + if n > 0:
1.56 + msg = self.outBox.pop(0)
1.57 + frameSize = 4096
1.58 + if msg.urgent or n==1 or not self.outBox[0].urgent:
1.59 + frameSize *= 4
1.60 + data = msg._sendNextFrame(frameSize)
1.61 + if msg._moreComing:
1.62 + self._outQueueMessage(msg,isNew=False)
1.63 + else:
1.64 + log.info("Finished sending %s",msg)
1.65 + return data
1.66 + else:
1.67 + log.debug("Nothing more to send")
1.68 + self.sending = False
1.69 + return None
1.70
1.71 ### RECEIVING:
1.72
1.73 @@ -440,7 +447,7 @@
1.74 log.debug("Encoded %s into %u bytes", self,len(self.encoded))
1.75 self.bytesSent = 0
1.76
1.77 - def _sendNextFrame(self, conn,maxLen):
1.78 + def _sendNextFrame(self, maxLen):
1.79 pos = self.bytesSent
1.80 payload = self.encoded[pos:pos+maxLen]
1.81 pos += len(payload)
1.82 @@ -449,14 +456,12 @@
1.83 self.encoded = None
1.84 log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
1.85
1.86 - conn.push( struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
1.87 + header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
1.88 self.requestNo,
1.89 self.flags,
1.90 - kFrameHeaderSize+len(payload)) )
1.91 - conn.push( payload )
1.92 -
1.93 + kFrameHeaderSize+len(payload))
1.94 self.bytesSent = pos
1.95 - return self._moreComing
1.96 + return header + payload
1.97
1.98
1.99 class Request (object):
2.1 --- a/Python/BLIPConnectionTest.py Wed Jun 04 17:11:20 2008 -0700
2.2 +++ b/Python/BLIPConnectionTest.py Wed Jun 04 21:09:45 2008 -0700
2.3 @@ -17,7 +17,9 @@
2.4 import unittest
2.5
2.6
2.7 -kSendInterval = 2.0
2.8 +kSendInterval = 0.2
2.9 +kNBatchedMessages = 10
2.10 +kUrgentEvery = 4
2.11
2.12 def randbool():
2.13 return random.randint(0,1) == 1
2.14 @@ -27,6 +29,7 @@
2.15
2.16 def setUp(self):
2.17 self.connection = Connection( ('localhost',46353) )
2.18 + self.nRepliesPending = 0
2.19
2.20 def sendRequest(self):
2.21 size = random.randint(0,32767)
2.22 @@ -41,28 +44,31 @@
2.23 'Date': datetime.now(),
2.24 'Size': size})
2.25 req.compressed = randbool()
2.26 - req.urgent = randbool()
2.27 + req.urgent = (random.randint(0,kUrgentEvery-1)==0)
2.28 req.response.onComplete = self.gotResponse
2.29 return req.send()
2.30
2.31 def gotResponse(self, response):
2.32 - logging.info("Got response!: %s",response)
2.33 + self.nRepliesPending -= 1
2.34 + logging.info("Got response!: %s (%i pending)",response,self.nRepliesPending)
2.35 request = response.request
2.36 assert response.body == request.body
2.37
2.38 def testClient(self):
2.39 lastReqTime = None
2.40 - nRequests = 0
2.41 - while nRequests < 10:
2.42 + nIterations = 0
2.43 + while nIterations < 10:
2.44 asyncore.loop(timeout=kSendInterval,count=1)
2.45
2.46 now = datetime.now()
2.47 - if self.connection.status!=kOpening and not lastReqTime or (now-lastReqTime).seconds >= kSendInterval:
2.48 + if self.connection.status!=kOpening and (not lastReqTime or (now-lastReqTime).microseconds >= kSendInterval*1.0e6):
2.49 lastReqTime = now
2.50 - if not self.sendRequest():
2.51 - logging.warn("Couldn't send request (connection is probably closed)")
2.52 - break;
2.53 - nRequests += 1
2.54 + for i in xrange(0,kNBatchedMessages):
2.55 + if not self.sendRequest():
2.56 + logging.warn("Couldn't send request (connection is probably closed)")
2.57 + break;
2.58 + self.nRepliesPending += 1
2.59 + nIterations += 1
2.60
2.61 def tearDown(self):
2.62 self.connection.close()