Python/BLIP.py
author Jens Alfke <jens@mooseyard.com>
Tue Jun 03 22:24:21 2008 -0700 (2008-06-03)
changeset 12 710113961756
parent 11 29e8b03c05d4
child 13 84c2d38f924c
permissions -rw-r--r--
BLIP.py working for listener side (it talks to the Obj-C BLIPConnectionTester.)
jens@11
     1
#!/usr/bin/env python
jens@11
     2
# encoding: utf-8
jens@11
     3
"""
jens@11
     4
BLIP.py
jens@11
     5
jens@11
     6
Created by Jens Alfke on 2008-06-03.
jens@11
     7
Copyright (c) 2008 Jens Alfke. All rights reserved.
jens@11
     8
"""
jens@11
     9
jens@11
    10
import asynchat
jens@11
    11
import asyncore
jens@11
    12
from cStringIO import StringIO
jens@11
    13
import logging
jens@11
    14
import socket
jens@11
    15
import struct
jens@11
    16
import sys
jens@11
    17
import traceback
jens@11
    18
import unittest
jens@11
    19
import zlib
jens@11
    20
jens@11
    21
jens@12
    22
# INTERNAL CONSTANTS -- NO TOUCHIES!

# Frame header layout, in network byte order: magic number, request number,
# flags, and the frame length (which counts the header itself).
kFrameMagicNumber   = 0x9B34F205
kFrameHeaderFormat  = '!LLHH'
kFrameHeaderSize    = struct.calcsize(kFrameHeaderFormat)   # 12 bytes

# The low four bits of the flags word carry the message type;
# the bits above them are independent option flags.
kMsgFlag_TypeMask   = 0x000F
kMsgFlag_Compressed = 1 << 4
kMsgFlag_Urgent     = 1 << 5
kMsgFlag_NoReply    = 1 << 6
kMsgFlag_MoreComing = 1 << 7

# Message types, as stored in the type bits of the flags word.
kMsgType_Request, kMsgType_Response, kMsgType_Error = 0, 1, 2


# Module-wide logger; records are passed up to the root logger's handlers.
log = logging.getLogger('BLIP')
log.propagate = True
jens@11
    41
jens@12
    42
jens@11
    43
class MessageException(Exception):
    "Raised when an incoming message's encoded properties or body cannot be decoded."
jens@11
    45
jens@11
    46
class ConnectionException(Exception):
    "Connection-level error type. NOTE(review): nothing in this file raises it yet."
jens@11
    48
jens@11
    49
jens@11
    50
class Listener (asyncore.dispatcher):
    "BLIP listener/server class"

    def __init__(self, port):
        """Create a listener on a port.

        Binds to all local interfaces on `port` and starts listening.
        Two callbacks may be assigned after construction (both start as None):
          onConnected -- called with each newly accepted Connection.
          onRequest   -- copied onto each new Connection as its request handler.
        """
        asyncore.dispatcher.__init__(self)
        self.onConnected = self.onRequest = None
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind( ('',port) )
        self.listen(5)
        log.info("Listening on port %u", port)

    def handle_accept( self ):
        "asyncore callback: wraps a newly accepted socket in a Connection."
        # accept() does not always yield a connection (the peer may already have
        # gone away, or the call would block). asyncore reports that by returning
        # None, which must not be unpacked.
        accepted = self.accept()
        if accepted is None:
            return
        client,address = accepted
        conn = Connection(address,client)
        conn.onRequest = self.onRequest
        if self.onConnected:
            self.onConnected(conn)
jens@11
    68
jens@11
    69
jens@11
    70
