001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.util.concurrent.atomic.AtomicLong;
021
022import org.apache.activemq.Service;
023import org.apache.activemq.command.ActiveMQQueue;
024import org.apache.activemq.command.ActiveMQTopic;
025import org.apache.activemq.command.BrokerId;
026import org.apache.activemq.command.BrokerInfo;
027import org.apache.activemq.command.Command;
028import org.apache.activemq.command.ConnectionId;
029import org.apache.activemq.command.ConnectionInfo;
030import org.apache.activemq.command.ConsumerInfo;
031import org.apache.activemq.command.ExceptionResponse;
032import org.apache.activemq.command.Message;
033import org.apache.activemq.command.MessageAck;
034import org.apache.activemq.command.MessageDispatch;
035import org.apache.activemq.command.ProducerInfo;
036import org.apache.activemq.command.Response;
037import org.apache.activemq.command.SessionInfo;
038import org.apache.activemq.command.ShutdownInfo;
039import org.apache.activemq.transport.DefaultTransportListener;
040import org.apache.activemq.transport.FutureResponse;
041import org.apache.activemq.transport.ResponseCallback;
042import org.apache.activemq.transport.Transport;
043import org.apache.activemq.util.IdGenerator;
044import org.apache.activemq.util.ServiceStopper;
045import org.apache.activemq.util.ServiceSupport;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Forwards all messages from the local broker to the remote broker.
051 * 
052 * @org.apache.xbean.XBean
053 * 
054 * 
055 */
056public class ForwardingBridge implements Service {
057
058    private static final IdGenerator ID_GENERATOR = new IdGenerator();
059    private static final Logger LOG = LoggerFactory.getLogger(ForwardingBridge.class);
060
061    final AtomicLong enqueueCounter = new AtomicLong();
062    final AtomicLong dequeueCounter = new AtomicLong();
063    ConnectionInfo connectionInfo;
064    SessionInfo sessionInfo;
065    ProducerInfo producerInfo;
066    ConsumerInfo queueConsumerInfo;
067    ConsumerInfo topicConsumerInfo;
068    BrokerId localBrokerId;
069    BrokerId remoteBrokerId;
070    BrokerInfo localBrokerInfo;
071    BrokerInfo remoteBrokerInfo;
072
073    private final Transport localBroker;
074    private final Transport remoteBroker;
075    private String clientId;
076    private int prefetchSize = 1000;
077    private boolean dispatchAsync;
078    private String destinationFilter = ">";
079    private NetworkBridgeListener bridgeFailedListener;
080
081    public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
082        this.localBroker = localBroker;
083        this.remoteBroker = remoteBroker;
084    }
085
086    public void start() throws Exception {
087        LOG.info("Starting a network connection between " + localBroker + " and " + remoteBroker
088                 + " has been established.");
089
090        localBroker.setTransportListener(new DefaultTransportListener() {
091            public void onCommand(Object o) {
092                Command command = (Command)o;
093                serviceLocalCommand(command);
094            }
095
096            public void onException(IOException error) {
097                serviceLocalException(error);
098            }
099        });
100
101        remoteBroker.setTransportListener(new DefaultTransportListener() {
102            public void onCommand(Object o) {
103                Command command = (Command)o;
104                serviceRemoteCommand(command);
105            }
106
107            public void onException(IOException error) {
108                serviceRemoteException(error);
109            }
110        });
111
112        localBroker.start();
113        remoteBroker.start();
114    }
115
116    protected void triggerStartBridge() throws IOException {
117        Thread thead = new Thread() {
118            public void run() {
119                try {
120                    startBridge();
121                } catch (IOException e) {
122                    LOG.error("Failed to start network bridge: " + e, e);
123                }
124            }
125        };
126        thead.start();
127    }
128
129    /**
130     * @throws IOException
131     */
132    final void startBridge() throws IOException {
133        connectionInfo = new ConnectionInfo();
134        connectionInfo.setConnectionId(new ConnectionId(ID_GENERATOR.generateId()));
135        connectionInfo.setClientId(clientId);
136        localBroker.oneway(connectionInfo);
137        remoteBroker.oneway(connectionInfo);
138
139        sessionInfo = new SessionInfo(connectionInfo, 1);
140        localBroker.oneway(sessionInfo);
141        remoteBroker.oneway(sessionInfo);
142
143        queueConsumerInfo = new ConsumerInfo(sessionInfo, 1);
144        queueConsumerInfo.setDispatchAsync(dispatchAsync);
145        queueConsumerInfo.setDestination(new ActiveMQQueue(destinationFilter));
146        queueConsumerInfo.setPrefetchSize(prefetchSize);
147        queueConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
148        localBroker.oneway(queueConsumerInfo);
149
150        producerInfo = new ProducerInfo(sessionInfo, 1);
151        producerInfo.setResponseRequired(false);
152        remoteBroker.oneway(producerInfo);
153
154        if (connectionInfo.getClientId() != null) {
155            topicConsumerInfo = new ConsumerInfo(sessionInfo, 2);
156            topicConsumerInfo.setDispatchAsync(dispatchAsync);
157            topicConsumerInfo.setSubscriptionName("topic-bridge");
158            topicConsumerInfo.setRetroactive(true);
159            topicConsumerInfo.setDestination(new ActiveMQTopic(destinationFilter));
160            topicConsumerInfo.setPrefetchSize(prefetchSize);
161            topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
162            localBroker.oneway(topicConsumerInfo);
163        }
164        LOG.info("Network connection between " + localBroker + " and " + remoteBroker
165                 + " has been established.");
166    }
167
168    public void stop() throws Exception {
169        try {
170            if (connectionInfo != null) {
171                localBroker.request(connectionInfo.createRemoveCommand());
172                remoteBroker.request(connectionInfo.createRemoveCommand());
173            }
174            localBroker.setTransportListener(null);
175            remoteBroker.setTransportListener(null);
176            localBroker.oneway(new ShutdownInfo());
177            remoteBroker.oneway(new ShutdownInfo());
178        } finally {
179            ServiceStopper ss = new ServiceStopper();
180            ss.stop(localBroker);
181            ss.stop(remoteBroker);
182            ss.throwFirstException();
183        }
184    }
185
186    public void serviceRemoteException(Throwable error) {
187        LOG.info("Unexpected remote exception: " + error);
188        LOG.debug("Exception trace: ", error);
189    }
190
191    protected void serviceRemoteCommand(Command command) {
192        try {
193            if (command.isBrokerInfo()) {
194                synchronized (this) {
195                    remoteBrokerInfo = (BrokerInfo)command;
196                    remoteBrokerId = remoteBrokerInfo.getBrokerId();
197                    if (localBrokerId != null) {
198                        if (localBrokerId.equals(remoteBrokerId)) {
199                            LOG.info("Disconnecting loop back connection.");
200                            ServiceSupport.dispose(this);
201                        } else {
202                            triggerStartBridge();
203                        }
204                    }
205                }
206            } else {
207                LOG.warn("Unexpected remote command: " + command);
208            }
209        } catch (IOException e) {
210            serviceLocalException(e);
211        }
212    }
213
214    public void serviceLocalException(Throwable error) {
215        LOG.info("Unexpected local exception: " + error);
216        LOG.debug("Exception trace: ", error);
217        fireBridgeFailed();
218    }
219
220    protected void serviceLocalCommand(Command command) {
221        try {
222            if (command.isMessageDispatch()) {
223
224                enqueueCounter.incrementAndGet();
225
226                final MessageDispatch md = (MessageDispatch)command;
227                Message message = md.getMessage();
228                message.setProducerId(producerInfo.getProducerId());
229
230                if (message.getOriginalTransactionId() == null) {
231                    message.setOriginalTransactionId(message.getTransactionId());
232                }
233                message.setTransactionId(null);
234
235                if (!message.isResponseRequired()) {
236                    // If the message was originally sent using async send, we
237                    // will preserve that QOS
238                    // by bridging it using an async send (small chance of
239                    // message loss).
240                    remoteBroker.oneway(message);
241                    dequeueCounter.incrementAndGet();
242                    localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
243
244                } else {
245
246                    // The message was not sent using async send, so we should
247                    // only ack the local
248                    // broker when we get confirmation that the remote broker
249                    // has received the message.
250                    ResponseCallback callback = new ResponseCallback() {
251                        public void onCompletion(FutureResponse future) {
252                            try {
253                                Response response = future.getResult();
254                                if (response.isException()) {
255                                    ExceptionResponse er = (ExceptionResponse)response;
256                                    serviceLocalException(er.getException());
257                                } else {
258                                    dequeueCounter.incrementAndGet();
259                                    localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
260                                }
261                            } catch (IOException e) {
262                                serviceLocalException(e);
263                            }
264                        }
265                    };
266
267                    remoteBroker.asyncRequest(message, callback);
268                }
269
270                // Ack on every message since we don't know if the broker is
271                // blocked due to memory
272                // usage and is waiting for an Ack to un-block him.
273
274                // Acking a range is more efficient, but also more prone to
275                // locking up a server
276                // Perhaps doing something like the following should be policy
277                // based.
278                // if(
279                // md.getConsumerId().equals(queueConsumerInfo.getConsumerId())
280                // ) {
281                // queueDispatched++;
282                // if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2)
283                // ) {
284                // localBroker.oneway(new MessageAck(md,
285                // MessageAck.STANDARD_ACK_TYPE, queueDispatched));
286                // queueDispatched=0;
287                // }
288                // } else {
289                // topicDispatched++;
290                // if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2)
291                // ) {
292                // localBroker.oneway(new MessageAck(md,
293                // MessageAck.STANDARD_ACK_TYPE, topicDispatched));
294                // topicDispatched=0;
295                // }
296                // }
297            } else if (command.isBrokerInfo()) {
298                synchronized (this) {
299                    localBrokerInfo = (BrokerInfo)command;
300                    localBrokerId = localBrokerInfo.getBrokerId();
301                    if (remoteBrokerId != null) {
302                        if (remoteBrokerId.equals(localBrokerId)) {
303                            LOG.info("Disconnecting loop back connection.");
304                            ServiceSupport.dispose(this);
305                        } else {
306                            triggerStartBridge();
307                        }
308                    }
309                }
310            } else {
311                LOG.debug("Unexpected local command: " + command);
312            }
313        } catch (IOException e) {
314            serviceLocalException(e);
315        }
316    }
317
318    public String getClientId() {
319        return clientId;
320    }
321
322    public void setClientId(String clientId) {
323        this.clientId = clientId;
324    }
325
326    public int getPrefetchSize() {
327        return prefetchSize;
328    }
329
330    public void setPrefetchSize(int prefetchSize) {
331        this.prefetchSize = prefetchSize;
332    }
333
334    public boolean isDispatchAsync() {
335        return dispatchAsync;
336    }
337
338    public void setDispatchAsync(boolean dispatchAsync) {
339        this.dispatchAsync = dispatchAsync;
340    }
341
342    public String getDestinationFilter() {
343        return destinationFilter;
344    }
345
346    public void setDestinationFilter(String destinationFilter) {
347        this.destinationFilter = destinationFilter;
348    }
349
350    public void setNetworkBridgeFailedListener(NetworkBridgeListener listener) {
351        this.bridgeFailedListener = listener;
352    }
353
354    private void fireBridgeFailed() {
355        NetworkBridgeListener l = this.bridgeFailedListener;
356        if (l != null) {
357            l.bridgeFailed();
358        }
359    }
360
361    public String getRemoteAddress() {
362        return remoteBroker.getRemoteAddress();
363    }
364
365    public String getLocalAddress() {
366        return localBroker.getRemoteAddress();
367    }
368
369    public String getLocalBrokerName() {
370        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
371    }
372
373    public String getRemoteBrokerName() {
374        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
375    }
376
377    public long getDequeueCounter() {
378        return dequeueCounter.get();
379    }
380
381    public long getEnqueueCounter() {
382        return enqueueCounter.get();
383    }
384
385}