001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.BufferedReader;
020import java.io.File;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InputStreamReader;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.net.UnknownHostException;
027import java.security.Provider;
028import java.security.Security;
029import java.util.ArrayList;
030import java.util.Date;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Locale;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.CopyOnWriteArrayList;
039import java.util.concurrent.CountDownLatch;
040import java.util.concurrent.LinkedBlockingQueue;
041import java.util.concurrent.RejectedExecutionException;
042import java.util.concurrent.RejectedExecutionHandler;
043import java.util.concurrent.SynchronousQueue;
044import java.util.concurrent.ThreadFactory;
045import java.util.concurrent.ThreadPoolExecutor;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.atomic.AtomicBoolean;
048import java.util.concurrent.atomic.AtomicInteger;
049import java.util.concurrent.atomic.AtomicLong;
050
051import javax.annotation.PostConstruct;
052import javax.annotation.PreDestroy;
053import javax.management.InstanceNotFoundException;
054import javax.management.MalformedObjectNameException;
055import javax.management.ObjectName;
056
057import org.apache.activemq.ActiveMQConnectionMetaData;
058import org.apache.activemq.ConfigurationException;
059import org.apache.activemq.Service;
060import org.apache.activemq.advisory.AdvisoryBroker;
061import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
062import org.apache.activemq.broker.jmx.AnnotatedMBean;
063import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
064import org.apache.activemq.broker.jmx.BrokerView;
065import org.apache.activemq.broker.jmx.ConnectorView;
066import org.apache.activemq.broker.jmx.ConnectorViewMBean;
067import org.apache.activemq.broker.jmx.HealthView;
068import org.apache.activemq.broker.jmx.HealthViewMBean;
069import org.apache.activemq.broker.jmx.JmsConnectorView;
070import org.apache.activemq.broker.jmx.JobSchedulerView;
071import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
072import org.apache.activemq.broker.jmx.Log4JConfigView;
073import org.apache.activemq.broker.jmx.ManagedRegionBroker;
074import org.apache.activemq.broker.jmx.ManagementContext;
075import org.apache.activemq.broker.jmx.NetworkConnectorView;
076import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
077import org.apache.activemq.broker.jmx.ProxyConnectorView;
078import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
079import org.apache.activemq.broker.region.Destination;
080import org.apache.activemq.broker.region.DestinationFactory;
081import org.apache.activemq.broker.region.DestinationFactoryImpl;
082import org.apache.activemq.broker.region.DestinationInterceptor;
083import org.apache.activemq.broker.region.RegionBroker;
084import org.apache.activemq.broker.region.policy.PolicyMap;
085import org.apache.activemq.broker.region.virtual.MirroredQueue;
086import org.apache.activemq.broker.region.virtual.VirtualDestination;
087import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
088import org.apache.activemq.broker.region.virtual.VirtualTopic;
089import org.apache.activemq.broker.scheduler.JobSchedulerStore;
090import org.apache.activemq.broker.scheduler.SchedulerBroker;
091import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
092import org.apache.activemq.command.ActiveMQDestination;
093import org.apache.activemq.command.ActiveMQQueue;
094import org.apache.activemq.command.BrokerId;
095import org.apache.activemq.command.ProducerInfo;
096import org.apache.activemq.filter.DestinationFilter;
097import org.apache.activemq.network.ConnectionFilter;
098import org.apache.activemq.network.DiscoveryNetworkConnector;
099import org.apache.activemq.network.NetworkConnector;
100import org.apache.activemq.network.jms.JmsConnector;
101import org.apache.activemq.openwire.OpenWireFormat;
102import org.apache.activemq.proxy.ProxyConnector;
103import org.apache.activemq.security.MessageAuthorizationPolicy;
104import org.apache.activemq.selector.SelectorParser;
105import org.apache.activemq.store.JournaledStore;
106import org.apache.activemq.store.PListStore;
107import org.apache.activemq.store.PersistenceAdapter;
108import org.apache.activemq.store.PersistenceAdapterFactory;
109import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
110import org.apache.activemq.thread.Scheduler;
111import org.apache.activemq.thread.TaskRunnerFactory;
112import org.apache.activemq.transport.TransportFactorySupport;
113import org.apache.activemq.transport.TransportServer;
114import org.apache.activemq.transport.vm.VMTransportFactory;
115import org.apache.activemq.usage.StoreUsage;
116import org.apache.activemq.usage.SystemUsage;
117import org.apache.activemq.usage.Usage;
118import org.apache.activemq.util.BrokerSupport;
119import org.apache.activemq.util.DefaultIOExceptionHandler;
120import org.apache.activemq.util.IOExceptionHandler;
121import org.apache.activemq.util.IOExceptionSupport;
122import org.apache.activemq.util.IOHelper;
123import org.apache.activemq.util.InetAddressUtil;
124import org.apache.activemq.util.ServiceStopper;
125import org.apache.activemq.util.StoreUtil;
126import org.apache.activemq.util.ThreadPoolUtils;
127import org.apache.activemq.util.TimeUtils;
128import org.apache.activemq.util.URISupport;
129import org.slf4j.Logger;
130import org.slf4j.LoggerFactory;
131import org.slf4j.MDC;
132
133/**
134 * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
135 * number of transport connectors, network connectors and a bunch of properties
 * which can be used to configure the broker as it is lazily created.
137 *
138 * @org.apache.xbean.XBean
139 */
140public class BrokerService implements Service {
    /** The value "61616"; presumably the default broker port -- confirm against callers. */
    public static final String DEFAULT_PORT = "61616";
    /** Host name resolved once in the static initializer; stays "localhost" when resolution fails. */
    public static final String LOCAL_HOST_NAME;
    /**
     * First line of the classpath resource /org/apache/activemq/version.txt, read once in
     * the static initializer; null when the resource is missing or cannot be read.
     */
    public static final String BROKER_VERSION;
    /** Initial value of the brokerName field. */
    public static final String DEFAULT_BROKER_NAME = "localhost";
    /** 32 * 1024 * 1024 (33554432); presumably a length in bytes -- confirm. */
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    /** Initial value of waitForSlaveTimeout; presumably milliseconds -- confirm. */
    public static final long DEFAULT_START_TIMEOUT = 600000L;

    private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);

    // NOTE(review): the class shown here implements Service only; this id is flagged as unused.
    @SuppressWarnings("unused")
    private static final long serialVersionUID = 7353129142305630237L;
152
    private boolean useJmx = true;
    private boolean enableStatistics = true;
    private boolean persistent = true;
    private boolean populateJMSXUserID;
    private boolean useAuthenticatedPrincipalForJMSXUserID;
    private boolean populateUserNameInMBeans;
    private long mbeanInvocationTimeout = 0;

    // start() refuses to run when this and systemExitOnShutdown are both true.
    private boolean useShutdownHook = true;
    private boolean useLoggingForShutdownErrors;
    // Read by masterFailed(): true stops this broker, false starts all connectors instead.
    private boolean shutdownOnMasterFailure;
    private boolean shutdownOnSlaveFailure;
    private boolean waitForSlave;
    private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT;
    private boolean passiveSlave;
    // Also published under the MDC key "activemq.broker" while start() and stop() run.
    private String brokerName = DEFAULT_BROKER_NAME;
    private File dataDirectoryFile;
    private File tmpDataDirectory;
    // Assigned from getBroker() in doStartBroker().
    private Broker broker;
    // Receives the managed region broker in doStartBroker() when JMX is enabled.
    private BrokerView adminView;
    private ManagementContext managementContext;
    private ObjectName brokerObjectName;
    private TaskRunnerFactory taskRunnerFactory;
    private TaskRunnerFactory persistenceTaskRunnerFactory;
    private SystemUsage systemUsage;
    private SystemUsage producerSystemUsage;
    // NOTE(review): name is misspelled ("Usaage"); presumably referenced elsewhere in the class, so not renamed here.
    private SystemUsage consumerSystemUsaage;
    private PersistenceAdapter persistenceAdapter;
    private PersistenceAdapterFactory persistenceFactory;
    protected DestinationFactory destinationFactory;
    private MessageAuthorizationPolicy messageAuthorizationPolicy;
    // Connector registries; the add*/remove* methods mutate these lists directly.
    private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
    private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
    private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
    private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
    private final List<Service> services = new ArrayList<Service>();
    private transient Thread shutdownHook;
    private String[] transportConnectorURIs;
    private String[] networkConnectorURIs;
    private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
    // to other jms messaging systems
    // When true, doStartPersistenceAdapter() calls deleteAllMessages() before starting the adapter.
    private boolean deleteAllMessagesOnStartup;
    private boolean advisorySupport = true;
    private URI vmConnectorURI;
    private String defaultSocketURIString;
    private PolicyMap destinationPolicy;
    // Life-cycle flags: start() proceeds only when stopped is false and started flips false -> true;
    // stop() proceeds only when stopping flips false -> true; start() resets stopping to false.
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private BrokerPlugin[] plugins;
    private boolean keepDurableSubsActive = true;
    private boolean useVirtualTopics = true;
    private boolean useMirroredQueues = false;
    private boolean useTempMirroredQueues = true;
    /**
     * Whether or not virtual destination subscriptions should cause network demand
     */
    private boolean useVirtualDestSubs = false;
    /**
     * Whether or not the creation of destinations that match virtual destinations
     * should cause network demand
     */
    private boolean useVirtualDestSubsOnCreation = false;
    // Assigned from broker.getBrokerId() in doStartBroker().
    private BrokerId brokerId;
    private volatile DestinationInterceptor[] destinationInterceptors;
    private ActiveMQDestination[] destinations;
    // Started in doStartPersistenceAdapter(), after the persistence adapter itself.
    private PListStore tempDataStore;
    private int persistenceThreadPriority = Thread.MAX_PRIORITY;
    private boolean useLocalHostBrokerName;
    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
    // Counted down near the end of doStartBroker(); isStarted() requires its count to be 0.
    private final CountDownLatch startedLatch = new CountDownLatch(1);
    // Cast to ManagedRegionBroker in doStartBroker() when JMX is enabled.
    private Broker regionBroker;
    private int producerSystemUsagePortion = 60;
    private int consumerSystemUsagePortion = 40;
    private boolean splitSystemUsageForProducersConsumers;
    private boolean monitorConnectionSplits = false;
    private int taskRunnerPriority = Thread.NORM_PRIORITY;
    private boolean dedicatedTaskRunner;
    private boolean cacheTempDestinations = false;// useful for failover
    private int timeBeforePurgeTempDestinations = 5000;
    private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
    // When true, stop() spawns a thread that calls System.exit(systemExitOnShutdownExitCode).
    private boolean systemExitOnShutdown;
    private int systemExitOnShutdownExitCode;
    private SslContext sslContext;
    // Written by start(boolean).
    private boolean forceStart = false;
    // doStartBroker() installs a DefaultIOExceptionHandler when this is still null.
    private IOExceptionHandler ioExceptionHandler;
    private boolean schedulerSupport = false;
    private File schedulerDirectoryFile;
    // Stopped and set to null by stop().
    private Scheduler scheduler;
    private ThreadPoolExecutor executor;
    private int schedulePeriodForDestinationPurge= 0;
    private int maxPurgedDestinationsPerSweep = 0;
    private int schedulePeriodForDiskUsageCheck = 0;
    private int diskUsageCheckRegrowThreshold = -1;
    private boolean adjustUsageLimits = true;
    private BrokerContext brokerContext;
    private boolean networkConnectorStartAsync = false;
    private boolean allowTempAutoCreationOnSend;
    // Started in doStartPersistenceAdapter(), after the temp data store.
    private JobSchedulerStore jobSchedulerStore;
    private final AtomicLong totalConnections = new AtomicLong();
    private final AtomicInteger currentConnections = new AtomicInteger();

    private long offlineDurableSubscriberTimeout = -1;
    private long offlineDurableSubscriberTaskSchedule = 300000;
    private DestinationFilter virtualConsumerDestinationFilter;

    // Both a flag and the monitor used for the wait()/notifyAll() hand-off between the
    // asynchronous persistence-adapter thread and the asynchronous broker thread.
    private final AtomicBoolean persistenceAdapterStarted = new AtomicBoolean(false);
    // Cleared at the top of start(); set by the asynchronous start threads on failure and by stop().
    private Throwable startException = null;
    // Passed by start() to startPersistenceAdapter() and startBroker() to select background threads.
    private boolean startAsync = false;
    // Set by start(); getUptimeMillis() reports 0 while it is null.
    private Date startDate;
    private boolean slave = true;

    private boolean restartAllowed = true;
    private boolean restartRequested = false;
    private boolean rejectDurableConsumers = false;
    private boolean rollbackOnlyOnAsyncException = true;

    private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
