001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.command; 018 019import org.apache.activemq.filter.BooleanExpression; 020import org.apache.activemq.filter.MessageEvaluationContext; 021import org.apache.activemq.util.JMSExceptionSupport; 022import org.slf4j.Logger; 023import org.slf4j.LoggerFactory; 024 025import javax.jms.JMSException; 026import java.io.IOException; 027import java.util.Arrays; 028 029/** 030 * @openwire:marshaller code="91" 031 * 032 */ 033public class NetworkBridgeFilter implements DataStructure, BooleanExpression { 034 035 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER; 036 static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class); 037 038 protected BrokerId networkBrokerId; 039 protected int networkTTL; 040 041 public NetworkBridgeFilter() { 042 } 043 044 public NetworkBridgeFilter(BrokerId networkBrokerId, int networkTTL) { 045 this.networkBrokerId = networkBrokerId; 046 this.networkTTL = networkTTL; 047 } 048 049 public byte getDataStructureType() { 050 return DATA_STRUCTURE_TYPE; 051 } 052 053 public boolean isMarshallAware() { 054 return false; 055 } 056 057 public boolean matches(MessageEvaluationContext mec) throws JMSException { 058 try { 059 // for Queues - the message can be acknowledged and dropped whilst 060 // still 061 // in the dispatch loop 062 // so need to get the reference to it 063 Message message = mec.getMessage(); 064 return message != null && matchesForwardingFilter(message, mec); 065 } catch (IOException e) { 066 throw JMSExceptionSupport.create(e); 067 } 068 } 069 070 public Object evaluate(MessageEvaluationContext message) throws JMSException { 071 return matches(message) ? Boolean.TRUE : Boolean.FALSE; 072 } 073 074 protected boolean matchesForwardingFilter(Message message, MessageEvaluationContext mec) { 075 076 if (contains(message.getBrokerPath(), networkBrokerId)) { 077 if (LOG.isTraceEnabled()) { 078 LOG.trace("Message all ready routed once through target broker (" 079 + networkBrokerId + "), path: " 080 + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message); 081 } 082 return false; 083 } 084 085 int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length; 086 087 if (hops >= networkTTL) { 088 if (LOG.isTraceEnabled()) { 089 LOG.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message); 090 } 091 return false; 092 } 093 094 if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) { 095 ConsumerInfo info = (ConsumerInfo)message.getDataStructure(); 096 hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length; 097 if (hops >= networkTTL) { 098 if (LOG.isTraceEnabled()) { 099 LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message); 100 } 101 return false; 102 } 103 104 if (contains(info.getBrokerPath(), networkBrokerId)) { 105 LOG.trace("ConsumerInfo advisory all ready routed once through target broker (" 106 + networkBrokerId + "), path: " 107 + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message); 108 return false; 109 } 110 } 111 return true; 112 } 113 114 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { 115 if (brokerPath != null && brokerId != null) { 116 for (int i = 0; i < brokerPath.length; i++) { 117 if (brokerId.equals(brokerPath[i])) { 118 return true; 119 } 120 } 121 } 122 return false; 123 } 124 125 /** 126 * @openwire:property version=1 127 */ 128 public int getNetworkTTL() { 129 return networkTTL; 130 } 131 132 public void setNetworkTTL(int networkTTL) { 133 this.networkTTL = networkTTL; 134 } 135 136 /** 137 * @openwire:property version=1 cache=true 138 */ 139 public BrokerId getNetworkBrokerId() { 140 return networkBrokerId; 141 } 142 143 public void setNetworkBrokerId(BrokerId remoteBrokerPath) { 144 this.networkBrokerId = remoteBrokerPath; 145 } 146 147}