class Connection (asynchat.async_chat):
    "A BLIP connection: splits outgoing messages into frames and reassembles incoming frames."

    def __init__( self, address, conn=None ):
        """Opens a connection with the given address. If a connection/socket object is
        provided it'll use that, otherwise it'll open a new outgoing socket.

        `onRequest` starts as None; assign a callable taking the incoming request
        to receive completed requests.
        """
        asynchat.async_chat.__init__(self,conn)
        self.address = address
        if conn:
            log.info("Accepted connection from %s",address)
        else:
            log.info("Opening connection to %s",address)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.connect(address)
        self.onRequest = None
        self.pendingRequests = {}       # requestNo -> partially received incoming request
        self.pendingResponses = {}      # requestNo -> response awaiting its frames
        self.outBox = []                # outgoing messages that still have frames to send
        self.inMessage = None           # message the current frame's payload belongs to
        self.inNumRequests = 0          # number of the last incoming request accepted
        self._endOfFrame()              # start out waiting for a frame header

    #def handle_error(self,x):
    #    log.error("Uncaught exception: %s",x)
    #    self.close()

    def _fatal(self, error):
        "Logs an unrecoverable protocol error and closes the connection."
        log.error("Fatal BLIP connection error: %s",error)
        self.close()


    ### SENDING:

    def _outQueueMessage(self, msg,isNew=True):
        """Inserts `msg` into the outbox.

        Non-urgent messages (and any message when fewer than two are queued) go
        at the end. An urgent message is placed after the last urgent message
        already queued, leaving one other message in between when there is room;
        a new message is never placed ahead of a message that has not started
        sending. If neither anchor is found the message goes at index 1.

        `isNew` is False when a partially sent message is being put back. A new
        message queued into an empty outbox starts the sending loop.
        """
        n = len(self.outBox)
        index = n
        if msg.urgent and n>1:
            while index > 0:
                otherMsg = self.outBox[index-1]
                if otherMsg.urgent:
                    if index<n:
                        index += 1
                    break
                # Outgoing messages track progress in `bytesSent`, which is
                # reset to 0 by send().
                elif isNew and otherMsg.bytesSent==0:
                    break
                index -= 1
            else:
                index = 1

        self.outBox.insert(index,msg)
        if isNew:
            log.info("Queuing outgoing message at index %i",index)
            if n==0:
                self._sendNextFrame()
        else:
            log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))

    def _sendNextFrame(self):
        """Sends frames until the outbox is empty.

        Each pass takes the first message, sends one frame of it, and re-queues
        the message if it has more data. The frame is 4096 bytes, or four times
        that unless a non-urgent message is being sent while an urgent one is
        next in line.
        """
        while self.outBox:              #FIX: Don't send everything at once; only as space becomes available!
            msg = self.outBox.pop(0)
            frameSize = 4096
            if msg.urgent or not self.outBox or not self.outBox[0].urgent:
                frameSize *= 4
            if msg._sendNextFrame(self,frameSize):
                self._outQueueMessage(msg,isNew=False)
            else:
                log.info("Finished sending %s",msg)


    ### RECEIVING:

    def collect_incoming_data(self, data):
        "asynchat callback: accumulates header bytes, or hands payload bytes to the current message."
        if self.expectingHeader:
            if self.inHeader is None:
                self.inHeader = data
            else:
                self.inHeader += data
        elif self.inMessage is not None:
            self.inMessage._receivedData(data)
        # Otherwise this is the payload of a frame that _inMessageForFrame
        # rejected; it is read and discarded so the stream stays in sync.

    def found_terminator(self):
        "asynchat callback: a complete frame header, or a complete frame payload, has arrived."
        if not self.expectingHeader:
            # Got the frame's payload:
            self._endOfFrame()
            return

        # Got a header:
        (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
        self.inHeader = None
        # _fatal() closes the connection, so stop processing this frame.
        if magic!=kFrameMagicNumber:
            self._fatal("Incorrect frame magic number %x" %magic)
            return
        if frameLen < kFrameHeaderSize:
            self._fatal("Invalid frame length %u" %frameLen)
            return
        frameLen -= kFrameHeaderSize
        log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
                    (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
        self.inMessage = self._inMessageForFrame(requestNo,flags)

        if frameLen > 0:
            self.expectingHeader = False
            self.set_terminator(frameLen)
        else:
            self._endOfFrame()

    def _inMessageForFrame(self, requestNo,flags):
        """Returns the incoming message a frame belongs to, or None if the frame is unexpected.

        A request frame continues a pending request, or starts a new one when its
        number is exactly one past the last accepted request. A response or error
        frame must match an entry in pendingResponses.
        """
        message = None
        msgType = flags & kMsgFlag_TypeMask
        if msgType==kMsgType_Request:
            message = self.pendingRequests.get(requestNo)
            if message is None and requestNo == self.inNumRequests+1:
                message = IncomingRequest(self,requestNo,flags)
                self.pendingRequests[requestNo] = message
                self.inNumRequests += 1
        elif msgType==kMsgType_Response or msgType==kMsgType_Error:
            message = self.pendingResponses.get(requestNo)

        if message is not None:
            message._beginFrame(flags)
        else:
            log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
        return message

    def _endOfFrame(self):
        "Resets the reader to expect a frame header, and dispatches the message if it is complete."
        msg = self.inMessage
        self.inMessage = None
        self.expectingHeader = True
        self.inHeader = None
        self.set_terminator(kFrameHeaderSize) # wait for binary header
        if msg is not None:
            log.debug("End of frame of %s",msg)
            if not msg.moreComing:
                self._receivedMessage(msg)

    def _receivedMessage(self, msg):
        """Handles a fully received message.

        Removes it from the pending table, decodes it, and passes requests to
        `onRequest`. Exceptions from decoding or from the handler are logged,
        not propagated.
        """
        log.info("Received: %s",msg)
        # Remove from pending:
        if msg.isResponse:
            del self.pendingResponses[msg.requestNo]
        else:
            del self.pendingRequests[msg.requestNo]
        # Decode:
        try:
            msg._finished()
            if not msg.isResponse:
                if self.onRequest:
                    self.onRequest(msg)
                else:
                    log.warning("No onRequest handler set; dropping %s",msg)
        except Exception:
            log.error("Exception handling incoming message: %s", traceback.format_exc())
            #FIX: Send an error reply
jens@11
   217
jens@12
   218
jens@11
   219
### MESSAGES:
jens@11
   220
jens@11
   221
jens@11
   222
class Message (object):
    """Abstract superclass of all request/response objects.

    Holds the connection, a `properties` dictionary and a `body`. The attributes
    requestNo, urgent, compressed, noReply and moreComing are read by `flags`
    and `__str__` but are not set here; subclasses are expected to provide them.
    A message also acts as a read-only mapping over its properties.
    """

    def __init__(self, connection, properties=None, body=None):
        self.connection = connection
        self.properties = properties or {}
        self.body = body

    @property
    def flags(self):
        "The frame-header flags word: the message type combined with the option bits."
        if self.isResponse:
            flags = kMsgType_Response
        else:
            flags = kMsgType_Request
        if self.urgent:     flags |= kMsgFlag_Urgent
        if self.compressed: flags |= kMsgFlag_Compressed
        if self.noReply:    flags |= kMsgFlag_NoReply
        if self.moreComing: flags |= kMsgFlag_MoreComing
        return flags

    def __str__(self):
        "Short description: class name, request number, flag abbreviations and body size."
        s = "%s[#%i" %(type(self).__name__,self.requestNo)
        if self.urgent:     s += " URG"
        if self.compressed: s += " CMP"
        if self.noReply:    s += " NOR"
        if self.moreComing: s += " MOR"
        if self.body:       s += " %i bytes" %len(self.body)
        return s+"]"

    def __repr__(self):
        "Like str(), followed by the properties dictionary when it is non-empty."
        s = str(self)
        if len(self.properties): s += repr(self.properties)
        return s

    @property
    def isResponse(self):
        "Is this message a response?"
        return False

    @property
    def contentType(self):
        "The 'Content-Type' property, or None if it is not set."
        return self.properties.get('Content-Type')

    # Mapping-style access to the properties. A missing key yields None
    # rather than raising KeyError.
    def __getitem__(self, key):     return self.properties.get(key)
    def __contains__(self, key):    return key in self.properties
    def __len__(self):              return len(self.properties)
    # A message is always true, even with no properties; without this,
    # truth testing would fall back to __len__.
    def __nonzero__(self):          return True
    __bool__ = __nonzero__          # the name Python 3 looks up for truth testing
    def __iter__(self):             return self.properties.__iter__()
jens@11
   270
jens@11
   271
jens@11
   272
class IncomingMessage (Message):
    "Abstract superclass of incoming messages."

    def __init__(self, connection, requestNo, flags):
        """Creates an empty incoming message.

        `flags` is a frame-header flags word; the urgent, compressed, noReply and
        moreComing attributes are taken from it.
        """
        # Call Message.__init__ directly instead of through super(). For an
        # IncomingResponse the MRO places Response right after IncomingMessage,
        # so super() would invoke Response.__init__ with the connection as its
        # `request` argument.
        Message.__init__(self,connection)
        self.requestNo  = requestNo
        self.urgent     = (flags & kMsgFlag_Urgent) != 0
        self.compressed = (flags & kMsgFlag_Compressed) != 0
        self.noReply    = (flags & kMsgFlag_NoReply) != 0
        self.moreComing = (flags & kMsgFlag_MoreComing) != 0
        self.frames     = []

    def _beginFrame(self, flags):
        "Called with the flags of each frame of this message, before its payload arrives."
        # Track the flags of every frame, not just those given to the
        # constructor: a message created with flags=0 (as IncomingResponse is)
        # would otherwise count as complete after its first frame and never be
        # decompressed.
        self.moreComing = (flags & kMsgFlag_MoreComing) != 0
        if flags & kMsgFlag_Compressed:
            self.compressed = True

    def _receivedData(self, data):
        "Appends a chunk of frame payload."
        self.frames.append(data)

    def _finished(self):
        """Decodes the accumulated frames into `properties` and `body`.

        The encoded form is a 2-byte big-endian length, that many bytes of
        NUL-terminated key/value strings, then the body. Single-character
        strings that appear in the abbreviation table are expanded. The body is
        decompressed when the message is flagged as compressed.

        Raises MessageException if the properties are malformed or the body
        cannot be decompressed.
        """
        encoded = "".join(self.frames)
        self.frames = None

        # Decode the properties:
        if len(encoded) < 2:
            raise MessageException("missing properties length")
        propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
        if propSize > len(encoded):
            raise MessageException("properties too long to fit")
        # An empty property block (propSize == 2) is valid and holds no strings.
        # Splitting it would yield [''] and be misreported as an odd count.
        if propSize > 2:
            if encoded[propSize-1] != '\000':
                raise MessageException("properties are not nul-terminated")
            proplist = encoded[2:propSize-1].split('\000')
            if len(proplist) & 1:
                raise MessageException("odd number of property strings")
            # Every key of the table is a single character, so a plain lookup
            # with the string itself as the default only expands abbreviations.
            abbreviations = IncomingMessage.__expandDict
            for (key,value) in zip(proplist[0::2], proplist[1::2]):
                self.properties[ abbreviations.get(key,key) ] = abbreviations.get(value,value)
        encoded = encoded[propSize:]

        # Decode the body:
        if self.compressed and len(encoded)>0:
            try:
                encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
            except zlib.error:
                raise MessageException(sys.exc_info()[1])
        self.body = encoded

    # One-character abbreviations for common property strings.
    __expandDict= {'\x01' : "Content-Type",
                   '\x02' : "Profile",
                   '\x03' : "application/octet-stream",
                   '\x04' : "text/plain; charset=UTF-8",
                   '\x05' : "text/xml",
                   '\x06' : "text/yaml",
                   '\x07' : "Channel",
                   '\x08' : "Error-Code",
                   '\x09' : "Error-Domain"}
jens@12
   328
jens@11
   329
jens@11
   330
jens@11
   331
class OutgoingMessage (Message):
    "Abstract superclass of outgoing requests/responses."

    def __init__(self, connection, properties=None, body=None):
        "Creates an unsent message with the urgent, compressed and noReply flags off."
        Message.__init__(self,connection,properties,body)
        self.urgent = self.compressed = self.noReply = False
        self.moreComing = True

    def __setitem__(self, key,val):
        self.properties[key] = val
    def __delitem__(self, key):
        del self.properties[key]

    def send(self):
        """Sends this message.

        Encodes the properties (a 2-byte big-endian length followed by
        NUL-terminated key and value strings) and the body into `self.encoded`,
        then queues the message on its connection.
        """
        log.info("Sending %s",self)
        pieces = []
        for (key,value) in self.properties.items():
            pieces += [key, '\000', value, '\000']      #FIX: Abbreviate
        props = "".join(pieces)
        self.encoded = struct.pack('!H',len(props)) + props

        # A message created without a body is sent with an empty one.
        body = self.body or ""
        if self.compressed:
            # Emit gzip framing (window size 31), the format that
            # IncomingMessage._finished decodes; plain zlib.compress() output
            # would be rejected there.
            compressor = zlib.compressobj(5, zlib.DEFLATED, 31)
            body = compressor.compress(body) + compressor.flush()
        self.encoded += body
        log.debug("Encoded %s into %u bytes", self,len(self.encoded))

        self.bytesSent = 0
        self.connection._outQueueMessage(self)

    def _sendNextFrame(self, conn,maxLen):
        """Pushes the next frame, carrying at most `maxLen` payload bytes, onto `conn`.

        Returns True if more of the message remains to be sent.
        """
        pos = self.bytesSent
        payload = self.encoded[pos:pos+maxLen]
        pos += len(payload)
        # Must be updated before packing the header: `flags` reads moreComing.
        self.moreComing = (pos < len(self.encoded))
        log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)

        conn.push( struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
                                                   self.requestNo,
                                                   self.flags,
                                                   kFrameHeaderSize+len(payload)) )
        conn.push( payload )

        self.bytesSent = pos
        return self.moreComing