271
272    static {
273
274        try {
275            ClassLoader loader = BrokerService.class.getClassLoader();
276            Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider");
277            Provider bouncycastle = (Provider) clazz.newInstance();
278            Security.insertProviderAt(bouncycastle,
279                Integer.getInteger("org.apache.activemq.broker.BouncyCastlePosition", 2));
280            LOG.info("Loaded the Bouncy Castle security provider.");
281        } catch(Throwable e) {
282            // No BouncyCastle found so we use the default Java Security Provider
283        }
284
285        String localHostName = "localhost";
286        try {
287            localHostName =  InetAddressUtil.getLocalHostName();
288        } catch (UnknownHostException e) {
289            LOG.error("Failed to resolve localhost");
290        }
291        LOCAL_HOST_NAME = localHostName;
292
293        String version = null;
294        try(InputStream in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) {
295            if (in != null) {
296                try(InputStreamReader isr = new InputStreamReader(in);
297                    BufferedReader reader = new BufferedReader(isr)) {
298                    version = reader.readLine();
299                }
300            }
301        } catch (IOException ie) {
302            LOG.warn("Error reading broker version ", ie);
303        }
304        BROKER_VERSION = version;
305    }
306
307    @Override
308    public String toString() {
309        return "BrokerService[" + getBrokerName() + "]";
310    }
311
312    private String getBrokerVersion() {
313        String version = ActiveMQConnectionMetaData.PROVIDER_VERSION;
314        if (version == null) {
315            version = BROKER_VERSION;
316        }
317
318        return version;
319    }
320
321    /**
322     * Adds a new transport connector for the given bind address
323     *
324     * @return the newly created and added transport connector
325     * @throws Exception
326     */
327    public TransportConnector addConnector(String bindAddress) throws Exception {
328        return addConnector(new URI(bindAddress));
329    }
330
331    /**
332     * Adds a new transport connector for the given bind address
333     *
334     * @return the newly created and added transport connector
335     * @throws Exception
336     */
337    public TransportConnector addConnector(URI bindAddress) throws Exception {
338        return addConnector(createTransportConnector(bindAddress));
339    }
340
341    /**
342     * Adds a new transport connector for the given TransportServer transport
343     *
344     * @return the newly created and added transport connector
345     * @throws Exception
346     */
347    public TransportConnector addConnector(TransportServer transport) throws Exception {
348        return addConnector(new TransportConnector(transport));
349    }
350
351    /**
352     * Adds a new transport connector
353     *
354     * @return the transport connector
355     * @throws Exception
356     */
357    public TransportConnector addConnector(TransportConnector connector) throws Exception {
358        transportConnectors.add(connector);
359        return connector;
360    }
361
362    /**
363     * Stops and removes a transport connector from the broker.
364     *
365     * @param connector
366     * @return true if the connector has been previously added to the broker
367     * @throws Exception
368     */
369    public boolean removeConnector(TransportConnector connector) throws Exception {
370        boolean rc = transportConnectors.remove(connector);
371        if (rc) {
372            unregisterConnectorMBean(connector);
373        }
374        return rc;
375    }
376
377    /**
378     * Adds a new network connector using the given discovery address
379     *
380     * @return the newly created and added network connector
381     * @throws Exception
382     */
383    public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
384        return addNetworkConnector(new URI(discoveryAddress));
385    }
386
387    /**
388     * Adds a new proxy connector using the given bind address
389     *
390     * @return the newly created and added network connector
391     * @throws Exception
392     */
393    public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
394        return addProxyConnector(new URI(bindAddress));
395    }
396
397    /**
398     * Adds a new network connector using the given discovery address
399     *
400     * @return the newly created and added network connector
401     * @throws Exception
402     */
403    public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
404        NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
405        return addNetworkConnector(connector);
406    }
407
408    /**
409     * Adds a new proxy connector using the given bind address
410     *
411     * @return the newly created and added network connector
412     * @throws Exception
413     */
414    public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
415        ProxyConnector connector = new ProxyConnector();
416        connector.setBind(bindAddress);
417        connector.setRemote(new URI("fanout:multicast://default"));
418        return addProxyConnector(connector);
419    }
420
421    /**
422     * Adds a new network connector to connect this broker to a federated
423     * network
424     */
425    public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
426        connector.setBrokerService(this);
427        connector.setLocalUri(getVmConnectorURI());
428        // Set a connection filter so that the connector does not establish loop
429        // back connections.
430        connector.setConnectionFilter(new ConnectionFilter() {
431            @Override
432            public boolean connectTo(URI location) {
433                List<TransportConnector> transportConnectors = getTransportConnectors();
434                for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
435                    try {
436                        TransportConnector tc = iter.next();
437                        if (location.equals(tc.getConnectUri())) {
438                            return false;
439                        }
440                    } catch (Throwable e) {
441                    }
442                }
443                return true;
444            }
445        });
446        networkConnectors.add(connector);
447        return connector;
448    }
449
450    /**
451     * Removes the given network connector without stopping it. The caller
452     * should call {@link NetworkConnector#stop()} to close the connector
453     */
454    public boolean removeNetworkConnector(NetworkConnector connector) {
455        boolean answer = networkConnectors.remove(connector);
456        if (answer) {
457            unregisterNetworkConnectorMBean(connector);
458        }
459        return answer;
460    }
461
462    public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
463        URI uri = getVmConnectorURI();
464        connector.setLocalUri(uri);
465        proxyConnectors.add(connector);
466        if (isUseJmx()) {
467            registerProxyConnectorMBean(connector);
468        }
469        return connector;
470    }
471
472    public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
473        connector.setBrokerService(this);
474        jmsConnectors.add(connector);
475        if (isUseJmx()) {
476            registerJmsConnectorMBean(connector);
477        }
478        return connector;
479    }
480
481    public JmsConnector removeJmsConnector(JmsConnector connector) {
482        if (jmsConnectors.remove(connector)) {
483            return connector;
484        }
485        return null;
486    }
487
488    public void masterFailed() {
489        if (shutdownOnMasterFailure) {
490            LOG.error("The Master has failed ... shutting down");
491            try {
492                stop();
493            } catch (Exception e) {
494                LOG.error("Failed to stop for master failure", e);
495            }
496        } else {
497            LOG.warn("Master Failed - starting all connectors");
498            try {
499                startAllConnectors();
500                broker.nowMasterBroker();
501            } catch (Exception e) {
502                LOG.error("Failed to startAllConnectors", e);
503            }
504        }
505    }
506
507    public String getUptime() {
508        long delta = getUptimeMillis();
509
510        if (delta == 0) {
511            return "not started";
512        }
513
514        return TimeUtils.printDuration(delta);
515    }
516
517    public long getUptimeMillis() {
518        if (startDate == null) {
519            return 0;
520        }
521
522        return new Date().getTime() - startDate.getTime();
523    }
524
525    public boolean isStarted() {
526        return started.get() && startedLatch.getCount() == 0;
527    }
528
529    /**
530     * Forces a start of the broker.
531     * By default a BrokerService instance that was
532     * previously stopped using BrokerService.stop() cannot be restarted
533     * using BrokerService.start().
534     * This method enforces a restart.
535     * It is not recommended to force a restart of the broker and will not work
536     * for most but some very trivial broker configurations.
537     * For restarting a broker instance we recommend to first call stop() on
538     * the old instance and then recreate a new BrokerService instance.
539     *
540     * @param force - if true enforces a restart.
541     * @throws Exception
542     */
543    public void start(boolean force) throws Exception {
544        forceStart = force;
545        stopped.set(false);
546        started.set(false);
547        start();
548    }
549
550    // Service interface
551    // -------------------------------------------------------------------------
552
553    protected boolean shouldAutostart() {
554        return true;
555    }
556
557    /**
558     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
559     *
560     * delegates to autoStart, done to prevent backwards incompatible signature change
561     */
562    @PostConstruct
563    private void postConstruct() {
564        try {
565            autoStart();
566        } catch (Exception ex) {
567            throw new RuntimeException(ex);
568        }
569    }
570
571    /**
572     *
573     * @throws Exception
574     * @org. apache.xbean.InitMethod
575     */
576    public void autoStart() throws Exception {
577        if(shouldAutostart()) {
578            start();
579        }
580    }
581
582    @Override
583    public void start() throws Exception {
584        if (stopped.get() || !started.compareAndSet(false, true)) {
585            // lets just ignore redundant start() calls
586            // as its way too easy to not be completely sure if start() has been
587            // called or not with the gazillion of different configuration
588            // mechanisms
589            // throw new IllegalStateException("Already started.");
590            return;
591        }
592
593        setStartException(null);
594        stopping.set(false);
595        startDate = new Date();
596        MDC.put("activemq.broker", brokerName);
597
598        try {
599            checkMemorySystemUsageLimits();
600            if (systemExitOnShutdown && useShutdownHook) {
601                throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
602            }
603            processHelperProperties();
604            if (isUseJmx()) {
605                // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and
606                // we cannot cleanup clear that during shutdown of the broker.
607                MDC.remove("activemq.broker");
608                try {
609                    startManagementContext();
610                    for (NetworkConnector connector : getNetworkConnectors()) {
611                        registerNetworkConnectorMBean(connector);
612                    }
613                } finally {
614                    MDC.put("activemq.broker", brokerName);
615                }
616            }
617
618            // in jvm master slave, lets not publish over existing broker till we get the lock
619            final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance();
620            if (brokerRegistry.lookup(getBrokerName()) == null) {
621                brokerRegistry.bind(getBrokerName(), BrokerService.this);
622            }
623            startPersistenceAdapter(startAsync);
624            startBroker(startAsync);
625            brokerRegistry.bind(getBrokerName(), BrokerService.this);
626        } catch (Exception e) {
627            LOG.error("Failed to start Apache ActiveMQ ({}, {})", new Object[]{ getBrokerName(), brokerId }, e);
628            try {
629                if (!stopped.get()) {
630                    stop();
631                }
632            } catch (Exception ex) {
633                LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex);
634            }
635            throw e;
636        } finally {
637            MDC.remove("activemq.broker");
638        }
639    }
640
641    private void startPersistenceAdapter(boolean async) throws Exception {
642        if (async) {
643            new Thread("Persistence Adapter Starting Thread") {
644                @Override
645                public void run() {
646                    try {
647                        doStartPersistenceAdapter();
648                    } catch (Throwable e) {
649                        setStartException(e);
650                    } finally {
651                        synchronized (persistenceAdapterStarted) {
652                            persistenceAdapterStarted.set(true);
653                            persistenceAdapterStarted.notifyAll();
654                        }
655                    }
656                }
657            }.start();
658        } else {
659            doStartPersistenceAdapter();
660        }
661    }
662
663    private void doStartPersistenceAdapter() throws Exception {
664        PersistenceAdapter persistenceAdapterToStart = getPersistenceAdapter();
665        if (persistenceAdapterToStart == null) {
666            checkStartException();
667            throw new ConfigurationException("Cannot start null persistence adapter");
668        }
669        persistenceAdapterToStart.setUsageManager(getProducerSystemUsage());
670        persistenceAdapterToStart.setBrokerName(getBrokerName());
671        LOG.info("Using Persistence Adapter: {}", persistenceAdapterToStart);
672        if (deleteAllMessagesOnStartup) {
673            deleteAllMessages();
674        }
675        persistenceAdapterToStart.start();
676
677        getTempDataStore();
678        if (tempDataStore != null) {
679            try {
680                // start after we have the store lock
681                tempDataStore.start();
682            } catch (Exception e) {
683                RuntimeException exception = new RuntimeException(
684                        "Failed to start temp data store: " + tempDataStore, e);
685                LOG.error(exception.getLocalizedMessage(), e);
686                throw exception;
687            }
688        }
689
690        getJobSchedulerStore();
691        if (jobSchedulerStore != null) {
692            try {
693                jobSchedulerStore.start();
694            } catch (Exception e) {
695                RuntimeException exception = new RuntimeException(
696                        "Failed to start job scheduler store: " + jobSchedulerStore, e);
697                LOG.error(exception.getLocalizedMessage(), e);
698                throw exception;
699            }
700        }
701    }
702
703    private void startBroker(boolean async) throws Exception {
704        if (async) {
705            new Thread("Broker Starting Thread") {
706                @Override
707                public void run() {
708                    try {
709                        synchronized (persistenceAdapterStarted) {
710                            if (!persistenceAdapterStarted.get()) {
711                                persistenceAdapterStarted.wait();
712                            }
713                        }
714                        doStartBroker();
715                    } catch (Throwable t) {
716                        setStartException(t);
717                    }
718                }
719            }.start();
720        } else {
721            doStartBroker();
722        }
723    }
724
    /**
     * Performs the actual broker start-up on the calling thread: starts the
     * destinations, installs the shutdown hook, creates and starts the broker,
     * wires up JMX views when JMX is enabled, starts all connectors and finally
     * releases the started latch.
     *
     * @throws Exception if any start-up step fails
     */
    private void doStartBroker() throws Exception {
        // checked before any other start-up work is done
        checkStartException();
        startDestinations();
        addShutdownHook();

        broker = getBroker();
        brokerId = broker.getBrokerId();

        // need to log this after creating the broker so we have its id and name
        LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId });
        broker.start();

        if (isUseJmx()) {
            if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
                // try to restart management context
                // typical for slaves that use the same ports as master
                managementContext.stop();
                startManagementContext();
            }
            // with JMX enabled the region broker is expected to be a ManagedRegionBroker (unchecked cast)
            ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
            managedBroker.setContextBroker(broker);
            adminView.setBroker(managedBroker);
        }

        // fall back to the default handler when none was configured
        if (ioExceptionHandler == null) {
            setIoExceptionHandler(new DefaultIOExceptionHandler());
        }

        // expose the Log4J configuration view over JMX when Log4J is available
        if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) {
            ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString());
            Log4JConfigView log4jConfigView = new Log4JConfigView();
            AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName);
        }

        startAllConnectors();

        LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
        LOG.info("For help or more information please see: http://activemq.apache.org");

        getBroker().brokerServiceStarted();
        checkStoreSystemUsageLimits();
        // from here on isStarted() can report true
        startedLatch.countDown();
        getBroker().nowMasterBroker();
    }
