001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.LinkedList;
024import java.util.List;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicReference;
032
033import javax.jms.IllegalStateException;
034import javax.jms.InvalidDestinationException;
035import javax.jms.JMSException;
036import javax.jms.Message;
037import javax.jms.MessageConsumer;
038import javax.jms.MessageListener;
039import javax.jms.TransactionRolledBackException;
040
041import org.apache.activemq.blob.BlobDownloader;
042import org.apache.activemq.command.ActiveMQBlobMessage;
043import org.apache.activemq.command.ActiveMQDestination;
044import org.apache.activemq.command.ActiveMQMessage;
045import org.apache.activemq.command.ActiveMQTempDestination;
046import org.apache.activemq.command.CommandTypes;
047import org.apache.activemq.command.ConsumerId;
048import org.apache.activemq.command.ConsumerInfo;
049import org.apache.activemq.command.MessageAck;
050import org.apache.activemq.command.MessageDispatch;
051import org.apache.activemq.command.MessageId;
052import org.apache.activemq.command.MessagePull;
053import org.apache.activemq.command.RemoveInfo;
054import org.apache.activemq.command.TransactionId;
055import org.apache.activemq.management.JMSConsumerStatsImpl;
056import org.apache.activemq.management.StatsCapable;
057import org.apache.activemq.management.StatsImpl;
058import org.apache.activemq.selector.SelectorParser;
059import org.apache.activemq.transaction.Synchronization;
060import org.apache.activemq.util.Callback;
061import org.apache.activemq.util.IntrospectionSupport;
062import org.apache.activemq.util.JMSExceptionSupport;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
068 * from a destination. A <CODE> MessageConsumer</CODE> object is created by
069 * passing a <CODE>Destination</CODE> object to a message-consumer creation
070 * method supplied by a session.
071 * <P>
072 * <CODE>MessageConsumer</CODE> is the parent interface for all message
073 * consumers.
074 * <P>
075 * A message consumer can be created with a message selector. A message selector
076 * allows the client to restrict the messages delivered to the message consumer
077 * to those that match the selector.
078 * <P>
079 * A client may either synchronously receive a message consumer's messages or
080 * have the consumer asynchronously deliver them as they arrive.
081 * <P>
082 * For synchronous receipt, a client can request the next message from a message
083 * consumer using one of its <CODE> receive</CODE> methods. There are several
084 * variations of <CODE>receive</CODE> that allow a client to poll or wait for
085 * the next message.
086 * <P>
087 * For asynchronous delivery, a client can register a
088 * <CODE>MessageListener</CODE> object with a message consumer. As messages
089 * arrive at the message consumer, it delivers them by calling the
090 * <CODE>MessageListener</CODE>'s<CODE>
091 * onMessage</CODE> method.
092 * <P>
093 * It is a client programming error for a <CODE>MessageListener</CODE> to
094 * throw an exception.
095 *
096 *
097 * @see javax.jms.MessageConsumer
098 * @see javax.jms.QueueReceiver
099 * @see javax.jms.TopicSubscriber
100 * @see javax.jms.Session
101 */
102public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
103
104    @SuppressWarnings("serial")
105    class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> {
106        final TransactionId transactionId;
107        public PreviouslyDeliveredMap(TransactionId transactionId) {
108            this.transactionId = transactionId;
109        }
110    }
111
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
    // The session this consumer was created on.
    protected final ActiveMQSession session;
    // Consumer description built in the constructor and sent to the broker.
    protected final ConsumerInfo info;

    // These are the messages waiting to be delivered to the client
    protected final MessageDispatchChannel unconsumedMessages;

    // These are the messages that were delivered to the consumer but that have
    // not been acknowledged. The list is kept in reverse order since we
    // always walk it in reverse order.
    private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
    // track duplicate deliveries in a transaction such that the tx integrity can be validated
    private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
    private int deliveredCounter;
    private int additionalWindowSize;
    private long redeliveryDelay;
    private int ackCounter;
    private int dispatchedCount;
    // Listener for asynchronous delivery; null when none is registered.
    private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
    private final JMSConsumerStatsImpl stats;

    // Selector in effect for this consumer, or null when there is none.
    private final String selector;
    private boolean synchronizationRegistered;
    private final AtomicBoolean started = new AtomicBoolean(false);

    private MessageAvailableListener availableListener;

    private RedeliveryPolicy redeliveryPolicy;
    // True only for non-browser consumers on an auto-acknowledge session whose
    // connection has optimized acknowledgement enabled (see constructor).
    private boolean optimizeAcknowledge;
    // Guard used by deliverAcks() so that only one ack delivery is in flight.
    private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
    // Created lazily by deliverAcks(); shut down by dispose().
    private ExecutorService executorService;
    private MessageTransformer transformer;
    // Set by inProgressClearRequired().
    private boolean clearDispatchList;
    // Set by inProgressClearRequired(), reset by clearMessagesInProgress().
    boolean inProgressClearRequiredFlag;

    private MessageAck pendingAck;
    // Reported to the broker in the remove command sent by doClose().
    private long lastDeliveredSequenceId;

    // When non-null, dequeue() throws this (wrapped) instead of returning null.
    private IOException failureError;

    private long optimizeAckTimestamp = System.currentTimeMillis();
    // Taken from the connection when optimizeAcknowledge is enabled.
    private long optimizeAcknowledgeTimeOut = 0;
    private long failoverRedeliveryWaitPeriod = 0;
    // Enabled by the connection setting or implied by nonBlockingRedelivery.
    private boolean transactedIndividualAck = false;
    private boolean nonBlockingRedelivery = false;
