001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.policy; 018 019import java.util.Set; 020 021import org.apache.activemq.ActiveMQPrefetchPolicy; 022import org.apache.activemq.broker.Broker; 023import org.apache.activemq.broker.region.BaseDestination; 024import org.apache.activemq.broker.region.Destination; 025import org.apache.activemq.broker.region.DurableTopicSubscription; 026import org.apache.activemq.broker.region.Queue; 027import org.apache.activemq.broker.region.QueueBrowserSubscription; 028import org.apache.activemq.broker.region.QueueSubscription; 029import org.apache.activemq.broker.region.Subscription; 030import org.apache.activemq.broker.region.Topic; 031import org.apache.activemq.broker.region.TopicSubscription; 032import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 033import org.apache.activemq.broker.region.group.GroupFactoryFinder; 034import org.apache.activemq.broker.region.group.MessageGroupMapFactory; 035import org.apache.activemq.filter.DestinationMapEntry; 036import org.apache.activemq.network.NetworkBridgeFilterFactory; 037import org.apache.activemq.usage.SystemUsage; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * Represents an entry in a {@link PolicyMap} for assigning policies to a 043 * specific destination or a hierarchical wildcard area of destinations. 044 * 045 * @org.apache.xbean.XBean 046 * 047 */ 048public class PolicyEntry extends DestinationMapEntry { 049 050 private static final Logger LOG = LoggerFactory.getLogger(PolicyEntry.class); 051 private DispatchPolicy dispatchPolicy; 052 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 053 private boolean sendAdvisoryIfNoConsumers; 054 private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY; 055 private PendingMessageLimitStrategy pendingMessageLimitStrategy; 056 private MessageEvictionStrategy messageEvictionStrategy; 057 private long memoryLimit; 058 private String messageGroupMapFactoryType = "cached"; 059 private MessageGroupMapFactory messageGroupMapFactory; 060 private PendingQueueMessageStoragePolicy pendingQueuePolicy; 061 private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy; 062 private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy; 063 private int maxProducersToAudit=BaseDestination.MAX_PRODUCERS_TO_AUDIT; 064 private int maxAuditDepth=BaseDestination.MAX_AUDIT_DEPTH; 065 private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH; 066 private boolean enableAudit=true; 067 private boolean producerFlowControl = true; 068 private boolean alwaysRetroactive = false; 069 private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; 070 private boolean optimizedDispatch=false; 071 private int maxPageSize=BaseDestination.MAX_PAGE_SIZE; 072 private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE; 073 private boolean useCache=true; 074 private long minimumMessageSize=1024; 075 private boolean useConsumerPriority=true; 076 private boolean strictOrderDispatch=false; 077 private boolean lazyDispatch=false; 078 private int timeBeforeDispatchStarts = 0; 079 private int consumersBeforeDispatchStarts = 0; 080 private boolean advisoryForSlowConsumers; 081 private boolean advisoryForFastProducers; 082 private boolean advisoryForDiscardingMessages; 083 private boolean advisoryWhenFull; 084 private boolean advisoryForDelivery; 085 private boolean advisoryForConsumed; 086 private boolean includeBodyForAdvisory; 087 private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD; 088 private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE; 089 private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; 090 private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH; 091 private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH; 092 private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH; 093 private boolean usePrefetchExtension = true; 094 private int cursorMemoryHighWaterMark = 70; 095 private int storeUsageHighWaterMark = 100; 096 private SlowConsumerStrategy slowConsumerStrategy; 097 private boolean prioritizedMessages; 098 private boolean allConsumersExclusiveByDefault; 099 private boolean gcInactiveDestinations; 100 private boolean gcWithNetworkConsumers; 101 private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; 102 private boolean reduceMemoryFootprint; 103 private NetworkBridgeFilterFactory networkBridgeFilterFactory; 104 private boolean doOptimzeMessageStorage = true; 105 private int maxDestinations = -1; 106 107 /* 108 * percentage of in-flight messages above which optimize message store is disabled 109 */ 110 private int optimizeMessageStoreInFlightLimit = 10; 111 private boolean persistJMSRedelivered = false; 112 113 114 public void configure(Broker broker,Queue queue) { 115 baseConfiguration(broker,queue); 116 if (dispatchPolicy != null) { 117 queue.setDispatchPolicy(dispatchPolicy); 118 } 119 queue.setDeadLetterStrategy(getDeadLetterStrategy()); 120 queue.setMessageGroupMapFactory(getMessageGroupMapFactory()); 121 if (memoryLimit > 0) { 122 queue.getMemoryUsage().setLimit(memoryLimit); 123 } 124 if (pendingQueuePolicy != null) { 125 PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue); 126 queue.setMessages(messages); 127 } 128 129 queue.setUseConsumerPriority(isUseConsumerPriority()); 130 queue.setStrictOrderDispatch(isStrictOrderDispatch()); 131 queue.setOptimizedDispatch(isOptimizedDispatch()); 132 queue.setLazyDispatch(isLazyDispatch()); 133 queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts()); 134 queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); 135 queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault()); 136 queue.setPersistJMSRedelivered(isPersistJMSRedelivered()); 137 } 138 139 public void update(Queue queue) { 140 update(queue, null); 141 } 142 143 /** 144 * Update a queue with this policy. Only apply properties that 145 * match the includedProperties list. Not all properties are eligible 146 * to be updated. 147 * 148 * If includedProperties is null then all of the properties will be set as 149 * isUpdate will return true 150 * @param baseDestination 151 * @param includedProperties 152 */ 153 public void update(Queue queue, Set<String> includedProperties) { 154 baseUpdate(queue, includedProperties); 155 if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) { 156 queue.getMemoryUsage().setLimit(memoryLimit); 157 } 158 if (isUpdate("useConsumerPriority", includedProperties)) { 159 queue.setUseConsumerPriority(isUseConsumerPriority()); 160 } 161 if (isUpdate("strictOrderDispatch", includedProperties)) { 162 queue.setStrictOrderDispatch(isStrictOrderDispatch()); 163 } 164 if (isUpdate("optimizedDispatch", includedProperties)) { 165 queue.setOptimizedDispatch(isOptimizedDispatch()); 166 } 167 if (isUpdate("lazyDispatch", includedProperties)) { 168 queue.setLazyDispatch(isLazyDispatch()); 169 } 170 if (isUpdate("timeBeforeDispatchStarts", includedProperties)) { 171 queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts()); 172 } 173 if (isUpdate("consumersBeforeDispatchStarts", includedProperties)) { 174 queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); 175 } 176 if (isUpdate("allConsumersExclusiveByDefault", includedProperties)) { 177 queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault()); 178 } 179 if (isUpdate("persistJMSRedelivered", includedProperties)) { 180 queue.setPersistJMSRedelivered(isPersistJMSRedelivered()); 181 } 182 } 183 184 public void configure(Broker broker,Topic topic) { 185 baseConfiguration(broker,topic); 186 if (dispatchPolicy != null) { 187 topic.setDispatchPolicy(dispatchPolicy); 188 } 189 topic.setDeadLetterStrategy(getDeadLetterStrategy()); 190 if (subscriptionRecoveryPolicy != null) { 191 SubscriptionRecoveryPolicy srp = subscriptionRecoveryPolicy.copy(); 192 srp.setBroker(broker); 193 topic.setSubscriptionRecoveryPolicy(srp); 194 } 195 if (memoryLimit > 0) { 196 topic.getMemoryUsage().setLimit(memoryLimit); 197 } 198 topic.setLazyDispatch(isLazyDispatch()); 199 } 200 201 public void update(Topic topic) { 202 update(topic, null); 203 } 204 205 //If includedProperties is null then all of the properties will be set as 206 //isUpdate will return true 207 public void update(Topic topic, Set<String> includedProperties) { 208 baseUpdate(topic, includedProperties); 209 if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) { 210 topic.getMemoryUsage().setLimit(memoryLimit); 211 } 212 if (isUpdate("lazyDispatch", includedProperties)) { 213 topic.setLazyDispatch(isLazyDispatch()); 214 } 215 } 216 217 // attributes that can change on the fly 218 public void baseUpdate(BaseDestination destination) { 219 baseUpdate(destination, null); 220 } 221 222 // attributes that can change on the fly 223 //If includedProperties is null then all of the properties will be set as 224 //isUpdate will return true 225 public void baseUpdate(BaseDestination destination, Set<String> includedProperties) { 226 if (isUpdate("producerFlowControl", includedProperties)) { 227 destination.setProducerFlowControl(isProducerFlowControl()); 228 } 229 if (isUpdate("alwaysRetroactive", includedProperties)) { 230 destination.setAlwaysRetroactive(isAlwaysRetroactive()); 231 } 232 if (isUpdate("blockedProducerWarningInterval", includedProperties)) { 233 destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval()); 234 } 235 if (isUpdate("maxPageSize", includedProperties)) { 236 destination.setMaxPageSize(getMaxPageSize()); 237 } 238 if (isUpdate("maxBrowsePageSize", includedProperties)) { 239 destination.setMaxBrowsePageSize(getMaxBrowsePageSize()); 240 } 241 242 if (isUpdate("minimumMessageSize", includedProperties)) { 243 destination.setMinimumMessageSize((int) getMinimumMessageSize()); 244 } 245 if (isUpdate("maxExpirePageSize", includedProperties)) { 246 destination.setMaxExpirePageSize(getMaxExpirePageSize()); 247 } 248 if (isUpdate("cursorMemoryHighWaterMark", includedProperties)) { 249 destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 250 } 251 if (isUpdate("storeUsageHighWaterMark", includedProperties)) { 252 destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark()); 253 } 254 if (isUpdate("gcInactiveDestinations", includedProperties)) { 255 destination.setGcIfInactive(isGcInactiveDestinations()); 256 } 257 if (isUpdate("gcWithNetworkConsumers", includedProperties)) { 258 destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers()); 259 } 260 if (isUpdate("inactiveTimeoutBeforeGc", includedProperties)) { 261 destination.setInactiveTimeoutBeforeGC(getInactiveTimeoutBeforeGC()); 262 } 263 if (isUpdate("reduceMemoryFootprint", includedProperties)) { 264 destination.setReduceMemoryFootprint(isReduceMemoryFootprint()); 265 } 266 if (isUpdate("doOptimizeMessageStore", includedProperties)) { 267 destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage()); 268 } 269 if (isUpdate("optimizeMessageStoreInFlightLimit", includedProperties)) { 270 destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit()); 271 } 272 if (isUpdate("advisoryForConsumed", includedProperties)) { 273 destination.setAdvisoryForConsumed(isAdvisoryForConsumed()); 274 } 275 if (isUpdate("advisoryForDelivery", includedProperties)) { 276 destination.setAdvisoryForDelivery(isAdvisoryForDelivery()); 277 } 278 if (isUpdate("advisoryForDiscardingMessages", includedProperties)) { 279 destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages()); 280 } 281 if (isUpdate("advisoryForSlowConsumers", includedProperties)) { 282 destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers()); 283 } 284 if (isUpdate("advisoryForFastProducers", includedProperties)) { 285 destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers()); 286 } 287 if (isUpdate("advisoryWhenFull", includedProperties)) { 288 destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); 289 } 290 if (isUpdate("includeBodyForAdvisory", includedProperties)) { 291 destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory()); 292 } 293 if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) { 294 destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers()); 295 } 296 } 297 298 public void baseConfiguration(Broker broker, BaseDestination destination) { 299 baseUpdate(destination); 300 destination.setEnableAudit(isEnableAudit()); 301 destination.setMaxAuditDepth(getMaxQueueAuditDepth()); 302 destination.setMaxProducersToAudit(getMaxProducersToAudit()); 303 destination.setUseCache(isUseCache()); 304 destination.setExpireMessagesPeriod(getExpireMessagesPeriod()); 305 SlowConsumerStrategy scs = getSlowConsumerStrategy(); 306 if (scs != null) { 307 scs.setBrokerService(broker); 308 scs.addDestination(destination); 309 } 310 destination.setSlowConsumerStrategy(scs); 311 destination.setPrioritizedMessages(isPrioritizedMessages()); 312 } 313 314 public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { 315 configurePrefetch(subscription); 316 subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 317 if (pendingMessageLimitStrategy != null) { 318 int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription); 319 int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit(); 320 if (consumerLimit > 0) { 321 if (value < 0 || consumerLimit < value) { 322 value = consumerLimit; 323 } 324 } 325 if (value >= 0) { 326 LOG.debug("Setting the maximumPendingMessages size to: {} for consumer: {}", value, subscription.getInfo().getConsumerId()); 327 subscription.setMaximumPendingMessages(value); 328 } 329 } 330 if (messageEvictionStrategy != null) { 331 subscription.setMessageEvictionStrategy(messageEvictionStrategy); 332 } 333 if (pendingSubscriberPolicy != null) { 334 String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId(); 335 int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize(); 336 subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize,subscription)); 337 } 338 if (enableAudit) { 339 subscription.setEnableAudit(enableAudit); 340 subscription.setMaxProducersToAudit(maxProducersToAudit); 341 subscription.setMaxAuditDepth(maxAuditDepth); 342 } 343 } 344 345 public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) { 346 String clientId = sub.getSubscriptionKey().getClientId(); 347 String subName = sub.getSubscriptionKey().getSubscriptionName(); 348 sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 349 configurePrefetch(sub); 350 if (pendingDurableSubscriberPolicy != null) { 351 PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,sub.getPrefetchSize(),sub); 352 cursor.setSystemUsage(memoryManager); 353 sub.setPending(cursor); 354 } 355 int auditDepth = getMaxAuditDepth(); 356 if (auditDepth == BaseDestination.MAX_AUDIT_DEPTH && this.isPrioritizedMessages()) { 357 sub.setMaxAuditDepth(auditDepth * 10); 358 } else { 359 sub.setMaxAuditDepth(auditDepth); 360 } 361 sub.setMaxProducersToAudit(getMaxProducersToAudit()); 362 sub.setUsePrefetchExtension(isUsePrefetchExtension()); 363 } 364 365 public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) { 366 configurePrefetch(sub); 367 sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 368 sub.setUsePrefetchExtension(isUsePrefetchExtension()); 369 370 // TODO 371 // We currently need an infinite audit because of the way that browser dispatch 372 // is done. We should refactor the browsers to better handle message dispatch so 373 // we can remove this and perform a more efficient dispatch. 374 sub.setMaxProducersToAudit(Integer.MAX_VALUE); 375 sub.setMaxAuditDepth(Short.MAX_VALUE); 376 377 // part solution - dispatching to browsers needs to be restricted 378 sub.setMaxMessages(getMaxBrowsePageSize()); 379 } 380 381 public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) { 382 configurePrefetch(sub); 383 sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 384 sub.setUsePrefetchExtension(isUsePrefetchExtension()); 385 sub.setMaxProducersToAudit(getMaxProducersToAudit()); 386 } 387 388 public void configurePrefetch(Subscription subscription) { 389 390 final int currentPrefetch = subscription.getConsumerInfo().getPrefetchSize(); 391 if (subscription instanceof QueueBrowserSubscription) { 392 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH) { 393 ((QueueBrowserSubscription) subscription).setPrefetchSize(getQueueBrowserPrefetch()); 394 } 395 } else if (subscription instanceof QueueSubscription) { 396 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH) { 397 ((QueueSubscription) subscription).setPrefetchSize(getQueuePrefetch()); 398 } 399 } else if (subscription instanceof DurableTopicSubscription) { 400 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH || 401 subscription.getConsumerInfo().getPrefetchSize() == ActiveMQPrefetchPolicy.DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH) { 402 ((DurableTopicSubscription)subscription).setPrefetchSize(getDurableTopicPrefetch()); 403 } 404 } else if (subscription instanceof TopicSubscription) { 405 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH) { 406 ((TopicSubscription) subscription).setPrefetchSize(getTopicPrefetch()); 407 } 408 } 409 if (currentPrefetch != 0 && subscription.getPrefetchSize() == 0) { 410 // tell the sub so that it can issue a pull request 411 subscription.updateConsumerPrefetch(0); 412 } 413 } 414 415 private boolean isUpdate(String property, Set<String> includedProperties) { 416 return includedProperties == null || includedProperties.contains(property); 417 } 418 // Properties 419 // ------------------------------------------------------------------------- 420 public DispatchPolicy getDispatchPolicy() { 421 return dispatchPolicy; 422 } 423 424 public void setDispatchPolicy(DispatchPolicy policy) { 425 this.dispatchPolicy = policy; 426 } 427 428 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 429 return subscriptionRecoveryPolicy; 430 } 431 432 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) { 433 this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy; 434 } 435 436 public boolean isSendAdvisoryIfNoConsumers() { 437 return sendAdvisoryIfNoConsumers; 438 } 439 440 /** 441 * Sends an advisory message if a non-persistent message is sent and there 442 * are no active consumers 443 */ 444 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { 445 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; 446 } 447 448 public DeadLetterStrategy getDeadLetterStrategy() { 449 return deadLetterStrategy; 450 } 451 452 /** 453 * Sets the policy used to determine which dead letter queue destination 454 * should be used 455 */ 456 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { 457 this.deadLetterStrategy = deadLetterStrategy; 458 } 459 460 public PendingMessageLimitStrategy getPendingMessageLimitStrategy() { 461 return pendingMessageLimitStrategy; 462 } 463 464 /** 465 * Sets the strategy to calculate the maximum number of messages that are 466 * allowed to be pending on consumers (in addition to their prefetch sizes). 467 * Once the limit is reached, non-durable topics can then start discarding 468 * old messages. This allows us to keep dispatching messages to slow 469 * consumers while not blocking fast consumers and discarding the messages 470 * oldest first. 471 */ 472 public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy) { 473 this.pendingMessageLimitStrategy = pendingMessageLimitStrategy; 474 } 475 476 public MessageEvictionStrategy getMessageEvictionStrategy() { 477 return messageEvictionStrategy; 478 } 479 480 /** 481 * Sets the eviction strategy used to decide which message to evict when the 482 * slow consumer needs to discard messages 483 */ 484 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) { 485 this.messageEvictionStrategy = messageEvictionStrategy; 486 } 487 488 public long getMemoryLimit() { 489 return memoryLimit; 490 } 491 492 /** 493 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 494 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 495 */ 496 public void setMemoryLimit(long memoryLimit) { 497 this.memoryLimit = memoryLimit; 498 } 499 500 public MessageGroupMapFactory getMessageGroupMapFactory() { 501 if (messageGroupMapFactory == null) { 502 try { 503 messageGroupMapFactory = GroupFactoryFinder.createMessageGroupMapFactory(getMessageGroupMapFactoryType()); 504 }catch(Exception e){ 505 LOG.error("Failed to create message group Factory ",e); 506 } 507 } 508 return messageGroupMapFactory; 509 } 510 511 /** 512 * Sets the factory used to create new instances of {MessageGroupMap} used 513 * to implement the <a 514 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 515 * functionality. 516 */ 517 public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) { 518 this.messageGroupMapFactory = messageGroupMapFactory; 519 } 520 521 522 public String getMessageGroupMapFactoryType() { 523 return messageGroupMapFactoryType; 524 } 525 526 public void setMessageGroupMapFactoryType(String messageGroupMapFactoryType) { 527 this.messageGroupMapFactoryType = messageGroupMapFactoryType; 528 } 529 530 531 /** 532 * @return the pendingDurableSubscriberPolicy 533 */ 534 public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() { 535 return this.pendingDurableSubscriberPolicy; 536 } 537 538 /** 539 * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy 540 * to set 541 */ 542 public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) { 543 this.pendingDurableSubscriberPolicy = pendingDurableSubscriberPolicy; 544 } 545 546 /** 547 * @return the pendingQueuePolicy 548 */ 549 public PendingQueueMessageStoragePolicy getPendingQueuePolicy() { 550 return this.pendingQueuePolicy; 551 } 552 553 /** 554 * @param pendingQueuePolicy the pendingQueuePolicy to set 555 */ 556 public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy pendingQueuePolicy) { 557 this.pendingQueuePolicy = pendingQueuePolicy; 558 } 559 560 /** 561 * @return the pendingSubscriberPolicy 562 */ 563 public PendingSubscriberMessageStoragePolicy getPendingSubscriberPolicy() { 564 return this.pendingSubscriberPolicy; 565 } 566 567 /** 568 * @param pendingSubscriberPolicy the pendingSubscriberPolicy to set 569 */ 570 public void setPendingSubscriberPolicy(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) { 571 this.pendingSubscriberPolicy = pendingSubscriberPolicy; 572 } 573 574 /** 575 * @return true if producer flow control enabled 576 */ 577 public boolean isProducerFlowControl() { 578 return producerFlowControl; 579 } 580 581 /** 582 * @param producerFlowControl 583 */ 584 public void setProducerFlowControl(boolean producerFlowControl) { 585 this.producerFlowControl = producerFlowControl; 586 } 587 588 /** 589 * @return true if topic is always retroactive 590 */ 591 public boolean isAlwaysRetroactive() { 592 return alwaysRetroactive; 593 } 594 595 /** 596 * @param alwaysRetroactive 597 */ 598 public void setAlwaysRetroactive(boolean alwaysRetroactive) { 599 this.alwaysRetroactive = alwaysRetroactive; 600 } 601 602 603 /** 604 * Set's the interval at which warnings about producers being blocked by 605 * resource usage will be triggered. Values of 0 or less will disable 606 * warnings 607 * 608 * @param blockedProducerWarningInterval the interval at which warning about 609 * blocked producers will be triggered. 610 */ 611 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 612 this.blockedProducerWarningInterval = blockedProducerWarningInterval; 613 } 614 615 /** 616 * 617 * @return the interval at which warning about blocked producers will be 618 * triggered. 619 */ 620 public long getBlockedProducerWarningInterval() { 621 return blockedProducerWarningInterval; 622 } 623 624 /** 625 * @return the maxProducersToAudit 626 */ 627 public int getMaxProducersToAudit() { 628 return maxProducersToAudit; 629 } 630 631 /** 632 * @param maxProducersToAudit the maxProducersToAudit to set 633 */ 634 public void setMaxProducersToAudit(int maxProducersToAudit) { 635 this.maxProducersToAudit = maxProducersToAudit; 636 } 637 638 /** 639 * @return the maxAuditDepth 640 */ 641 public int getMaxAuditDepth() { 642 return maxAuditDepth; 643 } 644 645 /** 646 * @param maxAuditDepth the maxAuditDepth to set 647 */ 648 public void setMaxAuditDepth(int maxAuditDepth) { 649 this.maxAuditDepth = maxAuditDepth; 650 } 651 652 /** 653 * @return the enableAudit 654 */ 655 public boolean isEnableAudit() { 656 return enableAudit; 657 } 658 659 /** 660 * @param enableAudit the enableAudit to set 661 */ 662 public void setEnableAudit(boolean enableAudit) { 663 this.enableAudit = enableAudit; 664 } 665 666 public int getMaxQueueAuditDepth() { 667 return maxQueueAuditDepth; 668 } 669 670 public void setMaxQueueAuditDepth(int maxQueueAuditDepth) { 671 this.maxQueueAuditDepth = maxQueueAuditDepth; 672 } 673 674 public boolean isOptimizedDispatch() { 675 return optimizedDispatch; 676 } 677 678 public void setOptimizedDispatch(boolean optimizedDispatch) { 679 this.optimizedDispatch = optimizedDispatch; 680 } 681 682 public int getMaxPageSize() { 683 return maxPageSize; 684 } 685 686 public void setMaxPageSize(int maxPageSize) { 687 this.maxPageSize = maxPageSize; 688 } 689 690 public int getMaxBrowsePageSize() { 691 return maxBrowsePageSize; 692 } 693 694 public void setMaxBrowsePageSize(int maxPageSize) { 695 this.maxBrowsePageSize = maxPageSize; 696 } 697 698 public boolean isUseCache() { 699 return useCache; 700 } 701 702 public void setUseCache(boolean useCache) { 703 this.useCache = useCache; 704 } 705 706 public long getMinimumMessageSize() { 707 return minimumMessageSize; 708 } 709 710 public void setMinimumMessageSize(long minimumMessageSize) { 711 this.minimumMessageSize = minimumMessageSize; 712 } 713 714 public boolean isUseConsumerPriority() { 715 return useConsumerPriority; 716 } 717 718 public void setUseConsumerPriority(boolean useConsumerPriority) { 719 this.useConsumerPriority = useConsumerPriority; 720 } 721 722 public boolean isStrictOrderDispatch() { 723 return strictOrderDispatch; 724 } 725 726 public void setStrictOrderDispatch(boolean strictOrderDispatch) { 727 this.strictOrderDispatch = strictOrderDispatch; 728 } 729 730 public boolean isLazyDispatch() { 731 return lazyDispatch; 732 } 733 734 public void setLazyDispatch(boolean lazyDispatch) { 735 this.lazyDispatch = lazyDispatch; 736 } 737 738 public int getTimeBeforeDispatchStarts() { 739 return timeBeforeDispatchStarts; 740 } 741 742 public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) { 743 this.timeBeforeDispatchStarts = timeBeforeDispatchStarts; 744 } 745 746 public int getConsumersBeforeDispatchStarts() { 747 return consumersBeforeDispatchStarts; 748 } 749 750 public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) { 751 this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts; 752 } 753 754 /** 755 * @return the advisoryForSlowConsumers 756 */ 757 public boolean isAdvisoryForSlowConsumers() { 758 return advisoryForSlowConsumers; 759 } 760 761 /** 762 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set 763 */ 764 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { 765 this.advisoryForSlowConsumers = advisoryForSlowConsumers; 766 } 767 768 /** 769 * @return the advisoryForDiscardingMessages 770 */ 771 public boolean isAdvisoryForDiscardingMessages() { 772 return advisoryForDiscardingMessages; 773 } 774 775 /** 776 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set 777 */ 778 public void setAdvisoryForDiscardingMessages( 779 boolean advisoryForDiscardingMessages) { 780 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; 781 } 782 783 /** 784 * @return the advisoryWhenFull 785 */ 786 public boolean isAdvisoryWhenFull() { 787 return advisoryWhenFull; 788 } 789 790 /** 791 * @param advisoryWhenFull the advisoryWhenFull to set 792 */ 793 public void setAdvisoryWhenFull(boolean advisoryWhenFull) { 794 this.advisoryWhenFull = advisoryWhenFull; 795 } 796 797 /** 798 * @return the advisoryForDelivery 799 */ 800 public boolean isAdvisoryForDelivery() { 801 return advisoryForDelivery; 802 } 803 804 /** 805 * @param advisoryForDelivery the advisoryForDelivery to set 806 */ 807 public void setAdvisoryForDelivery(boolean advisoryForDelivery) { 808 this.advisoryForDelivery = advisoryForDelivery; 809 } 810 811 /** 812 * @return the advisoryForConsumed 813 */ 814 public boolean isAdvisoryForConsumed() { 815 return advisoryForConsumed; 816 } 817 818 /** 819 * @param advisoryForConsumed the advisoryForConsumed to set 820 */ 821 public void setAdvisoryForConsumed(boolean advisoryForConsumed) { 822 this.advisoryForConsumed = advisoryForConsumed; 823 } 824 825 /** 826 * @return the advisdoryForFastProducers 827 */ 828 public boolean isAdvisoryForFastProducers() { 829 return advisoryForFastProducers; 830 } 831 832 /** 833 * @param advisoryForFastProducers the advisdoryForFastProducers to set 834 */ 835 public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) { 836 this.advisoryForFastProducers = advisoryForFastProducers; 837 } 838 839 /** 840 * Returns true if the original message body should be included when applicable 841 * for advisory messages 842 * 843 * @return 844 */ 845 public boolean isIncludeBodyForAdvisory() { 846 return includeBodyForAdvisory; 847 } 848 849 /** 850 * Sets if the original message body should be included when applicable 851 * for advisory messages 852 * 853 * @param includeBodyForAdvisory 854 */ 855 public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) { 856 this.includeBodyForAdvisory = includeBodyForAdvisory; 857 } 858 859 public void setMaxExpirePageSize(int maxExpirePageSize) { 860 this.maxExpirePageSize = maxExpirePageSize; 861 } 862 863 public int getMaxExpirePageSize() { 864 return maxExpirePageSize; 865 } 866 867 public void setExpireMessagesPeriod(long expireMessagesPeriod) { 868 this.expireMessagesPeriod = expireMessagesPeriod; 869 } 870 871 public long getExpireMessagesPeriod() { 872 return expireMessagesPeriod; 873 } 874 875 /** 876 * Get the queuePrefetch 877 * @return the queuePrefetch 878 */ 879 public int getQueuePrefetch() { 880 return this.queuePrefetch; 881 } 882 883 /** 884 * Set the queuePrefetch 885 * @param queuePrefetch the queuePrefetch to set 886 */ 887 public void setQueuePrefetch(int queuePrefetch) { 888 this.queuePrefetch = queuePrefetch; 889 } 890 891 /** 892 * Get the queueBrowserPrefetch 893 * @return the queueBrowserPrefetch 894 */ 895 public int getQueueBrowserPrefetch() { 896 return this.queueBrowserPrefetch; 897 } 898 899 /** 900 * Set the queueBrowserPrefetch 901 * @param queueBrowserPrefetch the queueBrowserPrefetch to set 902 */ 903 public void setQueueBrowserPrefetch(int queueBrowserPrefetch) { 904 this.queueBrowserPrefetch = queueBrowserPrefetch; 905 } 906 907 /** 908 * Get the topicPrefetch 909 * @return the topicPrefetch 910 */ 911 public int getTopicPrefetch() { 912 return this.topicPrefetch; 913 } 914 915 /** 916 * Set the topicPrefetch 917 * @param topicPrefetch the topicPrefetch to set 918 */ 919 public void setTopicPrefetch(int topicPrefetch) { 920 this.topicPrefetch = topicPrefetch; 921 } 922 923 /** 924 * Get the durableTopicPrefetch 925 * @return the durableTopicPrefetch 926 */ 927 public int getDurableTopicPrefetch() { 928 return this.durableTopicPrefetch; 929 } 930 931 /** 932 * Set the durableTopicPrefetch 933 * @param durableTopicPrefetch the durableTopicPrefetch to set 934 */ 935 public void setDurableTopicPrefetch(int durableTopicPrefetch) { 936 this.durableTopicPrefetch = durableTopicPrefetch; 937 } 938 939 public boolean isUsePrefetchExtension() { 940 return this.usePrefetchExtension; 941 } 942 943 public void setUsePrefetchExtension(boolean usePrefetchExtension) { 944 this.usePrefetchExtension = usePrefetchExtension; 945 } 946 947 public int getCursorMemoryHighWaterMark() { 948 return this.cursorMemoryHighWaterMark; 949 } 950 951 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { 952 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; 953 } 954 955 public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { 956 this.storeUsageHighWaterMark = storeUsageHighWaterMark; 957 } 958 959 public int getStoreUsageHighWaterMark() { 960 return storeUsageHighWaterMark; 961 } 962 963 public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { 964 this.slowConsumerStrategy = slowConsumerStrategy; 965 } 966 967 public SlowConsumerStrategy getSlowConsumerStrategy() { 968 return this.slowConsumerStrategy; 969 } 970 971 972 public boolean isPrioritizedMessages() { 973 return this.prioritizedMessages; 974 } 975 976 public void setPrioritizedMessages(boolean prioritizedMessages) { 977 this.prioritizedMessages = prioritizedMessages; 978 } 979 980 public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) { 981 this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault; 982 } 983 984 public boolean isAllConsumersExclusiveByDefault() { 985 return allConsumersExclusiveByDefault; 986 } 987 988 public boolean isGcInactiveDestinations() { 989 return this.gcInactiveDestinations; 990 } 991 992 public void setGcInactiveDestinations(boolean gcInactiveDestinations) { 993 this.gcInactiveDestinations = gcInactiveDestinations; 994 } 995 996 /** 997 * @return the amount of time spent inactive before GC of the destination kicks in. 998 * 999 * @deprecated use getInactiveTimeoutBeforeGC instead. 1000 */ 1001 @Deprecated 1002 public long getInactiveTimoutBeforeGC() { 1003 return getInactiveTimeoutBeforeGC(); 1004 } 1005 1006 /** 1007 * Sets the amount of time a destination is inactive before it is marked for GC 1008 * 1009 * @param inactiveTimoutBeforeGC 1010 * time in milliseconds to configure as the inactive timeout. 1011 * 1012 * @deprecated use getInactiveTimeoutBeforeGC instead. 1013 */ 1014 @Deprecated 1015 public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) { 1016 setInactiveTimeoutBeforeGC(inactiveTimoutBeforeGC); 1017 } 1018 1019 /** 1020 * @return the amount of time spent inactive before GC of the destination kicks in. 1021 */ 1022 public long getInactiveTimeoutBeforeGC() { 1023 return this.inactiveTimeoutBeforeGC; 1024 } 1025 1026 /** 1027 * Sets the amount of time a destination is inactive before it is marked for GC 1028 * 1029 * @param inactiveTimoutBeforeGC 1030 * time in milliseconds to configure as the inactive timeout. 1031 */ 1032 public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) { 1033 this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC; 1034 } 1035 1036 public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) { 1037 this.gcWithNetworkConsumers = gcWithNetworkConsumers; 1038 } 1039 1040 public boolean isGcWithNetworkConsumers() { 1041 return gcWithNetworkConsumers; 1042 } 1043 1044 public boolean isReduceMemoryFootprint() { 1045 return reduceMemoryFootprint; 1046 } 1047 1048 public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) { 1049 this.reduceMemoryFootprint = reduceMemoryFootprint; 1050 } 1051 1052 public void setNetworkBridgeFilterFactory(NetworkBridgeFilterFactory networkBridgeFilterFactory) { 1053 this.networkBridgeFilterFactory = networkBridgeFilterFactory; 1054 } 1055 1056 public NetworkBridgeFilterFactory getNetworkBridgeFilterFactory() { 1057 return networkBridgeFilterFactory; 1058 } 1059 1060 public boolean isDoOptimzeMessageStorage() { 1061 return doOptimzeMessageStorage; 1062 } 1063 1064 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { 1065 this.doOptimzeMessageStorage = doOptimzeMessageStorage; 1066 } 1067 1068 public int getOptimizeMessageStoreInFlightLimit() { 1069 return optimizeMessageStoreInFlightLimit; 1070 } 1071 1072 public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) { 1073 this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; 1074 } 1075 1076 public void setPersistJMSRedelivered(boolean val) { 1077 this.persistJMSRedelivered = val; 1078 } 1079 1080 public boolean isPersistJMSRedelivered() { 1081 return persistJMSRedelivered; 1082 } 1083 1084 public int getMaxDestinations() { 1085 return maxDestinations; 1086 } 1087 1088 /** 1089 * Sets the maximum number of destinations that can be created 1090 * 1091 * @param maxDestinations 1092 * maximum number of destinations 1093 */ 1094 public void setMaxDestinations(int maxDestinations) { 1095 this.maxDestinations = maxDestinations; 1096 } 1097 1098 @Override 1099 public String toString() { 1100 return "PolicyEntry [" + destination + "]"; 1101 } 1102}