769
770    /**
771     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
772     *
773     * delegates to stop, done to prevent backwards incompatible signature change
774     */
775    @PreDestroy
776    private void preDestroy () {
777        try {
778            stop();
779        } catch (Exception ex) {
780            throw new RuntimeException();
781        }
782    }
783
784    /**
785     *
786     * @throws Exception
787     * @org.apache .xbean.DestroyMethod
788     */
789    @Override
790    public void stop() throws Exception {
791        if (!stopping.compareAndSet(false, true)) {
792            LOG.trace("Broker already stopping/stopped");
793            return;
794        }
795
796        setStartException(new BrokerStoppedException("Stop invoked"));
797        MDC.put("activemq.broker", brokerName);
798
799        if (systemExitOnShutdown) {
800            new Thread() {
801                @Override
802                public void run() {
803                    System.exit(systemExitOnShutdownExitCode);
804                }
805            }.start();
806        }
807
808        LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} );
809
810        removeShutdownHook();
811        if (this.scheduler != null) {
812            this.scheduler.stop();
813            this.scheduler = null;
814        }
815        ServiceStopper stopper = new ServiceStopper();
816        if (services != null) {
817            for (Service service : services) {
818                stopper.stop(service);
819            }
820        }
821        stopAllConnectors(stopper);
822        this.slave = true;
823        // remove any VMTransports connected
824        // this has to be done after services are stopped,
825        // to avoid timing issue with discovery (spinning up a new instance)
826        BrokerRegistry.getInstance().unbind(getBrokerName());
827        VMTransportFactory.stopped(getBrokerName());
828        if (broker != null) {
829            stopper.stop(broker);
830            broker = null;
831        }
832
833        if (jobSchedulerStore != null) {
834            jobSchedulerStore.stop();
835            jobSchedulerStore = null;
836        }
837        if (tempDataStore != null) {
838            tempDataStore.stop();
839            tempDataStore = null;
840        }
841        try {
842            stopper.stop(getPersistenceAdapter());
843            persistenceAdapter = null;
844            if (isUseJmx()) {
845                stopper.stop(managementContext);
846                managementContext = null;
847            }
848            // Clear SelectorParser cache to free memory
849            SelectorParser.clearCache();
850        } finally {
851            started.set(false);
852            stopped.set(true);
853            stoppedLatch.countDown();
854        }
855
856        if (this.taskRunnerFactory != null) {
857            this.taskRunnerFactory.shutdown();
858            this.taskRunnerFactory = null;
859        }
860        if (this.executor != null) {
861            ThreadPoolUtils.shutdownNow(executor);
862            this.executor = null;
863        }
864
865        this.destinationInterceptors = null;
866        this.destinationFactory = null;
867
868        if (startDate != null) {
869            LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()});
870        }
871        LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
872
873        synchronized (shutdownHooks) {
874            for (Runnable hook : shutdownHooks) {
875                try {
876                    hook.run();
877                } catch (Throwable e) {
878                    stopper.onException(hook, e);
879                }
880            }
881        }
882
883        MDC.remove("activemq.broker");
884
885        // and clear start date
886        startDate = null;
887
888        stopper.throwFirstException();
889    }
890
891    public boolean checkQueueSize(String queueName) {
892        long count = 0;
893        long queueSize = 0;
894        Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
895        for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
896            if (entry.getKey().isQueue()) {
897                if (entry.getValue().getName().matches(queueName)) {
898                    queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
899                    count += queueSize;
900                    if (queueSize > 0) {
901                        LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize);
902                    }
903                }
904            }
905        }
906        return count == 0;
907    }
908
909    /**
910     * This method (both connectorName and queueName are using regex to match)
911     * 1. stop the connector (supposed the user input the connector which the
912     * clients connect to) 2. to check whether there is any pending message on
913     * the queues defined by queueName 3. supposedly, after stop the connector,
914     * client should failover to other broker and pending messages should be
915     * forwarded. if no pending messages, the method finally call stop to stop
916     * the broker.
917     *
918     * @param connectorName
919     * @param queueName
920     * @param timeout
921     * @param pollInterval
922     * @throws Exception
923     */
924    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
925        if (isUseJmx()) {
926            if (connectorName == null || queueName == null || timeout <= 0) {
927                throw new Exception(
928                        "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
929            }
930            if (pollInterval <= 0) {
931                pollInterval = 30;
932            }
933            LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{
934                    connectorName, queueName, timeout, pollInterval
935            });
936            TransportConnector connector;
937            for (int i = 0; i < transportConnectors.size(); i++) {
938                connector = transportConnectors.get(i);
939                if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
940                    connector.stop();
941                }
942            }
943            long start = System.currentTimeMillis();
944            while (System.currentTimeMillis() - start < timeout * 1000) {
945                // check quesize until it gets zero
946                if (checkQueueSize(queueName)) {
947                    stop();
948                    break;
949                } else {
950                    Thread.sleep(pollInterval * 1000);
951                }
952            }
953            if (stopped.get()) {
954                LOG.info("Successfully stop the broker.");
955            } else {
956                LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
957            }
958        }
959    }
960
961    /**
962     * A helper method to block the caller thread until the broker has been
963     * stopped
964     */
965    public void waitUntilStopped() {
966        while (isStarted() && !stopped.get()) {
967            try {
968                stoppedLatch.await();
969            } catch (InterruptedException e) {
970                // ignore
971            }
972        }
973    }
974
    /**
     * @return the current value of the {@code stopped} flag, which {@link #stop()}
     *         sets to {@code true} once the persistence adapter has been shut down
     */
    public boolean isStopped() {
        return stopped.get();
    }
978
979    /**
980     * A helper method to block the caller thread until the broker has fully started
981     * @return boolean true if wait succeeded false if broker was not started or was stopped
982     */
983    public boolean waitUntilStarted() {
984        return waitUntilStarted(DEFAULT_START_TIMEOUT);
985    }
986
987    /**
988     * A helper method to block the caller thread until the broker has fully started
989     *
990     * @param timeout
991     *        the amount of time to wait before giving up and returning false.
992     *
993     * @return boolean true if wait succeeded false if broker was not started or was stopped
994     */
995    public boolean waitUntilStarted(long timeout) {
996        boolean waitSucceeded = isStarted();
997        long expiration = Math.max(0, timeout + System.currentTimeMillis());
998        while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) {
999            try {
1000                if (getStartException() != null) {
1001                    return waitSucceeded;
1002                }
1003                waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
1004            } catch (InterruptedException ignore) {
1005            }
1006        }
1007        return waitSucceeded;
1008    }
1009
1010    // Properties
1011    // -------------------------------------------------------------------------
1012    /**
1013     * Returns the message broker
1014     */
1015    public Broker getBroker() throws Exception {
1016        if (broker == null) {
1017            checkStartException();
1018            broker = createBroker();
1019        }
1020        return broker;
1021    }
1022
1023    /**
1024     * Returns the administration view of the broker; used to create and destroy
1025     * resources such as queues and topics. Note this method returns null if JMX
1026     * is disabled.
1027     */
1028    public BrokerView getAdminView() throws Exception {
1029        if (adminView == null) {
1030            // force lazy creation
1031            getBroker();
1032        }
1033        return adminView;
1034    }
1035
    /**
     * Sets the administration view returned by {@link #getAdminView()}.
     *
     * @param adminView the view to use
     */
    public void setAdminView(BrokerView adminView) {
        this.adminView = adminView;
    }
1039
    /**
     * @return the name of this broker
     */
    public String getBrokerName() {
        return brokerName;
    }
1043
1044    /**
1045     * Sets the name of this broker; which must be unique in the network
1046     *
1047     * @param brokerName
1048     */
1049    private static final String brokerNameReplacedCharsRegExp = "[^a-zA-Z0-9\\.\\_\\-\\:]";
1050    public void setBrokerName(String brokerName) {
1051        if (brokerName == null) {
1052            throw new NullPointerException("The broker name cannot be null");
1053        }
1054        String str = brokerName.replaceAll(brokerNameReplacedCharsRegExp, "_");
1055        if (!str.equals(brokerName)) {
1056            LOG.error("Broker Name: {} contained illegal characters matching regExp: {} - replaced with {}", brokerName, brokerNameReplacedCharsRegExp, str);
1057        }
1058        this.brokerName = str.trim();
1059    }
1060
    /**
     * @return the configured persistence adapter factory, or {@code null} if none
     *         has been set
     */
    public PersistenceAdapterFactory getPersistenceFactory() {
        return persistenceFactory;
    }
1064
1065    public File getDataDirectoryFile() {
1066        if (dataDirectoryFile == null) {
1067            dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
1068        }
1069        return dataDirectoryFile;
1070    }
1071
1072    public File getBrokerDataDirectory() {
1073        String brokerDir = getBrokerName();
1074        return new File(getDataDirectoryFile(), brokerDir);
1075    }
1076
1077    /**
1078     * Sets the directory in which the data files will be stored by default for
1079     * the JDBC and Journal persistence adaptors.
1080     *
1081     * @param dataDirectory
1082     *            the directory to store data files
1083     */
1084    public void setDataDirectory(String dataDirectory) {
1085        setDataDirectoryFile(new File(dataDirectory));
1086    }
1087
1088    /**
1089     * Sets the directory in which the data files will be stored by default for
1090     * the JDBC and Journal persistence adaptors.
1091     *
1092     * @param dataDirectoryFile
1093     *            the directory to store data files
1094     */
1095    public void setDataDirectoryFile(File dataDirectoryFile) {
1096        this.dataDirectoryFile = dataDirectoryFile;
1097    }
1098
1099    /**
1100     * @return the tmpDataDirectory
1101     */
1102    public File getTmpDataDirectory() {
1103        if (tmpDataDirectory == null) {
1104            tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
1105        }
1106        return tmpDataDirectory;
1107    }
1108
1109    /**
1110     * @param tmpDataDirectory
1111     *            the tmpDataDirectory to set
1112     */
1113    public void setTmpDataDirectory(File tmpDataDirectory) {
1114        this.tmpDataDirectory = tmpDataDirectory;
1115    }
1116
    /**
     * Sets the factory returned by {@link #getPersistenceFactory()}.
     *
     * @param persistenceFactory the persistence adapter factory to use
     */
    public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
        this.persistenceFactory = persistenceFactory;
    }
1120
    /**
     * Sets the destination factory of this broker service.
     *
     * @param destinationFactory the destination factory to use
     */
    public void setDestinationFactory(DestinationFactory destinationFactory) {
        this.destinationFactory = destinationFactory;
    }
1124
    /**
     * @return the value of the {@code persistent} flag
     */
    public boolean isPersistent() {
        return persistent;
    }
1128
1129    /**
1130     * Sets whether or not persistence is enabled or disabled.
1131     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1132     */
1133    public void setPersistent(boolean persistent) {
1134        this.persistent = persistent;
1135    }
1136
    /**
     * @return the value of the {@code populateJMSXUserID} flag
     */
    public boolean isPopulateJMSXUserID() {
        return populateJMSXUserID;
    }
1140
1141    /**
1142     * Sets whether or not the broker should populate the JMSXUserID header.
1143     */
1144    public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
1145        this.populateJMSXUserID = populateJMSXUserID;
1146    }
1147
1148    public SystemUsage getSystemUsage() {
1149        try {
1150            if (systemUsage == null) {
1151
1152                systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore());
1153                systemUsage.setExecutor(getExecutor());
1154                systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB
1155                systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1156                systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
1157                systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1158                addService(this.systemUsage);
1159            }
1160            return systemUsage;
1161        } catch (IOException e) {
1162            LOG.error("Cannot create SystemUsage", e);
1163            throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e);
1164        }
1165    }
1166
1167    public void setSystemUsage(SystemUsage memoryManager) {
1168        if (this.systemUsage != null) {
1169            removeService(this.systemUsage);
1170        }
1171        this.systemUsage = memoryManager;
1172        if (this.systemUsage.getExecutor()==null) {
1173            this.systemUsage.setExecutor(getExecutor());
1174        }
1175        addService(this.systemUsage);
1176    }
1177
1178    /**
1179     * @return the consumerUsageManager
1180     * @throws IOException
1181     */
1182    public SystemUsage getConsumerSystemUsage() throws IOException {
1183        if (this.consumerSystemUsaage == null) {
1184            if (splitSystemUsageForProducersConsumers) {
1185                this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
1186                float portion = consumerSystemUsagePortion / 100f;
1187                this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
1188                addService(this.consumerSystemUsaage);
1189            } else {
1190                consumerSystemUsaage = getSystemUsage();
1191            }
1192        }
1193        return this.consumerSystemUsaage;
1194    }
1195
1196    /**
1197     * @param consumerSystemUsaage
1198     *            the storeSystemUsage to set
1199     */
1200    public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
1201        if (this.consumerSystemUsaage != null) {
1202            removeService(this.consumerSystemUsaage);
1203        }
1204        this.consumerSystemUsaage = consumerSystemUsaage;
1205        addService(this.consumerSystemUsaage);
1206    }
1207
1208    /**
1209     * @return the producerUsageManager
1210     * @throws IOException
1211     */
1212    public SystemUsage getProducerSystemUsage() throws IOException {
1213        if (producerSystemUsage == null) {
1214            if (splitSystemUsageForProducersConsumers) {
1215                producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
1216                float portion = producerSystemUsagePortion / 100f;
1217                producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
1218                addService(producerSystemUsage);
1219            } else {
1220                producerSystemUsage = getSystemUsage();
1221            }
1222        }
1223        return producerSystemUsage;
1224    }
1225
1226    /**
1227     * @param producerUsageManager
1228     *            the producerUsageManager to set
1229     */
1230    public void setProducerSystemUsage(SystemUsage producerUsageManager) {
1231        if (this.producerSystemUsage != null) {
1232            removeService(this.producerSystemUsage);
1233        }
1234        this.producerSystemUsage = producerUsageManager;
1235        addService(this.producerSystemUsage);
1236    }
1237
1238    public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException {
1239        if (persistenceAdapter == null && !hasStartException()) {
1240            persistenceAdapter = createPersistenceAdapter();
1241            configureService(persistenceAdapter);
1242            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1243        }
1244        return persistenceAdapter;
1245    }
1246
1247    /**
1248     * Sets the persistence adaptor implementation to use for this broker
1249     *
1250     * @throws IOException
1251     */
1252    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1253        if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) {
1254            LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter);
1255            return;
1256        }
1257        this.persistenceAdapter = persistenceAdapter;
1258        configureService(this.persistenceAdapter);
1259        this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1260    }
1261
1262    public TaskRunnerFactory getTaskRunnerFactory() {
1263        if (this.taskRunnerFactory == null) {
1264            this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1265                    isDedicatedTaskRunner());
1266            this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader());
1267        }
1268        return this.taskRunnerFactory;
1269    }
1270
    /**
     * Sets the task runner factory returned by {@link #getTaskRunnerFactory()}.
     *
     * @param taskRunnerFactory the factory to use; when {@code null}, a default
     *        one is created on the next call of the getter
     */
    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
        this.taskRunnerFactory = taskRunnerFactory;
    }