157
158    /**
159     * Create a MessageConsumer
160     *
161     * @param session
162     * @param dest
163     * @param name
164     * @param selector
165     * @param prefetch
166     * @param maximumPendingMessageCount
167     * @param noLocal
168     * @param browser
169     * @param dispatchAsync
170     * @param messageListener
171     * @throws JMSException
172     */
173    public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
174            String name, String selector, int prefetch,
175            int maximumPendingMessageCount, boolean noLocal, boolean browser,
176            boolean dispatchAsync, MessageListener messageListener) throws JMSException {
177        if (dest == null) {
178            throw new InvalidDestinationException("Don't understand null destinations");
179        } else if (dest.getPhysicalName() == null) {
180            throw new InvalidDestinationException("The destination object was not given a physical name.");
181        } else if (dest.isTemporary()) {
182            String physicalName = dest.getPhysicalName();
183
184            if (physicalName == null) {
185                throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
186            }
187
188            String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
189
190            if (physicalName.indexOf(connectionID) < 0) {
191                throw new InvalidDestinationException(
192                                                      "Cannot use a Temporary destination from another Connection");
193            }
194
195            if (session.connection.isDeleted(dest)) {
196                throw new InvalidDestinationException(
197                                                      "Cannot use a Temporary destination that has been deleted");
198            }
199            if (prefetch < 0) {
200                throw new JMSException("Cannot have a prefetch size less than zero");
201            }
202        }
203        if (session.connection.isMessagePrioritySupported()) {
204            this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
205        }else {
206            this.unconsumedMessages = new FifoMessageDispatchChannel();
207        }
208
209        this.session = session;
210        this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
211        setTransformer(session.getTransformer());
212
213        this.info = new ConsumerInfo(consumerId);
214        this.info.setExclusive(this.session.connection.isExclusiveConsumer());
215        this.info.setSubscriptionName(name);
216        this.info.setPrefetchSize(prefetch);
217        this.info.setCurrentPrefetchSize(prefetch);
218        this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
219        this.info.setNoLocal(noLocal);
220        this.info.setDispatchAsync(dispatchAsync);
221        this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
222        this.info.setSelector(null);
223
224        // Allows the options on the destination to configure the consumerInfo
225        if (dest.getOptions() != null) {
226            Map<String, Object> options = IntrospectionSupport.extractProperties(
227                new HashMap<String, Object>(dest.getOptions()), "consumer.");
228            IntrospectionSupport.setProperties(this.info, options);
229            if (options.size() > 0) {
230                String msg = "There are " + options.size()
231                    + " consumer options that couldn't be set on the consumer."
232                    + " Check the options are spelled correctly."
233                    + " Unknown parameters=[" + options + "]."
234                    + " This consumer cannot be started.";
235                LOG.warn(msg);
236                throw new ConfigurationException(msg);
237            }
238        }
239
240        this.info.setDestination(dest);
241        this.info.setBrowser(browser);
242        if (selector != null && selector.trim().length() != 0) {
243            // Validate the selector
244            SelectorParser.parse(selector);
245            this.info.setSelector(selector);
246            this.selector = selector;
247        } else if (info.getSelector() != null) {
248            // Validate the selector
249            SelectorParser.parse(this.info.getSelector());
250            this.selector = this.info.getSelector();
251        } else {
252            this.selector = null;
253        }
254
255        this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
256        this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
257                                   && !info.isBrowser();
258        if (this.optimizeAcknowledge) {
259            this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
260        }
261        this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
262        this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
263        this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
264        this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
265        if (messageListener != null) {
266            setMessageListener(messageListener);
267        }
268        try {
269            this.session.addConsumer(this);
270            this.session.syncSendPacket(info);
271        } catch (JMSException e) {
272            this.session.removeConsumer(this);
273            throw e;
274        }
275
276        if (session.connection.isStarted()) {
277            start();
278        }
279    }
280
281    private boolean isAutoAcknowledgeEach() {
282        return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
283    }
284
285    private boolean isAutoAcknowledgeBatch() {
286        return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
287    }
288
289    public StatsImpl getStats() {
290        return stats;
291    }
292
293    public JMSConsumerStatsImpl getConsumerStats() {
294        return stats;
295    }
296
297    public RedeliveryPolicy getRedeliveryPolicy() {
298        return redeliveryPolicy;
299    }
300
301    /**
302     * Sets the redelivery policy used when messages are redelivered
303     */
304    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
305        this.redeliveryPolicy = redeliveryPolicy;
306    }
307
308    public MessageTransformer getTransformer() {
309        return transformer;
310    }
311
312    /**
313     * Sets the transformer used to transform messages before they are sent on
314     * to the JMS bus
315     */
316    public void setTransformer(MessageTransformer transformer) {
317        this.transformer = transformer;
318    }
319
320    /**
321     * @return Returns the value.
322     */
323    public ConsumerId getConsumerId() {
324        return info.getConsumerId();
325    }
326
327    /**
328     * @return the consumer name - used for durable consumers
329     */
330    public String getConsumerName() {
331        return this.info.getSubscriptionName();
332    }
333
334    /**
335     * @return true if this consumer does not accept locally produced messages
336     */
337    protected boolean isNoLocal() {
338        return info.isNoLocal();
339    }
340
341    /**
342     * Retrieve is a browser
343     *
344     * @return true if a browser
345     */
346    protected boolean isBrowser() {
347        return info.isBrowser();
348    }
349
350    /**
351     * @return ActiveMQDestination
352     */
353    protected ActiveMQDestination getDestination() {
354        return info.getDestination();
355    }
356
357    /**
358     * @return Returns the prefetchNumber.
359     */
360    public int getPrefetchNumber() {
361        return info.getPrefetchSize();
362    }
363
364    /**
365     * @return true if this is a durable topic subscriber
366     */
367    public boolean isDurableSubscriber() {
368        return info.getSubscriptionName() != null && info.getDestination().isTopic();
369    }
370
371    /**
372     * Gets this message consumer's message selector expression.
373     *
374     * @return this message consumer's message selector, or null if no message
375     *         selector exists for the message consumer (that is, if the message
376     *         selector was not set or was set to null or the empty string)
377     * @throws JMSException if the JMS provider fails to receive the next
378     *                 message due to some internal error.
379     */
380    public String getMessageSelector() throws JMSException {
381        checkClosed();
382        return selector;
383    }
384
385    /**
386     * Gets the message consumer's <CODE>MessageListener</CODE>.
387     *
388     * @return the listener for the message consumer, or null if no listener is
389     *         set
390     * @throws JMSException if the JMS provider fails to get the message
391     *                 listener due to some internal error.
392     * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
393     */
394    public MessageListener getMessageListener() throws JMSException {
395        checkClosed();
396        return this.messageListener.get();
397    }
398
399    /**
400     * Sets the message consumer's <CODE>MessageListener</CODE>.
401     * <P>
402     * Setting the message listener to null is the equivalent of unsetting the
403     * message listener for the message consumer.
404     * <P>
405     * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
406     * while messages are being consumed by an existing listener or the consumer
407     * is being used to consume messages synchronously is undefined.
408     *
409     * @param listener the listener to which the messages are to be delivered
410     * @throws JMSException if the JMS provider fails to receive the next
411     *                 message due to some internal error.
412     * @see javax.jms.MessageConsumer#getMessageListener
413     */
414    public void setMessageListener(MessageListener listener) throws JMSException {
415        checkClosed();
416        if (info.getPrefetchSize() == 0) {
417            throw new JMSException(
418                                   "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
419        }
420        if (listener != null) {
421            boolean wasRunning = session.isRunning();
422            if (wasRunning) {
423                session.stop();
424            }
425
426            this.messageListener.set(listener);
427            session.redispatch(this, unconsumedMessages);
428
429            if (wasRunning) {
430                session.start();
431            }
432        } else {
433            this.messageListener.set(null);
434        }
435    }
436
437    public MessageAvailableListener getAvailableListener() {
438        return availableListener;
439    }
440
441    /**
442     * Sets the listener used to notify synchronous consumers that there is a
443     * message available so that the {@link MessageConsumer#receiveNoWait()} can
444     * be called.
445     */
446    public void setAvailableListener(MessageAvailableListener availableListener) {
447        this.availableListener = availableListener;
448    }
449
450    /**
451     * Used to get an enqueued message from the unconsumedMessages list. The
452     * amount of time this method blocks is based on the timeout value. - if
453     * timeout==-1 then it blocks until a message is received. - if timeout==0
454     * then it it tries to not block at all, it returns a message if it is
455     * available - if timeout>0 then it blocks up to timeout amount of time.
456     * Expired messages will consumed by this method.
457     *
458     * @throws JMSException
459     * @return null if we timeout or if the consumer is closed.
460     */
461    private MessageDispatch dequeue(long timeout) throws JMSException {
462        try {
463            long deadline = 0;
464            if (timeout > 0) {
465                deadline = System.currentTimeMillis() + timeout;
466            }
467            while (true) {
468                MessageDispatch md = unconsumedMessages.dequeue(timeout);
469                if (md == null) {
470                    if (timeout > 0 && !unconsumedMessages.isClosed()) {
471                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
472                    } else {
473                        if (failureError != null) {
474                            throw JMSExceptionSupport.create(failureError);
475                        } else {
476                            return null;
477                        }
478                    }
479                } else if (md.getMessage() == null) {
480                    return null;
481                } else if (md.getMessage().isExpired()) {
482                    if (LOG.isDebugEnabled()) {
483                        LOG.debug(getConsumerId() + " received expired message: " + md);
484                    }
485                    beforeMessageIsConsumed(md);
486                    afterMessageIsConsumed(md, true);
487                    if (timeout > 0) {
488                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
489                    }
490                } else {
491                    if (LOG.isTraceEnabled()) {
492                        LOG.trace(getConsumerId() + " received message: " + md);
493                    }
494                    return md;
495                }
496            }
497        } catch (InterruptedException e) {
498            Thread.currentThread().interrupt();
499            throw JMSExceptionSupport.create(e);
500        }
501    }
502
503    /**
504     * Receives the next message produced for this message consumer.
505     * <P>
506     * This call blocks indefinitely until a message is produced or until this
507     * message consumer is closed.
508     * <P>
509     * If this <CODE>receive</CODE> is done within a transaction, the consumer
510     * retains the message until the transaction commits.
511     *
512     * @return the next message produced for this message consumer, or null if
513     *         this message consumer is concurrently closed
514     */
515    public Message receive() throws JMSException {
516        checkClosed();
517        checkMessageListener();
518
519        sendPullCommand(0);
520        MessageDispatch md = dequeue(-1);
521        if (md == null) {
522            return null;
523        }
524
525        beforeMessageIsConsumed(md);
526        afterMessageIsConsumed(md, false);
527
528        return createActiveMQMessage(md);
529    }
530
531    /**
532     * @param md
533     * @return
534     */
535    private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
536        ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
537        if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
538            ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
539        }
540        if (transformer != null) {
541            Message transformedMessage = transformer.consumerTransform(session, this, m);
542            if (transformedMessage != null) {
543                m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
544            }
545        }
546        if (session.isClientAcknowledge()) {
547            m.setAcknowledgeCallback(new Callback() {
548                public void execute() throws Exception {
549                    session.checkClosed();
550                    session.acknowledge();
551                }
552            });
553        }else if (session.isIndividualAcknowledge()) {
554            m.setAcknowledgeCallback(new Callback() {
555                public void execute() throws Exception {
556                    session.checkClosed();
557                    acknowledge(md);
558                }
559            });
560        }
561        return m;
562    }
563
564    /**
565     * Receives the next message that arrives within the specified timeout
566     * interval.
567     * <P>
568     * This call blocks until a message arrives, the timeout expires, or this
569     * message consumer is closed. A <CODE>timeout</CODE> of zero never
570     * expires, and the call blocks indefinitely.
571     *
572     * @param timeout the timeout value (in milliseconds), a time out of zero
573     *                never expires.
574     * @return the next message produced for this message consumer, or null if
575     *         the timeout expires or this message consumer is concurrently
576     *         closed
577     */
578    public Message receive(long timeout) throws JMSException {
579        checkClosed();
580        checkMessageListener();
581        if (timeout == 0) {
582            return this.receive();
583        }
584
585        sendPullCommand(timeout);
586        while (timeout > 0) {
587
588            MessageDispatch md;
589            if (info.getPrefetchSize() == 0) {
590                md = dequeue(-1); // We let the broker let us know when we timeout.
591            } else {
592                md = dequeue(timeout);
593            }
594
595            if (md == null) {
596                return null;
597            }
598
599            beforeMessageIsConsumed(md);
600            afterMessageIsConsumed(md, false);
601            return createActiveMQMessage(md);
602        }
603        return null;
604    }
605
606    /**
607     * Receives the next message if one is immediately available.
608     *
609     * @return the next message produced for this message consumer, or null if
610     *         one is not available
611     * @throws JMSException if the JMS provider fails to receive the next
612     *                 message due to some internal error.
613     */
614    public Message receiveNoWait() throws JMSException {
615        checkClosed();
616        checkMessageListener();
617        sendPullCommand(-1);
618
619        MessageDispatch md;
620        if (info.getPrefetchSize() == 0) {
621            md = dequeue(-1); // We let the broker let us know when we
622            // timeout.
623        } else {
624            md = dequeue(0);
625        }
626
627        if (md == null) {
628            return null;
629        }
630
631        beforeMessageIsConsumed(md);
632        afterMessageIsConsumed(md, false);
633        return createActiveMQMessage(md);
634    }
635
636    /**
637     * Closes the message consumer.
638     * <P>
639     * Since a provider may allocate some resources on behalf of a <CODE>
640     * MessageConsumer</CODE>
641     * outside the Java virtual machine, clients should close them when they are
642     * not needed. Relying on garbage collection to eventually reclaim these
643     * resources may not be timely enough.
644     * <P>
645     * This call blocks until a <CODE>receive</CODE> or message listener in
646     * progress has completed. A blocked message consumer <CODE>receive </CODE>
647     * call returns null when this message consumer is closed.
648     *
649     * @throws JMSException if the JMS provider fails to close the consumer due
650     *                 to some internal error.
651     */
652    public void close() throws JMSException {
653        if (!unconsumedMessages.isClosed()) {
654            if (session.getTransactionContext().isInTransaction()) {
655                session.getTransactionContext().addSynchronization(new Synchronization() {
656                    @Override
657                    public void afterCommit() throws Exception {
658                        doClose();
659                    }
660
661                    @Override
662                    public void afterRollback() throws Exception {
663                        doClose();
664                    }
665                });
666            } else {
667                doClose();
668            }
669        }
670    }
671
672    void doClose() throws JMSException {
673        // Store interrupted state and clear so that Transport operations don't
674        // throw InterruptedException and we ensure that resources are clened up.
675        boolean interrupted = Thread.interrupted();
676        dispose();
677        RemoveInfo removeCommand = info.createRemoveCommand();
678        if (LOG.isDebugEnabled()) {
679            LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId);
680        }
681        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
682        this.session.asyncSendPacket(removeCommand);
683        if (interrupted) {
684            Thread.currentThread().interrupt();
685        }    }
686
687    void inProgressClearRequired() {
688        inProgressClearRequiredFlag = true;
689        // deal with delivered messages async to avoid lock contention with in progress acks
690        clearDispatchList = true;
691    }
692
693    void clearMessagesInProgress() {
694        if (inProgressClearRequiredFlag) {
695            synchronized (unconsumedMessages.getMutex()) {
696                if (inProgressClearRequiredFlag) {
697                    if (LOG.isDebugEnabled()) {
698                        LOG.debug(getConsumerId() + " clearing unconsumed list (" + unconsumedMessages.size() + ") on transport interrupt");
699                    }
700                    // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
701                    List<MessageDispatch> list = unconsumedMessages.removeAll();
702                    if (!this.info.isBrowser()) {
703                        for (MessageDispatch old : list) {
704                            session.connection.rollbackDuplicate(this, old.getMessage());
705                        }
706                    }
707                    // allow dispatch on this connection to resume
708                    session.connection.transportInterruptionProcessingComplete();
709                    inProgressClearRequiredFlag = false;
710                }
711            }
712        }
713    }
714
715    void deliverAcks() {
716        MessageAck ack = null;
717        if (deliveryingAcknowledgements.compareAndSet(false, true)) {
718            if (isAutoAcknowledgeEach()) {
719                synchronized(deliveredMessages) {
720                    ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
721                    if (ack != null) {
722                        deliveredMessages.clear();
723                        ackCounter = 0;
724                    } else {
725                        ack = pendingAck;
726                        pendingAck = null;
727                    }
728                }
729            } else if (pendingAck != null && pendingAck.isStandardAck()) {
730                ack = pendingAck;
731                pendingAck = null;
732            }
733            if (ack != null) {
734                final MessageAck ackToSend = ack;
735
736                if (executorService == null) {
737                    executorService = Executors.newSingleThreadExecutor();
738                }
739                executorService.submit(new Runnable() {
740                    public void run() {
741                        try {
742                            session.sendAck(ackToSend,true);
743                        } catch (JMSException e) {
744                            LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
745                        } finally {
746                            deliveryingAcknowledgements.set(false);
747                        }
748                    }
749                });
750            } else {
751                deliveryingAcknowledgements.set(false);
752            }
753        }
754    }
755
756    public void dispose() throws JMSException {
757        if (!unconsumedMessages.isClosed()) {
758
759            // Do we have any acks we need to send out before closing?
760            // Ack any delivered messages now.
761            if (!session.getTransacted()) {
762                deliverAcks();
763                if (isAutoAcknowledgeBatch()) {
764                    acknowledge();
765                }
766            }
767            if (executorService != null) {
768                executorService.shutdown();
769                try {
770                    executorService.awaitTermination(60, TimeUnit.SECONDS);
771                } catch (InterruptedException e) {
772                    Thread.currentThread().interrupt();
773                }
774            }
775
776            if (session.isClientAcknowledge()) {
777                if (!this.info.isBrowser()) {
778                    // rollback duplicates that aren't acknowledged
779                    List<MessageDispatch> tmp = null;
780                    synchronized (this.deliveredMessages) {
781                        tmp = new ArrayList<MessageDispatch>(this.deliveredMessages);
782                    }
783                    for (MessageDispatch old : tmp) {
784                        this.session.connection.rollbackDuplicate(this, old.getMessage());
785                    }
786                    tmp.clear();
787                }
788            }
789            if (!session.isTransacted()) {
790                synchronized(deliveredMessages) {
791                    deliveredMessages.clear();
792                }
793            }
794            unconsumedMessages.close();
795            this.session.removeConsumer(this);
796            List<MessageDispatch> list = unconsumedMessages.removeAll();
797            if (!this.info.isBrowser()) {
798                for (MessageDispatch old : list) {
799                    // ensure we don't filter this as a duplicate
800                    session.connection.rollbackDuplicate(this, old.getMessage());
801                }
802            }
803        }
804    }
805
806    /**
807     * @throws IllegalStateException
808     */
809    protected void checkClosed() throws IllegalStateException {
810        if (unconsumedMessages.isClosed()) {
811            throw new IllegalStateException("The Consumer is closed");
812        }
813    }
814
815    /**
816     * If we have a zero prefetch specified then send a pull command to the
817     * broker to pull a message we are about to receive
818     */
819    protected void sendPullCommand(long timeout) throws JMSException {
820        clearDispatchList();
821        if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
822            MessagePull messagePull = new MessagePull();
823            messagePull.configure(info);
824            messagePull.setTimeout(timeout);
825            session.asyncSendPacket(messagePull);
826        }
827    }
828
829    protected void checkMessageListener() throws JMSException {
830        session.checkMessageListener();
831    }
832
833    protected void setOptimizeAcknowledge(boolean value) {
834        if (optimizeAcknowledge && !value) {
835            deliverAcks();
836        }
837        optimizeAcknowledge = value;
838    }
839
840    protected void setPrefetchSize(int prefetch) {
841        deliverAcks();
842        this.info.setCurrentPrefetchSize(prefetch);
843    }
844
845    private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
846        md.setDeliverySequenceId(session.getNextDeliveryId());
847        lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
848        if (!isAutoAcknowledgeBatch()) {
849            synchronized(deliveredMessages) {
850                deliveredMessages.addFirst(md);
851            }
852            if (session.getTransacted()) {
853                if (transactedIndividualAck) {
854                    immediateIndividualTransactedAck(md);
855                } else {
856                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
857                }
858            }
859        }
860    }
861
862    private void immediateIndividualTransactedAck(MessageDispatch md) throws JMSException {
863        // acks accumulate on the broker pending transaction completion to indicate
864        // delivery status
865        registerSync();
866        MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
867        ack.setTransactionId(session.getTransactionContext().getTransactionId());
868        session.syncSendPacket(ack);
869    }
870
871    private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
872        if (unconsumedMessages.isClosed()) {
873            return;
874        }
875        if (messageExpired) {
876            synchronized (deliveredMessages) {
877                deliveredMessages.remove(md);
878            }
879            stats.getExpiredMessageCount().increment();
880            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
881        } else {
882            stats.onMessage();
883            if (session.getTransacted()) {
884                // Do nothing.
885            } else if (isAutoAcknowledgeEach()) {
886                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
887                    synchronized (deliveredMessages) {
888                        if (!deliveredMessages.isEmpty()) {
889                            if (optimizeAcknowledge) {
890                                ackCounter++;
891                                if (ackCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
892                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
893                                    if (ack != null) {
894                                        deliveredMessages.clear();
895                                        ackCounter = 0;
896                                        session.sendAck(ack);
897                                        optimizeAckTimestamp = System.currentTimeMillis();
898                                    }
899                                }
900                            } else {
901                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
902                                if (ack!=null) {
903                                    deliveredMessages.clear();
904                                    session.sendAck(ack);
905                                }
906                            }
907                        }
908                    }
909                    deliveryingAcknowledgements.set(false);
910                }
911            } else if (isAutoAcknowledgeBatch()) {
912                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
913            } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
914                boolean messageUnackedByConsumer = false;
915                synchronized (deliveredMessages) {
916                    messageUnackedByConsumer = deliveredMessages.contains(md);
917                }
918                if (messageUnackedByConsumer) {
919                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
920                }
921            }
922            else {
923                throw new IllegalStateException("Invalid session state.");
924            }
925        }
926    }
927
928    /**
929     * Creates a MessageAck for all messages contained in deliveredMessages.
930     * Caller should hold the lock for deliveredMessages.
931     *
932     * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE)
933     * @return <code>null</code> if nothing to ack.
934     */
935    private MessageAck makeAckForAllDeliveredMessages(byte type) {
936        synchronized (deliveredMessages) {
937            if (deliveredMessages.isEmpty())
938                return null;
939
940            MessageDispatch md = deliveredMessages.getFirst();
941            MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
942            ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
943            return ack;
944        }
945    }
946
947    private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
948
949        // Don't acknowledge now, but we may need to let the broker know the
950        // consumer got the message to expand the pre-fetch window
951        if (session.getTransacted()) {
952            registerSync();
953        }
954
955        deliveredCounter++;
956
957        MessageAck oldPendingAck = pendingAck;
958        pendingAck = new MessageAck(md, ackType, deliveredCounter);
959        pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
960        if( oldPendingAck==null ) {
961            pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
962        } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
963            pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
964        } else {
965            // old pending ack being superseded by ack of another type, if is is not a delivered
966            // ack and hence important, send it now so it is not lost.
967            if ( !oldPendingAck.isDeliveredAck()) {
968                if (LOG.isDebugEnabled()) {
969                    LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
970                }
971                session.sendAck(oldPendingAck);
972            } else {
973                if (LOG.isDebugEnabled()) {
974                    LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
975                }
976            }
977        }
978
979        if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
980            session.sendAck(pendingAck);
981            pendingAck=null;
982            deliveredCounter = 0;
983            additionalWindowSize = 0;
984        }
985    }
986
987    private void registerSync() throws JMSException {
988        session.doStartTransaction();
989        if (!synchronizationRegistered) {
990            synchronizationRegistered = true;
991            session.getTransactionContext().addSynchronization(new Synchronization() {
992                @Override
993                public void beforeEnd() throws Exception {
994                    if (transactedIndividualAck) {
995                        clearDispatchList();
996                        waitForRedeliveries();
997                        synchronized(deliveredMessages) {
998                            rollbackOnFailedRecoveryRedelivery();
999                        }
1000                    } else {
1001                        acknowledge();
1002                    }
1003                    synchronizationRegistered = false;
1004                }
1005
1006                @Override
1007                public void afterCommit() throws Exception {
1008                    commit();
1009                    synchronizationRegistered = false;
1010                }
1011
1012                @Override
1013                public void afterRollback() throws Exception {
1014                    rollback();
1015                    synchronizationRegistered = false;
1016                }
1017            });
1018        }
1019    }
1020
1021    /**
1022     * Acknowledge all the messages that have been delivered to the client up to
1023     * this point.
1024     *
1025     * @throws JMSException
1026     */
1027    public void acknowledge() throws JMSException {
1028        clearDispatchList();
1029        waitForRedeliveries();
1030        synchronized(deliveredMessages) {
1031            // Acknowledge all messages so far.
1032            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
1033            if (ack == null)
1034                return; // no msgs
1035
1036            if (session.getTransacted()) {
1037                rollbackOnFailedRecoveryRedelivery();
1038                session.doStartTransaction();
1039                ack.setTransactionId(session.getTransactionContext().getTransactionId());
1040            }
1041            session.sendAck(ack);
1042            pendingAck = null;
1043
1044            // Adjust the counters
1045            deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
1046            additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1047
1048            if (!session.getTransacted()) {
1049                deliveredMessages.clear();
1050            }
1051        }
1052    }
1053
1054    private void waitForRedeliveries() {
1055        if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) {
1056            long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
1057            int numberNotReplayed;
1058            do {
1059                numberNotReplayed = 0;
1060                synchronized(deliveredMessages) {
1061                    if (previouslyDeliveredMessages != null) {
1062                        for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1063                            if (!entry.getValue()) {
1064                                numberNotReplayed++;
1065                            }
1066                        }
1067                    }
1068                }
1069                if (numberNotReplayed > 0) {
1070                    LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: "
1071                            + previouslyDeliveredMessages.transactionId +  ", to consumer :" + this.getConsumerId());
1072                    try {
1073                        Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
1074                    } catch (InterruptedException outOfhere) {
1075                        break;
1076                    }
1077                }
1078            } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
1079        }
1080    }
1081
1082    /*
1083     * called with deliveredMessages locked
1084     */
1085    private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
1086        if (previouslyDeliveredMessages != null) {
1087            // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
1088            // as messages have been dispatched else where.
1089            int numberNotReplayed = 0;
1090            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1091                if (!entry.getValue()) {
1092                    numberNotReplayed++;
1093                    if (LOG.isDebugEnabled()) {
1094                        LOG.debug("previously delivered message has not been replayed in transaction: "
1095                                + previouslyDeliveredMessages.transactionId
1096                                + " , messageId: " + entry.getKey());
1097                    }
1098                }
1099            }
1100            if (numberNotReplayed > 0) {
1101                String message = "rolling back transaction ("
1102                    + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed
1103                    + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
1104                LOG.warn(message);
1105                throw new TransactionRolledBackException(message);
1106            }
1107        }
1108    }
1109
1110    void acknowledge(MessageDispatch md) throws JMSException {
1111        MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
1112        session.sendAck(ack);
1113        synchronized(deliveredMessages){
1114            deliveredMessages.remove(md);
1115        }
1116    }
1117
1118    public void commit() throws JMSException {
1119        synchronized (deliveredMessages) {
1120            deliveredMessages.clear();
1121            clearPreviouslyDelivered();
1122        }
1123        redeliveryDelay = 0;
1124    }
1125
    /**
     * Rolls back every message currently tracked in {@code deliveredMessages}
     * so that it can be delivered again.
     * <p>
     * Runs under the {@code unconsumedMessages} mutex and the
     * {@code deliveredMessages} lock. After dropping entries that can not be
     * replayed, each remaining message is marked as rolled back and removed
     * from the connection's duplicate tracking. Then either
     * <ul>
     * <li>a poison ack is sent for the whole batch, when the redelivery
     * policy defines a maximum and the head message exceeded it, or</li>
     * <li>the messages are scheduled for local redelivery after
     * {@code redeliveryDelay}: through the session scheduler when
     * {@code nonBlockingRedelivery} is set, otherwise by stopping the
     * unconsumed store, re-enqueueing the messages at its front and
     * restarting it (immediately or after the delay).</li>
     * </ul>
     * Finally the delivered list is cleared and, if a message listener is
     * installed, the session is asked to redispatch the unconsumed messages.
     * When nothing is left to roll back the method returns early, without
     * the redispatch.
     *
     * @throws JMSException if an ack cannot be sent or the consumer cannot be
     *         restarted
     */
    public void rollback() throws JMSException {
        synchronized (unconsumedMessages.getMutex()) {
            if (optimizeAcknowledge) {
                // remove messages read but not acked at the broker yet through
                // optimizeAcknowledge
                if (!this.info.isBrowser()) {
                    synchronized(deliveredMessages) {
                        // NOTE(review): i grows while deliveredMessages.size() shrinks, so this
                        // loop stops after roughly half of min(size, ackCounter) removals -
                        // confirm whether all ackCounter entries were meant to be removed.
                        for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
                            // ensure we don't filter this as a duplicate
                            MessageDispatch md = deliveredMessages.removeLast();
                            session.connection.rollbackDuplicate(this, md.getMessage());
                        }
                    }
                }
            }
            synchronized(deliveredMessages) {
                rollbackPreviouslyDeliveredAndNotRedelivered();
                if (deliveredMessages.isEmpty()) {
                    return;
                }

                // use initial delay for first redelivery
                // (lastMd is the head of the list; beforeMessageIsConsumed adds new
                // deliveries with addFirst, so this is the most recent delivery)
                MessageDispatch lastMd = deliveredMessages.getFirst();
                final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
                if (currentRedeliveryCount > 0) {
                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                } else {
                    redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                }
                // tail of the list; used as the first message id of the acks below
                MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();

                for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
                    MessageDispatch md = iter.next();
                    md.getMessage().onMessageRolledBack();
                    // ensure we don't filter this as a duplicate
                    session.connection.rollbackDuplicate(this, md.getMessage());
                }

                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                    && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
                    // We need to NACK the messages so that they get sent to the
                    // DLQ.
                    // Acknowledge the last message.

                    MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
                    ack.setPoisonCause(lastMd.getRollbackCause());
                    ack.setFirstMessageId(firstMsgId);
                    session.sendAck(ack,true);
                    // Adjust the window size.
                    additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
                    redeliveryDelay = 0;
                } else {

                    // only redelivery_ack after first delivery
                    if (currentRedeliveryCount > 0) {
                        MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
                        ack.setFirstMessageId(firstMsgId);
                        session.sendAck(ack,true);
                    }

                    // stop the delivery of messages.
                    if (nonBlockingRedelivery) {
                        if (!unconsumedMessages.isClosed()) {

                            // snapshot: deliveredMessages is cleared below, before the
                            // scheduled task runs
                            final LinkedList<MessageDispatch> pendingRedeliveries =
                                new LinkedList<MessageDispatch>(deliveredMessages);

                            // Start up the delivery again a little later.
                            session.getScheduler().executeAfterDelay(new Runnable() {
                                public void run() {
                                    try {
                                        if (!unconsumedMessages.isClosed()) {
                                            for(MessageDispatch dispatch : pendingRedeliveries) {
                                                session.dispatch(dispatch);
                                            }
                                        }
                                    } catch (Exception e) {
                                        session.connection.onAsyncException(e);
                                    }
                                }
                            }, redeliveryDelay);
                        }

                    } else {
                        unconsumedMessages.stop();

                        // put the rolled back messages in front of anything still unconsumed
                        for (MessageDispatch md : deliveredMessages) {
                            unconsumedMessages.enqueueFirst(md);
                        }

                        if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
                            // Start up the delivery again a little later.
                            session.getScheduler().executeAfterDelay(new Runnable() {
                                public void run() {
                                    try {
                                        // only restart if the consumer has not been stopped meanwhile
                                        if (started.get()) {
                                            start();
                                        }
                                    } catch (JMSException e) {
                                        session.connection.onAsyncException(e);
                                    }
                                }
                            }, redeliveryDelay);
                        } else {
                            start();
                        }
                    }
                }
                // NOTE(review): unlike acknowledge(), this decrement is not clamped at zero -
                // confirm deliveredCounter can not become negative here.
                deliveredCounter -= deliveredMessages.size();
                deliveredMessages.clear();
            }
        }
        if (messageListener.get() != null) {
            session.redispatch(this, unconsumedMessages);
        }
    }
