001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.console.command; 018 019import org.apache.activemq.ActiveMQConnectionFactory; 020import org.apache.activemq.command.ActiveMQDestination; 021import org.apache.activemq.util.ProducerThread; 022import org.slf4j.Logger; 023import org.slf4j.LoggerFactory; 024 025import javax.jms.Connection; 026import javax.jms.Session; 027import java.util.List; 028import java.util.concurrent.CountDownLatch; 029 030public class ProducerCommand extends AbstractCommand { 031 private static final Logger LOG = LoggerFactory.getLogger(ProducerCommand.class); 032 033 String brokerUrl = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; 034 String user = ActiveMQConnectionFactory.DEFAULT_USER; 035 String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD; 036 String destination = "queue://TEST"; 037 int messageCount = 1000; 038 int sleep = 0; 039 boolean persistent = true; 040 String message = null; 041 String payloadUrl = null; 042 int messageSize = 0; 043 int textMessageSize; 044 long msgTTL = 0L; 045 String msgGroupID=null; 046 int transactionBatchSize; 047 private int parallelThreads = 1; 048 049 @Override 050 protected void runTask(List<String> tokens) throws Exception { 051 LOG.info("Connecting to URL: " + brokerUrl + " (" + user + ":" + password + ")"); 052 LOG.info("Producing messages to " + destination); 053 LOG.info("Using " + (persistent ? "persistent" : "non-persistent") + " messages"); 054 LOG.info("Sleeping between sends " + sleep + " ms"); 055 LOG.info("Running " + parallelThreads + " parallel threads"); 056 057 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); 058 Connection conn = null; 059 try { 060 conn = factory.createConnection(user, password); 061 conn.start(); 062 063 CountDownLatch active = new CountDownLatch(parallelThreads); 064 065 for (int i = 1; i <= parallelThreads; i++) { 066 Session sess; 067 if (transactionBatchSize != 0) { 068 sess = conn.createSession(true, Session.SESSION_TRANSACTED); 069 } else { 070 sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 071 } 072 ProducerThread producer = new ProducerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE)); 073 producer.setName("producer-" + i); 074 producer.setMessageCount(messageCount); 075 producer.setSleep(sleep); 076 producer.setMsgTTL(msgTTL); 077 producer.setPersistent(persistent); 078 producer.setTransactionBatchSize(transactionBatchSize); 079 producer.setMessage(message); 080 producer.setPayloadUrl(payloadUrl); 081 producer.setMessageSize(messageSize); 082 producer.setMsgGroupID(msgGroupID); 083 producer.setTextMessageSize(textMessageSize); 084 producer.setFinished(active); 085 producer.start(); 086 } 087 088 active.await(); 089 } finally { 090 if (conn != null) { 091 conn.close(); 092 } 093 } 094 } 095 096 public String getBrokerUrl() { 097 return brokerUrl; 098 } 099 100 public void setBrokerUrl(String brokerUrl) { 101 this.brokerUrl = brokerUrl; 102 } 103 104 public String getDestination() { 105 return destination; 106 } 107 108 public void setDestination(String destination) { 109 this.destination = destination; 110 } 111 112 public int getMessageCount() { 113 return messageCount; 114 } 115 116 public void setMessageCount(int messageCount) { 117 this.messageCount = messageCount; 118 } 119 120 public int getSleep() { 121 return sleep; 122 } 123 124 public void setSleep(int sleep) { 125 this.sleep = sleep; 126 } 127 128 public boolean isPersistent() { 129 return persistent; 130 } 131 132 public void setPersistent(boolean persistent) { 133 this.persistent = persistent; 134 } 135 136 public int getMessageSize() { 137 return messageSize; 138 } 139 140 public void setMessageSize(int messageSize) { 141 this.messageSize = messageSize; 142 } 143 144 public int getTextMessageSize() { 145 return textMessageSize; 146 } 147 148 public void setTextMessageSize(int textMessageSize) { 149 this.textMessageSize = textMessageSize; 150 } 151 152 public long getMsgTTL() { 153 return msgTTL; 154 } 155 156 public void setMsgTTL(long msgTTL) { 157 this.msgTTL = msgTTL; 158 } 159 160 public String getMsgGroupID() { 161 return msgGroupID; 162 } 163 164 public void setMsgGroupID(String msgGroupID) { 165 this.msgGroupID = msgGroupID; 166 } 167 168 public int getTransactionBatchSize() { 169 return transactionBatchSize; 170 } 171 172 public void setTransactionBatchSize(int transactionBatchSize) { 173 this.transactionBatchSize = transactionBatchSize; 174 } 175 176 public String getUser() { 177 return user; 178 } 179 180 public void setUser(String user) { 181 this.user = user; 182 } 183 184 public String getPassword() { 185 return password; 186 } 187 188 public void setPassword(String password) { 189 this.password = password; 190 } 191 192 public int getParallelThreads() { 193 return parallelThreads; 194 } 195 196 public void setParallelThreads(int parallelThreads) { 197 this.parallelThreads = parallelThreads; 198 } 199 200 public String getPayloadUrl() { 201 return payloadUrl; 202 } 203 204 public void setPayloadUrl(String payloadUrl) { 205 this.payloadUrl = payloadUrl; 206 } 207 208 public String getMessage() { 209 return message; 210 } 211 212 public void setMessage(String message) { 213 this.message = message; 214 } 215 216 @Override 217 protected void printHelp() { 218 printHelpFromFile(); 219 } 220 221 @Override 222 public String getName() { 223 return "producer"; 224 } 225 226 @Override 227 public String getOneLineDescription() { 228 return "Sends messages to the broker"; 229 } 230}