Python/BLIP.py
author morrowa
Thu Jul 02 19:58:11 2009 -0700 (2009-07-02)
changeset 56 6c3b5372a307
parent 54 6d1392a3e0a6
child 58 6577813acf12
permissions -rw-r--r--
Removed unnecessary files. Toned down logging. Added null logging handler to BLIP so client code doesn't have to use logging. Modified test drivers to work against Cocoa versions.
jens@11
     1
# encoding: utf-8
jens@11
     2
"""
jens@11
     3
BLIP.py
jens@11
     4
jens@11
     5
Created by Jens Alfke on 2008-06-03.
jens@13
     6
Copyright notice and BSD license at end of file.
jens@11
     7
"""
jens@11
     8
jens@11
     9
import asynchat
jens@11
    10
import asyncore
jens@11
    11
from cStringIO import StringIO
jens@11
    12
import logging
jens@11
    13
import socket
jens@11
    14
import struct
jens@11
    15
import sys
jens@11
    16
import traceback
jens@11
    17
import zlib
jens@11
    18
jens@11
    19
jens@13
    20
# Connection status enumeration:
jens@13
    21
kDisconnected = -1
jens@13
    22
kClosed  = 0
jens@13
    23
kOpening = 1
jens@13
    24
kOpen    = 2
jens@13
    25
kClosing = 3
jens@13
    26
jens@13
    27
jens@12
    28
# INTERNAL CONSTANTS -- NO TOUCHIES!
jens@12
    29
morrowa@51
    30
kFrameMagicNumber   = 0x9B34F206
jens@11
    31
kFrameHeaderFormat  = '!LLHH'
jens@11
    32
kFrameHeaderSize    = 12
jens@11
    33
jens@11
    34
kMsgFlag_TypeMask   = 0x000F
jens@11
    35
kMsgFlag_Compressed = 0x0010
jens@11
    36
kMsgFlag_Urgent     = 0x0020
jens@11
    37
kMsgFlag_NoReply    = 0x0040
jens@11
    38
kMsgFlag_MoreComing = 0x0080
morrowa@51
    39
kMsgFlag_Meta       = 0x0100
jens@11
    40
jens@11
    41
kMsgType_Request    = 0
jens@11
    42
kMsgType_Response   = 1
jens@11
    43
kMsgType_Error      = 2
jens@11
    44
morrowa@51
    45
kMsgProfile_Hi      = "Hi"
morrowa@51
    46
kMsgProfile_Bye     = "Bye"
morrowa@51
    47
morrowa@56
    48
# Logging Setup
morrowa@56
    49
class NullLoggingHandler(logging.Handler):
morrowa@56
    50
    def emit(self, record):
morrowa@56
    51
        pass
jens@11
    52
jens@11
    53
log = logging.getLogger('BLIP')
morrowa@56
    54
# This line prevents the "No handlers found" warning if the calling code does not use logging.
morrowa@56
    55
log.addHandler(NullLoggingHandler())
jens@11
    56
log.propagate = True
jens@11
    57
jens@12
    58
jens@11
    59
class MessageException(Exception):
jens@11
    60
    pass
jens@11
    61
jens@11
    62
class ConnectionException(Exception):
jens@11
    63
    pass
jens@11
    64
jens@11
    65
jens@13
    66
### LISTENER AND CONNECTION CLASSES:
jens@13
    67
jens@13
    68
jens@11
    69
class Listener (asyncore.dispatcher):
jens@12
    70
    "BLIP listener/server class"
jens@12
    71
    
jens@13
    72
    def __init__(self, port, sslKeyFile=None, sslCertFile=None):
jens@12
    73
        "Create a listener on a port"
jens@11
    74
        asyncore.dispatcher.__init__(self)
jens@12
    75
        self.onConnected = self.onRequest = None
jens@11
    76
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
jens@11
    77
        self.bind( ('',port) )
jens@11
    78
        self.listen(5)
jens@13
    79
        self.sslKeyFile=sslKeyFile
jens@13
    80
        self.sslCertFile=sslCertFile
jens@11
    81
        log.info("Listening on port %u", port)
jens@11
    82
    
jens@11
    83
    def handle_accept( self ):
