//
//  BLIPReader.m
//  MYNetwork
//
//  Created by Jens Alfke on 5/10/08.
//  Copyright 2008 Jens Alfke. All rights reserved.
//

#import "BLIPReader.h"
#import "BLIP_Internal.h"
#import "BLIPWriter.h"
#import "BLIPDispatcher.h"


@interface BLIPReader ()
- (BOOL) _receivedFrameWithHeader: (const BLIPFrameHeader*)header body: (NSData*)body;
@end


// Reads BLIP frames (a fixed-size BLIPFrameHeader followed by a body) from the input
// stream, and routes each completed frame to the request or response it belongs to.
// NOTE: This file uses manual retain/release.
@implementation BLIPReader


#define _blipConn ((BLIPConnection*)_conn)


// Initializes the reader and creates the two (initially empty) tables of partially
// received requests and outstanding responses, both keyed by message number.
- (id) initWithConnection: (BLIPConnection*)conn stream: (NSStream*)stream
{
    self = [super initWithConnection: conn stream: stream];
    if (self != nil) {
        _pendingQueries = [[NSMutableDictionary alloc] init];
        _pendingReplies = [[NSMutableDictionary alloc] init];
    }
    return self;
}

- (void) dealloc
{
    [_pendingQueries release];
    [_pendingReplies release];
    [_curBody release];
    [super dealloc];
}

// Tells every still-pending response that the connection closed, reports each one to the
// connection's delegate, then drops the pending-replies table and disconnects.
- (void) disconnect
{
    // Iterate over a snapshot (-allValues) rather than the dictionary itself.
    for( BLIPResponse *response in [_pendingReplies allValues] ) {
        [response _connectionClosed];
        [_conn tellDelegate: @selector(connection:receivedResponse:) withObject: response];
    }
    setObj(&_pendingReplies,nil);
    [super disconnect];
}


#pragma mark -
#pragma mark READING FRAMES:


// Byte-swaps the just-read header (_curHeader) in place to native order and sanity-checks it.
// On success allocates _curBody, sized to the frame size minus the header size, and returns nil.
// On failure returns a string describing the problem, and does not allocate _curBody.
- (NSString*) _validateHeader
{
    // Convert header to native byte order:
    _curHeader.magic = EndianU32_BtoN(_curHeader.magic);
    _curHeader.number= EndianU32_BtoN(_curHeader.number);
    _curHeader.flags = EndianU16_BtoN(_curHeader.flags);
    _curHeader.size  = EndianU16_BtoN(_curHeader.size);

    if( _curHeader.magic != kBLIPFrameHeaderMagicNumber )
        return $sprintf(@"Incorrect magic number (%08X not %08X)",
                        _curHeader.magic,kBLIPFrameHeaderMagicNumber);
    // The size field counts the header too, so it can never be smaller than the header:
    size_t bodyLength = _curHeader.size;
    if( bodyLength < sizeof(BLIPFrameHeader) )
        return @"Length is impossibly short";
    bodyLength -= sizeof(BLIPFrameHeader);
    _curBody = [[NSMutableData alloc] initWithLength: bodyLength];
    return nil;
}


// Hands the completed frame to -_receivedFrameWithHeader:body: and then resets the
// per-frame state (header, body, byte count) ready for the next frame.
- (void) _endCurFrame
{
    // Hold a reference to self across the callout; presumably processing the frame can
    // cause the reader to be released -- TODO confirm.
    [self retain];
    [self _receivedFrameWithHeader: &_curHeader body: _curBody];
    memset(&_curHeader,0,sizeof(_curHeader));
    setObj(&_curBody,nil);
    _curBytesRead = 0;
    [self release];
}


// YES while a frame has been partially read (some bytes consumed, frame not yet finished).
- (BOOL) isBusy
{
    return _curBytesRead > 0;
}


// Performs one read from the stream: either (more of) the frame header or (more of) the
// frame body, depending on how many bytes of the current frame have been read so far.
// When the frame is complete it is passed to -_endCurFrame.
- (void) _canRead
{
    // Positive while the header is incomplete; zero or negative once it has been read.
    SInt32 headerLeft = sizeof(BLIPFrameHeader) - _curBytesRead;
    if( headerLeft > 0 ) {
        // Read (more of) the header:
        NSInteger bytesRead = [(NSInputStream*)_stream read: (uint8_t*)&_curHeader +_curBytesRead
                                                  maxLength: headerLeft];
        if( bytesRead < 0 ) {
            [self _gotError];
        } else {
            _curBytesRead += bytesRead;
            if( _curBytesRead < sizeof(BLIPFrameHeader) ) {
                // Incomplete header:
                // (Arguments are cast to match the %u specifiers; the values are small.)
                LogTo(BLIPVerbose,@"%@ read %u bytes of header (%u left)",
                      self,(unsigned)bytesRead,
                      (unsigned)(sizeof(BLIPFrameHeader)-_curBytesRead));
            } else {
                // Finished reading the header!
                headerLeft = 0;
                NSString *err = [self _validateHeader];
                if( err ) {
                    Warn(@"%@ read bogus frame header: %@",self,err);
                    [self _gotError: BLIPMakeError(kBLIPError_BadData, @"%@", err)];
                    return;
                }
                LogTo(BLIPVerbose,@"%@: Read header; next is %u-byte body",
                      self,(unsigned)_curBody.length);

                if( _curBody.length == 0 ) {
                    // Zero-byte body, so no need to wait for another read
                    [self _endCurFrame];
                }
            }
        }

    } else {
        // Read (more of) the current frame's body:
        // headerLeft is <= 0 here; its magnitude is the number of body bytes already read.
        SInt32 bodyRemaining = (SInt32)_curBody.length + headerLeft;
        if( bodyRemaining > 0 ) {
            uint8_t *dst = _curBody.mutableBytes;
            dst += _curBody.length - bodyRemaining;
            NSInteger bytesRead = [(NSInputStream*)_stream read: dst maxLength: bodyRemaining];
            if( bytesRead < 0 ) {
                [self _gotError];
                return;
            } else if( bytesRead > 0 ) {
                _curBytesRead += bytesRead;
                bodyRemaining -= (SInt32)bytesRead;
                LogTo(BLIPVerbose,@"%@: Read %u bytes of frame body (%u left)",
                      self,(unsigned)bytesRead,(unsigned)bodyRemaining);
            }
        }
        if( bodyRemaining==0 ) {
            // Done reading this frame: give it to the Connection and reset my state
            [self _endCurFrame];
        }
    }
}


