# HG changeset patch # User Jens Alfke # Date 1212537393 25200 # Node ID 29e8b03c05d45a31037f825fd29c5b3f2379cc0f # Parent a2aeb9b04eccddeccb98b116468cd4b4dd1f8336 * Initial checkin of BLIP.py. (Receiving seems to work.) * FIXED: Abbreviation list in BLIPProperties was messed up. * Renamed some instance variables to use 'request' instead of 'query'. * Test client doesn't throw an assertion-failure now when the number of unresponded requests exceeds 100. diff -r a2aeb9b04ecc -r 29e8b03c05d4 BLIP/BLIPProperties.m --- a/BLIP/BLIPProperties.m Sun Jun 01 14:04:22 2008 -0700 +++ b/BLIP/BLIPProperties.m Tue Jun 03 16:56:33 2008 -0700 @@ -16,14 +16,13 @@ static const char* kAbbreviations[] = { "Content-Type", "Profile", - "Channel" - "Error-Code" - "Error-Domain", "application/octet-stream", "text/plain; charset=UTF-8", "text/xml", "text/yaml", - "application/x-cloudy-signed+yaml", + "Channel", + "Error-Code", + "Error-Domain", }; #define kNAbbreviations ((sizeof(kAbbreviations)/sizeof(const char*))) // cannot exceed 31! diff -r a2aeb9b04ecc -r 29e8b03c05d4 BLIP/BLIPReader.h --- a/BLIP/BLIPReader.h Sun Jun 01 14:04:22 2008 -0700 +++ b/BLIP/BLIPReader.h Tue Jun 03 16:56:33 2008 -0700 @@ -18,8 +18,8 @@ UInt32 _curBytesRead; NSMutableData *_curBody; - UInt32 _numQueriesReceived; - NSMutableDictionary *_pendingQueries, *_pendingReplies; + UInt32 _numRequestsReceived; + NSMutableDictionary *_pendingRequests, *_pendingResponses; } - (void) _addPendingResponse: (BLIPResponse*)response; diff -r a2aeb9b04ecc -r 29e8b03c05d4 BLIP/BLIPReader.m --- a/BLIP/BLIPReader.m Sun Jun 01 14:04:22 2008 -0700 +++ b/BLIP/BLIPReader.m Tue Jun 03 16:56:33 2008 -0700 @@ -31,27 +31,27 @@ { self = [super initWithConnection: conn stream: stream]; if (self != nil) { - _pendingQueries = [[NSMutableDictionary alloc] init]; - _pendingReplies = [[NSMutableDictionary alloc] init]; + _pendingRequests = [[NSMutableDictionary alloc] init]; + _pendingResponses = [[NSMutableDictionary alloc] init]; } return self; } - (void) dealloc { - [_pendingQueries release]; - [_pendingReplies release]; + [_pendingRequests release]; + [_pendingResponses release]; [_curBody release]; [super dealloc]; } - (void) disconnect { - for( BLIPResponse *response in [_pendingReplies allValues] ) { + for( BLIPResponse *response in [_pendingResponses allValues] ) { [response _connectionClosed]; [_conn tellDelegate: @selector(connection:receivedResponse:) withObject: response]; } - setObj(&_pendingReplies,nil); + setObj(&_pendingResponses,nil); [super disconnect]; } @@ -154,7 +154,7 @@ - (void) _addPendingResponse: (BLIPResponse*)response { - [_pendingReplies setObject: response forKey: $object(response.number)]; + [_pendingResponses setObject: response forKey: $object(response.number)]; } @@ -169,14 +169,14 @@ switch(type) { case kBLIP_MSG: { // Incoming request: - BLIPRequest *request = [_pendingQueries objectForKey: key]; + BLIPRequest *request = [_pendingRequests objectForKey: key]; if( request ) { // Continuation frame of a request: if( complete ) { [[request retain] autorelease]; - [_pendingQueries removeObjectForKey: key]; + [_pendingRequests removeObjectForKey: key]; } - } else if( header->number == _numQueriesReceived+1 ) { + } else if( header->number == _numRequestsReceived+1 ) { // Next new request: request = [[[BLIPRequest alloc] _initWithConnection: _blipConn isMine: NO @@ -185,12 +185,12 @@ body: nil] autorelease]; if( ! complete ) - [_pendingQueries setObject: request forKey: key]; - _numQueriesReceived++; + [_pendingRequests setObject: request forKey: key]; + _numRequestsReceived++; } else return [self _gotError: BLIPMakeError(kBLIPError_BadFrame, @"Received bad request frame #%u (next is #%u)", - header->number,_numQueriesReceived+1)]; + header->number,_numRequestsReceived+1)]; if( ! [request _receivedFrameWithHeader: header body: body] ) return [self _gotError: BLIPMakeError(kBLIPError_BadFrame, @@ -203,11 +203,11 @@ case kBLIP_RPY: case kBLIP_ERR: { - BLIPResponse *response = [_pendingReplies objectForKey: key]; + BLIPResponse *response = [_pendingResponses objectForKey: key]; if( response ) { if( complete ) { [[response retain] autorelease]; - [_pendingReplies removeObjectForKey: key]; + [_pendingResponses removeObjectForKey: key]; } if( ! [response _receivedFrameWithHeader: header body: body] ) { @@ -217,7 +217,7 @@ [_blipConn _dispatchResponse: response]; } else { - if( header->number <= ((BLIPWriter*)self.writer).numQueriesSent ) + if( header->number <= ((BLIPWriter*)self.writer).numRequestsSent ) LogTo(BLIP,@"??? %@ got unexpected response frame to my msg #%u", self,header->number); //benign else diff -r a2aeb9b04ecc -r 29e8b03c05d4 BLIP/BLIPTest.m --- a/BLIP/BLIPTest.m Sun Jun 01 14:04:22 2008 -0700 +++ b/BLIP/BLIPTest.m Tue Jun 03 16:56:33 2008 -0700 @@ -25,6 +25,7 @@ #endif +#define kListenerHost @"localhost" #define kListenerPort 46353 #define kSendInterval 0.5 #define kNBatchedMessages 20 @@ -66,7 +67,7 @@ if (self != nil) { Log(@"** INIT %@",self); _pending = [[NSMutableDictionary alloc] init]; - IPAddress *addr = [[IPAddress alloc] initWithHostname: @"localhost" port: kListenerPort]; + IPAddress *addr = [[IPAddress alloc] initWithHostname: kListenerHost port: kListenerPort]; _conn = [[BLIPConnection alloc] initToAddress: addr]; if( ! _conn ) { [self release]; @@ -99,30 +100,34 @@ - (void) sendAMessage { - Log(@"** Sending another %i messages...", kNBatchedMessages); - for( int i=0; i 12 ) - q.urgent = YES; - BLIPResponse *response = [q send]; - Assert(response); - Assert(q.number>0); - Assert(response.number==q.number); - [_pending setObject: $object(size) forKey: $object(q.number)]; - response.onComplete = $target(self,responseArrived:); + if(_pending.count<100) { + Log(@"** Sending another %i messages...", kNBatchedMessages); + for( int i=0; i 12 ) + q.urgent = YES; + BLIPResponse *response = [q send]; + Assert(response); + Assert(q.number>0); + Assert(response.number==q.number); + [_pending setObject: $object(size) forKey: $object(q.number)]; + response.onComplete = $target(self,responseArrived:); + } + } else { + Warn(@"There are %u pending messages; waiting for the listener to catch up...",_pending.count); } [self performSelector: @selector(sendAMessage) withObject: nil afterDelay: kSendInterval]; } @@ -184,7 +189,6 @@ Assert(sizeObj); [_pending removeObjectForKey: $object(response.number)]; Log(@"Now %u replies pending", _pending.count); - Assert(_pending.count<100); } diff -r a2aeb9b04ecc -r 29e8b03c05d4 BLIP/BLIPWriter.h --- a/BLIP/BLIPWriter.h Sun Jun 01 14:04:22 2008 -0700 +++ b/BLIP/BLIPWriter.h Tue Jun 03 16:56:33 2008 -0700 @@ -13,12 +13,12 @@ @interface BLIPWriter : TCPWriter { NSMutableArray *_outBox; - UInt32 _numQueriesSent; + UInt32 _numRequestsSent; } - (BOOL) sendRequest: (BLIPRequest*)request response: (BLIPResponse*)response; - (BOOL) sendMessage: (BLIPMessage*)message; -@property (readonly) UInt32 numQueriesSent; +@property (readonly) UInt32 numRequestsSent; @end diff -r a2aeb9b04ecc -r 29e8b03c05d4 BLIP/BLIPWriter.m --- a/BLIP/BLIPWriter.m Sun Jun 01 14:04:22 2008 -0700 +++ b/BLIP/BLIPWriter.m Tue Jun 03 16:56:33 2008 -0700 @@ -33,7 +33,7 @@ [super disconnect]; } -@synthesize numQueriesSent=_numQueriesSent; +@synthesize numRequestsSent=_numRequestsSent; - (BOOL) isBusy @@ -92,9 +92,9 @@ - (BOOL) sendRequest: (BLIPRequest*)q response: (BLIPResponse*)response { if( !_shouldClose ) { - [q _assignedNumber: ++_numQueriesSent]; + [q _assignedNumber: ++_numRequestsSent]; if( response ) { - [response _assignedNumber: _numQueriesSent]; + [response _assignedNumber: _numRequestsSent]; [(BLIPReader*)self.reader _addPendingResponse: response]; } } diff -r a2aeb9b04ecc -r 29e8b03c05d4 MYNetwork.xcodeproj/project.pbxproj --- a/MYNetwork.xcodeproj/project.pbxproj Sun Jun 01 14:04:22 2008 -0700 +++ b/MYNetwork.xcodeproj/project.pbxproj Tue Jun 03 16:56:33 2008 -0700 @@ -521,7 +521,7 @@ GCC_C_LANGUAGE_STANDARD = c99; GCC_MODEL_TUNING = G5; GCC_PRECOMPILE_PREFIX_HEADER = YES; - GCC_PREFIX_HEADER = MYNetwork_Prefix.pch; + GCC_PREFIX_HEADER = ../MYUtilities/MYUtilities_Prefix.pch; INSTALL_PATH = /usr/local/bin; PRODUCT_NAME = MYNetwork; }; diff -r a2aeb9b04ecc -r 29e8b03c05d4 Python/BLIP.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Python/BLIP.py Tue Jun 03 16:56:33 2008 -0700 @@ -0,0 +1,386 @@ +#!/usr/bin/env python +# encoding: utf-8 +""" +BLIP.py + +Created by Jens Alfke on 2008-06-03. +Copyright (c) 2008 Jens Alfke. All rights reserved. +""" + +import asynchat +import asyncore +from cStringIO import StringIO +import logging +import socket +import struct +import sys +import traceback +import unittest +import zlib + + +kFrameMagicNumber = 0x9B34F205 +kFrameHeaderFormat = '!LLHH' +kFrameHeaderSize = 12 + +kMsgFlag_TypeMask = 0x000F +kMsgFlag_Compressed = 0x0010 +kMsgFlag_Urgent = 0x0020 +kMsgFlag_NoReply = 0x0040 +kMsgFlag_MoreComing = 0x0080 + +kMsgType_Request = 0 +kMsgType_Response = 1 +kMsgType_Error = 2 + + +log = logging.getLogger('BLIP') +log.propagate = True + +class MessageException(Exception): + pass + +class ConnectionException(Exception): + pass + + +class Listener (asyncore.dispatcher): + def __init__(self, port): + asyncore.dispatcher.__init__(self) + self.onConnected = None + self.onRequest = None + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.bind( ('',port) ) + self.listen(5) + log.info("Listening on port %u", port) + + def handle_accept( self ): + client,address = self.accept() + conn = Connection(address,client) + conn.onRequest = self.onRequest + if self.onConnected: + self.onConnected(conn) + + +class Connection (asynchat.async_chat): + def __init__( self, address, conn=None ): + asynchat.async_chat.__init__(self,conn) + self.address = address + if conn: + log.info("Accepted connection from %s",address) + else: + log.info("Opening connection to %s",address) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect(address) + self.onRequest = None + self.pendingRequests = {} + self.pendingResponses = {} + self.outBox = [] + self.inMessage = None + self.inNumRequests = 0 + self._endOfFrame() + + #def handle_error(self,x): + # log.error("Uncaught exception: %s",x) + # self.close() + + def _fatal(self, error): + log.error("Fatal BLIP connection error: %s",error) + self.close() + + + ### SENDING: + + def _outQueueMessage(self, msg,isNew=True): + n = self.outBox.length + index = n + if msg.urgent and n>1: + while index > 0: + otherMsg = self.outBox[index] + if otherMsg.urgent: + if index 0: + msg = self.outBox.pop(0) + frameSize = 4096 + if msg.urgent or n==1 or not self.outBox[0].urgent: + frameSize *= 4 + if msg._sendNextFrame(self): + self._outQueueMessage(msg,isNew=False) + + + ### RECEIVING: + + def collect_incoming_data(self, data): + if self.expectingHeader: + if self.inHeader==None: + self.inHeader = data + else: + self.inHeader += data + else: + self.inMessage._receivedData(data) + + def found_terminator(self): + if self.expectingHeader: + # Got a header: + (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader) + self.inHeader = None + if magic!=kFrameMagicNumber: self._fatal("Incorrect frame magic number %x" %magic) + if frameLen < kFrameHeaderSize: self._fatal("Invalid frame length %u" %frameLen) + frameLen -= kFrameHeaderSize + log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i", + (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen) + self.inMessage = self._inMessageForFrame(requestNo,flags) + + if frameLen > 0: + self.expectingHeader = False + self.set_terminator(frameLen) + else: + self._endOfFrame() + + else: + # Got the frame's payload: + self._endOfFrame() + + def _inMessageForFrame(self, requestNo,flags): + message = None + msgType = flags & kMsgFlag_TypeMask + if msgType==kMsgType_Request: + message = self.pendingRequests.get(requestNo) + if message==None and requestNo == self.inNumRequests+1: + message = IncomingRequest(self,requestNo,flags) + self.pendingRequests[requestNo] = message + self.inNumRequests += 1 + elif msgType==kMsgType_Response or msgType==kMsgType_Error: + message = self.pendingResponses.get(requestNo) + + if message: + message._beginFrame(flags) + else: + log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo) + return message + + def _endOfFrame(self): + msg = self.inMessage + self.inMessage = None + self.expectingHeader = True + self.inHeader = None + self.set_terminator(kFrameHeaderSize) # wait for binary header + if msg: + log.debug("End of frame of %s",msg) + if not msg.moreComing: + self._receivedMessage(msg) + + def _receivedMessage(self, msg): + log.info("Received: %s",msg) + # Remove from pending: + if msg.isResponse: + del self.pendingReplies[msg.requestNo] + else: + del self.pendingRequests[msg.requestNo] + # Decode: + try: + msg._finished() + except Exception, x: + log.error("Exception parsing message: %s", traceback.format_exc()) + return + # Dispatch: + try: + self.onRequest(msg) + except Exception, x: + log.error("Exception dispatching message: %s", traceback.format_exc()) + #FIX: Send an error reply + +### MESSAGES: + + +class Message (object): + def __init__(self, connection, properties=None, body=None): + self.connection = connection + self.properties = properties or {} + self.body = body + + @property + def flags(self): + flags = kMsgType_Request + if self.urgent: flags |= kMsgFlag_Urgent + if self.compressed: flags |= kMsgFlag_Compressed + if self.noReply: flags |= kMsgFlag_NoReply + if self.moreComing: flags |= kMsgFlag_MoreComing + return flags + + def __str__(self): + s = "%s[#%i" %(type(self).__name__,self.requestNo) + if self.urgent: s += " URG" + if self.compressed: s += " CMP" + if self.noReply: s += " NOR" + if self.moreComing: s += " MOR" + if self.body: s += " %i bytes" %len(self.body) + return s+"]" + + def __repr__(self): + s = str(self) + if len(self.properties): s += repr(self.properties) + return s + + @property + def isResponse(self): + return False + + +class IncomingMessage (Message): + def __init__(self, connection, requestNo, flags): + super(IncomingMessage,self).__init__(connection) + self.requestNo = requestNo + self.urgent = (flags & kMsgFlag_Urgent) != 0 + self.compressed = (flags & kMsgFlag_Compressed) != 0 + self.noReply = (flags & kMsgFlag_NoReply) != 0 + self.moreComing = (flags & kMsgFlag_MoreComing) != 0 + self.frames = [] + + def _beginFrame(self, flags): + if (flags & kMsgFlag_MoreComing)==0: + self.moreComing = False + + def _receivedData(self, data): + self.frames.append(data) + + def _finished(self): + encoded = "".join(self.frames) + self.frames = None + + # Decode the properties: + if len(encoded) < 2: raise MessageException, "missing properties length" + propSize = 2 + struct.unpack('!H',encoded[0:2])[0] + if propSize>len(encoded): raise MessageException, "properties too long to fit" + if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated" + + proplist = encoded[2:propSize-1].split('\000') + encoded = encoded[propSize:] + if len(proplist) & 1: raise MessageException, "odd number of property strings" + for i in xrange(0,len(proplist),2): + def expand(str): + if len(str)==1: + str = IncomingMessage.__expandDict.get(str,str) + return str + self.properties[ expand(proplist[i])] = expand(proplist[i+1]) + + # Decode the body: + if self.compressed and len(encoded)>0: + try: + encoded = zlib.decompress(encoded,31) # window size of 31 needed for gzip format + except zlib.error: + raise MessageException, sys.exc_info()[1] + self.body = encoded + + __expandDict= {'\x01' : "Content-Type", + '\x02' : "Profile", + '\x03' : "application/octet-stream", + '\x04' : "text/plain; charset=UTF-8", + '\x05' : "text/xml", + '\x06' : "text/yaml", + '\x07' : "Channel", + '\x08' : "Error-Code", + '\x09' : "Error-Domain"} + + + +class OutgoingMessage (Message): + + def send(self): + out = StringIO() + out.write("xx") # placeholder for properties length (16 bits) + for (key,value) in self.properties: + def _writePropString(self, str): + out.write(str) + #FIX: Abbreviate + out.write('\000') + self._writePropString(key) + self._writePropString(value) + propsLen = out.tell() + self.encoded = out.stringvalue() + out.close() + self.encoded[0:2] = struct.pack('!H',propsLen) + + body = self.body + if self.compressed: + body = zlib.compress(body,5) + self.encoded += body + + self.bytesSent = 0 + self.connection._outQueueMessage(self) + + def _sendNextFrame(self, conn,maxLen): + pos = self.bytesSent + payload = self.encoded[pos:pos+maxLen] + pos += len(payload) + if pos >= len(self.encoded): + self.moreComing = False + + conn.push( struct.pack(kFrameHeaderFormat, + kFrameMagicNumber, + self.requestNo, + self.flags, + kFrameHeaderSize+len(payload)) ) + conn.push( payload ) + + self.bytesSent = pos + + +class Request (Message): + pass + +class Response (Message): + @property + def isResponse(self): + return True + + @property + def flags(self): + flags = super(Response,self).flags() ^ kMsgType_Request + flags ^= kMsgType_Response + return flags + + + +class IncomingRequest (IncomingMessage, Request): + pass + +class OutgoingRequest (OutgoingMessage, Request): + pass + +class IncomingResponse (IncomingMessage, Response): + pass + +class OutgoingResponse (OutgoingMessage, Response): + pass + + +### UNIT TESTS: + + +class BLIPTests(unittest.TestCase): + def setUp(self): + listener = Listener(46353) + listener.onRequest = lambda req: logging.info("Got request!: %r",req) + + def testListener(self): + logging.info("Waiting...") + asyncore.loop() + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + unittest.main() \ No newline at end of file