001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.util.Set; 020import java.util.concurrent.CopyOnWriteArraySet; 021import java.util.concurrent.atomic.AtomicBoolean; 022import java.util.concurrent.atomic.AtomicInteger; 023 024import org.apache.activemq.command.ConsumerId; 025import org.apache.activemq.command.ConsumerInfo; 026import org.apache.activemq.command.NetworkBridgeFilter; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030/** 031 * Represents a network bridge interface 032 * 033 * 034 */ 035public class DemandSubscription { 036 private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class); 037 038 private final ConsumerInfo remoteInfo; 039 private final ConsumerInfo localInfo; 040 private Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>(); 041 042 private AtomicInteger dispatched = new AtomicInteger(0); 043 private AtomicBoolean activeWaiter = new AtomicBoolean(); 044 private NetworkBridgeFilter networkBridgeFilter; 045 046 DemandSubscription(ConsumerInfo info) { 047 remoteInfo = info; 048 localInfo = info.copy(); 049 localInfo.setNetworkSubscription(true); 050 remoteSubsIds.add(info.getConsumerId()); 051 } 052 053 /** 054 * Increment the consumers associated with this subscription 055 * 056 * @param id 057 * @return true if added 058 */ 059 public boolean add(ConsumerId id) { 060 return remoteSubsIds.add(id); 061 } 062 063 /** 064 * Increment the consumers associated with this subscription 065 * 066 * @param id 067 * @return true if removed 068 */ 069 public boolean remove(ConsumerId id) { 070 return remoteSubsIds.remove(id); 071 } 072 073 /** 074 * @return true if there are no interested consumers 075 */ 076 public boolean isEmpty() { 077 return remoteSubsIds.isEmpty(); 078 } 079 080 public int size() { 081 return remoteSubsIds.size(); 082 } 083 /** 084 * @return Returns the localInfo. 085 */ 086 public ConsumerInfo getLocalInfo() { 087 return localInfo; 088 } 089 090 /** 091 * @return Returns the remoteInfo. 092 */ 093 public ConsumerInfo getRemoteInfo() { 094 return remoteInfo; 095 } 096 097 public void waitForCompletion() { 098 if (dispatched.get() > 0) { 099 if (LOG.isDebugEnabled()) { 100 LOG.debug("Waiting for completion for sub: " + localInfo.getConsumerId() + ", dispatched: " + this.dispatched.get()); 101 } 102 activeWaiter.set(true); 103 if (dispatched.get() > 0) { 104 synchronized (activeWaiter) { 105 try { 106 activeWaiter.wait(); 107 } catch (InterruptedException ignored) { 108 } 109 } 110 if (this.dispatched.get() > 0) { 111 LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially " + this.dispatched.get() + " duplicate deliveried"); 112 } 113 } 114 } 115 } 116 117 public void decrementOutstandingResponses() { 118 if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) { 119 synchronized (activeWaiter) { 120 activeWaiter.notifyAll(); 121 } 122 } 123 } 124 125 public boolean incrementOutstandingResponses() { 126 dispatched.incrementAndGet(); 127 if (activeWaiter.get()) { 128 decrementOutstandingResponses(); 129 return false; 130 } 131 return true; 132 } 133 134 public NetworkBridgeFilter getNetworkBridgeFilter() { 135 return networkBridgeFilter; 136 } 137 138 public void setNetworkBridgeFilter(NetworkBridgeFilter networkBridgeFilter) { 139 this.networkBridgeFilter = networkBridgeFilter; 140 } 141}