001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.EOFException;
020import java.io.IOException;
021import java.net.SocketException;
022import java.net.URI;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Properties;
031import java.util.Set;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.concurrent.CopyOnWriteArrayList;
034import java.util.concurrent.CountDownLatch;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.concurrent.atomic.AtomicInteger;
038import java.util.concurrent.atomic.AtomicReference;
039import java.util.concurrent.locks.ReentrantReadWriteLock;
040
041import javax.transaction.xa.XAResource;
042
043import org.apache.activemq.advisory.AdvisoryBroker;
044import org.apache.activemq.advisory.AdvisorySupport;
045import org.apache.activemq.broker.region.ConnectionStatistics;
046import org.apache.activemq.broker.region.DurableTopicSubscription;
047import org.apache.activemq.broker.region.RegionBroker;
048import org.apache.activemq.broker.region.Subscription;
049import org.apache.activemq.broker.region.TopicRegion;
050import org.apache.activemq.command.ActiveMQDestination;
051import org.apache.activemq.command.BrokerInfo;
052import org.apache.activemq.command.BrokerSubscriptionInfo;
053import org.apache.activemq.command.Command;
054import org.apache.activemq.command.CommandTypes;
055import org.apache.activemq.command.ConnectionControl;
056import org.apache.activemq.command.ConnectionError;
057import org.apache.activemq.command.ConnectionId;
058import org.apache.activemq.command.ConnectionInfo;
059import org.apache.activemq.command.ConsumerControl;
060import org.apache.activemq.command.ConsumerId;
061import org.apache.activemq.command.ConsumerInfo;
062import org.apache.activemq.command.ControlCommand;
063import org.apache.activemq.command.DataArrayResponse;
064import org.apache.activemq.command.DestinationInfo;
065import org.apache.activemq.command.ExceptionResponse;
066import org.apache.activemq.command.FlushCommand;
067import org.apache.activemq.command.IntegerResponse;
068import org.apache.activemq.command.KeepAliveInfo;
069import org.apache.activemq.command.Message;
070import org.apache.activemq.command.MessageAck;
071import org.apache.activemq.command.MessageDispatch;
072import org.apache.activemq.command.MessageDispatchNotification;
073import org.apache.activemq.command.MessagePull;
074import org.apache.activemq.command.ProducerAck;
075import org.apache.activemq.command.ProducerId;
076import org.apache.activemq.command.ProducerInfo;
077import org.apache.activemq.command.RemoveInfo;
078import org.apache.activemq.command.RemoveSubscriptionInfo;
079import org.apache.activemq.command.Response;
080import org.apache.activemq.command.SessionId;
081import org.apache.activemq.command.SessionInfo;
082import org.apache.activemq.command.ShutdownInfo;
083import org.apache.activemq.command.TransactionId;
084import org.apache.activemq.command.TransactionInfo;
085import org.apache.activemq.command.WireFormatInfo;
086import org.apache.activemq.network.DemandForwardingBridge;
087import org.apache.activemq.network.MBeanNetworkListener;
088import org.apache.activemq.network.NetworkBridgeConfiguration;
089import org.apache.activemq.network.NetworkBridgeFactory;
090import org.apache.activemq.network.NetworkConnector;
091import org.apache.activemq.security.MessageAuthorizationPolicy;
092import org.apache.activemq.state.CommandVisitor;
093import org.apache.activemq.state.ConnectionState;
094import org.apache.activemq.state.ConsumerState;
095import org.apache.activemq.state.ProducerState;
096import org.apache.activemq.state.SessionState;
097import org.apache.activemq.state.TransactionState;
098import org.apache.activemq.thread.Task;
099import org.apache.activemq.thread.TaskRunner;
100import org.apache.activemq.thread.TaskRunnerFactory;
101import org.apache.activemq.transaction.Transaction;
102import org.apache.activemq.transport.DefaultTransportListener;
103import org.apache.activemq.transport.ResponseCorrelator;
104import org.apache.activemq.transport.TransmitCallback;
105import org.apache.activemq.transport.Transport;
106import org.apache.activemq.transport.TransportDisposedIOException;
107import org.apache.activemq.util.IntrospectionSupport;
108import org.apache.activemq.util.MarshallingSupport;
109import org.apache.activemq.util.NetworkBridgeUtils;
110import org.apache.activemq.util.SubscriptionKey;
111import org.slf4j.Logger;
112import org.slf4j.LoggerFactory;
113import org.slf4j.MDC;
114
115public class TransportConnection implements Connection, Task, CommandVisitor {
116    private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
117    private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
118    private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
119    // Keeps track of the broker and connector that created this connection.
120    protected final Broker broker;
121    protected final BrokerService brokerService;
122    protected final TransportConnector connector;
123    // Keeps track of the state of the connections.
124    // protected final ConcurrentHashMap localConnectionStates=new
125    // ConcurrentHashMap();
126    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
127    // The broker and wireformat info that was exchanged.
128    protected BrokerInfo brokerInfo;
129    protected final List<Command> dispatchQueue = new LinkedList<>();
130    protected TaskRunner taskRunner;
131    protected final AtomicReference<Throwable> transportException = new AtomicReference<>();
132    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
133    private final Transport transport;
134    private MessageAuthorizationPolicy messageAuthorizationPolicy;
135    private WireFormatInfo wireFormatInfo;
136    // Used to do async dispatch.. this should perhaps be pushed down into the
137    // transport layer..
138    private boolean inServiceException;
139    private final ConnectionStatistics statistics = new ConnectionStatistics();
140    private boolean manageable;
141    private boolean slow;
142    private boolean markedCandidate;
143    private boolean blockedCandidate;
144    private boolean blocked;
145    private boolean connected;
146    private boolean active;
147    private final AtomicBoolean starting = new AtomicBoolean();
148    private final AtomicBoolean pendingStop = new AtomicBoolean();
149    private long timeStamp;
150    private final AtomicBoolean stopping = new AtomicBoolean(false);
151    private final CountDownLatch stopped = new CountDownLatch(1);
152    private final AtomicBoolean asyncException = new AtomicBoolean(false);
153    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<>();
154    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<>();
155    private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
156    private ConnectionContext context;
157    private boolean networkConnection;
158    private boolean faultTolerantConnection;
159    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
160    private DemandForwardingBridge duplexBridge;
161    private final TaskRunnerFactory taskRunnerFactory;
162    private final TaskRunnerFactory stopTaskRunnerFactory;
163    private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
164    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
165    private String duplexNetworkConnectorId;
166
167    /**
168     * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
169     *                          else commands are sent async.
170     * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
171     */
172    public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
173                               TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
174        this.connector = connector;
175        this.broker = broker;
176        this.brokerService = broker.getBrokerService();
177
178        RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
179        brokerConnectionStates = rb.getConnectionStates();
180        if (connector != null) {
181            this.statistics.setParent(connector.getStatistics());
182            this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
183        }
184        this.taskRunnerFactory = taskRunnerFactory;
185        this.stopTaskRunnerFactory = stopTaskRunnerFactory;
186        this.transport = transport;
187        if( this.transport instanceof BrokerServiceAware ) {
188            ((BrokerServiceAware)this.transport).setBrokerService(brokerService);
189        }
190        this.transport.setTransportListener(new DefaultTransportListener() {
191            @Override
192            public void onCommand(Object o) {
193                serviceLock.readLock().lock();
194                try {
195                    if (!(o instanceof Command)) {
196                        throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
197                    }
198                    Command command = (Command) o;
199                    if (!brokerService.isStopping()) {
200                        Response response = service(command);
201                        if (response != null && !brokerService.isStopping()) {
202                            dispatchSync(response);
203                        }
204                    } else {
205                        throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
206                    }
207                } finally {
208                    serviceLock.readLock().unlock();
209                }
210            }
211
212            @Override
213            public void onException(IOException exception) {
214                serviceLock.readLock().lock();
215                try {
216                    serviceTransportException(exception);
217                } finally {
218                    serviceLock.readLock().unlock();
219                }
220            }
221        });
222        connected = true;
223    }
224
225    /**
226     * Returns the number of messages to be dispatched to this connection
227     *
228     * @return size of dispatch queue
229     */
230    @Override
231    public int getDispatchQueueSize() {
232        synchronized (dispatchQueue) {
233            return dispatchQueue.size();
234        }
235    }
236
237    public void serviceTransportException(IOException e) {
238        if (!stopping.get() && !pendingStop.get()) {
239            transportException.set(e);
240            if (TRANSPORTLOG.isDebugEnabled()) {
241                TRANSPORTLOG.debug(this + " failed: " + e, e);
242            } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
243                TRANSPORTLOG.warn(this + " failed: " + e);
244            }
245            stopAsync(e);
246        }
247    }
248
249    private boolean expected(IOException e) {
250        return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
251    }
252
253    private boolean isStomp() {
254        URI uri = connector.getUri();
255        return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
256    }
257
258    /**
259     * Calls the serviceException method in an async thread. Since handling a
260     * service exception closes a socket, we should not tie up broker threads
261     * since client sockets may hang or cause deadlocks.
262     */
263    @Override
264    public void serviceExceptionAsync(final IOException e) {
265        if (asyncException.compareAndSet(false, true)) {
266            new Thread("Async Exception Handler") {
267                @Override
268                public void run() {
269                    serviceException(e);
270                }
271            }.start();
272        }
273    }
274
275    /**
276     * Closes a clients connection due to a detected error. Errors are ignored
277     * if: the client is closing or broker is closing. Otherwise, the connection
278     * error transmitted to the client before stopping it's transport.
279     */
280    @Override
281    public void serviceException(Throwable e) {
282        // are we a transport exception such as not being able to dispatch
283        // synchronously to a transport
284        if (e instanceof IOException) {
285            serviceTransportException((IOException) e);
286        } else if (e.getClass() == BrokerStoppedException.class) {
287            // Handle the case where the broker is stopped
288            // But the client is still connected.
289            if (!stopping.get()) {
290                SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
291                ConnectionError ce = new ConnectionError();
292                ce.setException(e);
293                dispatchSync(ce);
294                // Record the error that caused the transport to stop
295                transportException.set(e);
296                // Wait a little bit to try to get the output buffer to flush
297                // the exception notification to the client.
298                try {
299                    Thread.sleep(500);
300                } catch (InterruptedException ie) {
301                    Thread.currentThread().interrupt();
302                }
303                // Worst case is we just kill the connection before the
304                // notification gets to him.
305                stopAsync();
306            }
307        } else if (!stopping.get() && !inServiceException) {
308            inServiceException = true;
309            try {
310                if (SERVICELOG.isDebugEnabled()) {
311                    SERVICELOG.debug("Async error occurred: " + e, e);
312                } else {
313                    SERVICELOG.warn("Async error occurred: " + e);
314                }
315                ConnectionError ce = new ConnectionError();
316                ce.setException(e);
317                if (pendingStop.get()) {
318                    dispatchSync(ce);
319                } else {
320                    dispatchAsync(ce);
321                }
322            } finally {
323                inServiceException = false;
324            }
325        }
326    }
327
328    @Override
329    public Response service(Command command) {
330        MDC.put("activemq.connector", connector.getUri().toString());
331        Response response = null;
332        boolean responseRequired = command.isResponseRequired();
333        int commandId = command.getCommandId();
334        try {
335            if (!pendingStop.get()) {
336                response = command.visit(this);
337            } else {
338                response = new ExceptionResponse(transportException.get());
339            }
340        } catch (Throwable e) {
341            if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
342                SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
343                        + " command: " + command + ", exception: " + e, e);
344            }
345
346            if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) {
347                LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause());
348                responseRequired = false;
349            }
350
351            if (responseRequired) {
352                if (e instanceof SecurityException || e.getCause() instanceof SecurityException) {
353                    SERVICELOG.warn("Security Error occurred on connection to: {}, {}",
354                            transport.getRemoteAddress(), e.getMessage());
355                }
356                response = new ExceptionResponse(e);
357            } else {
358                forceRollbackOnlyOnFailedAsyncTransactionOp(e, command);
359                serviceException(e);
360            }
361        }
362        if (responseRequired) {
363            if (response == null) {
364                response = new Response();
365            }
366            response.setCorrelationId(commandId);
367        }
368        // The context may have been flagged so that the response is not
369        // sent.
370        if (context != null) {
371            if (context.isDontSendReponse()) {
372                context.setDontSendReponse(false);
373                response = null;
374            }
375            context = null;
376        }
377        MDC.remove("activemq.connector");
378        return response;
379    }
380
381    private void forceRollbackOnlyOnFailedAsyncTransactionOp(Throwable e, Command command) {
382        if (brokerService.isRollbackOnlyOnAsyncException() && !(e instanceof IOException) && isInTransaction(command)) {
383            Transaction transaction = getActiveTransaction(command);
384            if (transaction != null && !transaction.isRollbackOnly()) {
385                LOG.debug("on async exception, force rollback of transaction for: " + command, e);
386                transaction.setRollbackOnly(e);
387            }
388        }
389    }
390
391    private Transaction getActiveTransaction(Command command) {
392        Transaction transaction = null;
393        try {
394            if (command instanceof Message) {
395                Message messageSend = (Message) command;
396                ProducerId producerId = messageSend.getProducerId();
397                ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
398                transaction = producerExchange.getConnectionContext().getTransactions().get(messageSend.getTransactionId());
399            } else if (command instanceof  MessageAck) {
400                MessageAck messageAck = (MessageAck) command;
401                ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(messageAck.getConsumerId());
402                if (consumerExchange != null) {
403                    transaction = consumerExchange.getConnectionContext().getTransactions().get(messageAck.getTransactionId());
404                }
405            }
406        } catch(Exception ignored){
407            LOG.trace("failed to find active transaction for command: " + command, ignored);
408        }
409        return transaction;
410    }
411
412    private boolean isInTransaction(Command command) {
413        return command instanceof Message && ((Message)command).isInTransaction()
414                || command instanceof MessageAck && ((MessageAck)command).isInTransaction();
415    }
416
417    @Override
418    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
419        return null;
420    }
421
422    @Override
423    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
424        broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
425        return null;
426    }
427
428    @Override
429    public Response processWireFormat(WireFormatInfo info) throws Exception {
430        wireFormatInfo = info;
431        protocolVersion.set(info.getVersion());
432        return null;
433    }
434
435    @Override
436    public Response processShutdown(ShutdownInfo info) throws Exception {
437        stopAsync();
438        return null;
439    }
440
441    @Override
442    public Response processFlush(FlushCommand command) throws Exception {
443        return null;
444    }
445
446    @Override
447    public Response processBeginTransaction(TransactionInfo info) throws Exception {
448        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
449        context = null;
450        if (cs != null) {
451            context = cs.getContext();
452        }
453        if (cs == null) {
454            throw new NullPointerException("Context is null");
455        }
456        // Avoid replaying dup commands
457        if (cs.getTransactionState(info.getTransactionId()) == null) {
458            cs.addTransactionState(info.getTransactionId());
459            broker.beginTransaction(context, info.getTransactionId());
460        }
461        return null;
462    }
463
464    @Override
465    public int getActiveTransactionCount() {
466        int rc = 0;
467        for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
468            Collection<TransactionState> transactions = cs.getTransactionStates();
469            for (TransactionState transaction : transactions) {
470                rc++;
471            }
472        }
473        return rc;
474    }
475
476    @Override
477    public Long getOldestActiveTransactionDuration() {
478        TransactionState oldestTX = null;
479        for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
480            Collection<TransactionState> transactions = cs.getTransactionStates();
481            for (TransactionState transaction : transactions) {
482                if( oldestTX ==null || oldestTX.getCreatedAt() < transaction.getCreatedAt() ) {
483                    oldestTX = transaction;
484                }
485            }
486        }
487        if( oldestTX == null ) {
488            return null;
489        }
490        return System.currentTimeMillis() - oldestTX.getCreatedAt();
491    }
492
493    @Override
494    public Response processEndTransaction(TransactionInfo info) throws Exception {
495        // No need to do anything. This packet is just sent by the client
496        // make sure he is synced with the server as commit command could
497        // come from a different connection.
498        return null;
499    }
500
501    @Override
502    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
503        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
504        context = null;
505        if (cs != null) {
506            context = cs.getContext();
507        }
508        if (cs == null) {
509            throw new NullPointerException("Context is null");
510        }
511        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
512        if (transactionState == null) {
513            throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
514                    + info.getTransactionId());
515        }
516        // Avoid dups.
517        if (!transactionState.isPrepared()) {
518            transactionState.setPrepared(true);
519            int result = broker.prepareTransaction(context, info.getTransactionId());
520            transactionState.setPreparedResult(result);
521            if (result == XAResource.XA_RDONLY) {
522                // we are done, no further rollback or commit from TM
523                cs.removeTransactionState(info.getTransactionId());
524            }
525            IntegerResponse response = new IntegerResponse(result);
526            return response;
527        } else {
528            IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
529            return response;
530        }
531    }
532
533    @Override
534    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
535        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
536        context = cs.getContext();
537        cs.removeTransactionState(info.getTransactionId());
538        broker.commitTransaction(context, info.getTransactionId(), true);
539        return null;
540    }
541
542    @Override
543    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
544        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
545        context = cs.getContext();
546        cs.removeTransactionState(info.getTransactionId());
547        broker.commitTransaction(context, info.getTransactionId(), false);
548        return null;
549    }
550
551    @Override
552    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
553        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
554        context = cs.getContext();
555        cs.removeTransactionState(info.getTransactionId());
556        broker.rollbackTransaction(context, info.getTransactionId());
557        return null;
558    }
559
560    @Override
561    public Response processForgetTransaction(TransactionInfo info) throws Exception {
562        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
563        context = cs.getContext();
564        broker.forgetTransaction(context, info.getTransactionId());
565        return null;
566    }
567
568    @Override
569    public Response processRecoverTransactions(TransactionInfo info) throws Exception {
570        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
571        context = cs.getContext();
572        TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
573        return new DataArrayResponse(preparedTransactions);
574    }
575
576    @Override
577    public Response processMessage(Message messageSend) throws Exception {
578        ProducerId producerId = messageSend.getProducerId();
579        ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
580        if (producerExchange.canDispatch(messageSend)) {
581            broker.send(producerExchange, messageSend);
582        }
583        return null;
584    }
585
586    @Override
587    public Response processMessageAck(MessageAck ack) throws Exception {
588        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
589        if (consumerExchange != null) {
590            broker.acknowledge(consumerExchange, ack);
591        } else if (ack.isInTransaction()) {
592            LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack);
593        }
594        return null;
595    }
596
597    @Override
598    public Response processMessagePull(MessagePull pull) throws Exception {
599        return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
600    }
601
602    @Override
603    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
604        broker.processDispatchNotification(notification);
605        return null;
606    }
607
608    @Override
609    public Response processAddDestination(DestinationInfo info) throws Exception {
610        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
611        broker.addDestinationInfo(cs.getContext(), info);
612        if (info.getDestination().isTemporary()) {
613            cs.addTempDestination(info);
614        }
615        return null;
616    }
617
618    @Override
619    public Response processRemoveDestination(DestinationInfo info) throws Exception {
620        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
621        broker.removeDestinationInfo(cs.getContext(), info);
622        if (info.getDestination().isTemporary()) {
623            cs.removeTempDestination(info.getDestination());
624        }
625        return null;
626    }
627
628    @Override
629    public Response processAddProducer(ProducerInfo info) throws Exception {
630        SessionId sessionId = info.getProducerId().getParentId();
631        ConnectionId connectionId = sessionId.getParentId();
632        TransportConnectionState cs = lookupConnectionState(connectionId);
633        if (cs == null) {
634            throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
635                    + connectionId);
636        }
637        SessionState ss = cs.getSessionState(sessionId);
638        if (ss == null) {
639            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
640                    + sessionId);
641        }
642        // Avoid replaying dup commands
643        if (!ss.getProducerIds().contains(info.getProducerId())) {
644            ActiveMQDestination destination = info.getDestination();
645            // Do not check for null here as it would cause the count of max producers to exclude
646            // anonymous producers.  The isAdvisoryTopic method checks for null so it is safe to
647            // call it from here with a null Destination value.
648            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
649                if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
650                    throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
651                }
652            }
653            broker.addProducer(cs.getContext(), info);
654            try {
655                ss.addProducer(info);
656            } catch (IllegalStateException e) {
657                broker.removeProducer(cs.getContext(), info);
658            }
659
660        }
661        return null;
662    }
663
664    @Override
665    public Response processRemoveProducer(ProducerId id) throws Exception {
666        SessionId sessionId = id.getParentId();
667        ConnectionId connectionId = sessionId.getParentId();
668        TransportConnectionState cs = lookupConnectionState(connectionId);
669        SessionState ss = cs.getSessionState(sessionId);
670        if (ss == null) {
671            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
672                    + sessionId);
673        }
674        ProducerState ps = ss.removeProducer(id);
675        if (ps == null) {
676            throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
677        }
678        removeProducerBrokerExchange(id);
679        broker.removeProducer(cs.getContext(), ps.getInfo());
680        return null;
681    }
682
683    @Override
684    public Response processAddConsumer(ConsumerInfo info) throws Exception {
685        SessionId sessionId = info.getConsumerId().getParentId();
686        ConnectionId connectionId = sessionId.getParentId();
687        TransportConnectionState cs = lookupConnectionState(connectionId);
688        if (cs == null) {
689            throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: "
690                    + connectionId);
691        }
692        SessionState ss = cs.getSessionState(sessionId);
693        if (ss == null) {
694            throw new IllegalStateException(broker.getBrokerName()
695                    + " Cannot add a consumer to a session that had not been registered: " + sessionId);
696        }
697        // Avoid replaying dup commands
698        if (!ss.getConsumerIds().contains(info.getConsumerId())) {
699            ActiveMQDestination destination = info.getDestination();
700            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
701                if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
702                    throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
703                }
704            }
705
706            broker.addConsumer(cs.getContext(), info);
707            try {
708                ss.addConsumer(info);
709                addConsumerBrokerExchange(cs, info.getConsumerId());
710            } catch (IllegalStateException e) {
711                broker.removeConsumer(cs.getContext(), info);
712            }
713
714        }
715        return null;
716    }
717
718    @Override
719    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
720        SessionId sessionId = id.getParentId();
721        ConnectionId connectionId = sessionId.getParentId();
722        TransportConnectionState cs = lookupConnectionState(connectionId);
723        if (cs == null) {
724            throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
725                    + connectionId);
726        }
727        SessionState ss = cs.getSessionState(sessionId);
728        if (ss == null) {
729            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
730                    + sessionId);
731        }
732        ConsumerState consumerState = ss.removeConsumer(id);
733        if (consumerState == null) {
734            throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
735        }
736        ConsumerInfo info = consumerState.getInfo();
737        info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
738        broker.removeConsumer(cs.getContext(), consumerState.getInfo());
739        removeConsumerBrokerExchange(id);
740        return null;
741    }
742
743    @Override
744    public Response processAddSession(SessionInfo info) throws Exception {
745        ConnectionId connectionId = info.getSessionId().getParentId();
746        TransportConnectionState cs = lookupConnectionState(connectionId);
747        // Avoid replaying dup commands
748        if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
749            broker.addSession(cs.getContext(), info);
750            try {
751                cs.addSession(info);
752            } catch (IllegalStateException e) {
753                LOG.warn("Failed to add session: {}", info.getSessionId(), e);
754                broker.removeSession(cs.getContext(), info);
755            }
756        }
757        return null;
758    }
759
760    @Override
761    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
762        ConnectionId connectionId = id.getParentId();
763        TransportConnectionState cs = lookupConnectionState(connectionId);
764        if (cs == null) {
765            throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
766        }
767        SessionState session = cs.getSessionState(id);
768        if (session == null) {
769            throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
770        }
771        // Don't let new consumers or producers get added while we are closing
772        // this down.
773        session.shutdown();
774        // Cascade the connection stop to the consumers and producers.
775        for (ConsumerId consumerId : session.getConsumerIds()) {
776            try {
777                processRemoveConsumer(consumerId, lastDeliveredSequenceId);
778            } catch (Throwable e) {
779                LOG.warn("Failed to remove consumer: {}", consumerId, e);
780            }
781        }
782        for (ProducerId producerId : session.getProducerIds()) {
783            try {
784                processRemoveProducer(producerId);
785            } catch (Throwable e) {
786                LOG.warn("Failed to remove producer: {}", producerId, e);
787            }
788        }
789        cs.removeSession(id);
790        broker.removeSession(cs.getContext(), session.getInfo());
791        return null;
792    }
793
794    @Override
795    public Response processAddConnection(ConnectionInfo info) throws Exception {
796        // Older clients should have been defaulting this field to true.. but
797        // they were not.
798        if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
799            info.setClientMaster(true);
800        }
801        TransportConnectionState state;
802        // Make sure 2 concurrent connections by the same ID only generate 1
803        // TransportConnectionState object.
804        synchronized (brokerConnectionStates) {
805            state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
806            if (state == null) {
807                state = new TransportConnectionState(info, this);
808                brokerConnectionStates.put(info.getConnectionId(), state);
809            }
810            state.incrementReference();
811        }
812        // If there are 2 concurrent connections for the same connection id,
813        // then last one in wins, we need to sync here
814        // to figure out the winner.
815        synchronized (state.getConnectionMutex()) {
816            if (state.getConnection() != this) {
817                LOG.debug("Killing previous stale connection: {}", state.getConnection().getRemoteAddress());
818                state.getConnection().stop();
819                LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), state.getConnection().getRemoteAddress());
820                state.setConnection(this);
821                state.reset(info);
822            }
823        }
824        registerConnectionState(info.getConnectionId(), state);
825        LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info });
826        this.faultTolerantConnection = info.isFaultTolerant();
827        // Setup the context.
828        String clientId = info.getClientId();
829        context = new ConnectionContext();
830        context.setBroker(broker);
831        context.setClientId(clientId);
832        context.setClientMaster(info.isClientMaster());
833        context.setConnection(this);
834        context.setConnectionId(info.getConnectionId());
835        context.setConnector(connector);
836        context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
837        context.setNetworkConnection(networkConnection);
838        context.setFaultTolerant(faultTolerantConnection);
839        context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
840        context.setUserName(info.getUserName());
841        context.setWireFormatInfo(wireFormatInfo);
842        context.setReconnect(info.isFailoverReconnect());
843        this.manageable = info.isManageable();
844        context.setConnectionState(state);
845        state.setContext(context);
846        state.setConnection(this);
847        if (info.getClientIp() == null) {
848            info.setClientIp(getRemoteAddress());
849        }
850
851        try {
852            broker.addConnection(context, info);
853        } catch (Exception e) {
854            synchronized (brokerConnectionStates) {
855                brokerConnectionStates.remove(info.getConnectionId());
856            }
857            unregisterConnectionState(info.getConnectionId());
858            LOG.warn("Failed to add Connection id={}, clientId={} due to {}", info.getConnectionId(), clientId, e);
859            //AMQ-6561 - stop for all exceptions on addConnection
860            // close this down - in case the peer of this transport doesn't play nice
861            delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
862            throw e;
863        }
864        if (info.isManageable()) {
865            // send ConnectionCommand
866            ConnectionControl command = this.connector.getConnectionControl();
867            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
868            if (info.isFailoverReconnect()) {
869                command.setRebalanceConnection(false);
870            }
871            dispatchAsync(command);
872        }
873        return null;
874    }
875
876    @Override
877    public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
878            throws InterruptedException {
879        LOG.debug("remove connection id: {}", id);
880        TransportConnectionState cs = lookupConnectionState(id);
881        if (cs != null) {
882            // Don't allow things to be added to the connection state while we
883            // are shutting down.
884            cs.shutdown();
885            // Cascade the connection stop to the sessions.
886            for (SessionId sessionId : cs.getSessionIds()) {
887                try {
888                    processRemoveSession(sessionId, lastDeliveredSequenceId);
889                } catch (Throwable e) {
890                    SERVICELOG.warn("Failed to remove session {}", sessionId, e);
891                }
892            }
893            // Cascade the connection stop to temp destinations.
894            for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
895                DestinationInfo di = iter.next();
896                try {
897                    broker.removeDestination(cs.getContext(), di.getDestination(), 0);
898                } catch (Throwable e) {
899                    SERVICELOG.warn("Failed to remove tmp destination {}", di.getDestination(), e);
900                }
901                iter.remove();
902            }
903            try {
904                broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get());
905            } catch (Throwable e) {
906                SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e);
907            }
908            TransportConnectionState state = unregisterConnectionState(id);
909            if (state != null) {
910                synchronized (brokerConnectionStates) {
911                    // If we are the last reference, we should remove the state
912                    // from the broker.
913                    if (state.decrementReference() == 0) {
914                        brokerConnectionStates.remove(id);
915                    }
916                }
917            }
918        }
919        return null;
920    }
921
922    @Override
923    public Response processProducerAck(ProducerAck ack) throws Exception {
924        // A broker should not get ProducerAck messages.
925        return null;
926    }
927
928    @Override
929    public Connector getConnector() {
930        return connector;
931    }
932
933    @Override
934    public void dispatchSync(Command message) {
935        try {
936            processDispatch(message);
937        } catch (IOException e) {
938            serviceExceptionAsync(e);
939        }
940    }
941
942    @Override
943    public void dispatchAsync(Command message) {
944        if (!stopping.get()) {
945            if (taskRunner == null) {
946                dispatchSync(message);
947            } else {
948                synchronized (dispatchQueue) {
949                    dispatchQueue.add(message);
950                }
951                try {
952                    taskRunner.wakeup();
953                } catch (InterruptedException e) {
954                    Thread.currentThread().interrupt();
955                }
956            }
957        } else {
958            if (message.isMessageDispatch()) {
959                MessageDispatch md = (MessageDispatch) message;
960                TransmitCallback sub = md.getTransmitCallback();
961                broker.postProcessDispatch(md);
962                if (sub != null) {
963                    sub.onFailure();
964                }
965            }
966        }
967    }
968
969    protected void processDispatch(Command command) throws IOException {
970        MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
971        try {
972            if (!stopping.get()) {
973                if (messageDispatch != null) {
974                    try {
975                        broker.preProcessDispatch(messageDispatch);
976                    } catch (RuntimeException convertToIO) {
977                        throw new IOException(convertToIO);
978                    }
979                }
980                dispatch(command);
981            }
982        } catch (IOException e) {
983            if (messageDispatch != null) {
984                TransmitCallback sub = messageDispatch.getTransmitCallback();
985                broker.postProcessDispatch(messageDispatch);
986                if (sub != null) {
987                    sub.onFailure();
988                }
989                messageDispatch = null;
990                throw e;
991            }
992        } finally {
993            if (messageDispatch != null) {
994                TransmitCallback sub = messageDispatch.getTransmitCallback();
995                broker.postProcessDispatch(messageDispatch);
996                if (sub != null) {
997                    sub.onSuccess();
998                }
999            }
1000        }
1001    }
1002
1003    @Override
1004    public boolean iterate() {
1005        try {
1006            if (pendingStop.get() || stopping.get()) {
1007                if (dispatchStopped.compareAndSet(false, true)) {
1008                    if (transportException.get() == null) {
1009                        try {
1010                            dispatch(new ShutdownInfo());
1011                        } catch (Throwable ignore) {
1012                        }
1013                    }
1014                    dispatchStoppedLatch.countDown();
1015                }
1016                return false;
1017            }
1018            if (!dispatchStopped.get()) {
1019                Command command = null;
1020                synchronized (dispatchQueue) {
1021                    if (dispatchQueue.isEmpty()) {
1022                        return false;
1023                    }
1024                    command = dispatchQueue.remove(0);
1025                }
1026                processDispatch(command);
1027                return true;
1028            }
1029            return false;
1030        } catch (IOException e) {
1031            if (dispatchStopped.compareAndSet(false, true)) {
1032                dispatchStoppedLatch.countDown();
1033            }
1034            serviceExceptionAsync(e);
1035            return false;
1036        }
1037    }
1038
1039    /**
1040     * Returns the statistics for this connection
1041     */
1042    @Override
1043    public ConnectionStatistics getStatistics() {
1044        return statistics;
1045    }
1046
1047    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1048        return messageAuthorizationPolicy;
1049    }
1050
1051    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1052        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1053    }
1054
1055    @Override
1056    public boolean isManageable() {
1057        return manageable;
1058    }
1059
1060    @Override
1061    public void start() throws Exception {
1062        try {
1063            synchronized (this) {
1064                starting.set(true);
1065                if (taskRunnerFactory != null) {
1066                    taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
1067                            + getRemoteAddress());
1068                } else {
1069                    taskRunner = null;
1070                }
1071                transport.start();
1072                active = true;
1073                BrokerInfo info = connector.getBrokerInfo().copy();
1074                if (connector.isUpdateClusterClients()) {
1075                    info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
1076                } else {
1077                    info.setPeerBrokerInfos(null);
1078                }
1079                dispatchAsync(info);
1080
1081                connector.onStarted(this);
1082            }
1083        } catch (Exception e) {
1084            // Force clean up on an error starting up.
1085            pendingStop.set(true);
1086            throw e;
1087        } finally {
1088            // stop() can be called from within the above block,
1089            // but we want to be sure start() completes before
1090            // stop() runs, so queue the stop until right now:
1091            setStarting(false);
1092            if (isPendingStop()) {
1093                LOG.debug("Calling the delayed stop() after start() {}", this);
1094                stop();
1095            }
1096        }
1097    }
1098
1099    @Override
1100    public void stop() throws Exception {
1101        // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
1102        // as their lifecycle is handled elsewhere
1103
1104        stopAsync();
1105        while (!stopped.await(5, TimeUnit.SECONDS)) {
1106            LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress());
1107        }
1108    }
1109
1110    public void delayedStop(final int waitTime, final String reason, Throwable cause) {
1111        if (waitTime > 0) {
1112            synchronized (this) {
1113                pendingStop.set(true);
1114                transportException.set(cause);
1115            }
1116            try {
1117                stopTaskRunnerFactory.execute(new Runnable() {
1118                    @Override
1119                    public void run() {
1120                        try {
1121                            Thread.sleep(waitTime);
1122                            stopAsync();
1123                            LOG.info("Stopping {} because {}", transport.getRemoteAddress(), reason);
1124                        } catch (InterruptedException e) {
1125                        }
1126                    }
1127                });
1128            } catch (Throwable t) {
1129                LOG.warn("Cannot create stopAsync. This exception will be ignored.", t);
1130            }
1131        }
1132    }
1133
1134    public void stopAsync(Throwable cause) {
1135        transportException.set(cause);
1136        stopAsync();
1137    }
1138
1139    public void stopAsync() {
1140        // If we're in the middle of starting then go no further... for now.
1141        synchronized (this) {
1142            pendingStop.set(true);
1143            if (starting.get()) {
1144                LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
1145                return;
1146            }
1147        }
1148        if (stopping.compareAndSet(false, true)) {
1149            // Let all the connection contexts know we are shutting down
1150            // so that in progress operations can notice and unblock.
1151            List<TransportConnectionState> connectionStates = listConnectionStates();
1152            for (TransportConnectionState cs : connectionStates) {
1153                ConnectionContext connectionContext = cs.getContext();
1154                if (connectionContext != null) {
1155                    connectionContext.getStopping().set(true);
1156                }
1157            }
1158            try {
1159                stopTaskRunnerFactory.execute(new Runnable() {
1160                    @Override
1161                    public void run() {
1162                        serviceLock.writeLock().lock();
1163                        try {
1164                            doStop();
1165                        } catch (Throwable e) {
1166                            LOG.debug("Error occurred while shutting down a connection {}", this, e);
1167                        } finally {
1168                            stopped.countDown();
1169                            serviceLock.writeLock().unlock();
1170                        }
1171                    }
1172                });
1173            } catch (Throwable t) {
1174                LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t);
1175                stopped.countDown();
1176            }
1177        }
1178    }
1179
1180    @Override
1181    public String toString() {
1182        return "Transport Connection to: " + transport.getRemoteAddress();
1183    }
1184
1185    protected void doStop() throws Exception {
1186        LOG.debug("Stopping connection: {}", transport.getRemoteAddress());
1187        connector.onStopped(this);
1188        try {
1189            synchronized (this) {
1190                if (duplexBridge != null) {
1191                    duplexBridge.stop();
1192                }
1193            }
1194        } catch (Exception ignore) {
1195            LOG.trace("Exception caught stopping. This exception is ignored.", ignore);
1196        }
1197        try {
1198            transport.stop();
1199            LOG.debug("Stopped transport: {}", transport.getRemoteAddress());
1200        } catch (Exception e) {
1201            LOG.debug("Could not stop transport to {}. This exception is ignored.", transport.getRemoteAddress(), e);
1202        }
1203        if (taskRunner != null) {
1204            taskRunner.shutdown(1);
1205            taskRunner = null;
1206        }
1207        active = false;
1208        // Run the MessageDispatch callbacks so that message references get
1209        // cleaned up.
1210        synchronized (dispatchQueue) {
1211            for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
1212                Command command = iter.next();
1213                if (command.isMessageDispatch()) {
1214                    MessageDispatch md = (MessageDispatch) command;
1215                    TransmitCallback sub = md.getTransmitCallback();
1216                    broker.postProcessDispatch(md);
1217                    if (sub != null) {
1218                        sub.onFailure();
1219                    }
1220                }
1221            }
1222            dispatchQueue.clear();
1223        }
1224        //
1225        // Remove all logical connection associated with this connection
1226        // from the broker.
1227        if (!broker.isStopped()) {
1228            List<TransportConnectionState> connectionStates = listConnectionStates();
1229            connectionStates = listConnectionStates();
1230            for (TransportConnectionState cs : connectionStates) {
1231                cs.getContext().getStopping().set(true);
1232                try {
1233                    LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
1234                    processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
1235                } catch (Throwable ignore) {
1236                    LOG.debug("Exception caught removing connection {}. This exception is ignored.", cs.getInfo().getConnectionId(), ignore);
1237                }
1238            }
1239        }
1240        LOG.debug("Connection Stopped: {}", getRemoteAddress());
1241    }
1242
1243    /**
1244     * @return Returns the blockedCandidate.
1245     */
1246    public boolean isBlockedCandidate() {
1247        return blockedCandidate;
1248    }
1249
1250    /**
1251     * @param blockedCandidate The blockedCandidate to set.
1252     */
1253    public void setBlockedCandidate(boolean blockedCandidate) {
1254        this.blockedCandidate = blockedCandidate;
1255    }
1256
1257    /**
1258     * @return Returns the markedCandidate.
1259     */
1260    public boolean isMarkedCandidate() {
1261        return markedCandidate;
1262    }
1263
1264    /**
1265     * @param markedCandidate The markedCandidate to set.
1266     */
1267    public void setMarkedCandidate(boolean markedCandidate) {
1268        this.markedCandidate = markedCandidate;
1269        if (!markedCandidate) {
1270            timeStamp = 0;
1271            blockedCandidate = false;
1272        }
1273    }
1274
1275    /**
1276     * @param slow The slow to set.
1277     */
1278    public void setSlow(boolean slow) {
1279        this.slow = slow;
1280    }
1281
1282    /**
1283     * @return true if the Connection is slow
1284     */
1285    @Override
1286    public boolean isSlow() {
1287        return slow;
1288    }
1289
1290    /**
1291     * @return true if the Connection is potentially blocked
1292     */
1293    public boolean isMarkedBlockedCandidate() {
1294        return markedCandidate;
1295    }
1296
1297    /**
1298     * Mark the Connection, so we can deem if it's collectable on the next sweep
1299     */
1300    public void doMark() {
1301        if (timeStamp == 0) {
1302            timeStamp = System.currentTimeMillis();
1303        }
1304    }
1305
1306    /**
1307     * @return if after being marked, the Connection is still writing
1308     */
1309    @Override
1310    public boolean isBlocked() {
1311        return blocked;
1312    }
1313
1314    /**
1315     * @return true if the Connection is connected
1316     */
1317    @Override
1318    public boolean isConnected() {
1319        return connected;
1320    }
1321
1322    /**
1323     * @param blocked The blocked to set.
1324     */
1325    public void setBlocked(boolean blocked) {
1326        this.blocked = blocked;
1327    }
1328
1329    /**
1330     * @param connected The connected to set.
1331     */
1332    public void setConnected(boolean connected) {
1333        this.connected = connected;
1334    }
1335
1336    /**
1337     * @return true if the Connection is active
1338     */
1339    @Override
1340    public boolean isActive() {
1341        return active;
1342    }
1343
1344    /**
1345     * @param active The active to set.
1346     */
1347    public void setActive(boolean active) {
1348        this.active = active;
1349    }
1350
1351    /**
1352     * @return true if the Connection is starting
1353     */
1354    public boolean isStarting() {
1355        return starting.get();
1356    }
1357
1358    @Override
1359    public synchronized boolean isNetworkConnection() {
1360        return networkConnection;
1361    }
1362
1363    @Override
1364    public boolean isFaultTolerantConnection() {
1365        return this.faultTolerantConnection;
1366    }
1367
1368    protected void setStarting(boolean starting) {
1369        this.starting.set(starting);
1370    }
1371
1372    /**
1373     * @return true if the Connection needs to stop
1374     */
1375    public boolean isPendingStop() {
1376        return pendingStop.get();
1377    }
1378
1379    protected void setPendingStop(boolean pendingStop) {
1380        this.pendingStop.set(pendingStop);
1381    }
1382
1383    private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {
1384        Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1385        Map<String, String> props = createMap(properties);
1386        NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1387        IntrospectionSupport.setProperties(config, props, "");
1388        return config;
1389    }
1390
1391    @Override
1392    public Response processBrokerInfo(BrokerInfo info) {
1393        if (info.isSlaveBroker()) {
1394            LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
1395        } else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
1396            try {
1397                NetworkBridgeConfiguration config = getNetworkConfiguration(info);
1398                if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
1399                    LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
1400                    dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
1401                }
1402            } catch (Exception e) {
1403                LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e);
1404                return null;
1405            }
1406        } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1407            // so this TransportConnection is the rear end of a network bridge
1408            // We have been requested to create a two way pipe ...
1409            try {
1410                NetworkBridgeConfiguration config = getNetworkConfiguration(info);
1411                config.setBrokerName(broker.getBrokerName());
1412
1413                if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
1414                    LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
1415                    dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
1416                }
1417
1418                // check for existing duplex connection hanging about
1419
1420                // We first look if existing network connection already exists for the same broker Id and network connector name
1421                // It's possible in case of brief network fault to have this transport connector side of the connection always active
1422                // and the duplex network connector side wanting to open a new one
1423                // In this case, the old connection must be broken
1424                String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
1425                CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1426                synchronized (connections) {
1427                    for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
1428                        TransportConnection c = iter.next();
1429                        if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1430                            LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId);
1431                            c.stopAsync();
1432                            // better to wait for a bit rather than get connection id already in use and failure to start new bridge
1433                            c.getStopped().await(1, TimeUnit.SECONDS);
1434                        }
1435                    }
1436                    setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1437                }
1438                Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker);
1439                Transport remoteBridgeTransport = transport;
1440                if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
1441                    // the vm transport case is already wrapped
1442                    remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport);
1443                }
1444                String duplexName = localTransport.toString();
1445                if (duplexName.contains("#")) {
1446                    duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
1447                }
1448                MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
1449                listener.setCreatedByDuplex(true);
1450                duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
1451                duplexBridge.setBrokerService(brokerService);
1452                //Need to set durableDestinations to properly restart subs when dynamicOnly=false
1453                duplexBridge.setDurableDestinations(NetworkConnector.getDurableTopicDestinations(
1454                        broker.getDurableDestinations()));
1455
1456                // now turn duplex off this side
1457                info.setDuplexConnection(false);
1458                duplexBridge.setCreatedByDuplex(true);
1459                duplexBridge.duplexStart(this, brokerInfo, info);
1460                LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId);
1461                return null;
1462            } catch (TransportDisposedIOException e) {
1463                LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId);
1464                return null;
1465            } catch (Exception e) {
1466                LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e);
1467                return null;
1468            }
1469        }
1470        // We only expect to get one broker info command per connection
1471        if (this.brokerInfo != null) {
1472            LOG.warn("Unexpected extra broker info command received: {}", info);
1473        }
1474        this.brokerInfo = info;
1475        networkConnection = true;
1476        List<TransportConnectionState> connectionStates = listConnectionStates();
1477        for (TransportConnectionState cs : connectionStates) {
1478            cs.getContext().setNetworkConnection(true);
1479        }
1480        return null;
1481    }
1482
1483    @SuppressWarnings({"unchecked", "rawtypes"})
1484    private HashMap<String, String> createMap(Properties properties) {
1485        return new HashMap(properties);
1486    }
1487
1488    protected void dispatch(Command command) throws IOException {
1489        try {
1490            setMarkedCandidate(true);
1491            transport.oneway(command);
1492        } finally {
1493            setMarkedCandidate(false);
1494        }
1495    }
1496
1497    @Override
1498    public String getRemoteAddress() {
1499        return transport.getRemoteAddress();
1500    }
1501
1502    public Transport getTransport() {
1503        return transport;
1504    }
1505
1506    @Override
1507    public String getConnectionId() {
1508        List<TransportConnectionState> connectionStates = listConnectionStates();
1509        for (TransportConnectionState cs : connectionStates) {
1510            if (cs.getInfo().getClientId() != null) {
1511                return cs.getInfo().getClientId();
1512            }
1513            return cs.getInfo().getConnectionId().toString();
1514        }
1515        return null;
1516    }
1517
1518    @Override
1519    public void updateClient(ConnectionControl control) {
1520        if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1521                && this.wireFormatInfo.getVersion() >= 6) {
1522            dispatchAsync(control);
1523        }
1524    }
1525
1526    public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){
1527        ProducerBrokerExchange result = null;
1528        if (producerInfo != null && producerInfo.getProducerId() != null){
1529            synchronized (producerExchanges){
1530                result = producerExchanges.get(producerInfo.getProducerId());
1531            }
1532        }
1533        return result;
1534    }
1535
1536    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1537        ProducerBrokerExchange result = producerExchanges.get(id);
1538        if (result == null) {
1539            synchronized (producerExchanges) {
1540                result = new ProducerBrokerExchange();
1541                TransportConnectionState state = lookupConnectionState(id);
1542                context = state.getContext();
1543                result.setConnectionContext(context);
1544                if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
1545                    result.setLastStoredSequenceId(brokerService.getPersistenceAdapter().getLastProducerSequenceId(id));
1546                }
1547                SessionState ss = state.getSessionState(id.getParentId());
1548                if (ss != null) {
1549                    result.setProducerState(ss.getProducerState(id));
1550                    ProducerState producerState = ss.getProducerState(id);
1551                    if (producerState != null && producerState.getInfo() != null) {
1552                        ProducerInfo info = producerState.getInfo();
1553                        result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1554                    }
1555                }
1556                producerExchanges.put(id, result);
1557            }
1558        } else {
1559            context = result.getConnectionContext();
1560        }
1561        return result;
1562    }
1563
1564    private void removeProducerBrokerExchange(ProducerId id) {
1565        synchronized (producerExchanges) {
1566            producerExchanges.remove(id);
1567        }
1568    }
1569
1570    private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1571        ConsumerBrokerExchange result = consumerExchanges.get(id);
1572        return result;
1573    }
1574
1575    private ConsumerBrokerExchange addConsumerBrokerExchange(TransportConnectionState connectionState, ConsumerId id) {
1576        ConsumerBrokerExchange result = consumerExchanges.get(id);
1577        if (result == null) {
1578            synchronized (consumerExchanges) {
1579                result = new ConsumerBrokerExchange();
1580                context = connectionState.getContext();
1581                result.setConnectionContext(context);
1582                SessionState ss = connectionState.getSessionState(id.getParentId());
1583                if (ss != null) {
1584                    ConsumerState cs = ss.getConsumerState(id);
1585                    if (cs != null) {
1586                        ConsumerInfo info = cs.getInfo();
1587                        if (info != null) {
1588                            if (info.getDestination() != null && info.getDestination().isPattern()) {
1589                                result.setWildcard(true);
1590                            }
1591                        }
1592                    }
1593                }
1594                consumerExchanges.put(id, result);
1595            }
1596        }
1597        return result;
1598    }
1599
1600    private void removeConsumerBrokerExchange(ConsumerId id) {
1601        synchronized (consumerExchanges) {
1602            consumerExchanges.remove(id);
1603        }
1604    }
1605
1606    public int getProtocolVersion() {
1607        return protocolVersion.get();
1608    }
1609
1610    @Override
1611    public Response processControlCommand(ControlCommand command) throws Exception {
1612        return null;
1613    }
1614
1615    @Override
1616    public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1617        return null;
1618    }
1619
1620    @Override
1621    public Response processConnectionControl(ConnectionControl control) throws Exception {
1622        if (control != null) {
1623            faultTolerantConnection = control.isFaultTolerant();
1624        }
1625        return null;
1626    }
1627
1628    @Override
1629    public Response processConnectionError(ConnectionError error) throws Exception {
1630        return null;
1631    }
1632
1633    @Override
1634    public Response processConsumerControl(ConsumerControl control) throws Exception {
1635        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1636        broker.processConsumerControl(consumerExchange, control);
1637        return null;
1638    }
1639
1640    protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1641                                                                            TransportConnectionState state) {
1642        TransportConnectionState cs = null;
1643        if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1644            // swap implementations
1645            TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1646            newRegister.intialize(connectionStateRegister);
1647            connectionStateRegister = newRegister;
1648        }
1649        cs = connectionStateRegister.registerConnectionState(connectionId, state);
1650        return cs;
1651    }
1652
1653    protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1654        return connectionStateRegister.unregisterConnectionState(connectionId);
1655    }
1656
1657    protected synchronized List<TransportConnectionState> listConnectionStates() {
1658        return connectionStateRegister.listConnectionStates();
1659    }
1660
1661    protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1662        return connectionStateRegister.lookupConnectionState(connectionId);
1663    }
1664
1665    protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1666        return connectionStateRegister.lookupConnectionState(id);
1667    }
1668
1669    protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1670        return connectionStateRegister.lookupConnectionState(id);
1671    }
1672
1673    protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1674        return connectionStateRegister.lookupConnectionState(id);
1675    }
1676
1677    // public only for testing
1678    public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1679        return connectionStateRegister.lookupConnectionState(connectionId);
1680    }
1681
1682    protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1683        this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1684    }
1685
1686    protected synchronized String getDuplexNetworkConnectorId() {
1687        return this.duplexNetworkConnectorId;
1688    }
1689
1690    public boolean isStopping() {
1691        return stopping.get();
1692    }
1693
1694    protected CountDownLatch getStopped() {
1695        return stopped;
1696    }
1697
1698    private int getProducerCount(ConnectionId connectionId) {
1699        int result = 0;
1700        TransportConnectionState cs = lookupConnectionState(connectionId);
1701        if (cs != null) {
1702            for (SessionId sessionId : cs.getSessionIds()) {
1703                SessionState sessionState = cs.getSessionState(sessionId);
1704                if (sessionState != null) {
1705                    result += sessionState.getProducerIds().size();
1706                }
1707            }
1708        }
1709        return result;
1710    }
1711
1712    private int getConsumerCount(ConnectionId connectionId) {
1713        int result = 0;
1714        TransportConnectionState cs = lookupConnectionState(connectionId);
1715        if (cs != null) {
1716            for (SessionId sessionId : cs.getSessionIds()) {
1717                SessionState sessionState = cs.getSessionState(sessionId);
1718                if (sessionState != null) {
1719                    result += sessionState.getConsumerIds().size();
1720                }
1721            }
1722        }
1723        return result;
1724    }
1725
1726    public WireFormatInfo getRemoteWireFormatInfo() {
1727        return wireFormatInfo;
1728    }
1729
1730    /* (non-Javadoc)
1731     * @see org.apache.activemq.state.CommandVisitor#processBrokerSubscriptionInfo(org.apache.activemq.command.BrokerSubscriptionInfo)
1732     */
1733    @Override
1734    public Response processBrokerSubscriptionInfo(BrokerSubscriptionInfo info) throws Exception {
1735        return null;
1736    }
1737}