#!/usr/bin/env python
# encoding: utf-8
"""
BLIPConnectionTest.py

Created by Jens Alfke on 2008-06-04.
This source file is test/example code, and is in the public domain.
"""

from BLIP import Connection, OutgoingRequest, kOpening

import asyncore
from cStringIO import StringIO
from datetime import datetime
import logging
import random
import unittest


kSendInterval = 0.2         # seconds between batches; also the asyncore poll timeout
kNBatchedMessages = 10      # requests sent per batch
kUrgentEvery = 4            # on average, one request in this many is marked urgent


def randbool():
    """Return True or False with equal probability."""
    return random.randint(0,1) == 1


def _elapsedSeconds(delta):
    """Return the full length of the timedelta `delta` in (fractional) seconds.

    `delta.microseconds` alone is only the sub-second field (0..999999), so
    days and seconds have to be folded in as well. Computed by hand rather
    than with timedelta.total_seconds() so it also runs on Python < 2.7.
    """
    return delta.days * 86400 + delta.seconds + delta.microseconds / 1.0e6


class BLIPConnectionTest(unittest.TestCase):
    """Client-side test that sends batches of random requests over a BLIP
    Connection to localhost and checks each response body against the body
    of the request it answers."""

    def setUp(self):
        """Open the connection to the test server and reset the reply counter."""
        self.connection = Connection( ('localhost',46353) )
        self.nRepliesPending = 0

    def sendRequest(self):
        """Build and send one request with a random-length body.

        The body is `size` bytes cycling through the values 0..255. The
        `compressed` and `urgent` flags are chosen at random, and
        gotResponse is registered as the response's completion callback.

        Returns whatever `req.send()` returns; testClient treats a false
        value as "could not send".
        """
        size = random.randint(0,32767)
        io = StringIO()
        for i in xrange(0,size):
            io.write( chr(i % 256) )
        body = io.getvalue()
        io.close()      # was `io.close` (missing parens), which never closed the buffer

        req = OutgoingRequest(self.connection, body,{'Content-Type': 'application/octet-stream',
                                                     'User-Agent':  'PyBLIP',
                                                     'Date': datetime.now(),
                                                     'Size': size})
        req.compressed = randbool()
        req.urgent     = (random.randint(0,kUrgentEvery-1)==0)
        req.response.onComplete = self.gotResponse
        return req.send()

    def gotResponse(self, response):
        """Completion callback: count the reply and verify its body.

        NOTE(review): the equality check presumes the server echoes the
        request body back unchanged -- confirm against the server side.
        """
        self.nRepliesPending -= 1
        logging.info("Got response!: %s (%i pending)",response,self.nRepliesPending)
        request = response.request
        # Use the TestCase assertion rather than a bare `assert`, which is
        # stripped when Python runs with -O.
        self.assertTrue(response.body == request.body,
                        "response body does not match request body")

    def testClient(self):
        """Pump the asyncore loop and send ten batches of requests.

        Each pass runs one asyncore iteration, then sends a batch of
        kNBatchedMessages requests if the connection is no longer opening
        and at least kSendInterval seconds have passed since the last batch.
        """
        lastReqTime = None
        nIterations = 0
        while nIterations < 10:
            asyncore.loop(timeout=kSendInterval,count=1)

            now = datetime.now()
            # Compare the whole elapsed time; the `.microseconds` field on
            # its own wraps every second and would wrongly skip batches.
            intervalElapsed = (not lastReqTime
                               or _elapsedSeconds(now - lastReqTime) >= kSendInterval)
            if self.connection.status != kOpening and intervalElapsed:
                lastReqTime = now
                for i in xrange(0,kNBatchedMessages):
                    if not self.sendRequest():
                        logging.warning("Couldn't send request (connection is probably closed)")
                        break
                    self.nRepliesPending += 1
                # NOTE(review): placement reconstructed from a whitespace-mangled
                # source; counts batches sent rather than loop passes -- confirm.
                nIterations += 1

    def tearDown(self):
        """Close the connection and let asyncore finish the shutdown."""
        self.connection.close()
        asyncore.loop() # got to give it time to negotiate close; this call should exit eventually


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main()