jens@11
   381
jens@11
   382
jens@12
   383
class Request (object):
    "Mixin that gives a request a lazily created `response`; subclasses implement _createResponse()."

    @property
    def response(self):
        "The response object for this request."
        # Created on first access and cached on the instance. The identity test
        # keeps the cache independent of how the response compares to None.
        r = self.__dict__.get('_response')
        if r is None:
            r = self._response = self._createResponse()
        return r
jens@12
   391
jens@11
   392
jens@11
   393
class Response (Message):
    "Behavior shared by responses: ties a response to the request it answers."

    def __init__(self, request):
        "Links this response to `request`, taking over its request number and urgency."
        # Message.__init__ is not called here; the concrete subclasses run
        # their message-side initializer themselves.
        assert not request.noReply
        self.request = request
        self.requestNo, self.urgent = request.requestNo, request.urgent

    @property
    def isResponse(self):
        "Always True for responses."
        return True
jens@11
   403
jens@11
   404
jens@11
   405
jens@11
   406
class IncomingRequest (IncomingMessage, Request):
    "A request received from the peer; its response is an OutgoingResponse."

    def _createResponse(self):
        reply = OutgoingResponse(self)
        return reply
jens@11
   409
jens@11
   410
class OutgoingRequest (OutgoingMessage, Request):
    "A request to be sent to the peer; its response is an IncomingResponse."

    def _createResponse(self):
        reply = IncomingResponse(self)
        return reply
