001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.CancellationException;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.CopyOnWriteArrayList;
028import java.util.concurrent.Future;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031import org.apache.activemq.advisory.AdvisorySupport;
032import org.apache.activemq.broker.BrokerService;
033import org.apache.activemq.broker.ConnectionContext;
034import org.apache.activemq.broker.ProducerBrokerExchange;
035import org.apache.activemq.broker.region.policy.DispatchPolicy;
036import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
037import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
038import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
039import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
040import org.apache.activemq.broker.util.InsertionCountList;
041import org.apache.activemq.command.ActiveMQDestination;
042import org.apache.activemq.command.ConsumerInfo;
043import org.apache.activemq.command.ExceptionResponse;
044import org.apache.activemq.command.Message;
045import org.apache.activemq.command.MessageAck;
046import org.apache.activemq.command.MessageId;
047import org.apache.activemq.command.ProducerAck;
048import org.apache.activemq.command.ProducerInfo;
049import org.apache.activemq.command.Response;
050import org.apache.activemq.command.SubscriptionInfo;
051import org.apache.activemq.filter.MessageEvaluationContext;
052import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
053import org.apache.activemq.store.MessageRecoveryListener;
054import org.apache.activemq.store.NoLocalSubscriptionAware;
055import org.apache.activemq.store.PersistenceAdapter;
056import org.apache.activemq.store.TopicMessageStore;
057import org.apache.activemq.thread.Task;
058import org.apache.activemq.thread.TaskRunner;
059import org.apache.activemq.thread.TaskRunnerFactory;
060import org.apache.activemq.transaction.Synchronization;
061import org.apache.activemq.util.SubscriptionKey;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * The Topic is a destination that sends a copy of a message to every active
067 * Subscription registered.
068 */
069public class Topic extends BaseDestination implements Task {
070    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
071    private final TopicMessageStore topicStore;
072    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
073    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
074    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
075    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
076    private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
077    private final TaskRunner taskRunner;
078    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
079    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
080        @Override
081        public void run() {
082            try {
083                Topic.this.taskRunner.wakeup();
084            } catch (InterruptedException e) {
085            }
086        }
087    };
088
089    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
090            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
091        super(brokerService, store, destination, parentStats);
092        this.topicStore = store;
093        subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
094        this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
095    }
096
097    @Override
098    public void initialize() throws Exception {
099        super.initialize();
100        // set non default subscription recovery policy (override policyEntries)
101        if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
102            subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
103            setAlwaysRetroactive(true);
104        }
105        if (store != null) {
106            // AMQ-2586: Better to leave this stat at zero than to give the user
107            // misleading metrics.
108            // int messageCount = store.getMessageCount();
109            // destinationStatistics.getMessages().setCount(messageCount);
110            store.start();
111        }
112    }
113
114    @Override
115    public List<Subscription> getConsumers() {
116        synchronized (consumers) {
117            return new ArrayList<Subscription>(consumers);
118        }
119    }
120
121    public boolean lock(MessageReference node, LockOwner sub) {
122        return true;
123    }
124
125    @Override
126    public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
127        if (!sub.getConsumerInfo().isDurable()) {
128
129            // Do a retroactive recovery if needed.
130            if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
131
132                // synchronize with dispatch method so that no new messages are sent
133                // while we are recovering a subscription to avoid out of order messages.
134                dispatchLock.writeLock().lock();
135                try {
136                    boolean applyRecovery = false;
137                    synchronized (consumers) {
138                        if (!consumers.contains(sub)){
139                            sub.add(context, this);
140                            consumers.add(sub);
141                            applyRecovery=true;
142                            super.addSubscription(context, sub);
143                        }
144                    }
145                    if (applyRecovery){
146                        subscriptionRecoveryPolicy.recover(context, this, sub);
147                    }
148                } finally {
149                    dispatchLock.writeLock().unlock();
150                }
151
152            } else {
153                synchronized (consumers) {
154                    if (!consumers.contains(sub)){
155                        sub.add(context, this);
156                        consumers.add(sub);
157                        super.addSubscription(context, sub);
158                    }
159                }
160            }
161        } else {
162            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
163            super.addSubscription(context, sub);
164            sub.add(context, this);
165            if(dsub.isActive()) {
166                synchronized (consumers) {
167                    boolean hasSubscription = false;
168
169                    if (consumers.size() == 0) {
170                        hasSubscription = false;
171                    } else {
172                        for (Subscription currentSub : consumers) {
173                            if (currentSub.getConsumerInfo().isDurable()) {
174                                DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
175                                if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
176                                    hasSubscription = true;
177                                    break;
178                                }
179                            }
180                        }
181                    }
182
183                    if (!hasSubscription) {
184                        consumers.add(sub);
185                    }
186                }
187            }
188            durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
189        }
190    }
191
192    @Override
193    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
194        if (!sub.getConsumerInfo().isDurable()) {
195            boolean removed = false;
196            synchronized (consumers) {
197                removed = consumers.remove(sub);
198            }
199            if (removed) {
200                super.removeSubscription(context, sub, lastDeliveredSequenceId);
201            }
202        }
203        sub.remove(context, this);
204    }
205
206    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
207        if (topicStore != null) {
208            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
209            DurableTopicSubscription removed = durableSubscribers.remove(key);
210            if (removed != null) {
211                destinationStatistics.getConsumers().decrement();
212                // deactivate and remove
213                removed.deactivate(false, 0l);
214                consumers.remove(removed);
215            }
216        }
217    }
218
219    private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException {
220        if (hasSelectorChanged(info1, info2)) {
221            return true;
222        }
223
224        return hasNoLocalChanged(info1, info2);
225    }
226
227    private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException {
228        //Not all persistence adapters store the noLocal value for a subscription
229        PersistenceAdapter adapter = broker.getBrokerService().getPersistenceAdapter();
230        if (adapter instanceof NoLocalSubscriptionAware) {
231            if (info1.isNoLocal() ^ info2.isNoLocal()) {
232                return true;
233            }
234        }
235
236        return false;
237    }
238
239    private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) {
240        if (info1.getSelector() != null ^ info2.getSelector() != null) {
241            return true;
242        }
243
244        if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
245            return true;
246        }
247
248        return false;
249    }
250
251    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
252        // synchronize with dispatch method so that no new messages are sent
253        // while we are recovering a subscription to avoid out of order messages.
254        dispatchLock.writeLock().lock();
255        try {
256
257            if (topicStore == null) {
258                return;
259            }
260
261            // Recover the durable subscription.
262            String clientId = subscription.getSubscriptionKey().getClientId();
263            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
264            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
265            if (info != null) {
266                // Check to see if selector changed.
267                if (hasDurableSubChanged(info, subscription.getConsumerInfo())) {
268                    // Need to delete the subscription
269                    topicStore.deleteSubscription(clientId, subscriptionName);
270                    info = null;
271                    // Force a rebuild of the selector chain for the subscription otherwise
272                    // the stored subscription is updated but the selector expression is not
273                    // and the subscription will not behave according to the new configuration.
274                    subscription.setSelector(subscription.getConsumerInfo().getSelector());
275                    synchronized (consumers) {
276                        consumers.remove(subscription);
277                    }
278                } else {
279                    synchronized (consumers) {
280                        if (!consumers.contains(subscription)) {
281                            consumers.add(subscription);
282                        }
283                    }
284                }
285            }
286
287            // Do we need to create the subscription?
288            if (info == null) {
289                info = new SubscriptionInfo();
290                info.setClientId(clientId);
291                info.setSelector(subscription.getConsumerInfo().getSelector());
292                info.setSubscriptionName(subscriptionName);
293                info.setDestination(getActiveMQDestination());
294                info.setNoLocal(subscription.getConsumerInfo().isNoLocal());
295                // This destination is an actual destination id.
296                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
297                // This destination might be a pattern
298                synchronized (consumers) {
299                    consumers.add(subscription);
300                    topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive());
301                }
302            }
303
304            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
305            msgContext.setDestination(destination);
306            if (subscription.isRecoveryRequired()) {
307                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
308                    @Override
309                    public boolean recoverMessage(Message message) throws Exception {
310                        message.setRegionDestination(Topic.this);
311                        try {
312                            msgContext.setMessageReference(message);
313                            if (subscription.matches(message, msgContext)) {
314                                subscription.add(message);
315                            }
316                        } catch (IOException e) {
317                            LOG.error("Failed to recover this message {}", message, e);
318                        }
319                        return true;
320                    }
321
322                    @Override
323                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
324                        throw new RuntimeException("Should not be called.");
325                    }
326
327                    @Override
328                    public boolean hasSpace() {
329                        return true;
330                    }
331
332                    @Override
333                    public boolean isDuplicate(MessageId id) {
334                        return false;
335                    }
336                });
337            }
338        } finally {
339            dispatchLock.writeLock().unlock();
340        }
341    }
342
343    public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
344        synchronized (consumers) {
345            consumers.remove(sub);
346        }
347        sub.remove(context, this, dispatched);
348    }
349
350    public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
351        if (subscription.getConsumerInfo().isRetroactive()) {
352            subscriptionRecoveryPolicy.recover(context, this, subscription);
353        }
354    }
355
    /**
     * Accepts a message from a producer and either sends it immediately, parks it
     * until memory becomes available, blocks the calling thread until memory becomes
     * available, or rejects it.
     * <p>
     * Outline, in order:
     * <ol>
     * <li>A message that is already expired is reported to the broker, counted in
     * the expired statistic, optionally acknowledged to the producer, and dropped.</li>
     * <li>If memory usage is full and producer flow control is enabled on both this
     * destination and the connection context:
     * <ul>
     * <li>throws {@link javax.jms.ResourceAllocationException} when the connection is
     * not a network connection and the system usage is set to fail on no space;</li>
     * <li>otherwise, if the producer has a window or the message requires a response,
     * queues the send in {@code messagesWaitingForSpace} and returns;</li>
     * <li>otherwise blocks until space is available, then drops the message if it
     * expired in the meantime.</li>
     * </ul></li>
     * <li>Otherwise performs the send, notifies {@code messageDelivered} and, when
     * applicable, dispatches a {@link ProducerAck}.</li>
     * </ol>
     *
     * @param producerExchange exchange describing the producer and its connection
     * @param message the message to send
     * @throws Exception on rejection, on a closing connection while waiting for
     *         space, or on any failure of the underlying send
     */
    @Override
    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();

        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
        producerExchange.incrementSend();
        // A ProducerAck is only sent for windowed producers whose message does not
        // expect a response, and never while the context is in recovery mode.
        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
                && !context.isInRecoveryMode();

        message.setRegionDestination(this);

        // There is delay between the client sending it and it arriving at the
        // destination.. it may have expired.
        if (message.isExpired()) {
            broker.messageExpired(context, message, null);
            getDestinationStatistics().getExpired().increment();
            if (sendProducerAck) {
                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
                context.getConnection().dispatchAsync(ack);
            }
            return;
        }

        if (memoryUsage.isFull()) {
            isFull(context, memoryUsage);
            fastProducer(context, producerInfo);

            // Flow control must be enabled both on the destination and on the connection.
            if (isProducerFlowControl() && context.isProducerFlowControl()) {

                // Log the flow-control notice once; the flag is cleared here.
                if (warnOnProducerFlowControl) {
                    warnOnProducerFlowControl = false;
                    LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
                }

                // Fail fast instead of waiting, except for network connections.
                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
                            + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                }

                // We can avoid blocking due to low usage if the producer is sending a sync message or
                // if it is using a producer window
                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
                    synchronized (messagesWaitingForSpace) {
                        // Park the send; iterate() runs it once memory usage is no longer full.
                        messagesWaitingForSpace.add(new Runnable() {
                            @Override
                            public void run() {
                                try {

                                    // While waiting for space to free up... the
                                    // message may have expired.
                                    if (message.isExpired()) {
                                        broker.messageExpired(context, message, null);
                                        getDestinationStatistics().getExpired().increment();
                                    } else {
                                        doMessageSend(producerExchange, message);
                                    }

                                    // Tell the producer the outcome: an ack for windowed
                                    // producers, otherwise a plain response correlated
                                    // to the original command.
                                    if (sendProducerAck) {
                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
                                                .getSize());
                                        context.getConnection().dispatchAsync(ack);
                                    } else {
                                        Response response = new Response();
                                        response.setCorrelationId(message.getCommandId());
                                        context.getConnection().dispatchAsync(response);
                                    }

                                } catch (Exception e) {
                                    // The failure is only reported back when a response
                                    // (not a producer ack) is expected and the context is
                                    // not in recovery mode; otherwise it is dropped here.
                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
                                        ExceptionResponse response = new ExceptionResponse(e);
                                        response.setCorrelationId(message.getCommandId());
                                        context.getConnection().dispatchAsync(response);
                                    }
                                }
                            }
                        });

                        registerCallbackForNotFullNotification();
                        // The queued Runnable dispatches the response itself, so flag the
                        // context that no response should be sent for this command now.
                        context.setDontSendReponse(true);
                        return;
                    }

                } else {
                    // Producer flow control cannot be used, so we have do the flow control
                    // at the broker by blocking this thread until there is space available.

                    if (memoryUsage.isFull()) {
                        if (context.isInTransaction()) {

                            // Poll in one-second slices so a stopping connection is noticed;
                            // the warning below is emitted on every fourth unsuccessful wait.
                            int count = 0;
                            while (!memoryUsage.waitForSpace(1000)) {
                                if (context.getStopping().get()) {
                                    throw new IOException("Connection closed, send aborted.");
                                }
                                if (count > 2 && context.isInTransaction()) {
                                    count = 0;
                                    int size = context.getTransaction().size();
                                    LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message);
                                }
                                count++;
                            }
                        } else {
                            waitForSpace(
                                    context,
                                    producerExchange,
                                    memoryUsage,
                                    "Usage Manager Memory Usage limit reached. Stopping producer ("
                                            + message.getProducerId()
                                            + ") to prevent flooding "
                                            + getActiveMQDestination().getQualifiedName()
                                            + "."
                                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                        }
                    }

                    // The usage manager could have delayed us by the time
                    // we unblock the message could have expired..
                    // NOTE(review): unlike the expiry checks above, this path neither calls
                    // broker.messageExpired nor sends a ProducerAck - confirm that is intended.
                    if (message.isExpired()) {
                        getDestinationStatistics().getExpired().increment();
                        LOG.debug("Expired message: {}", message);
                        return;
                    }
                }
            }
        }

        // Normal path: memory was available, flow control is off, or the wait completed.
        doMessageSend(producerExchange, message);
        messageDelivered(context, message);
        if (sendProducerAck) {
            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
            context.getConnection().dispatchAsync(ack);
        }
    }