jens@13
    84
        socket,address = self.accept()
jens@13
    85
        if self.sslKeyFile:
jens@13
    86
            socket.ssl(socket,self.sslKeyFile,self.sslCertFile)
jens@13
    87
        conn = Connection(address, sock=socket, listener=self)
jens@11
    88
        conn.onRequest = self.onRequest
jens@11
    89
        if self.onConnected:
jens@11
    90
            self.onConnected(conn)
jens@11
    91
jens@13
    92
    def handle_error(self):
jens@13
    93
        (typ,val,trace) = sys.exc_info()
jens@13
    94
        log.error("Listener caught: %s %s\n%s", typ,val,traceback.format_exc())
jens@13
    95
        self.close()
jens@13
    96
    
jens@13
    97
jens@11
    98
jens@11
    99
class Connection (asynchat.async_chat):
jens@13
   100
    def __init__( self, address, sock=None, listener=None, ssl=None ):
jens@12
   101
        "Opens a connection with the given address. If a connection/socket object is provided it'll use that,"
jens@12
   102
        "otherwise it'll open a new outgoing socket."
jens@13
   103
        if sock:
jens@13
   104
            asynchat.async_chat.__init__(self,sock)
jens@11
   105
            log.info("Accepted connection from %s",address)
jens@13
   106
            self.status = kOpen
jens@11
   107
        else:
jens@13
   108
            asynchat.async_chat.__init__(self)
jens@11
   109
            log.info("Opening connection to %s",address)
jens@11
   110
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
jens@13
   111
            self.status = kOpening
jens@13
   112
            if ssl:
jens@13
   113
                ssl(self.socket)
jens@11
   114
            self.connect(address)
jens@13
   115
        self.address = address
jens@13
   116
        self.listener = listener
morrowa@51
   117
        self.onRequest = self.onCloseRequest = self.onCloseRefused = None
jens@11
   118
        self.pendingRequests = {}
jens@11
   119
        self.pendingResponses = {}
jens@11
   120
        self.outBox = []
jens@11
   121
        self.inMessage = None
jens@13
   122
        self.inNumRequests = self.outNumRequests = 0
jens@14
   123
        self.sending = False
jens@11
   124
        self._endOfFrame()
morrowa@51
   125
        self._closeWhenPossible = False
jens@11
   126
    
jens@13
   127
    def handle_connect(self):
jens@13
   128
        log.info("Connection open!")
jens@13
   129
        self.status = kOpen
jens@13
   130
    
jens@13
   131
    def handle_error(self):
jens@13
   132
        (typ,val,trace) = sys.exc_info()
jens@13
   133
        log.error("Connection caught: %s %s\n%s", typ,val,traceback.format_exc())
jens@13
   134
        self.discard_buffers()
jens@13
   135
        self.status = kDisconnected
jens@11
   136
        self.close()
jens@11
   137
    
jens@11
   138
    
jens@11
   139
    ### SENDING:
jens@11
   140
    
jens@13
   141
    @property
morrowa@51
   142
    def isOpen(self):
jens@13
   143
        return self.status==kOpening or self.status==kOpen
jens@13
   144
    
morrowa@51
   145
    @property
morrowa@51
   146
    def canSend(self):
morrowa@51
   147
        return self.isOpen and not self._closeWhenPossible
morrowa@51
   148
    
jens@13
   149
    def _sendMessage(self, msg):
morrowa@51
   150
        if self.isOpen:
jens@13
   151
            self._outQueueMessage(msg,True)
jens@14
   152
            if not self.sending:
jens@14
   153
                log.debug("Waking up the output stream")
jens@14
   154
                self.sending = True
jens@14
   155
                self.push_with_producer(self)
jens@13
   156
            return True
jens@13
   157
        else:
jens@13
   158
            return False
jens@13
   159
    
jens@13
   160
    def _sendRequest(self, req):
jens@13
   161
        if self.canSend:
jens@13
   162
            requestNo = req.requestNo = self.outNumRequests = self.outNumRequests + 1
jens@13
   163
            response = req.response
jens@13
   164
            if response:
jens@13
   165
                response.requestNo = requestNo
jens@13
   166
                self.pendingResponses[requestNo] = response
