/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.policy;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transport.InactivityIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abort slow consumers when they reach the configured threshold of slowness, default is slow for 30 seconds.
 * <p>
 * Subscriptions reported through {@link #slowConsumer(ConnectionContext, Subscription)} are tracked in a
 * map. A periodic task ({@link #run()}), scheduled on first report, re-examines the tracked subscriptions
 * every {@code checkPeriod} milliseconds and aborts those that exceeded {@code maxSlowDuration} or
 * {@code maxSlowCount}.
 *
 * @org.apache.xbean.XBean
 */
public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerStrategy.class);

    private String name = "AbortSlowConsumerStrategy@" + hashCode();
    private Scheduler scheduler;
    private Broker broker;
    // Guards the one-time scheduling of the periodic check task.
    private final AtomicBoolean taskStarted = new AtomicBoolean(false);
    // Subscriptions currently considered slow, with their bookkeeping entry.
    private final Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>();

    // A negative value disables the corresponding limit (see slowConsumer()).
    private long maxSlowCount = -1;
    private long maxSlowDuration = 30 * 1000;
    private long checkPeriod = 30 * 1000;
    private boolean abortConnection = false;

    /**
     * Stores the broker and the scheduler obtained from it; the scheduler is later used to run the
     * periodic check and asynchronous connection aborts.
     *
     * @param broker the broker this strategy is attached to
     */
    public void setBrokerService(Broker broker) {
        this.scheduler = broker.getScheduler();
        this.broker = broker;
    }

    /**
     * Records that the given subscription has been deemed slow.
     * <p>
     * Does nothing (other than logging) when both limits are negative. On the first effective call the
     * periodic check task is scheduled. A subscription seen for the first time gets a new tracking entry;
     * for an already tracked subscription {@code slow()} is invoked on its entry when
     * {@code maxSlowCount > 0}.
     *
     * @param context the connection context associated with the subscription
     * @param subs the subscription reported as slow
     */
    public void slowConsumer(ConnectionContext context, Subscription subs) {
        if (maxSlowCount < 0 && maxSlowDuration < 0) {
            // nothing to do
            LOG.info("no limits set, slowConsumer strategy has nothing to do");
            return;
        }

        if (taskStarted.compareAndSet(false, true)) {
            scheduler.executePeriodically(this, checkPeriod);
        }

        // Single lookup: a separate containsKey()/get() pair can observe the entry being removed in
        // between (by run() or abortConsumer()), which would make get() return null.
        SlowConsumerEntry tracked = slowConsumers.get(subs);
        if (tracked == null) {
            slowConsumers.put(subs, new SlowConsumerEntry(context));
        } else if (maxSlowCount > 0) {
            // presumably bumps the entry's slowCount - SlowConsumerEntry is defined elsewhere
            tracked.slow();
        }
    }

    /**
     * Periodic check: marks every tracked entry (when a duration limit is set), drops subscriptions that
     * are no longer slow, and aborts those that exceeded the duration or count limit.
     */
    @Override
    public void run() {
        if (maxSlowDuration > 0) {
            // mark
            for (SlowConsumerEntry entry : slowConsumers.values()) {
                // presumably advances markCount, which is multiplied by checkPeriod below
                entry.mark();
            }
        }

        HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
        for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
            if (entry.getKey().isSlowConsumer()) {
                boolean slowTooLong = maxSlowDuration > 0
                        && (entry.getValue().markCount * checkPeriod > maxSlowDuration);
                boolean slowTooOften = maxSlowCount > 0
                        && entry.getValue().slowCount > maxSlowCount;
                if (slowTooLong || slowTooOften) {
                    toAbort.put(entry.getKey(), entry.getValue());
                    slowConsumers.remove(entry.getKey());
                }
            } else {
                LOG.info("sub: " + entry.getKey().getConsumerInfo().getConsumerId() + " is no longer slow");
                slowConsumers.remove(entry.getKey());
            }
        }

        abortSubscription(toAbort, abortConnection);
    }

    /**
     * Aborts each subscription in the given map, either by failing its connection or by dispatching a
     * close command to the consumer. Entries without a connection context are skipped; failures for one
     * entry are logged and do not stop processing of the others.
     *
     * @param toAbort subscriptions to abort, with their tracking entries
     * @param abortSubscriberConnection {@code true} to abort the whole connection, {@code false} to
     *        only tell the consumer to close
     */
    private void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) {
        for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
            ConnectionContext connectionContext = entry.getValue().context;
            if (connectionContext != null) {
                try {
                    LOG.info("aborting "
                            + (abortSubscriberConnection ? "connection" : "consumer")
                            + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId());

                    final Connection connection = connectionContext.getConnection();
                    if (connection != null) {
                        if (abortSubscriberConnection) {
                            // Hand the connection failure to the scheduler instead of raising it on this thread.
                            scheduler.executeAfterDelay(new Runnable() {
                                @Override
                                public void run() {
                                    connection.serviceException(new InactivityIOException("Consumer was slow too often (>"
                                            + maxSlowCount + ") or too long (>"
                                            + maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId()));
                                }
                            }, 0L);
                        } else {
                            // just abort the consumer by telling it to stop
                            ConsumerControl stopConsumer = new ConsumerControl();
                            stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId());
                            stopConsumer.setClose(true);
                            connection.dispatchAsync(stopConsumer);
                        }
                    } else {
                        LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext);
                    }
                } catch (Exception e) {
                    LOG.info("exception on stopping "
                            + (abortSubscriberConnection ? "connection" : "consumer")
                            + " to abort slow consumer: " + entry.getKey(), e);
                }
            }
        }
    }

    /**
     * Aborts a single tracked subscription on demand. The subscription is removed from the set of slow
     * consumers first; if it is not tracked a warning is logged and nothing is aborted. A {@code null}
     * subscription is ignored.
     *
     * @param sub the subscription to abort, may be {@code null}
     * @param abortSubscriberConnection {@code true} to abort the whole connection, {@code false} to
     *        only tell the consumer to close
     */
    public void abortConsumer(Subscription sub, boolean abortSubscriberConnection) {
        if (sub != null) {
            SlowConsumerEntry entry = slowConsumers.remove(sub);
            if (entry != null) {
                Map<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
                toAbort.put(sub, entry);
                abortSubscription(toAbort, abortSubscriberConnection);
            } else {
                LOG.warn("cannot abort subscription as it no longer exists in the map of slow consumers: " + sub);
            }
        }
    }

    /**
     * @return the number of times a subscription can be deemed slow before an abort is triggered
     */
    public long getMaxSlowCount() {
        return maxSlowCount;
    }

    /**
     * number of times a subscription can be deemed slow before triggering abort
     * effect depends on dispatch rate as slow determination is done on dispatch
     *
     * @param maxSlowCount the slow-count limit; defaults to -1
     */
    public void setMaxSlowCount(long maxSlowCount) {
        this.maxSlowCount = maxSlowCount;
    }

    /**
     * @return the time in milliseconds that a subscription can remain slow before an abort is triggered
     */
    public long getMaxSlowDuration() {
        return maxSlowDuration;
    }

    /**
     * time in milliseconds that a sub can remain slow before triggering
     * an abort.
     *
     * @param maxSlowDuration the duration limit in milliseconds; defaults to 30000
     */
    public void setMaxSlowDuration(long maxSlowDuration) {
        this.maxSlowDuration = maxSlowDuration;
    }

    /**
     * @return the time in milliseconds between checks for slow subscriptions
     */
    public long getCheckPeriod() {
        return checkPeriod;
    }

    /**
     * time in milliseconds between checks for slow subscriptions
     *
     * @param checkPeriod the check interval in milliseconds; defaults to 30000
     */
    public void setCheckPeriod(long checkPeriod) {
        this.checkPeriod = checkPeriod;
    }

    /**
     * @return {@code true} if the consumer's connection is aborted rather than the consumer being
     *         sent a stop command
     */
    public boolean isAbortConnection() {
        return abortConnection;
    }

    /**
     * abort the consumers connection rather than sending a stop command to the remote consumer
     *
     * @param abortConnection {@code true} to abort the connection; defaults to {@code false}
     */
    public void setAbortConnection(boolean abortConnection) {
        this.abortConnection = abortConnection;
    }

    /**
     * @param name the name to assign to this strategy instance
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the name of this strategy instance; by default {@code "AbortSlowConsumerStrategy@"}
     *         followed by the instance hash code
     */
    public String getName() {
        return name;
    }

    /**
     * @return the live map of currently tracked slow subscriptions (not a copy)
     */
    public Map<Subscription, SlowConsumerEntry> getSlowConsumers() {
        return slowConsumers;
    }
}