1274
1275    public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1276        if (taskRunnerFactory == null) {
1277            persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1278                    true, 1000, isDedicatedTaskRunner());
1279        }
1280        return persistenceTaskRunnerFactory;
1281    }
1282
    /**
     * Sets the task runner factory used for persistence work.
     *
     * @param persistenceTaskRunnerFactory the factory to use
     */
    public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
        this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
    }
1286
    /**
     * @return the value of the {@code useJmx} flag
     */
    public boolean isUseJmx() {
        return useJmx;
    }
1290
    /**
     * @return the value of the {@code enableStatistics} flag
     */
    public boolean isEnableStatistics() {
        return enableStatistics;
    }
1294
1295    /**
1296     * Sets whether or not the Broker's services enable statistics or not.
1297     */
1298    public void setEnableStatistics(boolean enableStatistics) {
1299        this.enableStatistics = enableStatistics;
1300    }
1301
1302    /**
1303     * Sets whether or not the Broker's services should be exposed into JMX or
1304     * not.
1305     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1306     */
1307    public void setUseJmx(boolean useJmx) {
1308        this.useJmx = useJmx;
1309    }
1310
1311    public ObjectName getBrokerObjectName() throws MalformedObjectNameException {
1312        if (brokerObjectName == null) {
1313            brokerObjectName = createBrokerObjectName();
1314        }
1315        return brokerObjectName;
1316    }
1317
1318    /**
1319     * Sets the JMX ObjectName for this broker
1320     */
1321    public void setBrokerObjectName(ObjectName brokerObjectName) {
1322        this.brokerObjectName = brokerObjectName;
1323    }
1324
1325    public ManagementContext getManagementContext() {
1326        if (managementContext == null) {
1327            checkStartException();
1328            managementContext = new ManagementContext();
1329        }
1330        return managementContext;
1331    }
1332
1333    synchronized private void checkStartException() {
1334        if (startException != null) {
1335            throw new BrokerStoppedException(startException);
1336        }
1337    }
1338
1339    synchronized private boolean hasStartException() {
1340        return startException != null;
1341    }
1342
1343    synchronized private void setStartException(Throwable t) {
1344        startException = t;
1345    }
1346
    /**
     * Sets the management context returned by {@link #getManagementContext()}.
     *
     * @param managementContext the management context to use
     */
    public void setManagementContext(ManagementContext managementContext) {
        this.managementContext = managementContext;
    }
1350
1351    public NetworkConnector getNetworkConnectorByName(String connectorName) {
1352        for (NetworkConnector connector : networkConnectors) {
1353            if (connector.getName().equals(connectorName)) {
1354                return connector;
1355            }
1356        }
1357        return null;
1358    }
1359
    /**
     * @return the configured network connector URIs; the internal array is
     *         returned without copying
     */
    public String[] getNetworkConnectorURIs() {
        return networkConnectorURIs;
    }
1363
    /**
     * Sets the network connector URIs.
     *
     * @param networkConnectorURIs the URIs to use; the array is stored without
     *        copying
     */
    public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
        this.networkConnectorURIs = networkConnectorURIs;
    }
1367
1368    public TransportConnector getConnectorByName(String connectorName) {
1369        for (TransportConnector connector : transportConnectors) {
1370            if (connector.getName().equals(connectorName)) {
1371                return connector;
1372            }
1373        }
1374        return null;
1375    }
1376
1377    public Map<String, String> getTransportConnectorURIsAsMap() {
1378        Map<String, String> answer = new HashMap<String, String>();
1379        for (TransportConnector connector : transportConnectors) {
1380            try {
1381                URI uri = connector.getConnectUri();
1382                if (uri != null) {
1383                    String scheme = uri.getScheme();
1384                    if (scheme != null) {
1385                        answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString());
1386                    }
1387                }
1388            } catch (Exception e) {
1389                LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1390            }
1391        }
1392        return answer;
1393    }
1394
1395    public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){
1396        ProducerBrokerExchange result = null;
1397
1398        for (TransportConnector connector : transportConnectors) {
1399            for (TransportConnection tc: connector.getConnections()){
1400                result = tc.getProducerBrokerExchangeIfExists(producerInfo);
1401                if (result !=null){
1402                    return result;
1403                }
1404            }
1405        }
1406        return result;
1407    }
1408
    /**
     * @return the configured transport connector URIs; the internal array is
     *         returned without copying
     */
    public String[] getTransportConnectorURIs() {
        return transportConnectorURIs;
    }
1412
    /**
     * Sets the transport connector URIs.
     *
     * @param transportConnectorURIs the URIs to use; the array is stored
     *        without copying
     */
    public void setTransportConnectorURIs(String[] transportConnectorURIs) {
        this.transportConnectorURIs = transportConnectorURIs;
    }
1416
1417    /**
1418     * @return Returns the jmsBridgeConnectors.
1419     */
1420    public JmsConnector[] getJmsBridgeConnectors() {
1421        return jmsBridgeConnectors;
1422    }
1423
1424    /**
1425     * @param jmsConnectors
1426     *            The jmsBridgeConnectors to set.
1427     */
1428    public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1429        this.jmsBridgeConnectors = jmsConnectors;
1430    }
1431
    /**
     * @return a new array holding the services currently registered with this
     *         broker
     */
    public Service[] getServices() {
        return services.toArray(new Service[0]);
    }
1435
1436    /**
1437     * Sets the services associated with this broker.
1438     */
1439    public void setServices(Service[] services) {
1440        this.services.clear();
1441        if (services != null) {
1442            for (int i = 0; i < services.length; i++) {
1443                this.services.add(services[i]);
1444            }
1445        }
1446    }
1447
1448    /**
1449     * Adds a new service so that it will be started as part of the broker
1450     * lifecycle
1451     */
1452    public void addService(Service service) {
1453        services.add(service);
1454    }
1455
    /**
     * Removes a service previously registered with {@link #addService(Service)}.
     *
     * @param service the service to remove
     */
    public void removeService(Service service) {
        services.remove(service);
    }
1459
    /**
     * @return the value of the {@code useLoggingForShutdownErrors} flag
     */
    public boolean isUseLoggingForShutdownErrors() {
        return useLoggingForShutdownErrors;
    }
1463
1464    /**
1465     * Sets whether or not we should use commons-logging when reporting errors
1466     * when shutting down the broker
1467     */
1468    public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1469        this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1470    }
1471
    /**
     * @return the value of the {@code useShutdownHook} flag
     */
    public boolean isUseShutdownHook() {
        return useShutdownHook;
    }
1475
1476    /**
1477     * Sets whether or not we should use a shutdown handler to close down the
1478     * broker cleanly if the JVM is terminated. It is recommended you leave this
1479     * enabled.
1480     */
1481    public void setUseShutdownHook(boolean useShutdownHook) {
1482        this.useShutdownHook = useShutdownHook;
1483    }
1484
    /**
     * @return the value of the {@code advisorySupport} flag
     */
    public boolean isAdvisorySupport() {
        return advisorySupport;
    }
1488
1489    /**
1490     * Allows the support of advisory messages to be disabled for performance
1491     * reasons.
1492     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1493     */
1494    public void setAdvisorySupport(boolean advisorySupport) {
1495        this.advisorySupport = advisorySupport;
1496    }
1497
    /**
     * @return a new list holding the current transport connectors; changes to
     *         the returned list do not affect the broker
     */
    public List<TransportConnector> getTransportConnectors() {
        return new ArrayList<TransportConnector>(transportConnectors);
    }
1501
1502    /**
1503     * Sets the transport connectors which this broker will listen on for new
1504     * clients
1505     *
1506     * @org.apache.xbean.Property
1507     *                            nestedType="org.apache.activemq.broker.TransportConnector"
1508     */
1509    public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1510        for (TransportConnector connector : transportConnectors) {
1511            addConnector(connector);
1512        }
1513    }
1514
    /**
     * Looks up a transport connector by its exact name.
     *
     * @param name the name to look for; must not be {@code null}, since
     *        {@code equals} is invoked on it
     * @return the first transport connector with that name, or {@code null}
     *         when there is none
     */
    public TransportConnector getTransportConnectorByName(String name){
        for (TransportConnector transportConnector : transportConnectors){
           if (name.equals(transportConnector.getName())){
               return transportConnector;
           }
        }
        return null;
    }
1523
    /**
     * Looks up a transport connector by the scheme of its URI. The comparison
     * is case sensitive.
     *
     * @param scheme the URI scheme to look for; must not be {@code null}, since
     *        {@code equals} is invoked on it
     * @return the first transport connector whose URI has that scheme, or
     *         {@code null} when there is none
     */
    public TransportConnector getTransportConnectorByScheme(String scheme){
        for (TransportConnector transportConnector : transportConnectors){
            // NOTE(review): getUri() is dereferenced without a null check - confirm it cannot return null
            if (scheme.equals(transportConnector.getUri().getScheme())){
                return transportConnector;
            }
        }
        return null;
    }
1532
    /**
     * @return a new list holding the current network connectors; changes to
     *         the returned list do not affect the broker
     */
    public List<NetworkConnector> getNetworkConnectors() {
        return new ArrayList<NetworkConnector>(networkConnectors);
    }
1536
    /**
     * @return a new list holding the current proxy connectors; changes to the
     *         returned list do not affect the broker
     */
    public List<ProxyConnector> getProxyConnectors() {
        return new ArrayList<ProxyConnector>(proxyConnectors);
    }
1540
1541    /**
1542     * Sets the network connectors which this broker will use to connect to
1543     * other brokers in a federated network
1544     *
1545     * @org.apache.xbean.Property
1546     *                            nestedType="org.apache.activemq.network.NetworkConnector"
1547     */
1548    public void setNetworkConnectors(List<?> networkConnectors) throws Exception {
1549        for (Object connector : networkConnectors) {
1550            addNetworkConnector((NetworkConnector) connector);
1551        }
1552    }
1553
1554    /**
1555     * Sets the network connectors which this broker will use to connect to
1556     * other brokers in a federated network
1557     */
1558    public void setProxyConnectors(List<?> proxyConnectors) throws Exception {
1559        for (Object connector : proxyConnectors) {
1560            addProxyConnector((ProxyConnector) connector);
1561        }
1562    }
1563
    /**
     * @return the destination policy map, or {@code null} if none has been set
     */
    public PolicyMap getDestinationPolicy() {
        return destinationPolicy;
    }
1567
1568    /**
1569     * Sets the destination specific policies available either for exact
1570     * destinations or for wildcard areas of destinations.
1571     */
1572    public void setDestinationPolicy(PolicyMap policyMap) {
1573        this.destinationPolicy = policyMap;
1574    }
1575
    /**
     * @return the configured broker plugins; the internal array is returned
     *         without copying
     */
    public BrokerPlugin[] getPlugins() {
        return plugins;
    }
1579
1580    /**
1581     * Sets a number of broker plugins to install such as for security
1582     * authentication or authorization
1583     */
1584    public void setPlugins(BrokerPlugin[] plugins) {
1585        this.plugins = plugins;
1586    }
1587
    /**
     * @return the message authorization policy, or {@code null} if none has
     *         been set
     */
    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
        return messageAuthorizationPolicy;
    }
1591
1592    /**
1593     * Sets the policy used to decide if the current connection is authorized to
1594     * consume a given message
1595     */
1596    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1597        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1598    }
1599
1600    /**
1601     * Delete all messages from the persistent store
1602     *
1603     * @throws IOException
1604     */
1605    public void deleteAllMessages() throws IOException {
1606        getPersistenceAdapter().deleteAllMessages();
1607    }
1608
    /**
     * @return the current value of the deleteAllMessagesOnStartup flag
     */
    public boolean isDeleteAllMessagesOnStartup() {
        return deleteAllMessagesOnStartup;
    }
1612
1613    /**
1614     * Sets whether or not all messages are deleted on startup - mostly only
1615     * useful for testing.
1616     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1617     */
1618    public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1619        this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1620    }
1621
1622    public URI getVmConnectorURI() {
1623        if (vmConnectorURI == null) {
1624            try {
1625                vmConnectorURI = new URI("vm://" + getBrokerName());
1626            } catch (URISyntaxException e) {
1627                LOG.error("Badly formed URI from {}", getBrokerName(), e);
1628            }
1629        }
1630        return vmConnectorURI;
1631    }
1632
    /**
     * @param vmConnectorURI the VM connector URI to store; when it is
     *            {@code null}, {@link #getVmConnectorURI()} derives one from the
     *            broker name
     */
    public void setVmConnectorURI(URI vmConnectorURI) {
        this.vmConnectorURI = vmConnectorURI;
    }