#pragma mark -
#pragma mark PROCESSING FRAMES:


// Registers a response under its message number so that incoming reply frames with that
// number are routed to it by -_receivedFrameWithHeader:body:.
- (void) _addPendingResponse: (BLIPResponse*)response
{
    [_pendingReplies setObject: response forKey: $object(response.number)];
}


// Routes one complete frame to the request or response it belongs to.
//   header: the frame header, already converted to native byte order.
//   body:   the frame's body bytes.
// Returns YES if the frame was handled (or deliberately ignored); otherwise returns
// whatever -_gotError: returns after reporting a kBLIPError_BadFrame error.
- (BOOL) _receivedFrameWithHeader: (const BLIPFrameHeader*)header body: (NSData*)body
{
    static const char* const kTypeStrs[] = {"MSG","RPY","ERR","3??","4??","5??","6??","7??"};
    BLIPMessageType type = header->flags & kBLIP_TypeMask;
    // The type comes off the wire, so bounds-check it before using it as a table index.
    const size_t nTypeStrs = sizeof(kTypeStrs)/sizeof(kTypeStrs[0]);
    const char *typeStr = ((size_t)type < nTypeStrs) ? kTypeStrs[type] : "???";
    LogTo(BLIPVerbose,@"%@ rcvd frame of %s #%u, length %u",
          self,typeStr,(unsigned)header->number,(unsigned)body.length);

    id key = $object(header->number);
    BOOL complete = ! (header->flags & kBLIP_MoreComing);
    switch(type) {
        case kBLIP_MSG: {
            // Incoming request:
            BLIPRequest *request = [_pendingQueries objectForKey: key];
            if( request ) {
                // Continuation frame of a request:
                if( complete ) {
                    // Keep the request alive after it is removed from the dictionary:
                    [[request retain] autorelease];
                    [_pendingQueries removeObjectForKey: key];
                }
            } else if( header->number == _numQueriesReceived+1 ) {
                // Next new request:
                // (Created with the MoreComing flag forced on; the frame itself is
                // delivered to the request below.)
                request = [[[BLIPRequest alloc] _initWithConnection: _blipConn
                                                             isMine: NO
                                                              flags: header->flags | kBLIP_MoreComing
                                                             number: header->number
                                                               body: nil]
                                autorelease];
                if( ! complete )
                    [_pendingQueries setObject: request forKey: key];
                _numQueriesReceived++;
            } else
                return [self _gotError: BLIPMakeError(kBLIPError_BadFrame,
                                               @"Received bad request frame #%u (next is #%u)",
                                               header->number,_numQueriesReceived+1)];

            if( ! [request _receivedFrameWithHeader: header body: body] )
                return [self _gotError: BLIPMakeError(kBLIPError_BadFrame,
                                               @"Couldn't parse message frame")];

            if( complete )
                [_blipConn _dispatchRequest: request];
            break;
        }

        case kBLIP_RPY:
        case kBLIP_ERR: {
            BLIPResponse *response = [_pendingReplies objectForKey: key];
            if( response ) {
                if( complete ) {
                    // Keep the response alive after it is removed from the dictionary:
                    [[response retain] autorelease];
                    [_pendingReplies removeObjectForKey: key];
                }

                if( ! [response _receivedFrameWithHeader: header body: body] ) {
                    return [self _gotError: BLIPMakeError(kBLIPError_BadFrame,
                                                          @"Couldn't parse response frame")];
                } else if( complete )
                    [_blipConn _dispatchResponse: response];

            } else {
                // No pending response with this number. If the number is one the writer
                // has already sent, just log it; otherwise it is a protocol error.
                if( header->number <= ((BLIPWriter*)self.writer).numQueriesSent )
                    LogTo(BLIP,@"??? %@ got unexpected response frame to my msg #%u",
                          self,header->number); //benign
                else
                    return [self _gotError: BLIPMakeError(kBLIPError_BadFrame,
                                                          @"Bogus message number %u in response",
                                                          header->number)];
            }
            break;
        }

        default:
            // To leave room for future expansion, undefined message types are just ignored.
            Log(@"??? %@ received header with unknown message type %i", self,type);
            break;
    }
    return YES;
}


@end


/*
 Copyright (c) 2008, Jens Alfke . All rights reserved.

 Redistribution and use in source and binary forms, with or without modification, are permitted
 provided that the following conditions are met:

 * Redistributions of source code must retain the above copyright notice, this list of conditions
 and the following disclaimer.
 * Redistributions in binary form must reproduce the above copyright notice, this list of conditions
 and the following disclaimer in the documentation and/or other materials provided with the
 distribution.

 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
 IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRI-
 BUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
 THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */