001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.policy;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.Map.Entry;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.atomic.AtomicBoolean;
024
025import org.apache.activemq.broker.Broker;
026import org.apache.activemq.broker.Connection;
027import org.apache.activemq.broker.ConnectionContext;
028import org.apache.activemq.broker.region.Subscription;
029import org.apache.activemq.command.ConsumerControl;
030import org.apache.activemq.thread.Scheduler;
031import org.apache.activemq.transport.InactivityIOException;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * Abort slow consumers when they reach the configured threshold of slowness, default is slow for 30 seconds
037 * 
038 * @org.apache.xbean.XBean
039 */
040public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable {
041    
042    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerStrategy.class);
043
044    private String name = "AbortSlowConsumerStrategy@" + hashCode();
045    private Scheduler scheduler;
046    private Broker broker;
047    private final AtomicBoolean taskStarted = new AtomicBoolean(false);
048    private final Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
049
050    private long maxSlowCount = -1;
051    private long maxSlowDuration = 30*1000;
052    private long checkPeriod = 30*1000;
053    private boolean abortConnection = false;
054
055    public void setBrokerService(Broker broker) {
056       this.scheduler = broker.getScheduler();
057       this.broker = broker;
058    }
059
060    public void slowConsumer(ConnectionContext context, Subscription subs) {
061        if (maxSlowCount < 0 && maxSlowDuration < 0) {
062            // nothing to do
063            LOG.info("no limits set, slowConsumer strategy has nothing to do");
064            return;
065        }
066        
067        if (taskStarted.compareAndSet(false, true)) {
068            scheduler.executePeriodically(this, checkPeriod);
069        }
070            
071        if (!slowConsumers.containsKey(subs)) {
072            slowConsumers.put(subs, new SlowConsumerEntry(context));
073        } else if (maxSlowCount > 0) {
074            slowConsumers.get(subs).slow();
075        }
076    }
077
078    public void run() {
079        if (maxSlowDuration > 0) {
080            // mark
081            for (SlowConsumerEntry entry : slowConsumers.values()) {
082                entry.mark();
083            }
084        }
085        
086        HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
087        for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
088            if (entry.getKey().isSlowConsumer()) {
089                if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod > maxSlowDuration)
090                        || maxSlowCount > 0 && entry.getValue().slowCount > maxSlowCount) { 
091                    toAbort.put(entry.getKey(), entry.getValue());
092                    slowConsumers.remove(entry.getKey());
093                }
094            } else {
095                LOG.info("sub: " + entry.getKey().getConsumerInfo().getConsumerId() + " is no longer slow");
096                slowConsumers.remove(entry.getKey());
097            }
098        }
099
100        abortSubscription(toAbort, abortConnection);
101    }
102
103    private void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) {
104        for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
105            ConnectionContext connectionContext = entry.getValue().context;
106            if (connectionContext!= null) {
107                try {
108                    LOG.info("aborting "
109                            + (abortSubscriberConnection ? "connection" : "consumer") 
110                            + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId());
111
112                    final Connection connection = connectionContext.getConnection();
113                    if (connection != null) {
114                        if (abortSubscriberConnection) {
115                            scheduler.executeAfterDelay(new Runnable() {
116                                public void run() {
117                                    connection.serviceException(new InactivityIOException("Consumer was slow too often (>"
118                                            + maxSlowCount +  ") or too long (>"
119                                            + maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId()));
120                                }}, 0l);
121                        } else {
122                            // just abort the consumer by telling it to stop
123                            ConsumerControl stopConsumer = new ConsumerControl();
124                            stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId());
125                            stopConsumer.setClose(true);
126                            connection.dispatchAsync(stopConsumer);
127                        }
128                    } else {
129                        LOG.debug("slowConsumer abort ignored, no connection in context:"  + connectionContext);
130                    }
131                } catch (Exception e) {
132                    LOG.info("exception on stopping "
133                            + (abortSubscriberConnection ? "connection" : "consumer")
134                            + " to abort slow consumer: " + entry.getKey(), e);
135                }
136            }
137        }
138    }
139
140
141    public void abortConsumer(Subscription sub, boolean abortSubscriberConnection) {
142        if (sub != null) {
143            SlowConsumerEntry entry = slowConsumers.remove(sub);
144            if (entry != null) {
145                Map toAbort = new HashMap<Subscription, SlowConsumerEntry>();
146                toAbort.put(sub, entry);
147                abortSubscription(toAbort, abortSubscriberConnection);
148            } else {
149                LOG.warn("cannot abort subscription as it no longer exists in the map of slow consumers: " + sub);
150            }
151        }
152    }
153
154
155    public long getMaxSlowCount() {
156        return maxSlowCount;
157    }
158
159    /**
160     * number of times a subscription can be deemed slow before triggering abort
161     * effect depends on dispatch rate as slow determination is done on dispatch
162     */
163    public void setMaxSlowCount(long maxSlowCount) {
164        this.maxSlowCount = maxSlowCount;
165    }
166
167    public long getMaxSlowDuration() {
168        return maxSlowDuration;
169    }
170
171    /**
172     * time in milliseconds that a sub can remain slow before triggering
173     * an abort.
174     * @param maxSlowDuration
175     */
176    public void setMaxSlowDuration(long maxSlowDuration) {
177        this.maxSlowDuration = maxSlowDuration;
178    }
179
180    public long getCheckPeriod() {
181        return checkPeriod;
182    }
183
184    /**
185     * time in milliseconds between checks for slow subscriptions
186     * @param checkPeriod
187     */
188    public void setCheckPeriod(long checkPeriod) {
189        this.checkPeriod = checkPeriod;
190    }
191
192    public boolean isAbortConnection() {
193        return abortConnection;
194    }
195
196    /**
197     * abort the consumers connection rather than sending a stop command to the remote consumer
198     * @param abortConnection
199     */
200    public void setAbortConnection(boolean abortConnection) {
201        this.abortConnection = abortConnection;
202    }
203
204    public void setName(String name) {
205        this.name = name;
206    }
207    
208    public String getName() {
209        return name;
210    }
211
212    public Map<Subscription, SlowConsumerEntry> getSlowConsumers() {
213        return slowConsumers;
214    }
215}