001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transaction; 018 019import java.io.IOException; 020import javax.transaction.xa.XAException; 021import javax.transaction.xa.XAResource; 022import org.apache.activemq.TransactionContext; 023import org.apache.activemq.broker.TransactionBroker; 024import org.apache.activemq.command.ConnectionId; 025import org.apache.activemq.command.TransactionId; 026import org.apache.activemq.command.XATransactionId; 027import org.apache.activemq.store.TransactionStore; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031/** 032 * 033 */ 034public class XATransaction extends Transaction { 035 036 private static final Logger LOG = LoggerFactory.getLogger(XATransaction.class); 037 038 private final TransactionStore transactionStore; 039 private final XATransactionId xid; 040 private final TransactionBroker broker; 041 private final ConnectionId connectionId; 042 043 public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker, ConnectionId connectionId) { 044 this.transactionStore = transactionStore; 045 this.xid = xid; 046 this.broker = broker; 047 this.connectionId = connectionId; 048 if (LOG.isDebugEnabled()) { 049 LOG.debug("XA Transaction new/begin : " + xid); 050 } 051 } 052 053 @Override 054 public void commit(boolean onePhase) throws XAException, IOException { 055 if (LOG.isDebugEnabled()) { 056 LOG.debug("XA Transaction commit onePhase:" + onePhase + ", xid: " + xid); 057 } 058 059 switch (getState()) { 060 case START_STATE: 061 // 1 phase commit, no work done. 062 checkForPreparedState(onePhase); 063 setStateFinished(); 064 break; 065 case IN_USE_STATE: 066 // 1 phase commit, work done. 067 checkForPreparedState(onePhase); 068 doPrePrepare(); 069 setStateFinished(); 070 storeCommit(getTransactionId(), false, preCommitTask, postCommitTask); 071 break; 072 case PREPARED_STATE: 073 // 2 phase commit, work done. 074 // We would record commit here. 075 setStateFinished(); 076 storeCommit(getTransactionId(), true, preCommitTask, postCommitTask); 077 break; 078 default: 079 illegalStateTransition("commit"); 080 } 081 } 082 083 private void storeCommit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) 084 throws XAException, IOException { 085 try { 086 transactionStore.commit(getTransactionId(), wasPrepared, preCommitTask, postCommitTask); 087 waitPostCommitDone(postCommitTask); 088 } catch (XAException xae) { 089 throw xae; 090 } catch (Throwable t) { 091 LOG.warn("Store COMMIT FAILED: ", t); 092 rollback(); 093 XAException xae = newXAException("STORE COMMIT FAILED: Transaction rolled back", XAException.XA_RBOTHER); 094 xae.initCause(t); 095 throw xae; 096 } 097 } 098 099 private void illegalStateTransition(String callName) throws XAException { 100 XAException xae = newXAException("Cannot call " + callName + " now.", XAException.XAER_PROTO); 101 throw xae; 102 } 103 104 private void checkForPreparedState(boolean onePhase) throws XAException { 105 if (!onePhase) { 106 XAException xae = newXAException("Cannot do 2 phase commit if the transaction has not been prepared", XAException.XAER_PROTO); 107 throw xae; 108 } 109 } 110 111 private void doPrePrepare() throws XAException, IOException { 112 try { 113 prePrepare(); 114 } catch (XAException e) { 115 throw e; 116 } catch (Throwable e) { 117 LOG.warn("PRE-PREPARE FAILED: ", e); 118 rollback(); 119 XAException xae = newXAException("PRE-PREPARE FAILED: Transaction rolled back", XAException.XA_RBOTHER); 120 xae.initCause(e); 121 throw xae; 122 } 123 } 124 125 @Override 126 public void rollback() throws XAException, IOException { 127 128 if (LOG.isDebugEnabled()) { 129 LOG.debug("XA Transaction rollback: " + xid); 130 } 131 132 switch (getState()) { 133 case START_STATE: 134 // 1 phase rollback no work done. 135 setStateFinished(); 136 break; 137 case IN_USE_STATE: 138 // 1 phase rollback work done. 139 setStateFinished(); 140 transactionStore.rollback(getTransactionId()); 141 doPostRollback(); 142 break; 143 case PREPARED_STATE: 144 // 2 phase rollback work done. 145 setStateFinished(); 146 transactionStore.rollback(getTransactionId()); 147 doPostRollback(); 148 break; 149 case FINISHED_STATE: 150 // failure to commit 151 transactionStore.rollback(getTransactionId()); 152 doPostRollback(); 153 break; 154 default: 155 throw newXAException("Invalid state: " + getState(), XAException.XA_RBPROTO); 156 } 157 158 } 159 160 private void doPostRollback() throws XAException { 161 try { 162 fireAfterRollback(); 163 } catch (Throwable e) { 164 // I guess this could happen. Post commit task failed 165 // to execute properly. 166 LOG.warn("POST ROLLBACK FAILED: ", e); 167 XAException xae = newXAException("POST ROLLBACK FAILED", XAException.XAER_RMERR); 168 xae.initCause(e); 169 throw xae; 170 } 171 } 172 173 @Override 174 public int prepare() throws XAException, IOException { 175 if (LOG.isDebugEnabled()) { 176 LOG.debug("XA Transaction prepare: " + xid); 177 } 178 179 switch (getState()) { 180 case START_STATE: 181 // No work done.. no commit/rollback needed. 182 setStateFinished(); 183 return XAResource.XA_RDONLY; 184 case IN_USE_STATE: 185 // We would record prepare here. 186 doPrePrepare(); 187 setState(Transaction.PREPARED_STATE); 188 transactionStore.prepare(getTransactionId()); 189 return XAResource.XA_OK; 190 default: 191 illegalStateTransition("prepare"); 192 return XAResource.XA_RDONLY; 193 } 194 } 195 196 private void setStateFinished() { 197 setState(Transaction.FINISHED_STATE); 198 broker.removeTransaction(xid); 199 } 200 201 public ConnectionId getConnectionId() { 202 return connectionId; 203 } 204 205 @Override 206 public TransactionId getTransactionId() { 207 return xid; 208 } 209 210 @Override 211 public Logger getLog() { 212 return LOG; 213 } 214 215 public XATransactionId getXid() { 216 return xid; 217 } 218}