1242
1243    /*
1244     * called with unconsumedMessages && deliveredMessages locked
1245     * remove any message not re-delivered as they can't be replayed to this
1246     * consumer on rollback
1247     */
1248    private void rollbackPreviouslyDeliveredAndNotRedelivered() {
1249        if (previouslyDeliveredMessages != null) {
1250            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1251                if (!entry.getValue()) {
1252                    removeFromDeliveredMessages(entry.getKey());
1253                }
1254            }
1255            clearPreviouslyDelivered();
1256        }
1257    }
1258
1259    /*
1260     * called with deliveredMessages locked
1261     */
1262    private void removeFromDeliveredMessages(MessageId key) {
1263        Iterator<MessageDispatch> iterator = deliveredMessages.iterator();
1264        while (iterator.hasNext()) {
1265            MessageDispatch candidate = iterator.next();
1266            if (key.equals(candidate.getMessage().getMessageId())) {
1267                session.connection.rollbackDuplicate(this, candidate.getMessage());
1268                iterator.remove();
1269                break;
1270            }
1271        }
1272    }
1273
1274    /*
1275     * called with deliveredMessages locked
1276     */
1277    private void clearPreviouslyDelivered() {
1278        if (previouslyDeliveredMessages != null) {
1279            previouslyDeliveredMessages.clear();
1280            previouslyDeliveredMessages = null;
1281        }
1282    }
1283
    /**
     * Handles one message dispatched to this consumer.
     * <p>
     * Under the {@code unconsumedMessages} mutex, and only while the consumer
     * is not closed:
     * <ul>
     * <li>A message that is not a duplicate (or any message for a browser) is
     * passed straight to the message listener when one is set and the
     * unconsumed store is running; otherwise it is enqueued and the
     * {@code availableListener}, if any, is notified.</li>
     * <li>A duplicate in a non-transacted session is logged and individually
     * acked.</li>
     * <li>A duplicate in a transacted session is recorded as replayed in
     * {@code previouslyDeliveredMessages}; when that map does not exist the
     * message is acked as poison instead.</li>
     * </ul>
     * Any exception raised while dispatching is reported to the connection
     * through {@code onClientInternalException} and not propagated.
     *
     * @param md the dispatch received for this consumer
     */
    public void dispatch(MessageDispatch md) {
        MessageListener listener = this.messageListener.get();
        try {
            clearMessagesInProgress();
            clearDispatchList();
            synchronized (unconsumedMessages.getMutex()) {
                if (!unconsumedMessages.isClosed()) {
                    if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
                        if (listener != null && unconsumedMessages.isRunning()) {
                            ActiveMQMessage message = createActiveMQMessage(md);
                            beforeMessageIsConsumed(md);
                            try {
                                // expired messages are never shown to the listener, but still
                                // go through the after-consumption bookkeeping
                                boolean expired = message.isExpired();
                                if (!expired) {
                                    listener.onMessage(message);
                                }
                                afterMessageIsConsumed(md, expired);
                            } catch (RuntimeException e) {
                                LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
                                if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
                                    // schedule redelivery and possible dlq processing
                                    md.setRollbackCause(e);
                                    rollback();
                                } else {
                                    // Transacted or Client ack: Deliver the
                                    // next message.
                                    afterMessageIsConsumed(md, false);
                                }
                            }
                        } else {
                            if (!unconsumedMessages.isRunning()) {
                                // delayed redelivery, ensure it can be re delivered
                                session.connection.rollbackDuplicate(this, md.getMessage());
                            }
                            unconsumedMessages.enqueue(md);
                            if (availableListener != null) {
                                availableListener.onMessageAvailable(this);
                            }
                        }
                    } else {
                        if (!session.isTransacted()) {
                            LOG.warn("Duplicate dispatch on connection: " + session.getConnection().getConnectionInfo().getConnectionId()
                                    + " to consumer: "  + getConsumerId() + ", ignoring (auto acking) duplicate: " + md);
                            MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
                            session.sendAck(ack);
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
                            }
                            boolean needsPoisonAck = false;
                            synchronized (deliveredMessages) {
                                if (previouslyDeliveredMessages != null) {
                                    // flag the message as replayed after the transport interruption
                                    previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
                                } else {
                                    // delivery while pending redelivery to another consumer on the same connection
                                    // not waiting for redelivery will help here
                                    needsPoisonAck = true;
                                }
                            }
                            // the ack itself is sent outside the deliveredMessages lock
                            if (needsPoisonAck) {
                                MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                                poisonAck.setFirstMessageId(md.getMessage().getMessageId());
                                poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
                                        + session.getConnection().getConnectionInfo().getConnectionId()));
                                LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
                                        + " consumer on this connection, failoverRedeliveryWaitPeriod="
                                        + failoverRedeliveryWaitPeriod + ". Message: " + md + ", poisonAck: " + poisonAck);
                                session.sendAck(poisonAck);
                            } else {
                                if (transactedIndividualAck) {
                                    immediateIndividualTransactedAck(md);
                                } else {
                                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                                }
                            }
                        }
                    }
                }
            }
            // yield the thread once every 1000 dispatches; the counter is reset each time
            if (++dispatchedCount % 1000 == 0) {
                dispatchedCount = 0;
                Thread.yield();
            }
        } catch (Exception e) {
            session.connection.onClientInternalException(e);
        }
    }