1636
1637    public String getDefaultSocketURIString() {
1638        if (started.get()) {
1639            if (this.defaultSocketURIString == null) {
1640                for (TransportConnector tc:this.transportConnectors) {
1641                    String result = null;
1642                    try {
1643                        result = tc.getPublishableConnectString();
1644                    } catch (Exception e) {
1645                      LOG.warn("Failed to get the ConnectURI for {}", tc, e);
1646                    }
1647                    if (result != null) {
1648                        // find first publishable uri
1649                        if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
1650                            this.defaultSocketURIString = result;
1651                            break;
1652                        } else {
1653                        // or use the first defined
1654                            if (this.defaultSocketURIString == null) {
1655                                this.defaultSocketURIString = result;
1656                            }
1657                        }
1658                    }
1659                }
1660
1661            }
1662            return this.defaultSocketURIString;
1663        }
1664       return null;
1665    }
1666
1667    /**
1668     * @return Returns the shutdownOnMasterFailure.
1669     */
1670    public boolean isShutdownOnMasterFailure() {
1671        return shutdownOnMasterFailure;
1672    }
1673
1674    /**
1675     * @param shutdownOnMasterFailure
1676     *            The shutdownOnMasterFailure to set.
1677     */
1678    public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1679        this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1680    }
1681
    /**
     * @return the current value of the keepDurableSubsActive flag
     */
    public boolean isKeepDurableSubsActive() {
        return keepDurableSubsActive;
    }
1685
    /**
     * @param keepDurableSubsActive the new value of the keepDurableSubsActive
     *            flag
     */
    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
        this.keepDurableSubsActive = keepDurableSubsActive;
    }
1689
    /**
     * @return the current value of the useVirtualTopics flag
     */
    public boolean isUseVirtualTopics() {
        return useVirtualTopics;
    }
1693
1694    /**
1695     * Sets whether or not <a
1696     * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1697     * Topics</a> should be supported by default if they have not been
1698     * explicitly configured.
1699     */
1700    public void setUseVirtualTopics(boolean useVirtualTopics) {
1701        this.useVirtualTopics = useVirtualTopics;
1702    }
1703
    /**
     * @return the destination interceptor array held by this service; the
     *         internal array itself is returned, no copy is made
     */
    public DestinationInterceptor[] getDestinationInterceptors() {
        return destinationInterceptors;
    }
1707
    /**
     * @return the current value of the useMirroredQueues flag
     */
    public boolean isUseMirroredQueues() {
        return useMirroredQueues;
    }
1711
1712    /**
1713     * Sets whether or not <a
1714     * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1715     * Queues</a> should be supported by default if they have not been
1716     * explicitly configured.
1717     */
1718    public void setUseMirroredQueues(boolean useMirroredQueues) {
1719        this.useMirroredQueues = useMirroredQueues;
1720    }
1721
1722    /**
1723     * Sets the destination interceptors to use
1724     */
1725    public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1726        this.destinationInterceptors = destinationInterceptors;
1727    }
1728
    /**
     * @return the destination array held by this service; the internal array
     *         itself is returned, no copy is made
     */
    public ActiveMQDestination[] getDestinations() {
        return destinations;
    }
1732
1733    /**
1734     * Sets the destinations which should be loaded/created on startup
1735     */
1736    public void setDestinations(ActiveMQDestination[] destinations) {
1737        this.destinations = destinations;
1738    }
1739
1740    /**
1741     * @return the tempDataStore
1742     */
1743    public synchronized PListStore getTempDataStore() {
1744        if (tempDataStore == null) {
1745            if (!isPersistent()) {
1746                return null;
1747            }
1748
1749            try {
1750                PersistenceAdapter pa = getPersistenceAdapter();
1751                if( pa!=null && pa instanceof PListStore) {
1752                    return (PListStore) pa;
1753                }
1754            } catch (IOException e) {
1755                throw new RuntimeException(e);
1756            }
1757
1758            try {
1759                String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl";
1760                this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance();
1761                this.tempDataStore.setDirectory(getTmpDataDirectory());
1762                configureService(tempDataStore);
1763            } catch (Exception e) {
1764                throw new RuntimeException(e);
1765            }
1766        }
1767        return tempDataStore;
1768    }
1769
1770    /**
1771     * @param tempDataStore
1772     *            the tempDataStore to set
1773     */
1774    public void setTempDataStore(PListStore tempDataStore) {
1775        this.tempDataStore = tempDataStore;
1776        configureService(tempDataStore);
1777    }
1778
    /**
     * @return the current value of the persistenceThreadPriority property
     */
    public int getPersistenceThreadPriority() {
        return persistenceThreadPriority;
    }
1782
    /**
     * @param persistenceThreadPriority the new value of the
     *            persistenceThreadPriority property; stored without validation
     */
    public void setPersistenceThreadPriority(int persistenceThreadPriority) {
        this.persistenceThreadPriority = persistenceThreadPriority;
    }
1786
1787    /**
1788     * @return the useLocalHostBrokerName
1789     */
1790    public boolean isUseLocalHostBrokerName() {
1791        return this.useLocalHostBrokerName;
1792    }
1793
1794    /**
1795     * @param useLocalHostBrokerName
1796     *            the useLocalHostBrokerName to set
1797     */
1798    public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1799        this.useLocalHostBrokerName = useLocalHostBrokerName;
1800        if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1801            brokerName = LOCAL_HOST_NAME;
1802        }
1803    }
1804
1805    /**
1806     * Looks up and lazily creates if necessary the destination for the given
1807     * JMS name
1808     */
1809    public Destination getDestination(ActiveMQDestination destination) throws Exception {
1810        return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1811    }
1812
    /**
     * Removes the given destination by delegating to
     * {@code getBroker().removeDestination(...)} with the admin connection
     * context.
     *
     * @param destination the destination to remove
     * @throws Exception if the broker lookup or the removal fails
     */
    public void removeDestination(ActiveMQDestination destination) throws Exception {
        // NOTE(review): the trailing 0 is presumably a timeout - confirm against Broker.removeDestination
        getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
    }
1816
    /**
     * @return the current value of the producerSystemUsagePortion property
     */
    public int getProducerSystemUsagePortion() {
        return producerSystemUsagePortion;
    }
1820
    /**
     * @param producerSystemUsagePortion the new value of the
     *            producerSystemUsagePortion property; stored without validation
     */
    public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
        this.producerSystemUsagePortion = producerSystemUsagePortion;
    }
1824
    /**
     * @return the current value of the consumerSystemUsagePortion property
     */
    public int getConsumerSystemUsagePortion() {
        return consumerSystemUsagePortion;
    }
1828
    /**
     * @param consumerSystemUsagePortion the new value of the
     *            consumerSystemUsagePortion property; stored without validation
     */
    public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
        this.consumerSystemUsagePortion = consumerSystemUsagePortion;
    }
1832
    /**
     * @return the current value of the splitSystemUsageForProducersConsumers
     *         flag
     */
    public boolean isSplitSystemUsageForProducersConsumers() {
        return splitSystemUsageForProducersConsumers;
    }
1836
    /**
     * @param splitSystemUsageForProducersConsumers the new value of the
     *            splitSystemUsageForProducersConsumers flag
     */
    public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
        this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
    }
1840
    /**
     * @return the current value of the monitorConnectionSplits flag
     */
    public boolean isMonitorConnectionSplits() {
        return monitorConnectionSplits;
    }
1844
    /**
     * @param monitorConnectionSplits the new value of the
     *            monitorConnectionSplits flag
     */
    public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
        this.monitorConnectionSplits = monitorConnectionSplits;
    }
1848
    /**
     * @return the current value of the taskRunnerPriority property
     */
    public int getTaskRunnerPriority() {
        return taskRunnerPriority;
    }
1852
    /**
     * @param taskRunnerPriority the new value of the taskRunnerPriority
     *            property; stored without validation
     */
    public void setTaskRunnerPriority(int taskRunnerPriority) {
        this.taskRunnerPriority = taskRunnerPriority;
    }
1856
    /**
     * @return the current value of the dedicatedTaskRunner flag
     */
    public boolean isDedicatedTaskRunner() {
        return dedicatedTaskRunner;
    }
1860
    /**
     * @param dedicatedTaskRunner the new value of the dedicatedTaskRunner flag
     */
    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
        this.dedicatedTaskRunner = dedicatedTaskRunner;
    }
1864
    /**
     * @return the current value of the cacheTempDestinations flag
     */
    public boolean isCacheTempDestinations() {
        return cacheTempDestinations;
    }
1868
    /**
     * @param cacheTempDestinations the new value of the cacheTempDestinations
     *            flag
     */
    public void setCacheTempDestinations(boolean cacheTempDestinations) {
        this.cacheTempDestinations = cacheTempDestinations;
    }
1872
    /**
     * @return the current value of the timeBeforePurgeTempDestinations property
     */
    public int getTimeBeforePurgeTempDestinations() {
        return timeBeforePurgeTempDestinations;
    }
1876
    /**
     * @param timeBeforePurgeTempDestinations the new value of the
     *            timeBeforePurgeTempDestinations property; stored without
     *            validation
     */
    public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
        this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
    }
1880
    /**
     * @return the current value of the useTempMirroredQueues flag
     */
    public boolean isUseTempMirroredQueues() {
        return useTempMirroredQueues;
    }
1884
    /**
     * @param useTempMirroredQueues the new value of the useTempMirroredQueues
     *            flag
     */
    public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
        this.useTempMirroredQueues = useTempMirroredQueues;
    }
1888
1889    public synchronized JobSchedulerStore getJobSchedulerStore() {
1890
1891        // If support is off don't allow any scheduler even is user configured their own.
1892        if (!isSchedulerSupport()) {
1893            return null;
1894        }
1895
1896        // If the user configured their own we use it even if persistence is disabled since
1897        // we don't know anything about their implementation.
1898        if (jobSchedulerStore == null) {
1899
1900            if (!isPersistent()) {
1901                this.jobSchedulerStore = new InMemoryJobSchedulerStore();
1902                configureService(jobSchedulerStore);
1903                return this.jobSchedulerStore;
1904            }
1905
1906            try {
1907                PersistenceAdapter pa = getPersistenceAdapter();
1908                if (pa != null) {
1909                    this.jobSchedulerStore = pa.createJobSchedulerStore();
1910                    jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
1911                    configureService(jobSchedulerStore);
1912                    return this.jobSchedulerStore;
1913                }
1914            } catch (IOException e) {
1915                throw new RuntimeException(e);
1916            } catch (UnsupportedOperationException ex) {
1917                // It's ok if the store doesn't implement a scheduler.
1918            } catch (Exception e) {
1919                throw new RuntimeException(e);
1920            }
1921
1922            try {
1923                PersistenceAdapter pa = getPersistenceAdapter();
1924                if (pa != null && pa instanceof JobSchedulerStore) {
1925                    this.jobSchedulerStore = (JobSchedulerStore) pa;
1926                    configureService(jobSchedulerStore);
1927                    return this.jobSchedulerStore;
1928                }
1929            } catch (IOException e) {
1930                throw new RuntimeException(e);
1931            }
1932
1933            // Load the KahaDB store as a last resort, this only works if KahaDB is
1934            // included at runtime, otherwise this will fail.  User should disable
1935            // scheduler support if this fails.
1936            try {
1937                String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
1938                PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
1939                jobSchedulerStore = adaptor.createJobSchedulerStore();
1940                jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
1941                configureService(jobSchedulerStore);
1942                LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile());
1943            } catch (Exception e) {
1944                throw new RuntimeException(e);
1945            }
1946        }
1947        return jobSchedulerStore;
1948    }
1949
    /**
     * Stores the given job scheduler store and passes it to
     * {@code configureService}.
     *
     * @param jobSchedulerStore the scheduler store to use
     */
    public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) {
        this.jobSchedulerStore = jobSchedulerStore;
        configureService(jobSchedulerStore);
    }
