/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.mqtt;

import java.io.IOException;

import org.apache.activemq.transport.tcp.TcpTransport;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
import org.fusesource.mqtt.codec.MQTTFrame;

/**
 * Incremental decoder that turns chunks of incoming bytes into {@link MQTTFrame}
 * instances and hands each completed frame to an {@link MQTTFrameSink}.
 *
 * Decoding state (current parser, header byte, partially read length and
 * content) is kept in instance fields, so a frame may be split across any
 * number of {@link #parse(DataByteArrayInputStream, int)} calls.
 *
 * A frame is decoded in three stages, each handled by its own parser: the
 * single header byte, the variable length "remaining length" field, and
 * finally the frame content.  Each parser hands any unconsumed bytes of the
 * current chunk to the next parser by calling it directly.
 */
public class MQTTCodec {

    /**
     * Maximum number of bytes the MQTT "remaining length" field may occupy.
     * Four 7-bit groups also guarantee the decoded value fits in an int.
     */
    private static final int MAX_LENGTH_BYTES = 4;

    private final MQTTFrameSink frameSink;
    private final MQTTWireFormat wireFormat;

    // Header byte and content length of the frame currently being decoded.
    private byte header;
    private int contentLength = -1;

    private FrameParser currentParser;

    // Reusable buffer for frames whose content is smaller than its length;
    // larger frames get a dedicated buffer (see contentParser).
    private final Buffer scratch = new Buffer(8 * 1024);
    private Buffer currentBuffer;

    /**
     * Sink for newly decoded MQTT Frames.
     */
    public interface MQTTFrameSink {
        void onFrame(MQTTFrame mqttFrame);
    }

    /**
     * Creates a codec that delivers frames to the given sink and has no wire format.
     *
     * @param sink
     *        receiver of every decoded frame.
     */
    public MQTTCodec(MQTTFrameSink sink) {
        this(sink, null);
    }

    /**
     * Creates a codec that delivers frames to the given sink.
     *
     * @param sink
     *        receiver of every decoded frame.
     * @param wireFormat
     *        source of the maximum frame size, may be null in which case
     *        {@link MQTTWireFormat#MAX_MESSAGE_LENGTH} is used.
     */
    public MQTTCodec(MQTTFrameSink sink, MQTTWireFormat wireFormat) {
        this.frameSink = sink;
        this.wireFormat = wireFormat;
    }

    /**
     * Creates a codec that passes every decoded frame to the transport's
     * doConsume method and has no wire format.
     *
     * @param transport
     *        transport that consumes the decoded frames.
     */
    public MQTTCodec(final TcpTransport transport) {
        this(transport, null);
    }

    /**
     * Creates a codec that passes every decoded frame to the transport's
     * doConsume method.
     *
     * @param transport
     *        transport that consumes the decoded frames.
     * @param wireFormat
     *        source of the maximum frame size, may be null in which case
     *        {@link MQTTWireFormat#MAX_MESSAGE_LENGTH} is used.
     */
    public MQTTCodec(final TcpTransport transport, MQTTWireFormat wireFormat) {
        this.wireFormat = wireFormat;
        this.frameSink = new MQTTFrameSink() {

            @Override
            public void onFrame(MQTTFrame mqttFrame) {
                transport.doConsume(mqttFrame);
            }
        };
    }

    /**
     * Feeds the next chunk of incoming data to the decoder.  Every frame that
     * is completed by this chunk is delivered to the frame sink before the
     * method returns; an incomplete trailing frame is retained and continued
     * on the next call.
     *
     * @param input
     *        stream positioned at the first unread byte of the chunk.
     * @param readSize
     *        number of bytes available to read from the input.
     *
     * @throws Exception if the data is malformed or a frame exceeds the
     *         maximum frame size.
     */
    public void parse(DataByteArrayInputStream input, int readSize) throws Exception {
        if (currentParser == null) {
            currentParser = initializeHeaderParser();
        }

        // Parser stack will run until current incoming data has all been consumed.
        currentParser.parse(input, readSize);
    }

    /**
     * Builds a frame from the current header and content buffer and delivers
     * it to the sink.  A frame with no content is created with a null buffer.
     */
    private void processCommand() throws IOException {

        Buffer frameContents;
        if (currentBuffer == scratch) {
            // The scratch buffer is reused for later frames, so hand out a copy.
            frameContents = scratch.deepCopy();
        } else {
            frameContents = currentBuffer;
        }

        // Always drop the reference once the frame is built.  Otherwise a
        // following zero-length frame, which never passes through the content
        // parser, would be given a copy of this frame's scratch contents.
        currentBuffer = null;

        MQTTFrame frame = new MQTTFrame(frameContents).header(header);
        frameSink.onFrame(frame);
    }