jens@11
   413
jens@11
   414
class IncomingResponse (IncomingMessage, Response):
    "The peer's response to an OutgoingRequest."

    def __init__(self, request):
        """Creates the (not yet received) response to `request`.

        `onComplete` starts as None; assign a callable taking this response to
        be notified once it has been decoded.
        """
        # Constructed with flags=0, before any frame of the response exists.
        IncomingMessage.__init__(self,request.connection,request.requestNo,0)
        Response.__init__(self,request)
        self.onComplete = None

    def _finished(self):
        "Decodes the message, then calls `onComplete` if set; handler exceptions are logged, not raised."
        super(IncomingResponse,self)._finished()
        if self.onComplete:
            try:
                self.onComplete(self)
            except Exception:
                log.error("Exception dispatching response: %s", traceback.format_exc())
jens@12
   427
            
jens@11
   428
class OutgoingResponse (OutgoingMessage, Response):
    "A response to be sent back for an IncomingRequest."

    def __init__(self, request):
        "Creates an empty response on the request's connection, linked to `request`."
        connection = request.connection
        OutgoingMessage.__init__(self,connection)
        Response.__init__(self,request)
jens@11
   432
jens@11
   433
jens@11
   434
### UNIT TESTS:
jens@11
   435
jens@11
   436
jens@11
   437
class BLIPTests(unittest.TestCase):
    "Listener-side test: checks each incoming request and echoes its body back."

    def setUp(self):
        "Starts a listener on port 46353 whose request handler validates and echoes requests."
        def handleRequest(request):
            logging.info("Got request!: %r",request)
            body = request.body
            assert len(body)<32768
            assert request.contentType == 'application/octet-stream'
            assert int(request['Size']) == len(body)
            assert request['User-Agent'] == 'BLIPConnectionTester'
            # The body must be the byte sequence 0, 1, ..., 255, 0, 1, ...
            for (position,char) in enumerate(body):
                assert ord(char) == position%256

            # Echo the body back with the same content type.
            reply = request.response
            reply.body = request.body
            reply['Content-Type'] = request.contentType
            reply.send()

        server = Listener(46353)
        server.onRequest = handleRequest

    def testListener(self):
        "Runs the asyncore event loop so the listener can serve connections."
        logging.info("Waiting...")
        asyncore.loop()
jens@11
   461
if __name__ == '__main__':
    # Run as a script: show INFO-level log output, then hand control to unittest.
    logging.basicConfig(level=logging.INFO)
    unittest.main()