001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.IOException; 020import java.util.concurrent.atomic.AtomicBoolean; 021import java.util.concurrent.atomic.AtomicLong; 022 023import org.apache.activemq.broker.region.Destination; 024import org.apache.activemq.broker.region.Region; 025import org.apache.activemq.command.Message; 026import org.apache.activemq.command.MessageId; 027import org.apache.activemq.state.ProducerState; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031/** 032 * Holds internal state in the broker for a MessageProducer 033 */ 034public class ProducerBrokerExchange { 035 036 private static final Logger LOG = LoggerFactory.getLogger(ProducerBrokerExchange.class); 037 private ConnectionContext connectionContext; 038 private Destination regionDestination; 039 private Region region; 040 private ProducerState producerState; 041 private boolean mutable = true; 042 private AtomicLong lastSendSequenceNumber = new AtomicLong(-1); 043 private boolean auditProducerSequenceIds; 044 private boolean isNetworkProducer; 045 private BrokerService brokerService; 046 private final FlowControlInfo flowControlInfo = new FlowControlInfo(); 047 048 public ProducerBrokerExchange() { 049 } 050 051 public ProducerBrokerExchange copy() { 052 ProducerBrokerExchange rc = new ProducerBrokerExchange(); 053 rc.connectionContext = connectionContext.copy(); 054 rc.regionDestination = regionDestination; 055 rc.region = region; 056 rc.producerState = producerState; 057 rc.mutable = mutable; 058 return rc; 059 } 060 061 062 /** 063 * @return the connectionContext 064 */ 065 public ConnectionContext getConnectionContext() { 066 return this.connectionContext; 067 } 068 069 /** 070 * @param connectionContext the connectionContext to set 071 */ 072 public void setConnectionContext(ConnectionContext connectionContext) { 073 this.connectionContext = connectionContext; 074 } 075 076 /** 077 * @return the mutable 078 */ 079 public boolean isMutable() { 080 return this.mutable; 081 } 082 083 /** 084 * @param mutable the mutable to set 085 */ 086 public void setMutable(boolean mutable) { 087 this.mutable = mutable; 088 } 089 090 /** 091 * @return the regionDestination 092 */ 093 public Destination getRegionDestination() { 094 return this.regionDestination; 095 } 096 097 /** 098 * @param regionDestination the regionDestination to set 099 */ 100 public void setRegionDestination(Destination regionDestination) { 101 this.regionDestination = regionDestination; 102 } 103 104 /** 105 * @return the region 106 */ 107 public Region getRegion() { 108 return this.region; 109 } 110 111 /** 112 * @param region the region to set 113 */ 114 public void setRegion(Region region) { 115 this.region = region; 116 } 117 118 /** 119 * @return the producerState 120 */ 121 public ProducerState getProducerState() { 122 return this.producerState; 123 } 124 125 /** 126 * @param producerState the producerState to set 127 */ 128 public void setProducerState(ProducerState producerState) { 129 this.producerState = producerState; 130 } 131 132 /** 133 * Enforce duplicate suppression using info from persistence adapter 134 * 135 * @return false if message should be ignored as a duplicate 136 */ 137 public boolean canDispatch(Message messageSend) { 138 boolean canDispatch = true; 139 if (auditProducerSequenceIds && messageSend.isPersistent()) { 140 final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId(); 141 if (isNetworkProducer) { 142 // messages are multiplexed on this producer so we need to query the persistenceAdapter 143 long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId()); 144 if (producerSequenceId <= lastStoredForMessageProducer) { 145 canDispatch = false; 146 LOG.warn("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{ 147 (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastStoredForMessageProducer 148 }); 149 } 150 } else if (producerSequenceId <= lastSendSequenceNumber.get()) { 151 canDispatch = false; 152 if (messageSend.isInTransaction()) { 153 LOG.warn("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}", new Object[]{ 154 (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber 155 }); 156 } else { 157 LOG.debug("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}", new Object[]{ 158 (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber 159 }); 160 161 } 162 } else { 163 // track current so we can suppress duplicates later in the stream 164 lastSendSequenceNumber.set(producerSequenceId); 165 } 166 } 167 return canDispatch; 168 } 169 170 private long getStoredSequenceIdForMessage(MessageId messageId) { 171 try { 172 return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId()); 173 } catch (IOException ignored) { 174 LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored); 175 } 176 return -1; 177 } 178 179 public void setLastStoredSequenceId(long l) { 180 auditProducerSequenceIds = true; 181 if (connectionContext.isNetworkConnection()) { 182 brokerService = connectionContext.getBroker().getBrokerService(); 183 isNetworkProducer = true; 184 } 185 lastSendSequenceNumber.set(l); 186 LOG.debug("last stored sequence id set: {}", l); 187 } 188 189 public void incrementSend() { 190 flowControlInfo.incrementSend(); 191 } 192 193 public void blockingOnFlowControl(boolean blockingOnFlowControl) { 194 flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl); 195 } 196 197 public void incrementTimeBlocked(Destination destination, long timeBlocked) { 198 flowControlInfo.incrementTimeBlocked(timeBlocked); 199 } 200 201 202 public boolean isBlockedForFlowControl() { 203 return flowControlInfo.isBlockingOnFlowControl(); 204 } 205 206 public void resetFlowControl() { 207 flowControlInfo.reset(); 208 } 209 210 public long getTotalTimeBlocked() { 211 return flowControlInfo.getTotalTimeBlocked(); 212 } 213 214 public int getPercentageBlocked() { 215 double value = flowControlInfo.getSendsBlocked() / flowControlInfo.getTotalSends(); 216 return (int) value * 100; 217 } 218 219 220 public static class FlowControlInfo { 221 private AtomicBoolean blockingOnFlowControl = new AtomicBoolean(); 222 private AtomicLong totalSends = new AtomicLong(); 223 private AtomicLong sendsBlocked = new AtomicLong(); 224 private AtomicLong totalTimeBlocked = new AtomicLong(); 225 226 227 public boolean isBlockingOnFlowControl() { 228 return blockingOnFlowControl.get(); 229 } 230 231 public void setBlockingOnFlowControl(boolean blockingOnFlowControl) { 232 this.blockingOnFlowControl.set(blockingOnFlowControl); 233 if (blockingOnFlowControl) { 234 incrementSendBlocked(); 235 } 236 } 237 238 239 public long getTotalSends() { 240 return totalSends.get(); 241 } 242 243 public void incrementSend() { 244 this.totalSends.incrementAndGet(); 245 } 246 247 public long getSendsBlocked() { 248 return sendsBlocked.get(); 249 } 250 251 public void incrementSendBlocked() { 252 this.sendsBlocked.incrementAndGet(); 253 } 254 255 public long getTotalTimeBlocked() { 256 return totalTimeBlocked.get(); 257 } 258 259 public void incrementTimeBlocked(long time) { 260 this.totalTimeBlocked.addAndGet(time); 261 } 262 263 public void reset() { 264 blockingOnFlowControl.set(false); 265 totalSends.set(0); 266 sendsBlocked.set(0); 267 totalTimeBlocked.set(0); 268 269 } 270 } 271}