    /**
     * @return the wire format's maximum frame size, or
     *         {@link MQTTWireFormat#MAX_MESSAGE_LENGTH} when no wire format was given.
     */
    private int getMaxFrameSize() {
        return wireFormat != null ? wireFormat.getMaxFrameSize() : MQTTWireFormat.MAX_MESSAGE_LENGTH;
    }

    //----- Prepare the current frame parser for use -------------------------//

    private FrameParser initializeHeaderParser() throws IOException {
        headerParser.reset();
        return headerParser;
    }

    private FrameParser initializeVariableLengthParser() throws IOException {
        variableLengthParser.reset();
        return variableLengthParser;
    }

    private FrameParser initializeContentParser() throws IOException {
        contentParser.reset();
        return contentParser;
    }

    //----- Frame parser implementations -------------------------------------//

    /**
     * One stage of frame decoding.
     */
    private interface FrameParser {

        /**
         * Consumes up to readSize bytes from the given data, switching to and
         * invoking the next stage when this stage completes with bytes left over.
         */
        void parse(DataByteArrayInputStream data, int readSize) throws IOException;

        /**
         * Clears the state of this stage so that it can decode a new frame.
         */
        void reset() throws IOException;
    }

    /**
     * Reads the single header byte of a frame.
     */
    private final FrameParser headerParser = new FrameParser() {

        @Override
        public void parse(DataByteArrayInputStream data, int readSize) throws IOException {
            while (readSize-- > 0) {
                byte b = data.readByte();
                // skip repeating nulls
                if (b == 0) {
                    continue;
                }

                header = b;

                currentParser = initializeVariableLengthParser();
                if (readSize > 0) {
                    currentParser.parse(data, readSize);
                }
                return;
            }
        }

        @Override
        public void reset() throws IOException {
            header = -1;
            contentLength = -1;
        }
    };

    /**
     * Reads the variable length "remaining length" field: 7 bits of value per
     * byte, least significant group first, with the high bit set on every
     * byte except the last.
     */
    private final FrameParser variableLengthParser = new FrameParser() {

        private byte digit;
        private int multiplier = 1;
        private int length;
        private int bytesRead;

        @Override
        public void parse(DataByteArrayInputStream data, int readSize) throws IOException {
            int i = 0;
            while (i++ < readSize) {
                digit = data.readByte();
                bytesRead++;
                length += (digit & 0x7F) * multiplier;
                multiplier <<= 7;
                if ((digit & 0x80) == 0) {
                    if (length == 0) {
                        // No content follows, the frame is already complete.
                        processCommand();
                        currentParser = initializeHeaderParser();
                    } else {
                        if (length > getMaxFrameSize()) {
                            throw new IOException("The maximum message length was exceeded");
                        }

                        // Initializing the content parser clears contentLength,
                        // so the decoded length must be assigned afterwards.
                        currentParser = initializeContentParser();
                        contentLength = length;
                    }

                    readSize = readSize - i;
                    if (readSize > 0) {
                        currentParser.parse(data, readSize);
                    }
                    return;
                }

                // Continuation bit is set.  Accepting a further length byte
                // would overflow the int arithmetic above and could yield a
                // negative or wrapped length that slips past the size check.
                if (bytesRead >= MAX_LENGTH_BYTES) {
                    throw new IOException("Malformed remaining length: more than " +
                        MAX_LENGTH_BYTES + " length bytes received");
                }
            }
        }

        @Override
        public void reset() throws IOException {
            digit = 0;
            multiplier = 1;
            length = 0;
            bytesRead = 0;
        }
    };

    /**
     * Accumulates contentLength bytes of frame content, possibly over several
     * calls, and dispatches the frame once all of them have arrived.
     */
    private final FrameParser contentParser = new FrameParser() {

        private int payLoadRead = 0;

        @Override
        public void parse(DataByteArrayInputStream data, int readSize) throws IOException {
            if (currentBuffer == null) {
                if (contentLength < scratch.length()) {
                    currentBuffer = scratch;
                    currentBuffer.length = contentLength;
                } else {
                    currentBuffer = new Buffer(contentLength);
                }
            }

            // Never read past the end of this frame, the remainder of the
            // chunk belongs to the next one.
            int length = Math.min(readSize, contentLength - payLoadRead);
            payLoadRead += data.read(currentBuffer.data, payLoadRead, length);

            if (payLoadRead == contentLength) {
                processCommand();
                currentParser = initializeHeaderParser();
                readSize = readSize - length;
                if (readSize > 0) {
                    currentParser.parse(data, readSize);
                }
            }
        }

        @Override
        public void reset() throws IOException {
            contentLength = -1;
            payLoadRead = 0;
            scratch.reset();
            currentBuffer = null;
        }
    };
}