001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import org.apache.activemq.broker.region.policy.SimpleDispatchSelector; 020import org.apache.activemq.command.ActiveMQDestination; 021import org.slf4j.Logger; 022import org.slf4j.LoggerFactory; 023 024/** 025 * Queue dispatch policy that determines if a message can be sent to a subscription 026 * 027 * @org.apache.xbean.XBean 028 * 029 */ 030public class QueueDispatchSelector extends SimpleDispatchSelector { 031 private static final Logger LOG = LoggerFactory.getLogger(QueueDispatchSelector.class); 032 private Subscription exclusiveConsumer; 033 private boolean paused; 034 035 /** 036 * @param destination 037 */ 038 public QueueDispatchSelector(ActiveMQDestination destination) { 039 super(destination); 040 } 041 042 public Subscription getExclusiveConsumer() { 043 return exclusiveConsumer; 044 } 045 public void setExclusiveConsumer(Subscription exclusiveConsumer) { 046 this.exclusiveConsumer = exclusiveConsumer; 047 } 048 049 public boolean isExclusiveConsumer(Subscription s) { 050 return s == this.exclusiveConsumer; 051 } 052 053 public boolean canSelect(Subscription subscription, 054 MessageReference m) throws Exception { 055 056 boolean result = !paused && super.canDispatch(subscription, m); 057 if (result && !subscription.isBrowser()) { 058 result = exclusiveConsumer == null || exclusiveConsumer == subscription; 059 } 060 return result; 061 } 062 063 public void pause() { 064 paused = true; 065 } 066 067 public void resume() { 068 paused = false; 069 } 070 071 public boolean isPaused() { 072 return paused; 073 } 074}