001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.command; 018 019import java.beans.Transient; 020import java.io.DataInputStream; 021import java.io.DataOutputStream; 022import java.io.IOException; 023import java.io.OutputStream; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.Map; 027import java.util.concurrent.atomic.AtomicBoolean; 028import java.util.zip.DeflaterOutputStream; 029 030import javax.jms.JMSException; 031 032import org.apache.activemq.ActiveMQConnection; 033import org.apache.activemq.advisory.AdvisorySupport; 034import org.apache.activemq.broker.region.MessageReference; 035import org.apache.activemq.usage.MemoryUsage; 036import org.apache.activemq.util.ByteArrayInputStream; 037import org.apache.activemq.util.ByteArrayOutputStream; 038import org.apache.activemq.util.ByteSequence; 039import org.apache.activemq.util.MarshallingSupport; 040import org.apache.activemq.wireformat.WireFormat; 041import org.fusesource.hawtbuf.UTF8Buffer; 042 043/** 044 * Represents an ActiveMQ message 045 * 046 * @openwire:marshaller 047 * 048 */ 049public abstract class Message extends BaseCommand implements MarshallAware, MessageReference { 050 public static final String ORIGINAL_EXPIRATION = "originalExpiration"; 051 052 /** 053 * The default minimum amount of memory a message is assumed to use 054 */ 055 public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024; 056 057 protected MessageId messageId; 058 protected ActiveMQDestination originalDestination; 059 protected TransactionId originalTransactionId; 060 061 protected ProducerId producerId; 062 protected ActiveMQDestination destination; 063 protected TransactionId transactionId; 064 065 protected long expiration; 066 protected long timestamp; 067 protected long arrival; 068 protected long brokerInTime; 069 protected long brokerOutTime; 070 protected String correlationId; 071 protected ActiveMQDestination replyTo; 072 protected boolean persistent; 073 protected String type; 074 protected byte priority; 075 protected String groupID; 076 protected int groupSequence; 077 protected ConsumerId targetConsumerId; 078 protected boolean compressed; 079 protected String userID; 080 081 protected ByteSequence content; 082 protected ByteSequence marshalledProperties; 083 protected DataStructure dataStructure; 084 protected int redeliveryCounter; 085 086 protected int size; 087 protected Map<String, Object> properties; 088 protected boolean readOnlyProperties; 089 protected boolean readOnlyBody; 090 protected transient boolean recievedByDFBridge; 091 protected boolean droppable; 092 protected boolean jmsXGroupFirstForConsumer; 093 094 private transient short referenceCount; 095 private transient ActiveMQConnection connection; 096 transient MessageDestination regionDestination; 097 transient MemoryUsage memoryUsage; 098 transient AtomicBoolean processAsExpired = new AtomicBoolean(false); 099 100 private BrokerId[] brokerPath; 101 private BrokerId[] cluster; 102 103 public static interface MessageDestination { 104 int getMinimumMessageSize(); 105 MemoryUsage getMemoryUsage(); 106 } 107 108 public abstract Message copy(); 109 public abstract void clearBody() throws JMSException; 110 public abstract void storeContent(); 111 public abstract void storeContentAndClear(); 112 113 /** 114 * @deprecated - This method name is misnamed 115 * @throws JMSException 116 */ 117 public void clearMarshalledState() throws JMSException { 118 clearUnMarshalledState(); 119 } 120 121 // useful to reduce the memory footprint of a persisted message 122 public void clearUnMarshalledState() throws JMSException { 123 properties = null; 124 } 125 126 public boolean isMarshalled() { 127 return content != null && (marshalledProperties != null || properties == null); 128 } 129 130 protected void copy(Message copy) { 131 super.copy(copy); 132 copy.producerId = producerId; 133 copy.transactionId = transactionId; 134 copy.destination = destination; 135 copy.messageId = messageId != null ? messageId.copy() : null; 136 copy.originalDestination = originalDestination; 137 copy.originalTransactionId = originalTransactionId; 138 copy.expiration = expiration; 139 copy.timestamp = timestamp; 140 copy.correlationId = correlationId; 141 copy.replyTo = replyTo; 142 copy.persistent = persistent; 143 copy.redeliveryCounter = redeliveryCounter; 144 copy.type = type; 145 copy.priority = priority; 146 copy.size = size; 147 copy.groupID = groupID; 148 copy.userID = userID; 149 copy.groupSequence = groupSequence; 150 151 if (properties != null) { 152 copy.properties = new HashMap<String, Object>(properties); 153 154 // The new message hasn't expired, so remove this feild. 155 copy.properties.remove(ORIGINAL_EXPIRATION); 156 } else { 157 copy.properties = properties; 158 } 159 160 copy.content = copyByteSequence(content); 161 copy.marshalledProperties = copyByteSequence(marshalledProperties); 162 copy.dataStructure = dataStructure; 163 copy.readOnlyProperties = readOnlyProperties; 164 copy.readOnlyBody = readOnlyBody; 165 copy.compressed = compressed; 166 copy.recievedByDFBridge = recievedByDFBridge; 167 168 copy.arrival = arrival; 169 copy.connection = connection; 170 copy.regionDestination = regionDestination; 171 copy.brokerInTime = brokerInTime; 172 copy.brokerOutTime = brokerOutTime; 173 copy.memoryUsage=this.memoryUsage; 174 copy.brokerPath = brokerPath; 175 copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer; 176 177 // lets not copy the following fields 178 // copy.targetConsumerId = targetConsumerId; 179 // copy.referenceCount = referenceCount; 180 } 181 182 private ByteSequence copyByteSequence(ByteSequence content) { 183 if (content != null) { 184 return new ByteSequence(content.getData(), content.getOffset(), content.getLength()); 185 } 186 return null; 187 } 188 189 public Object getProperty(String name) throws IOException { 190 if (properties == null) { 191 if (marshalledProperties == null) { 192 return null; 193 } 194 properties = unmarsallProperties(marshalledProperties); 195 } 196 Object result = properties.get(name); 197 if (result instanceof UTF8Buffer) { 198 result = result.toString(); 199 } 200 201 return result; 202 } 203 204 @SuppressWarnings("unchecked") 205 public Map<String, Object> getProperties() throws IOException { 206 if (properties == null) { 207 if (marshalledProperties == null) { 208 return Collections.EMPTY_MAP; 209 } 210 properties = unmarsallProperties(marshalledProperties); 211 } 212 return Collections.unmodifiableMap(properties); 213 } 214 215 public void clearProperties() { 216 marshalledProperties = null; 217 properties = null; 218 } 219 220 public void setProperty(String name, Object value) throws IOException { 221 lazyCreateProperties(); 222 properties.put(name, value); 223 } 224 225 public void removeProperty(String name) throws IOException { 226 lazyCreateProperties(); 227 properties.remove(name); 228 } 229 230 protected void lazyCreateProperties() throws IOException { 231 if (properties == null) { 232 if (marshalledProperties == null) { 233 properties = new HashMap<String, Object>(); 234 } else { 235 properties = unmarsallProperties(marshalledProperties); 236 marshalledProperties = null; 237 } 238 } else { 239 marshalledProperties = null; 240 } 241 } 242 243 private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException { 244 return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties))); 245 } 246 247 @Override 248 public void beforeMarshall(WireFormat wireFormat) throws IOException { 249 // Need to marshal the properties. 250 if (marshalledProperties == null && properties != null) { 251 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 252 DataOutputStream os = new DataOutputStream(baos); 253 MarshallingSupport.marshalPrimitiveMap(properties, os); 254 os.close(); 255 marshalledProperties = baos.toByteSequence(); 256 } 257 } 258 259 @Override 260 public void afterMarshall(WireFormat wireFormat) throws IOException { 261 } 262 263 @Override 264 public void beforeUnmarshall(WireFormat wireFormat) throws IOException { 265 } 266 267 @Override 268 public void afterUnmarshall(WireFormat wireFormat) throws IOException { 269 } 270 271 // ///////////////////////////////////////////////////////////////// 272 // 273 // Simple Field accessors 274 // 275 // ///////////////////////////////////////////////////////////////// 276 277 /** 278 * @openwire:property version=1 cache=true 279 */ 280 public ProducerId getProducerId() { 281 return producerId; 282 } 283 284 public void setProducerId(ProducerId producerId) { 285 this.producerId = producerId; 286 } 287 288 /** 289 * @openwire:property version=1 cache=true 290 */ 291 public ActiveMQDestination getDestination() { 292 return destination; 293 } 294 295 public void setDestination(ActiveMQDestination destination) { 296 this.destination = destination; 297 } 298 299 /** 300 * @openwire:property version=1 cache=true 301 */ 302 public TransactionId getTransactionId() { 303 return transactionId; 304 } 305 306 public void setTransactionId(TransactionId transactionId) { 307 this.transactionId = transactionId; 308 } 309 310 public boolean isInTransaction() { 311 return transactionId != null; 312 } 313 314 /** 315 * @openwire:property version=1 cache=true 316 */ 317 public ActiveMQDestination getOriginalDestination() { 318 return originalDestination; 319 } 320 321 public void setOriginalDestination(ActiveMQDestination destination) { 322 this.originalDestination = destination; 323 } 324 325 /** 326 * @openwire:property version=1 327 */ 328 @Override 329 public MessageId getMessageId() { 330 return messageId; 331 } 332 333 public void setMessageId(MessageId messageId) { 334 this.messageId = messageId; 335 } 336 337 /** 338 * @openwire:property version=1 cache=true 339 */ 340 public TransactionId getOriginalTransactionId() { 341 return originalTransactionId; 342 } 343 344 public void setOriginalTransactionId(TransactionId transactionId) { 345 this.originalTransactionId = transactionId; 346 } 347 348 /** 349 * @openwire:property version=1 350 */ 351 @Override 352 public String getGroupID() { 353 return groupID; 354 } 355 356 public void setGroupID(String groupID) { 357 this.groupID = groupID; 358 } 359 360 /** 361 * @openwire:property version=1 362 */ 363 @Override 364 public int getGroupSequence() { 365 return groupSequence; 366 } 367 368 public void setGroupSequence(int groupSequence) { 369 this.groupSequence = groupSequence; 370 } 371 372 /** 373 * @openwire:property version=1 374 */ 375 public String getCorrelationId() { 376 return correlationId; 377 } 378 379 public void setCorrelationId(String correlationId) { 380 this.correlationId = correlationId; 381 } 382 383 /** 384 * @openwire:property version=1 385 */ 386 @Override 387 public boolean isPersistent() { 388 return persistent; 389 } 390 391 public void setPersistent(boolean deliveryMode) { 392 this.persistent = deliveryMode; 393 } 394 395 /** 396 * @openwire:property version=1 397 */ 398 @Override 399 public long getExpiration() { 400 return expiration; 401 } 402 403 public void setExpiration(long expiration) { 404 this.expiration = expiration; 405 } 406 407 /** 408 * @openwire:property version=1 409 */ 410 public byte getPriority() { 411 return priority; 412 } 413 414 public void setPriority(byte priority) { 415 if (priority < 0) { 416 this.priority = 0; 417 } else if (priority > 9) { 418 this.priority = 9; 419 } else { 420 this.priority = priority; 421 } 422 } 423 424 /** 425 * @openwire:property version=1 426 */ 427 public ActiveMQDestination getReplyTo() { 428 return replyTo; 429 } 430 431 public void setReplyTo(ActiveMQDestination replyTo) { 432 this.replyTo = replyTo; 433 } 434 435 /** 436 * @openwire:property version=1 437 */ 438 public long getTimestamp() { 439 return timestamp; 440 } 441 442 public void setTimestamp(long timestamp) { 443 this.timestamp = timestamp; 444 } 445 446 /** 447 * @openwire:property version=1 448 */ 449 public String getType() { 450 return type; 451 } 452 453 public void setType(String type) { 454 this.type = type; 455 } 456 457 /** 458 * @openwire:property version=1 459 */ 460 public ByteSequence getContent() { 461 return content; 462 } 463 464 public void setContent(ByteSequence content) { 465 this.content = content; 466 } 467 468 /** 469 * @openwire:property version=1 470 */ 471 public ByteSequence getMarshalledProperties() { 472 return marshalledProperties; 473 } 474 475 public void setMarshalledProperties(ByteSequence marshalledProperties) { 476 this.marshalledProperties = marshalledProperties; 477 } 478 479 /** 480 * @openwire:property version=1 481 */ 482 public DataStructure getDataStructure() { 483 return dataStructure; 484 } 485 486 public void setDataStructure(DataStructure data) { 487 this.dataStructure = data; 488 } 489 490 /** 491 * Can be used to route the message to a specific consumer. Should be null 492 * to allow the broker use normal JMS routing semantics. If the target 493 * consumer id is an active consumer on the broker, the message is dropped. 494 * Used by the AdvisoryBroker to replay advisory messages to a specific 495 * consumer. 496 * 497 * @openwire:property version=1 cache=true 498 */ 499 @Override 500 public ConsumerId getTargetConsumerId() { 501 return targetConsumerId; 502 } 503 504 public void setTargetConsumerId(ConsumerId targetConsumerId) { 505 this.targetConsumerId = targetConsumerId; 506 } 507 508 @Override 509 public boolean isExpired() { 510 long expireTime = getExpiration(); 511 return expireTime > 0 && System.currentTimeMillis() > expireTime; 512 } 513 514 @Override 515 public boolean isAdvisory() { 516 return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 517 } 518 519 /** 520 * @openwire:property version=1 521 */ 522 public boolean isCompressed() { 523 return compressed; 524 } 525 526 public void setCompressed(boolean compressed) { 527 this.compressed = compressed; 528 } 529 530 public boolean isRedelivered() { 531 return redeliveryCounter > 0; 532 } 533 534 public void setRedelivered(boolean redelivered) { 535 if (redelivered) { 536 if (!isRedelivered()) { 537 setRedeliveryCounter(1); 538 } 539 } else { 540 if (isRedelivered()) { 541 setRedeliveryCounter(0); 542 } 543 } 544 } 545 546 @Override 547 public void incrementRedeliveryCounter() { 548 redeliveryCounter++; 549 } 550 551 /** 552 * @openwire:property version=1 553 */ 554 @Override 555 public int getRedeliveryCounter() { 556 return redeliveryCounter; 557 } 558 559 public void setRedeliveryCounter(int deliveryCounter) { 560 this.redeliveryCounter = deliveryCounter; 561 } 562 563 /** 564 * The route of brokers the command has moved through. 565 * 566 * @openwire:property version=1 cache=true 567 */ 568 public BrokerId[] getBrokerPath() { 569 return brokerPath; 570 } 571 572 public void setBrokerPath(BrokerId[] brokerPath) { 573 this.brokerPath = brokerPath; 574 } 575 576 public boolean isReadOnlyProperties() { 577 return readOnlyProperties; 578 } 579 580 public void setReadOnlyProperties(boolean readOnlyProperties) { 581 this.readOnlyProperties = readOnlyProperties; 582 } 583 584 public boolean isReadOnlyBody() { 585 return readOnlyBody; 586 } 587 588 public void setReadOnlyBody(boolean readOnlyBody) { 589 this.readOnlyBody = readOnlyBody; 590 } 591 592 public ActiveMQConnection getConnection() { 593 return this.connection; 594 } 595 596 public void setConnection(ActiveMQConnection connection) { 597 this.connection = connection; 598 } 599 600 /** 601 * Used to schedule the arrival time of a message to a broker. The broker 602 * will not dispatch a message to a consumer until it's arrival time has 603 * elapsed. 604 * 605 * @openwire:property version=1 606 */ 607 public long getArrival() { 608 return arrival; 609 } 610 611 public void setArrival(long arrival) { 612 this.arrival = arrival; 613 } 614 615 /** 616 * Only set by the broker and defines the userID of the producer connection 617 * who sent this message. This is an optional field, it needs to be enabled 618 * on the broker to have this field populated. 619 * 620 * @openwire:property version=1 621 */ 622 public String getUserID() { 623 return userID; 624 } 625 626 public void setUserID(String jmsxUserID) { 627 this.userID = jmsxUserID; 628 } 629 630 @Override 631 public int getReferenceCount() { 632 return referenceCount; 633 } 634 635 @Override 636 public Message getMessageHardRef() { 637 return this; 638 } 639 640 @Override 641 public Message getMessage() { 642 return this; 643 } 644 645 public void setRegionDestination(MessageDestination destination) { 646 this.regionDestination = destination; 647 if(this.memoryUsage==null) { 648 this.memoryUsage=destination.getMemoryUsage(); 649 } 650 } 651 652 @Override 653 @Transient 654 public MessageDestination getRegionDestination() { 655 return regionDestination; 656 } 657 658 public MemoryUsage getMemoryUsage() { 659 return this.memoryUsage; 660 } 661 662 public void setMemoryUsage(MemoryUsage usage) { 663 this.memoryUsage=usage; 664 } 665 666 @Override 667 public boolean isMarshallAware() { 668 return true; 669 } 670 671 @Override 672 public int incrementReferenceCount() { 673 int rc; 674 int size; 675 synchronized (this) { 676 rc = ++referenceCount; 677 size = getSize(); 678 } 679 680 if (rc == 1 && getMemoryUsage() != null) { 681 getMemoryUsage().increaseUsage(size); 682 //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage()); 683 684 } 685 686 //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); 687 return rc; 688 } 689 690 @Override 691 public int decrementReferenceCount() { 692 int rc; 693 int size; 694 synchronized (this) { 695 rc = --referenceCount; 696 size = getSize(); 697 } 698 699 if (rc == 0 && getMemoryUsage() != null) { 700 getMemoryUsage().decreaseUsage(size); 701 //Thread.dumpStack(); 702 //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage()); 703 } 704 705 //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); 706 707 return rc; 708 } 709 710 @Override 711 public int getSize() { 712 int minimumMessageSize = getMinimumMessageSize(); 713 if (size < minimumMessageSize || size == 0) { 714 size = minimumMessageSize; 715 if (marshalledProperties != null) { 716 size += marshalledProperties.getLength(); 717 } 718 if (content != null) { 719 size += content.getLength(); 720 } 721 } 722 return size; 723 } 724 725 protected int getMinimumMessageSize() { 726 int result = DEFAULT_MINIMUM_MESSAGE_SIZE; 727 //let destination override 728 MessageDestination dest = regionDestination; 729 if (dest != null) { 730 result=dest.getMinimumMessageSize(); 731 } 732 return result; 733 } 734 735 /** 736 * @openwire:property version=1 737 * @return Returns the recievedByDFBridge. 738 */ 739 public boolean isRecievedByDFBridge() { 740 return recievedByDFBridge; 741 } 742 743 /** 744 * @param recievedByDFBridge The recievedByDFBridge to set. 745 */ 746 public void setRecievedByDFBridge(boolean recievedByDFBridge) { 747 this.recievedByDFBridge = recievedByDFBridge; 748 } 749 750 public void onMessageRolledBack() { 751 incrementRedeliveryCounter(); 752 } 753 754 /** 755 * @openwire:property version=2 cache=true 756 */ 757 public boolean isDroppable() { 758 return droppable; 759 } 760 761 public void setDroppable(boolean droppable) { 762 this.droppable = droppable; 763 } 764 765 /** 766 * If a message is stored in multiple nodes on a cluster, all the cluster 767 * members will be listed here. Otherwise, it will be null. 768 * 769 * @openwire:property version=3 cache=true 770 */ 771 public BrokerId[] getCluster() { 772 return cluster; 773 } 774 775 public void setCluster(BrokerId[] cluster) { 776 this.cluster = cluster; 777 } 778 779 @Override 780 public boolean isMessage() { 781 return true; 782 } 783 784 /** 785 * @openwire:property version=3 786 */ 787 public long getBrokerInTime() { 788 return this.brokerInTime; 789 } 790 791 public void setBrokerInTime(long brokerInTime) { 792 this.brokerInTime = brokerInTime; 793 } 794 795 /** 796 * @openwire:property version=3 797 */ 798 public long getBrokerOutTime() { 799 return this.brokerOutTime; 800 } 801 802 public void setBrokerOutTime(long brokerOutTime) { 803 this.brokerOutTime = brokerOutTime; 804 } 805 806 @Override 807 public boolean isDropped() { 808 return false; 809 } 810 811 /** 812 * @openwire:property version=10 813 */ 814 public boolean isJMSXGroupFirstForConsumer() { 815 return jmsXGroupFirstForConsumer; 816 } 817 818 public void setJMSXGroupFirstForConsumer(boolean val) { 819 jmsXGroupFirstForConsumer = val; 820 } 821 822 public void compress() throws IOException { 823 if (!isCompressed()) { 824 storeContent(); 825 if (!isCompressed() && getContent() != null) { 826 doCompress(); 827 } 828 } 829 } 830 831 protected void doCompress() throws IOException { 832 compressed = true; 833 ByteSequence bytes = getContent(); 834 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 835 OutputStream os = new DeflaterOutputStream(bytesOut); 836 os.write(bytes.data, bytes.offset, bytes.length); 837 os.close(); 838 setContent(bytesOut.toByteSequence()); 839 } 840 841 @Override 842 public String toString() { 843 return toString(null); 844 } 845 846 @Override 847 public String toString(Map<String, Object>overrideFields) { 848 try { 849 getProperties(); 850 } catch (IOException e) { 851 } 852 return super.toString(overrideFields); 853 } 854 855 @Override 856 public boolean canProcessAsExpired() { 857 return processAsExpired.compareAndSet(false, true); 858 } 859}