1954
1955    //
1956    // Implementation methods
1957    // -------------------------------------------------------------------------
1958    /**
1959     * Handles any lazy-creation helper properties which are added to make
1960     * things easier to configure inside environments such as Spring
1961     *
1962     * @throws Exception
1963     */
1964    protected void processHelperProperties() throws Exception {
1965        if (transportConnectorURIs != null) {
1966            for (int i = 0; i < transportConnectorURIs.length; i++) {
1967                String uri = transportConnectorURIs[i];
1968                addConnector(uri);
1969            }
1970        }
1971        if (networkConnectorURIs != null) {
1972            for (int i = 0; i < networkConnectorURIs.length; i++) {
1973                String uri = networkConnectorURIs[i];
1974                addNetworkConnector(uri);
1975            }
1976        }
1977        if (jmsBridgeConnectors != null) {
1978            for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1979                addJmsConnector(jmsBridgeConnectors[i]);
1980            }
1981        }
1982    }
1983
1984    /**
1985     * Check that the store usage limit is not greater than max usable
1986     * space and adjust if it is
1987     */
1988    protected void checkStoreUsageLimits() throws Exception {
1989        final SystemUsage usage = getSystemUsage();
1990
1991        if (getPersistenceAdapter() != null) {
1992            PersistenceAdapter adapter = getPersistenceAdapter();
1993            checkUsageLimit(adapter.getDirectory(), usage.getStoreUsage(), usage.getStoreUsage().getPercentLimit());
1994
1995            long maxJournalFileSize = 0;
1996            long storeLimit = usage.getStoreUsage().getLimit();
1997
1998            if (adapter instanceof JournaledStore) {
1999                maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength();
2000            }
2001
2002            if (storeLimit > 0 && storeLimit < maxJournalFileSize) {
2003                LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
2004                          " mb, whilst the max journal file size for the store is: " +
2005                          maxJournalFileSize / (1024 * 1024) + " mb, " +
2006                          "the store will not accept any data when used.");
2007
2008            }
2009        }
2010    }
2011
2012    /**
2013     * Check that temporary usage limit is not greater than max usable
2014     * space and adjust if it is
2015     */
2016    protected void checkTmpStoreUsageLimits() throws Exception {
2017        final SystemUsage usage = getSystemUsage();
2018
2019        File tmpDir = getTmpDataDirectory();
2020
2021        if (tmpDir != null) {
2022            checkUsageLimit(tmpDir, usage.getTempUsage(), usage.getTempUsage().getPercentLimit());
2023
2024            if (isPersistent()) {
2025                long maxJournalFileSize;
2026
2027                PListStore store = usage.getTempUsage().getStore();
2028                if (store != null && store instanceof JournaledStore) {
2029                    maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength();
2030                } else {
2031                    maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH;
2032                }
2033                long storeLimit = usage.getTempUsage().getLimit();
2034
2035                if (storeLimit > 0 && storeLimit < maxJournalFileSize) {
2036                    LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
2037                              " mb, whilst the max journal file size for the temporary store is: " +
2038                              maxJournalFileSize / (1024 * 1024) + " mb, " +
2039                              "the temp store will not accept any data when used.");
2040                }
2041            }
2042        }
2043    }
2044
    /**
     * Reconciles a store usage limit with the disk space of the partition that
     * holds the given directory.
     * <p>
     * Two adjustments are possible:
     * <ul>
     * <li><b>Regrow</b>: when regrowing is enabled
     * ({@code diskUsageCheckRegrowThreshold > -1}), a percent limit is set and
     * the current limit is below both the percent limit in bytes and the usable
     * space, the limit is raised to the smaller of those two, provided the
     * increase reaches the regrow threshold.</li>
     * <li><b>Shrink</b>: when the current limit exceeds the usable space, the
     * limit is reset to the usable space, or a
     * {@link ConfigurationException} is thrown if limit adjustment is
     * disabled.</li>
     * </ul>
     * "Usable space" here is the partition's usable space plus what the store
     * already occupies. Nothing happens when {@code dir} is {@code null}.
     *
     * @param dir the store directory; resolved through
     *            {@code StoreUtil.findParentDirectory} before it is measured
     * @param storeUsage the usage object whose limit is checked and possibly
     *            changed
     * @param percentLimit the limit as a percentage of the partition's total
     *            space; values {@code <= 0} disable the percent based logic
     * @throws ConfigurationException if the limit exceeds the usable space and
     *             {@code isAdjustUsageLimits()} is false
     */
    protected void checkUsageLimit(File dir, Usage<?> storeUsage, int percentLimit) throws ConfigurationException {
        if (dir != null) {
            // NOTE(review): assumes findParentDirectory never returns null for a
            // non-null argument; dir is dereferenced below without a check - confirm.
            dir = StoreUtil.findParentDirectory(dir);
            String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store";
            long storeLimit = storeUsage.getLimit();
            long storeCurrent = storeUsage.getUsage();
            long totalSpace = dir.getTotalSpace();
            // space the store could occupy: what is free plus what it already uses
            long totalUsableSpace = dir.getUsableSpace() + storeCurrent;
            //compute byte value of the percent limit
            long bytePercentLimit = totalSpace * percentLimit / 100;
            int oneMeg = 1024 * 1024;

            //Check if the store limit is less than the percent Limit that was set and also
            //the usable space...this means we can grow the store larger
            //Changes in partition size (total space) as well as changes in usable space should
            //be detected here
            if (diskUsageCheckRegrowThreshold > -1 && percentLimit > 0
                    && storeLimit < bytePercentLimit && storeLimit < totalUsableSpace){

                // set the limit to be bytePercentLimit or usableSpace if
                // usableSpace is less than the percentLimit
                long newLimit = bytePercentLimit > totalUsableSpace ? totalUsableSpace : bytePercentLimit;

                //To prevent changing too often, check threshold
                if (newLimit - storeLimit >= diskUsageCheckRegrowThreshold) {
                    LOG.info("Usable disk space has been increased, attempting to regrow " + storeName + " limit to "
                            + percentLimit + "% of the partition size.");
                    storeUsage.setLimit(newLimit);
                    LOG.info(storeName + " limit has been increased to " + newLimit * 100 / totalSpace
                            + "% (" + newLimit / oneMeg + " mb) of the partition size.");
                }

            //check if the limit is too large for the amount of usable space
            } else if (storeLimit > totalUsableSpace) {
                final String message = storeName + " limit is " +  storeLimit / oneMeg
                        + " mb (current store usage is " + storeCurrent / oneMeg
                        + " mb). The data directory: " + dir.getAbsolutePath()
                        + " only has " + totalUsableSpace / oneMeg
                        + " mb of usable space.";

                // without permission to adjust, an oversized limit is a configuration error
                if (!isAdjustUsageLimits()) {
                    LOG.error(message);
                    throw new ConfigurationException(message);
                }

                if (percentLimit > 0) {
                    // NOTE(review): divides by totalSpace; a partition reporting 0 total
                    // space would raise ArithmeticException here - confirm it cannot occur.
                    LOG.warn(storeName + " limit has been set to "
                            + percentLimit + "% (" + bytePercentLimit / oneMeg + " mb)"
                            + " of the partition size but there is not enough usable space."
                            + " The current store limit (which may have been adjusted by a"
                            + " previous usage limit check) is set to (" + storeLimit / oneMeg + " mb)"
                            + " but only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)"
                            + " is available - resetting limit");
                } else {
                    LOG.warn(message + " - resetting to maximum available disk space: " +
                            totalUsableSpace / oneMeg + " mb");
                }
                storeUsage.setLimit(totalUsableSpace);
            }
        }
    }
2106
2107    /**
2108     * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to
2109     * update store and temporary store limits if the amount of available space
2110     * plus current store size is less than the existin configured limit
2111     */
2112    protected void scheduleDiskUsageLimitsCheck() throws IOException {
2113        if (schedulePeriodForDiskUsageCheck > 0 &&
2114                (getPersistenceAdapter() != null || getTmpDataDirectory() != null)) {
2115            Runnable diskLimitCheckTask = new Runnable() {
2116                @Override
2117                public void run() {
2118                    try {
2119                        checkStoreUsageLimits();
2120                    } catch (Exception e) {
2121                        LOG.error("Failed to check persistent disk usage limits", e);
2122                    }
2123
2124                    try {
2125                        checkTmpStoreUsageLimits();
2126                    } catch (Exception e) {
2127                        LOG.error("Failed to check temporary store usage limits", e);
2128                    }
2129                }
2130            };
2131            scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck);
2132        }
2133    }
2134
2135    protected void checkMemorySystemUsageLimits() throws Exception {
2136        final SystemUsage usage = getSystemUsage();
2137        long memLimit = usage.getMemoryUsage().getLimit();
2138        long jvmLimit = Runtime.getRuntime().maxMemory();
2139
2140        if (memLimit > jvmLimit) {
2141            final String message = "Memory Usage for the Broker (" + memLimit / (1024 * 1024)
2142                    + "mb) is more than the maximum available for the JVM: " + jvmLimit / (1024 * 1024);
2143
2144            if (adjustUsageLimits) {
2145                usage.getMemoryUsage().setPercentOfJvmHeap(70);
2146                LOG.warn(message + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
2147            } else {
2148                LOG.error(message);
2149                throw new ConfigurationException(message);
2150            }
2151        }
2152    }
2153
2154    protected void checkStoreSystemUsageLimits() throws Exception {
2155        final SystemUsage usage = getSystemUsage();
2156
2157        //Check the persistent store and temp store limits if they exist
2158        //and schedule a periodic check to update disk limits if
2159        //schedulePeriodForDiskLimitCheck is set
2160        checkStoreUsageLimits();
2161        checkTmpStoreUsageLimits();
2162        scheduleDiskUsageLimitsCheck();
2163
2164        if (getJobSchedulerStore() != null) {
2165            JobSchedulerStore scheduler = getJobSchedulerStore();
2166            File schedulerDir = scheduler.getDirectory();
2167            if (schedulerDir != null) {
2168
2169                String schedulerDirPath = schedulerDir.getAbsolutePath();
2170                if (!schedulerDir.isAbsolute()) {
2171                    schedulerDir = new File(schedulerDirPath);
2172                }
2173
2174                while (schedulerDir != null && !schedulerDir.isDirectory()) {
2175                    schedulerDir = schedulerDir.getParentFile();
2176                }
2177                long schedulerLimit = usage.getJobSchedulerUsage().getLimit();
2178                long dirFreeSpace = schedulerDir.getUsableSpace();
2179                if (schedulerLimit > dirFreeSpace) {
2180                    LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) +
2181                             " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() +
2182                             " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " +
2183                            dirFreeSpace / (1024 * 1024) + " mb.");
2184                    usage.getJobSchedulerUsage().setLimit(dirFreeSpace);
2185                }
2186            }
2187        }
2188    }
2189
2190    public void stopAllConnectors(ServiceStopper stopper) {
2191        for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2192            NetworkConnector connector = iter.next();
2193            unregisterNetworkConnectorMBean(connector);
2194            stopper.stop(connector);
2195        }
2196        for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2197            ProxyConnector connector = iter.next();
2198            stopper.stop(connector);
2199        }
2200        for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2201            JmsConnector connector = iter.next();
2202            stopper.stop(connector);
2203        }
2204        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2205            TransportConnector connector = iter.next();
2206            try {
2207                unregisterConnectorMBean(connector);
2208            } catch (IOException e) {
2209            }
2210            stopper.stop(connector);
2211        }
2212    }
2213
2214    protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
2215        try {
2216            ObjectName objectName = createConnectorObjectName(connector);
2217            connector = connector.asManagedConnector(getManagementContext(), objectName);
2218            ConnectorViewMBean view = new ConnectorView(connector);
2219            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2220            return connector;
2221        } catch (Throwable e) {
2222            throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e);
2223        }
2224    }
2225
2226    protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
2227        if (isUseJmx()) {
2228            try {
2229                ObjectName objectName = createConnectorObjectName(connector);
2230                getManagementContext().unregisterMBean(objectName);
2231            } catch (Throwable e) {
2232                throw IOExceptionSupport.create(
2233                        "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
2234            }
2235        }
2236    }
2237
    /**
     * Hook for registering a persistence adapter in JMX. This implementation
     * performs no registration and returns its argument unchanged.
     *
     * @param adaptor the persistence adapter
     * @return the same {@code adaptor} instance
     * @throws IOException never thrown by this implementation
     */
    protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
        return adaptor;
    }
2241
    /**
     * Hook for removing a persistence adapter's JMX registration. This
     * implementation only evaluates {@code isUseJmx()} and otherwise does
     * nothing.
     *
     * @param adaptor the persistence adapter (unused here)
     * @throws IOException never thrown by this implementation
     */
    protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
        // intentionally empty: nothing was registered by registerPersistenceAdapterMBean
        if (isUseJmx()) {}
    }
2245
    /**
     * Builds the JMX object name of a transport connector, using the
     * "clientConnectors" type under this broker's object name.
     *
     * @param connector the connector whose name is used
     * @return the object name produced by
     *         {@code BrokerMBeanSupport.createConnectorName}
     * @throws MalformedObjectNameException if the name cannot be built
     */
    private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
        return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName());
    }
2249
2250    public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
2251        NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
2252        try {
2253            ObjectName objectName = createNetworkConnectorObjectName(connector);
2254            connector.setObjectName(objectName);
2255            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2256        } catch (Throwable e) {
2257            throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
2258        }
2259    }
2260
    /**
     * Builds the JMX object name of a network connector, using the
     * "networkConnectors" type under this broker's object name.
     *
     * @param connector the connector whose name is used
     * @return the object name produced by
     *         {@code BrokerMBeanSupport.createNetworkConnectorName}
     * @throws MalformedObjectNameException if the name cannot be built
     */
    public ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName());
    }
2264
    /**
     * Builds the JMX object name of a duplex network connector, using the
     * "duplexNetworkConnectors" type under this broker's object name.
     *
     * @param transport the value used as the connector name part of the object
     *            name
     * @return the object name produced by
     *         {@code BrokerMBeanSupport.createNetworkConnectorName}
     * @throws MalformedObjectNameException if the name cannot be built
     */
    public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException {
        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport);
    }
