001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.security.AccessController; 023import java.security.PrivilegedAction; 024import java.util.*; 025import java.util.concurrent.RejectedExecutionHandler; 026 027import javax.jms.Connection; 028import javax.jms.ConnectionFactory; 029import javax.jms.ExceptionListener; 030import javax.jms.JMSException; 031import javax.jms.QueueConnection; 032import javax.jms.QueueConnectionFactory; 033import javax.jms.TopicConnection; 034import javax.jms.TopicConnectionFactory; 035import javax.naming.Context; 036 037import org.apache.activemq.blob.BlobTransferPolicy; 038import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; 039import org.apache.activemq.jndi.JNDIBaseStorable; 040import org.apache.activemq.management.JMSStatsImpl; 041import org.apache.activemq.management.StatsCapable; 042import org.apache.activemq.management.StatsImpl; 043import org.apache.activemq.thread.TaskRunnerFactory; 044import org.apache.activemq.transport.Transport; 045import org.apache.activemq.transport.TransportFactory; 
046import org.apache.activemq.transport.TransportListener; 047import org.apache.activemq.util.*; 048import org.apache.activemq.util.URISupport.CompositeData; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052/** 053 * A ConnectionFactory is an an Administered object, and is used for creating 054 * Connections. <p/> This class also implements QueueConnectionFactory and 055 * TopicConnectionFactory. You can use this connection to create both 056 * QueueConnections and TopicConnections. 057 * 058 * 059 * @see javax.jms.ConnectionFactory 060 */ 061public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable { 062 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionFactory.class); 063 private static final String DEFAULT_BROKER_HOST; 064 private static final int DEFAULT_BROKER_PORT; 065 static{ 066 String host = null; 067 String port = null; 068 try { 069 host = AccessController.doPrivileged(new PrivilegedAction<String>() { 070 @Override 071 public String run() { 072 String result = System.getProperty("org.apache.activemq.AMQ_HOST"); 073 result = (result==null||result.isEmpty()) ? System.getProperty("AMQ_HOST","localhost") : result; 074 return result; 075 } 076 }); 077 port = AccessController.doPrivileged(new PrivilegedAction<String>() { 078 @Override 079 public String run() { 080 String result = System.getProperty("org.apache.activemq.AMQ_PORT"); 081 result = (result==null||result.isEmpty()) ? System.getProperty("AMQ_PORT","61616") : result; 082 return result; 083 } 084 }); 085 }catch(Throwable e){ 086 LOG.debug("Failed to look up System properties for host and port",e); 087 } 088 host = (host == null || host.isEmpty()) ? "localhost" : host; 089 port = (port == null || port.isEmpty()) ? 
"61616" : port; 090 DEFAULT_BROKER_HOST = host; 091 DEFAULT_BROKER_PORT = Integer.parseInt(port); 092 } 093 094 095 public static final String DEFAULT_BROKER_BIND_URL; 096 097 static{ 098 final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT; 099 String bindURL = null; 100 101 try { 102 bindURL = AccessController.doPrivileged(new PrivilegedAction<String>() { 103 @Override 104 public String run() { 105 String result = System.getProperty("org.apache.activemq.BROKER_BIND_URL"); 106 result = (result==null||result.isEmpty()) ? System.getProperty("BROKER_BIND_URL",defaultURL) : result; 107 return result; 108 } 109 }); 110 }catch(Throwable e){ 111 LOG.debug("Failed to look up System properties for host and port",e); 112 } 113 bindURL = (bindURL == null || bindURL.isEmpty()) ? defaultURL : bindURL; 114 DEFAULT_BROKER_BIND_URL = bindURL; 115 } 116 117 public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL; 118 public static final String DEFAULT_USER = null; 119 public static final String DEFAULT_PASSWORD = null; 120 public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0; 121 122 protected URI brokerURL; 123 protected String userName; 124 protected String password; 125 protected String clientID; 126 protected boolean dispatchAsync=true; 127 protected boolean alwaysSessionAsync=true; 128 129 JMSStatsImpl factoryStats = new JMSStatsImpl(); 130 131 private IdGenerator clientIdGenerator; 132 private String clientIDPrefix; 133 private IdGenerator connectionIdGenerator; 134 private String connectionIDPrefix; 135 136 // client policies 137 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 138 private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap(); 139 { 140 redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy()); 141 } 142 private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 143 private MessageTransformer transformer; 144 145 private boolean 
disableTimeStampsByDefault; 146 private boolean optimizedMessageDispatch = true; 147 private long optimizeAcknowledgeTimeOut = 300; 148 private long optimizedAckScheduledAckInterval = 0; 149 private boolean copyMessageOnSend = true; 150 private boolean useCompression; 151 private boolean objectMessageSerializationDefered; 152 private boolean useAsyncSend; 153 private boolean optimizeAcknowledge; 154 private int closeTimeout = 15000; 155 private boolean useRetroactiveConsumer; 156 private boolean exclusiveConsumer; 157 private boolean nestedMapAndListEnabled = true; 158 private boolean alwaysSyncSend; 159 private boolean watchTopicAdvisories = true; 160 private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE; 161 private long warnAboutUnstartedConnectionTimeout = 500L; 162 private int sendTimeout = 0; 163 private int connectResponseTimeout = 0; 164 private boolean sendAcksAsync=true; 165 private TransportListener transportListener; 166 private ExceptionListener exceptionListener; 167 private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; 168 private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; 169 private boolean useDedicatedTaskRunner; 170 private long consumerFailoverRedeliveryWaitPeriod = 0; 171 private boolean checkForDuplicates = true; 172 private ClientInternalExceptionListener clientInternalExceptionListener; 173 private boolean messagePrioritySupported = false; 174 private boolean transactedIndividualAck = false; 175 private boolean nonBlockingRedelivery = false; 176 private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE; 177 private TaskRunnerFactory sessionTaskRunner; 178 private RejectedExecutionHandler rejectedTaskHandler = null; 179 protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class 180 private boolean rmIdFromConnectionId = false; 181 private boolean consumerExpiryCheckEnabled = true; 182 private List<String> trustedPackages 
= Arrays.asList(ClassLoadingAwareObjectInputStream.serializablePackages); 183 private boolean trustAllPackages = false; 184 185 // ///////////////////////////////////////////// 186 // 187 // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods 188 // 189 // ///////////////////////////////////////////// 190 191 public ActiveMQConnectionFactory() { 192 this(DEFAULT_BROKER_URL); 193 } 194 195 public ActiveMQConnectionFactory(String brokerURL) { 196 this(createURI(brokerURL)); 197 } 198 199 public ActiveMQConnectionFactory(URI brokerURL) { 200 setBrokerURL(brokerURL.toString()); 201 } 202 203 public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) { 204 setUserName(userName); 205 setPassword(password); 206 setBrokerURL(brokerURL.toString()); 207 } 208 209 public ActiveMQConnectionFactory(String userName, String password, String brokerURL) { 210 setUserName(userName); 211 setPassword(password); 212 setBrokerURL(brokerURL); 213 } 214 215 /** 216 * Returns a copy of the given connection factory 217 */ 218 public ActiveMQConnectionFactory copy() { 219 try { 220 return (ActiveMQConnectionFactory)super.clone(); 221 } catch (CloneNotSupportedException e) { 222 throw new RuntimeException("This should never happen: " + e, e); 223 } 224 } 225 226 /*boolean* 227 * @param brokerURL 228 * @return 229 * @throws URISyntaxException 230 */ 231 private static URI createURI(String brokerURL) { 232 try { 233 return new URI(brokerURL); 234 } catch (URISyntaxException e) { 235 throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e); 236 } 237 } 238 239 /** 240 * @return Returns the Connection. 241 */ 242 @Override 243 public Connection createConnection() throws JMSException { 244 return createActiveMQConnection(); 245 } 246 247 /** 248 * @return Returns the Connection. 
249 */ 250 @Override 251 public Connection createConnection(String userName, String password) throws JMSException { 252 return createActiveMQConnection(userName, password); 253 } 254 255 /** 256 * @return Returns the QueueConnection. 257 * @throws JMSException 258 */ 259 @Override 260 public QueueConnection createQueueConnection() throws JMSException { 261 return createActiveMQConnection().enforceQueueOnlyConnection(); 262 } 263 264 /** 265 * @return Returns the QueueConnection. 266 */ 267 @Override 268 public QueueConnection createQueueConnection(String userName, String password) throws JMSException { 269 return createActiveMQConnection(userName, password).enforceQueueOnlyConnection(); 270 } 271 272 /** 273 * @return Returns the TopicConnection. 274 * @throws JMSException 275 */ 276 @Override 277 public TopicConnection createTopicConnection() throws JMSException { 278 return createActiveMQConnection(); 279 } 280 281 /** 282 * @return Returns the TopicConnection. 283 */ 284 @Override 285 public TopicConnection createTopicConnection(String userName, String password) throws JMSException { 286 return createActiveMQConnection(userName, password); 287 } 288 289 /** 290 * @return the StatsImpl associated with this ConnectionFactory. 291 */ 292 @Override 293 public StatsImpl getStats() { 294 return this.factoryStats; 295 } 296 297 // ///////////////////////////////////////////// 298 // 299 // Implementation methods. 300 // 301 // ///////////////////////////////////////////// 302 303 protected ActiveMQConnection createActiveMQConnection() throws JMSException { 304 return createActiveMQConnection(userName, password); 305 } 306 307 /** 308 * Creates a Transport based on this object's connection settings. Separated 309 * from createActiveMQConnection to allow for subclasses to override. 310 * 311 * @return The newly created Transport. 312 * @throws JMSException If unable to create trasnport. 
313 */ 314 protected Transport createTransport() throws JMSException { 315 try { 316 URI connectBrokerUL = brokerURL; 317 String scheme = brokerURL.getScheme(); 318 if (scheme == null) { 319 throw new IOException("Transport not scheme specified: [" + brokerURL + "]"); 320 } 321 if (scheme.equals("auto")) { 322 connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp")); 323 } else if (scheme.equals("auto+ssl")) { 324 connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl")); 325 } else if (scheme.equals("auto+nio")) { 326 connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio")); 327 } else if (scheme.equals("auto+nio+ssl")) { 328 connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl")); 329 } 330 331 return TransportFactory.connect(connectBrokerUL); 332 } catch (Exception e) { 333 throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); 334 } 335 } 336 337 /** 338 * @return Returns the Connection. 339 */ 340 protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { 341 if (brokerURL == null) { 342 throw new ConfigurationException("brokerURL not set."); 343 } 344 ActiveMQConnection connection = null; 345 try { 346 Transport transport = createTransport(); 347 connection = createActiveMQConnection(transport, factoryStats); 348 349 connection.setUserName(userName); 350 connection.setPassword(password); 351 352 configureConnection(connection); 353 354 transport.start(); 355 356 if (clientID != null) { 357 connection.setDefaultClientID(clientID); 358 } 359 360 return connection; 361 } catch (JMSException e) { 362 // Clean up! 363 try { 364 connection.close(); 365 } catch (Throwable ignore) { 366 } 367 throw e; 368 } catch (Exception e) { 369 // Clean up! 370 try { 371 connection.close(); 372 } catch (Throwable ignore) { 373 } 374 throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". 
Reason: " + e, e); 375 } 376 } 377 378 protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { 379 ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), 380 getConnectionIdGenerator(), stats); 381 return connection; 382 } 383 384 protected void configureConnection(ActiveMQConnection connection) throws JMSException { 385 connection.setPrefetchPolicy(getPrefetchPolicy()); 386 connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault()); 387 connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch()); 388 connection.setCopyMessageOnSend(isCopyMessageOnSend()); 389 connection.setUseCompression(isUseCompression()); 390 connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered()); 391 connection.setDispatchAsync(isDispatchAsync()); 392 connection.setUseAsyncSend(isUseAsyncSend()); 393 connection.setAlwaysSyncSend(isAlwaysSyncSend()); 394 connection.setAlwaysSessionAsync(isAlwaysSessionAsync()); 395 connection.setOptimizeAcknowledge(isOptimizeAcknowledge()); 396 connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut()); 397 connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval()); 398 connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer()); 399 connection.setExclusiveConsumer(isExclusiveConsumer()); 400 connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap()); 401 connection.setTransformer(getTransformer()); 402 connection.setBlobTransferPolicy(getBlobTransferPolicy().copy()); 403 connection.setWatchTopicAdvisories(isWatchTopicAdvisories()); 404 connection.setProducerWindowSize(getProducerWindowSize()); 405 connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout()); 406 connection.setSendTimeout(getSendTimeout()); 407 connection.setCloseTimeout(getCloseTimeout()); 408 connection.setSendAcksAsync(isSendAcksAsync()); 409 
connection.setAuditDepth(getAuditDepth()); 410 connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber()); 411 connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); 412 connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod()); 413 connection.setCheckForDuplicates(isCheckForDuplicates()); 414 connection.setMessagePrioritySupported(isMessagePrioritySupported()); 415 connection.setTransactedIndividualAck(isTransactedIndividualAck()); 416 connection.setNonBlockingRedelivery(isNonBlockingRedelivery()); 417 connection.setMaxThreadPoolSize(getMaxThreadPoolSize()); 418 connection.setSessionTaskRunner(getSessionTaskRunner()); 419 connection.setRejectedTaskHandler(getRejectedTaskHandler()); 420 connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled()); 421 connection.setRmIdFromConnectionId(isRmIdFromConnectionId()); 422 connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled()); 423 connection.setTrustedPackages(getTrustedPackages()); 424 connection.setTrustAllPackages(isTrustAllPackages()); 425 connection.setConnectResponseTimeout(getConnectResponseTimeout()); 426 if (transportListener != null) { 427 connection.addTransportListener(transportListener); 428 } 429 if (exceptionListener != null) { 430 connection.setExceptionListener(exceptionListener); 431 } 432 if (clientInternalExceptionListener != null) { 433 connection.setClientInternalExceptionListener(clientInternalExceptionListener); 434 } 435 } 436 437 // ///////////////////////////////////////////// 438 // 439 // Property Accessors 440 // 441 // ///////////////////////////////////////////// 442 443 public String getBrokerURL() { 444 return brokerURL == null ? null : brokerURL.toString(); 445 } 446 447 /** 448 * Sets the <a 449 * href="http://activemq.apache.org/configuring-transports.html">connection 450 * URL</a> used to connect to the ActiveMQ broker. 
451 */ 452 public void setBrokerURL(String brokerURL) { 453 this.brokerURL = createURI(brokerURL); 454 455 // Use all the properties prefixed with 'jms.' to set the connection 456 // factory 457 // options. 458 if (this.brokerURL.getQuery() != null) { 459 // It might be a standard URI or... 460 try { 461 462 Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery()); 463 Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms."); 464 if (buildFromMap(jmsOptionsMap)) { 465 if (!jmsOptionsMap.isEmpty()) { 466 String msg = "There are " + jmsOptionsMap.size() 467 + " jms options that couldn't be set on the ConnectionFactory." 468 + " Check the options are spelled correctly." 469 + " Unknown parameters=[" + jmsOptionsMap + "]." 470 + " This connection factory cannot be started."; 471 throw new IllegalArgumentException(msg); 472 } 473 474 this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map); 475 } 476 477 } catch (URISyntaxException e) { 478 } 479 480 } else { 481 482 // It might be a composite URI. 483 try { 484 CompositeData data = URISupport.parseComposite(this.brokerURL); 485 Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms."); 486 if (buildFromMap(jmsOptionsMap)) { 487 if (!jmsOptionsMap.isEmpty()) { 488 String msg = "There are " + jmsOptionsMap.size() 489 + " jms options that couldn't be set on the ConnectionFactory." 490 + " Check the options are spelled correctly." 491 + " Unknown parameters=[" + jmsOptionsMap + "]." 492 + " This connection factory cannot be started."; 493 throw new IllegalArgumentException(msg); 494 } 495 496 this.brokerURL = data.toURI(); 497 } 498 } catch (URISyntaxException e) { 499 } 500 } 501 } 502 503 public String getClientID() { 504 return clientID; 505 } 506 507 /** 508 * Sets the JMS clientID to use for the created connection. 
Note that this 509 * can only be used by one connection at once so generally its a better idea 510 * to set the clientID on a Connection 511 */ 512 public void setClientID(String clientID) { 513 this.clientID = clientID; 514 } 515 516 public boolean isCopyMessageOnSend() { 517 return copyMessageOnSend; 518 } 519 520 /** 521 * Should a JMS message be copied to a new JMS Message object as part of the 522 * send() method in JMS. This is enabled by default to be compliant with the 523 * JMS specification. You can disable it if you do not mutate JMS messages 524 * after they are sent for a performance boost 525 */ 526 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 527 this.copyMessageOnSend = copyMessageOnSend; 528 } 529 530 public boolean isDisableTimeStampsByDefault() { 531 return disableTimeStampsByDefault; 532 } 533 534 /** 535 * Sets whether or not timestamps on messages should be disabled or not. If 536 * you disable them it adds a small performance boost. 537 */ 538 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) { 539 this.disableTimeStampsByDefault = disableTimeStampsByDefault; 540 } 541 542 public boolean isOptimizedMessageDispatch() { 543 return optimizedMessageDispatch; 544 } 545 546 /** 547 * If this flag is set then an larger prefetch limit is used - only 548 * applicable for durable topic subscribers. 
549 */ 550 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) { 551 this.optimizedMessageDispatch = optimizedMessageDispatch; 552 } 553 554 public String getPassword() { 555 return password; 556 } 557 558 /** 559 * Sets the JMS password used for connections created from this factory 560 */ 561 public void setPassword(String password) { 562 this.password = password; 563 } 564 565 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 566 return prefetchPolicy; 567 } 568 569 /** 570 * Sets the <a 571 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch 572 * policy</a> for consumers created by this connection. 573 */ 574 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 575 this.prefetchPolicy = prefetchPolicy; 576 } 577 578 public boolean isUseAsyncSend() { 579 return useAsyncSend; 580 } 581 582 public BlobTransferPolicy getBlobTransferPolicy() { 583 return blobTransferPolicy; 584 } 585 586 /** 587 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 588 * OBjects) are transferred from producers to brokers to consumers 589 */ 590 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 591 this.blobTransferPolicy = blobTransferPolicy; 592 } 593 594 /** 595 * Forces the use of <a 596 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which 597 * adds a massive performance boost; but means that the send() method will 598 * return immediately whether the message has been sent or not which could 599 * lead to message loss. 
600 */ 601 public void setUseAsyncSend(boolean useAsyncSend) { 602 this.useAsyncSend = useAsyncSend; 603 } 604 605 public synchronized boolean isWatchTopicAdvisories() { 606 return watchTopicAdvisories; 607 } 608 609 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 610 this.watchTopicAdvisories = watchTopicAdvisories; 611 } 612 613 /** 614 * @return true if always sync send messages 615 */ 616 public boolean isAlwaysSyncSend() { 617 return this.alwaysSyncSend; 618 } 619 620 /** 621 * Set true if always require messages to be sync sent 622 * 623 * @param alwaysSyncSend 624 */ 625 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 626 this.alwaysSyncSend = alwaysSyncSend; 627 } 628 629 public String getUserName() { 630 return userName; 631 } 632 633 /** 634 * Sets the JMS userName used by connections created by this factory 635 */ 636 public void setUserName(String userName) { 637 this.userName = userName; 638 } 639 640 public boolean isUseRetroactiveConsumer() { 641 return useRetroactiveConsumer; 642 } 643 644 /** 645 * Sets whether or not retroactive consumers are enabled. Retroactive 646 * consumers allow non-durable topic subscribers to receive old messages 647 * that were published before the non-durable subscriber started. 
*/
    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
        this.useRetroactiveConsumer = useRetroactiveConsumer;
    }

    /**
     * @return whether queue consumers are exclusive
     */
    public boolean isExclusiveConsumer() {
        return this.exclusiveConsumer;
    }

    /**
     * Enables or disables whether or not queue consumers should be exclusive,
     * for example to preserve ordering when not using <a
     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
     *
     * @param exclusiveConsumer
     */
    public void setExclusiveConsumer(boolean exclusiveConsumer) {
        this.exclusiveConsumer = exclusiveConsumer;
    }

    /**
     * @return the default entry of the redelivery policy map
     */
    public RedeliveryPolicy getRedeliveryPolicy() {
        final RedeliveryPolicy defaultPolicy = this.redeliveryPolicyMap.getDefaultEntry();
        return defaultPolicy;
    }

    /**
     * Sets the global default redelivery policy to be used when a message is delivered
     * but the session is rolled back
     */
    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
        redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
    }

    /**
     * @return the redelivery policy map configured on this factory
     */
    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
        return redeliveryPolicyMap;
    }

    /**
     * Sets the global redelivery policy mapping to be used when a message is delivered
     * but the session is rolled back
     */
    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
        this.redeliveryPolicyMap = redeliveryPolicyMap;
    }

    /**
     * @return the message transformer configured on this factory, possibly null
     */
    public MessageTransformer getTransformer() {
        return this.transformer;
    }

    /**
     * @return the sendTimeout (in milliseconds)
     */
    public int getSendTimeout() {
        return this.sendTimeout;
    }

    /**
     * @param sendTimeout the sendTimeout to set (in milliseconds)
     */
    public void setSendTimeout(int sendTimeout) {
        this.sendTimeout = sendTimeout;
    }

    /**
     * @return the sendAcksAsync
     */
    public boolean isSendAcksAsync() {
        return this.sendAcksAsync;
    }

    /**
     * @param sendAcksAsync the sendAcksAsync to set
     */