jens@13
   167
                log.debug("pendingResponses[%i] := %s",requestNo,response)
jens@13
   168
            return self._sendMessage(req)
jens@13
   169
        else:
morrowa@54
   170
            log.warning("%s: Attempt to send a request after the connection has started closing: %s" % (self, req))
jens@13
   171
            return False
jens@13
   172
    
jens@11
   173
    def _outQueueMessage(self, msg,isNew=True):
jens@12
   174
        n = len(self.outBox)
jens@11
   175
        index = n
jens@11
   176
        if msg.urgent and n>1:
jens@11
   177
            while index > 0:
jens@12
   178
                otherMsg = self.outBox[index-1]
jens@11
   179
                if otherMsg.urgent:
jens@11
   180
                    if index<n:
jens@11
   181
                        index += 1
jens@11
   182
                    break
jens@14
   183
                elif isNew and otherMsg.bytesSent==0:
jens@11
   184
                    break
jens@11
   185
                index -= 1
jens@11
   186
            else:
jens@11
   187
                index = 1
jens@12
   188
        
jens@11
   189
        self.outBox.insert(index,msg)
jens@11
   190
        if isNew:
jens@13
   191
            log.info("Queuing %s at index %i",msg,index)
jens@12
   192
        else:
jens@12
   193
            log.debug("Re-queueing outgoing message at index %i of %i",index,len(self.outBox))
jens@11
   194
    
jens@14
   195
    def more(self):
jens@14
   196
        n = len(self.outBox)
jens@14
   197
        if n > 0:
jens@14
   198
            msg = self.outBox.pop(0)
jens@14
   199
            frameSize = 4096
jens@14
   200
            if msg.urgent or n==1 or not self.outBox[0].urgent:
jens@14
   201
                frameSize *= 4
jens@14
   202
            data = msg._sendNextFrame(frameSize)
jens@14
   203
            if msg._moreComing:
jens@14
   204
                self._outQueueMessage(msg,isNew=False)
jens@14
   205
            else:
jens@14
   206
                log.info("Finished sending %s",msg)
jens@14
   207
            return data
jens@14
   208
        else:
jens@14
   209
            log.debug("Nothing more to send")
jens@14
   210
            self.sending = False
morrowa@51
   211
            self._closeIfReady()
jens@14
   212
            return None
jens@11
   213
    
jens@11
   214
    ### RECEIVING:
jens@11
   215
    
jens@11
   216
    def collect_incoming_data(self, data):
jens@11
   217
        if self.expectingHeader:
jens@11
   218
            if self.inHeader==None:
jens@11
   219
                self.inHeader = data
jens@11
   220
            else:
jens@11
   221
                self.inHeader += data
jens@13
   222
        elif self.inMessage:
jens@11
   223
            self.inMessage._receivedData(data)
jens@12
   224
    
jens@11
   225
    def found_terminator(self):
jens@11
   226
        if self.expectingHeader:
jens@11
   227
            # Got a header:
jens@11
   228
            (magic, requestNo, flags, frameLen) = struct.unpack(kFrameHeaderFormat,self.inHeader)
jens@11
   229
            self.inHeader = None
jens@13
   230
            if magic!=kFrameMagicNumber: raise ConnectionException, "Incorrect frame magic number %x" %magic
jens@13
   231
            if frameLen < kFrameHeaderSize: raise ConnectionException,"Invalid frame length %u" %frameLen
jens@11
   232
            frameLen -= kFrameHeaderSize
jens@11
   233
            log.debug("Incoming frame: type=%i, number=%i, flags=%x, length=%i",
jens@11
   234
                        (flags&kMsgFlag_TypeMask),requestNo,flags,frameLen)
jens@11
   235
            self.inMessage = self._inMessageForFrame(requestNo,flags)
jens@11
   236
            
jens@11
   237
            if frameLen > 0:
jens@11
   238
                self.expectingHeader = False
jens@11
   239
                self.set_terminator(frameLen)
jens@11
   240
            else:
jens@11
   241
                self._endOfFrame()
jens@12
   242
        
jens@11
   243
        else:
jens@11
   244
            # Got the frame's payload:
jens@11
   245
            self._endOfFrame()
jens@11
   246
    
