001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.FileFilter;
021import java.io.IOException;
022import java.nio.charset.Charset;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import org.apache.activemq.broker.BrokerService;
030import org.apache.activemq.broker.BrokerServiceAware;
031import org.apache.activemq.broker.ConnectionContext;
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.ActiveMQQueue;
034import org.apache.activemq.command.ActiveMQTopic;
035import org.apache.activemq.command.LocalTransactionId;
036import org.apache.activemq.command.ProducerId;
037import org.apache.activemq.command.TransactionId;
038import org.apache.activemq.command.XATransactionId;
039import org.apache.activemq.filter.AnyDestination;
040import org.apache.activemq.filter.DestinationMap;
041import org.apache.activemq.protobuf.Buffer;
042import org.apache.activemq.store.MessageStore;
043import org.apache.activemq.store.PersistenceAdapter;
044import org.apache.activemq.store.TopicMessageStore;
045import org.apache.activemq.store.TransactionStore;
046import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
047import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
048import org.apache.activemq.usage.SystemUsage;
049import org.apache.activemq.util.IOHelper;
050import org.apache.activemq.util.IntrospectionSupport;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054/**
055 * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
056 * distribution of destinations across multiple kahaDB persistence adapters
057 *
058 * @org.apache.xbean.XBean element="mKahaDB"
059 */
060public class MultiKahaDBPersistenceAdapter extends DestinationMap implements PersistenceAdapter, BrokerServiceAware {
061    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
062
063    final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
064    final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
065
066    BrokerService brokerService;
067    List<KahaDBPersistenceAdapter> adapters = new LinkedList<KahaDBPersistenceAdapter>();
068    private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
069
070    MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
071
072    // all local store transactions are XA, 2pc if more than one adapter involved
073    TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
074        @Override
075        public KahaTransactionInfo transform(TransactionId txid) {
076            if (txid == null) {
077                return null;
078            }
079            KahaTransactionInfo rc = new KahaTransactionInfo();
080            KahaXATransactionId kahaTxId = new KahaXATransactionId();
081            if (txid.isLocalTransaction()) {
082                LocalTransactionId t = (LocalTransactionId) txid;
083                kahaTxId.setBranchQualifier(new Buffer(Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"))));
084                kahaTxId.setGlobalTransactionId(new Buffer(t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"))));
085                kahaTxId.setFormatId(LOCAL_FORMAT_ID_MAGIC);
086            } else {
087                XATransactionId t = (XATransactionId) txid;
088                kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
089                kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
090                kahaTxId.setFormatId(t.getFormatId());
091            }
092            rc.setXaTransacitonId(kahaTxId);
093            return rc;
094        }
095    };
096
097    /**
098     * Sets the  FilteredKahaDBPersistenceAdapter entries
099     *
100     * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
101     */
102    @SuppressWarnings({ "rawtypes", "unchecked" })
103    public void setFilteredPersistenceAdapters(List entries) {
104        for (Object entry : entries) {
105            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
106            KahaDBPersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
107            if (filteredAdapter.getDestination() == null) {
108                filteredAdapter.setDestination(matchAll);
109            }
110
111            if (filteredAdapter.isPerDestination()) {
112                configureDirectory(adapter, null);
113                // per destination adapters will be created on demand or during recovery
114                continue;
115            } else {
116                configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
117            }
118
119            configureAdapter(adapter);
120            adapters.add(adapter);
121        }
122        super.setEntries(entries);
123    }
124
125    private String nameFromDestinationFilter(ActiveMQDestination destination) {
126        return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
127    }
128
129    public boolean isLocalXid(TransactionId xid) {
130        return xid instanceof XATransactionId &&
131                ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
132    }
133
134    public void beginTransaction(ConnectionContext context) throws IOException {
135        throw new IllegalStateException();
136    }
137
138    public void checkpoint(final boolean sync) throws IOException {
139        for (PersistenceAdapter persistenceAdapter : adapters) {
140            persistenceAdapter.checkpoint(sync);
141        }
142    }
143
144    public void commitTransaction(ConnectionContext context) throws IOException {
145        throw new IllegalStateException();
146    }
147
148    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
149        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
150        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
151    }
152
153    private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) {
154        Object result = this.chooseValue(destination);
155        if (result == null) {
156            throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
157        }
158        FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
159        if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
160            result = addAdapter(filteredAdapter, destination);
161            startAdapter(((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter(), destination.getQualifiedName());
162            if (LOG.isTraceEnabled()) {
163                LOG.info("created per destination adapter for: " + destination  + ", " + result);
164            }
165        }
166        return ((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter();
167    }
168
169    private void startAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) {
170        try {
171            kahaDBPersistenceAdapter.start();
172        } catch (Exception e) {
173            RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
174            LOG.error(detail.toString(), e);
175            throw detail;
176        }
177    }
178
179    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
180        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
181        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
182    }
183
184    public TransactionStore createTransactionStore() throws IOException {
185        return transactionStore;
186    }
187
188    public void deleteAllMessages() throws IOException {
189        for (PersistenceAdapter persistenceAdapter : adapters) {
190            persistenceAdapter.deleteAllMessages();
191        }
192        transactionStore.deleteAllMessages();
193        IOHelper.deleteChildren(getDirectory());
194    }
195
196    public Set<ActiveMQDestination> getDestinations() {
197        Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
198        for (PersistenceAdapter persistenceAdapter : adapters) {
199            results.addAll(persistenceAdapter.getDestinations());
200        }
201        return results;
202    }
203
204    public long getLastMessageBrokerSequenceId() throws IOException {
205        long maxId = -1;
206        for (PersistenceAdapter persistenceAdapter : adapters) {
207            maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
208        }
209        return maxId;
210    }
211
212    public long getLastProducerSequenceId(ProducerId id) throws IOException {
213        long maxId = -1;
214        for (PersistenceAdapter persistenceAdapter : adapters) {
215            maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
216        }
217        return maxId;
218    }
219
220    public void removeQueueMessageStore(ActiveMQQueue destination) {
221        getMatchingPersistenceAdapter(destination).removeQueueMessageStore(destination);
222    }
223
224    public void removeTopicMessageStore(ActiveMQTopic destination) {
225        getMatchingPersistenceAdapter(destination).removeTopicMessageStore(destination);
226    }
227
228    public void rollbackTransaction(ConnectionContext context) throws IOException {
229        throw new IllegalStateException();
230    }
231
232    public void setBrokerName(String brokerName) {
233        for (PersistenceAdapter persistenceAdapter : adapters) {
234            persistenceAdapter.setBrokerName(brokerName);
235        }
236    }
237
238    public void setUsageManager(SystemUsage usageManager) {
239        for (PersistenceAdapter persistenceAdapter : adapters) {
240            persistenceAdapter.setUsageManager(usageManager);
241        }
242    }
243
244    public long size() {
245        long size = 0;
246        for (PersistenceAdapter persistenceAdapter : adapters) {
247            size += persistenceAdapter.size();
248        }
249        return size;
250    }
251
252    public void start() throws Exception {
253        Object result = this.chooseValue(matchAll);
254        if (result != null) {
255            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
256            if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
257                findAndRegisterExistingAdapters(filteredAdapter);
258            }
259        }
260        for (PersistenceAdapter persistenceAdapter : adapters) {
261            persistenceAdapter.start();
262        }
263    }
264
265    private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) {
266        FileFilter destinationNames = new FileFilter() {
267            @Override
268            public boolean accept(File file) {
269                return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
270            }
271        };
272        File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
273        if (candidates != null) {
274            for (File candidate : candidates) {
275                registerExistingAdapter(template, candidate);
276            }
277        }
278    }
279
280    private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) {
281        KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName());
282        startAdapter(adapter, candidate.getName());
283        registerAdapter(adapter, adapter.getDestinations().toArray(new ActiveMQDestination[]{})[0]);
284    }
285
286    private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) {
287        KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), nameFromDestinationFilter(destination));
288        return registerAdapter(adapter, destination);
289    }
290
291    private KahaDBPersistenceAdapter adapterFromTemplate(KahaDBPersistenceAdapter template, String destinationName) {
292        KahaDBPersistenceAdapter adapter = kahaDBFromTemplate(template);
293        configureAdapter(adapter);
294        configureDirectory(adapter, destinationName);
295        return adapter;
296    }
297
298    private void configureDirectory(KahaDBPersistenceAdapter adapter, String fileName) {
299        File directory = null;
300        if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
301            // not set so inherit from mkahadb
302            directory = getDirectory();
303        } else {
304            directory = adapter.getDirectory();
305        }
306        if (fileName != null) {
307            directory = new File(directory, fileName);
308        }
309        adapter.setDirectory(directory);
310    }
311
312    private FilteredKahaDBPersistenceAdapter registerAdapter(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) {
313        adapters.add(adapter);
314        FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination, adapter);
315        put(destination, result);
316        return result;
317    }
318
319    private void configureAdapter(KahaDBPersistenceAdapter adapter) {
320        // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
321        adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
322        adapter.setBrokerService(getBrokerService());
323    }
324
325    private KahaDBPersistenceAdapter kahaDBFromTemplate(KahaDBPersistenceAdapter template) {
326        Map<String, Object> configuration = new HashMap<String, Object>();
327        IntrospectionSupport.getProperties(template, configuration, null);
328        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
329        IntrospectionSupport.setProperties(adapter, configuration);
330        return adapter;
331    }
332
333    public void stop() throws Exception {
334        for (PersistenceAdapter persistenceAdapter : adapters) {
335            persistenceAdapter.stop();
336        }
337    }
338
339    public File getDirectory() {
340        return this.directory;
341    }
342
343    @Override
344    public void setDirectory(File directory) {
345        this.directory = directory;
346    }
347
348    public void setBrokerService(BrokerService brokerService) {
349        for (KahaDBPersistenceAdapter persistenceAdapter : adapters) {
350            persistenceAdapter.setBrokerService(brokerService);
351        }
352        this.brokerService = brokerService;
353    }
354
355    public BrokerService getBrokerService() {
356        return brokerService;
357    }
358
359    public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
360        this.transactionStore = transactionStore;
361    }
362
363    /**
364     * Set the max file length of the transaction journal
365     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
366     * be used
367     *
368     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
369     */
370    public void setJournalMaxFileLength(int maxFileLength) {
371        transactionStore.setJournalMaxFileLength(maxFileLength);
372    }
373
374    public int getJournalMaxFileLength() {
375        return transactionStore.getJournalMaxFileLength();
376    }
377
378    /**
379     * Set the max write batch size of  the transaction journal
380     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
381     * be used
382     *
383     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
384     */
385    public void setJournalWriteBatchSize(int journalWriteBatchSize) {
386        transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
387    }
388
389    public int getJournalWriteBatchSize() {
390        return transactionStore.getJournalMaxWriteBatchSize();
391    }
392
393    @Override
394    public String toString() {
395        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
396        return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
397    }
398
399}