001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020 021import org.apache.activemq.broker.region.DurableTopicSubscription; 022import org.apache.activemq.broker.region.RegionBroker; 023import org.apache.activemq.broker.region.Subscription; 024import org.apache.activemq.broker.region.TopicRegion; 025import org.apache.activemq.command.ActiveMQDestination; 026import org.apache.activemq.command.ConsumerId; 027import org.apache.activemq.command.ConsumerInfo; 028import org.apache.activemq.command.RemoveSubscriptionInfo; 029import org.apache.activemq.filter.DestinationFilter; 030import org.apache.activemq.transport.Transport; 031import org.apache.activemq.util.NetworkBridgeUtils; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * Consolidates subscriptions 037 */ 038public class DurableConduitBridge extends ConduitBridge { 039 private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class); 040 041 @Override 042 public String toString() { 043 return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName(); 044 } 045 /** 046 * Constructor 047 * 048 * @param configuration 049 * 050 * @param localBroker 051 * @param remoteBroker 052 */ 053 public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker, 054 Transport remoteBroker) { 055 super(configuration, localBroker, remoteBroker); 056 } 057 058 /** 059 * Subscriptions for these destinations are always created 060 * 061 */ 062 @Override 063 protected void setupStaticDestinations() { 064 super.setupStaticDestinations(); 065 ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations; 066 if (dests != null) { 067 for (ActiveMQDestination dest : dests) { 068 if (isPermissableDestination(dest) && !doesConsumerExist(dest)) { 069 try { 070 //Filtering by non-empty subscriptions, see AMQ-5875 071 if (dest.isTopic()) { 072 RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); 073 TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); 074 075 String candidateSubName = getSubscriberName(dest); 076 for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) { 077 String subName = subscription.getConsumerInfo().getSubscriptionName(); 078 if (subName != null && subName.equals(candidateSubName)) { 079 DemandSubscription sub = createDemandSubscription(dest, subName); 080 sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest)); 081 sub.setStaticallyIncluded(true); 082 addSubscription(sub); 083 break; 084 } 085 } 086 } 087 } catch (IOException e) { 088 LOG.error("Failed to add static destination {}", dest, e); 089 } 090 LOG.trace("Forwarding messages for durable destination: {}", dest); 091 } else if (configuration.isSyncDurableSubs() && !isPermissableDestination(dest)) { 092 if (dest.isTopic()) { 093 RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); 094 TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); 095 096 String candidateSubName = getSubscriberName(dest); 097 for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) { 098 String subName = subscription.getConsumerInfo().getSubscriptionName(); 099 if (subName != null && subName.equals(candidateSubName) && 100 subscription instanceof DurableTopicSubscription) { 101 try { 102 DurableTopicSubscription durableSub = (DurableTopicSubscription) subscription; 103 //check the clientId so we only remove subs for the matching bridge 104 if (durableSub.getSubscriptionKey().getClientId().equals(localClientId)) { 105 // remove the NC subscription as it is no longer for a permissible dest 106 RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo(); 107 sending.setClientId(localClientId); 108 sending.setSubscriptionName(subName); 109 sending.setConnectionId(this.localConnectionInfo.getConnectionId()); 110 localBroker.oneway(sending); 111 } 112 } catch (IOException e) { 113 LOG.debug("Exception removing NC durable subscription: {}", subName, e); 114 serviceRemoteException(e); 115 } 116 break; 117 } 118 } 119 } 120 } 121 } 122 } 123 } 124 125 @Override 126 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 127 boolean isForcedDurable = NetworkBridgeUtils.isForcedDurable(info, 128 dynamicallyIncludedDestinations, staticallyIncludedDestinations); 129 130 if (addToAlreadyInterestedConsumers(info, isForcedDurable)) { 131 return null; // don't want this subscription added 132 } 133 //add our original id to ourselves 134 info.addNetworkConsumerId(info.getConsumerId()); 135 ConsumerId forcedDurableId = isForcedDurable ? info.getConsumerId() : null; 136 137 if(info.isDurable() || isForcedDurable) { 138 // set the subscriber name to something reproducible 139 info.setSubscriptionName(getSubscriberName(info.getDestination())); 140 // and override the consumerId with something unique so that it won't 141 // be removed if the durable subscriber (at the other end) goes away 142 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), 143 consumerIdGenerator.getNextSequenceId())); 144 } 145 info.setSelector(null); 146 DemandSubscription demandSubscription = doCreateDemandSubscription(info); 147 if (forcedDurableId != null) { 148 demandSubscription.addForcedDurableConsumer(forcedDurableId); 149 forcedDurableRemoteId.add(forcedDurableId); 150 } 151 return demandSubscription; 152 } 153 154 protected String getSubscriberName(ActiveMQDestination dest) { 155 String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName(); 156 return subscriberName; 157 } 158 159 protected boolean doesConsumerExist(ActiveMQDestination dest) { 160 DestinationFilter filter = DestinationFilter.parseFilter(dest); 161 for (DemandSubscription ds : subscriptionMapByLocalId.values()) { 162 if (filter.matches(ds.getLocalInfo().getDestination())) { 163 return true; 164 } 165 } 166 return false; 167 } 168}