1371
1372    // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
1373    private void clearDispatchList() {
1374        if (clearDispatchList) {
1375            synchronized (deliveredMessages) {
1376                if (clearDispatchList) {
1377                    if (!deliveredMessages.isEmpty()) {
1378                        if (session.isTransacted()) {
1379                            if (LOG.isDebugEnabled()) {
1380                                LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1381                            }
1382                            if (previouslyDeliveredMessages == null) {
1383                                previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
1384                            }
1385                            for (MessageDispatch delivered : deliveredMessages) {
1386                                previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
1387                            }
1388                        } else {
1389                            if (LOG.isDebugEnabled()) {
1390                                LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1391                            }
1392                            deliveredMessages.clear();
1393                            pendingAck = null;
1394                        }
1395                    }
1396                    clearDispatchList = false;
1397                }
1398            }
1399        }
1400    }
1401
1402    public int getMessageSize() {
1403        return unconsumedMessages.size();
1404    }
1405
1406    public void start() throws JMSException {
1407        if (unconsumedMessages.isClosed()) {
1408            return;
1409        }
1410        started.set(true);
1411        unconsumedMessages.start();
1412        session.executor.wakeup();
1413    }
1414
1415    public void stop() {
1416        started.set(false);
1417        unconsumedMessages.stop();
1418    }
1419
1420    @Override
1421    public String toString() {
1422        return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
1423               + " }";
1424    }
1425
1426    /**
1427     * Delivers a message to the message listener.
1428     *
1429     * @return
1430     * @throws JMSException
1431     */
1432    public boolean iterate() {
1433        MessageListener listener = this.messageListener.get();
1434        if (listener != null) {
1435            MessageDispatch md = unconsumedMessages.dequeueNoWait();
1436            if (md != null) {
1437                dispatch(md);
1438                return true;
1439            }
1440        }
1441        return false;
1442    }
1443
1444    public boolean isInUse(ActiveMQTempDestination destination) {
1445        return info.getDestination().equals(destination);
1446    }
1447
1448    public long getLastDeliveredSequenceId() {
1449        return lastDeliveredSequenceId;
1450    }
1451
1452    public IOException getFailureError() {
1453        return failureError;
1454    }
1455
1456    public void setFailureError(IOException failureError) {
1457        this.failureError = failureError;
1458    }
1459}