001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import javax.jms.Connection;
020import javax.jms.Destination;
021import javax.jms.ExceptionListener;
022import javax.jms.JMSException;
023import javax.jms.Session;
024import javax.jms.Topic;
025import javax.jms.TopicConnection;
026import javax.jms.TopicConnectionFactory;
027import javax.jms.TopicSession;
028import javax.naming.NamingException;
029
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * A Bridge to other JMS Topic providers
035 */
036public class SimpleJmsTopicConnector extends JmsConnector {
037    private static final Logger LOG = LoggerFactory.getLogger(SimpleJmsTopicConnector.class);
038    private String outboundTopicConnectionFactoryName;
039    private String localConnectionFactoryName;
040    private TopicConnectionFactory outboundTopicConnectionFactory;
041    private TopicConnectionFactory localTopicConnectionFactory;
042    private InboundTopicBridge[] inboundTopicBridges;
043    private OutboundTopicBridge[] outboundTopicBridges;
044
045    /**
046     * @return Returns the inboundTopicBridges.
047     */
048    public InboundTopicBridge[] getInboundTopicBridges() {
049        return inboundTopicBridges;
050    }
051
052    /**
053     * @param inboundTopicBridges The inboundTopicBridges to set.
054     */
055    public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) {
056        this.inboundTopicBridges = inboundTopicBridges;
057    }
058
059    /**
060     * @return Returns the outboundTopicBridges.
061     */
062    public OutboundTopicBridge[] getOutboundTopicBridges() {
063        return outboundTopicBridges;
064    }
065
066    /**
067     * @param outboundTopicBridges The outboundTopicBridges to set.
068     */
069    public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) {
070        this.outboundTopicBridges = outboundTopicBridges;
071    }
072
073    /**
074     * @return Returns the localTopicConnectionFactory.
075     */
076    public TopicConnectionFactory getLocalTopicConnectionFactory() {
077        return localTopicConnectionFactory;
078    }
079
080    /**
081     * @param localTopicConnectionFactory The localTopicConnectionFactory to set.
082     */
083    public void setLocalTopicConnectionFactory(TopicConnectionFactory localTopicConnectionFactory) {
084        this.localTopicConnectionFactory = localTopicConnectionFactory;
085    }
086
087    /**
088     * @return Returns the outboundTopicConnectionFactory.
089     */
090    public TopicConnectionFactory getOutboundTopicConnectionFactory() {
091        return outboundTopicConnectionFactory;
092    }
093
094    /**
095     * @return Returns the outboundTopicConnectionFactoryName.
096     */
097    public String getOutboundTopicConnectionFactoryName() {
098        return outboundTopicConnectionFactoryName;
099    }
100
101    /**
102     * @param foreignTopicConnectionFactoryName The foreignTopicConnectionFactoryName to set.
103     */
104    public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
105        this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
106    }
107
108    /**
109     * @return Returns the localConnectionFactoryName.
110     */
111    public String getLocalConnectionFactoryName() {
112        return localConnectionFactoryName;
113    }
114
115    /**
116     * @param localConnectionFactoryName The localConnectionFactoryName to set.
117     */
118    public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
119        this.localConnectionFactoryName = localConnectionFactoryName;
120    }
121
122    /**
123     * @return Returns the localTopicConnection.
124     */
125    public TopicConnection getLocalTopicConnection() {
126        return (TopicConnection) localConnection.get();
127    }
128
129    /**
130     * @param localTopicConnection The localTopicConnection to set.
131     */
132    public void setLocalTopicConnection(TopicConnection localTopicConnection) {
133        this.localConnection.set(localTopicConnection);
134    }
135
136    /**
137     * @return Returns the outboundTopicConnection.
138     */
139    public TopicConnection getOutboundTopicConnection() {
140        return (TopicConnection) foreignConnection.get();
141    }
142
143    /**
144     * @param foreignTopicConnection The foreignTopicConnection to set.
145     */
146    public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
147        this.foreignConnection.set(foreignTopicConnection);
148    }
149
150    /**
151     * @param foreignTopicConnectionFactory The foreignTopicConnectionFactory to set.
152     */
153    public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
154        this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
155    }
156
157    @Override
158    protected void initializeForeignConnection() throws NamingException, JMSException {
159
160        TopicConnection newConnection = null;
161
162        try {
163            if (foreignConnection.get() == null) {
164                // get the connection factories
165                if (outboundTopicConnectionFactory == null) {
166                    // look it up from JNDI
167                    if (outboundTopicConnectionFactoryName != null) {
168                        outboundTopicConnectionFactory = jndiOutboundTemplate
169                            .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
170                        if (outboundUsername != null) {
171                            newConnection = outboundTopicConnectionFactory
172                                .createTopicConnection(outboundUsername, outboundPassword);
173                        } else {
174                            newConnection = outboundTopicConnectionFactory.createTopicConnection();
175                        }
176                    } else {
177                        throw new JMSException("Cannot create foreignConnection - no information");
178                    }
179                } else {
180                    if (outboundUsername != null) {
181                        newConnection = outboundTopicConnectionFactory
182                            .createTopicConnection(outboundUsername, outboundPassword);
183                    } else {
184                        newConnection = outboundTopicConnectionFactory.createTopicConnection();
185                    }
186                }
187            } else {
188                // Clear if for now in case something goes wrong during the init.
189                newConnection = (TopicConnection) foreignConnection.getAndSet(null);
190            }
191
192            // Register for any async error notifications now so we can reset in the
193            // case where there's not a lot of activity and a connection drops.
194            newConnection.setExceptionListener(new ExceptionListener() {
195                @Override
196                public void onException(JMSException exception) {
197                    handleConnectionFailure(foreignConnection.get());
198                }
199            });
200
201            if (outboundClientId != null && outboundClientId.length() > 0) {
202                newConnection.setClientID(getOutboundClientId());
203            }
204            newConnection.start();
205
206            outboundMessageConvertor.setConnection(newConnection);
207
208            // Configure the bridges with the new Outbound connection.
209            initializeInboundDestinationBridgesOutboundSide(newConnection);
210            initializeOutboundDestinationBridgesOutboundSide(newConnection);
211
212            // At this point all looks good, so this our current connection now.
213            foreignConnection.set(newConnection);
214        } catch (Exception ex) {
215            if (newConnection != null) {
216                try {
217                    newConnection.close();
218                } catch (Exception ignore) {}
219            }
220
221            throw ex;
222        }
223    }
224
225    @Override
226    protected void initializeLocalConnection() throws NamingException, JMSException {
227
228        TopicConnection newConnection = null;
229
230        try {
231            if (localConnection.get() == null) {
232                // get the connection factories
233                if (localTopicConnectionFactory == null) {
234                    if (embeddedConnectionFactory == null) {
235                        // look it up from JNDI
236                        if (localConnectionFactoryName != null) {
237                            localTopicConnectionFactory = jndiLocalTemplate
238                                .lookup(localConnectionFactoryName, TopicConnectionFactory.class);
239                            if (localUsername != null) {
240                                newConnection = localTopicConnectionFactory
241                                    .createTopicConnection(localUsername, localPassword);
242                            } else {
243                                newConnection = localTopicConnectionFactory.createTopicConnection();
244                            }
245                        } else {
246                            throw new JMSException("Cannot create localConnection - no information");
247                        }
248                    } else {
249                        newConnection = embeddedConnectionFactory.createTopicConnection();
250                    }
251                } else {
252                    if (localUsername != null) {
253                        newConnection = localTopicConnectionFactory.
254                                createTopicConnection(localUsername, localPassword);
255                    } else {
256                        newConnection = localTopicConnectionFactory.createTopicConnection();
257                    }
258                }
259
260            } else {
261                // Clear if for now in case something goes wrong during the init.
262                newConnection = (TopicConnection) localConnection.getAndSet(null);
263            }
264
265            // Register for any async error notifications now so we can reset in the
266            // case where there's not a lot of activity and a connection drops.
267            newConnection.setExceptionListener(new ExceptionListener() {
268                @Override
269                public void onException(JMSException exception) {
270                    handleConnectionFailure(localConnection.get());
271                }
272            });
273
274            if (localClientId != null && localClientId.length() > 0) {
275                newConnection.setClientID(getLocalClientId());
276            }
277            newConnection.start();
278
279            inboundMessageConvertor.setConnection(newConnection);
280
281            // Configure the bridges with the new Local connection.
282            initializeInboundDestinationBridgesLocalSide(newConnection);
283            initializeOutboundDestinationBridgesLocalSide(newConnection);
284
285            // At this point all looks good, so this our current connection now.
286            localConnection.set(newConnection);
287        } catch (Exception ex) {
288            if (newConnection != null) {
289                try {
290                    newConnection.close();
291                } catch (Exception ignore) {}
292            }
293
294            throw ex;
295        }
296    }
297
298    protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
299        if (inboundTopicBridges != null) {
300            TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
301
302            for (InboundTopicBridge bridge : inboundTopicBridges) {
303                String TopicName = bridge.getInboundTopicName();
304                Topic foreignTopic = createForeignTopic(outboundSession, TopicName);
305                bridge.setConsumer(null);
306                bridge.setConsumerTopic(foreignTopic);
307                bridge.setConsumerConnection(connection);
308                bridge.setJmsConnector(this);
309                addInboundBridge(bridge);
310            }
311            outboundSession.close();
312        }
313    }
314
315    protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
316        if (inboundTopicBridges != null) {
317            TopicSession localSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
318
319            for (InboundTopicBridge bridge : inboundTopicBridges) {
320                String localTopicName = bridge.getLocalTopicName();
321                Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
322                bridge.setProducerTopic(activemqTopic);
323                bridge.setProducerConnection(connection);
324                if (bridge.getJmsMessageConvertor() == null) {
325                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
326                }
327                bridge.setJmsConnector(this);
328                addInboundBridge(bridge);
329            }
330            localSession.close();
331        }
332    }
333
334    protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
335        if (outboundTopicBridges != null) {
336            TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
337
338            for (OutboundTopicBridge bridge : outboundTopicBridges) {
339                String topicName = bridge.getOutboundTopicName();
340                Topic foreignTopic = createForeignTopic(outboundSession, topicName);
341                bridge.setProducerTopic(foreignTopic);
342                bridge.setProducerConnection(connection);
343                if (bridge.getJmsMessageConvertor() == null) {
344                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
345                }
346                bridge.setJmsConnector(this);
347                addOutboundBridge(bridge);
348            }
349            outboundSession.close();
350        }
351    }
352
353    protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
354        if (outboundTopicBridges != null) {
355            TopicSession localSession =
356                    connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
357
358            for (OutboundTopicBridge bridge : outboundTopicBridges) {
359                String localTopicName = bridge.getLocalTopicName();
360                Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
361                bridge.setConsumer(null);
362                bridge.setConsumerTopic(activemqTopic);
363                bridge.setConsumerConnection(connection);
364                bridge.setJmsConnector(this);
365                addOutboundBridge(bridge);
366            }
367            localSession.close();
368        }
369    }
370
371    @Override
372    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
373                                              Connection replyToConsumerConnection) {
374        Topic replyToProducerTopic = (Topic)destination;
375        boolean isInbound = replyToProducerConnection.equals(localConnection.get());
376
377        if (isInbound) {
378            InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);
379            if (bridge == null) {
380                bridge = new InboundTopicBridge() {
381                    @Override
382                    protected Destination processReplyToDestination(Destination destination) {
383                        return null;
384                    }
385                };
386                try {
387                    TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
388                        .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
389                    Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
390                    replyToConsumerSession.close();
391                    bridge.setConsumerTopic(replyToConsumerTopic);
392                    bridge.setProducerTopic(replyToProducerTopic);
393                    bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
394                    bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
395                    bridge.setDoHandleReplyTo(false);
396                    if (bridge.getJmsMessageConvertor() == null) {
397                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
398                    }
399                    bridge.setJmsConnector(this);
400                    bridge.start();
401                    LOG.info("Created replyTo bridge for {}", replyToProducerTopic);
402                } catch (Exception e) {
403                    LOG.error("Failed to create replyTo bridge for topic: {}", replyToProducerTopic, e);
404                    return null;
405                }
406                replyToBridges.put(replyToProducerTopic, bridge);
407            }
408            return bridge.getConsumerTopic();
409        } else {
410            OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic);
411            if (bridge == null) {
412                bridge = new OutboundTopicBridge() {
413                    @Override
414                    protected Destination processReplyToDestination(Destination destination) {
415                        return null;
416                    }
417                };
418                try {
419                    TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
420                        .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
421                    Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
422                    replyToConsumerSession.close();
423                    bridge.setConsumerTopic(replyToConsumerTopic);
424                    bridge.setProducerTopic(replyToProducerTopic);
425                    bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
426                    bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
427                    bridge.setDoHandleReplyTo(false);
428                    if (bridge.getJmsMessageConvertor() == null) {
429                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
430                    }
431                    bridge.setJmsConnector(this);
432                    bridge.start();
433                    LOG.info("Created replyTo bridge for {}", replyToProducerTopic);
434                } catch (Exception e) {
435                    LOG.error("Failed to create replyTo bridge for topic: {}", replyToProducerTopic, e);
436                    return null;
437                }
438                replyToBridges.put(replyToProducerTopic, bridge);
439            }
440            return bridge.getConsumerTopic();
441        }
442    }
443
444    protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException {
445        return session.createTopic(topicName);
446    }
447
448    protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException {
449        Topic result = null;
450
451        if (preferJndiDestinationLookup) {
452            try {
453                // look-up the Queue
454                result = jndiOutboundTemplate.lookup(topicName, Topic.class);
455            } catch (NamingException e) {
456                try {
457                    result = session.createTopic(topicName);
458                } catch (JMSException e1) {
459                    String errStr = "Failed to look-up or create Topic for name: " + topicName;
460                    LOG.error(errStr, e);
461                    JMSException jmsEx = new JMSException(errStr);
462                    jmsEx.setLinkedException(e1);
463                    throw jmsEx;
464                }
465            }
466        } else {
467            try {
468                result = session.createTopic(topicName);
469            } catch (JMSException e) {
470                // look-up the Topic
471                try {
472                    result = jndiOutboundTemplate.lookup(topicName, Topic.class);
473                } catch (NamingException e1) {
474                    String errStr = "Failed to look-up Topic for name: " + topicName;
475                    LOG.error(errStr, e);
476                    JMSException jmsEx = new JMSException(errStr);
477                    jmsEx.setLinkedException(e1);
478                    throw jmsEx;
479                }
480            }
481        }
482
483        return result;
484    }
485}