# HG changeset patch # User Jens Alfke # Date 1212638985 25200 # Node ID bb5faa9995d543d1e274fd5b37b31c5e251edb14 # Parent 84c2d38f924cbc44fbc31bc724c56d548a913071 Python: Optimized frame sending somewhat (frame buffers are generated on the fly as the socket has room.) diff -r 84c2d38f924c -r bb5faa9995d5 Python/BLIP.py --- a/Python/BLIP.py Wed Jun 04 17:11:20 2008 -0700 +++ b/Python/BLIP.py Wed Jun 04 21:09:45 2008 -0700 @@ -110,6 +110,7 @@ self.outBox = [] self.inMessage = None self.inNumRequests = self.outNumRequests = 0 + self.sending = False self._endOfFrame() def close(self): @@ -149,6 +150,10 @@ def _sendMessage(self, msg): if self.canSend: self._outQueueMessage(msg,True) + if not self.sending: + log.debug("Waking up the output stream") + self.sending = True + self.push_with_producer(self) return True else: return False @@ -175,7 +180,7 @@ if index 0: - msg = self.outBox.pop(0) - frameSize = 4096 - if msg.urgent or n==1 or not self.outBox[0].urgent: - frameSize *= 4 - if msg._sendNextFrame(self,frameSize): - self._outQueueMessage(msg,isNew=False) - else: - log.info("Finished sending %s",msg) - + def more(self): + n = len(self.outBox) + if n > 0: + msg = self.outBox.pop(0) + frameSize = 4096 + if msg.urgent or n==1 or not self.outBox[0].urgent: + frameSize *= 4 + data = msg._sendNextFrame(frameSize) + if msg._moreComing: + self._outQueueMessage(msg,isNew=False) + else: + log.info("Finished sending %s",msg) + return data + else: + log.debug("Nothing more to send") + self.sending = False + return None ### RECEIVING: @@ -440,7 +447,7 @@ log.debug("Encoded %s into %u bytes", self,len(self.encoded)) self.bytesSent = 0 - def _sendNextFrame(self, conn,maxLen): + def _sendNextFrame(self, maxLen): pos = self.bytesSent payload = self.encoded[pos:pos+maxLen] pos += len(payload) @@ -449,14 +456,12 @@ self.encoded = None log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos) - conn.push( struct.pack(kFrameHeaderFormat, kFrameMagicNumber, + header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber, self.requestNo, self.flags, - kFrameHeaderSize+len(payload)) ) - conn.push( payload ) - + kFrameHeaderSize+len(payload)) self.bytesSent = pos - return self._moreComing + return header + payload class Request (object): diff -r 84c2d38f924c -r bb5faa9995d5 Python/BLIPConnectionTest.py --- a/Python/BLIPConnectionTest.py Wed Jun 04 17:11:20 2008 -0700 +++ b/Python/BLIPConnectionTest.py Wed Jun 04 21:09:45 2008 -0700 @@ -17,7 +17,9 @@ import unittest -kSendInterval = 2.0 +kSendInterval = 0.2 +kNBatchedMessages = 10 +kUrgentEvery = 4 def randbool(): return random.randint(0,1) == 1 @@ -27,6 +29,7 @@ def setUp(self): self.connection = Connection( ('localhost',46353) ) + self.nRepliesPending = 0 def sendRequest(self): size = random.randint(0,32767) @@ -41,28 +44,31 @@ 'Date': datetime.now(), 'Size': size}) req.compressed = randbool() - req.urgent = randbool() + req.urgent = (random.randint(0,kUrgentEvery-1)==0) req.response.onComplete = self.gotResponse return req.send() def gotResponse(self, response): - logging.info("Got response!: %s",response) + self.nRepliesPending -= 1 + logging.info("Got response!: %s (%i pending)",response,self.nRepliesPending) request = response.request assert response.body == request.body def testClient(self): lastReqTime = None - nRequests = 0 - while nRequests < 10: + nIterations = 0 + while nIterations < 10: asyncore.loop(timeout=kSendInterval,count=1) now = datetime.now() - if self.connection.status!=kOpening and not lastReqTime or (now-lastReqTime).seconds >= kSendInterval: + if self.connection.status!=kOpening and (not lastReqTime or (now-lastReqTime).microseconds >= kSendInterval*1.0e6): lastReqTime = now - if not self.sendRequest(): - logging.warn("Couldn't send request (connection is probably closed)") - break; - nRequests += 1 + for i in xrange(0,kNBatchedMessages): + if not self.sendRequest(): + logging.warn("Couldn't send request (connection is probably closed)") + break; + self.nRepliesPending += 1 + nIterations += 1 def tearDown(self): self.connection.close()