001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.discovery.simple;
018
019import java.io.IOException;
020import java.net.URI;
021import java.util.concurrent.atomic.AtomicBoolean;
022
023import org.apache.activemq.command.DiscoveryEvent;
024import org.apache.activemq.thread.DefaultThreadPools;
025import org.apache.activemq.transport.discovery.DiscoveryAgent;
026import org.apache.activemq.transport.discovery.DiscoveryListener;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * A simple DiscoveryAgent that allows static configuration of the discovered
032 * services.
033 * 
034 * 
035 */
036public class SimpleDiscoveryAgent implements DiscoveryAgent {
037
038    private final static Logger LOG = LoggerFactory.getLogger(SimpleDiscoveryAgent.class);
039    private long initialReconnectDelay = 1000;
040    private long maxReconnectDelay = 1000 * 30;
041    private long backOffMultiplier = 2;
042    private boolean useExponentialBackOff=true;
043    private int maxReconnectAttempts;
044    private final Object sleepMutex = new Object();
045    private long minConnectTime = 5000;
046    private DiscoveryListener listener;
047    private String services[] = new String[] {};
048    private final AtomicBoolean running = new AtomicBoolean(false);
049
050    class SimpleDiscoveryEvent extends DiscoveryEvent {
051
052        private int connectFailures;
053        private long reconnectDelay = initialReconnectDelay;
054        private long connectTime = System.currentTimeMillis();
055        private AtomicBoolean failed = new AtomicBoolean(false);
056
057        public SimpleDiscoveryEvent(String service) {
058            super(service);
059        }
060
061        @Override
062        public String toString() {
063            return "[" + serviceName + ", failed:" + failed + ", connectionFailures:" + connectFailures + "]";
064        }
065    }
066
067    public void setDiscoveryListener(DiscoveryListener listener) {
068        this.listener = listener;
069    }
070
071    public void registerService(String name) throws IOException {
072    }
073
074    public void start() throws Exception {
075        running.set(true);
076        for (int i = 0; i < services.length; i++) {
077            listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
078        }
079    }
080
081    public void stop() throws Exception {
082        running.set(false);
083        synchronized (sleepMutex) {
084            sleepMutex.notifyAll();
085        }
086    }
087
088    public String[] getServices() {
089        return services;
090    }
091
092    public void setServices(String services) {
093        this.services = services.split(",");
094    }
095
096    public void setServices(String services[]) {
097        this.services = services;
098    }
099
100    public void setServices(URI services[]) {
101        this.services = new String[services.length];
102        for (int i = 0; i < services.length; i++) {
103            this.services[i] = services[i].toString();
104        }
105    }
106
107    public void serviceFailed(DiscoveryEvent devent) throws IOException {
108
109        final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
110        if (event.failed.compareAndSet(false, true)) {
111
112            listener.onServiceRemove(event);
113            DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
114                public void run() {
115
116                    // We detect a failed connection attempt because the service
117                    // fails right
118                    // away.
119                    if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
120                        LOG.debug("Failure occurred soon after the discovery event was generated.  It will be classified as a connection failure: "+event);
121
122                        event.connectFailures++;
123
124                        if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
125                            LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+" tries.  Reconnecting has been disabled for: " + event);
126                            return;
127                        }
128
129                        synchronized (sleepMutex) {
130                            try {
131                                if (!running.get()) {
132                                    LOG.debug("Reconnecting disabled: stopped");
133                                    return;
134                                }
135
136                                LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
137                                sleepMutex.wait(event.reconnectDelay);
138                            } catch (InterruptedException ie) {
139                                LOG.debug("Reconnecting disabled: " + ie);
140                                Thread.currentThread().interrupt();
141                                return;
142                            }
143                        }
144
145                        if (!useExponentialBackOff) {
146                            event.reconnectDelay = initialReconnectDelay;
147                        } else {
148                            // Exponential increment of reconnect delay.
149                            event.reconnectDelay *= backOffMultiplier;
150                            if (event.reconnectDelay > maxReconnectDelay) {
151                                event.reconnectDelay = maxReconnectDelay;
152                            }
153                        }
154
155                    } else {
156                        event.connectFailures = 0;
157                        event.reconnectDelay = initialReconnectDelay;
158                    }
159
160                    if (!running.get()) {
161                        LOG.debug("Reconnecting disabled: stopped");
162                        return;
163                    }
164
165                    event.connectTime = System.currentTimeMillis();
166                    event.failed.set(false);
167                    listener.onServiceAdd(event);
168                }
169            }, "Simple Discovery Agent");
170        }
171    }
172
173    public long getBackOffMultiplier() {
174        return backOffMultiplier;
175    }
176
177    public void setBackOffMultiplier(long backOffMultiplier) {
178        this.backOffMultiplier = backOffMultiplier;
179    }
180
181    public long getInitialReconnectDelay() {
182        return initialReconnectDelay;
183    }
184
185    public void setInitialReconnectDelay(long initialReconnectDelay) {
186        this.initialReconnectDelay = initialReconnectDelay;
187    }
188
189    public int getMaxReconnectAttempts() {
190        return maxReconnectAttempts;
191    }
192
193    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
194        this.maxReconnectAttempts = maxReconnectAttempts;
195    }
196
197    public long getMaxReconnectDelay() {
198        return maxReconnectDelay;
199    }
200
201    public void setMaxReconnectDelay(long maxReconnectDelay) {
202        this.maxReconnectDelay = maxReconnectDelay;
203    }
204
205    public long getMinConnectTime() {
206        return minConnectTime;
207    }
208
209    public void setMinConnectTime(long minConnectTime) {
210        this.minConnectTime = minConnectTime;
211    }
212
213    public boolean isUseExponentialBackOff() {
214        return useExponentialBackOff;
215    }
216
217    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
218        this.useExponentialBackOff = useExponentialBackOff;
219    }
220}