001package org.apache.activemq.transport.auto.nio; 002 003import java.io.IOException; 004import java.net.Socket; 005import java.net.URI; 006import java.net.URISyntaxException; 007import java.nio.ByteBuffer; 008import java.util.HashMap; 009import java.util.Set; 010import java.util.concurrent.Future; 011 012import javax.net.ServerSocketFactory; 013import javax.net.ssl.SSLContext; 014import javax.net.ssl.SSLEngine; 015 016import org.apache.activemq.broker.BrokerService; 017import org.apache.activemq.broker.BrokerServiceAware; 018import org.apache.activemq.transport.Transport; 019import org.apache.activemq.transport.auto.AutoTcpTransportServer; 020import org.apache.activemq.transport.nio.AutoInitNioSSLTransport; 021import org.apache.activemq.transport.nio.NIOSSLTransport; 022import org.apache.activemq.transport.tcp.TcpTransport; 023import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; 024import org.apache.activemq.transport.tcp.TcpTransportFactory; 025import org.apache.activemq.transport.tcp.TcpTransportServer; 026import org.apache.activemq.util.IntrospectionSupport; 027import org.apache.activemq.wireformat.WireFormat; 028 029/** 030 * Licensed to the Apache Software Foundation (ASF) under one or more 031 * contributor license agreements. See the NOTICE file distributed with 032 * this work for additional information regarding copyright ownership. 033 * The ASF licenses this file to You under the Apache License, Version 2.0 034 * (the "License"); you may not use this file except in compliance with 035 * the License. You may obtain a copy of the License at 036 * 037 * http://www.apache.org/licenses/LICENSE-2.0 038 * 039 * Unless required by applicable law or agreed to in writing, software 040 * distributed under the License is distributed on an "AS IS" BASIS, 041 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 042 * See the License for the specific language governing permissions and 043 * limitations under the License. 044 */ 045public class AutoNIOSSLTransportServer extends AutoTcpTransportServer { 046 047 private SSLContext context; 048 049 public AutoNIOSSLTransportServer(SSLContext context, TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory, 050 BrokerService brokerService, Set<String> enabledProtocols) throws IOException, URISyntaxException { 051 super(transportFactory, location, serverSocketFactory, brokerService, enabledProtocols); 052 053 this.context = context; 054 } 055 056 private boolean needClientAuth; 057 private boolean wantClientAuth; 058 059 protected Transport createTransport(Socket socket, WireFormat format, SSLEngine engine, 060 InitBuffer initBuffer, ByteBuffer inputBuffer, TcpTransportFactory detectedFactory) throws IOException { 061 NIOSSLTransport transport = new NIOSSLTransport(format, socket, engine, initBuffer, inputBuffer); 062 if (context != null) { 063 transport.setSslContext(context); 064 } 065 066 transport.setNeedClientAuth(needClientAuth); 067 transport.setWantClientAuth(wantClientAuth); 068 069 070 return transport; 071 } 072 073 @Override 074 protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException { 075 throw new UnsupportedOperationException("method not supported"); 076 } 077 078 @Override 079 public boolean isSslServer() { 080 return true; 081 } 082 083 public boolean isNeedClientAuth() { 084 return this.needClientAuth; 085 } 086 087 public void setNeedClientAuth(boolean value) { 088 this.needClientAuth = value; 089 } 090 091 public boolean isWantClientAuth() { 092 return this.wantClientAuth; 093 } 094 095 public void setWantClientAuth(boolean value) { 096 this.wantClientAuth = value; 097 } 098 099 100 @Override 101 protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception { 102 //The SSLEngine needs to be initialized and handshake done to get the first command and detect the format 103 //The wireformat doesn't need properties set here because we aren't using this format during the SSL handshake 104 final AutoInitNioSSLTransport in = new AutoInitNioSSLTransport(wireFormatFactory.createWireFormat(), socket); 105 if (context != null) { 106 in.setSslContext(context); 107 } 108 //We need to set the transport options on the init transport so that the SSL options are set 109 if (transportOptions != null) { 110 //Clone the map because we will need to set the options later on the actual transport 111 IntrospectionSupport.setProperties(in, new HashMap<>(transportOptions)); 112 } 113 in.start(); 114 SSLEngine engine = in.getSslSession(); 115 116 //Attempt to read enough bytes to detect the protocol until the timeout period 117 //is reached 118 Future<?> future = protocolDetectionExecutor.submit(new Runnable() { 119 @Override 120 public void run() { 121 int attempts = 0; 122 do { 123 if(attempts > 0) { 124 try { 125 //increase sleep period each attempt to prevent high cpu usage 126 //if the client is hung and not sending bytes 127 int sleep = attempts >= 1024 ? 1024 : 4 * attempts; 128 Thread.sleep(sleep); 129 } catch (InterruptedException e) { 130 break; 131 } 132 } 133 //In the future it might be better to register a nonblocking selector 134 //to be told when bytes are ready 135 in.serviceRead(); 136 attempts++; 137 } while(in.getReadSize().get() < 8 && !Thread.interrupted()); 138 } 139 }); 140 141 try { 142 //If this fails and throws an exception and the socket will be closed 143 waitForProtocolDetectionFinish(future, in.getReadSize()); 144 } finally { 145 //call cancel in case task didn't complete which will interrupt the task 146 future.cancel(true); 147 } 148 in.stop(); 149 150 InitBuffer initBuffer = new InitBuffer(in.getReadSize().get(), ByteBuffer.allocate(in.getReadData().length)); 151 initBuffer.buffer.put(in.getReadData()); 152 153 ProtocolInfo protocolInfo = detectProtocol(in.getReadData()); 154 155 if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) { 156 ((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService); 157 } 158 159 WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat(); 160 Transport transport = createTransport(socket, format, engine, initBuffer, in.getInputBuffer(), protocolInfo.detectedTransportFactory); 161 162 return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory); 163 } 164 165} 166 167