jens@11
   247
    def _inMessageForFrame(self, requestNo,flags):
jens@11
   248
        message = None
jens@11
   249
        msgType = flags & kMsgFlag_TypeMask
jens@11
   250
        if msgType==kMsgType_Request:
jens@11
   251
            message = self.pendingRequests.get(requestNo)
jens@11
   252
            if message==None and requestNo == self.inNumRequests+1:
jens@11
   253
                message = IncomingRequest(self,requestNo,flags)
jens@12
   254
                assert message!=None
jens@11
   255
                self.pendingRequests[requestNo] = message
jens@11
   256
                self.inNumRequests += 1
jens@11
   257
        elif msgType==kMsgType_Response or msgType==kMsgType_Error:
jens@11
   258
            message = self.pendingResponses.get(requestNo)
morrowa@51
   259
            message._updateFlags(flags)
jens@12
   260
        
jens@12
   261
        if message != None:
jens@11
   262
            message._beginFrame(flags)
jens@11
   263
        else:
jens@11
   264
            log.warning("Ignoring unexpected frame with type %u, request #%u", msgType,requestNo)
jens@11
   265
        return message
jens@11
   266
    
jens@11
   267
    def _endOfFrame(self):
jens@11
   268
        msg = self.inMessage
jens@11
   269
        self.inMessage = None
jens@11
   270
        self.expectingHeader = True
jens@11
   271
        self.inHeader = None
jens@11
   272
        self.set_terminator(kFrameHeaderSize) # wait for binary header
jens@11
   273
        if msg:
jens@11
   274
            log.debug("End of frame of %s",msg)
jens@13
   275
            if not msg._moreComing:
jens@11
   276
                self._receivedMessage(msg)
jens@12
   277
    
jens@11
   278
    def _receivedMessage(self, msg):
jens@11
   279
        log.info("Received: %s",msg)
jens@11
   280
        # Remove from pending:
jens@11
   281
        if msg.isResponse:
jens@13
   282
            del self.pendingResponses[msg.requestNo]
jens@11
   283
        else:
jens@11
   284
            del self.pendingRequests[msg.requestNo]
jens@11
   285
        # Decode:
jens@11
   286
        try:
jens@11
   287
            msg._finished()
jens@12
   288
            if not msg.isResponse:
morrowa@51
   289
                if msg._meta:
morrowa@51
   290
                    self._dispatchMetaRequest(msg)
morrowa@51
   291
                else:
morrowa@51
   292
                    self.onRequest(msg)
jens@11
   293
        except Exception, x:
jens@12
   294
            log.error("Exception handling incoming message: %s", traceback.format_exc())
jens@11
   295
            #FIX: Send an error reply
morrowa@51
   296
        # Check to see if we're done and ready to close:
morrowa@51
   297
        self._closeIfReady()
morrowa@51
   298
    
morrowa@51
   299
    def _dispatchMetaRequest(self, request):
morrowa@51
   300
        """Handles dispatching internal meta requests."""
morrowa@51
   301
        if request['Profile'] == kMsgProfile_Bye:
morrowa@54
   302
            self._handleCloseRequest(request)
morrowa@51
   303
        else:
morrowa@51
   304
            response = request.response
morrowa@51
   305
            response.isError = True
morrowa@51
   306
            response['Error-Domain'] = "BLIP"
morrowa@51
   307
            response['Error-Code'] = 404
morrowa@51
   308
            response.body = "Unknown meta profile"
morrowa@51
   309
            response.send()
morrowa@51
   310
    
morrowa@51
   311
    ### CLOSING:
morrowa@51
   312
    
morrowa@54
   313
    def _handleCloseRequest(self, request):
morrowa@54
   314
        """Handles requests from a peer to close."""
morrowa@54
   315
        shouldClose = True
morrowa@54
   316
        if self.onCloseRequest:
morrowa@54
   317
            shouldClose = self.onCloseRequest()
morrowa@54
   318
        if not shouldClose:
morrowa@54
   319
            log.debug("Sending resfusal to close...")
morrowa@54
   320
            response = request.response
morrowa@54
   321
            response.isError = True
morrowa@54
   322
            response['Error-Domain'] = "BLIP"