492
493    /**
494     * do send the message - this needs to be synchronized to ensure messages
495     * are stored AND dispatched in the right order
496     *
497     * @param producerExchange
498     * @param message
499     * @throws IOException
500     * @throws Exception
501     */
502    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
503            throws IOException, Exception {
504        final ConnectionContext context = producerExchange.getConnectionContext();
505        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
506        Future<Object> result = null;
507
508        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
509            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
510                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
511                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
512                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
513                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
514                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
515                    throw new javax.jms.ResourceAllocationException(logMessage);
516                }
517
518                waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
519            }
520            result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
521
522            //Moved the reduceMemoryfootprint clearing to the dispatch method
523        }
524
525        message.incrementReferenceCount();
526
527        if (context.isInTransaction()) {
528            context.getTransaction().addSynchronization(new Synchronization() {
529                @Override
530                public void afterCommit() throws Exception {
531                    // It could take while before we receive the commit
532                    // operation.. by that time the message could have
533                    // expired..
534                    if (message.isExpired()) {
535                        if (broker.isExpired(message)) {
536                            getDestinationStatistics().getExpired().increment();
537                            broker.messageExpired(context, message, null);
538                        }
539                        message.decrementReferenceCount();
540                        return;
541                    }
542                    try {
543                        dispatch(context, message);
544                    } finally {
545                        message.decrementReferenceCount();
546                    }
547                }
548
549                @Override
550                public void afterRollback() throws Exception {
551                    message.decrementReferenceCount();
552                }
553            });
554
555        } else {
556            try {
557                dispatch(context, message);
558            } finally {
559                message.decrementReferenceCount();
560            }
561        }
562
563        if (result != null && !result.isCancelled()) {
564            try {
565                result.get();
566            } catch (CancellationException e) {
567                // ignore - the task has been cancelled if the message
568                // has already been deleted
569            }
570        }
571    }
572
573    private boolean canOptimizeOutPersistence() {
574        return durableSubscribers.size() == 0;
575    }
576
577    @Override
578    public String toString() {
579        return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
580    }
581
582    @Override
583    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
584            final MessageReference node) throws IOException {
585        if (topicStore != null && node.isPersistent()) {
586            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
587            SubscriptionKey key = dsub.getSubscriptionKey();
588            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
589                    convertToNonRangedAck(ack, node));
590        }
591        messageConsumed(context, node);
592    }
593
594    @Override
595    public void gc() {
596    }
597
598    public Message loadMessage(MessageId messageId) throws IOException {
599        return topicStore != null ? topicStore.getMessage(messageId) : null;
600    }
601
602    @Override
603    public void start() throws Exception {
604        if (started.compareAndSet(false, true)) {
605            this.subscriptionRecoveryPolicy.start();
606            if (memoryUsage != null) {
607                memoryUsage.start();
608            }
609
610            if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) {
611                scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
612            }
613        }
614    }
615
616    @Override
617    public void stop() throws Exception {
618        if (started.compareAndSet(true, false)) {
619            if (taskRunner != null) {
620                taskRunner.shutdown();
621            }
622            this.subscriptionRecoveryPolicy.stop();
623            if (memoryUsage != null) {
624                memoryUsage.stop();
625            }
626            if (this.topicStore != null) {
627                this.topicStore.stop();
628            }
629
630            scheduler.cancel(expireMessagesTask);
631        }
632    }
633
634    @Override
635    public Message[] browse() {
636        final List<Message> result = new ArrayList<Message>();
637        doBrowse(result, getMaxBrowsePageSize());
638        return result.toArray(new Message[result.size()]);
639    }
640
641    private void doBrowse(final List<Message> browseList, final int max) {
642        try {
643            if (topicStore != null) {
644                final List<Message> toExpire = new ArrayList<Message>();
645                topicStore.recover(new MessageRecoveryListener() {
646                    @Override
647                    public boolean recoverMessage(Message message) throws Exception {
648                        if (message.isExpired()) {
649                            toExpire.add(message);
650                        }
651                        browseList.add(message);
652                        return true;
653                    }
654
655                    @Override
656                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
657                        return true;
658                    }
659
660                    @Override
661                    public boolean hasSpace() {
662                        return browseList.size() < max;
663                    }
664
665                    @Override
666                    public boolean isDuplicate(MessageId id) {
667                        return false;
668                    }
669                });
670                final ConnectionContext connectionContext = createConnectionContext();
671                for (Message message : toExpire) {
672                    for (DurableTopicSubscription sub : durableSubscribers.values()) {
673                        if (!sub.isActive()) {
674                            message.setRegionDestination(this);
675                            messageExpired(connectionContext, sub, message);
676                        }
677                    }
678                }
679                Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
680                if (msgs != null) {
681                    for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
682                        browseList.add(msgs[i]);
683                    }
684                }
685            }
686        } catch (Throwable e) {
687            LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e);
688        }
689    }
690
691    @Override
692    public boolean iterate() {
693        synchronized (messagesWaitingForSpace) {
694            while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
695                Runnable op = messagesWaitingForSpace.removeFirst();
696                op.run();
697            }
698
699            if (!messagesWaitingForSpace.isEmpty()) {
700                registerCallbackForNotFullNotification();
701            }
702        }
703        return false;
704    }
705
706    private void registerCallbackForNotFullNotification() {
707        // If the usage manager is not full, then the task will not
708        // get called..
709        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
710            // so call it directly here.
711            sendMessagesWaitingForSpaceTask.run();
712        }
713    }
714
715    // Properties
716    // -------------------------------------------------------------------------
717
718    public DispatchPolicy getDispatchPolicy() {
719        return dispatchPolicy;
720    }
721
722    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
723        this.dispatchPolicy = dispatchPolicy;
724    }
725
726    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
727        return subscriptionRecoveryPolicy;
728    }
729
730    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
731        if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
732            // allow users to combine retained message policy with other ActiveMQ policies
733            RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
734            policy.setWrapped(recoveryPolicy);
735        } else {
736            this.subscriptionRecoveryPolicy = recoveryPolicy;
737        }
738    }
739
740    // Implementation methods
741    // -------------------------------------------------------------------------
742
743    @Override
744    public final void wakeup() {
745    }
746
747    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
748        // AMQ-2586: Better to leave this stat at zero than to give the user
749        // misleading metrics.
750        // destinationStatistics.getMessages().increment();
751        destinationStatistics.getEnqueues().increment();
752        destinationStatistics.getMessageSize().addSize(message.getSize());
753        MessageEvaluationContext msgContext = null;
754
755        dispatchLock.readLock().lock();
756        try {
757            if (!subscriptionRecoveryPolicy.add(context, message)) {
758                return;
759            }
760            synchronized (consumers) {
761                if (consumers.isEmpty()) {
762                    onMessageWithNoConsumers(context, message);
763                    return;
764                }
765            }
766
767            // Clear memory before dispatch - need to clear here because the call to
768            //subscriptionRecoveryPolicy.add() will unmarshall the state
769            if (isReduceMemoryFootprint() && message.isMarshalled()) {
770                message.clearUnMarshalledState();
771            }
772
773            msgContext = context.getMessageEvaluationContext();
774            msgContext.setDestination(destination);
775            msgContext.setMessageReference(message);
776            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
777                onMessageWithNoConsumers(context, message);
778            }
779
780        } finally {
781            dispatchLock.readLock().unlock();
782            if (msgContext != null) {
783                msgContext.clear();
784            }
785        }
786    }
787
788    private final Runnable expireMessagesTask = new Runnable() {
789        @Override
790        public void run() {
791            List<Message> browsedMessages = new InsertionCountList<Message>();
792            doBrowse(browsedMessages, getMaxExpirePageSize());
793        }
794    };
795
796    @Override
797    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
798        broker.messageExpired(context, reference, subs);
799        // AMQ-2586: Better to leave this stat at zero than to give the user
800        // misleading metrics.
801        // destinationStatistics.getMessages().decrement();
802        destinationStatistics.getExpired().increment();
803        MessageAck ack = new MessageAck();
804        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
805        ack.setDestination(destination);
806        ack.setMessageID(reference.getMessageId());
807        try {
808            if (subs instanceof DurableTopicSubscription) {
809                ((DurableTopicSubscription)subs).removePending(reference);
810            }
811            acknowledge(context, subs, ack, reference);
812        } catch (Exception e) {
813            LOG.error("Failed to remove expired Message from the store ", e);
814        }
815    }
816
817    @Override
818    protected Logger getLog() {
819        return LOG;
820    }
821
822    protected boolean isOptimizeStorage(){
823        boolean result = false;
824
825        if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){
826                result = true;
827                for (DurableTopicSubscription s : durableSubscribers.values()) {
828                    if (s.isActive()== false){
829                        result = false;
830                        break;
831                    }
832                    if (s.getPrefetchSize()==0){
833                        result = false;
834                        break;
835                    }
836                    if (s.isSlowConsumer()){
837                        result = false;
838                        break;
839                    }
840                    if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
841                        result = false;
842                        break;
843                    }
844                }
845        }
846        return result;
847    }
848
849    /**
850     * force a reread of the store - after transaction recovery completion
851     */
852    @Override
853    public void clearPendingMessages() {
854        dispatchLock.readLock().lock();
855        try {
856            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
857                clearPendingAndDispatch(durableTopicSubscription);
858            }
859        } finally {
860            dispatchLock.readLock().unlock();
861        }
862    }
863
864    private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
865        synchronized (durableTopicSubscription.pendingLock) {
866            durableTopicSubscription.pending.clear();
867            try {
868                durableTopicSubscription.dispatchPending();
869            } catch (IOException exception) {
870                LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{
871                        durableTopicSubscription,
872                        destination,
873                        durableTopicSubscription.pending }, exception);
874            }
875        }
876    }
877
878    public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
879        return durableSubscribers;
880    }
881}