* Initial checkin of BLIP.py. (Receiving seems to work.)
authorJens Alfke <jens@mooseyard.com>
Tue Jun 03 16:56:33 2008 -0700 (2008-06-03)
changeset 1129e8b03c05d4
parent 10 a2aeb9b04ecc
child 12 710113961756
* Initial checkin of BLIP.py. (Receiving seems to work.)
* FIXED: Abbreviation list in BLIPProperties was messed up.
* Renamed some instance variables to use 'request' instead of 'query'.
* Test client doesn't throw an assertion-failure now when the number of unresponded requests exceeds 100.
BLIP/BLIPProperties.m
BLIP/BLIPReader.h
BLIP/BLIPReader.m
BLIP/BLIPTest.m
BLIP/BLIPWriter.h
BLIP/BLIPWriter.m
MYNetwork.xcodeproj/project.pbxproj
Python/BLIP.py
     1.1 --- a/BLIP/BLIPProperties.m	Sun Jun 01 14:04:22 2008 -0700
     1.2 +++ b/BLIP/BLIPProperties.m	Tue Jun 03 16:56:33 2008 -0700
     1.3 @@ -16,14 +16,13 @@
     1.4  static const char* kAbbreviations[] = {
     1.5      "Content-Type",
     1.6      "Profile",
     1.7 -    "Channel"
     1.8 -    "Error-Code"
     1.9 -    "Error-Domain",
    1.10      "application/octet-stream",
    1.11      "text/plain; charset=UTF-8",
    1.12      "text/xml",
    1.13      "text/yaml",
    1.14 -    "application/x-cloudy-signed+yaml",
    1.15 +    "Channel",
    1.16 +    "Error-Code",
    1.17 +    "Error-Domain",
    1.18  };
    1.19  #define kNAbbreviations ((sizeof(kAbbreviations)/sizeof(const char*)))  // cannot exceed 31!
    1.20  
     2.1 --- a/BLIP/BLIPReader.h	Sun Jun 01 14:04:22 2008 -0700
     2.2 +++ b/BLIP/BLIPReader.h	Tue Jun 03 16:56:33 2008 -0700
     2.3 @@ -18,8 +18,8 @@
     2.4      UInt32 _curBytesRead;
     2.5      NSMutableData *_curBody;
     2.6  
     2.7 -    UInt32 _numQueriesReceived;
     2.8 -    NSMutableDictionary *_pendingQueries, *_pendingReplies;
     2.9 +    UInt32 _numRequestsReceived;
    2.10 +    NSMutableDictionary *_pendingRequests, *_pendingResponses;
    2.11  }
    2.12  
    2.13  - (void) _addPendingResponse: (BLIPResponse*)response;
     3.1 --- a/BLIP/BLIPReader.m	Sun Jun 01 14:04:22 2008 -0700
     3.2 +++ b/BLIP/BLIPReader.m	Tue Jun 03 16:56:33 2008 -0700
     3.3 @@ -31,27 +31,27 @@
     3.4  {
     3.5      self = [super initWithConnection: conn stream: stream];
     3.6      if (self != nil) {
     3.7 -        _pendingQueries = [[NSMutableDictionary alloc] init];
     3.8 -        _pendingReplies = [[NSMutableDictionary alloc] init];
     3.9 +        _pendingRequests = [[NSMutableDictionary alloc] init];
    3.10 +        _pendingResponses = [[NSMutableDictionary alloc] init];
    3.11      }
    3.12      return self;
    3.13  }
    3.14  
    3.15  - (void) dealloc
    3.16  {
    3.17 -    [_pendingQueries release];
    3.18 -    [_pendingReplies release];
    3.19 +    [_pendingRequests release];
    3.20 +    [_pendingResponses release];
    3.21      [_curBody release];
    3.22      [super dealloc];
    3.23  }
    3.24  
    3.25  - (void) disconnect
    3.26  {
    3.27 -    for( BLIPResponse *response in [_pendingReplies allValues] ) {
    3.28 +    for( BLIPResponse *response in [_pendingResponses allValues] ) {
    3.29          [response _connectionClosed];
    3.30          [_conn tellDelegate: @selector(connection:receivedResponse:) withObject: response];
    3.31      }
    3.32 -    setObj(&_pendingReplies,nil);
    3.33 +    setObj(&_pendingResponses,nil);
    3.34      [super disconnect];
    3.35  }
    3.36  
    3.37 @@ -154,7 +154,7 @@
    3.38  
    3.39  - (void) _addPendingResponse: (BLIPResponse*)response
    3.40  {
    3.41 -    [_pendingReplies setObject: response forKey: $object(response.number)];
    3.42 +    [_pendingResponses setObject: response forKey: $object(response.number)];
    3.43  }
    3.44  
    3.45  
    3.46 @@ -169,14 +169,14 @@
    3.47      switch(type) {
    3.48          case kBLIP_MSG: {
    3.49              // Incoming request:
    3.50 -            BLIPRequest *request = [_pendingQueries objectForKey: key];
    3.51 +            BLIPRequest *request = [_pendingRequests objectForKey: key];
    3.52              if( request ) {
    3.53                  // Continuation frame of a request:
    3.54                  if( complete ) {
    3.55                      [[request retain] autorelease];
    3.56 -                    [_pendingQueries removeObjectForKey: key];
    3.57 +                    [_pendingRequests removeObjectForKey: key];
    3.58                  }
    3.59 -            } else if( header->number == _numQueriesReceived+1 ) {
    3.60 +            } else if( header->number == _numRequestsReceived+1 ) {
    3.61                  // Next new request:
    3.62                  request = [[[BLIPRequest alloc] _initWithConnection: _blipConn
    3.63                                                           isMine: NO
    3.64 @@ -185,12 +185,12 @@
    3.65                                                             body: nil]
    3.66                                  autorelease];
    3.67                  if( ! complete )
    3.68 -                    [_pendingQueries setObject: request forKey: key];
    3.69 -                _numQueriesReceived++;
    3.70 +                    [_pendingRequests setObject: request forKey: key];
    3.71 +                _numRequestsReceived++;
    3.72              } else
    3.73                  return [self _gotError: BLIPMakeError(kBLIPError_BadFrame, 
    3.74                                                 @"Received bad request frame #%u (next is #%u)",
    3.75 -                                               header->number,_numQueriesReceived+1)];
    3.76 +                                               header->number,_numRequestsReceived+1)];
    3.77              
    3.78              if( ! [request _receivedFrameWithHeader: header body: body] )
    3.79                  return [self _gotError: BLIPMakeError(kBLIPError_BadFrame, 
    3.80 @@ -203,11 +203,11 @@
    3.81              
    3.82          case kBLIP_RPY:
    3.83          case kBLIP_ERR: {
    3.84 -            BLIPResponse *response = [_pendingReplies objectForKey: key];
    3.85 +            BLIPResponse *response = [_pendingResponses objectForKey: key];
    3.86              if( response ) {
    3.87                  if( complete ) {
    3.88                      [[response retain] autorelease];
    3.89 -                    [_pendingReplies removeObjectForKey: key];
    3.90 +                    [_pendingResponses removeObjectForKey: key];
    3.91                  }
    3.92                  
    3.93                  if( ! [response _receivedFrameWithHeader: header body: body] ) {
    3.94 @@ -217,7 +217,7 @@
    3.95                      [_blipConn _dispatchResponse: response];
    3.96                  
    3.97              } else {
    3.98 -                if( header->number <= ((BLIPWriter*)self.writer).numQueriesSent )
    3.99 +                if( header->number <= ((BLIPWriter*)self.writer).numRequestsSent )
   3.100                      LogTo(BLIP,@"??? %@ got unexpected response frame to my msg #%u",
   3.101                            self,header->number); //benign
   3.102                  else
     4.1 --- a/BLIP/BLIPTest.m	Sun Jun 01 14:04:22 2008 -0700
     4.2 +++ b/BLIP/BLIPTest.m	Tue Jun 03 16:56:33 2008 -0700
     4.3 @@ -25,6 +25,7 @@
     4.4  #endif
     4.5  
     4.6  
     4.7 +#define kListenerHost               @"localhost"
     4.8  #define kListenerPort               46353
     4.9  #define kSendInterval               0.5
    4.10  #define kNBatchedMessages           20
    4.11 @@ -66,7 +67,7 @@
    4.12      if (self != nil) {
    4.13          Log(@"** INIT %@",self);
    4.14          _pending = [[NSMutableDictionary alloc] init];
    4.15 -        IPAddress *addr = [[IPAddress alloc] initWithHostname: @"localhost" port: kListenerPort];
    4.16 +        IPAddress *addr = [[IPAddress alloc] initWithHostname: kListenerHost port: kListenerPort];
    4.17          _conn = [[BLIPConnection alloc] initToAddress: addr];
    4.18          if( ! _conn ) {
    4.19              [self release];
    4.20 @@ -99,30 +100,34 @@
    4.21  
    4.22  - (void) sendAMessage
    4.23  {
    4.24 -    Log(@"** Sending another %i messages...", kNBatchedMessages);
    4.25 -    for( int i=0; i<kNBatchedMessages; i++ ) {
    4.26 -        size_t size = random() % 32768;
    4.27 -        NSMutableData *body = [NSMutableData dataWithLength: size];
    4.28 -        UInt8 *bytes = body.mutableBytes;
    4.29 -        for( size_t i=0; i<size; i++ )
    4.30 -            bytes[i] = i % 256;
    4.31 -        
    4.32 -        BLIPRequest *q = [_conn requestWithBody: body
    4.33 -                                     properties: $dict({@"Content-Type", @"application/octet-stream"},
    4.34 -                                                       {@"User-Agent", @"BLIPConnectionTester"},
    4.35 -                                                       {@"Date", [[NSDate date] description]},
    4.36 -                                                       {@"Size",$sprintf(@"%u",size)})];
    4.37 -        Assert(q);
    4.38 -        if( kUseCompression && (random()%2==1) )
    4.39 -            q.compressed = YES;
    4.40 -        if( random()%16 > 12 )
    4.41 -            q.urgent = YES;
    4.42 -        BLIPResponse *response = [q send];
    4.43 -        Assert(response);
    4.44 -        Assert(q.number>0);
    4.45 -        Assert(response.number==q.number);
    4.46 -        [_pending setObject: $object(size) forKey: $object(q.number)];
    4.47 -        response.onComplete = $target(self,responseArrived:);
    4.48 +    if(_pending.count<100) {
    4.49 +        Log(@"** Sending another %i messages...", kNBatchedMessages);
    4.50 +        for( int i=0; i<kNBatchedMessages; i++ ) {
    4.51 +            size_t size = random() % 32768;
    4.52 +            NSMutableData *body = [NSMutableData dataWithLength: size];
    4.53 +            UInt8 *bytes = body.mutableBytes;
    4.54 +            for( size_t i=0; i<size; i++ )
    4.55 +                bytes[i] = i % 256;
    4.56 +            
    4.57 +            BLIPRequest *q = [_conn requestWithBody: body
    4.58 +                                         properties: $dict({@"Content-Type", @"application/octet-stream"},
    4.59 +                                                           {@"User-Agent", @"BLIPConnectionTester"},
    4.60 +                                                           {@"Date", [[NSDate date] description]},
    4.61 +                                                           {@"Size",$sprintf(@"%u",size)})];
    4.62 +            Assert(q);
    4.63 +            if( kUseCompression && (random()%2==1) )
    4.64 +                q.compressed = YES;
    4.65 +            if( random()%16 > 12 )
    4.66 +                q.urgent = YES;
    4.67 +            BLIPResponse *response = [q send];
    4.68 +            Assert(response);
    4.69 +            Assert(q.number>0);
    4.70 +            Assert(response.number==q.number);
    4.71 +            [_pending setObject: $object(size) forKey: $object(q.number)];
    4.72 +            response.onComplete = $target(self,responseArrived:);
    4.73 +        }
    4.74 +    } else {
    4.75 +        Warn(@"There are %u pending messages; waiting for the listener to catch up...",_pending.count);
    4.76      }
    4.77      [self performSelector: @selector(sendAMessage) withObject: nil afterDelay: kSendInterval];
    4.78  }
    4.79 @@ -184,7 +189,6 @@
    4.80      Assert(sizeObj);
    4.81      [_pending removeObjectForKey: $object(response.number)];
    4.82      Log(@"Now %u replies pending", _pending.count);
    4.83 -    Assert(_pending.count<100);
    4.84  }
    4.85  
    4.86  
     5.1 --- a/BLIP/BLIPWriter.h	Sun Jun 01 14:04:22 2008 -0700
     5.2 +++ b/BLIP/BLIPWriter.h	Tue Jun 03 16:56:33 2008 -0700
     5.3 @@ -13,12 +13,12 @@
     5.4  @interface BLIPWriter : TCPWriter
     5.5  {
     5.6      NSMutableArray *_outBox;
     5.7 -    UInt32 _numQueriesSent;
     5.8 +    UInt32 _numRequestsSent;
     5.9  }
    5.10  
    5.11  - (BOOL) sendRequest: (BLIPRequest*)request response: (BLIPResponse*)response;
    5.12  - (BOOL) sendMessage: (BLIPMessage*)message;
    5.13  
    5.14 -@property (readonly) UInt32 numQueriesSent;
    5.15 +@property (readonly) UInt32 numRequestsSent;
    5.16  
    5.17  @end
     6.1 --- a/BLIP/BLIPWriter.m	Sun Jun 01 14:04:22 2008 -0700
     6.2 +++ b/BLIP/BLIPWriter.m	Tue Jun 03 16:56:33 2008 -0700
     6.3 @@ -33,7 +33,7 @@
     6.4      [super disconnect];
     6.5  }
     6.6  
     6.7 -@synthesize numQueriesSent=_numQueriesSent;
     6.8 +@synthesize numRequestsSent=_numRequestsSent;
     6.9  
    6.10  
    6.11  - (BOOL) isBusy
    6.12 @@ -92,9 +92,9 @@
    6.13  - (BOOL) sendRequest: (BLIPRequest*)q response: (BLIPResponse*)response
    6.14  {
    6.15      if( !_shouldClose ) {
    6.16 -        [q _assignedNumber: ++_numQueriesSent];
    6.17 +        [q _assignedNumber: ++_numRequestsSent];
    6.18          if( response ) {
    6.19 -            [response _assignedNumber: _numQueriesSent];
    6.20 +            [response _assignedNumber: _numRequestsSent];
    6.21              [(BLIPReader*)self.reader _addPendingResponse: response];
    6.22          }
    6.23      }
     7.1 --- a/MYNetwork.xcodeproj/project.pbxproj	Sun Jun 01 14:04:22 2008 -0700
     7.2 +++ b/MYNetwork.xcodeproj/project.pbxproj	Tue Jun 03 16:56:33 2008 -0700
     7.3 @@ -521,7 +521,7 @@
     7.4  				GCC_C_LANGUAGE_STANDARD = c99;
     7.5  				GCC_MODEL_TUNING = G5;
     7.6  				GCC_PRECOMPILE_PREFIX_HEADER = YES;
     7.7 -				GCC_PREFIX_HEADER = MYNetwork_Prefix.pch;
     7.8 +				GCC_PREFIX_HEADER = ../MYUtilities/MYUtilities_Prefix.pch;
     7.9  				INSTALL_PATH = /usr/local/bin;
    7.10  				PRODUCT_NAME = MYNetwork;
    7.11  			};
     8.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     8.2 +++ b/Python/BLIP.py	Tue Jun 03 16:56:33 2008 -0700
     8.3 @@ -0,0 +1,386 @@
     8.4 +#!/usr/bin/env python
     8.5 +# encoding: utf-8
     8.6 +"""
     8.7 +BLIP.py
     8.8 +
     8.9 +Created by Jens Alfke on 2008-06-03.
    8.10 +Copyright (c) 2008 Jens Alfke. All rights reserved.
    8.11 +"""
    8.12 +
    8.13 +import asynchat
    8.14 +import asyncore
    8.15 +from cStringIO import StringIO
    8.16 +import logging
    8.17 +import socket
    8.18 +import struct
    8.19 +import sys
    8.20 +import traceback
    8.21 +import unittest
    8.22 +import zlib
    8.23 +
    8.24 +
    8.25 +kFrameMagicNumber   = 0x9B34F205
    8.26 +kFrameHeaderFormat  = '!LLHH'
    8.27 +kFrameHeaderSize    = 12
    8.28 +
    8.29 +kMsgFlag_TypeMask   = 0x000F
    8.30 +kMsgFlag_Compressed = 0x0010
    8.31 +kMsgFlag_Urgent     = 0x0020
    8.32 +kMsgFlag_NoReply    = 0x0040
    8.33 +kMsgFlag_MoreComing = 0x0080
    8.34 +
    8.35 +kMsgType_Request    = 0
    8.36 +kMsgType_Response   = 1
    8.37 +kMsgType_Error      = 2
    8.38 +
    8.39 +
    8.40 +log = logging.getLogger('BLIP')
    8.41 +log.propagate = True
    8.42 +
    8.43 +class MessageException(Exception):
    8.44 +    pass
    8.45 +
    8.46 +class ConnectionException(Exception):
    8.47 +    pass
    8.48 +
    8.49 +
    8.50 +class Listener (asyncore.dispatcher):
    8.51 +    def __init__(self, port):
    8.52 +        asyncore.dispatcher.__init__(self)
    8.53 +        self.onConnected = None
    8.54 +        self.onRequest = None
    8.55 +        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    8.56 +        self.bind( ('',port) )
    8.57 +        self.listen(5)
    8.58 +        log.info("Listening on port %u", port)
    8.59 +    
    8.60 +    def handle_accept( self ):
    8.61 +        client,address = self.accept()
    8.62 +        conn = Connection(address,client)
    8.63 +        conn.onRequest = self.onRequest
    8.64 +        if self.onConnected:
    8.65 +            self.onConnected(conn)
    8.66 +
    8.67 +
    8.68 +class Connection (asynchat.async_chat):
    8.69 +    def __init__( self, address, conn=None ):
    8.70 +        asynchat.async_chat.__init__(self,conn)
    8.71 +        self.address = address
    8.72 +        if conn:
    8.73 +            log.info("Accepted connection from %s",address)
    8.74 +        else:
    8.75 +            log.info("Opening connection to %s",address)
    8.76 +            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    8.77 +            self.connect(address)
    8.78 +        self.onRequest = None
    8.79 +        self.pendingRequests = {}
    8.80 +        self.pendingResponses = {}
    8.81 +        self.outBox = []
    8.82 +        self.inMessage = None
    8.83 +        self.inNumRequests = 0
    8.84 +        self._endOfFrame()
    8.85 +    
    8.86 +    #def handle_error(self,x):
    8.87 +    #    log.error("Uncaught exception: %s",x)
    8.88 +    #    self.close()
    8.89 +    
    8.90 +    def _fatal(self, error):
    8.91 +        log.error("Fatal BLIP connection error: %s",error)
    8.92 +        self.close()
    8.93 +    
    8.94 +    
    8.95 +    ### SENDING:
    8.96 +    
    8.97 +    def _outQueueMessage(self, msg,isNew=True):
    8.98 +        n = self.outBox.length
    8.99 +        index = n
   8.100 +        if msg.urgent and n>1:
   8.101 +            while index > 0:
   8.102 +                otherMsg = self.outBox[index]
   8.103 +                if otherMsg.urgent:
   8.104 +                    if index<n:
   8.105 +                        index += 1
   8.106 +                    break
   8.107 +                elif isNew and otherMsg._bytesWritten==0:
   8.108 +                    break
   8.109 +                index -= 1
   8.110 +            else:
   8.111 +                index = 1
   8.112 +                    
   8.113 +        self.outBox.insert(index,msg)
   8.114 +        if isNew:
   8.115 +            log.info("Queuing outgoing message at index %i",index)
   8.116 +    
   8.117 +    def _sendNextFrame(self):
   8.118 +        n = len(self.outBox)
   8.119 +        if n > 0:
   8.120 +            msg = self.outBox.pop(0)
   8.121 +            frameSize = 4096
   8.122 +            if msg.urgent or n==1 or not self.outBox[0].urgent:
   8.123 +                frameSize *= 4
   8.124 +            if msg._sendNextFrame(self):
   8.125 +                self._outQueueMessage(msg,isNew=False)
   8.126 +    
   8.127 +    
   8.128 +    ### RECEIVING:
   8.129 +    
   8.130 +    def collect_incoming_data(self, data):
   8.131 +        if self.expectingHeader:
   8.132 +            if self.inHeader==None:
   8.133 +                self.inHeader = data
   8.134 +            else:
   8.135 +                self.inHeader += data
   8.136 +        else:
   8.137 +            self.inMessage._receivedData(data)
   8.138 +        
   8.139 +    def found_terminator(self):
   8.140 +        if self.expectingHeader:
   8.141 +            # Got a header:
   8.142 +            (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
   8.143 +            self.inHeader = None
   8.144 +            if magic!=kFrameMagicNumber: self._fatal("Incorrect frame magic number %x" %magic)
   8.145 +            if frameLen < kFrameHeaderSize: self._fatal("Invalid frame length %u" %frameLen)
   8.146 +            frameLen -= kFrameHeaderSize
   8.147 +            log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
   8.148 +                        (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
   8.149 +            self.inMessage = self._inMessageForFrame(requestNo,flags)
   8.150 +            
   8.151 +            if frameLen > 0:
   8.152 +                self.expectingHeader = False
   8.153 +                self.set_terminator(frameLen)
   8.154 +            else:
   8.155 +                self._endOfFrame()
   8.156 +                
   8.157 +        else:
   8.158 +            # Got the frame's payload:
   8.159 +            self._endOfFrame()
   8.160 +    
   8.161 +    def _inMessageForFrame(self, requestNo,flags):
   8.162 +        message = None
   8.163 +        msgType = flags & kMsgFlag_TypeMask
   8.164 +        if msgType==kMsgType_Request:
   8.165 +            message = self.pendingRequests.get(requestNo)
   8.166 +            if message==None and requestNo == self.inNumRequests+1:
   8.167 +                message = IncomingRequest(self,requestNo,flags)
   8.168 +                self.pendingRequests[requestNo] = message
   8.169 +                self.inNumRequests += 1
   8.170 +        elif msgType==kMsgType_Response or msgType==kMsgType_Error:
   8.171 +            message = self.pendingResponses.get(requestNo)
   8.172 +            
   8.173 +        if message:
   8.174 +            message._beginFrame(flags)
   8.175 +        else:
   8.176 +            log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
   8.177 +        return message
   8.178 +    
   8.179 +    def _endOfFrame(self):
   8.180 +        msg = self.inMessage
   8.181 +        self.inMessage = None
   8.182 +        self.expectingHeader = True
   8.183 +        self.inHeader = None
   8.184 +        self.set_terminator(kFrameHeaderSize) # wait for binary header
   8.185 +        if msg:
   8.186 +            log.debug("End of frame of %s",msg)
   8.187 +            if not msg.moreComing:
   8.188 +                self._receivedMessage(msg)
   8.189 +
   8.190 +    def _receivedMessage(self, msg):
   8.191 +        log.info("Received: %s",msg)
   8.192 +        # Remove from pending:
   8.193 +        if msg.isResponse:
   8.194 +            del self.pendingReplies[msg.requestNo]
   8.195 +        else:
   8.196 +            del self.pendingRequests[msg.requestNo]
   8.197 +        # Decode:
   8.198 +        try:
   8.199 +            msg._finished()
   8.200 +        except Exception, x:
   8.201 +            log.error("Exception parsing message: %s", traceback.format_exc())
   8.202 +            return
   8.203 +        # Dispatch:
   8.204 +        try:
   8.205 +            self.onRequest(msg)
   8.206 +        except Exception, x:
   8.207 +            log.error("Exception dispatching message: %s", traceback.format_exc())
   8.208 +            #FIX: Send an error reply
   8.209 +
   8.210 +### MESSAGES:
   8.211 +
   8.212 +
   8.213 +class Message (object):
   8.214 +    def __init__(self, connection, properties=None, body=None):
   8.215 +        self.connection = connection
   8.216 +        self.properties = properties or {}
   8.217 +        self.body = body
   8.218 +    
   8.219 +    @property
   8.220 +    def flags(self):
   8.221 +        flags = kMsgType_Request
   8.222 +        if self.urgent:     flags |= kMsgFlag_Urgent
   8.223 +        if self.compressed: flags |= kMsgFlag_Compressed
   8.224 +        if self.noReply:    flags |= kMsgFlag_NoReply
   8.225 +        if self.moreComing: flags |= kMsgFlag_MoreComing
   8.226 +        return flags
   8.227 +    
   8.228 +    def __str__(self):
   8.229 +        s = "%s[#%i" %(type(self).__name__,self.requestNo)
   8.230 +        if self.urgent:     s += " URG"
   8.231 +        if self.compressed: s += " CMP"
   8.232 +        if self.noReply:    s += " NOR"
   8.233 +        if self.moreComing: s += " MOR"
   8.234 +        if self.body:       s += " %i bytes" %len(self.body)
   8.235 +        return s+"]"
   8.236 +    
   8.237 +    def __repr__(self):
   8.238 +        s = str(self)
   8.239 +        if len(self.properties): s += repr(self.properties)
   8.240 +        return s
   8.241 +        
   8.242 +    @property
   8.243 +    def isResponse(self):
   8.244 +        return False
   8.245 +
   8.246 +
   8.247 +class IncomingMessage (Message):
   8.248 +    def __init__(self, connection, requestNo, flags):
   8.249 +        super(IncomingMessage,self).__init__(connection)
   8.250 +        self.requestNo  = requestNo
   8.251 +        self.urgent     = (flags & kMsgFlag_Urgent) != 0 
   8.252 +        self.compressed = (flags & kMsgFlag_Compressed) != 0
   8.253 +        self.noReply    = (flags & kMsgFlag_NoReply) != 0
   8.254 +        self.moreComing = (flags & kMsgFlag_MoreComing) != 0
   8.255 +        self.frames     = []
   8.256 +    
   8.257 +    def _beginFrame(self, flags):
   8.258 +        if (flags & kMsgFlag_MoreComing)==0:
   8.259 +            self.moreComing = False
   8.260 +
   8.261 +    def _receivedData(self, data):
   8.262 +        self.frames.append(data)
   8.263 +    
   8.264 +    def _finished(self):
   8.265 +        encoded = "".join(self.frames)
   8.266 +        self.frames = None
   8.267 +        
   8.268 +        # Decode the properties:
   8.269 +        if len(encoded) < 2: raise MessageException, "missing properties length"
   8.270 +        propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
   8.271 +        if propSize>len(encoded): raise MessageException, "properties too long to fit"
   8.272 +        if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
   8.273 +        
   8.274 +        proplist = encoded[2:propSize-1].split('\000')
   8.275 +        encoded = encoded[propSize:]
   8.276 +        if len(proplist) & 1: raise MessageException, "odd number of property strings"
   8.277 +        for i in xrange(0,len(proplist),2):
   8.278 +            def expand(str):
   8.279 +                if len(str)==1:
   8.280 +                    str = IncomingMessage.__expandDict.get(str,str)
   8.281 +                return str
   8.282 +            self.properties[ expand(proplist[i])] = expand(proplist[i+1])
   8.283 +        
   8.284 +        # Decode the body:
   8.285 +        if self.compressed and len(encoded)>0:
   8.286 +            try:
   8.287 +                encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
   8.288 +            except zlib.error:
   8.289 +                raise MessageException, sys.exc_info()[1]
   8.290 +        self.body = encoded
   8.291 +    
   8.292 +    __expandDict= {'\x01' : "Content-Type",
   8.293 +                   '\x02' : "Profile",
   8.294 +                   '\x03' : "application/octet-stream",
   8.295 +                   '\x04' : "text/plain; charset=UTF-8",
   8.296 +                   '\x05' : "text/xml",
   8.297 +                   '\x06' : "text/yaml",
   8.298 +                   '\x07' : "Channel",
   8.299 +                   '\x08' : "Error-Code",
   8.300 +                   '\x09' : "Error-Domain"}
   8.301 +        
   8.302 +
   8.303 +
   8.304 +class OutgoingMessage (Message):
   8.305 +    
   8.306 +    def send(self):
   8.307 +        out = StringIO()
   8.308 +        out.write("xx")         # placeholder for properties length (16 bits)
   8.309 +        for (key,value) in self.properties:
   8.310 +            def _writePropString(self, str):
   8.311 +                out.write(str)
   8.312 +                #FIX: Abbreviate
   8.313 +                out.write('\000')
   8.314 +            self._writePropString(key)
   8.315 +            self._writePropString(value)
   8.316 +        propsLen = out.tell()
   8.317 +        self.encoded = out.stringvalue()
   8.318 +        out.close()
   8.319 +        self.encoded[0:2] = struct.pack('!H',propsLen)
   8.320 +        
   8.321 +        body = self.body
   8.322 +        if self.compressed:
   8.323 +            body = zlib.compress(body,5)
   8.324 +        self.encoded += body
   8.325 +        
   8.326 +        self.bytesSent = 0
   8.327 +        self.connection._outQueueMessage(self)
   8.328 +    
   8.329 +    def _sendNextFrame(self, conn,maxLen):
   8.330 +        pos = self.bytesSent
   8.331 +        payload = self.encoded[pos:pos+maxLen]
   8.332 +        pos += len(payload)
   8.333 +        if pos >= len(self.encoded):
   8.334 +            self.moreComing = False
   8.335 +
   8.336 +        conn.push( struct.pack(kFrameHeaderFormat, 
   8.337 +                               kFrameMagicNumber,
   8.338 +                               self.requestNo,
   8.339 +                               self.flags,
   8.340 +                               kFrameHeaderSize+len(payload)) )
   8.341 +        conn.push( payload )
   8.342 +        
   8.343 +        self.bytesSent = pos
   8.344 +
   8.345 +
   8.346 +class Request (Message):
   8.347 +    pass
   8.348 +
   8.349 +class Response (Message):
   8.350 +    @property
   8.351 +    def isResponse(self):
   8.352 +        return True
   8.353 +
   8.354 +    @property
   8.355 +    def flags(self):
   8.356 +        flags = super(Response,self).flags() ^ kMsgType_Request
   8.357 +        flags ^= kMsgType_Response
   8.358 +        return flags
   8.359 +
   8.360 +
   8.361 +
   8.362 +class IncomingRequest (IncomingMessage, Request):
   8.363 +    pass
   8.364 +
   8.365 +class OutgoingRequest (OutgoingMessage, Request):
   8.366 +    pass
   8.367 +
   8.368 +class IncomingResponse (IncomingMessage, Response):
   8.369 +    pass
   8.370 +
   8.371 +class OutgoingResponse (OutgoingMessage, Response):
   8.372 +    pass
   8.373 +
   8.374 +
   8.375 +### UNIT TESTS:
   8.376 +
   8.377 +
   8.378 +class BLIPTests(unittest.TestCase):
   8.379 +    def setUp(self):
   8.380 +        listener = Listener(46353)
   8.381 +        listener.onRequest = lambda req: logging.info("Got request!: %r",req)
   8.382 +    
   8.383 +    def testListener(self):
   8.384 +        logging.info("Waiting...")
   8.385 +        asyncore.loop()
   8.386 +
   8.387 +if __name__ == '__main__':
   8.388 +    logging.basicConfig(level=logging.INFO)
   8.389 +    unittest.main()
   8.390 \ No newline at end of file