/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.transport.discovery.multicast;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DiscoveryAgent} that advertises and discovers services by exchanging
 * heartbeat datagrams on a multicast group.
 * <p>
 * Each heartbeat is a plain string of the form
 * <code>&lt;group&gt;.ActiveMQ-4.&lt;alive.|dead.&gt;%localhost%&lt;service&gt;</code>.
 * A background thread receives these packets, re-advertises the local service
 * every {@link #getKeepAliveInterval()} milliseconds and expires remote
 * services that have been silent for ten keep-alive intervals. Listener
 * callbacks are delivered asynchronously on a single notifier thread.
 */
public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {

    public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
    public static final String DEFAULT_HOST_STR = "default";
    public static final String DEFAULT_HOST_IP = System.getProperty("activemq.partition.discovery", "239.255.2.3");
    public static final int DEFAULT_PORT = 6155;

    private static final Logger LOG = LoggerFactory.getLogger(MulticastDiscoveryAgent.class);
    private static final String TYPE_SUFFIX = "ActiveMQ-4.";
    private static final String ALIVE = "alive.";
    private static final String DEAD = "dead.";
    private static final String DELIMITER = "%";
    private static final int BUFF_SIZE = 8192;
    private static final int DEFAULT_IDLE_TIME = 500;
    private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;

    private long initialReconnectDelay = 1000 * 5;
    private long maxReconnectDelay = 1000 * 30;
    private long backOffMultiplier = 2;
    private boolean useExponentialBackOff;
    private int maxReconnectAttempts;

    private int timeToLive = 1;
    private boolean loopBackMode;
    private final Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
    private String group = "default";
    private URI discoveryURI;
    private InetAddress inetAddress;
    private SocketAddress sockAddress;
    private DiscoveryListener discoveryListener;
    private String selfService;
    private MulticastSocket mcast;
    private Thread runner;
    private long keepAliveInterval = DEFAULT_IDLE_TIME;
    private String mcInterface;
    private String mcNetworkInterface;
    private String mcJoinNetworkInterface;
    private long lastAdvertizeTime;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private boolean reportAdvertizeFailed = true;
    private ExecutorService executor = null;

    /**
     * Book-keeping for one remote service seen on the multicast group:
     * last heartbeat time plus the failure / recovery state used to suppress
     * add events for a service that was reported as failed.
     * <p>
     * Deliberately a non-static inner class: it reads the reconnect settings
     * of the enclosing agent.
     */
    class RemoteBrokerData {
        final String brokerName;
        final String service;
        long lastHeartBeat;
        long recoveryTime;
        int failureCount;
        boolean failed;

        public RemoteBrokerData(String brokerName, String service) {
            this.brokerName = brokerName;
            this.service = service;
            this.lastHeartBeat = System.currentTimeMillis();
        }

        /**
         * Records that a heartbeat was just received and, if the service has
         * stayed healthy long enough, forgets its previous failures.
         */
        public synchronized void updateHeartBeat() {
            lastHeartBeat = System.currentTimeMillis();

            // Consider that the broker recovery has succeeded if it has not
            // failed in 60 seconds.
            if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("I now think that the " + service + " service has recovered.");
                }
                failureCount = 0;
                recoveryTime = 0;
            }
        }

        /**
         * @return the time, in epoch milliseconds, of the last heartbeat.
         */
        public synchronized long getLastHeartBeat() {
            return lastHeartBeat;
        }

        /**
         * Marks the service as failed and schedules the time at which its
         * advertisements may be reported again.
         *
         * @return true if the state changed, false if the service was
         *         already marked failed.
         */
        public synchronized boolean markFailed() {
            if (failed) {
                return false;
            }
            failed = true;
            failureCount++;

            long reconnectDelay;
            if (!useExponentialBackOff) {
                reconnectDelay = initialReconnectDelay;
            } else {
                // Back off from the configured initial delay:
                // initial, initial * m, initial * m^2, ... capped at the maximum.
                // Computed as a double so a large exponent cannot overflow
                // before the cap is applied.
                double scaled = initialReconnectDelay * Math.pow(backOffMultiplier, failureCount - 1);
                reconnectDelay = scaled > maxReconnectDelay ? maxReconnectDelay : (long)scaled;
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("Remote failure of " + service + " while still receiving multicast advertisements.  Advertising events will be suppressed for " + reconnectDelay
                          + " ms, the current failure count is: " + failureCount);
            }

            recoveryTime = System.currentTimeMillis() + reconnectDelay;
            return true;
        }

        /**
         * @return true if this broker is marked failed and it is now the right
         *         time to start recovery.
         */
        public synchronized boolean doRecovery() {
            if (!failed) {
                return false;
            }

            // Are we done trying to recover this guy?
            if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Max reconnect attempts of the " + service + " service has been reached.");
                }
                return false;
            }

            // Is it not yet time?
            if (System.currentTimeMillis() < recoveryTime) {
                return false;
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("Resuming event advertisement of the " + service + " service.");
            }
            failed = false;
            return true;
        }

        /**
         * @return true if the service is currently marked failed. Synchronized
         *         like every other accessor of the failed flag.
         */
        public synchronized boolean isFailed() {
            return failed;
        }
    }

    /**
     * Set the discovery listener
     *
     * @param listener the listener notified when services are added or removed
     */
    public void setDiscoveryListener(DiscoveryListener listener) {
        this.discoveryListener = listener;
    }

    /**
     * Registers the service advertised by this agent. If the agent is already
     * started the service is advertised immediately.
     *
     * @param name the service to advertise
     */
    public void registerService(String name) throws IOException {
        this.selfService = name;
        if (started.get()) {
            doAdvertizeSelf();
        }
    }

    /**
     * @return Returns the loopBackMode.
     */
    public boolean isLoopBackMode() {
        return loopBackMode;
    }

    /**
     * @param loopBackMode The loopBackMode to set.
     */
    public void setLoopBackMode(boolean loopBackMode) {
        this.loopBackMode = loopBackMode;
    }

    /**
     * @return Returns the timeToLive.
     */
    public int getTimeToLive() {
        return timeToLive;
    }

    /**
     * @param timeToLive The timeToLive to set.
     */
    public void setTimeToLive(int timeToLive) {
        this.timeToLive = timeToLive;
    }

    /**
     * @return the discoveryURI
     */
    public URI getDiscoveryURI() {
        return discoveryURI;
    }

    /**
     * Set the discoveryURI
     *
     * @param discoveryURI the multicast URI providing the group host and port
     */
    public void setDiscoveryURI(URI discoveryURI) {
        this.discoveryURI = discoveryURI;
    }

    /**
     * @return the interval, in milliseconds, used as the socket receive
     *         timeout and as the period between self advertisements.
     */
    public long getKeepAliveInterval() {
        return keepAliveInterval;
    }

    public void setKeepAliveInterval(long keepAliveInterval) {
        this.keepAliveInterval = keepAliveInterval;
    }

    /**
     * @param mcInterface address passed to {@link MulticastSocket#setInterface}
     */
    public void setInterface(String mcInterface) {
        this.mcInterface = mcInterface;
    }

    /**
     * @param mcNetworkInterface interface name passed to
     *                           {@link MulticastSocket#setNetworkInterface}
     */
    public void setNetworkInterface(String mcNetworkInterface) {
        this.mcNetworkInterface = mcNetworkInterface;
    }

    /**
     * @param mcJoinNetwrokInterface name of the interface on which the
     *                               multicast group is joined
     */
    public void setJoinNetworkInterface(String mcJoinNetwrokInterface) {
        this.mcJoinNetworkInterface = mcJoinNetwrokInterface;
    }

    /**
     * start the discovery agent
     * <p>
     * Opens the multicast socket, joins the group, starts the receiver thread
     * and advertises the registered service. Does nothing if already started.
     * If start-up fails the agent is returned to the stopped state and the
     * socket is closed, so that a later start() or stop() behaves sanely.
     *
     * @throws Exception if the group is not set or the socket cannot be set up
     */
    public void start() throws Exception {

        if (!started.compareAndSet(false, true)) {
            return;
        }

        boolean success = false;
        try {
            if (group == null || group.length() == 0) {
                throw new IOException("You must specify a group to discover");
            }
            String type = getType();
            if (!type.endsWith(".")) {
                LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
            }

            if (discoveryURI == null) {
                discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING);
            }

            if (LOG.isTraceEnabled()) {
                LOG.trace("start - discoveryURI = " + discoveryURI);
            }

            String myHost = discoveryURI.getHost();
            int myPort = discoveryURI.getPort();

            if (DEFAULT_HOST_STR.equals(myHost)) {
                myHost = DEFAULT_HOST_IP;
            }

            if (myPort < 0) {
                myPort = DEFAULT_PORT;
            }

            if (LOG.isTraceEnabled()) {
                LOG.trace("start - myHost = " + myHost);
                LOG.trace("start - myPort = " + myPort);
                LOG.trace("start - group  = " + group);
                LOG.trace("start - interface  = " + mcInterface);
                LOG.trace("start - network interface  = " + mcNetworkInterface);
                LOG.trace("start - join network interface  = " + mcJoinNetworkInterface);
            }

            this.inetAddress = InetAddress.getByName(myHost);
            this.sockAddress = new InetSocketAddress(this.inetAddress, myPort);
            mcast = new MulticastSocket(myPort);
            mcast.setLoopbackMode(loopBackMode);
            mcast.setTimeToLive(getTimeToLive());
            if (mcJoinNetworkInterface != null) {
                mcast.joinGroup(sockAddress, NetworkInterface.getByName(mcJoinNetworkInterface));
            } else {
                mcast.joinGroup(inetAddress);
            }
            // The receive timeout doubles as the heartbeat tick of the runner thread.
            mcast.setSoTimeout((int)keepAliveInterval);
            if (mcInterface != null) {
                mcast.setInterface(InetAddress.getByName(mcInterface));
            }
            if (mcNetworkInterface != null) {
                mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface));
            }
            runner = new Thread(this);
            runner.setName(this.toString() + ":" + runner.getName());
            runner.setDaemon(true);
            runner.start();
            doAdvertizeSelf();
            success = true;
        } finally {
            if (!success) {
                // Roll back so the agent is not left "started" with a
                // missing or half-configured socket.
                if (mcast != null) {
                    mcast.close();
                    mcast = null;
                }
                started.set(false);
            }
        }
    }

    /**
     * stop the channel
     * <p>
     * Sends a final "dead" advertisement, closes the socket, interrupts the
     * receiver thread and shuts down the notifier executor if one was created.
     *
     * @throws Exception declared by the service contract
     */
    public void stop() throws Exception {
        if (started.compareAndSet(true, false)) {
            doAdvertizeSelf();
            if (mcast != null) {
                mcast.close();
            }
            if (runner != null) {
                runner.interrupt();
            }
            // Only shut down an executor that actually exists, and forget it so
            // that a restarted agent lazily creates a fresh one instead of
            // submitting to a terminated pool.
            if (executor != null) {
                executor.shutdownNow();
                executor = null;
            }
        }
    }

    /**
     * @return the packet prefix identifying this discovery group.
     */
    public String getType() {
        return group + "." + TYPE_SUFFIX;
    }

    /**
     * Receiver loop: until the agent is stopped, performs the periodic
     * advertise / expire work and then waits up to one keep-alive interval
     * for the next heartbeat packet.
     */
    public void run() {
        byte[] buf = new byte[BUFF_SIZE];
        DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
        while (started.get()) {
            doTimeKeepingServices();
            try {
                mcast.receive(packet);
                if (packet.getLength() > 0) {
                    String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
                    processData(str);
                }
            } catch (SocketTimeoutException se) {
                // ignore
            } catch (IOException e) {
                if (started.get()) {
                    LOG.error("failed to process packet: " + e);
                }
            }
        }
    }

    /**
     * Parses one received heartbeat and dispatches it as alive or dead.
     * Packets for other groups are ignored; packets for this group that do not
     * follow the expected layout are dropped rather than allowed to throw and
     * terminate the receiver thread.
     */
    private void processData(String str) {
        if (discoveryListener == null) {
            return;
        }
        String type = getType();
        if (!str.startsWith(type)) {
            return;
        }

        String payload = str.substring(type.length());
        boolean alive = payload.startsWith(ALIVE);
        if (!alive && !payload.startsWith(DEAD)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ignoring discovery packet with unknown state: " + str);
            }
            return;
        }

        int stateLength = alive ? ALIVE.length() : DEAD.length();
        String brokerName = getBrokerName(payload.substring(stateLength));
        if (brokerName == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ignoring malformed discovery packet: " + str);
            }
            return;
        }

        // Skip the state marker, the broker name and its two delimiters.
        String service = payload.substring(stateLength + brokerName.length() + 2);
        if (alive) {
            processAlive(brokerName, service);
        } else {
            processDead(service);
        }
    }

    /**
     * Re-advertises the local service when a keep-alive interval has elapsed
     * (or the clock went backwards) and expires silent remote services.
     */
    private void doTimeKeepingServices() {
        if (started.get()) {
            long currentTime = System.currentTimeMillis();
            if (currentTime < lastAdvertizeTime || ((currentTime - keepAliveInterval) > lastAdvertizeTime)) {
                doAdvertizeSelf();
                lastAdvertizeTime = currentTime;
            }
            doExpireOldServices();
        }
    }

    /**
     * Multicasts the state of the registered service: "alive" while the agent
     * is started, "dead" otherwise. Does nothing when no service is registered
     * or the socket has not been created yet.
     */
    private void doAdvertizeSelf() {
        MulticastSocket socket = mcast;
        if (selfService == null || socket == null) {
            return;
        }

        String payload = getType();
        payload += started.get() ? ALIVE : DEAD;
        payload += DELIMITER + "localhost" + DELIMITER;
        payload += selfService;
        try {
            byte[] data = payload.getBytes();
            DatagramPacket packet = new DatagramPacket(data, 0, data.length, sockAddress);
            socket.send(packet);
        } catch (IOException e) {
            // If a send fails, chances are all subsequent sends will fail
            // too.. No need to keep reporting the
            // same error over and over.
            if (reportAdvertizeFailed) {
                reportAdvertizeFailed = false;
                LOG.error("Failed to advertise our service: " + payload, e);
                if ("Operation not permitted".equals(e.getMessage())) {
                    LOG.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup.  "
                              + "Please make sure that the OS is properly configured to allow multicast traffic over: " + socket.getLocalAddress());
                }
            }
        }
    }

    /**
     * Handles an "alive" heartbeat from a service other than our own: a new
     * service is recorded and reported, a known one has its heartbeat
     * refreshed and is reported again if it has just recovered from a failure.
     */
    private void processAlive(String brokerName, String service) {
        if (selfService == null || !service.equals(selfService)) {
            RemoteBrokerData data = brokersByService.get(service);
            if (data == null) {
                data = new RemoteBrokerData(brokerName, service);
                brokersByService.put(service, data);
                fireServiceAddEvent(data);
                // Answer straight away so the newcomer discovers us quickly.
                doAdvertizeSelf();
            } else {
                data.updateHeartBeat();
                if (data.doRecovery()) {
                    fireServiceAddEvent(data);
                }
            }
        }
    }

    /**
     * Forgets a remote service and reports its removal, unless the removal
     * was already reported when the service was marked failed.
     */
    private void processDead(String service) {
        if (!service.equals(selfService)) {
            RemoteBrokerData data = brokersByService.remove(service);
            if (data != null && !data.isFailed()) {
                fireServiceRemovedEvent(data);
            }
        }
    }

    /**
     * Treats every service whose last heartbeat is older than
     * HEARTBEAT_MISS_BEFORE_DEATH keep-alive intervals as dead.
     */
    private void doExpireOldServices() {
        long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
        for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
            RemoteBrokerData data = i.next();
            if (data.getLastHeartBeat() < expireTime) {
                processDead(data.service);
            }
        }
    }

    /**
     * Extracts the text between the first two delimiters of the given string.
     *
     * @return the broker name, or null if the string does not contain two
     *         delimiters.
     */
    private String getBrokerName(String str) {
        int start = str.indexOf(DELIMITER);
        if (start < 0) {
            return null;
        }
        int end = str.indexOf(DELIMITER, start + 1);
        if (end < 0) {
            return null;
        }
        return str.substring(start + 1, end);
    }

    /**
     * Reports that a previously discovered service could not be used. The
     * service is marked failed and a remove event is fired the first time.
     */
    public void serviceFailed(DiscoveryEvent event) throws IOException {
        RemoteBrokerData data = brokersByService.get(event.getServiceName());
        if (data != null && data.markFailed()) {
            fireServiceRemovedEvent(data);
        }
    }

    /**
     * Asynchronously notifies the listener that a service was removed.
     */
    private void fireServiceRemovedEvent(RemoteBrokerData data) {
        if (discoveryListener != null && started.get()) {
            final DiscoveryEvent event = new DiscoveryEvent(data.service);
            event.setBrokerName(data.brokerName);

            // Have the listener process the event async so that
            // he does not block this thread since we are doing time sensitive
            // processing of events.
            getExecutor().execute(new Runnable() {
                public void run() {
                    DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
                    if (discoveryListener != null) {
                        discoveryListener.onServiceRemove(event);
                    }
                }
            });
        }
    }

    /**
     * Asynchronously notifies the listener that a service was added.
     */
    private void fireServiceAddEvent(RemoteBrokerData data) {
        if (discoveryListener != null && started.get()) {
            final DiscoveryEvent event = new DiscoveryEvent(data.service);
            event.setBrokerName(data.brokerName);

            // Have the listener process the event async so that
            // he does not block this thread since we are doing time sensitive
            // processing of events.
            getExecutor().execute(new Runnable() {
                public void run() {
                    DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
                    if (discoveryListener != null) {
                        discoveryListener.onServiceAdd(event);
                    }
                }
            });
        }
    }

    /**
     * @return the single-threaded notifier executor, created on first use.
     */
    private ExecutorService getExecutor() {
        if (executor == null) {
            final String threadName = "Notifier-" + this.toString();
            executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
                public Thread newThread(Runnable runable) {
                    Thread t = new Thread(runable, threadName);
                    t.setDaemon(true);
                    return t;
                }
            });
        }
        return executor;
    }

    public long getBackOffMultiplier() {
        return backOffMultiplier;
    }

    public void setBackOffMultiplier(long backOffMultiplier) {
        this.backOffMultiplier = backOffMultiplier;
    }

    public long getInitialReconnectDelay() {
        return initialReconnectDelay;
    }

    public void setInitialReconnectDelay(long initialReconnectDelay) {
        this.initialReconnectDelay = initialReconnectDelay;
    }

    public int getMaxReconnectAttempts() {
        return maxReconnectAttempts;
    }

    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
        this.maxReconnectAttempts = maxReconnectAttempts;
    }

    public long getMaxReconnectDelay() {
        return maxReconnectDelay;
    }

    public void setMaxReconnectDelay(long maxReconnectDelay) {
        this.maxReconnectDelay = maxReconnectDelay;
    }

    public boolean isUseExponentialBackOff() {
        return useExponentialBackOff;
    }

    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
        this.useExponentialBackOff = useExponentialBackOff;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    @Override
    public String toString() {
        return "MulticastDiscoveryAgent-"
            + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
    }
}