morrowa@54
   323
            response['Error-Code'] = 403
morrowa@54
   324
            response.body = "Close request denied"
morrowa@54
   325
            response.send()
morrowa@54
   326
        else:
morrowa@54
   327
            log.debug("Sending permission to close...")
morrowa@54
   328
            response = request.response
morrowa@54
   329
            response.send()
morrowa@54
   330
    
morrowa@51
   331
    def close(self):
morrowa@51
   332
        """Publicly callable close method. Sends close request to peer."""
morrowa@51
   333
        if self.status != kOpen:
morrowa@51
   334
            return False
morrowa@51
   335
        log.info("Sending close request...")
morrowa@51
   336
        req = OutgoingRequest(self, None, {'Profile': kMsgProfile_Bye})
morrowa@51
   337
        req._meta = True
morrowa@51
   338
        req.response.onComplete = self._handleCloseResponse
morrowa@51
   339
        if not req.send():
morrowa@51
   340
            log.error("Error sending close request.")
morrowa@51
   341
            return False
morrowa@51
   342
        else:
morrowa@51
   343
            self.status = kClosing
morrowa@51
   344
        return True
morrowa@51
   345
    
morrowa@51
   346
    def _handleCloseResponse(self, response):
morrowa@51
   347
        """Called when we receive a response to a close request."""
morrowa@51
   348
        log.info("Received close response.")
morrowa@51
   349
        if response.isError:
morrowa@51
   350
            # remote refused to close
morrowa@51
   351
            if self.onCloseRefused:
morrowa@51
   352
                self.onCloseRefused(response)
morrowa@51
   353
            self.status = kOpen
morrowa@51
   354
        else:
morrowa@51
   355
            # now wait until everything has finished sending, then actually close
morrowa@51
   356
            log.info("No refusal, actually closing...")
morrowa@51
   357
            self._closeWhenPossible = True
morrowa@51
   358
    
morrowa@51
   359
    def _closeIfReady(self):
morrowa@51
   360
        """Checks if all transmissions are complete and then closes the actual socket."""
morrowa@51
   361
        if self._closeWhenPossible and len(self.outBox) == 0 and len(self.pendingRequests) == 0 and len(self.pendingResponses) == 0:
morrowa@51
   362
            # self._closeWhenPossible = False
morrowa@51
   363
            log.debug("_closeIfReady closing.")
morrowa@51
   364
            asynchat.async_chat.close(self)
morrowa@51
   365
    
morrowa@51
   366
    def handle_close(self):
morrowa@51
   367
        """Called when the socket actually closes."""
morrowa@51
   368
        log.info("Connection closed!")
morrowa@51
   369
        self.pendingRequests = self.pendingResponses = None
morrowa@51
   370
        self.outBox = None
morrowa@51
   371
        if self.status == kClosing:
morrowa@51
   372
            self.status = kClosed
morrowa@51
   373
        else:
morrowa@51
   374
            self.status = kDisconnected
morrowa@53
   375
        asyncore.dispatcher.close(self)
jens@11
   376
jens@12
   377
jens@13
   378
### MESSAGE CLASSES:
jens@11
   379
jens@11
   380
jens@11
   381
class Message (object):
jens@12
   382
    "Abstract superclass of all request/response objects"
jens@12
   383
    
jens@13
   384
    def __init__(self, connection, body=None, properties=None):
jens@11
   385
        self.connection = connection
jens@13
   386
        self.body = body
jens@11
   387
        self.properties = properties or {}
jens@13
   388
        self.requestNo = None
jens@11
   389
    
jens@11
   390
    @property
jens@11
   391
    def flags(self):
jens@12
   392
        if self.isResponse:
morrowa@51
   393
            if self.isError:
morrowa@51
   394
                flags = kMsgType_Error
morrowa@51
   395
            else:
morrowa@51
   396
                flags = kMsgType_Response
jens@12
   397
        else:
jens@12
   398
            flags = kMsgType_Request
jens@11
   399
        if self.urgent:     flags |= kMsgFlag_Urgent
jens@11
   400
        if self.compressed: flags |= kMsgFlag_Compressed
jens@11
   401
        if self.noReply:    flags |= kMsgFlag_NoReply
