001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq;
019
020import java.util.Collections;
021import java.util.LinkedList;
022import java.util.List;
023
024import javax.jms.ConnectionConsumer;
025import javax.jms.IllegalStateException;
026import javax.jms.JMSException;
027import javax.jms.ServerSession;
028import javax.jms.ServerSessionPool;
029import javax.jms.Session;
030
031import org.apache.activemq.command.ConsumerInfo;
032import org.apache.activemq.command.MessageDispatch;
033
034/**
035 * For application servers, <CODE>Connection</CODE> objects provide a special
036 * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
037 * messages it is to consume are specified by a <CODE>Destination</CODE> and a
038 * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
039 * given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
040 * <p/>
041 * <P>
042 * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
043 * <CODE>ServerSession</CODE> from its pool, loads it with a single message,
044 * and starts it. As traffic picks up, messages can back up. If this happens, a
045 * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
046 * with more than one message. This reduces the thread context switches and
047 * minimizes resource use at the expense of some serialization of message
048 * processing.
049 * 
050 * @see javax.jms.Connection#createConnectionConsumer
051 * @see javax.jms.Connection#createDurableConnectionConsumer
052 * @see javax.jms.QueueConnection#createConnectionConsumer
053 * @see javax.jms.TopicConnection#createConnectionConsumer
054 * @see javax.jms.TopicConnection#createDurableConnectionConsumer
055 */
056
057public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQDispatcher {
058
059    private ActiveMQConnection connection;
060    private ServerSessionPool sessionPool;
061    private ConsumerInfo consumerInfo;
062    private boolean closed;
063
064    /**
065     * Create a ConnectionConsumer
066     * 
067     * @param theConnection
068     * @param theSessionPool
069     * @param theConsumerInfo
070     * @throws JMSException
071     */
072    protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo) throws JMSException {
073        this.connection = theConnection;
074        this.sessionPool = theSessionPool;
075        this.consumerInfo = theConsumerInfo;
076
077        this.connection.addConnectionConsumer(this);
078        this.connection.addDispatcher(consumerInfo.getConsumerId(), this);
079        this.connection.asyncSendPacket(this.consumerInfo);
080    }
081
082    /**
083     * Gets the server session pool associated with this connection consumer.
084     * 
085     * @return the server session pool used by this connection consumer
086     * @throws JMSException if the JMS provider fails to get the server session
087     *                 pool associated with this consumer due to some internal
088     *                 error.
089     */
090
091    public ServerSessionPool getServerSessionPool() throws JMSException {
092        if (closed) {
093            throw new IllegalStateException("The Connection Consumer is closed");
094        }
095        return this.sessionPool;
096    }
097
098    /**
099     * Closes the connection consumer. <p/>
100     * <P>
101     * Since a provider may allocate some resources on behalf of a connection
102     * consumer outside the Java virtual machine, clients should close these
103     * resources when they are not needed. Relying on garbage collection to
104     * eventually reclaim these resources may not be timely enough.
105     * 
106     * @throws JMSException
107     */
108
109    public void close() throws JMSException {
110        if (!closed) {
111            dispose();
112            this.connection.asyncSendPacket(this.consumerInfo.createRemoveCommand());
113        }
114
115    }
116
117    public void dispose() {
118        if (!closed) {
119            this.connection.removeDispatcher(consumerInfo.getConsumerId());
120            this.connection.removeConnectionConsumer(this);
121            closed = true;
122        }
123    }
124
125    public void dispatch(MessageDispatch messageDispatch) {
126        try {
127            messageDispatch.setConsumer(this);
128
129            ServerSession serverSession = sessionPool.getServerSession();
130            Session s = serverSession.getSession();
131            ActiveMQSession session = null;
132
133            if (s instanceof ActiveMQSession) {
134                session = (ActiveMQSession)s;
135            } else if (s instanceof ActiveMQTopicSession) {
136                ActiveMQTopicSession topicSession = (ActiveMQTopicSession)s;
137                session = (ActiveMQSession)topicSession.getNext();
138            } else if (s instanceof ActiveMQQueueSession) {
139                ActiveMQQueueSession queueSession = (ActiveMQQueueSession)s;
140                session = (ActiveMQSession)queueSession.getNext();
141            } else {
142                connection.onClientInternalException(new JMSException("Session pool provided an invalid session type: " + s.getClass()));
143                return;
144            }
145
146            session.dispatch(messageDispatch);
147            serverSession.start();
148        } catch (JMSException e) {
149            connection.onAsyncException(e);
150        }
151    }
152
153    public String toString() {
154        return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }";
155    }
156
157    public void clearMessagesInProgress() {
158        // future: may want to deal with rollback of in progress messages to track re deliveries
159        // before indicating that all is complete.        
160        // Till there is a need, lets immediately allow dispatch
161        this.connection.transportInterruptionProcessingComplete();
162    }
163}