001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.Map; 024import java.util.concurrent.CancellationException; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.CopyOnWriteArrayList; 028import java.util.concurrent.Future; 029import java.util.concurrent.locks.ReentrantReadWriteLock; 030 031import org.apache.activemq.advisory.AdvisorySupport; 032import org.apache.activemq.broker.BrokerService; 033import org.apache.activemq.broker.ConnectionContext; 034import org.apache.activemq.broker.ProducerBrokerExchange; 035import org.apache.activemq.broker.region.policy.DispatchPolicy; 036import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; 037import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; 038import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; 039import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; 040import org.apache.activemq.broker.util.InsertionCountList; 041import org.apache.activemq.command.ActiveMQDestination; 042import org.apache.activemq.command.ConsumerInfo; 043import org.apache.activemq.command.ExceptionResponse; 044import org.apache.activemq.command.Message; 045import org.apache.activemq.command.MessageAck; 046import org.apache.activemq.command.MessageId; 047import org.apache.activemq.command.ProducerAck; 048import org.apache.activemq.command.ProducerInfo; 049import org.apache.activemq.command.Response; 050import org.apache.activemq.command.SubscriptionInfo; 051import org.apache.activemq.filter.MessageEvaluationContext; 052import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 053import org.apache.activemq.store.MessageRecoveryListener; 054import org.apache.activemq.store.NoLocalSubscriptionAware; 055import org.apache.activemq.store.PersistenceAdapter; 056import org.apache.activemq.store.TopicMessageStore; 057import org.apache.activemq.thread.Task; 058import org.apache.activemq.thread.TaskRunner; 059import org.apache.activemq.thread.TaskRunnerFactory; 060import org.apache.activemq.transaction.Synchronization; 061import org.apache.activemq.util.SubscriptionKey; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065/** 066 * The Topic is a destination that sends a copy of a message to every active 067 * Subscription registered. 068 */ 069public class Topic extends BaseDestination implements Task { 070 protected static final Logger LOG = LoggerFactory.getLogger(Topic.class); 071 private final TopicMessageStore topicStore; 072 protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>(); 073 private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock(); 074 private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); 075 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 076 private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); 077 private final TaskRunner taskRunner; 078 private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>(); 079 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { 080 @Override 081 public void run() { 082 try { 083 Topic.this.taskRunner.wakeup(); 084 } catch (InterruptedException e) { 085 } 086 } 087 }; 088 089 public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, 090 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { 091 super(brokerService, store, destination, parentStats); 092 this.topicStore = store; 093 subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null); 094 this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); 095 } 096 097 @Override 098 public void initialize() throws Exception { 099 super.initialize(); 100 // set non default subscription recovery policy (override policyEntries) 101 if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) { 102 subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy(); 103 setAlwaysRetroactive(true); 104 } 105 if (store != null) { 106 // AMQ-2586: Better to leave this stat at zero than to give the user 107 // misleading metrics. 108 // int messageCount = store.getMessageCount(); 109 // destinationStatistics.getMessages().setCount(messageCount); 110 store.start(); 111 } 112 } 113 114 @Override 115 public List<Subscription> getConsumers() { 116 synchronized (consumers) { 117 return new ArrayList<Subscription>(consumers); 118 } 119 } 120 121 public boolean lock(MessageReference node, LockOwner sub) { 122 return true; 123 } 124 125 @Override 126 public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception { 127 if (!sub.getConsumerInfo().isDurable()) { 128 129 // Do a retroactive recovery if needed. 130 if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) { 131 132 // synchronize with dispatch method so that no new messages are sent 133 // while we are recovering a subscription to avoid out of order messages. 134 dispatchLock.writeLock().lock(); 135 try { 136 boolean applyRecovery = false; 137 synchronized (consumers) { 138 if (!consumers.contains(sub)){ 139 sub.add(context, this); 140 consumers.add(sub); 141 applyRecovery=true; 142 super.addSubscription(context, sub); 143 } 144 } 145 if (applyRecovery){ 146 subscriptionRecoveryPolicy.recover(context, this, sub); 147 } 148 } finally { 149 dispatchLock.writeLock().unlock(); 150 } 151 152 } else { 153 synchronized (consumers) { 154 if (!consumers.contains(sub)){ 155 sub.add(context, this); 156 consumers.add(sub); 157 super.addSubscription(context, sub); 158 } 159 } 160 } 161 } else { 162 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 163 super.addSubscription(context, sub); 164 sub.add(context, this); 165 if(dsub.isActive()) { 166 synchronized (consumers) { 167 boolean hasSubscription = false; 168 169 if (consumers.size() == 0) { 170 hasSubscription = false; 171 } else { 172 for (Subscription currentSub : consumers) { 173 if (currentSub.getConsumerInfo().isDurable()) { 174 DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub; 175 if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) { 176 hasSubscription = true; 177 break; 178 } 179 } 180 } 181 } 182 183 if (!hasSubscription) { 184 consumers.add(sub); 185 } 186 } 187 } 188 durableSubscribers.put(dsub.getSubscriptionKey(), dsub); 189 } 190 } 191 192 @Override 193 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { 194 if (!sub.getConsumerInfo().isDurable()) { 195 boolean removed = false; 196 synchronized (consumers) { 197 removed = consumers.remove(sub); 198 } 199 if (removed) { 200 super.removeSubscription(context, sub, lastDeliveredSequenceId); 201 } 202 } 203 sub.remove(context, this); 204 } 205 206 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { 207 if (topicStore != null) { 208 topicStore.deleteSubscription(key.clientId, key.subscriptionName); 209 DurableTopicSubscription removed = durableSubscribers.remove(key); 210 if (removed != null) { 211 destinationStatistics.getConsumers().decrement(); 212 // deactivate and remove 213 removed.deactivate(false, 0l); 214 consumers.remove(removed); 215 } 216 } 217 } 218 219 private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException { 220 if (hasSelectorChanged(info1, info2)) { 221 return true; 222 } 223 224 return hasNoLocalChanged(info1, info2); 225 } 226 227 private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException { 228 //Not all persistence adapters store the noLocal value for a subscription 229 PersistenceAdapter adapter = broker.getBrokerService().getPersistenceAdapter(); 230 if (adapter instanceof NoLocalSubscriptionAware) { 231 if (info1.isNoLocal() ^ info2.isNoLocal()) { 232 return true; 233 } 234 } 235 236 return false; 237 } 238 239 private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) { 240 if (info1.getSelector() != null ^ info2.getSelector() != null) { 241 return true; 242 } 243 244 if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) { 245 return true; 246 } 247 248 return false; 249 } 250 251 public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception { 252 // synchronize with dispatch method so that no new messages are sent 253 // while we are recovering a subscription to avoid out of order messages. 254 dispatchLock.writeLock().lock(); 255 try { 256 257 if (topicStore == null) { 258 return; 259 } 260 261 // Recover the durable subscription. 262 String clientId = subscription.getSubscriptionKey().getClientId(); 263 String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName(); 264 SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName); 265 if (info != null) { 266 // Check to see if selector changed. 267 if (hasDurableSubChanged(info, subscription.getConsumerInfo())) { 268 // Need to delete the subscription 269 topicStore.deleteSubscription(clientId, subscriptionName); 270 info = null; 271 // Force a rebuild of the selector chain for the subscription otherwise 272 // the stored subscription is updated but the selector expression is not 273 // and the subscription will not behave according to the new configuration. 274 subscription.setSelector(subscription.getConsumerInfo().getSelector()); 275 synchronized (consumers) { 276 consumers.remove(subscription); 277 } 278 } else { 279 synchronized (consumers) { 280 if (!consumers.contains(subscription)) { 281 consumers.add(subscription); 282 } 283 } 284 } 285 } 286 287 // Do we need to create the subscription? 288 if (info == null) { 289 info = new SubscriptionInfo(); 290 info.setClientId(clientId); 291 info.setSelector(subscription.getConsumerInfo().getSelector()); 292 info.setSubscriptionName(subscriptionName); 293 info.setDestination(getActiveMQDestination()); 294 info.setNoLocal(subscription.getConsumerInfo().isNoLocal()); 295 // This destination is an actual destination id. 296 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); 297 // This destination might be a pattern 298 synchronized (consumers) { 299 consumers.add(subscription); 300 topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive()); 301 } 302 } 303 304 final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 305 msgContext.setDestination(destination); 306 if (subscription.isRecoveryRequired()) { 307 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { 308 @Override 309 public boolean recoverMessage(Message message) throws Exception { 310 message.setRegionDestination(Topic.this); 311 try { 312 msgContext.setMessageReference(message); 313 if (subscription.matches(message, msgContext)) { 314 subscription.add(message); 315 } 316 } catch (IOException e) { 317 LOG.error("Failed to recover this message {}", message, e); 318 } 319 return true; 320 } 321 322 @Override 323 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 324 throw new RuntimeException("Should not be called."); 325 } 326 327 @Override 328 public boolean hasSpace() { 329 return true; 330 } 331 332 @Override 333 public boolean isDuplicate(MessageId id) { 334 return false; 335 } 336 }); 337 } 338 } finally { 339 dispatchLock.writeLock().unlock(); 340 } 341 } 342 343 public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception { 344 synchronized (consumers) { 345 consumers.remove(sub); 346 } 347 sub.remove(context, this, dispatched); 348 } 349 350 public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception { 351 if (subscription.getConsumerInfo().isRetroactive()) { 352 subscriptionRecoveryPolicy.recover(context, this, subscription); 353 } 354 } 355 356 @Override 357 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 358 final ConnectionContext context = producerExchange.getConnectionContext(); 359 360 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 361 producerExchange.incrementSend(); 362 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 363 && !context.isInRecoveryMode(); 364 365 message.setRegionDestination(this); 366 367 // There is delay between the client sending it and it arriving at the 368 // destination.. it may have expired. 369 if (message.isExpired()) { 370 broker.messageExpired(context, message, null); 371 getDestinationStatistics().getExpired().increment(); 372 if (sendProducerAck) { 373 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 374 context.getConnection().dispatchAsync(ack); 375 } 376 return; 377 } 378 379 if (memoryUsage.isFull()) { 380 isFull(context, memoryUsage); 381 fastProducer(context, producerInfo); 382 383 if (isProducerFlowControl() && context.isProducerFlowControl()) { 384 385 if (warnOnProducerFlowControl) { 386 warnOnProducerFlowControl = false; 387 LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", 388 getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit()); 389 } 390 391 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 392 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit (" 393 + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId() 394 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 395 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 396 } 397 398 // We can avoid blocking due to low usage if the producer is sending a sync message or 399 // if it is using a producer window 400 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { 401 synchronized (messagesWaitingForSpace) { 402 messagesWaitingForSpace.add(new Runnable() { 403 @Override 404 public void run() { 405 try { 406 407 // While waiting for space to free up... the 408 // message may have expired. 409 if (message.isExpired()) { 410 broker.messageExpired(context, message, null); 411 getDestinationStatistics().getExpired().increment(); 412 } else { 413 doMessageSend(producerExchange, message); 414 } 415 416 if (sendProducerAck) { 417 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message 418 .getSize()); 419 context.getConnection().dispatchAsync(ack); 420 } else { 421 Response response = new Response(); 422 response.setCorrelationId(message.getCommandId()); 423 context.getConnection().dispatchAsync(response); 424 } 425 426 } catch (Exception e) { 427 if (!sendProducerAck && !context.isInRecoveryMode()) { 428 ExceptionResponse response = new ExceptionResponse(e); 429 response.setCorrelationId(message.getCommandId()); 430 context.getConnection().dispatchAsync(response); 431 } 432 } 433 } 434 }); 435 436 registerCallbackForNotFullNotification(); 437 context.setDontSendReponse(true); 438 return; 439 } 440 441 } else { 442 // Producer flow control cannot be used, so we have do the flow control 443 // at the broker by blocking this thread until there is space available. 444 445 if (memoryUsage.isFull()) { 446 if (context.isInTransaction()) { 447 448 int count = 0; 449 while (!memoryUsage.waitForSpace(1000)) { 450 if (context.getStopping().get()) { 451 throw new IOException("Connection closed, send aborted."); 452 } 453 if (count > 2 && context.isInTransaction()) { 454 count = 0; 455 int size = context.getTransaction().size(); 456 LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message); 457 } 458 count++; 459 } 460 } else { 461 waitForSpace( 462 context, 463 producerExchange, 464 memoryUsage, 465 "Usage Manager Memory Usage limit reached. Stopping producer (" 466 + message.getProducerId() 467 + ") to prevent flooding " 468 + getActiveMQDestination().getQualifiedName() 469 + "." 470 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 471 } 472 } 473 474 // The usage manager could have delayed us by the time 475 // we unblock the message could have expired.. 476 if (message.isExpired()) { 477 getDestinationStatistics().getExpired().increment(); 478 LOG.debug("Expired message: {}", message); 479 return; 480 } 481 } 482 } 483 } 484 485 doMessageSend(producerExchange, message); 486 messageDelivered(context, message); 487 if (sendProducerAck) { 488 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 489 context.getConnection().dispatchAsync(ack); 490 } 491 } 492 493 /** 494 * do send the message - this needs to be synchronized to ensure messages 495 * are stored AND dispatched in the right order 496 * 497 * @param producerExchange 498 * @param message 499 * @throws IOException 500 * @throws Exception 501 */ 502 synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) 503 throws IOException, Exception { 504 final ConnectionContext context = producerExchange.getConnectionContext(); 505 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 506 Future<Object> result = null; 507 508 if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { 509 if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { 510 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of " 511 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() 512 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 513 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 514 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 515 throw new javax.jms.ResourceAllocationException(logMessage); 516 } 517 518 waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); 519 } 520 result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage()); 521 522 //Moved the reduceMemoryfootprint clearing to the dispatch method 523 } 524 525 message.incrementReferenceCount(); 526 527 if (context.isInTransaction()) { 528 context.getTransaction().addSynchronization(new Synchronization() { 529 @Override 530 public void afterCommit() throws Exception { 531 // It could take while before we receive the commit 532 // operation.. by that time the message could have 533 // expired.. 534 if (message.isExpired()) { 535 if (broker.isExpired(message)) { 536 getDestinationStatistics().getExpired().increment(); 537 broker.messageExpired(context, message, null); 538 } 539 message.decrementReferenceCount(); 540 return; 541 } 542 try { 543 dispatch(context, message); 544 } finally { 545 message.decrementReferenceCount(); 546 } 547 } 548 549 @Override 550 public void afterRollback() throws Exception { 551 message.decrementReferenceCount(); 552 } 553 }); 554 555 } else { 556 try { 557 dispatch(context, message); 558 } finally { 559 message.decrementReferenceCount(); 560 } 561 } 562 563 if (result != null && !result.isCancelled()) { 564 try { 565 result.get(); 566 } catch (CancellationException e) { 567 // ignore - the task has been cancelled if the message 568 // has already been deleted 569 } 570 } 571 } 572 573 private boolean canOptimizeOutPersistence() { 574 return durableSubscribers.size() == 0; 575 } 576 577 @Override 578 public String toString() { 579 return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); 580 } 581 582 @Override 583 public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, 584 final MessageReference node) throws IOException { 585 if (topicStore != null && node.isPersistent()) { 586 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 587 SubscriptionKey key = dsub.getSubscriptionKey(); 588 topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), 589 convertToNonRangedAck(ack, node)); 590 } 591 messageConsumed(context, node); 592 } 593 594 @Override 595 public void gc() { 596 } 597 598 public Message loadMessage(MessageId messageId) throws IOException { 599 return topicStore != null ? topicStore.getMessage(messageId) : null; 600 } 601 602 @Override 603 public void start() throws Exception { 604 if (started.compareAndSet(false, true)) { 605 this.subscriptionRecoveryPolicy.start(); 606 if (memoryUsage != null) { 607 memoryUsage.start(); 608 } 609 610 if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) { 611 scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); 612 } 613 } 614 } 615 616 @Override 617 public void stop() throws Exception { 618 if (started.compareAndSet(true, false)) { 619 if (taskRunner != null) { 620 taskRunner.shutdown(); 621 } 622 this.subscriptionRecoveryPolicy.stop(); 623 if (memoryUsage != null) { 624 memoryUsage.stop(); 625 } 626 if (this.topicStore != null) { 627 this.topicStore.stop(); 628 } 629 630 scheduler.cancel(expireMessagesTask); 631 } 632 } 633 634 @Override 635 public Message[] browse() { 636 final List<Message> result = new ArrayList<Message>(); 637 doBrowse(result, getMaxBrowsePageSize()); 638 return result.toArray(new Message[result.size()]); 639 } 640 641 private void doBrowse(final List<Message> browseList, final int max) { 642 try { 643 if (topicStore != null) { 644 final List<Message> toExpire = new ArrayList<Message>(); 645 topicStore.recover(new MessageRecoveryListener() { 646 @Override 647 public boolean recoverMessage(Message message) throws Exception { 648 if (message.isExpired()) { 649 toExpire.add(message); 650 } 651 browseList.add(message); 652 return true; 653 } 654 655 @Override 656 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 657 return true; 658 } 659 660 @Override 661 public boolean hasSpace() { 662 return browseList.size() < max; 663 } 664 665 @Override 666 public boolean isDuplicate(MessageId id) { 667 return false; 668 } 669 }); 670 final ConnectionContext connectionContext = createConnectionContext(); 671 for (Message message : toExpire) { 672 for (DurableTopicSubscription sub : durableSubscribers.values()) { 673 if (!sub.isActive()) { 674 message.setRegionDestination(this); 675 messageExpired(connectionContext, sub, message); 676 } 677 } 678 } 679 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination()); 680 if (msgs != null) { 681 for (int i = 0; i < msgs.length && browseList.size() < max; i++) { 682 browseList.add(msgs[i]); 683 } 684 } 685 } 686 } catch (Throwable e) { 687 LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e); 688 } 689 } 690 691 @Override 692 public boolean iterate() { 693 synchronized (messagesWaitingForSpace) { 694 while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) { 695 Runnable op = messagesWaitingForSpace.removeFirst(); 696 op.run(); 697 } 698 699 if (!messagesWaitingForSpace.isEmpty()) { 700 registerCallbackForNotFullNotification(); 701 } 702 } 703 return false; 704 } 705 706 private void registerCallbackForNotFullNotification() { 707 // If the usage manager is not full, then the task will not 708 // get called.. 709 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { 710 // so call it directly here. 711 sendMessagesWaitingForSpaceTask.run(); 712 } 713 } 714 715 // Properties 716 // ------------------------------------------------------------------------- 717 718 public DispatchPolicy getDispatchPolicy() { 719 return dispatchPolicy; 720 } 721 722 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 723 this.dispatchPolicy = dispatchPolicy; 724 } 725 726 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 727 return subscriptionRecoveryPolicy; 728 } 729 730 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) { 731 if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) { 732 // allow users to combine retained message policy with other ActiveMQ policies 733 RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy; 734 policy.setWrapped(recoveryPolicy); 735 } else { 736 this.subscriptionRecoveryPolicy = recoveryPolicy; 737 } 738 } 739 740 // Implementation methods 741 // ------------------------------------------------------------------------- 742 743 @Override 744 public final void wakeup() { 745 } 746 747 protected void dispatch(final ConnectionContext context, Message message) throws Exception { 748 // AMQ-2586: Better to leave this stat at zero than to give the user 749 // misleading metrics. 750 // destinationStatistics.getMessages().increment(); 751 destinationStatistics.getEnqueues().increment(); 752 destinationStatistics.getMessageSize().addSize(message.getSize()); 753 MessageEvaluationContext msgContext = null; 754 755 dispatchLock.readLock().lock(); 756 try { 757 if (!subscriptionRecoveryPolicy.add(context, message)) { 758 return; 759 } 760 synchronized (consumers) { 761 if (consumers.isEmpty()) { 762 onMessageWithNoConsumers(context, message); 763 return; 764 } 765 } 766 767 // Clear memory before dispatch - need to clear here because the call to 768 //subscriptionRecoveryPolicy.add() will unmarshall the state 769 if (isReduceMemoryFootprint() && message.isMarshalled()) { 770 message.clearUnMarshalledState(); 771 } 772 773 msgContext = context.getMessageEvaluationContext(); 774 msgContext.setDestination(destination); 775 msgContext.setMessageReference(message); 776 if (!dispatchPolicy.dispatch(message, msgContext, consumers)) { 777 onMessageWithNoConsumers(context, message); 778 } 779 780 } finally { 781 dispatchLock.readLock().unlock(); 782 if (msgContext != null) { 783 msgContext.clear(); 784 } 785 } 786 } 787 788 private final Runnable expireMessagesTask = new Runnable() { 789 @Override 790 public void run() { 791 List<Message> browsedMessages = new InsertionCountList<Message>(); 792 doBrowse(browsedMessages, getMaxExpirePageSize()); 793 } 794 }; 795 796 @Override 797 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { 798 broker.messageExpired(context, reference, subs); 799 // AMQ-2586: Better to leave this stat at zero than to give the user 800 // misleading metrics. 801 // destinationStatistics.getMessages().decrement(); 802 destinationStatistics.getExpired().increment(); 803 MessageAck ack = new MessageAck(); 804 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 805 ack.setDestination(destination); 806 ack.setMessageID(reference.getMessageId()); 807 try { 808 if (subs instanceof DurableTopicSubscription) { 809 ((DurableTopicSubscription)subs).removePending(reference); 810 } 811 acknowledge(context, subs, ack, reference); 812 } catch (Exception e) { 813 LOG.error("Failed to remove expired Message from the store ", e); 814 } 815 } 816 817 @Override 818 protected Logger getLog() { 819 return LOG; 820 } 821 822 protected boolean isOptimizeStorage(){ 823 boolean result = false; 824 825 if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){ 826 result = true; 827 for (DurableTopicSubscription s : durableSubscribers.values()) { 828 if (s.isActive()== false){ 829 result = false; 830 break; 831 } 832 if (s.getPrefetchSize()==0){ 833 result = false; 834 break; 835 } 836 if (s.isSlowConsumer()){ 837 result = false; 838 break; 839 } 840 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){ 841 result = false; 842 break; 843 } 844 } 845 } 846 return result; 847 } 848 849 /** 850 * force a reread of the store - after transaction recovery completion 851 */ 852 @Override 853 public void clearPendingMessages() { 854 dispatchLock.readLock().lock(); 855 try { 856 for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) { 857 clearPendingAndDispatch(durableTopicSubscription); 858 } 859 } finally { 860 dispatchLock.readLock().unlock(); 861 } 862 } 863 864 private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) { 865 synchronized (durableTopicSubscription.pendingLock) { 866 durableTopicSubscription.pending.clear(); 867 try { 868 durableTopicSubscription.dispatchPending(); 869 } catch (IOException exception) { 870 LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{ 871 durableTopicSubscription, 872 destination, 873 durableTopicSubscription.pending }, exception); 874 } 875 } 876 } 877 878 public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() { 879 return durableSubscribers; 880 } 881}