001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import org.apache.activemq.broker.Broker;
020import org.apache.activemq.broker.BrokerService;
021import org.apache.activemq.broker.Connection;
022import org.apache.activemq.broker.ConnectionContext;
023import org.apache.activemq.broker.ConsumerBrokerExchange;
024import org.apache.activemq.broker.EmptyBroker;
025import org.apache.activemq.broker.ProducerBrokerExchange;
026import org.apache.activemq.broker.TransportConnector;
027import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
028import org.apache.activemq.broker.region.policy.PolicyMap;
029import org.apache.activemq.command.*;
030import org.apache.activemq.state.ConnectionState;
031import org.apache.activemq.store.kahadb.plist.PListStore;
032import org.apache.activemq.thread.Scheduler;
033import org.apache.activemq.thread.TaskRunnerFactory;
034import org.apache.activemq.usage.SystemUsage;
035import org.apache.activemq.util.BrokerSupport;
036import org.apache.activemq.util.IdGenerator;
037import org.apache.activemq.util.InetAddressUtil;
038import org.apache.activemq.util.LongSequenceGenerator;
039import org.apache.activemq.util.ServiceStopper;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import javax.jms.InvalidClientIDException;
044import javax.jms.JMSException;
045import java.io.IOException;
046import java.net.URI;
047import java.util.ArrayList;
048import java.util.Collections;
049import java.util.HashMap;
050import java.util.List;
051import java.util.Map;
052import java.util.Set;
053import java.util.concurrent.ConcurrentHashMap;
054import java.util.concurrent.CopyOnWriteArrayList;
055import java.util.concurrent.ThreadPoolExecutor;
056import java.util.concurrent.locks.ReentrantReadWriteLock;
057
058/**
059 * Routes Broker operations to the correct messaging regions for processing.
060 *
061 *
062 */
063public class RegionBroker extends EmptyBroker {
064    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
065    private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class);
066    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
067
068    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
069    protected DestinationFactory destinationFactory;
070    protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
071
072    private final Region queueRegion;
073    private final Region topicRegion;
074    private final Region tempQueueRegion;
075    private final Region tempTopicRegion;
076    protected final BrokerService brokerService;
077    private boolean started;
078    private boolean keepDurableSubsActive;
079
080    private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
081    private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
082    private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();
083
084    private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
085    private BrokerId brokerId;
086    private String brokerName;
087    private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
088    private final DestinationInterceptor destinationInterceptor;
089    private ConnectionContext adminConnectionContext;
090    private final Scheduler scheduler;
091    private final ThreadPoolExecutor executor;
092    private boolean allowTempAutoCreationOnSend;
093
094    private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
095    private final Runnable purgeInactiveDestinationsTask = new Runnable() {
096        public void run() {
097            purgeInactiveDestinations();
098        }
099    };
100
101    public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
102                        DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
103        this.brokerService = brokerService;
104        this.executor=executor;
105        this.scheduler = scheduler;
106        if (destinationFactory == null) {
107            throw new IllegalArgumentException("null destinationFactory");
108        }
109        this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId());
110        this.destinationFactory = destinationFactory;
111        queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
112        topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
113        this.destinationInterceptor = destinationInterceptor;
114        tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
115        tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
116    }
117
118    @Override
119    public Map<ActiveMQDestination, Destination> getDestinationMap() {
120        Map<ActiveMQDestination, Destination> answer = new HashMap<ActiveMQDestination, Destination>(getQueueRegion().getDestinationMap());
121        answer.putAll(getTopicRegion().getDestinationMap());
122        return answer;
123    }
124
125    @Override
126    public Set <Destination> getDestinations(ActiveMQDestination destination) {
127        switch (destination.getDestinationType()) {
128        case ActiveMQDestination.QUEUE_TYPE:
129            return queueRegion.getDestinations(destination);
130        case ActiveMQDestination.TOPIC_TYPE:
131            return topicRegion.getDestinations(destination);
132        case ActiveMQDestination.TEMP_QUEUE_TYPE:
133            return tempQueueRegion.getDestinations(destination);
134        case ActiveMQDestination.TEMP_TOPIC_TYPE:
135            return tempTopicRegion.getDestinations(destination);
136        default:
137            return Collections.emptySet();
138        }
139    }
140
141    @Override
142    @SuppressWarnings("rawtypes")
143    public Broker getAdaptor(Class type) {
144        if (type.isInstance(this)) {
145            return this;
146        }
147        return null;
148    }
149
150    public Region getQueueRegion() {
151        return queueRegion;
152    }
153
154    public Region getTempQueueRegion() {
155        return tempQueueRegion;
156    }
157
158    public Region getTempTopicRegion() {
159        return tempTopicRegion;
160    }
161
162    public Region getTopicRegion() {
163        return topicRegion;
164    }
165
166    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
167        return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
168    }
169
170    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
171        return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
172    }
173
174    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
175        return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
176    }
177
178    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
179        return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
180    }
181
182    @Override
183    public void start() throws Exception {
184        ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
185        started = true;
186        queueRegion.start();
187        topicRegion.start();
188        tempQueueRegion.start();
189        tempTopicRegion.start();
190        int period = this.brokerService.getSchedulePeriodForDestinationPurge();
191        if (period > 0) {
192            this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period);
193        }
194    }
195
196    @Override
197    public void stop() throws Exception {
198        started = false;
199        this.scheduler.cancel(purgeInactiveDestinationsTask);
200        ServiceStopper ss = new ServiceStopper();
201        doStop(ss);
202        ss.throwFirstException();
203        // clear the state
204        clientIdSet.clear();
205        connections.clear();
206        destinations.clear();
207        brokerInfos.clear();
208    }
209
210    public PolicyMap getDestinationPolicy() {
211        return brokerService != null ? brokerService.getDestinationPolicy() : null;
212    }
213
214    @Override
215    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
216        String clientId = info.getClientId();
217        if (clientId == null) {
218            throw new InvalidClientIDException("No clientID specified for connection request");
219        }
220        synchronized (clientIdSet) {
221            ConnectionContext oldContext = clientIdSet.get(clientId);
222            if (oldContext != null) {
223                throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
224                                                   + oldContext.getConnection().getRemoteAddress());
225            } else {
226                clientIdSet.put(clientId, context);
227            }
228        }
229
230        connections.add(context.getConnection());
231    }
232
233    @Override
234    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
235        String clientId = info.getClientId();
236        if (clientId == null) {
237            throw new InvalidClientIDException("No clientID specified for connection disconnect request");
238        }
239        synchronized (clientIdSet) {
240            ConnectionContext oldValue = clientIdSet.get(clientId);
241            // we may be removing the duplicate connection, not the first
242            // connection to be created
243            // so lets check that their connection IDs are the same
244            if (oldValue == context) {
245                if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) {
246                    clientIdSet.remove(clientId);
247                }
248            }
249        }
250        connections.remove(context.getConnection());
251    }
252
253    protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
254        return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
255    }
256
257    @Override
258    public Connection[] getClients() throws Exception {
259        ArrayList<Connection> l = new ArrayList<Connection>(connections);
260        Connection rc[] = new Connection[l.size()];
261        l.toArray(rc);
262        return rc;
263    }
264
265    @Override
266    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemp) throws Exception {
267
268        Destination answer;
269
270        answer = destinations.get(destination);
271        if (answer != null) {
272            return answer;
273        }
274
275     synchronized (destinations) {
276        answer = destinations.get(destination);
277        if (answer != null) {
278            return answer;
279        }
280
281        switch (destination.getDestinationType()) {
282        case ActiveMQDestination.QUEUE_TYPE:
283            answer = queueRegion.addDestination(context, destination,true);
284            break;
285        case ActiveMQDestination.TOPIC_TYPE:
286            answer = topicRegion.addDestination(context, destination,true);
287            break;
288        case ActiveMQDestination.TEMP_QUEUE_TYPE:
289            answer = tempQueueRegion.addDestination(context, destination, createIfTemp);
290            break;
291        case ActiveMQDestination.TEMP_TOPIC_TYPE:
292            answer = tempTopicRegion.addDestination(context, destination, createIfTemp);
293            break;
294        default:
295            throw createUnknownDestinationTypeException(destination);
296        }
297
298        destinations.put(destination, answer);
299        return answer;
300     }
301
302    }
303
304    @Override
305    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
306
307        if (destinations.containsKey(destination)) {
308            switch (destination.getDestinationType()) {
309            case ActiveMQDestination.QUEUE_TYPE:
310                queueRegion.removeDestination(context, destination, timeout);
311                break;
312            case ActiveMQDestination.TOPIC_TYPE:
313                topicRegion.removeDestination(context, destination, timeout);
314                break;
315            case ActiveMQDestination.TEMP_QUEUE_TYPE:
316                tempQueueRegion.removeDestination(context, destination, timeout);
317                break;
318            case ActiveMQDestination.TEMP_TOPIC_TYPE:
319                tempTopicRegion.removeDestination(context, destination, timeout);
320                break;
321            default:
322                throw createUnknownDestinationTypeException(destination);
323            }
324            destinations.remove(destination);
325
326        }
327
328    }
329
330    @Override
331    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
332        addDestination(context, info.getDestination(),true);
333
334    }
335
336    @Override
337    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
338        removeDestination(context, info.getDestination(), info.getTimeout());
339
340    }
341
342    @Override
343    public ActiveMQDestination[] getDestinations() throws Exception {
344        ArrayList<ActiveMQDestination> l;
345
346        l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet());
347
348        ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
349        l.toArray(rc);
350        return rc;
351    }
352
353    @Override
354    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
355        ActiveMQDestination destination = info.getDestination();
356        if (destination != null) {
357            inactiveDestinationsPurgeLock.readLock().lock();
358            try {
359                // This seems to cause the destination to be added but without
360                // advisories firing...
361                context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend());
362                switch (destination.getDestinationType()) {
363                case ActiveMQDestination.QUEUE_TYPE:
364                    queueRegion.addProducer(context, info);
365                    break;
366                case ActiveMQDestination.TOPIC_TYPE:
367                    topicRegion.addProducer(context, info);
368                    break;
369                case ActiveMQDestination.TEMP_QUEUE_TYPE:
370                    tempQueueRegion.addProducer(context, info);
371                    break;
372                case ActiveMQDestination.TEMP_TOPIC_TYPE:
373                    tempTopicRegion.addProducer(context, info);
374                    break;
375                }
376            } finally {
377                inactiveDestinationsPurgeLock.readLock().unlock();
378            }
379        }
380    }
381
382    @Override
383    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
384        ActiveMQDestination destination = info.getDestination();
385        if (destination != null) {
386            inactiveDestinationsPurgeLock.readLock().lock();
387            try {
388                switch (destination.getDestinationType()) {
389                case ActiveMQDestination.QUEUE_TYPE:
390                    queueRegion.removeProducer(context, info);
391                    break;
392                case ActiveMQDestination.TOPIC_TYPE:
393                    topicRegion.removeProducer(context, info);
394                    break;
395                case ActiveMQDestination.TEMP_QUEUE_TYPE:
396                    tempQueueRegion.removeProducer(context, info);
397                    break;
398                case ActiveMQDestination.TEMP_TOPIC_TYPE:
399                    tempTopicRegion.removeProducer(context, info);
400                    break;
401                }
402            } finally {
403                inactiveDestinationsPurgeLock.readLock().unlock();
404            }
405        }
406    }
407
408    @Override
409    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
410        ActiveMQDestination destination = info.getDestination();
411        if (destinationInterceptor != null) {
412            destinationInterceptor.create(this, context, destination);
413        }
414        inactiveDestinationsPurgeLock.readLock().lock();
415        try {
416            switch (destination.getDestinationType()) {
417            case ActiveMQDestination.QUEUE_TYPE:
418                return queueRegion.addConsumer(context, info);
419
420            case ActiveMQDestination.TOPIC_TYPE:
421                return topicRegion.addConsumer(context, info);
422
423            case ActiveMQDestination.TEMP_QUEUE_TYPE:
424                return tempQueueRegion.addConsumer(context, info);
425
426            case ActiveMQDestination.TEMP_TOPIC_TYPE:
427                return tempTopicRegion.addConsumer(context, info);
428
429            default:
430                throw createUnknownDestinationTypeException(destination);
431            }
432        } finally {
433            inactiveDestinationsPurgeLock.readLock().unlock();
434        }
435    }
436
437    @Override
438    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
439        ActiveMQDestination destination = info.getDestination();
440        inactiveDestinationsPurgeLock.readLock().lock();
441        try {
442            switch (destination.getDestinationType()) {
443
444            case ActiveMQDestination.QUEUE_TYPE:
445                queueRegion.removeConsumer(context, info);
446                break;
447            case ActiveMQDestination.TOPIC_TYPE:
448                topicRegion.removeConsumer(context, info);
449                break;
450            case ActiveMQDestination.TEMP_QUEUE_TYPE:
451                tempQueueRegion.removeConsumer(context, info);
452                break;
453            case ActiveMQDestination.TEMP_TOPIC_TYPE:
454                tempTopicRegion.removeConsumer(context, info);
455                break;
456            default:
457                throw createUnknownDestinationTypeException(destination);
458            }
459        } finally {
460            inactiveDestinationsPurgeLock.readLock().unlock();
461        }
462    }
463
464    @Override
465    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
466        inactiveDestinationsPurgeLock.readLock().lock();
467        try {
468            topicRegion.removeSubscription(context, info);
469        } finally {
470            inactiveDestinationsPurgeLock.readLock().unlock();
471        }
472    }
473
474    @Override
475    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
476        message.setBrokerInTime(System.currentTimeMillis());
477        if (producerExchange.isMutable() || producerExchange.getRegion() == null
478                || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
479            ActiveMQDestination destination = message.getDestination();
480            // ensure the destination is registered with the RegionBroker
481            producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
482            Region region;
483            switch (destination.getDestinationType()) {
484            case ActiveMQDestination.QUEUE_TYPE:
485                region = queueRegion;
486                break;
487            case ActiveMQDestination.TOPIC_TYPE:
488                region = topicRegion;
489                break;
490            case ActiveMQDestination.TEMP_QUEUE_TYPE:
491                region = tempQueueRegion;
492                break;
493            case ActiveMQDestination.TEMP_TOPIC_TYPE:
494                region = tempTopicRegion;
495                break;
496            default:
497                throw createUnknownDestinationTypeException(destination);
498            }
499            producerExchange.setRegion(region);
500            producerExchange.setRegionDestination(null);
501        }
502
503        producerExchange.getRegion().send(producerExchange, message);
504    }
505
506    @Override
507    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
508        if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
509            ActiveMQDestination destination = ack.getDestination();
510            Region region;
511            switch (destination.getDestinationType()) {
512            case ActiveMQDestination.QUEUE_TYPE:
513                region = queueRegion;
514                break;
515            case ActiveMQDestination.TOPIC_TYPE:
516                region = topicRegion;
517                break;
518            case ActiveMQDestination.TEMP_QUEUE_TYPE:
519                region = tempQueueRegion;
520                break;
521            case ActiveMQDestination.TEMP_TOPIC_TYPE:
522                region = tempTopicRegion;
523                break;
524            default:
525                throw createUnknownDestinationTypeException(destination);
526            }
527            consumerExchange.setRegion(region);
528        }
529        consumerExchange.getRegion().acknowledge(consumerExchange, ack);
530    }
531
532    @Override
533    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
534        ActiveMQDestination destination = pull.getDestination();
535        switch (destination.getDestinationType()) {
536        case ActiveMQDestination.QUEUE_TYPE:
537            return queueRegion.messagePull(context, pull);
538
539        case ActiveMQDestination.TOPIC_TYPE:
540            return topicRegion.messagePull(context, pull);
541
542        case ActiveMQDestination.TEMP_QUEUE_TYPE:
543            return tempQueueRegion.messagePull(context, pull);
544
545        case ActiveMQDestination.TEMP_TOPIC_TYPE:
546            return tempTopicRegion.messagePull(context, pull);
547        default:
548            throw createUnknownDestinationTypeException(destination);
549        }
550    }
551
552    @Override
553    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
554        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
555    }
556
557    @Override
558    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
559        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
560    }
561
562    @Override
563    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
564        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
565    }
566
567    @Override
568    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
569        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
570    }
571
572    @Override
573    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
574        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
575    }
576
577    @Override
578    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
579        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
580    }
581
582    @Override
583    public void gc() {
584        queueRegion.gc();
585        topicRegion.gc();
586    }
587
588    @Override
589    public BrokerId getBrokerId() {
590        if (brokerId == null) {
591            brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
592        }
593        return brokerId;
594    }
595
596    public void setBrokerId(BrokerId brokerId) {
597        this.brokerId = brokerId;
598    }
599
600    @Override
601    public String getBrokerName() {
602        if (brokerName == null) {
603            try {
604                brokerName = InetAddressUtil.getLocalHostName().toLowerCase();
605            } catch (Exception e) {
606                brokerName = "localhost";
607            }
608        }
609        return brokerName;
610    }
611
612    public void setBrokerName(String brokerName) {
613        this.brokerName = brokerName;
614    }
615
616    public DestinationStatistics getDestinationStatistics() {
617        return destinationStatistics;
618    }
619
620    protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) {
621        return new JMSException("Unknown destination type: " + destination.getDestinationType());
622    }
623
624    @Override
625    public synchronized void addBroker(Connection connection, BrokerInfo info) {
626        BrokerInfo existing = brokerInfos.get(info.getBrokerId());
627        if (existing == null) {
628            existing = info.copy();
629            existing.setPeerBrokerInfos(null);
630            brokerInfos.put(info.getBrokerId(), existing);
631        }
632        existing.incrementRefCount();
633        if (LOG.isDebugEnabled()) {
634            LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
635        }
636        addBrokerInClusterUpdate(info);
637    }
638
639    @Override
640    public synchronized void removeBroker(Connection connection, BrokerInfo info) {
641        if (info != null) {
642            BrokerInfo existing = brokerInfos.get(info.getBrokerId());
643            if (existing != null && existing.decrementRefCount() == 0) {
644               brokerInfos.remove(info.getBrokerId());
645            }
646            if (LOG.isDebugEnabled()) {
647                LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
648            }
649            removeBrokerInClusterUpdate(info);
650        }
651    }
652
653    @Override
654    public synchronized BrokerInfo[] getPeerBrokerInfos() {
655        BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
656        result = brokerInfos.values().toArray(result);
657        return result;
658    }
659
660    @Override
661    public void preProcessDispatch(MessageDispatch messageDispatch) {
662        Message message = messageDispatch.getMessage();
663        if (message != null) {
664            long endTime = System.currentTimeMillis();
665            message.setBrokerOutTime(endTime);
666            if (getBrokerService().isEnableStatistics()) {
667                long totalTime = endTime - message.getBrokerInTime();
668                message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(totalTime);
669            }
670        }
671    }
672
673    @Override
674    public void postProcessDispatch(MessageDispatch messageDispatch) {
675    }
676
677    @Override
678    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
679        ActiveMQDestination destination = messageDispatchNotification.getDestination();
680        switch (destination.getDestinationType()) {
681        case ActiveMQDestination.QUEUE_TYPE:
682            queueRegion.processDispatchNotification(messageDispatchNotification);
683            break;
684        case ActiveMQDestination.TOPIC_TYPE:
685            topicRegion.processDispatchNotification(messageDispatchNotification);
686            break;
687        case ActiveMQDestination.TEMP_QUEUE_TYPE:
688            tempQueueRegion.processDispatchNotification(messageDispatchNotification);
689            break;
690        case ActiveMQDestination.TEMP_TOPIC_TYPE:
691            tempTopicRegion.processDispatchNotification(messageDispatchNotification);
692            break;
693        default:
694            throw createUnknownDestinationTypeException(destination);
695        }
696    }
697
698    public boolean isSlaveBroker() {
699        return brokerService.isSlave();
700    }
701
702    @Override
703    public boolean isStopped() {
704        return !started;
705    }
706
707    @Override
708    public Set<ActiveMQDestination> getDurableDestinations() {
709        return destinationFactory.getDestinations();
710    }
711
712    protected void doStop(ServiceStopper ss) {
713        ss.stop(queueRegion);
714        ss.stop(topicRegion);
715        ss.stop(tempQueueRegion);
716        ss.stop(tempTopicRegion);
717    }
718
719    public boolean isKeepDurableSubsActive() {
720        return keepDurableSubsActive;
721    }
722
723    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
724        this.keepDurableSubsActive = keepDurableSubsActive;
725    }
726
727    public DestinationInterceptor getDestinationInterceptor() {
728        return destinationInterceptor;
729    }
730
731    @Override
732    public ConnectionContext getAdminConnectionContext() {
733        return adminConnectionContext;
734    }
735
736    @Override
737    public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
738        this.adminConnectionContext = adminConnectionContext;
739    }
740
741    public Map<ConnectionId, ConnectionState> getConnectionStates() {
742        return connectionStates;
743    }
744
745    @Override
746    public PListStore getTempDataStore() {
747        return brokerService.getTempDataStore();
748    }
749
750    @Override
751    public URI getVmConnectorURI() {
752        return brokerService.getVmConnectorURI();
753    }
754
755    @Override
756    public void brokerServiceStarted() {
757    }
758
759    @Override
760    public BrokerService getBrokerService() {
761        return brokerService;
762    }
763
764    @Override
765    public boolean isExpired(MessageReference messageReference) {
766        boolean expired = false;
767        if (messageReference.isExpired()) {
768            try {
769                // prevent duplicate expiry processing
770                Message message = messageReference.getMessage();
771                synchronized (message) {
772                    expired = stampAsExpired(message);
773                }
774            } catch (IOException e) {
775                LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e);
776            }
777        }
778        return expired;
779    }
780
781    private boolean stampAsExpired(Message message) throws IOException {
782        boolean stamped=false;
783        if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
784            long expiration=message.getExpiration();
785            message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
786            stamped = true;
787        }
788        return stamped;
789    }
790
791
792    @Override
793    public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
794        if (LOG.isDebugEnabled()) {
795            LOG.debug("Message expired " + node);
796        }
797        getRoot().sendToDeadLetterQueue(context, node, subscription);
798    }
799
800    @Override
801    public void sendToDeadLetterQueue(ConnectionContext context,
802            MessageReference node, Subscription subscription){
803        try{
804            if(node!=null){
805                Message message=node.getMessage();
806                if(message!=null && node.getRegionDestination()!=null){
807                    DeadLetterStrategy deadLetterStrategy=node
808                            .getRegionDestination().getDeadLetterStrategy();
809                    if(deadLetterStrategy!=null){
810                        if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
811                            // message may be inflight to other subscriptions so do not modify
812                            message = message.copy();
813                            stampAsExpired(message);
814                            message.setExpiration(0);
815                            if(!message.isPersistent()){
816                                message.setPersistent(true);
817                                message.setProperty("originalDeliveryMode",
818                                        "NON_PERSISTENT");
819                            }
820                            // The original destination and transaction id do
821                            // not get filled when the message is first sent,
822                            // it is only populated if the message is routed to
823                            // another destination like the DLQ
824                            ActiveMQDestination deadLetterDestination=deadLetterStrategy
825                                    .getDeadLetterQueueFor(message, subscription);
826                            if (context.getBroker()==null) {
827                                context.setBroker(getRoot());
828                            }
829                            BrokerSupport.resendNoCopy(context,message,
830                                    deadLetterDestination);
831                        }
832                    } else {
833                        if (LOG.isDebugEnabled()) {
834                            LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
835                                    + message.getMessageId() + ", destination: " + message.getDestination());
836                        }
837                    }
838                }
839            }
840        }catch(Exception e){
841            LOG.warn("Caught an exception sending to DLQ: "+node,e);
842        }
843    }
844
845    @Override
846    public Broker getRoot() {
847        try {
848            return getBrokerService().getBroker();
849        } catch (Exception e) {
850            LOG.error("Trying to get Root Broker " + e);
851            throw new RuntimeException("The broker from the BrokerService should not throw an exception");
852        }
853    }
854
855    /**
856     * @return the broker sequence id
857     */
858    @Override
859    public long getBrokerSequenceId() {
860        synchronized(sequenceGenerator) {
861            return sequenceGenerator.getNextSequenceId();
862        }
863    }
864
865
866    @Override
867    public Scheduler getScheduler() {
868        return this.scheduler;
869    }
870
871    public ThreadPoolExecutor getExecutor() {
872        return this.executor;
873    }
874
875    @Override
876    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
877        ActiveMQDestination destination = control.getDestination();
878        switch (destination.getDestinationType()) {
879        case ActiveMQDestination.QUEUE_TYPE:
880            queueRegion.processConsumerControl(consumerExchange, control);
881            break;
882
883        case ActiveMQDestination.TOPIC_TYPE:
884            topicRegion.processConsumerControl(consumerExchange, control);
885            break;
886
887        case ActiveMQDestination.TEMP_QUEUE_TYPE:
888            tempQueueRegion.processConsumerControl(consumerExchange, control);
889            break;
890
891        case ActiveMQDestination.TEMP_TOPIC_TYPE:
892            tempTopicRegion.processConsumerControl(consumerExchange, control);
893            break;
894
895        default:
896            LOG.warn("unmatched destination: " + destination + ", in consumerControl: "  + control);
897        }
898    }
899
900    protected void addBrokerInClusterUpdate(BrokerInfo info) {
901        List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
902        for (TransportConnector connector : connectors) {
903            if (connector.isUpdateClusterClients()) {
904                connector.addPeerBroker(info);
905                connector.updateClientClusterInfo();
906            }
907        }
908    }
909
910    protected void removeBrokerInClusterUpdate(BrokerInfo info) {
911        List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
912        for (TransportConnector connector : connectors) {
913            if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) {
914                connector.removePeerBroker(info);
915                connector.updateClientClusterInfo();
916            }
917        }
918    }
919
920    protected void purgeInactiveDestinations() {
921        inactiveDestinationsPurgeLock.writeLock().lock();
922        try {
923            List<Destination> list = new ArrayList<Destination>();
924            Map<ActiveMQDestination, Destination> map = getDestinationMap();
925            if (isAllowTempAutoCreationOnSend()) {
926                map.putAll(tempQueueRegion.getDestinationMap());
927                map.putAll(tempTopicRegion.getDestinationMap());
928            }
929            long maxPurgedDests = this.brokerService.getMaxPurgedDestinationsPerSweep();
930            long timeStamp = System.currentTimeMillis();
931            for (Destination d : map.values()) {
932                d.markForGC(timeStamp);
933                if (d.canGC()) {
934                    list.add(d);
935                    if (maxPurgedDests > 0 && list.size() == maxPurgedDests) {
936                        break;
937                    }
938                }
939            }
940
941            if (!list.isEmpty()) {
942                ConnectionContext context = BrokerSupport.getConnectionContext(this);
943                context.setBroker(this);
944
945                for (Destination dest : list) {
946                    Logger log = LOG;
947                    if (dest instanceof BaseDestination) {
948                        log = ((BaseDestination) dest).getLog();
949                    }
950                    log.info(dest.getName() + " Inactive for longer than " +
951                             dest.getInactiveTimoutBeforeGC() + " ms - removing ...");
952                    try {
953                        getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
954                    } catch (Exception e) {
955                        LOG.error("Failed to remove inactive destination " + dest, e);
956                    }
957                }
958            }
959        } finally {
960            inactiveDestinationsPurgeLock.writeLock().unlock();
961        }
962    }
963
964    public boolean isAllowTempAutoCreationOnSend() {
965        return allowTempAutoCreationOnSend;
966    }
967
968    public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
969        this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
970    }
971}