Python: Optimized frame sending somewhat (frame buffers are generated on the fly as the socket has room.)
authorJens Alfke <jens@mooseyard.com>
Wed Jun 04 21:09:45 2008 -0700 (2008-06-04)
changeset 14bb5faa9995d5
parent 13 84c2d38f924c
child 15 f723174fbc24
Python: Optimized frame sending somewhat (frame buffers are generated on the fly as the socket has room.)
Python/BLIP.py
Python/BLIPConnectionTest.py
     1.1 --- a/Python/BLIP.py	Wed Jun 04 17:11:20 2008 -0700
     1.2 +++ b/Python/BLIP.py	Wed Jun 04 21:09:45 2008 -0700
     1.3 @@ -110,6 +110,7 @@
     1.4          self.outBox = []
     1.5          self.inMessage = None
     1.6          self.inNumRequests = self.outNumRequests = 0
     1.7 +        self.sending = False
     1.8          self._endOfFrame()
     1.9      
    1.10      def close(self):
    1.11 @@ -149,6 +150,10 @@
    1.12      def _sendMessage(self, msg):
    1.13          if self.canSend:
    1.14              self._outQueueMessage(msg,True)
    1.15 +            if not self.sending:
    1.16 +                log.debug("Waking up the output stream")
    1.17 +                self.sending = True
    1.18 +                self.push_with_producer(self)
    1.19              return True
    1.20          else:
    1.21              return False
    1.22 @@ -175,7 +180,7 @@
    1.23                      if index<n:
    1.24                          index += 1
    1.25                      break
    1.26 -                elif isNew and otherMsg._bytesWritten==0:
    1.27 +                elif isNew and otherMsg.bytesSent==0:
    1.28                      break
    1.29                  index -= 1
    1.30              else:
    1.31 @@ -184,24 +189,26 @@
    1.32          self.outBox.insert(index,msg)
    1.33          if isNew:
    1.34              log.info("Queuing %s at index %i",msg,index)
    1.35 -            if n==0:
    1.36 -                self._sendNextFrame()
    1.37          else:
    1.38              log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
    1.39      
    1.40 -    def _sendNextFrame(self):
    1.41 -        while self.outBox:              #FIX: Don't send everything at once; only as space becomes available!
    1.42 -            n = len(self.outBox)
    1.43 -            if n > 0:
    1.44 -                msg = self.outBox.pop(0)
    1.45 -                frameSize = 4096
    1.46 -                if msg.urgent or n==1 or not self.outBox[0].urgent:
    1.47 -                    frameSize *= 4
    1.48 -                if msg._sendNextFrame(self,frameSize):
    1.49 -                    self._outQueueMessage(msg,isNew=False)
    1.50 -                else:
    1.51 -                    log.info("Finished sending %s",msg)
    1.52 -    
    1.53 +    def more(self):
    1.54 +        n = len(self.outBox)
    1.55 +        if n > 0:
    1.56 +            msg = self.outBox.pop(0)
    1.57 +            frameSize = 4096
    1.58 +            if msg.urgent or n==1 or not self.outBox[0].urgent:
    1.59 +                frameSize *= 4
    1.60 +            data = msg._sendNextFrame(frameSize)
    1.61 +            if msg._moreComing:
    1.62 +                self._outQueueMessage(msg,isNew=False)
    1.63 +            else:
    1.64 +                log.info("Finished sending %s",msg)
    1.65 +            return data
    1.66 +        else:
    1.67 +            log.debug("Nothing more to send")
    1.68 +            self.sending = False
    1.69 +            return None
    1.70      
    1.71      ### RECEIVING:
    1.72      
    1.73 @@ -440,7 +447,7 @@
    1.74          log.debug("Encoded %s into %u bytes", self,len(self.encoded))
    1.75          self.bytesSent = 0
    1.76      
    1.77 -    def _sendNextFrame(self, conn,maxLen):
    1.78 +    def _sendNextFrame(self, maxLen):
    1.79          pos = self.bytesSent
    1.80          payload = self.encoded[pos:pos+maxLen]
    1.81          pos += len(payload)
    1.82 @@ -449,14 +456,12 @@
    1.83              self.encoded = None
    1.84          log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
    1.85          
    1.86 -        conn.push( struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
    1.87 +        header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
    1.88                                                     self.requestNo,
    1.89                                                     self.flags,
    1.90 -                                                   kFrameHeaderSize+len(payload)) )
    1.91 -        conn.push( payload )
    1.92 -        
    1.93 +                                                   kFrameHeaderSize+len(payload))
    1.94          self.bytesSent = pos
    1.95 -        return self._moreComing
    1.96 +        return header + payload
    1.97  
    1.98  
    1.99  class Request (object):
     2.1 --- a/Python/BLIPConnectionTest.py	Wed Jun 04 17:11:20 2008 -0700
     2.2 +++ b/Python/BLIPConnectionTest.py	Wed Jun 04 21:09:45 2008 -0700
     2.3 @@ -17,7 +17,9 @@
     2.4  import unittest
     2.5  
     2.6  
     2.7 -kSendInterval = 2.0
     2.8 +kSendInterval = 0.2
     2.9 +kNBatchedMessages = 10
    2.10 +kUrgentEvery = 4
    2.11  
    2.12  def randbool():
    2.13      return random.randint(0,1) == 1
    2.14 @@ -27,6 +29,7 @@
    2.15  
    2.16      def setUp(self):
    2.17          self.connection = Connection( ('localhost',46353) )
    2.18 +        self.nRepliesPending = 0
    2.19     
    2.20      def sendRequest(self):
    2.21          size = random.randint(0,32767)
    2.22 @@ -41,28 +44,31 @@
    2.23                                                       'Date': datetime.now(),
    2.24                                                       'Size': size})
    2.25          req.compressed = randbool()
    2.26 -        req.urgent     = randbool()
    2.27 +        req.urgent     = (random.randint(0,kUrgentEvery-1)==0)
    2.28          req.response.onComplete = self.gotResponse
    2.29          return req.send()
    2.30      
    2.31      def gotResponse(self, response):
    2.32 -        logging.info("Got response!: %s",response)
    2.33 +        self.nRepliesPending -= 1
    2.34 +        logging.info("Got response!: %s (%i pending)",response,self.nRepliesPending)
    2.35          request = response.request
    2.36          assert response.body == request.body
    2.37  
    2.38      def testClient(self):
    2.39          lastReqTime = None
    2.40 -        nRequests = 0
    2.41 -        while nRequests < 10:
    2.42 +        nIterations = 0
    2.43 +        while nIterations < 10:
    2.44              asyncore.loop(timeout=kSendInterval,count=1)
    2.45              
    2.46              now = datetime.now()
    2.47 -            if self.connection.status!=kOpening and not lastReqTime or (now-lastReqTime).seconds >= kSendInterval:
    2.48 +            if self.connection.status!=kOpening and (not lastReqTime or (now-lastReqTime).microseconds >= kSendInterval*1.0e6):
    2.49                  lastReqTime = now
    2.50 -                if not self.sendRequest():
    2.51 -                    logging.warn("Couldn't send request (connection is probably closed)")
    2.52 -                    break;
    2.53 -                nRequests += 1
    2.54 +                for i in xrange(0,kNBatchedMessages):
    2.55 +                    if not self.sendRequest():
    2.56 +                        logging.warn("Couldn't send request (connection is probably closed)")
    2.57 +                        break;
    2.58 +                    self.nRepliesPending += 1
    2.59 +                nIterations += 1
    2.60      
    2.61      def tearDown(self):
    2.62          self.connection.close()