public void setSendAcksAsync(boolean sendAcksAsync) { 721 this.sendAcksAsync = sendAcksAsync; 722 } 723 724 /** 725 * @return the messagePrioritySupported 726 */ 727 public boolean isMessagePrioritySupported() { 728 return this.messagePrioritySupported; 729 } 730 731 /** 732 * @param messagePrioritySupported the messagePrioritySupported to set 733 */ 734 public void setMessagePrioritySupported(boolean messagePrioritySupported) { 735 this.messagePrioritySupported = messagePrioritySupported; 736 } 737 738 739 /** 740 * Sets the transformer used to transform messages before they are sent on 741 * to the JMS bus or when they are received from the bus but before they are 742 * delivered to the JMS client 743 */ 744 public void setTransformer(MessageTransformer transformer) { 745 this.transformer = transformer; 746 } 747 748 @SuppressWarnings({ "unchecked", "rawtypes" }) 749 @Override 750 public void buildFromProperties(Properties properties) { 751 752 if (properties == null) { 753 properties = new Properties(); 754 } 755 756 String temp = properties.getProperty(Context.PROVIDER_URL); 757 if (temp == null || temp.length() == 0) { 758 temp = properties.getProperty("brokerURL"); 759 } 760 if (temp != null && temp.length() > 0) { 761 setBrokerURL(temp); 762 } 763 764 Map<String, Object> p = new HashMap(properties); 765 buildFromMap(p); 766 } 767 768 public boolean buildFromMap(Map<String, Object> properties) { 769 boolean rc = false; 770 771 ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy(); 772 if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) { 773 setPrefetchPolicy(p); 774 rc = true; 775 } 776 777 RedeliveryPolicy rp = new RedeliveryPolicy(); 778 if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) { 779 setRedeliveryPolicy(rp); 780 rc = true; 781 } 782 783 BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 784 if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, 
"blobTransferPolicy.")) { 785 setBlobTransferPolicy(blobTransferPolicy); 786 rc = true; 787 } 788 789 rc |= IntrospectionSupport.setProperties(this, properties); 790 791 return rc; 792 } 793 794 @Override 795 public void populateProperties(Properties props) { 796 props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync())); 797 798 if (getBrokerURL() != null) { 799 props.setProperty(Context.PROVIDER_URL, getBrokerURL()); 800 props.setProperty("brokerURL", getBrokerURL()); 801 } 802 803 if (getClientID() != null) { 804 props.setProperty("clientID", getClientID()); 805 } 806 807 IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy."); 808 IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy."); 809 IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy."); 810 811 props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend())); 812 props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault())); 813 props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered())); 814 props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch())); 815 816 if (getPassword() != null) { 817 props.setProperty("password", getPassword()); 818 } 819 820 props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend())); 821 props.setProperty("useCompression", Boolean.toString(isUseCompression())); 822 props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer())); 823 props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories())); 824 825 if (getUserName() != null) { 826 props.setProperty("userName", getUserName()); 827 } 828 829 props.setProperty("closeTimeout", Integer.toString(getCloseTimeout())); 830 props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync())); 831 
props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge())); 832 props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled())); 833 props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend())); 834 props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize())); 835 props.setProperty("sendTimeout", Integer.toString(getSendTimeout())); 836 props.setProperty("connectResponseTimeout", Integer.toString(getConnectResponseTimeout())); 837 props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync())); 838 props.setProperty("auditDepth", Integer.toString(getAuditDepth())); 839 props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber())); 840 props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates())); 841 props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported())); 842 props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck())); 843 props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery())); 844 props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize())); 845 props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled())); 846 props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod())); 847 props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId())); 848 props.setProperty("consumerExpiryCheckEnabled", Boolean.toString(isConsumerExpiryCheckEnabled())); 849 } 850 851 public boolean isUseCompression() { 852 return useCompression; 853 } 854 855 /** 856 * Enables the use of compression of the message bodies 857 */ 858 public void setUseCompression(boolean useCompression) { 859 this.useCompression = useCompression; 860 } 861 862 public boolean isObjectMessageSerializationDefered() { 863 return 
objectMessageSerializationDefered; 864 } 865 866 /** 867 * When an object is set on an ObjectMessage, the JMS spec requires the 868 * object to be serialized by that set method. Enabling this flag causes the 869 * object to not get serialized. The object may subsequently get serialized 870 * if the message needs to be sent over a socket or stored to disk. 871 */ 872 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 873 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 874 } 875 876 public boolean isDispatchAsync() { 877 return dispatchAsync; 878 } 879 880 /** 881 * Enables or disables the default setting of whether or not consumers have 882 * their messages <a 883 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched 884 * synchronously or asynchronously by the broker</a>. For non-durable 885 * topics for example we typically dispatch synchronously by default to 886 * minimize context switches which boost performance. However sometimes its 887 * better to go slower to ensure that a single blocked consumer socket does 888 * not block delivery to other consumers. 889 * 890 * @param asyncDispatch If true then consumers created on this connection 891 * will default to having their messages dispatched 892 * asynchronously. The default value is true. 893 */ 894 public void setDispatchAsync(boolean asyncDispatch) { 895 this.dispatchAsync = asyncDispatch; 896 } 897 898 /** 899 * @return Returns the closeTimeout. 900 */ 901 public int getCloseTimeout() { 902 return closeTimeout; 903 } 904 905 /** 906 * Sets the timeout before a close is considered complete. 
Normally a 907 * close() on a connection waits for confirmation from the broker; this 908 * allows that operation to timeout to save the client hanging if there is 909 * no broker 910 */ 911 public void setCloseTimeout(int closeTimeout) { 912 this.closeTimeout = closeTimeout; 913 } 914 915 /** 916 * @return Returns the alwaysSessionAsync. 917 */ 918 public boolean isAlwaysSessionAsync() { 919 return alwaysSessionAsync; 920 } 921 922 /** 923 * If this flag is not set then a separate thread is not used for dispatching messages for each Session in 924 * the Connection. However, a separate thread is always used if there is more than one session, or the session 925 * isn't in auto acknowledge or duplicates ok mode. By default this value is set to true and session dispatch 926 * happens asynchronously. 927 */ 928 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 929 this.alwaysSessionAsync = alwaysSessionAsync; 930 } 931 932 /** 933 * @return Returns the optimizeAcknowledge. 934 */ 935 public boolean isOptimizeAcknowledge() { 936 return optimizeAcknowledge; 937 } 938 939 /** 940 * @param optimizeAcknowledge The optimizeAcknowledge to set. 
941 */ 942 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { 943 this.optimizeAcknowledge = optimizeAcknowledge; 944 } 945 946 /** 947 * The max time in milliseconds between optimized ack batches 948 * @param optimizeAcknowledgeTimeOut 949 */ 950 public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) { 951 this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut; 952 } 953 954 public long getOptimizeAcknowledgeTimeOut() { 955 return optimizeAcknowledgeTimeOut; 956 } 957 958 public boolean isNestedMapAndListEnabled() { 959 return nestedMapAndListEnabled; 960 } 961 962 /** 963 * Enables/disables whether or not Message properties and MapMessage entries 964 * support <a 965 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested 966 * Structures</a> of Map and List objects 967 */ 968 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 969 this.nestedMapAndListEnabled = structuredMapsEnabled; 970 } 971 972 public String getClientIDPrefix() { 973 return clientIDPrefix; 974 } 975 976 /** 977 * Sets the prefix used by autogenerated JMS Client ID values which are used 978 * if the JMS client does not explicitly specify on. 
979 * 980 * @param clientIDPrefix 981 */ 982 public void setClientIDPrefix(String clientIDPrefix) { 983 this.clientIDPrefix = clientIDPrefix; 984 } 985 986 protected synchronized IdGenerator getClientIdGenerator() { 987 if (clientIdGenerator == null) { 988 if (clientIDPrefix != null) { 989 clientIdGenerator = new IdGenerator(clientIDPrefix); 990 } else { 991 clientIdGenerator = new IdGenerator(); 992 } 993 } 994 return clientIdGenerator; 995 } 996 997 protected void setClientIdGenerator(IdGenerator clientIdGenerator) { 998 this.clientIdGenerator = clientIdGenerator; 999 } 1000 1001 /** 1002 * Sets the prefix used by connection id generator 1003 * @param connectionIDPrefix 1004 */ 1005 public void setConnectionIDPrefix(String connectionIDPrefix) { 1006 this.connectionIDPrefix = connectionIDPrefix; 1007 } 1008 1009 protected synchronized IdGenerator getConnectionIdGenerator() { 1010 if (connectionIdGenerator == null) { 1011 if (connectionIDPrefix != null) { 1012 connectionIdGenerator = new IdGenerator(connectionIDPrefix); 1013 } else { 1014 connectionIdGenerator = new IdGenerator(); 1015 } 1016 } 1017 return connectionIdGenerator; 1018 } 1019 1020 protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) { 1021 this.connectionIdGenerator = connectionIdGenerator; 1022 } 1023 1024 /** 1025 * @return the statsEnabled 1026 */ 1027 public boolean isStatsEnabled() { 1028 return this.factoryStats.isEnabled(); 1029 } 1030 1031 /** 1032 * @param statsEnabled the statsEnabled to set 1033 */ 1034 public void setStatsEnabled(boolean statsEnabled) { 1035 this.factoryStats.setEnabled(statsEnabled); 1036 } 1037 1038 public synchronized int getProducerWindowSize() { 1039 return producerWindowSize; 1040 } 1041 1042 public synchronized void setProducerWindowSize(int producerWindowSize) { 1043 this.producerWindowSize = producerWindowSize; 1044 } 1045 1046 public long getWarnAboutUnstartedConnectionTimeout() { 1047 return warnAboutUnstartedConnectionTimeout; 1048 } 1049 
/**
     * Enables the timeout from a connection creation to when a warning is
     * generated if the connection is not properly started via
     * {@link Connection#start()} and a message is received by a consumer. It is
     * a very common gotcha to forget to <a
     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
     * the connection</a>, so this option makes the default case create a
     * warning if the user forgets. To disable the warning just set the value
     * to less than 0 (say -1).
     *
     * @param warnAboutUnstartedConnectionTimeout the timeout to set
     */
    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
    }

    /**
     * @return the configured transportListener
     */
    public TransportListener getTransportListener() {
        return this.transportListener;
    }

    /**
     * Allows a listener to be configured on the ConnectionFactory so that when
     * this factory is used with frameworks which don't expose the Connection,
     * such as Spring JmsTemplate, you can still register a transport listener.
     *
     * @param transportListener sets the listener to be registered on all connections
     * created by this factory
     */
    public void setTransportListener(TransportListener transportListener) {
        this.transportListener = transportListener;
    }

    /**
     * @return the configured exceptionListener
     */
    public ExceptionListener getExceptionListener() {
        return this.exceptionListener;
    }

    /**
     * Allows an {@link ExceptionListener} to be configured on the
     * ConnectionFactory so that when this factory is used by frameworks which
     * don't expose the Connection, such as Spring JmsTemplate, you can register
     * an exception listener.
     * <p> Note: access to this exceptionListener will <b>not</b> be serialized
     * if it is associated with more than one connection (as it will be if more
     * than one connection is subsequently created by this connection factory).
     *
     * @param exceptionListener sets the exception listener to be registered on all connections
     * created by this factory
     */
    public void setExceptionListener(ExceptionListener exceptionListener) {
        this.exceptionListener = exceptionListener;
    }

    /**
     * @return the configured auditDepth
     */
    public int getAuditDepth() {
        return this.auditDepth;
    }

    /**
     * @param auditDepth the auditDepth to set
     */
    public void setAuditDepth(int auditDepth) {
        this.auditDepth = auditDepth;
    }

    /**
     * @return the configured auditMaximumProducerNumber
     */
    public int getAuditMaximumProducerNumber() {
        return this.auditMaximumProducerNumber;
    }

    /**
     * @param auditMaximumProducerNumber the auditMaximumProducerNumber to set
     */
    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
        this.auditMaximumProducerNumber = auditMaximumProducerNumber;
    }

    /**
     * @param useDedicatedTaskRunner the useDedicatedTaskRunner value to set
     */
    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
    }

    /**
     * @return the current value of the useDedicatedTaskRunner flag
     */
    public boolean isUseDedicatedTaskRunner() {
        return this.useDedicatedTaskRunner;
    }

    /**
     * @param consumerFailoverRedeliveryWaitPeriod the wait period to set
     */
    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
    }

    /**
     * @return the configured consumerFailoverRedeliveryWaitPeriod
     */
    public long getConsumerFailoverRedeliveryWaitPeriod() {
        return this.consumerFailoverRedeliveryWaitPeriod;
    }

    /**
     * @return the configured clientInternalExceptionListener
     */
    public ClientInternalExceptionListener getClientInternalExceptionListener() {
        return this.clientInternalExceptionListener;
    }

    /**
     * Allows a {@link ClientInternalExceptionListener} to be configured on the
     * ConnectionFactory so that when this factory is used by frameworks which
     * don't expose the Connection, such as Spring JmsTemplate, you can register
     * an exception listener.
     * <p> Note: access to this clientInternalExceptionListener will <b>not</b>
     * be serialized if it is associated with more than one connection (as it
     * will be if more than one connection is subsequently created by this
     * connection factory).
     *
     * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
     * created by this factory
     */
    public void setClientInternalExceptionListener(ClientInternalExceptionListener clientInternalExceptionListener) {
        this.clientInternalExceptionListener = clientInternalExceptionListener;
    }

    /**
     * @return the current value of the checkForDuplicates flag
     */
    public boolean isCheckForDuplicates() {
        return checkForDuplicates;
    }

    /**
     * @param checkForDuplicates the checkForDuplicates value to set
     */
    public void setCheckForDuplicates(boolean checkForDuplicates) {
        this.checkForDuplicates = checkForDuplicates;
    }

    /**
     * @return the current value of the transactedIndividualAck flag
     */
    public boolean isTransactedIndividualAck() {
        return this.transactedIndividualAck;
    }

    /**
     * When true, submit individual transacted acks immediately rather than with
     * transaction completion. This allows the acks to represent delivery status
     * which can be persisted on rollback. Used in conjunction with
     * org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean) true.
     *
     * @param transactedIndividualAck the transactedIndividualAck value to set
     */
    public void setTransactedIndividualAck(boolean transactedIndividualAck) {
        this.transactedIndividualAck = transactedIndividualAck;
    }

    /**
     * @return the current value of the nonBlockingRedelivery flag
     */
    public boolean isNonBlockingRedelivery() {
        return this.nonBlockingRedelivery;
    }

    /**
     * When true a MessageConsumer will not stop Message delivery before
     * re-delivering Messages from a rolled back transaction. This implies that
     * message order will not be preserved and also will result in the
     * TransactedIndividualAck option being enabled.
     *
     * @param nonBlockingRedelivery the nonBlockingRedelivery value to set
     */
    public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
        this.nonBlockingRedelivery = nonBlockingRedelivery;
    }

    /**
     * @return the configured maxThreadPoolSize
     */
    public int getMaxThreadPoolSize() {
        return this.maxThreadPoolSize;
    }

    /**
     * @param maxThreadPoolSize the maxThreadPoolSize to set
     */
    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
        this.maxThreadPoolSize = maxThreadPoolSize;
    }

    /**
     * @return the configured sessionTaskRunner
     */
    public TaskRunnerFactory getSessionTaskRunner() {
        return this.sessionTaskRunner;
    }

    /**
     * @param sessionTaskRunner the sessionTaskRunner to set
     */
    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
        this.sessionTaskRunner = sessionTaskRunner;
    }

    /**
     * @return the configured rejectedTaskHandler
     */
    public RejectedExecutionHandler getRejectedTaskHandler() {
        return this.rejectedTaskHandler;
    }

    /**
     * @param rejectedTaskHandler the rejectedTaskHandler to set
     */
    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
        this.rejectedTaskHandler = rejectedTaskHandler;
    }

    /**
     * Gets the configured time interval that is used to force all
     * MessageConsumers that have optimizedAcknowledge enabled to send an ack
     * for any outstanding Message Acks. By default this value is set to zero,
     * meaning that the consumers will not do any background Message
     * acknowledgment.
     *
     * @return the scheduledOptimizedAckInterval
     */
    public long getOptimizedAckScheduledAckInterval() {
        return this.optimizedAckScheduledAckInterval;
    }

    /**
     * Sets the amount of time between scheduled sends of any outstanding
     * Message Acks for consumers that have been configured with
     * optimizeAcknowledge enabled.
     *
     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
     */
    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
    }

    /**
     * @return the current value of the rmIdFromConnectionId flag
     */
    public boolean isRmIdFromConnectionId() {
        return this.rmIdFromConnectionId;
    }

    /**
     * Uses the connection id as the resource identity for XAResource.isSameRM,
     * ensuring join will only occur on a single connection.
     *
     * @param rmIdFromConnectionId the rmIdFromConnectionId value to set
     */
    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
        this.rmIdFromConnectionId = rmIdFromConnectionId;
    }

    /**
     * @return true if MessageConsumer instance will check for expired messages before dispatch.
     */
    public boolean isConsumerExpiryCheckEnabled() {
        return this.consumerExpiryCheckEnabled;
    }

    /**
     * Controls whether message expiration checking is done in each
     * MessageConsumer prior to dispatching a message. Disabling this check can
     * lead to consumption of expired messages.
     *
     * @param consumerExpiryCheckEnabled
     *        controls whether expiration checking is done prior to dispatch.
     */
    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
    }

    /**
     * @return the trustedPackages list held by this factory (the same
     *         instance, not a copy)
     */
    public List<String> getTrustedPackages() {
        return this.trustedPackages;
    }

    /**
     * @param trustedPackages the list to hold as trustedPackages; the
     *        reference is stored as given, without copying
     */
    public void setTrustedPackages(List<String> trustedPackages) {
        this.trustedPackages = trustedPackages;
    }

    /**
     * @return the current value of the trustAllPackages flag
     */
    public boolean isTrustAllPackages() {
        return this.trustAllPackages;
    }

    /**
     * @param trustAllPackages the trustAllPackages value to set
     */
    public void setTrustAllPackages(boolean trustAllPackages) {
        this.trustAllPackages = trustAllPackages;
    }

    /**
     * @return the configured connectResponseTimeout
     */
    public int getConnectResponseTimeout() {
        return this.connectResponseTimeout;
    }

    /**
     * @param connectResponseTimeout the connectResponseTimeout to set
     */
    public void setConnectResponseTimeout(int connectResponseTimeout) {
        this.connectResponseTimeout = connectResponseTimeout;
    }
}