jens@13
   402
        if self._moreComing:flags |= kMsgFlag_MoreComing
morrowa@51
   403
        if self._meta:      flags |= kMsgFlag_Meta
jens@11
   404
        return flags
jens@11
   405
    
jens@11
   406
    def __str__(self):
jens@13
   407
        s = "%s[" %(type(self).__name__)
jens@13
   408
        if self.requestNo != None:
jens@13
   409
            s += "#%i" %self.requestNo
jens@11
   410
        if self.urgent:     s += " URG"
jens@11
   411
        if self.compressed: s += " CMP"
jens@11
   412
        if self.noReply:    s += " NOR"
jens@13
   413
        if self._moreComing:s += " MOR"
morrowa@51
   414
        if self._meta:      s += " MET"
jens@11
   415
        if self.body:       s += " %i bytes" %len(self.body)
jens@11
   416
        return s+"]"
jens@11
   417
    
jens@11
   418
    def __repr__(self):
jens@11
   419
        s = str(self)
jens@11
   420
        if len(self.properties): s += repr(self.properties)
jens@11
   421
        return s
jens@12
   422
    
jens@13
   423
    @property
jens@11
   424
    def isResponse(self):
jens@12
   425
        "Is this message a response?"
jens@11
   426
        return False
jens@12
   427
    
jens@13
   428
    @property
jens@12
   429
    def contentType(self):
jens@12
   430
        return self.properties.get('Content-Type')
jens@12
   431
    
jens@12
   432
    def __getitem__(self, key):     return self.properties.get(key)
jens@12
   433
    def __contains__(self, key):    return key in self.properties
jens@12
   434
    def __len__(self):              return len(self.properties)
jens@12
   435
    def __nonzero__(self):          return True
jens@12
   436
    def __iter__(self):             return self.properties.__iter__()
jens@11
   437
jens@11
   438
jens@11
   439
class IncomingMessage (Message):
jens@12
   440
    "Abstract superclass of incoming messages."
jens@12
   441
    
jens@11
   442
    def __init__(self, connection, requestNo, flags):
jens@11
   443
        super(IncomingMessage,self).__init__(connection)
jens@11
   444
        self.requestNo  = requestNo
morrowa@51
   445
        self._updateFlags(flags)
morrowa@51
   446
        self.frames     = []
morrowa@51
   447
    
morrowa@51
   448
    def _updateFlags(self, flags):
jens@12
   449
        self.urgent     = (flags & kMsgFlag_Urgent) != 0
jens@11
   450
        self.compressed = (flags & kMsgFlag_Compressed) != 0
jens@11
   451
        self.noReply    = (flags & kMsgFlag_NoReply) != 0
jens@13
   452
        self._moreComing= (flags & kMsgFlag_MoreComing) != 0
morrowa@51
   453
        self._meta      = (flags & kMsgFlag_Meta) != 0
morrowa@51
   454
        self.isError    = (flags & kMsgType_Error) != 0
jens@11
   455
    
jens@11
   456
    def _beginFrame(self, flags):
jens@13
   457
        """Received a frame header."""
jens@13
   458
        self._moreComing = (flags & kMsgFlag_MoreComing)!=0
jens@12
   459
    
jens@11
   460
    def _receivedData(self, data):
jens@13
   461
        """Received data from a frame."""
jens@11
   462
        self.frames.append(data)
jens@11
   463
    
jens@11
   464
    def _finished(self):
jens@13
   465
        """The entire message has been received; now decode it."""
jens@11
   466
        encoded = "".join(self.frames)
jens@11
   467
        self.frames = None
jens@11
   468
        
jens@11
   469
        # Decode the properties:
jens@11
   470
        if len(encoded) < 2: raise MessageException, "missing properties length"
jens@11
   471
        propSize = 2 + struct.unpack('!H',encoded[0:2])[0]
jens@11
   472
        if propSize>len(encoded): raise MessageException, "properties too long to fit"
jens@11
   473
        if propSize>2 and encoded[propSize-1] != '\000': raise MessageException, "properties are not nul-terminated"
jens@11
   474
        
morrowa@51
   475
        if propSize > 2:
morrowa@51
   476
            proplist = encoded[2:propSize-1].split('\000')