2268
2269    protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
2270        if (isUseJmx()) {
2271            try {
2272                ObjectName objectName = createNetworkConnectorObjectName(connector);
2273                getManagementContext().unregisterMBean(objectName);
2274            } catch (Exception e) {
2275                LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e);
2276            }
2277        }
2278    }
2279
2280    protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
2281        ProxyConnectorView view = new ProxyConnectorView(connector);
2282        try {
2283            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName());
2284            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2285        } catch (Throwable e) {
2286            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2287        }
2288    }
2289
2290    protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
2291        JmsConnectorView view = new JmsConnectorView(connector);
2292        try {
2293            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName());
2294            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2295        } catch (Throwable e) {
2296            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2297        }
2298    }
2299
2300    /**
2301     * Factory method to create a new broker
2302     *
2303     * @throws Exception
2304     */
2305    protected Broker createBroker() throws Exception {
2306        regionBroker = createRegionBroker();
2307        Broker broker = addInterceptors(regionBroker);
2308        // Add a filter that will stop access to the broker once stopped
2309        broker = new MutableBrokerFilter(broker) {
2310            Broker old;
2311
2312            @Override
2313            public void stop() throws Exception {
2314                old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
2315                    // Just ignore additional stop actions.
2316                    @Override
2317                    public void stop() throws Exception {
2318                    }
2319                });
2320                old.stop();
2321            }
2322
2323            @Override
2324            public void start() throws Exception {
2325                if (forceStart && old != null) {
2326                    this.next.set(old);
2327                }
2328                getNext().start();
2329            }
2330        };
2331        return broker;
2332    }
2333
2334    /**
2335     * Factory method to create the core region broker onto which interceptors
2336     * are added
2337     *
2338     * @throws Exception
2339     */
2340    protected Broker createRegionBroker() throws Exception {
2341        if (destinationInterceptors == null) {
2342            destinationInterceptors = createDefaultDestinationInterceptor();
2343        }
2344        configureServices(destinationInterceptors);
2345        DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
2346        if (destinationFactory == null) {
2347            destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
2348        }
2349        return createRegionBroker(destinationInterceptor);
2350    }
2351
2352    protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
2353        RegionBroker regionBroker;
2354        if (isUseJmx()) {
2355            try {
2356                regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
2357                    getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
2358            } catch(MalformedObjectNameException me){
2359                LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me);
2360                throw new IOException(me);
2361            }
2362        } else {
2363            regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
2364                    destinationInterceptor,getScheduler(),getExecutor());
2365        }
2366        destinationFactory.setRegionBroker(regionBroker);
2367        regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
2368        regionBroker.setBrokerName(getBrokerName());
2369        regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
2370        regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend());
2371        if (brokerId != null) {
2372            regionBroker.setBrokerId(brokerId);
2373        }
2374        return regionBroker;
2375    }
2376
2377    /**
2378     * Create the default destination interceptor
2379     */
2380    protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
2381        List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
2382        if (isUseVirtualTopics()) {
2383            VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
2384            VirtualTopic virtualTopic = new VirtualTopic();
2385            virtualTopic.setName("VirtualTopic.>");
2386            VirtualDestination[] virtualDestinations = { virtualTopic };
2387            interceptor.setVirtualDestinations(virtualDestinations);
2388            answer.add(interceptor);
2389        }
2390        if (isUseMirroredQueues()) {
2391            MirroredQueue interceptor = new MirroredQueue();
2392            answer.add(interceptor);
2393        }
2394        DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
2395        answer.toArray(array);
2396        return array;
2397    }
2398
2399    /**
2400     * Strategy method to add interceptors to the broker
2401     *
2402     * @throws IOException
2403     */
2404    protected Broker addInterceptors(Broker broker) throws Exception {
2405        if (isSchedulerSupport()) {
2406            SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
2407            if (isUseJmx()) {
2408                JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
2409                try {
2410                    ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName());
2411                    AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2412                    this.adminView.setJMSJobScheduler(objectName);
2413                } catch (Throwable e) {
2414                    throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
2415                            + e.getMessage(), e);
2416                }
2417            }
2418            broker = sb;
2419        }
2420        if (isUseJmx()) {
2421            HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker());
2422            try {
2423                ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName());
2424                AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName);
2425            } catch (Throwable e) {
2426                throw IOExceptionSupport.create("Status MBean could not be registered in JMX: "
2427                        + e.getMessage(), e);
2428            }
2429        }
2430        if (isAdvisorySupport()) {
2431            broker = new AdvisoryBroker(broker);
2432        }
2433        broker = new CompositeDestinationBroker(broker);
2434        broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
2435        if (isPopulateJMSXUserID()) {
2436            UserIDBroker userIDBroker = new UserIDBroker(broker);
2437            userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
2438            broker = userIDBroker;
2439        }
2440        if (isMonitorConnectionSplits()) {
2441            broker = new ConnectionSplitBroker(broker);
2442        }
2443        if (plugins != null) {
2444            for (int i = 0; i < plugins.length; i++) {
2445                BrokerPlugin plugin = plugins[i];
2446                broker = plugin.installPlugin(broker);
2447            }
2448        }
2449        return broker;
2450    }
2451
2452    protected PersistenceAdapter createPersistenceAdapter() throws IOException {
2453        if (isPersistent()) {
2454            PersistenceAdapterFactory fac = getPersistenceFactory();
2455            if (fac != null) {
2456                return fac.createPersistenceAdapter();
2457            } else {
2458                try {
2459                    String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
2460                    PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
2461                    File dir = new File(getBrokerDataDirectory(),"KahaDB");
2462                    adaptor.setDirectory(dir);
2463                    return adaptor;
2464                } catch (Throwable e) {
2465                    throw IOExceptionSupport.create(e);
2466                }
2467            }
2468        } else {
2469            return new MemoryPersistenceAdapter();
2470        }
2471    }
2472
2473    protected ObjectName createBrokerObjectName() throws MalformedObjectNameException  {
2474        return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName());
2475    }
2476
2477    protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
2478        TransportServer transport = TransportFactorySupport.bind(this, brokerURI);
2479        return new TransportConnector(transport);
2480    }
2481
2482    /**
2483     * Extracts the port from the options
2484     */
2485    protected Object getPort(Map<?,?> options) {
2486        Object port = options.get("port");
2487        if (port == null) {
2488            port = DEFAULT_PORT;
2489            LOG.warn("No port specified so defaulting to: {}", port);
2490        }
2491        return port;
2492    }
2493
2494    protected void addShutdownHook() {
2495        if (useShutdownHook) {
2496            shutdownHook = new Thread("ActiveMQ ShutdownHook") {
2497                @Override
2498                public void run() {
2499                    containerShutdown();
2500                }
2501            };
2502            Runtime.getRuntime().addShutdownHook(shutdownHook);
2503        }
2504    }
2505
2506    protected void removeShutdownHook() {
2507        if (shutdownHook != null) {
2508            try {
2509                Runtime.getRuntime().removeShutdownHook(shutdownHook);
2510            } catch (Exception e) {
2511                LOG.debug("Caught exception, must be shutting down. This exception is ignored.", e);
2512            }
2513        }
2514    }
2515
2516    /**
2517     * Sets hooks to be executed when broker shut down
2518     *
2519     * @org.apache.xbean.Property
2520     */
2521    public void setShutdownHooks(List<Runnable> hooks) throws Exception {
2522        for (Runnable hook : hooks) {
2523            addShutdownHook(hook);
2524        }
2525    }
2526
2527    /**
2528     * Causes a clean shutdown of the container when the VM is being shut down
2529     */
2530    protected void containerShutdown() {
2531        try {
2532            stop();
2533        } catch (IOException e) {
2534            Throwable linkedException = e.getCause();
2535            if (linkedException != null) {
2536                logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2537            } else {
2538                logError("Failed to shut down: " + e, e);
2539            }
2540            if (!useLoggingForShutdownErrors) {
2541                e.printStackTrace(System.err);
2542            }
2543        } catch (Exception e) {
2544            logError("Failed to shut down: " + e, e);
2545        }
2546    }
2547
2548    protected void logError(String message, Throwable e) {
2549        if (useLoggingForShutdownErrors) {
2550            LOG.error("Failed to shut down: " + e);
2551        } else {
2552            System.err.println("Failed to shut down: " + e);
2553        }
2554    }
2555
2556    /**
2557     * Starts any configured destinations on startup
2558     */
2559    protected void startDestinations() throws Exception {
2560        if (destinations != null) {
2561            ConnectionContext adminConnectionContext = getAdminConnectionContext();
2562            for (int i = 0; i < destinations.length; i++) {
2563                ActiveMQDestination destination = destinations[i];
2564                getBroker().addDestination(adminConnectionContext, destination,true);
2565            }
2566        }
2567        if (isUseVirtualTopics()) {
2568            startVirtualConsumerDestinations();
2569        }
2570    }
2571
2572    /**
2573     * Returns the broker's administration connection context used for
2574     * configuring the broker at startup
2575     */
2576    public ConnectionContext getAdminConnectionContext() throws Exception {
2577        return BrokerSupport.getConnectionContext(getBroker());
2578    }
2579
2580    protected void startManagementContext() throws Exception {
2581        getManagementContext().setBrokerName(brokerName);
2582        getManagementContext().start();
2583        adminView = new BrokerView(this, null);
2584        ObjectName objectName = getBrokerObjectName();
2585        AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
2586    }
2587
2588    /**
2589     * Start all transport and network connections, proxies and bridges
2590     *
2591     * @throws Exception
2592     */
2593    public void startAllConnectors() throws Exception {
2594        final Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
2595        List<TransportConnector> al = new ArrayList<>();
2596        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2597            TransportConnector connector = iter.next();
2598            al.add(startTransportConnector(connector));
2599        }
2600        if (al.size() > 0) {
2601            // let's clear the transportConnectors list and replace it with
2602            // the started transportConnector instances
2603            this.transportConnectors.clear();
2604            setTransportConnectors(al);
2605        }
2606        this.slave = false;
2607        URI uri = getVmConnectorURI();
2608        Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
2609        map.put("async", "false");
2610        map.put("create","false");
2611        uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
2612
2613        if (!stopped.get()) {
2614            ThreadPoolExecutor networkConnectorStartExecutor = null;
2615            if (isNetworkConnectorStartAsync()) {
2616                // spin up as many threads as needed
2617                networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
2618                    10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
2619                    new ThreadFactory() {
2620                        int count=0;
2621                        @Override
2622                        public Thread newThread(Runnable runnable) {
2623                            Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
2624                            thread.setDaemon(true);
2625                            return thread;
2626                        }
2627                    });
2628            }
2629
2630            for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2631                final NetworkConnector connector = iter.next();
2632                connector.setLocalUri(uri);
2633                startNetworkConnector(connector, durableDestinations, networkConnectorStartExecutor);
2634            }
2635            if (networkConnectorStartExecutor != null) {
2636                // executor done when enqueued tasks are complete
2637                ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
2638            }
2639
2640            for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2641                ProxyConnector connector = iter.next();
2642                connector.start();
2643            }
2644            for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2645                JmsConnector connector = iter.next();
2646                connector.start();
2647            }
2648            for (Service service : services) {
2649                configureService(service);
2650                service.start();
2651            }
2652        }
2653    }
2654
2655    public void startNetworkConnector(final NetworkConnector connector,
2656            final ThreadPoolExecutor networkConnectorStartExecutor) throws Exception {
2657        startNetworkConnector(connector, getBroker().getDurableDestinations(), networkConnectorStartExecutor);
2658    }
2659
2660    public void startNetworkConnector(final NetworkConnector connector,
2661            final Set<ActiveMQDestination> durableDestinations,
2662            final ThreadPoolExecutor networkConnectorStartExecutor) throws Exception {
2663        connector.setBrokerName(getBrokerName());
2664        //set the durable destinations to match the broker if not set on the connector
2665        if (connector.getDurableDestinations() == null) {
2666            connector.setDurableDestinations(durableDestinations);
2667        }
2668        String defaultSocketURI = getDefaultSocketURIString();
2669        if (defaultSocketURI != null) {
2670            connector.setBrokerURL(defaultSocketURI);
2671        }
2672        //If using the runtime plugin to start a network connector then the mbean needs
2673        //to be added, under normal start it will already exist so check for InstanceNotFoundException
2674        if (isUseJmx()) {
2675            ObjectName networkMbean = createNetworkConnectorObjectName(connector);
2676            try {
2677                getManagementContext().getObjectInstance(networkMbean);
2678            } catch (InstanceNotFoundException e) {
2679                LOG.debug("Network connector MBean {} not found, registering", networkMbean);
2680                registerNetworkConnectorMBean(connector);
2681            }
2682        }
2683        if (networkConnectorStartExecutor != null) {
2684            networkConnectorStartExecutor.execute(new Runnable() {
2685                @Override
2686                public void run() {
2687                    try {
2688                        LOG.info("Async start of {}", connector);
2689                        connector.start();
2690                    } catch(Exception e) {
2691                        LOG.error("Async start of network connector: {} failed", connector, e);
2692                    }
2693                }
2694            });
2695        } else {
2696            connector.start();
2697        }
2698    }
2699
2700    public TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2701        connector.setBrokerService(this);
2702        connector.setTaskRunnerFactory(getTaskRunnerFactory());
2703        MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2704        if (policy != null) {
2705            connector.setMessageAuthorizationPolicy(policy);
2706        }
2707        if (isUseJmx()) {
2708            connector = registerConnectorMBean(connector);
2709        }
2710        connector.getStatistics().setEnabled(enableStatistics);
2711        connector.start();
2712        return connector;
2713    }
2714
2715    /**
2716     * Perform any custom dependency injection
2717     */
2718    protected void configureServices(Object[] services) {
2719        for (Object service : services) {
2720            configureService(service);
2721        }
2722    }
2723
2724    /**
2725     * Perform any custom dependency injection
2726     */
2727    protected void configureService(Object service) {
2728        if (service instanceof BrokerServiceAware) {
2729            BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2730            serviceAware.setBrokerService(this);
2731        }
2732    }
2733
2734    public void handleIOException(IOException exception) {
2735        if (ioExceptionHandler != null) {
2736            ioExceptionHandler.handle(exception);
2737         } else {
2738            LOG.info("No IOExceptionHandler registered, ignoring IO exception", exception);
2739         }
2740    }
2741
2742    protected void startVirtualConsumerDestinations() throws Exception {
2743        checkStartException();
2744        ConnectionContext adminConnectionContext = getAdminConnectionContext();
2745        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
2746        DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
2747        if (!destinations.isEmpty()) {
2748            for (ActiveMQDestination destination : destinations) {
2749                if (filter.matches(destination) == true) {
2750                    broker.addDestination(adminConnectionContext, destination, false);
2751                }
2752            }
2753        }
2754    }
2755
2756    private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
2757        // created at startup, so no sync needed
2758        if (virtualConsumerDestinationFilter == null) {
2759            Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
2760            if (destinationInterceptors != null) {
2761                for (DestinationInterceptor interceptor : destinationInterceptors) {
2762                    if (interceptor instanceof VirtualDestinationInterceptor) {
2763                        VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor;
2764                        for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) {
2765                            if (virtualDestination instanceof VirtualTopic) {
2766                                consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
2767                            }
2768                            if (isUseVirtualDestSubs()) {
2769                                try {
2770                                    broker.virtualDestinationAdded(getAdminConnectionContext(), virtualDestination);
2771                                    LOG.debug("Adding virtual destination: {}", virtualDestination);
2772                                } catch (Exception e) {
2773                                    LOG.warn("Could not fire virtual destination consumer advisory", e);
2774                                }
2775                            }
2776                        }
2777                    }
2778                }
2779            }
2780            ActiveMQQueue filter = new ActiveMQQueue();
2781            filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
2782            virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
2783        }
2784        return virtualConsumerDestinationFilter;
2785    }
2786
2787    protected synchronized ThreadPoolExecutor getExecutor() {
2788        if (this.executor == null) {
2789            this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2790
2791                private long i = 0;
2792
2793                @Override
2794                public Thread newThread(Runnable runnable) {
2795                    this.i++;
2796                    Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i);
2797                    thread.setDaemon(true);
2798                    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
2799                        @Override
2800                        public void uncaughtException(final Thread t, final Throwable e) {
2801                            LOG.error("Error in thread '{}'", t.getName(), e);
2802                        }
2803                    });
2804                    return thread;
2805                }
2806            }, new RejectedExecutionHandler() {
2807                @Override
2808                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
2809                    try {
2810                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
2811                    } catch (InterruptedException e) {
2812                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
2813                    }
2814
2815                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
2816                }
2817            });
2818        }
2819        return this.executor;
2820    }
2821
2822    public synchronized Scheduler getScheduler() {
2823        if (this.scheduler==null) {
2824            this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2825            try {
2826                this.scheduler.start();
2827            } catch (Exception e) {
2828               LOG.error("Failed to start Scheduler", e);
2829            }
2830        }
2831        return this.scheduler;
2832    }
2833
2834    public Broker getRegionBroker() {
2835        return regionBroker;
2836    }
2837
2838    public void setRegionBroker(Broker regionBroker) {
2839        this.regionBroker = regionBroker;
2840    }
2841
2842    public void addShutdownHook(Runnable hook) {
2843        synchronized (shutdownHooks) {
2844            shutdownHooks.add(hook);
2845        }
2846    }
2847
2848    public void removeShutdownHook(Runnable hook) {
2849        synchronized (shutdownHooks) {
2850            shutdownHooks.remove(hook);
2851        }
2852    }
2853
2854    public boolean isSystemExitOnShutdown() {
2855        return systemExitOnShutdown;
2856    }
2857
2858    /**
2859     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2860     */
2861    public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2862        this.systemExitOnShutdown = systemExitOnShutdown;
2863    }
2864
2865    public int getSystemExitOnShutdownExitCode() {
2866        return systemExitOnShutdownExitCode;
2867    }
2868
2869    public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
2870        this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
2871    }
2872
2873    public SslContext getSslContext() {
2874        return sslContext;
2875    }
2876
2877    public void setSslContext(SslContext sslContext) {
2878        this.sslContext = sslContext;
2879    }
2880
2881    public boolean isShutdownOnSlaveFailure() {
2882        return shutdownOnSlaveFailure;
2883    }
2884
2885    /**
2886     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2887     */
2888    public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2889        this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2890    }
2891
2892    public boolean isWaitForSlave() {
2893        return waitForSlave;
2894    }
2895
2896    /**
2897     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2898     */
2899    public void setWaitForSlave(boolean waitForSlave) {
2900        this.waitForSlave = waitForSlave;
2901    }
2902
2903    public long getWaitForSlaveTimeout() {
2904        return this.waitForSlaveTimeout;
2905    }
2906
2907    public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
2908        this.waitForSlaveTimeout = waitForSlaveTimeout;
2909    }
2910
2911    /**
2912     * Get the passiveSlave
2913     * @return the passiveSlave
2914     */
2915    public boolean isPassiveSlave() {
2916        return this.passiveSlave;
2917    }
2918
2919    /**
2920     * Set the passiveSlave
2921     * @param passiveSlave the passiveSlave to set
2922     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2923     */
2924    public void setPassiveSlave(boolean passiveSlave) {
2925        this.passiveSlave = passiveSlave;
2926    }
2927
2928    /**
2929     * override the Default IOException handler, called when persistence adapter
2930     * has experiences File or JDBC I/O Exceptions
2931     *
2932     * @param ioExceptionHandler
2933     */
2934    public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2935        configureService(ioExceptionHandler);
2936        this.ioExceptionHandler = ioExceptionHandler;
2937    }
2938
2939    public IOExceptionHandler getIoExceptionHandler() {
2940        return ioExceptionHandler;
2941    }
2942
2943    /**
2944     * @return the schedulerSupport
2945     */
2946    public boolean isSchedulerSupport() {
2947        return this.schedulerSupport;
2948    }
2949
2950    /**
2951     * @param schedulerSupport the schedulerSupport to set
2952     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2953     */
2954    public void setSchedulerSupport(boolean schedulerSupport) {
2955        this.schedulerSupport = schedulerSupport;
2956    }
2957
2958    /**
2959     * @return the schedulerDirectory
2960     */
2961    public File getSchedulerDirectoryFile() {
2962        if (this.schedulerDirectoryFile == null) {
2963            this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2964        }
2965        return schedulerDirectoryFile;
2966    }
2967
2968    /**
2969     * @param schedulerDirectory the schedulerDirectory to set
2970     */
2971    public void setSchedulerDirectoryFile(File schedulerDirectory) {
2972        this.schedulerDirectoryFile = schedulerDirectory;
2973    }
2974
2975    public void setSchedulerDirectory(String schedulerDirectory) {
2976        setSchedulerDirectoryFile(new File(schedulerDirectory));
2977    }
2978
2979    public int getSchedulePeriodForDestinationPurge() {
2980        return this.schedulePeriodForDestinationPurge;
2981    }
2982
2983    public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
2984        this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
2985    }
2986
2987    /**
2988     * @param schedulePeriodForDiskUsageCheck
2989     */
2990    public void setSchedulePeriodForDiskUsageCheck(
2991            int schedulePeriodForDiskUsageCheck) {
2992        this.schedulePeriodForDiskUsageCheck = schedulePeriodForDiskUsageCheck;
2993    }
2994
2995    public int getDiskUsageCheckRegrowThreshold() {
2996        return diskUsageCheckRegrowThreshold;
2997    }
2998
2999    /**
3000     * @param diskUsageCheckRegrowThreshold
3001     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
3002     */
3003    public void setDiskUsageCheckRegrowThreshold(int diskUsageCheckRegrowThreshold) {
3004        this.diskUsageCheckRegrowThreshold = diskUsageCheckRegrowThreshold;
3005    }
3006
3007    public int getMaxPurgedDestinationsPerSweep() {
3008        return this.maxPurgedDestinationsPerSweep;
3009    }
3010
3011    public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) {
3012        this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep;
3013    }
3014
3015    public BrokerContext getBrokerContext() {
3016        return brokerContext;
3017    }
3018
3019    public void setBrokerContext(BrokerContext brokerContext) {
3020        this.brokerContext = brokerContext;
3021    }
3022
3023    public void setBrokerId(String brokerId) {
3024        this.brokerId = new BrokerId(brokerId);
3025    }
3026
3027    public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
3028        return useAuthenticatedPrincipalForJMSXUserID;
3029    }
3030
3031    public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
3032        this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
3033    }
3034
3035    /**
3036     * Should MBeans that support showing the Authenticated User Name information have this
3037     * value filled in or not.
3038     *
3039     * @return true if user names should be exposed in MBeans
3040     */
3041    public boolean isPopulateUserNameInMBeans() {
3042        return this.populateUserNameInMBeans;
3043    }
3044
3045    /**
3046     * Sets whether Authenticated User Name information is shown in MBeans that support this field.
3047     * @param value if MBeans should expose user name information.
3048     */
3049    public void setPopulateUserNameInMBeans(boolean value) {
3050        this.populateUserNameInMBeans = value;
3051    }
3052
3053    /**
3054     * Gets the time in Milliseconds that an invocation of an MBean method will wait before
3055     * failing.  The default value is to wait forever (zero).
3056     *
3057     * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
3058     */
3059    public long getMbeanInvocationTimeout() {
3060        return mbeanInvocationTimeout;
3061    }
3062
3063    /**
3064     * Gets the time in Milliseconds that an invocation of an MBean method will wait before
3065     * failing. The default value is to wait forever (zero).
3066     *
3067     * @param mbeanInvocationTimeout
3068     *      timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
3069     */
3070    public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) {
3071        this.mbeanInvocationTimeout = mbeanInvocationTimeout;
3072    }
3073
3074    public boolean isNetworkConnectorStartAsync() {
3075        return networkConnectorStartAsync;
3076    }
3077
3078    public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
3079        this.networkConnectorStartAsync = networkConnectorStartAsync;
3080    }
3081
3082    public boolean isAllowTempAutoCreationOnSend() {
3083        return allowTempAutoCreationOnSend;
3084    }
3085
3086    /**
3087     * enable if temp destinations need to be propagated through a network when
3088     * advisorySupport==false. This is used in conjunction with the policy
3089     * gcInactiveDestinations for matching temps so they can get removed
3090     * when inactive
3091     *
3092     * @param allowTempAutoCreationOnSend
3093     */
3094    public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
3095        this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
3096    }
3097
3098    public long getOfflineDurableSubscriberTimeout() {
3099        return offlineDurableSubscriberTimeout;
3100    }
3101
3102    public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) {
3103        this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
3104    }
3105
3106    public long getOfflineDurableSubscriberTaskSchedule() {
3107        return offlineDurableSubscriberTaskSchedule;
3108    }
3109
3110    public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) {
3111        this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
3112    }
3113
3114    public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
3115        return isUseVirtualTopics() && destination.isQueue() &&
3116               getVirtualTopicConsumerDestinationFilter().matches(destination);
3117    }
3118
3119    synchronized public Throwable getStartException() {
3120        return startException;
3121    }
3122
3123    public boolean isStartAsync() {
3124        return startAsync;
3125    }
3126
3127    public void setStartAsync(boolean startAsync) {
3128        this.startAsync = startAsync;
3129    }
3130
3131    public boolean isSlave() {
3132        return this.slave;
3133    }
3134
3135    public boolean isStopping() {
3136        return this.stopping.get();
3137    }
3138
3139    /**
3140     * @return true if the broker allowed to restart on shutdown.
3141     */
3142    public boolean isRestartAllowed() {
3143        return restartAllowed;
3144    }
3145
3146    /**
3147     * Sets if the broker allowed to restart on shutdown.
3148     */
3149    public void setRestartAllowed(boolean restartAllowed) {
3150        this.restartAllowed = restartAllowed;
3151    }
3152
3153    /**
3154     * A lifecycle manager of the BrokerService should
3155     * inspect this property after a broker shutdown has occurred
3156     * to find out if the broker needs to be re-created and started
3157     * again.
3158     *
3159     * @return true if the broker wants to be restarted after it shuts down.
3160     */
3161    public boolean isRestartRequested() {
3162        return restartRequested;
3163    }
3164
3165    public void requestRestart() {
3166        this.restartRequested = true;
3167    }
3168
3169    public int getStoreOpenWireVersion() {
3170        return storeOpenWireVersion;
3171    }
3172
3173    public void setStoreOpenWireVersion(int storeOpenWireVersion) {
3174        this.storeOpenWireVersion = storeOpenWireVersion;
3175    }
3176
3177    /**
3178     * @return the current number of connections on this Broker.
3179     */
3180    public int getCurrentConnections() {
3181        return this.currentConnections.get();
3182    }
3183
3184    /**
3185     * @return the total number of connections this broker has handled since startup.
3186     */
3187    public long getTotalConnections() {
3188        return this.totalConnections.get();
3189    }
3190
3191    public void incrementCurrentConnections() {
3192        this.currentConnections.incrementAndGet();
3193    }
3194
3195    public void decrementCurrentConnections() {
3196        this.currentConnections.decrementAndGet();
3197    }
3198
3199    public void incrementTotalConnections() {
3200        this.totalConnections.incrementAndGet();
3201    }
3202
3203    public boolean isRejectDurableConsumers() {
3204        return rejectDurableConsumers;
3205    }
3206
3207    public void setRejectDurableConsumers(boolean rejectDurableConsumers) {
3208        this.rejectDurableConsumers = rejectDurableConsumers;
3209    }
3210
3211    public boolean isUseVirtualDestSubs() {
3212        return useVirtualDestSubs;
3213    }
3214
3215    public void setUseVirtualDestSubs(
3216            boolean useVirtualDestSubs) {
3217        this.useVirtualDestSubs = useVirtualDestSubs;
3218    }
3219
3220    public boolean isUseVirtualDestSubsOnCreation() {
3221        return useVirtualDestSubsOnCreation;
3222    }
3223
3224    public void setUseVirtualDestSubsOnCreation(
3225            boolean useVirtualDestSubsOnCreation) {
3226        this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation;
3227    }
3228
3229    public boolean isAdjustUsageLimits() {
3230        return adjustUsageLimits;
3231    }
3232
3233    public void setAdjustUsageLimits(boolean adjustUsageLimits) {
3234        this.adjustUsageLimits = adjustUsageLimits;
3235    }
3236
3237    public void setRollbackOnlyOnAsyncException(boolean rollbackOnlyOnAsyncException) {
3238        this.rollbackOnlyOnAsyncException = rollbackOnlyOnAsyncException;
3239    }
3240
3241    public boolean isRollbackOnlyOnAsyncException() {
3242        return rollbackOnlyOnAsyncException;
3243    }
3244}