001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.net.URI; 020import java.util.Map; 021import java.util.Set; 022import java.util.concurrent.ThreadPoolExecutor; 023 024import org.apache.activemq.Service; 025import org.apache.activemq.broker.region.Destination; 026import org.apache.activemq.broker.region.MessageReference; 027import org.apache.activemq.broker.region.Region; 028import org.apache.activemq.broker.region.Subscription; 029import org.apache.activemq.broker.region.virtual.VirtualDestination; 030import org.apache.activemq.command.ActiveMQDestination; 031import org.apache.activemq.command.BrokerId; 032import org.apache.activemq.command.BrokerInfo; 033import org.apache.activemq.command.ConnectionInfo; 034import org.apache.activemq.command.DestinationInfo; 035import org.apache.activemq.command.MessageDispatch; 036import org.apache.activemq.command.ProducerInfo; 037import org.apache.activemq.command.SessionInfo; 038import org.apache.activemq.command.TransactionId; 039import org.apache.activemq.store.PListStore; 040import org.apache.activemq.thread.Scheduler; 041import org.apache.activemq.usage.Usage; 042 043/** 044 * The Message Broker which routes messages, maintains subscriptions and 045 * connections, acknowledges messages and handles transactions. 046 */ 047public interface Broker extends Region, Service { 048 049 /** 050 * Get a Broker from the Broker Stack that is a particular class 051 * 052 * @param type 053 * @return a Broker instance. 054 */ 055 Broker getAdaptor(Class type); 056 057 /** 058 * Get the id of the broker 059 */ 060 BrokerId getBrokerId(); 061 062 /** 063 * Get the name of the broker 064 */ 065 String getBrokerName(); 066 067 /** 068 * A remote Broker connects 069 */ 070 void addBroker(Connection connection, BrokerInfo info); 071 072 /** 073 * Remove a BrokerInfo 074 * 075 * @param connection 076 * @param info 077 */ 078 void removeBroker(Connection connection, BrokerInfo info); 079 080 /** 081 * A client is establishing a connection with the broker. 082 * 083 * @throws Exception TODO 084 */ 085 void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception; 086 087 /** 088 * A client is disconnecting from the broker. 089 * 090 * @param context the environment the operation is being executed under. 091 * @param info 092 * @param error null if the client requested the disconnect or the error 093 * that caused the client to disconnect. 094 * @throws Exception TODO 095 */ 096 void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception; 097 098 /** 099 * Adds a session. 100 * 101 * @param context 102 * @param info 103 * @throws Exception TODO 104 */ 105 void addSession(ConnectionContext context, SessionInfo info) throws Exception; 106 107 /** 108 * Removes a session. 109 * 110 * @param context 111 * @param info 112 * @throws Exception TODO 113 */ 114 void removeSession(ConnectionContext context, SessionInfo info) throws Exception; 115 116 /** 117 * Adds a producer. 118 * 119 * @param context the environment the operation is being executed under. 120 * @throws Exception TODO 121 */ 122 @Override 123 void addProducer(ConnectionContext context, ProducerInfo info) throws Exception; 124 125 /** 126 * Removes a producer. 127 * 128 * @param context the environment the operation is being executed under. 129 * @throws Exception TODO 130 */ 131 @Override 132 void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception; 133 134 /** 135 * @return all clients added to the Broker. 136 * @throws Exception TODO 137 */ 138 Connection[] getClients() throws Exception; 139 140 /** 141 * @return all destinations added to the Broker. 142 * @throws Exception TODO 143 */ 144 ActiveMQDestination[] getDestinations() throws Exception; 145 146 /** 147 * return a reference destination map of a region based on the destination type 148 * 149 * @param destination 150 * 151 * @return destination Map 152 */ 153 public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination destination); 154 155 /** 156 * Gets a list of all the prepared xa transactions. 157 * 158 * @param context transaction ids 159 * @return array of TransactionId values 160 * @throws Exception TODO 161 */ 162 TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception; 163 164 /** 165 * Starts a transaction. 166 * 167 * @param context 168 * @param xid 169 * @throws Exception TODO 170 */ 171 void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception; 172 173 /** 174 * Prepares a transaction. Only valid for xa transactions. 175 * 176 * @param context 177 * @param xid 178 * @return id 179 * @throws Exception TODO 180 */ 181 int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception; 182 183 /** 184 * Rollsback a transaction. 185 * 186 * @param context 187 * @param xid 188 * @throws Exception TODO 189 */ 190 void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception; 191 192 /** 193 * Commits a transaction. 194 * 195 * @param context 196 * @param xid 197 * @param onePhase 198 * @throws Exception TODO 199 */ 200 void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception; 201 202 /** 203 * Forgets a transaction. 204 * 205 * @param context 206 * @param transactionId 207 * @throws Exception 208 */ 209 void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception; 210 211 /** 212 * Get the BrokerInfo's of any connected Brokers 213 * 214 * @return array of peer BrokerInfos 215 */ 216 BrokerInfo[] getPeerBrokerInfos(); 217 218 /** 219 * Notify the Broker that a dispatch is going to happen 220 * 221 * @param messageDispatch 222 */ 223 void preProcessDispatch(MessageDispatch messageDispatch); 224 225 /** 226 * Notify the Broker that a dispatch has happened 227 * 228 * @param messageDispatch 229 */ 230 void postProcessDispatch(MessageDispatch messageDispatch); 231 232 /** 233 * @return true if the broker has stopped 234 */ 235 boolean isStopped(); 236 237 /** 238 * @return a Set of all durable destinations 239 */ 240 Set<ActiveMQDestination> getDurableDestinations(); 241 242 /** 243 * Add and process a DestinationInfo object 244 * 245 * @param context 246 * @param info 247 * @throws Exception 248 */ 249 void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception; 250 251 /** 252 * Remove and process a DestinationInfo object 253 * 254 * @param context 255 * @param info 256 * 257 * @throws Exception 258 */ 259 void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception; 260 261 /** 262 * @return true if fault tolerant 263 */ 264 boolean isFaultTolerantConfiguration(); 265 266 /** 267 * @return the connection context used to make administration operations on 268 * startup or via JMX MBeans 269 */ 270 ConnectionContext getAdminConnectionContext(); 271 272 /** 273 * Sets the default administration connection context used when configuring 274 * the broker on startup or via JMX 275 * 276 * @param adminConnectionContext 277 */ 278 void setAdminConnectionContext(ConnectionContext adminConnectionContext); 279 280 /** 281 * @return the temp data store 282 */ 283 PListStore getTempDataStore(); 284 285 /** 286 * @return the URI that can be used to connect to the local Broker 287 */ 288 URI getVmConnectorURI(); 289 290 /** 291 * called when the brokerService starts 292 */ 293 void brokerServiceStarted(); 294 295 /** 296 * @return the BrokerService 297 */ 298 BrokerService getBrokerService(); 299 300 /** 301 * Ensure we get the Broker at the top of the Stack 302 * 303 * @return the broker at the top of the Stack 304 */ 305 Broker getRoot(); 306 307 /** 308 * Determine if a message has expired -allows default behaviour to be 309 * overriden - as the timestamp set by the producer can be out of sync with 310 * the broker 311 * 312 * @param messageReference 313 * @return true if the message is expired 314 */ 315 boolean isExpired(MessageReference messageReference); 316 317 /** 318 * A Message has Expired 319 * 320 * @param context 321 * @param messageReference 322 * @param subscription (may be null) 323 */ 324 void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription); 325 326 /** 327 * A message needs to go the a DLQ 328 * 329 * 330 * @param context 331 * @param messageReference 332 * @param poisonCause reason for dlq submission, may be null 333 * @return true if Message was placed in a DLQ false if discarded. 334 */ 335 boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause); 336 337 /** 338 * @return the broker sequence id 339 */ 340 long getBrokerSequenceId(); 341 342 /** 343 * called when message is consumed 344 * @param context 345 * @param messageReference 346 */ 347 void messageConsumed(ConnectionContext context, MessageReference messageReference); 348 349 /** 350 * Called when message is delivered to the broker 351 * @param context 352 * @param messageReference 353 */ 354 void messageDelivered(ConnectionContext context, MessageReference messageReference); 355 356 /** 357 * Called when a message is discarded - e.g. running low on memory 358 * This will happen only if the policy is enabled - e.g. non durable topics 359 * @param context 360 * @param sub 361 * @param messageReference 362 */ 363 void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference); 364 365 /** 366 * Called when there is a slow consumer 367 * @param context 368 * @param destination 369 * @param subs 370 */ 371 void slowConsumer(ConnectionContext context,Destination destination, Subscription subs); 372 373 /** 374 * Called to notify a producer is too fast 375 * @param context 376 * @param producerInfo 377 * @param destination 378 */ 379 void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination); 380 381 /** 382 * Called when a Usage reaches a limit 383 * @param context 384 * @param destination 385 * @param usage 386 */ 387 void isFull(ConnectionContext context,Destination destination,Usage usage); 388 389 void virtualDestinationAdded(ConnectionContext context, VirtualDestination virtualDestination); 390 391 void virtualDestinationRemoved(ConnectionContext context, VirtualDestination virtualDestination); 392 393 /** 394 * called when the broker becomes the master in a master/slave 395 * configuration 396 */ 397 void nowMasterBroker(); 398 399 Scheduler getScheduler(); 400 401 ThreadPoolExecutor getExecutor(); 402 403 void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp); 404 405 void networkBridgeStopped(BrokerInfo brokerInfo); 406 407 408}