morrowa@51
   477
        
morrowa@51
   478
            if len(proplist) & 1: raise MessageException, "odd number of property strings"
morrowa@51
   479
            for i in xrange(0,len(proplist),2):
morrowa@51
   480
                def expand(str):
morrowa@51
   481
                    if len(str)==1:
morrowa@51
   482
                        str = IncomingMessage.__expandDict.get(str,str)
morrowa@51
   483
                    return str
morrowa@51
   484
                self.properties[ expand(proplist[i])] = expand(proplist[i+1])
morrowa@51
   485
        
jens@11
   486
        encoded = encoded[propSize:]
jens@11
   487
        # Decode the body:
jens@11
   488
        if self.compressed and len(encoded)>0:
jens@11
   489
            try:
jens@11
   490
                encoded = zlib.decompress(encoded,31)   # window size of 31 needed for gzip format
jens@11
   491
            except zlib.error:
jens@11
   492
                raise MessageException, sys.exc_info()[1]
jens@11
   493
        self.body = encoded
jens@11
   494
    
jens@11
   495
    __expandDict= {'\x01' : "Content-Type",
jens@11
   496
                   '\x02' : "Profile",
jens@11
   497
                   '\x03' : "application/octet-stream",
jens@11
   498
                   '\x04' : "text/plain; charset=UTF-8",
jens@11
   499
                   '\x05' : "text/xml",
jens@11
   500
                   '\x06' : "text/yaml",
jens@11
   501
                   '\x07' : "Channel",
jens@11
   502
                   '\x08' : "Error-Code",
jens@11
   503
                   '\x09' : "Error-Domain"}
jens@12
   504
jens@11
   505
jens@11
   506
class OutgoingMessage (Message):
jens@12
   507
    "Abstract superclass of outgoing requests/responses."
jens@12
   508
    
jens@13
   509
    def __init__(self, connection, body=None, properties=None):
jens@13
   510
        Message.__init__(self,connection,body,properties)
morrowa@51
   511
        self.urgent = self.compressed = self.noReply = self._meta = self.isError = False
jens@13
   512
        self._moreComing = True
jens@12
   513
    
jens@12
   514
    def __setitem__(self, key,val):
jens@12
   515
        self.properties[key] = val
jens@12
   516
    def __delitem__(self, key):
jens@12
   517
        del self.properties[key]
jens@11
   518
    
jens@13
   519
    @property
jens@13
   520
    def sent(self):
jens@16
   521
        return hasattr(self,'encoded')
jens@13
   522
    
jens@13
   523
    def _encode(self):
jens@13
   524
        "Generates the message's encoded form, prior to sending it."
jens@11
   525
        out = StringIO()
jens@12
   526
        for (key,value) in self.properties.iteritems():
jens@13
   527
            def _writePropString(s):
jens@13
   528
                out.write(str(s))    #FIX: Abbreviate
jens@11
   529
                out.write('\000')
jens@12
   530
            _writePropString(key)
jens@12
   531
            _writePropString(value)
jens@13
   532
        propertiesSize = out.tell()
jens@13
   533
        assert propertiesSize<65536     #FIX: Return an error instead
jens@11
   534
        
morrowa@51
   535
        body = self.body or ""
jens@11
   536
        if self.compressed:
jens@13
   537
            z = zlib.compressobj(6,zlib.DEFLATED,31)   # window size of 31 needed for gzip format
jens@13
   538
            out.write(z.compress(body))
jens@13
   539
            body = z.flush()
jens@13
   540
        out.write(body)
jens@13
   541
        
jens@13
   542
        self.encoded = struct.pack('!H',propertiesSize) + out.getvalue()
jens@13
   543
        out.close()
jens@12
   544
        log.debug("Encoded %s into %u bytes", self,len(self.encoded))
jens@11
   545
        self.bytesSent = 0
jens@11
   546
    
jens@14
   547
    def _sendNextFrame(self, maxLen):
jens@11
   548
        pos = self.bytesSent
jens@11
   549
        payload = self.encoded[pos:pos+maxLen]
jens@11
   550
        pos += len(payload)
jens@13
   551
        self._moreComing = (pos < len(self.encoded))
jens@13
   552
        if not self._moreComing:
