001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName;
020
021import java.io.File;
022import java.io.IOException;
023import java.util.Set;
024import java.util.concurrent.Callable;
025
026import javax.management.ObjectName;
027
028import org.apache.activemq.broker.BrokerService;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.broker.LockableServiceSupport;
031import org.apache.activemq.broker.Locker;
032import org.apache.activemq.broker.jmx.AnnotatedMBean;
033import org.apache.activemq.broker.jmx.PersistenceAdapterView;
034import org.apache.activemq.broker.scheduler.JobSchedulerStore;
035import org.apache.activemq.command.ActiveMQDestination;
036import org.apache.activemq.command.ActiveMQQueue;
037import org.apache.activemq.command.ActiveMQTopic;
038import org.apache.activemq.command.LocalTransactionId;
039import org.apache.activemq.command.ProducerId;
040import org.apache.activemq.command.TransactionId;
041import org.apache.activemq.command.XATransactionId;
042import org.apache.activemq.protobuf.Buffer;
043import org.apache.activemq.store.JournaledStore;
044import org.apache.activemq.store.MessageStore;
045import org.apache.activemq.store.NoLocalSubscriptionAware;
046import org.apache.activemq.store.PersistenceAdapter;
047import org.apache.activemq.store.SharedFileLocker;
048import org.apache.activemq.store.TopicMessageStore;
049import org.apache.activemq.store.TransactionIdTransformer;
050import org.apache.activemq.store.TransactionIdTransformerAware;
051import org.apache.activemq.store.TransactionStore;
052import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
053import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
054import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
055import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
056import org.apache.activemq.usage.SystemUsage;
057import org.apache.activemq.util.ServiceStopper;
058
059/**
060 * An implementation of {@link PersistenceAdapter} designed for use with
061 * KahaDB - Embedded Lightweight Non-Relational Database
062 *
063 * @org.apache.xbean.XBean element="kahaDB"
064 *
065 */
066public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter,
067    JournaledStore, TransactionIdTransformerAware, NoLocalSubscriptionAware {
068
069    private final KahaDBStore letter = new KahaDBStore();
070
071    /**
072     * @param context
073     * @throws IOException
074     * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
075     */
076    @Override
077    public void beginTransaction(ConnectionContext context) throws IOException {
078        this.letter.beginTransaction(context);
079    }
080
081    /**
082     * @param sync
083     * @throws IOException
084     * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
085     */
086    @Override
087    public void checkpoint(boolean sync) throws IOException {
088        this.letter.checkpoint(sync);
089    }
090
091    /**
092     * @param context
093     * @throws IOException
094     * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
095     */
096    @Override
097    public void commitTransaction(ConnectionContext context) throws IOException {
098        this.letter.commitTransaction(context);
099    }
100
101    /**
102     * @param destination
103     * @return MessageStore
104     * @throws IOException
105     * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
106     */
107    @Override
108    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
109        return this.letter.createQueueMessageStore(destination);
110    }
111
112    /**
113     * @param destination
114     * @return TopicMessageStore
115     * @throws IOException
116     * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
117     */
118    @Override
119    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
120        return this.letter.createTopicMessageStore(destination);
121    }
122
123    /**
124     * @return TransactionStore
125     * @throws IOException
126     * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
127     */
128    @Override
129    public TransactionStore createTransactionStore() throws IOException {
130        return this.letter.createTransactionStore();
131    }
132
133    /**
134     * @throws IOException
135     * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
136     */
137    @Override
138    public void deleteAllMessages() throws IOException {
139        this.letter.deleteAllMessages();
140    }
141
142    /**
143     * @return destinations
144     * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
145     */
146    @Override
147    public Set<ActiveMQDestination> getDestinations() {
148        return this.letter.getDestinations();
149    }
150
151    /**
152     * @return lastMessageBrokerSequenceId
153     * @throws IOException
154     * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
155     */
156    @Override
157    public long getLastMessageBrokerSequenceId() throws IOException {
158        return this.letter.getLastMessageBrokerSequenceId();
159    }
160
161    @Override
162    public long getLastProducerSequenceId(ProducerId id) throws IOException {
163        return this.letter.getLastProducerSequenceId(id);
164    }
165
166    /**
167     * @param destination
168     * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
169     */
170    @Override
171    public void removeQueueMessageStore(ActiveMQQueue destination) {
172        this.letter.removeQueueMessageStore(destination);
173    }
174
175    /**
176     * @param destination
177     * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
178     */
179    @Override
180    public void removeTopicMessageStore(ActiveMQTopic destination) {
181        this.letter.removeTopicMessageStore(destination);
182    }
183
184    /**
185     * @param context
186     * @throws IOException
187     * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
188     */
189    @Override
190    public void rollbackTransaction(ConnectionContext context) throws IOException {
191        this.letter.rollbackTransaction(context);
192    }
193
194    /**
195     * @param brokerName
196     * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
197     */
198    @Override
199    public void setBrokerName(String brokerName) {
200        this.letter.setBrokerName(brokerName);
201    }
202
203    /**
204     * @param usageManager
205     * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
206     */
207    @Override
208    public void setUsageManager(SystemUsage usageManager) {
209        this.letter.setUsageManager(usageManager);
210    }
211
212    /**
213     * @return the size of the store
214     * @see org.apache.activemq.store.PersistenceAdapter#size()
215     */
216    @Override
217    public long size() {
218        return this.letter.size();
219    }
220
221    /**
222     * @throws Exception
223     * @see org.apache.activemq.Service#start()
224     */
225    @Override
226    public void doStart() throws Exception {
227        this.letter.start();
228
229        if (brokerService != null && brokerService.isUseJmx()) {
230            PersistenceAdapterView view = new PersistenceAdapterView(this);
231            view.setInflightTransactionViewCallable(new Callable<String>() {
232                @Override
233                public String call() throws Exception {
234                    return letter.getTransactions();
235                }
236            });
237            view.setDataViewCallable(new Callable<String>() {
238                @Override
239                public String call() throws Exception {
240                    return letter.getJournal().getFileMap().keySet().toString();
241                }
242            });
243            AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view,
244                    createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString()));
245        }
246    }
247
248    /**
249     * @throws Exception
250     * @see org.apache.activemq.Service#stop()
251     */
252    @Override
253    public void doStop(ServiceStopper stopper) throws Exception {
254        this.letter.stop();
255
256        if (brokerService != null && brokerService.isUseJmx()) {
257            ObjectName brokerObjectName = brokerService.getBrokerObjectName();
258            brokerService.getManagementContext().unregisterMBean(createPersistenceAdapterName(brokerObjectName.toString(), toString()));
259        }
260    }
261
262    /**
263     * Get the journalMaxFileLength
264     *
265     * @return the journalMaxFileLength
266     */
267    @Override
268    public int getJournalMaxFileLength() {
269        return this.letter.getJournalMaxFileLength();
270    }
271
272    /**
273     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
274     * be used
275     *
276     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
277     */
278    public void setJournalMaxFileLength(int journalMaxFileLength) {
279        this.letter.setJournalMaxFileLength(journalMaxFileLength);
280    }
281
282    /**
283     * Set the max number of producers (LRU cache) to track for duplicate sends
284     */
285    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
286        this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
287    }
288
289    public int getMaxFailoverProducersToTrack() {
290        return this.letter.getMaxFailoverProducersToTrack();
291    }
292
293    /**
294     * set the audit window depth for duplicate suppression (should exceed the max transaction
295     * batch)
296     */
297    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
298        this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
299    }
300
301    public int getFailoverProducersAuditDepth() {
302        return this.letter.getFailoverProducersAuditDepth();
303    }
304
305    /**
306     * Get the checkpointInterval
307     *
308     * @return the checkpointInterval
309     */
310    public long getCheckpointInterval() {
311        return this.letter.getCheckpointInterval();
312    }
313
314    /**
315     * Set the checkpointInterval
316     *
317     * @param checkpointInterval
318     *            the checkpointInterval to set
319     */
320    public void setCheckpointInterval(long checkpointInterval) {
321        this.letter.setCheckpointInterval(checkpointInterval);
322    }
323
324    /**
325     * Get the cleanupInterval
326     *
327     * @return the cleanupInterval
328     */
329    public long getCleanupInterval() {
330        return this.letter.getCleanupInterval();
331    }
332
333    /**
334     * Set the cleanupInterval
335     *
336     * @param cleanupInterval
337     *            the cleanupInterval to set
338     */
339    public void setCleanupInterval(long cleanupInterval) {
340        this.letter.setCleanupInterval(cleanupInterval);
341    }
342
343    /**
344     * Get the indexWriteBatchSize
345     *
346     * @return the indexWriteBatchSize
347     */
348    public int getIndexWriteBatchSize() {
349        return this.letter.getIndexWriteBatchSize();
350    }
351
352    /**
353     * Set the indexWriteBatchSize
354     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
355     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
356     * @param indexWriteBatchSize
357     *            the indexWriteBatchSize to set
358     */
359    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
360        this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
361    }
362
363    /**
364     * Get the journalMaxWriteBatchSize
365     *
366     * @return the journalMaxWriteBatchSize
367     */
368    public int getJournalMaxWriteBatchSize() {
369        return this.letter.getJournalMaxWriteBatchSize();
370    }
371
372    /**
373     * Set the journalMaxWriteBatchSize
374     *  * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
375     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
376     * @param journalMaxWriteBatchSize
377     *            the journalMaxWriteBatchSize to set
378     */
379    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
380        this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
381    }
382
383    /**
384     * Get the enableIndexWriteAsync
385     *
386     * @return the enableIndexWriteAsync
387     */
388    public boolean isEnableIndexWriteAsync() {
389        return this.letter.isEnableIndexWriteAsync();
390    }
391
392    /**
393     * Set the enableIndexWriteAsync
394     *
395     * @param enableIndexWriteAsync
396     *            the enableIndexWriteAsync to set
397     */
398    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
399        this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
400    }
401
402    /**
403     * Get the directory
404     *
405     * @return the directory
406     */
407    @Override
408    public File getDirectory() {
409        return this.letter.getDirectory();
410    }
411
412    /**
413     * @param dir
414     * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
415     */
416    @Override
417    public void setDirectory(File dir) {
418        this.letter.setDirectory(dir);
419    }
420
421    /**
422     * @return the currently configured location of the KahaDB index files.
423     */
424    public File getIndexDirectory() {
425        return this.letter.getIndexDirectory();
426    }
427
428    /**
429     * Sets the directory where KahaDB index files should be written.
430     *
431     * @param indexDirectory
432     *        the directory where the KahaDB store index files should be written.
433     */
434    public void setIndexDirectory(File indexDirectory) {
435        this.letter.setIndexDirectory(indexDirectory);
436    }
437
438    /**
439     * Get the enableJournalDiskSyncs
440     * @deprecated use {@link #setEnableJournalDiskSyncs} instead
441     * @return the enableJournalDiskSyncs
442     */
443    public boolean isEnableJournalDiskSyncs() {
444        return this.letter.isEnableJournalDiskSyncs();
445    }
446
447    /**
448     * Set the enableJournalDiskSyncs
449     *
450     * @deprecated use {@link #setEnableJournalDiskSyncs} instead
451     * @param enableJournalDiskSyncs
452     *            the enableJournalDiskSyncs to set
453     */
454    public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
455        this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
456    }
457
458    /**
459     * @return
460     */
461    public String getJournalDiskSyncStrategy() {
462        return letter.getJournalDiskSyncStrategy();
463    }
464
465    public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() {
466        return letter.getJournalDiskSyncStrategyEnum();
467    }
468
469    /**
470     * @param journalDiskSyncStrategy
471     */
472    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
473        letter.setJournalDiskSyncStrategy(journalDiskSyncStrategy);
474    }
475
476    /**
477     * @return
478     */
479    public long getJournalDiskSyncInterval() {
480        return letter.getJournalDiskSyncInterval();
481    }
482
483    /**
484     * @param journalDiskSyncInterval
485     */
486    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
487        letter.setJournalDiskSyncInterval(journalDiskSyncInterval);
488    }
489
490    /**
491     * Get the indexCacheSize
492     *
493     * @return the indexCacheSize
494     */
495    public int getIndexCacheSize() {
496        return this.letter.getIndexCacheSize();
497    }
498
499    /**
500     * Set the indexCacheSize
501     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
502     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
503     * @param indexCacheSize
504     *            the indexCacheSize to set
505     */
506    public void setIndexCacheSize(int indexCacheSize) {
507        this.letter.setIndexCacheSize(indexCacheSize);
508    }
509
510    /**
511     * Get the ignoreMissingJournalfiles
512     *
513     * @return the ignoreMissingJournalfiles
514     */
515    public boolean isIgnoreMissingJournalfiles() {
516        return this.letter.isIgnoreMissingJournalfiles();
517    }
518
519    /**
520     * Set the ignoreMissingJournalfiles
521     *
522     * @param ignoreMissingJournalfiles
523     *            the ignoreMissingJournalfiles to set
524     */
525    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
526        this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
527    }
528
529    public boolean isChecksumJournalFiles() {
530        return letter.isChecksumJournalFiles();
531    }
532
533    public boolean isCheckForCorruptJournalFiles() {
534        return letter.isCheckForCorruptJournalFiles();
535    }
536
537    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
538        letter.setChecksumJournalFiles(checksumJournalFiles);
539    }
540
541    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
542        letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
543    }
544
545    @Override
546    public void setBrokerService(BrokerService brokerService) {
547        super.setBrokerService(brokerService);
548        letter.setBrokerService(brokerService);
549    }
550
551    public String getPreallocationScope() {
552        return letter.getPreallocationScope();
553    }
554
555    public void setPreallocationScope(String preallocationScope) {
556        this.letter.setPreallocationScope(preallocationScope);
557    }
558
559    public String getPreallocationStrategy() {
560        return letter.getPreallocationStrategy();
561    }
562
563    public void setPreallocationStrategy(String preallocationStrategy) {
564        this.letter.setPreallocationStrategy(preallocationStrategy);
565    }
566
567    public boolean isArchiveDataLogs() {
568        return letter.isArchiveDataLogs();
569    }
570
571    public void setArchiveDataLogs(boolean archiveDataLogs) {
572        letter.setArchiveDataLogs(archiveDataLogs);
573    }
574
575    public File getDirectoryArchive() {
576        return letter.getDirectoryArchive();
577    }
578
579    public void setDirectoryArchive(File directoryArchive) {
580        letter.setDirectoryArchive(directoryArchive);
581    }
582
583    public boolean isConcurrentStoreAndDispatchQueues() {
584        return letter.isConcurrentStoreAndDispatchQueues();
585    }
586
587    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
588        letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
589    }
590
591    public boolean isConcurrentStoreAndDispatchTopics() {
592        return letter.isConcurrentStoreAndDispatchTopics();
593    }
594
595    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
596        letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
597    }
598
599    public int getMaxAsyncJobs() {
600        return letter.getMaxAsyncJobs();
601    }
602    /**
603     * @param maxAsyncJobs
604     *            the maxAsyncJobs to set
605     */
606    public void setMaxAsyncJobs(int maxAsyncJobs) {
607        letter.setMaxAsyncJobs(maxAsyncJobs);
608    }
609
610    /**
611     * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
612     *
613     * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
614     */
615    @Deprecated
616    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException {
617       getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay);
618    }
619
620    public boolean getForceRecoverIndex() {
621        return letter.getForceRecoverIndex();
622    }
623
624    public void setForceRecoverIndex(boolean forceRecoverIndex) {
625        letter.setForceRecoverIndex(forceRecoverIndex);
626    }
627
628    public boolean isArchiveCorruptedIndex() {
629        return letter.isArchiveCorruptedIndex();
630    }
631
632    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
633        letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
634    }
635
636    public float getIndexLFUEvictionFactor() {
637        return letter.getIndexLFUEvictionFactor();
638    }
639
640    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
641        letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor);
642    }
643
644    public boolean isUseIndexLFRUEviction() {
645        return letter.isUseIndexLFRUEviction();
646    }
647
648    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
649        letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
650    }
651
652    public void setEnableIndexDiskSyncs(boolean diskSyncs) {
653        letter.setEnableIndexDiskSyncs(diskSyncs);
654    }
655
656    public boolean isEnableIndexDiskSyncs() {
657        return letter.isEnableIndexDiskSyncs();
658    }
659
660    public void setEnableIndexRecoveryFile(boolean enable) {
661        letter.setEnableIndexRecoveryFile(enable);
662    }
663
664    public boolean  isEnableIndexRecoveryFile() {
665        return letter.isEnableIndexRecoveryFile();
666    }
667
668    public void setEnableIndexPageCaching(boolean enable) {
669        letter.setEnableIndexPageCaching(enable);
670    }
671
672    public boolean isEnableIndexPageCaching() {
673        return letter.isEnableIndexPageCaching();
674    }
675
676    public int getCompactAcksAfterNoGC() {
677        return letter.getCompactAcksAfterNoGC();
678    }
679
680    /**
681     * Sets the number of GC cycles where no journal logs were removed before an attempt to
682     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
683     * <p>
684     * A value of -1 will disable this feature.
685     *
686     * @param compactAcksAfterNoGC
687     *      Number of empty GC cycles before we rewrite old ACKS.
688     */
689    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
690        this.letter.setCompactAcksAfterNoGC(compactAcksAfterNoGC);
691    }
692
693    public boolean isCompactAcksIgnoresStoreGrowth() {
694        return this.letter.isCompactAcksIgnoresStoreGrowth();
695    }
696
697    /**
698     * Configure if Ack compaction will occur regardless of continued growth of the
699     * journal logs meaning that the store has not run out of space yet.  Because the
700     * compaction operation can be costly this value is defaulted to off and the Ack
701     * compaction is only done when it seems that the store cannot grow and larger.
702     *
703     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
704     */
705    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
706        this.letter.setCompactAcksIgnoresStoreGrowth(compactAcksIgnoresStoreGrowth);
707    }
708
709    /**
710     * Returns whether Ack compaction is enabled
711     *
712     * @return enableAckCompaction
713     */
714    public boolean isEnableAckCompaction() {
715        return letter.isEnableAckCompaction();
716    }
717
718    /**
719     * Configure if the Ack compaction task should be enabled to run
720     *
721     * @param enableAckCompaction
722     */
723    public void setEnableAckCompaction(boolean enableAckCompaction) {
724        letter.setEnableAckCompaction(enableAckCompaction);
725    }
726
727    /**
728     * Whether non-blocking subscription statistics have been enabled
729     *
730     * @return
731     */
732    public boolean isEnableSubscriptionStatistics() {
733        return letter.isEnableSubscriptionStatistics();
734    }
735
736    /**
737     * Enable caching statistics for each subscription to allow non-blocking
738     * retrieval of metrics.  This could incur some overhead to compute if there are a lot
739     * of subscriptions.
740     *
741     * @param enableSubscriptionStatistics
742     */
743    public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) {
744        letter.setEnableSubscriptionStatistics(enableSubscriptionStatistics);
745    }
746
747    public KahaDBStore getStore() {
748        return letter;
749    }
750
751    public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
752        if (txid == null) {
753            return null;
754        }
755        KahaTransactionInfo rc = new KahaTransactionInfo();
756
757        if (txid.isLocalTransaction()) {
758            LocalTransactionId t = (LocalTransactionId) txid;
759            KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
760            kahaTxId.setConnectionId(t.getConnectionId().getValue());
761            kahaTxId.setTransactionId(t.getValue());
762            rc.setLocalTransactionId(kahaTxId);
763        } else {
764            XATransactionId t = (XATransactionId) txid;
765            KahaXATransactionId kahaTxId = new KahaXATransactionId();
766            kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
767            kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
768            kahaTxId.setFormatId(t.getFormatId());
769            rc.setXaTransactionId(kahaTxId);
770        }
771        return rc;
772    }
773
774    @Override
775    public Locker createDefaultLocker() throws IOException {
776        SharedFileLocker locker = new SharedFileLocker();
777        locker.configure(this);
778        return locker;
779    }
780
781    @Override
782    public void init() throws Exception {}
783
784    @Override
785    public String toString() {
786        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
787        return "KahaDBPersistenceAdapter[" + path + "]";
788    }
789
790    @Override
791    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
792        getStore().setTransactionIdTransformer(transactionIdTransformer);
793    }
794
795    @Override
796    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
797        return this.letter.createJobSchedulerStore();
798    }
799
800    /* (non-Javadoc)
801     * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal()
802     */
803    @Override
804    public boolean isPersistNoLocal() {
805        return this.letter.isPersistNoLocal();
806    }
807}