jens@13
   553
            self.encoded = None
jens@12
   554
        log.debug("Sending frame of %s; bytes %i--%i", self,pos-len(payload),pos)
jens@12
   555
        
jens@14
   556
        header = struct.pack(kFrameHeaderFormat, kFrameMagicNumber,
jens@12
   557
                                                   self.requestNo,
jens@12
   558
                                                   self.flags,
jens@14
   559
                                                   kFrameHeaderSize+len(payload))
jens@11
   560
        self.bytesSent = pos
jens@14
   561
        return header + payload
jens@11
   562
jens@11
   563
jens@12
   564
class Request (object):
jens@12
   565
    @property
jens@12
   566
    def response(self):
jens@12
   567
        "The response object for this request."
jens@13
   568
        if self.noReply:
jens@13
   569
            return None
jens@12
   570
        r = self.__dict__.get('_response')
jens@12
   571
        if r==None:
jens@12
   572
            r = self._response = self._createResponse()
jens@12
   573
        return r
jens@12
   574
jens@11
   575
jens@11
   576
class Response (Message):
jens@13
   577
    def _setRequest(self, request):
jens@12
   578
        assert not request.noReply
jens@12
   579
        self.request = request
jens@12
   580
        self.requestNo = request.requestNo
jens@12
   581
        self.urgent = request.urgent
jens@12
   582
    
jens@11
   583
    @property
jens@11
   584
    def isResponse(self):
jens@11
   585
        return True
jens@11
   586
jens@11
   587
jens@11
   588
class IncomingRequest (IncomingMessage, Request):
jens@12
   589
    def _createResponse(self):
jens@12
   590
        return OutgoingResponse(self)
jens@11
   591
jens@13
   592
jens@11
   593
class OutgoingRequest (OutgoingMessage, Request):
jens@12
   594
    def _createResponse(self):
jens@12
   595
        return IncomingResponse(self)
jens@13
   596
    
jens@13
   597
    def send(self):
jens@13
   598
        self._encode()
jens@13
   599
        return self.connection._sendRequest(self) and self.response
jens@13
   600
jens@11
   601
jens@11
   602
class IncomingResponse (IncomingMessage, Response):
jens@12
   603
    def __init__(self, request):
jens@13
   604
        IncomingMessage.__init__(self,request.connection,None,0)
jens@13
   605
        self._setRequest(request)
jens@12
   606
        self.onComplete = None
jens@12
   607
    
jens@12
   608
    def _finished(self):
jens@12
   609
        super(IncomingResponse,self)._finished()
jens@12
   610
        if self.onComplete:
jens@12
   611
            try:
jens@12
   612
                self.onComplete(self)
jens@12
   613
            except Exception, x:
jens@12
   614
                log.error("Exception dispatching response: %s", traceback.format_exc())
jens@13
   615
jens@13
   616
jens@11
   617
class OutgoingResponse (OutgoingMessage, Response):
jens@12
   618
    def __init__(self, request):
jens@12
   619
        OutgoingMessage.__init__(self,request.connection)
jens@13
   620
        self._setRequest(request)
jens@13
   621
    
jens@13
   622
    def send(self):
jens@13
   623
        self._encode()
jens@13
   624
        return self.connection._sendMessage(self)
jens@11
   625
jens@11
   626
jens@13
   627
"""
jens@13
   628
 Copyright (c) 2008, Jens Alfke <jens@mooseyard.com>. All rights reserved.
jens@13
   629
 
jens@13
   630
 Redistribution and use in source and binary forms, with or without modification, are permitted
jens@13
   631
 provided that the following conditions are met:
jens@13
   632
 
jens@13
   633
 * Redistributions of source code must retain the above copyright notice, this list of conditions
jens@13
   634
 and the following disclaimer.
jens@13
   635
 * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
jens@13
   636
 and the following disclaimer in the documentation and/or other materials provided with the
jens@13
   637
 distribution.
jens@13
   638
 
jens@13
   639
 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
jens@13
   640
 IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
jens@13
   641
 FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
jens@13
   642
 BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
jens@13
   643
 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
jens@13
   644
  PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
jens@13
   645
 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF 
jens@13
   646
 THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
jens@13
   647
"""