001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.DataInput;
024import java.io.DataOutput;
025import java.io.EOFException;
026import java.io.File;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.InterruptedIOException;
030import java.io.ObjectInputStream;
031import java.io.ObjectOutputStream;
032import java.io.OutputStream;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.Date;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.LinkedHashMap;
042import java.util.LinkedHashSet;
043import java.util.LinkedList;
044import java.util.List;
045import java.util.Map;
046import java.util.Map.Entry;
047import java.util.Set;
048import java.util.SortedSet;
049import java.util.TreeMap;
050import java.util.TreeSet;
051import java.util.concurrent.ConcurrentHashMap;
052import java.util.concurrent.ConcurrentMap;
053import java.util.concurrent.Executors;
054import java.util.concurrent.ScheduledExecutorService;
055import java.util.concurrent.ThreadFactory;
056import java.util.concurrent.TimeUnit;
057import java.util.concurrent.atomic.AtomicBoolean;
058import java.util.concurrent.atomic.AtomicLong;
059import java.util.concurrent.atomic.AtomicReference;
060import java.util.concurrent.locks.ReentrantReadWriteLock;
061
062import org.apache.activemq.ActiveMQMessageAuditNoSync;
063import org.apache.activemq.broker.BrokerService;
064import org.apache.activemq.broker.BrokerServiceAware;
065import org.apache.activemq.broker.region.Destination;
066import org.apache.activemq.broker.region.Queue;
067import org.apache.activemq.broker.region.Topic;
068import org.apache.activemq.command.MessageAck;
069import org.apache.activemq.command.TransactionId;
070import org.apache.activemq.openwire.OpenWireFormat;
071import org.apache.activemq.protobuf.Buffer;
072import org.apache.activemq.store.MessageStore;
073import org.apache.activemq.store.MessageStoreStatistics;
074import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
075import org.apache.activemq.store.TopicMessageStore;
076import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
077import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
078import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
079import org.apache.activemq.store.kahadb.data.KahaDestination;
080import org.apache.activemq.store.kahadb.data.KahaEntryType;
081import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
082import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
083import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
084import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
085import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
086import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
087import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
088import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
089import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
090import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
091import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
092import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
093import org.apache.activemq.store.kahadb.disk.index.ListIndex;
094import org.apache.activemq.store.kahadb.disk.journal.DataFile;
095import org.apache.activemq.store.kahadb.disk.journal.Journal;
096import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
097import org.apache.activemq.store.kahadb.disk.journal.Location;
098import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
099import org.apache.activemq.store.kahadb.disk.page.Page;
100import org.apache.activemq.store.kahadb.disk.page.PageFile;
101import org.apache.activemq.store.kahadb.disk.page.Transaction;
102import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
103import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
104import org.apache.activemq.store.kahadb.disk.util.Marshaller;
105import org.apache.activemq.store.kahadb.disk.util.Sequence;
106import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
107import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
108import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
109import org.apache.activemq.util.ByteSequence;
110import org.apache.activemq.util.DataByteArrayInputStream;
111import org.apache.activemq.util.DataByteArrayOutputStream;
112import org.apache.activemq.util.IOExceptionSupport;
113import org.apache.activemq.util.IOHelper;
114import org.apache.activemq.util.ServiceStopper;
115import org.apache.activemq.util.ServiceSupport;
116import org.apache.activemq.util.ThreadPoolUtils;
117import org.slf4j.Logger;
118import org.slf4j.LoggerFactory;
119import org.slf4j.MDC;
120
121public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
122
    /** Broker service; the checkpoint worker passes checkpoint failures to its {@code handleIOException}. */
    protected BrokerService brokerService;

    /** Name of the system property that is read into {@link #LOG_SLOW_ACCESS_TIME}. */
    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
    /** Integer value of {@link #PROPERTY_LOG_SLOW_ACCESS_TIME}, or 0 when that property is not set. */
    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
    /** Store directory used until another one is configured: the relative path {@code KahaDB}. */
    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
    /** Shared buffer wrapping a zero-length byte array. */
    protected static final Buffer UNMATCHED;
    static {
        UNMATCHED = new Buffer(new byte[]{});
    }
    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);

    /** Value stored in {@code Metadata.state} when the index is first created and again on unload. */
    static final int CLOSED_STATE = 1;
    // NOTE(review): presumably the counterpart of CLOSED_STATE for Metadata.state - confirm against its users.
    static final int OPEN_STATE = 2;
    // NOTE(review): looks like a "no ack recorded" sentinel - confirm against its users.
    static final long NOT_ACKED = -1;

    /** Metadata layout version; {@code Metadata.write} always persists this value. */
    static final int VERSION = 6;

    /** Journal file type code, one above {@code DataFile.STANDARD_LOG_FILE}. */
    static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
141
142    protected class Metadata {
143        protected Page<Metadata> page;
144        protected int state;
145        protected BTreeIndex<String, StoredDestination> destinations;
146        protected Location lastUpdate;
147        protected Location firstInProgressTransactionLocation;
148        protected Location producerSequenceIdTrackerLocation = null;
149        protected Location ackMessageFileMapLocation = null;
150        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
151        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<>();
152        protected int version = VERSION;
153        protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
154
155        public void read(DataInput is) throws IOException {
156            state = is.readInt();
157            destinations = new BTreeIndex<>(pageFile, is.readLong());
158            if (is.readBoolean()) {
159                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
160            } else {
161                lastUpdate = null;
162            }
163            if (is.readBoolean()) {
164                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
165            } else {
166                firstInProgressTransactionLocation = null;
167            }
168            try {
169                if (is.readBoolean()) {
170                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
171                } else {
172                    producerSequenceIdTrackerLocation = null;
173                }
174            } catch (EOFException expectedOnUpgrade) {
175            }
176            try {
177                version = is.readInt();
178            } catch (EOFException expectedOnUpgrade) {
179                version = 1;
180            }
181            if (version >= 5 && is.readBoolean()) {
182                ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
183            } else {
184                ackMessageFileMapLocation = null;
185            }
186            try {
187                openwireVersion = is.readInt();
188            } catch (EOFException expectedOnUpgrade) {
189                openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION;
190            }
191            LOG.info("KahaDB is version " + version);
192        }
193
194        public void write(DataOutput os) throws IOException {
195            os.writeInt(state);
196            os.writeLong(destinations.getPageId());
197
198            if (lastUpdate != null) {
199                os.writeBoolean(true);
200                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
201            } else {
202                os.writeBoolean(false);
203            }
204
205            if (firstInProgressTransactionLocation != null) {
206                os.writeBoolean(true);
207                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
208            } else {
209                os.writeBoolean(false);
210            }
211
212            if (producerSequenceIdTrackerLocation != null) {
213                os.writeBoolean(true);
214                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
215            } else {
216                os.writeBoolean(false);
217            }
218            os.writeInt(VERSION);
219            if (ackMessageFileMapLocation != null) {
220                os.writeBoolean(true);
221                LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
222            } else {
223                os.writeBoolean(false);
224            }
225            os.writeInt(this.openwireVersion);
226        }
227    }
228
229    class MetadataMarshaller extends VariableMarshaller<Metadata> {
230        @Override
231        public Metadata readPayload(DataInput dataIn) throws IOException {
232            Metadata rc = createMetadata();
233            rc.read(dataIn);
234            return rc;
235        }
236
237        @Override
238        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
239            object.write(dataOut);
240        }
241    }
242
    /** Index page file; loaded by open() and unloaded by close(). */
    protected PageFile pageFile;
    /** Journal; started by open() and closed by close(). */
    protected Journal journal;
    /** Current index metadata; replaced when loaded from page 0 and recreated on close or index recovery. */
    protected Metadata metadata = new Metadata();

    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();

    protected boolean failIfDatabaseIsLocked;

    /** When set, load() deletes the journal and the page file before opening, then clears the flag. */
    protected boolean deleteAllMessages;
    /** Store directory; load() creates it if needed. */
    protected File directory = DEFAULT_DIRECTORY;
    protected File indexDirectory = null;
    /** Executor running the CheckpointRunner; created, shut down and nulled while holding schedulerLock. */
    protected ScheduledExecutorService scheduler;
    private final Object schedulerLock = new Object();

    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
    protected boolean archiveDataLogs;
    protected File directoryArchive;
    /** Reset to 0 by close(). */
    protected AtomicLong journalSize = new AtomicLong(0);
    // The three intervals below are in milliseconds: the checkpoint worker compares them
    // with System.currentTimeMillis() differences. A non-positive value disables that task.
    long journalDiskSyncInterval = 1000;
    long checkpointInterval = 5*1000;
    long cleanupInterval = 30*1000;
    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
    boolean enableIndexWriteAsync = false;
    // NOTE(review): this is a field despite the setter-like name; presumably the index write batch size.
    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();

    /** True between a successful compareAndSet in open() and the matching one in close(). */
    protected AtomicBoolean opened = new AtomicBoolean();
    private boolean ignoreMissingJournalfiles = false;
    private int indexCacheSize = 10000;
    /** Also enables the orderIndex sanity check performed while loading the page file. */
    private boolean checkForCorruptJournalFiles = false;
    private boolean checksumJournalFiles = true;
    protected boolean forceRecoverIndex = false;
    /** When the index fails to load, open() archives the page file instead of deleting it. */
    private boolean archiveCorruptedIndex = false;
    private boolean useIndexLFRUEviction = false;
    private float indexLFUEvictionFactor = 0.2f;
    private boolean enableIndexDiskSyncs = true;
    private boolean enableIndexRecoveryFile = true;
    private boolean enableIndexPageCaching = true;
    /** close() holds the write lock around the final checkpoint update and the page file unload. */
    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();

    private boolean enableAckCompaction = true;
    private int compactAcksAfterNoGC = 10;
    private boolean compactAcksIgnoresStoreGrowth = false;
    private int checkPointCyclesWithNoGC;
    private int journalLogOnLastCompactionCheck;
    private boolean enableSubscriptionStatistics = false;

    //only set when using JournalDiskSyncStrategy.PERIODIC
    // Read by the checkpoint worker to decide whether a journal sync needs to be triggered.
    protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>();
294
295    @Override
296    public void doStart() throws Exception {
297        load();
298    }
299
300    @Override
301    public void doStop(ServiceStopper stopper) throws Exception {
302        unload();
303    }
304
305    private void loadPageFile() throws IOException {
306        this.indexLock.writeLock().lock();
307        try {
308            final PageFile pageFile = getPageFile();
309            pageFile.load();
310            pageFile.tx().execute(new Transaction.Closure<IOException>() {
311                @Override
312                public void execute(Transaction tx) throws IOException {
313                    if (pageFile.getPageCount() == 0) {
314                        // First time this is created.. Initialize the metadata
315                        Page<Metadata> page = tx.allocate();
316                        assert page.getPageId() == 0;
317                        page.set(metadata);
318                        metadata.page = page;
319                        metadata.state = CLOSED_STATE;
320                        metadata.destinations = new BTreeIndex<>(pageFile, tx.allocate().getPageId());
321
322                        tx.store(metadata.page, metadataMarshaller, true);
323                    } else {
324                        Page<Metadata> page = tx.load(0, metadataMarshaller);
325                        metadata = page.get();
326                        metadata.page = page;
327                    }
328                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
329                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
330                    metadata.destinations.load(tx);
331                }
332            });
333            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
334            // Perhaps we should just keep an index of file
335            storedDestinations.clear();
336            pageFile.tx().execute(new Transaction.Closure<IOException>() {
337                @Override
338                public void execute(Transaction tx) throws IOException {
339                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
340                        Entry<String, StoredDestination> entry = iterator.next();
341                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
342                        storedDestinations.put(entry.getKey(), sd);
343
344                        if (checkForCorruptJournalFiles) {
345                            // sanity check the index also
346                            if (!entry.getValue().locationIndex.isEmpty(tx)) {
347                                if (entry.getValue().orderIndex.nextMessageId <= 0) {
348                                    throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
349                                }
350                            }
351                        }
352                    }
353                }
354            });
355            pageFile.flush();
356        } finally {
357            this.indexLock.writeLock().unlock();
358        }
359    }
360
361    private void startCheckpoint() {
362        if (checkpointInterval == 0 && cleanupInterval == 0) {
363            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
364            return;
365        }
366        synchronized (schedulerLock) {
367            if (scheduler == null || scheduler.isShutdown()) {
368                scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
369
370                    @Override
371                    public Thread newThread(Runnable r) {
372                        Thread schedulerThread = new Thread(r);
373
374                        schedulerThread.setName("ActiveMQ Journal Checkpoint Worker");
375                        schedulerThread.setDaemon(true);
376
377                        return schedulerThread;
378                    }
379                });
380
381                // Short intervals for check-point and cleanups
382                long delay;
383                if (journal.isJournalDiskSyncPeriodic()) {
384                    delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500);
385                } else {
386                    delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
387                }
388
389                scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
390            }
391        }
392    }
393
394    private final class CheckpointRunner implements Runnable {
395
396        private long lastCheckpoint = System.currentTimeMillis();
397        private long lastCleanup = System.currentTimeMillis();
398        private long lastSync = System.currentTimeMillis();
399        private Location lastAsyncUpdate = null;
400
401        @Override
402        public void run() {
403            try {
404                // Decide on cleanup vs full checkpoint here.
405                if (opened.get()) {
406                    long now = System.currentTimeMillis();
407                    if (journal.isJournalDiskSyncPeriodic() &&
408                            journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) {
409                        Location currentUpdate = lastAsyncJournalUpdate.get();
410                        if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) {
411                            lastAsyncUpdate = currentUpdate;
412                            if (LOG.isTraceEnabled()) {
413                                LOG.trace("Writing trace command to trigger journal sync");
414                            }
415                            store(new KahaTraceCommand(), true, null, null);
416                        }
417                        lastSync = now;
418                    }
419                    if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
420                        checkpointCleanup(true);
421                        lastCleanup = now;
422                        lastCheckpoint = now;
423                    } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) {
424                        checkpointCleanup(false);
425                        lastCheckpoint = now;
426                    }
427                }
428            } catch (IOException ioe) {
429                LOG.error("Checkpoint failed", ioe);
430                brokerService.handleIOException(ioe);
431            } catch (Throwable e) {
432                LOG.error("Checkpoint failed", e);
433                brokerService.handleIOException(IOExceptionSupport.create(e));
434            }
435        }
436    }
437
438    public void open() throws IOException {
439        if( opened.compareAndSet(false, true) ) {
440            getJournal().start();
441            try {
442                loadPageFile();
443            } catch (Throwable t) {
444                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
445                if (LOG.isDebugEnabled()) {
446                    LOG.debug("Index load failure", t);
447                }
448                // try to recover index
449                try {
450                    pageFile.unload();
451                } catch (Exception ignore) {}
452                if (archiveCorruptedIndex) {
453                    pageFile.archive();
454                } else {
455                    pageFile.delete();
456                }
457                metadata = createMetadata();
458                //The metadata was recreated after a detect corruption so we need to
459                //reconfigure anything that was configured on the old metadata on startup
460                configureMetadata();
461                pageFile = null;
462                loadPageFile();
463            }
464            recover();
465            startCheckpoint();
466        }
467    }
468
469    public void load() throws IOException {
470        this.indexLock.writeLock().lock();
471        try {
472            IOHelper.mkdirs(directory);
473            if (deleteAllMessages) {
474                getJournal().start();
475                getJournal().delete();
476                getJournal().close();
477                journal = null;
478                getPageFile().delete();
479                LOG.info("Persistence store purged.");
480                deleteAllMessages = false;
481            }
482
483            open();
484            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
485        } finally {
486            this.indexLock.writeLock().unlock();
487        }
488    }
489
490    public void close() throws IOException, InterruptedException {
491        if (opened.compareAndSet(true, false)) {
492            checkpointLock.writeLock().lock();
493            try {
494                if (metadata.page != null) {
495                    checkpointUpdate(true);
496                }
497                pageFile.unload();
498                metadata = createMetadata();
499            } finally {
500                checkpointLock.writeLock().unlock();
501            }
502            journal.close();
503            synchronized(schedulerLock) {
504                if (scheduler != null) {
505                    ThreadPoolUtils.shutdownGraceful(scheduler, -1);
506                    scheduler = null;
507                }
508            }
509            // clear the cache and journalSize on shutdown of the store
510            storeCache.clear();
511            journalSize.set(0);
512        }
513    }
514
515    public void unload() throws IOException, InterruptedException {
516        this.indexLock.writeLock().lock();
517        try {
518            if( pageFile != null && pageFile.isLoaded() ) {
519                metadata.state = CLOSED_STATE;
520                metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];
521
522                if (metadata.page != null) {
523                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
524                        @Override
525                        public void execute(Transaction tx) throws IOException {
526                            tx.store(metadata.page, metadataMarshaller, true);
527                        }
528                    });
529                }
530            }
531        } finally {
532            this.indexLock.writeLock().unlock();
533        }
534        close();
535    }
536
537    // public for testing
538    @SuppressWarnings("rawtypes")
539    public Location[] getInProgressTxLocationRange() {
540        Location[] range = new Location[]{null, null};
541        synchronized (inflightTransactions) {
542            if (!inflightTransactions.isEmpty()) {
543                for (List<Operation> ops : inflightTransactions.values()) {
544                    if (!ops.isEmpty()) {
545                        trackMaxAndMin(range, ops);
546                    }
547                }
548            }
549            if (!preparedTransactions.isEmpty()) {
550                for (List<Operation> ops : preparedTransactions.values()) {
551                    if (!ops.isEmpty()) {
552                        trackMaxAndMin(range, ops);
553                    }
554                }
555            }
556        }
557        return range;
558    }
559
560    @SuppressWarnings("rawtypes")
561    private void trackMaxAndMin(Location[] range, List<Operation> ops) {
562        Location t = ops.get(0).getLocation();
563        if (range[0] == null || t.compareTo(range[0]) <= 0) {
564            range[0] = t;
565        }
566        t = ops.get(ops.size() -1).getLocation();
567        if (range[1] == null || t.compareTo(range[1]) >= 0) {
568            range[1] = t;
569        }
570    }
571
572    class TranInfo {
573        TransactionId id;
574        Location location;
575
576        class opCount {
577            int add;
578            int remove;
579        }
580        HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<>();
581
582        @SuppressWarnings("rawtypes")
583        public void track(Operation operation) {
584            if (location == null ) {
585                location = operation.getLocation();
586            }
587            KahaDestination destination;
588            boolean isAdd = false;
589            if (operation instanceof AddOperation) {
590                AddOperation add = (AddOperation) operation;
591                destination = add.getCommand().getDestination();
592                isAdd = true;
593            } else {
594                RemoveOperation removeOpperation = (RemoveOperation) operation;
595                destination = removeOpperation.getCommand().getDestination();
596            }
597            opCount opCount = destinationOpCount.get(destination);
598            if (opCount == null) {
599                opCount = new opCount();
600                destinationOpCount.put(destination, opCount);
601            }
602            if (isAdd) {
603                opCount.add++;
604            } else {
605                opCount.remove++;
606            }
607        }
608
609        @Override
610        public String toString() {
611           StringBuffer buffer = new StringBuffer();
612           buffer.append(location).append(";").append(id).append(";\n");
613           for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) {
614               buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
615           }
616           return buffer.toString();
617        }
618    }
619
620    @SuppressWarnings("rawtypes")
621    public String getTransactions() {
622
623        ArrayList<TranInfo> infos = new ArrayList<>();
624        synchronized (inflightTransactions) {
625            if (!inflightTransactions.isEmpty()) {
626                for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
627                    TranInfo info = new TranInfo();
628                    info.id = entry.getKey();
629                    for (Operation operation : entry.getValue()) {
630                        info.track(operation);
631                    }
632                    infos.add(info);
633                }
634            }
635        }
636        synchronized (preparedTransactions) {
637            if (!preparedTransactions.isEmpty()) {
638                for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) {
639                    TranInfo info = new TranInfo();
640                    info.id = entry.getKey();
641                    for (Operation operation : entry.getValue()) {
642                        info.track(operation);
643                    }
644                    infos.add(info);
645                }
646            }
647        }
648        return infos.toString();
649    }
650
    /**
     * Move all the messages that were in the journal into long term storage. We
     * just replay and do a checkpoint.
     *
     * @throws IOException
     * @throws IllegalStateException
     */
659    private void recover() throws IllegalStateException, IOException {
660        this.indexLock.writeLock().lock();
661        try {
662
663            long start = System.currentTimeMillis();
664            Location afterProducerAudit = recoverProducerAudit();
665            Location afterAckMessageFile = recoverAckMessageFileMap();
666            Location lastIndoubtPosition = getRecoveryPosition();
667
668            if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
669                // valid checkpoint, possible recover from afterAckMessageFile
670                afterProducerAudit = null;
671            }
672            Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
673            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
674
675            if (recoveryPosition != null) {
676                int redoCounter = 0;
677                int dataFileRotationTracker = recoveryPosition.getDataFileId();
678                LOG.info("Recovering from the journal @" + recoveryPosition);
679                while (recoveryPosition != null) {
680                    try {
681                        JournalCommand<?> message = load(recoveryPosition);
682                        metadata.lastUpdate = recoveryPosition;
683                        process(message, recoveryPosition, lastIndoubtPosition);
684                        redoCounter++;
685                    } catch (IOException failedRecovery) {
686                        if (isIgnoreMissingJournalfiles()) {
687                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
688                            // track this dud location
689                            journal.corruptRecoveryLocation(recoveryPosition);
690                        } else {
691                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
692                        }
693                    }
694                    recoveryPosition = journal.getNextLocation(recoveryPosition);
695                    // hold on to the minimum number of open files during recovery
696                    if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) {
697                        dataFileRotationTracker = recoveryPosition.getDataFileId();
698                        journal.cleanup();
699                    }
700                    if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
701                        LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
702                    }
703                }
704                if (LOG.isInfoEnabled()) {
705                    long end = System.currentTimeMillis();
706                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
707                }
708            }
709
710            // We may have to undo some index updates.
711            pageFile.tx().execute(new Transaction.Closure<IOException>() {
712                @Override
713                public void execute(Transaction tx) throws IOException {
714                    recoverIndex(tx);
715                }
716            });
717
718            // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
719            Set<TransactionId> toRollback = new HashSet<>();
720            Set<TransactionId> toDiscard = new HashSet<>();
721            synchronized (inflightTransactions) {
722                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
723                    TransactionId id = it.next();
724                    if (id.isLocalTransaction()) {
725                        toRollback.add(id);
726                    } else {
727                        toDiscard.add(id);
728                    }
729                }
730                for (TransactionId tx: toRollback) {
731                    if (LOG.isDebugEnabled()) {
732                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
733                    }
734                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
735                }
736                for (TransactionId tx: toDiscard) {
737                    if (LOG.isDebugEnabled()) {
738                        LOG.debug("discarding recovered in-flight XA transaction " + tx);
739                    }
740                    inflightTransactions.remove(tx);
741                }
742            }
743
744            synchronized (preparedTransactions) {
745                for (TransactionId txId : preparedTransactions.keySet()) {
746                    LOG.warn("Recovered prepared XA TX: [{}]", txId);
747                }
748            }
749
750        } finally {
751            this.indexLock.writeLock().unlock();
752        }
753    }
754
755    @SuppressWarnings("unused")
756    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
757        return TransactionIdConversion.convertToLocal(tx);
758    }
759
760    private Location minimum(Location x,
761                             Location y) {
762        Location min = null;
763        if (x != null) {
764            min = x;
765            if (y != null) {
766                int compare = y.compareTo(x);
767                if (compare < 0) {
768                    min = y;
769                }
770            }
771        } else {
772            min = y;
773        }
774        return min;
775    }
776
777    private Location recoverProducerAudit() throws IOException {
778        if (metadata.producerSequenceIdTrackerLocation != null) {
779            try {
780                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
781                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
782                int maxNumProducers = getMaxFailoverProducersToTrack();
783                int maxAuditDepth = getFailoverProducersAuditDepth();
784                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
785                metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
786                metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
787                return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation);
788            } catch (Exception e) {
789                LOG.warn("Cannot recover message audit", e);
790                return journal.getNextLocation(null);
791            }
792        } else {
793            // got no audit stored so got to recreate via replay from start of the journal
794            return journal.getNextLocation(null);
795        }
796    }
797
798    @SuppressWarnings("unchecked")
799    private Location recoverAckMessageFileMap() throws IOException {
800        if (metadata.ackMessageFileMapLocation != null) {
801            try {
802                KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
803                ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
804                metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
805                return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
806            } catch (Exception e) {
807                LOG.warn("Cannot recover ackMessageFileMap", e);
808                return journal.getNextLocation(null);
809            }
810        } else {
811            // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
812            return journal.getNextLocation(null);
813        }
814    }
815
    /**
     * Reconciles the destination indexes with the journal, inside the given page-file transaction.
     * <p>
     * The method works in two passes:
     * <ol>
     *   <li>Every index entry whose location is at or beyond the journal's last append location
     *       is removed from the order, location and message-id indexes, the producer audit is
     *       rolled back for that message id and the store statistics are decremented.</li>
     *   <li>The data file ids referenced by the location indexes and by
     *       {@code metadata.ackMessageFileMap} are compared with the files the journal actually
     *       has. Entries that point into missing files are collected, and when
     *       {@code checkForCorruptJournalFiles} is set so are entries beyond a file's length or
     *       inside a block the data file reports as corrupted. Those entries are dropped when
     *       {@code ignoreMissingJournalfiles} is set; otherwise an {@link IOException} is
     *       raised.</li>
     * </ol>
     *
     * @param tx the page-file transaction used for all index reads and removals
     * @throws IOException if missing or corrupt journal data is detected and
     *         {@code ignoreMissingJournalfiles} is not set, or if an index operation fails
     */
    protected void recoverIndex(Transaction tx) throws IOException {
        long start = System.currentTimeMillis();
        // It is possible index updates got applied before the journal updates..
        // in that case we need to remove references to messages that are not in the journal
        final Location lastAppendLocation = journal.getLastAppendLocation();
        long undoCounter=0;

        // Go through all the destinations to see if they have messages past the lastAppendLocation
        for (String key : storedDestinations.keySet()) {
            StoredDestination sd = storedDestinations.get(key);

            // sequence ids of the index entries that have to be undone for this destination
            final ArrayList<Long> matches = new ArrayList<>();
            // Find all the Locations that are >= than the last Append Location.
            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
                @Override
                protected void matched(Location key, Long value) {
                    matches.add(value);
                }
            });

            for (Long sequenceId : matches) {
                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                // the order index may no longer hold the entry; only undo what was actually removed
                if (keys != null) {
                    sd.locationIndex.remove(tx, keys.location);
                    sd.messageIdIndex.remove(tx, keys.messageId);
                    metadata.producerSequenceIdTracker.rollback(keys.messageId);
                    undoCounter++;
                    decrementAndSubSizeToStoreStat(key, keys.location.getSize());
                    // TODO: do we need to modify the ack positions for the pub sub case?
                }
            }
        }

        if (undoCounter > 0) {
            // The rolledback operations are basically in flight journal writes.  To avoid getting
            // these the end user should do sync writes to the journal.
            if (LOG.isInfoEnabled()) {
                long end = System.currentTimeMillis();
                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
            }
        }

        // reset the counter and the timer for the second pass
        undoCounter = 0;
        start = System.currentTimeMillis();

        // Lets be extra paranoid here and verify that all the datafiles being referenced
        // by the indexes still exists.

        // ss accumulates the ids of every data file referenced by any location index
        final SequenceSet ss = new SequenceSet();
        for (StoredDestination sd : storedDestinations.values()) {
            // Use a visitor to cut down the number of pages that we load
            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                // last file id added, used to avoid re-adding the same id for consecutive keys
                int last=-1;

                @Override
                public boolean isInterestedInKeysBetween(Location first, Location second) {
                    // NOTE(review): presumably skips key ranges whose file ids are all
                    // already recorded in ss - confirm SequenceSet.contains(int, int) semantics
                    if( first==null ) {
                        return !ss.contains(0, second.getDataFileId());
                    } else if( second==null ) {
                        return true;
                    } else {
                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
                    }
                }

                @Override
                public void visit(List<Location> keys, List<Long> values) {
                    for (Location l : keys) {
                        int fileId = l.getDataFileId();
                        if( last != fileId ) {
                            ss.add(fileId);
                            last = fileId;
                        }
                    }
                }

            });
        }
        // drain the sequence set into a plain set of referenced file ids
        HashSet<Integer> missingJournalFiles = new HashSet<>();
        while (!ss.isEmpty()) {
            missingJournalFiles.add((int) ss.removeFirst());
        }

        // files mentioned in the ack message file map (keys and values) count as referenced too
        for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
            missingJournalFiles.add(entry.getKey());
            for (Integer i : entry.getValue()) {
                missingJournalFiles.add(i);
            }
        }

        // whatever is referenced but not known to the journal is missing
        missingJournalFiles.removeAll(journal.getFileMap().keySet());

        if (!missingJournalFiles.isEmpty()) {
            LOG.warn("Some journal files are missing: " + missingJournalFiles);
        }

        // knownCorruption holds only the corrupted-block ranges; missingPredicates holds every
        // location range whose index entries must be treated as unrecoverable
        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<>();
        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<>();
        for (Integer missing : missingJournalFiles) {
            // range from the start of the missing file to the start of the next file id
            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
        }

        if (checkForCorruptJournalFiles) {
            Collection<DataFile> dataFiles = journal.getFileMap().values();
            for (DataFile dataFile : dataFiles) {
                int id = dataFile.getDataFileId();
                // eof to next file id
                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
                // walk the list of corrupted blocks reported by the data file
                Sequence seq = dataFile.getCorruptedBlocks().getHead();
                while (seq != null) {
                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
                        new BTreeVisitor.BetweenVisitor<>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
                    missingPredicates.add(visitor);
                    knownCorruption.add(visitor);
                    seq = seq.getNext();
                }
            }
        }

        if (!missingPredicates.isEmpty()) {
            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
                final StoredDestination sd = sdEntry.getValue();
                // sequence id -> location of every entry matched by any of the predicates
                final LinkedHashMap<Long, Location> matches = new LinkedHashMap<>();
                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
                    @Override
                    protected void matched(Location key, Long value) {
                        matches.put(value, key);
                    }
                });

                // If some message references are affected by the missing data files...
                if (!matches.isEmpty()) {

                    // We either 'gracefully' recover dropping the missing messages or
                    // we error out.
                    if( ignoreMissingJournalfiles ) {
                        // Update the index to remove the references to the missing data
                        for (Long sequenceId : matches.keySet()) {
                            // NOTE(review): unlike the first pass above, keys is not null-checked
                            // here - confirm orderIndex always holds the matched sequence id
                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                            sd.locationIndex.remove(tx, keys.location);
                            sd.messageIdIndex.remove(tx, keys.messageId);
                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
                            undoCounter++;
                            decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
                            // TODO: do we need to modify the ack positions for the pub sub case?
                        }
                    } else {
                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches);
                        throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
                    }
                }
            }
        }

        // Even when no index entry was affected, corruption or missing files are fatal unless
        // the store is configured to ignore them.
        if (!ignoreMissingJournalfiles) {
            if (!knownCorruption.isEmpty()) {
                LOG.error("Detected corrupt journal files. " + knownCorruption);
                throw new IOException("Detected corrupt journal files. " + knownCorruption);
            }

            if (!missingJournalFiles.isEmpty()) {
                LOG.error("Detected missing journal files. " + missingJournalFiles);
                throw new IOException("Detected missing journal files. " + missingJournalFiles);
            }
        }

        if (undoCounter > 0) {
            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
            // should do sync writes to the journal.
            if (LOG.isInfoEnabled()) {
                long end = System.currentTimeMillis();
                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
            }
        }
    }
991
    // Cursor state for incrementalRecover(): the next journal location to replay
    // (null when it still has to be computed or nothing is pending) ...
    private Location nextRecoveryPosition;
    // ... and the most recent journal location that incrementalRecover() started replaying.
    private Location lastRecoveryPosition;
994
995    public void incrementalRecover() throws IOException {
996        this.indexLock.writeLock().lock();
997        try {
998            if( nextRecoveryPosition == null ) {
999                if( lastRecoveryPosition==null ) {
1000                    nextRecoveryPosition = getRecoveryPosition();
1001                } else {
1002                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
1003                }
1004            }
1005            while (nextRecoveryPosition != null) {
1006                lastRecoveryPosition = nextRecoveryPosition;
1007                metadata.lastUpdate = lastRecoveryPosition;
1008                JournalCommand<?> message = load(lastRecoveryPosition);
1009                process(message, lastRecoveryPosition, (IndexAware) null);
1010                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
1011            }
1012        } finally {
1013            this.indexLock.writeLock().unlock();
1014        }
1015    }
1016
1017    public Location getLastUpdatePosition() throws IOException {
1018        return metadata.lastUpdate;
1019    }
1020
1021    private Location getRecoveryPosition() throws IOException {
1022
1023        if (!this.forceRecoverIndex) {
1024
1025            // If we need to recover the transactions..
1026            if (metadata.firstInProgressTransactionLocation != null) {
1027                return metadata.firstInProgressTransactionLocation;
1028            }
1029
1030            // Perhaps there were no transactions...
1031            if( metadata.lastUpdate!=null) {
1032                // Start replay at the record after the last one recorded in the index file.
1033                return getNextInitializedLocation(metadata.lastUpdate);
1034            }
1035        }
1036        // This loads the first position.
1037        return journal.getNextLocation(null);
1038    }
1039
1040    private Location getNextInitializedLocation(Location location) throws IOException {
1041        Location mayNotBeInitialized = journal.getNextLocation(location);
1042        if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) {
1043            // need to init size and type to skip
1044            return journal.getNextLocation(mayNotBeInitialized);
1045        } else {
1046            return mayNotBeInitialized;
1047        }
1048    }
1049
1050    protected void checkpointCleanup(final boolean cleanup) throws IOException {
1051        long start;
1052        this.indexLock.writeLock().lock();
1053        try {
1054            start = System.currentTimeMillis();
1055            if( !opened.get() ) {
1056                return;
1057            }
1058        } finally {
1059            this.indexLock.writeLock().unlock();
1060        }
1061        checkpointUpdate(cleanup);
1062        long end = System.currentTimeMillis();
1063        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1064            if (LOG.isInfoEnabled()) {
1065                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
1066            }
1067        }
1068    }
1069
1070    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
1071        int size = data.serializedSizeFramed();
1072        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
1073        os.writeByte(data.type().getNumber());
1074        data.writeFramed(os);
1075        return os.toByteSequence();
1076    }
1077
1078    // /////////////////////////////////////////////////////////////////
    // Methods called by the broker to update and query the store.
1080    // /////////////////////////////////////////////////////////////////
1081    public Location store(JournalCommand<?> data) throws IOException {
1082        return store(data, false, null,null);
1083    }
1084
1085    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
1086        return store(data, false, null, null, onJournalStoreComplete);
1087    }
1088
1089    public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException {
1090        return store(data, sync, before, after, null);
1091    }
1092
1093    /**
1094     * All updated are are funneled through this method. The updates are converted
1095     * to a JournalMessage which is logged to the journal and then the data from
1096     * the JournalMessage is used to update the index just like it would be done
1097     * during a recovery process.
1098     */
1099    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
1100        try {
1101            ByteSequence sequence = toByteSequence(data);
1102            Location location;
1103
1104            checkpointLock.readLock().lock();
1105            try {
1106
1107                long start = System.currentTimeMillis();
1108                location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
1109                long start2 = System.currentTimeMillis();
1110                //Track the last async update so we know if we need to sync at the next checkpoint
1111                if (!sync && journal.isJournalDiskSyncPeriodic()) {
1112                    lastAsyncJournalUpdate.set(location);
1113                }
1114                process(data, location, before);
1115
1116                long end = System.currentTimeMillis();
1117                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1118                    if (LOG.isInfoEnabled()) {
1119                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
1120                    }
1121                }
1122            } finally {
1123                checkpointLock.readLock().unlock();
1124            }
1125
1126            if (after != null) {
1127                after.run();
1128            }
1129
1130            if (scheduler == null && opened.get()) {
1131                startCheckpoint();
1132            }
1133            return location;
1134        } catch (IOException ioe) {
1135            LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe);
1136            brokerService.handleIOException(ioe);
1137            throw ioe;
1138        }
1139    }
1140
1141    /**
1142     * Loads a previously stored JournalMessage
1143     *
1144     * @param location
1145     * @return
1146     * @throws IOException
1147     */
1148    public JournalCommand<?> load(Location location) throws IOException {
1149        long start = System.currentTimeMillis();
1150        ByteSequence data = journal.read(location);
1151        long end = System.currentTimeMillis();
1152        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
1153            if (LOG.isInfoEnabled()) {
1154                LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
1155            }
1156        }
1157        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
1158        byte readByte = is.readByte();
1159        KahaEntryType type = KahaEntryType.valueOf(readByte);
1160        if( type == null ) {
1161            try {
1162                is.close();
1163            } catch (IOException e) {}
1164            throw new IOException("Could not load journal record. Invalid location: "+location);
1165        }
1166        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
1167        message.mergeFramed(is);
1168        return message;
1169    }
1170
1171    /**
1172     * do minimal recovery till we reach the last inDoubtLocation
1173     * @param data
1174     * @param location
1175     * @param inDoubtlocation
1176     * @throws IOException
1177     */
1178    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
1179        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
1180            process(data, location, (IndexAware) null);
1181        } else {
1182            // just recover producer audit
1183            data.visit(new Visitor() {
1184                @Override
1185                public void visit(KahaAddMessageCommand command) throws IOException {
1186                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1187                }
1188            });
1189        }
1190    }
1191
1192    // /////////////////////////////////////////////////////////////////
1193    // Journaled record processing methods. Once the record is journaled,
1194    // these methods handle applying the index updates. These may be called
1195    // from the recovery method too so they need to be idempotent
1196    // /////////////////////////////////////////////////////////////////
1197
    /**
     * Dispatches a journal command to the type-specific {@code process} overload.
     * <p>
     * The command is visited with an anonymous {@link Visitor}; each {@code visit} overload
     * forwards to the matching handler. Producer-audit, ack-message-file-map and trace
     * commands carry no index changes of their own here and only record the location via
     * {@link #processLocation(Location)}.
     *
     * @param data the journal command to apply
     * @param location the journal location of {@code data}
     * @param onSequenceAssignedCallback forwarded to the add-message and commit handlers,
     *        may be {@code null}
     * @throws IOException if the selected handler fails
     */
    void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException {
        data.visit(new Visitor() {
            @Override
            public void visit(KahaAddMessageCommand command) throws IOException {
                process(command, location, onSequenceAssignedCallback);
            }

            @Override
            public void visit(KahaRemoveMessageCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaPrepareCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaCommitCommand command) throws IOException {
                process(command, location, onSequenceAssignedCallback);
            }

            @Override
            public void visit(KahaRollbackCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaRemoveDestinationCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaSubscriptionCommand command) throws IOException {
                process(command, location);
            }

            // The next three command types only advance metadata.lastUpdate.
            @Override
            public void visit(KahaProducerAuditCommand command) throws IOException {
                processLocation(location);
            }

            @Override
            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
                processLocation(location);
            }

            @Override
            public void visit(KahaTraceCommand command) {
                processLocation(location);
            }

            @Override
            public void visit(KahaUpdateMessageCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaRewrittenDataFileCommand command) throws IOException {
                process(command, location);
            }
        });
    }
1261
1262    @SuppressWarnings("rawtypes")
1263    protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException {
1264        if (command.hasTransactionInfo()) {
1265            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1266            inflightTx.add(new AddOperation(command, location, runWithIndexLock));
1267        } else {
1268            this.indexLock.writeLock().lock();
1269            try {
1270                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1271                    @Override
1272                    public void execute(Transaction tx) throws IOException {
1273                        long assignedIndex = updateIndex(tx, command, location);
1274                        if (runWithIndexLock != null) {
1275                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
1276                        }
1277                    }
1278                });
1279
1280            } finally {
1281                this.indexLock.writeLock().unlock();
1282            }
1283        }
1284    }
1285
1286    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
1287        this.indexLock.writeLock().lock();
1288        try {
1289            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1290                @Override
1291                public void execute(Transaction tx) throws IOException {
1292                    updateIndex(tx, command, location);
1293                }
1294            });
1295        } finally {
1296            this.indexLock.writeLock().unlock();
1297        }
1298    }
1299
1300    @SuppressWarnings("rawtypes")
1301    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1302        if (command.hasTransactionInfo()) {
1303           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1304           inflightTx.add(new RemoveOperation(command, location));
1305        } else {
1306            this.indexLock.writeLock().lock();
1307            try {
1308                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1309                    @Override
1310                    public void execute(Transaction tx) throws IOException {
1311                        updateIndex(tx, command, location);
1312                    }
1313                });
1314            } finally {
1315                this.indexLock.writeLock().unlock();
1316            }
1317        }
1318    }
1319
1320    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1321        this.indexLock.writeLock().lock();
1322        try {
1323            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1324                @Override
1325                public void execute(Transaction tx) throws IOException {
1326                    updateIndex(tx, command, location);
1327                }
1328            });
1329        } finally {
1330            this.indexLock.writeLock().unlock();
1331        }
1332    }
1333
1334    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1335        this.indexLock.writeLock().lock();
1336        try {
1337            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1338                @Override
1339                public void execute(Transaction tx) throws IOException {
1340                    updateIndex(tx, command, location);
1341                }
1342            });
1343        } finally {
1344            this.indexLock.writeLock().unlock();
1345        }
1346    }
1347
1348    protected void processLocation(final Location location) {
1349        this.indexLock.writeLock().lock();
1350        try {
1351            metadata.lastUpdate = location;
1352        } finally {
1353            this.indexLock.writeLock().unlock();
1354        }
1355    }
1356
1357    @SuppressWarnings("rawtypes")
1358    protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException {
1359        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1360        List<Operation> inflightTx;
1361        synchronized (inflightTransactions) {
1362            inflightTx = inflightTransactions.remove(key);
1363            if (inflightTx == null) {
1364                inflightTx = preparedTransactions.remove(key);
1365            }
1366        }
1367        if (inflightTx == null) {
1368            // only non persistent messages in this tx
1369            if (before != null) {
1370                before.sequenceAssignedWithIndexLocked(-1);
1371            }
1372            return;
1373        }
1374
1375        final List<Operation> messagingTx = inflightTx;
1376        indexLock.writeLock().lock();
1377        try {
1378            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1379                @Override
1380                public void execute(Transaction tx) throws IOException {
1381                    for (Operation op : messagingTx) {
1382                        op.execute(tx);
1383                    }
1384                }
1385            });
1386            metadata.lastUpdate = location;
1387        } finally {
1388            indexLock.writeLock().unlock();
1389        }
1390    }
1391
1392    @SuppressWarnings("rawtypes")
1393    protected void process(KahaPrepareCommand command, Location location) {
1394        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1395        synchronized (inflightTransactions) {
1396            List<Operation> tx = inflightTransactions.remove(key);
1397            if (tx != null) {
1398                preparedTransactions.put(key, tx);
1399            }
1400        }
1401    }
1402
1403    @SuppressWarnings("rawtypes")
1404    protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1405        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1406        List<Operation> updates = null;
1407        synchronized (inflightTransactions) {
1408            updates = inflightTransactions.remove(key);
1409            if (updates == null) {
1410                updates = preparedTransactions.remove(key);
1411            }
1412        }
1413    }
1414
1415    protected void process(KahaRewrittenDataFileCommand command, Location location)  throws IOException {
1416        final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet());
1417
1418        // Mark the current journal file as a compacted file so that gc checks can skip
1419        // over logs that are smaller compaction type logs.
1420        DataFile current = journal.getDataFileById(location.getDataFileId());
1421        current.setTypeCode(command.getRewriteType());
1422
1423        if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
1424            // Move offset so that next location read jumps to next file.
1425            location.setOffset(journalMaxFileLength);
1426        }
1427    }
1428
1429    // /////////////////////////////////////////////////////////////////
1430    // These methods do the actual index updates.
1431    // /////////////////////////////////////////////////////////////////
1432
    // Guards the index: the write lock is taken around index transactions and checkpoints.
    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
    // Journal file ids listed here are removed from the GC candidate set during checkpoint cleanup.
    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<>();
1435
1436    long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1437        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1438
1439        // Skip adding the message to the index if this is a topic and there are
1440        // no subscriptions.
1441        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1442            return -1;
1443        }
1444
1445        // Add the message.
1446        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1447        long id = sd.orderIndex.getNextMessageId();
1448        Long previous = sd.locationIndex.put(tx, location, id);
1449        if (previous == null) {
1450            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1451            if (previous == null) {
1452                incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
1453                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1454                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1455                    addAckLocationForNewMessage(tx, command.getDestination(), sd, id);
1456                }
1457                metadata.lastUpdate = location;
1458            } else {
1459
1460                MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
1461                if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
1462                    // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
1463                    LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
1464                }
1465                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1466                sd.locationIndex.remove(tx, location);
1467                id = -1;
1468            }
1469        } else {
1470            // restore the previous value.. Looks like this was a redo of a previously
1471            // added message. We don't want to assign it a new id as the other indexes would
1472            // be wrong..
1473            sd.locationIndex.put(tx, location, previous);
1474            // ensure sequence is not broken
1475            sd.orderIndex.revertNextMessageId();
1476            metadata.lastUpdate = location;
1477        }
1478        // record this id in any event, initial send or recovery
1479        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1480
1481       return id;
1482    }
1483
1484    void trackPendingAdd(KahaDestination destination, Long seq) {
1485        StoredDestination sd = storedDestinations.get(key(destination));
1486        if (sd != null) {
1487            sd.trackPendingAdd(seq);
1488        }
1489    }
1490
1491    void trackPendingAddComplete(KahaDestination destination, Long seq) {
1492        StoredDestination sd = storedDestinations.get(key(destination));
1493        if (sd != null) {
1494            sd.trackPendingAddComplete(seq);
1495        }
1496    }
1497
1498    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
1499        KahaAddMessageCommand command = updateMessageCommand.getMessage();
1500        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1501
1502        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
1503        if (id != null) {
1504            MessageKeys previousKeys = sd.orderIndex.put(
1505                    tx,
1506                    command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
1507                    id,
1508                    new MessageKeys(command.getMessageId(), location)
1509            );
1510            sd.locationIndex.put(tx, location, id);
1511            incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
1512
1513            if (previousKeys != null) {
1514                //Remove the existing from the size
1515                decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
1516
1517                //update all the subscription metrics
1518                if (enableSubscriptionStatistics && sd.ackPositions != null && location.getSize() != previousKeys.location.getSize()) {
1519                    Iterator<Entry<String, SequenceSet>> iter = sd.ackPositions.iterator(tx);
1520                    while (iter.hasNext()) {
1521                        Entry<String, SequenceSet> e = iter.next();
1522                        if (e.getValue().contains(id)) {
1523                            incrementAndAddSizeToStoreStat(key(command.getDestination()), e.getKey(), location.getSize());
1524                            decrementAndSubSizeToStoreStat(key(command.getDestination()), e.getKey(), previousKeys.location.getSize());
1525                        }
1526                    }
1527                }
1528
1529                // on first update previous is original location, on recovery/replay it may be the updated location
1530                if(!previousKeys.location.equals(location)) {
1531                    sd.locationIndex.remove(tx, previousKeys.location);
1532                }
1533            }
1534            metadata.lastUpdate = location;
1535        } else {
1536            //Add the message if it can't be found
1537            this.updateIndex(tx, command, location);
1538        }
1539    }
1540
1541    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1542        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1543        if (!command.hasSubscriptionKey()) {
1544
1545            // In the queue case we just remove the message from the index..
1546            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1547            if (sequenceId != null) {
1548                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1549                if (keys != null) {
1550                    sd.locationIndex.remove(tx, keys.location);
1551                    decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize());
1552                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1553                    metadata.lastUpdate = ackLocation;
1554                }  else if (LOG.isDebugEnabled()) {
1555                    LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
1556                }
1557            } else if (LOG.isDebugEnabled()) {
1558                LOG.debug("message not found in sequence id index: " + command.getMessageId());
1559            }
1560        } else {
1561            // In the topic case we need remove the message once it's been acked
1562            // by all the subs
1563            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1564
1565            // Make sure it's a valid message id...
1566            if (sequence != null) {
1567                String subscriptionKey = command.getSubscriptionKey();
1568                if (command.getAck() != UNMATCHED) {
1569                    sd.orderIndex.get(tx, sequence);
1570                    byte priority = sd.orderIndex.lastGetPriority();
1571                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1572                }
1573
1574                MessageKeys keys = sd.orderIndex.get(tx, sequence);
1575                if (keys != null) {
1576                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1577                }
1578                // The following method handles deleting un-referenced messages.
1579                removeAckLocation(command, tx, sd, subscriptionKey, sequence);
1580                metadata.lastUpdate = ackLocation;
1581            } else if (LOG.isDebugEnabled()) {
1582                LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1583            }
1584
1585        }
1586    }
1587
1588    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1589        Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1590        if (referenceFileIds == null) {
1591            referenceFileIds = new HashSet<>();
1592            referenceFileIds.add(messageLocation.getDataFileId());
1593            metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1594        } else {
1595            Integer id = Integer.valueOf(messageLocation.getDataFileId());
1596            if (!referenceFileIds.contains(id)) {
1597                referenceFileIds.add(id);
1598            }
1599        }
1600    }
1601
1602    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1603        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1604        sd.orderIndex.remove(tx);
1605
1606        sd.locationIndex.clear(tx);
1607        sd.locationIndex.unload(tx);
1608        tx.free(sd.locationIndex.getPageId());
1609
1610        sd.messageIdIndex.clear(tx);
1611        sd.messageIdIndex.unload(tx);
1612        tx.free(sd.messageIdIndex.getPageId());
1613
1614        if (sd.subscriptions != null) {
1615            sd.subscriptions.clear(tx);
1616            sd.subscriptions.unload(tx);
1617            tx.free(sd.subscriptions.getPageId());
1618
1619            sd.subscriptionAcks.clear(tx);
1620            sd.subscriptionAcks.unload(tx);
1621            tx.free(sd.subscriptionAcks.getPageId());
1622
1623            sd.ackPositions.clear(tx);
1624            sd.ackPositions.unload(tx);
1625            tx.free(sd.ackPositions.getHeadPageId());
1626
1627            sd.subLocations.clear(tx);
1628            sd.subLocations.unload(tx);
1629            tx.free(sd.subLocations.getHeadPageId());
1630        }
1631
1632        String key = key(command.getDestination());
1633        storedDestinations.remove(key);
1634        metadata.destinations.remove(tx, key);
1635        clearStoreStats(command.getDestination());
1636        storeCache.remove(key(command.getDestination()));
1637    }
1638
1639    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1640        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1641        final String subscriptionKey = command.getSubscriptionKey();
1642
1643        // If set then we are creating it.. otherwise we are destroying the sub
1644        if (command.hasSubscriptionInfo()) {
1645            Location existing = sd.subLocations.get(tx, subscriptionKey);
1646            if (existing != null && existing.compareTo(location) == 0) {
1647                // replay on recovery, ignore
1648                LOG.trace("ignoring journal replay of replay of sub from: " + location);
1649                return;
1650            }
1651
1652            sd.subscriptions.put(tx, subscriptionKey, command);
1653            sd.subLocations.put(tx, subscriptionKey, location);
1654            long ackLocation=NOT_ACKED;
1655            if (!command.getRetroactive()) {
1656                ackLocation = sd.orderIndex.nextMessageId-1;
1657            } else {
1658                addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1659            }
1660            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1661            sd.subscriptionCache.add(subscriptionKey);
1662        } else {
1663            // delete the sub...
1664            sd.subscriptions.remove(tx, subscriptionKey);
1665            sd.subLocations.remove(tx, subscriptionKey);
1666            sd.subscriptionAcks.remove(tx, subscriptionKey);
1667            sd.subscriptionCache.remove(subscriptionKey);
1668            removeAckLocationsForSub(command, tx, sd, subscriptionKey);
1669            MessageStoreSubscriptionStatistics subStats = getSubStats(key(command.getDestination()));
1670            if (subStats != null) {
1671                subStats.removeSubscription(subscriptionKey);
1672            }
1673
1674            if (sd.subscriptions.isEmpty(tx)) {
1675                // remove the stored destination
1676                KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
1677                removeDestinationCommand.setDestination(command.getDestination());
1678                updateIndex(tx, removeDestinationCommand, null);
1679                clearStoreStats(command.getDestination());
1680            }
1681        }
1682    }
1683
1684    private void checkpointUpdate(final boolean cleanup) throws IOException {
1685        checkpointLock.writeLock().lock();
1686        try {
1687            this.indexLock.writeLock().lock();
1688            try {
1689                Set<Integer> filesToGc = pageFile.tx().execute(new Transaction.CallableClosure<Set<Integer>, IOException>() {
1690                    @Override
1691                    public Set<Integer> execute(Transaction tx) throws IOException {
1692                        return checkpointUpdate(tx, cleanup);
1693                    }
1694                });
1695                pageFile.flush();
1696                // after the index update such that partial removal does not leave dangling references in the index.
1697                journal.removeDataFiles(filesToGc);
1698            } finally {
1699                this.indexLock.writeLock().unlock();
1700            }
1701
1702        } finally {
1703            checkpointLock.writeLock().unlock();
1704        }
1705    }
1706
1707    /**
1708     * @param tx
1709     * @throws IOException
1710     */
1711    Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1712        MDC.put("activemq.persistenceDir", getDirectory().getName());
1713        LOG.debug("Checkpoint started.");
1714
1715        // reflect last update exclusive of current checkpoint
1716        Location lastUpdate = metadata.lastUpdate;
1717
1718        metadata.state = OPEN_STATE;
1719        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1720        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
1721        Location[] inProgressTxRange = getInProgressTxLocationRange();
1722        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
1723        tx.store(metadata.page, metadataMarshaller, true);
1724
1725        final TreeSet<Integer> gcCandidateSet = new TreeSet<>();
1726        if (cleanup) {
1727
1728            final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet());
1729            gcCandidateSet.addAll(completeFileSet);
1730
1731            if (LOG.isTraceEnabled()) {
1732                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
1733            }
1734
1735            if (lastUpdate != null) {
1736                // we won't delete past the last update, ackCompaction journal can be a candidate in error
1737                gcCandidateSet.removeAll(new TreeSet<Integer>(gcCandidateSet.tailSet(lastUpdate.getDataFileId())));
1738            }
1739
1740            // Don't GC files under replication
1741            if( journalFilesBeingReplicated!=null ) {
1742                gcCandidateSet.removeAll(journalFilesBeingReplicated);
1743            }
1744
1745            if (metadata.producerSequenceIdTrackerLocation != null) {
1746                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
1747                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
1748                    // rewrite so we don't prevent gc
1749                    metadata.producerSequenceIdTracker.setModified(true);
1750                    if (LOG.isTraceEnabled()) {
1751                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
1752                    }
1753                }
1754                gcCandidateSet.remove(dataFileId);
1755                if (LOG.isTraceEnabled()) {
1756                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet);
1757                }
1758            }
1759
1760            if (metadata.ackMessageFileMapLocation != null) {
1761                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
1762                gcCandidateSet.remove(dataFileId);
1763                if (LOG.isTraceEnabled()) {
1764                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet);
1765                }
1766            }
1767
1768            // Don't GC files referenced by in-progress tx
1769            if (inProgressTxRange[0] != null) {
1770                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
1771                    gcCandidateSet.remove(pendingTx);
1772                }
1773            }
1774            if (LOG.isTraceEnabled()) {
1775                LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
1776            }
1777
1778            // Go through all the destinations to see if any of them can remove GC candidates.
1779            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1780                if( gcCandidateSet.isEmpty() ) {
1781                    break;
1782                }
1783
1784                // Use a visitor to cut down the number of pages that we load
1785                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1786                    int last=-1;
1787                    @Override
1788                    public boolean isInterestedInKeysBetween(Location first, Location second) {
1789                        if( first==null ) {
1790                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1791                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1792                                subset.remove(second.getDataFileId());
1793                            }
1794                            return !subset.isEmpty();
1795                        } else if( second==null ) {
1796                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1797                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1798                                subset.remove(first.getDataFileId());
1799                            }
1800                            return !subset.isEmpty();
1801                        } else {
1802                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1803                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1804                                subset.remove(first.getDataFileId());
1805                            }
1806                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1807                                subset.remove(second.getDataFileId());
1808                            }
1809                            return !subset.isEmpty();
1810                        }
1811                    }
1812
1813                    @Override
1814                    public void visit(List<Location> keys, List<Long> values) {
1815                        for (Location l : keys) {
1816                            int fileId = l.getDataFileId();
1817                            if( last != fileId ) {
1818                                gcCandidateSet.remove(fileId);
1819                                last = fileId;
1820                            }
1821                        }
1822                    }
1823                });
1824
1825                // Durable Subscription
1826                if (entry.getValue().subLocations != null) {
1827                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
1828                    while (iter.hasNext()) {
1829                        Entry<String, Location> subscription = iter.next();
1830                        int dataFileId = subscription.getValue().getDataFileId();
1831
1832                        // Move subscription along if it has no outstanding messages that need ack'd
1833                        // and its in the last log file in the journal.
1834                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
1835                            final StoredDestination destination = entry.getValue();
1836                            final String subscriptionKey = subscription.getKey();
1837                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
1838
1839                            // When pending is size one that is the next message Id meaning there
1840                            // are no pending messages currently.
1841                            if (pendingAcks == null || pendingAcks.isEmpty() ||
1842                                (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {
1843
1844                                if (LOG.isTraceEnabled()) {
1845                                    LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
1846                                }
1847
1848                                final KahaSubscriptionCommand kahaSub =
1849                                    destination.subscriptions.get(tx, subscriptionKey);
1850                                destination.subLocations.put(
1851                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
1852
1853                                // Skips the remove from candidates if we rewrote the subscription
1854                                // in order to prevent duplicate subscription commands on recover.
1855                                // If another subscription is on the same file and isn't rewritten
1856                                // than it will remove the file from the set.
1857                                continue;
1858                            }
1859                        }
1860
1861                        gcCandidateSet.remove(dataFileId);
1862                    }
1863                }
1864
1865                if (LOG.isTraceEnabled()) {
1866                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1867                }
1868            }
1869
1870            // check we are not deleting file with ack for in-use journal files
1871            if (LOG.isTraceEnabled()) {
1872                LOG.trace("gc candidates: " + gcCandidateSet);
1873                LOG.trace("ackMessageFileMap: " +  metadata.ackMessageFileMap);
1874            }
1875
1876            boolean ackMessageFileMapMod = false;
1877            Iterator<Integer> candidates = gcCandidateSet.iterator();
1878            while (candidates.hasNext()) {
1879                Integer candidate = candidates.next();
1880                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
1881                if (referencedFileIds != null) {
1882                    for (Integer referencedFileId : referencedFileIds) {
1883                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
1884                            // active file that is not targeted for deletion is referenced so don't delete
1885                            candidates.remove();
1886                            break;
1887                        }
1888                    }
1889                    if (gcCandidateSet.contains(candidate)) {
1890                        ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
1891                    } else {
1892                        if (LOG.isTraceEnabled()) {
1893                            LOG.trace("not removing data file: " + candidate
1894                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1895                        }
1896                    }
1897                }
1898            }
1899
1900            if (!gcCandidateSet.isEmpty()) {
1901                LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
1902                for (Integer candidate : gcCandidateSet) {
1903                    for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
1904                        ackMessageFileMapMod |= ackFiles.remove(candidate);
1905                    }
1906                }
1907                if (ackMessageFileMapMod) {
1908                    checkpointUpdate(tx, false);
1909                }
1910            } else if (isEnableAckCompaction()) {
1911                if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
1912                    // First check length of journal to make sure it makes sense to even try.
1913                    //
1914                    // If there is only one journal file with Acks in it we don't need to move
1915                    // it since it won't be chained to any later logs.
1916                    //
1917                    // If the logs haven't grown since the last time then we need to compact
1918                    // otherwise there seems to still be room for growth and we don't need to incur
1919                    // the overhead.  Depending on configuration this check can be avoided and
1920                    // Ack compaction will run any time the store has not GC'd a journal file in
1921                    // the configured amount of cycles.
1922                    if (metadata.ackMessageFileMap.size() > 1 &&
1923                        (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {
1924
1925                        LOG.trace("No files GC'd checking if threshold to ACK compaction has been met.");
1926                        try {
1927                            scheduler.execute(new AckCompactionRunner());
1928                        } catch (Exception ex) {
1929                            LOG.warn("Error on queueing the Ack Compactor", ex);
1930                        }
1931                    } else {
1932                        LOG.trace("Journal activity detected, no Ack compaction scheduled.");
1933                    }
1934
1935                    checkPointCyclesWithNoGC = 0;
1936                } else {
1937                    LOG.trace("Not yet time to check for compaction: {} of {} cycles",
1938                              checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
1939                }
1940
1941                journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
1942            }
1943        }
1944        MDC.remove("activemq.persistenceDir");
1945
1946        LOG.debug("Checkpoint done.");
1947        return gcCandidateSet;
1948    }
1949
1950    private final class AckCompactionRunner implements Runnable {
1951
1952        @Override
1953        public void run() {
1954
1955            int journalToAdvance = -1;
1956            Set<Integer> journalLogsReferenced = new HashSet<>();
1957
1958            //flag to know whether the ack forwarding completed without an exception
1959            boolean forwarded = false;
1960
1961            try {
1962                //acquire the checkpoint lock to prevent other threads from
1963                //running a checkpoint while this is running
1964                //
1965                //Normally this task runs on the same executor as the checkpoint task
1966                //so this ack compaction runner wouldn't run at the same time as the checkpoint task.
1967                //
1968                //However, there are two cases where this isn't always true.
1969                //First, the checkpoint() method is public and can be called through the
1970                //PersistenceAdapter interface by someone at the same time this is running.
1971                //Second, a checkpoint is called during shutdown without using the executor.
1972                //
1973                //In the future it might be better to just remove the checkpointLock entirely
1974                //and only use the executor but this would need to be examined for any unintended
1975                //consequences
1976                checkpointLock.readLock().lock();
1977
1978                try {
1979
1980                    // Lock index to capture the ackMessageFileMap data
1981                    indexLock.writeLock().lock();
1982
1983                    // Map keys might not be sorted, find the earliest log file to forward acks
1984                    // from and move only those, future cycles can chip away at more as needed.
1985                    // We won't move files that are themselves rewritten on a previous compaction.
1986                    List<Integer> journalFileIds = new ArrayList<>(metadata.ackMessageFileMap.keySet());
1987                    Collections.sort(journalFileIds);
1988                    for (Integer journalFileId : journalFileIds) {
1989                        DataFile current = journal.getDataFileById(journalFileId);
1990                        if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
1991                            journalToAdvance = journalFileId;
1992                            break;
1993                        }
1994                    }
1995
1996                    // Check if we found one, or if we only found the current file being written to.
1997                    if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
1998                        return;
1999                    }
2000
2001                    journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
2002
2003                } finally {
2004                    indexLock.writeLock().unlock();
2005                }
2006
2007                try {
2008                    // Background rewrite of the old acks
2009                    forwardAllAcks(journalToAdvance, journalLogsReferenced);
2010                    forwarded = true;
2011                } catch (IOException ioe) {
2012                    LOG.error("Forwarding of acks failed", ioe);
2013                    brokerService.handleIOException(ioe);
2014                } catch (Throwable e) {
2015                    LOG.error("Forwarding of acks failed", e);
2016                    brokerService.handleIOException(IOExceptionSupport.create(e));
2017                }
2018            } finally {
2019                checkpointLock.readLock().unlock();
2020            }
2021
2022            try {
2023                if (forwarded) {
2024                    // Checkpoint with changes from the ackMessageFileMap
2025                    checkpointUpdate(false);
2026                }
2027            } catch (IOException ioe) {
2028                LOG.error("Checkpoint failed", ioe);
2029                brokerService.handleIOException(ioe);
2030            } catch (Throwable e) {
2031                LOG.error("Checkpoint failed", e);
2032                brokerService.handleIOException(IOExceptionSupport.create(e));
2033            }
2034        }
2035    }
2036
2037    private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
2038        LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
2039
2040        DataFile forwardsFile = journal.reserveDataFile();
2041        forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);
2042        LOG.trace("Reserved file for forwarded acks: {}", forwardsFile);
2043
2044        Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<>();
2045
2046        try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
2047            KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
2048            compactionMarker.setSourceDataFileId(journalToRead);
2049            compactionMarker.setRewriteType(forwardsFile.getTypeCode());
2050
2051            ByteSequence payload = toByteSequence(compactionMarker);
2052            appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2053            LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
2054
2055            final Location limit = new Location(journalToRead + 1, 0);
2056            Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0), limit);
2057            while (nextLocation != null) {
2058                JournalCommand<?> command = null;
2059                try {
2060                    command = load(nextLocation);
2061                } catch (IOException ex) {
2062                    LOG.trace("Error loading command during ack forward: {}", nextLocation);
2063                }
2064
2065                if (command != null && command instanceof KahaRemoveMessageCommand) {
2066                    payload = toByteSequence(command);
2067                    Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2068                    updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
2069                }
2070
2071                nextLocation = getNextLocationForAckForward(nextLocation, limit);
2072            }
2073        }
2074
2075        LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations);
2076
2077        // Lock index while we update the ackMessageFileMap.
2078        indexLock.writeLock().lock();
2079
2080        // Update the ack map with the new locations of the acks
2081        for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
2082            Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
2083            if (referenceFileIds == null) {
2084                referenceFileIds = new HashSet<>();
2085                referenceFileIds.addAll(entry.getValue());
2086                metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
2087            } else {
2088                referenceFileIds.addAll(entry.getValue());
2089            }
2090        }
2091
2092        // remove the old location data from the ack map so that the old journal log file can
2093        // be removed on next GC.
2094        metadata.ackMessageFileMap.remove(journalToRead);
2095
2096        indexLock.writeLock().unlock();
2097
2098        LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
2099    }
2100
2101    private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) {
2102        //getNextLocation() can throw an IOException, we should handle it and set
2103        //nextLocation to null and abort gracefully
2104        //Should not happen in the normal case
2105        Location location = null;
2106        try {
2107            location = journal.getNextLocation(nextLocation, limit);
2108        } catch (IOException e) {
2109            LOG.warn("Failed to load next journal location after: {}, reason: {}", nextLocation, e);
2110            if (LOG.isDebugEnabled()) {
2111                LOG.debug("Failed to load next journal location after: {}", nextLocation, e);
2112            }
2113        }
2114        return location;
2115    }
2116
    /**
     * Completion callback that does nothing. The checkpoint helpers in this file pass it
     * to their journal writes and then wait on the returned location's latch themselves.
     */
    final Runnable nullCompletionCallback = new Runnable() {
        @Override
        public void run() {
            // intentionally empty
        }
    };
2122
2123    private Location checkpointProducerAudit() throws IOException {
2124        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
2125            ByteArrayOutputStream baos = new ByteArrayOutputStream();
2126            ObjectOutputStream oout = new ObjectOutputStream(baos);
2127            oout.writeObject(metadata.producerSequenceIdTracker);
2128            oout.flush();
2129            oout.close();
2130            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2131            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
2132            try {
2133                location.getLatch().await();
2134            } catch (InterruptedException e) {
2135                throw new InterruptedIOException(e.toString());
2136            }
2137            return location;
2138        }
2139        return metadata.producerSequenceIdTrackerLocation;
2140    }
2141
2142    private Location checkpointAckMessageFileMap() throws IOException {
2143        ByteArrayOutputStream baos = new ByteArrayOutputStream();
2144        ObjectOutputStream oout = new ObjectOutputStream(baos);
2145        oout.writeObject(metadata.ackMessageFileMap);
2146        oout.flush();
2147        oout.close();
2148        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2149        Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
2150        try {
2151            location.getLatch().await();
2152        } catch (InterruptedException e) {
2153            throw new InterruptedIOException(e.toString());
2154        }
2155        return location;
2156    }
2157
2158    private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
2159
2160        ByteSequence sequence = toByteSequence(subscription);
2161        Location location = journal.write(sequence, nullCompletionCallback) ;
2162
2163        try {
2164            location.getLatch().await();
2165        } catch (InterruptedException e) {
2166            throw new InterruptedIOException(e.toString());
2167        }
2168        return location;
2169    }
2170
    /**
     * Returns the set of journal file ids currently marked as being replicated.
     * The field itself is returned, not a copy, so changes made by the caller are
     * visible to this store.
     *
     * @return the live set held by this instance
     */
    public HashSet<Integer> getJournalFilesBeingReplicated() {
        return journalFilesBeingReplicated;
    }
2174
2175    // /////////////////////////////////////////////////////////////////
2176    // StoredDestination related implementation methods.
2177    // /////////////////////////////////////////////////////////////////
2178
    /**
     * Cache of destinations that have already been loaded, keyed by the destination
     * key string (see getStoredDestination).
     */
    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<>();
2180
2181    static class MessageKeys {
2182        final String messageId;
2183        final Location location;
2184
2185        public MessageKeys(String messageId, Location location) {
2186            this.messageId=messageId;
2187            this.location=location;
2188        }
2189
2190        @Override
2191        public String toString() {
2192            return "["+messageId+","+location+"]";
2193        }
2194    }
2195
2196    protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
2197        final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller();
2198
2199        @Override
2200        public MessageKeys readPayload(DataInput dataIn) throws IOException {
2201            return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn));
2202        }
2203
2204        @Override
2205        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
2206            dataOut.writeUTF(object.messageId);
2207            locationSizeMarshaller.writePayload(object.location, dataOut);
2208        }
2209    }
2210
2211    class LastAck {
2212        long lastAckedSequence;
2213        byte priority;
2214
2215        public LastAck(LastAck source) {
2216            this.lastAckedSequence = source.lastAckedSequence;
2217            this.priority = source.priority;
2218        }
2219
2220        public LastAck() {
2221            this.priority = MessageOrderIndex.HI;
2222        }
2223
2224        public LastAck(long ackLocation) {
2225            this.lastAckedSequence = ackLocation;
2226            this.priority = MessageOrderIndex.LO;
2227        }
2228
2229        public LastAck(long ackLocation, byte priority) {
2230            this.lastAckedSequence = ackLocation;
2231            this.priority = priority;
2232        }
2233
2234        @Override
2235        public String toString() {
2236            return "[" + lastAckedSequence + ":" + priority + "]";
2237        }
2238    }
2239
2240    protected class LastAckMarshaller implements Marshaller<LastAck> {
2241
2242        @Override
2243        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
2244            dataOut.writeLong(object.lastAckedSequence);
2245            dataOut.writeByte(object.priority);
2246        }
2247
2248        @Override
2249        public LastAck readPayload(DataInput dataIn) throws IOException {
2250            LastAck lastAcked = new LastAck();
2251            lastAcked.lastAckedSequence = dataIn.readLong();
2252            if (metadata.version >= 3) {
2253                lastAcked.priority = dataIn.readByte();
2254            }
2255            return lastAcked;
2256        }
2257
2258        @Override
2259        public int getFixedSize() {
2260            return 9;
2261        }
2262
2263        @Override
2264        public LastAck deepCopy(LastAck source) {
2265            return new LastAck(source);
2266        }
2267
2268        @Override
2269        public boolean isDeepCopySupported() {
2270            return true;
2271        }
2272    }
2273
2274    class StoredDestination {
2275
2276        MessageOrderIndex orderIndex = new MessageOrderIndex();
2277        BTreeIndex<Location, Long> locationIndex;
2278        BTreeIndex<String, Long> messageIdIndex;
2279
2280        // These bits are only set for Topics
2281        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
2282        BTreeIndex<String, LastAck> subscriptionAcks;
2283        HashMap<String, MessageOrderCursor> subscriptionCursors;
2284        ListIndex<String, SequenceSet> ackPositions;
2285        ListIndex<String, Location> subLocations;
2286
2287        // Transient data used to track which Messages are no longer needed.
2288        final TreeMap<Long, Long> messageReferences = new TreeMap<>();
2289        final HashSet<String> subscriptionCache = new LinkedHashSet<>();
2290
2291        public void trackPendingAdd(Long seq) {
2292            orderIndex.trackPendingAdd(seq);
2293        }
2294
2295        public void trackPendingAddComplete(Long seq) {
2296            orderIndex.trackPendingAddComplete(seq);
2297        }
2298
2299        @Override
2300        public String toString() {
2301            return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
2302        }
2303    }
2304
    /**
     * Marshaller for {@link StoredDestination}. Only the page ids of the destination's
     * indexes are written; reading recreates the index objects on top of {@code pageFile}.
     * While reading, layouts written by older metadata versions are upgraded by allocating
     * the indexes that did not exist yet inside a page file transaction.
     */
    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {

        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();

        /**
         * Reads the index page ids in the order writePayload emits them. The reads that
         * depend on {@code metadata.version} must stay in this exact order because they
         * all consume from the same {@code dataIn}.
         */
        @Override
        public StoredDestination readPayload(final DataInput dataIn) throws IOException {
            final StoredDestination value = new StoredDestination();
            value.orderIndex.defaultPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
            value.locationIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
            value.messageIdIndex = new BTreeIndex<>(pageFile, dataIn.readLong());

            // A boolean flag says whether the subscription indexes follow.
            if (dataIn.readBoolean()) {
                value.subscriptions = new BTreeIndex<>(pageFile, dataIn.readLong());
                value.subscriptionAcks = new BTreeIndex<>(pageFile, dataIn.readLong());
                if (metadata.version >= 4) {
                    value.ackPositions = new ListIndex<>(pageFile, dataIn.readLong());
                } else {
                    // upgrade: before version 4 there is no ack positions list, so build one
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<>();

                            if (metadata.version >= 3) {
                                // migrate: version 3 stored sequence -> subscription keys in a BTree,
                                // whose page id is the next long in the stream
                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
                                        new BTreeIndex<>(pageFile, dataIn.readLong());
                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
                                oldAckPositions.load(tx);


                                // Do the initial build of the data in memory before writing into the store
                                // based Ack Positions List to avoid a lot of disk thrashing.
                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
                                while (iterator.hasNext()) {
                                    Entry<Long, HashSet<String>> entry = iterator.next();

                                    // Invert the mapping: collect the sequences per subscription key.
                                    for(String subKey : entry.getValue()) {
                                        SequenceSet pendingAcks = temp.get(subKey);
                                        if (pendingAcks == null) {
                                            pendingAcks = new SequenceSet();
                                            temp.put(subKey, pendingAcks);
                                        }

                                        pendingAcks.add(entry.getKey());
                                    }
                                }
                            }
                            // Now move the pending messages to ack data into the store backed
                            // structure.
                            value.ackPositions = new ListIndex<>(pageFile, tx.allocate());
                            value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
                            value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
                            value.ackPositions.load(tx);
                            for(String subscriptionKey : temp.keySet()) {
                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
                            }

                        }
                    });
                }

                if (metadata.version >= 5) {
                    value.subLocations = new ListIndex<>(pageFile, dataIn.readLong());
                } else {
                    // upgrade: before version 5 there is no sub locations list, allocate an empty one
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            value.subLocations = new ListIndex<>(pageFile, tx.allocate());
                            value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
                            value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
                            value.subLocations.load(tx);
                        }
                    });
                }
            }
            if (metadata.version >= 2) {
                value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
                value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
            } else {
                // upgrade: before version 2 only the default priority index exists,
                // allocate empty low and high priority indexes
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                        value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
                        value.orderIndex.lowPriorityIndex.load(tx);

                        value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                        value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
                        value.orderIndex.highPriorityIndex.load(tx);
                    }
                });
            }

            return value;
        }

        /**
         * Writes the page ids of all indexes. The subscription indexes are preceded by a
         * boolean flag that is true only when {@code value.subscriptions} is set.
         */
        @Override
        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
            dataOut.writeLong(value.locationIndex.getPageId());
            dataOut.writeLong(value.messageIdIndex.getPageId());
            if (value.subscriptions != null) {
                dataOut.writeBoolean(true);
                dataOut.writeLong(value.subscriptions.getPageId());
                dataOut.writeLong(value.subscriptionAcks.getPageId());
                dataOut.writeLong(value.ackPositions.getHeadPageId());
                dataOut.writeLong(value.subLocations.getHeadPageId());
            } else {
                dataOut.writeBoolean(false);
            }
            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
        }
    }
2425
2426    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
2427        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
2428
2429        @Override
2430        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
2431            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
2432            rc.mergeFramed((InputStream)dataIn);
2433            return rc;
2434        }
2435
2436        @Override
2437        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
2438            object.writeFramed((OutputStream)dataOut);
2439        }
2440    }
2441
2442    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2443        String key = key(destination);
2444        StoredDestination rc = storedDestinations.get(key);
2445        if (rc == null) {
2446            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
2447            rc = loadStoredDestination(tx, key, topic);
2448            // Cache it. We may want to remove/unload destinations from the
2449            // cache that are not used for a while
2450            // to reduce memory usage.
2451            storedDestinations.put(key, rc);
2452        }
2453        return rc;
2454    }
2455
2456    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2457        String key = key(destination);
2458        StoredDestination rc = storedDestinations.get(key);
2459        if (rc == null && metadata.destinations.containsKey(tx, key)) {
2460            rc = getStoredDestination(destination, tx);
2461        }
2462        return rc;
2463    }
2464
2465    /**
2466     * @param tx
2467     * @param key
2468     * @param topic
2469     * @return
2470     * @throws IOException
2471     */
2472    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
2473        // Try to load the existing indexes..
2474        StoredDestination rc = metadata.destinations.get(tx, key);
2475        if (rc == null) {
2476            // Brand new destination.. allocate indexes for it.
2477            rc = new StoredDestination();
2478            rc.orderIndex.allocate(tx);
2479            rc.locationIndex = new BTreeIndex<>(pageFile, tx.allocate());
2480            rc.messageIdIndex = new BTreeIndex<>(pageFile, tx.allocate());
2481
2482            if (topic) {
2483                rc.subscriptions = new BTreeIndex<>(pageFile, tx.allocate());
2484                rc.subscriptionAcks = new BTreeIndex<>(pageFile, tx.allocate());
2485                rc.ackPositions = new ListIndex<>(pageFile, tx.allocate());
2486                rc.subLocations = new ListIndex<>(pageFile, tx.allocate());
2487            }
2488            metadata.destinations.put(tx, key, rc);
2489        }
2490
2491        // Configure the marshalers and load.
2492        rc.orderIndex.load(tx);
2493
2494        // Figure out the next key using the last entry in the destination.
2495        rc.orderIndex.configureLast(tx);
2496
2497        rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller());
2498        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2499        rc.locationIndex.load(tx);
2500
2501        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
2502        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2503        rc.messageIdIndex.load(tx);
2504
2505        //go through an upgrade old index if older than version 6
2506        if (metadata.version < 6) {
2507            for (Iterator<Entry<Location, Long>> iterator = rc.locationIndex.iterator(tx); iterator.hasNext(); ) {
2508                Entry<Location, Long> entry = iterator.next();
2509                // modify so it is upgraded
2510                rc.locationIndex.put(tx, entry.getKey(), entry.getValue());
2511            }
2512            //upgrade the order index
2513            for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) {
2514                Entry<Long, MessageKeys> entry = iterator.next();
2515                //call get so that the last priority is updated
2516                rc.orderIndex.get(tx, entry.getKey());
2517                rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue());
2518            }
2519        }
2520
2521        // If it was a topic...
2522        if (topic) {
2523
2524            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
2525            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
2526            rc.subscriptions.load(tx);
2527
2528            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
2529            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
2530            rc.subscriptionAcks.load(tx);
2531
2532            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2533            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2534            rc.ackPositions.load(tx);
2535
2536            rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2537            rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2538            rc.subLocations.load(tx);
2539
2540            rc.subscriptionCursors = new HashMap<>();
2541
2542            if (metadata.version < 3) {
2543
2544                // on upgrade need to fill ackLocation with available messages past last ack
2545                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2546                    Entry<String, LastAck> entry = iterator.next();
2547                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
2548                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
2549                        Long sequence = orderIterator.next().getKey();
2550                        addAckLocation(tx, rc, sequence, entry.getKey());
2551                    }
2552                    // modify so it is upgraded
2553                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
2554                }
2555            }
2556
2557            // Configure the message references index
2558            Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
2559            while (subscriptions.hasNext()) {
2560                Entry<String, SequenceSet> subscription = subscriptions.next();
2561                SequenceSet pendingAcks = subscription.getValue();
2562                if (pendingAcks != null && !pendingAcks.isEmpty()) {
2563                    Long lastPendingAck = pendingAcks.getTail().getLast();
2564                    for (Long sequenceId : pendingAcks) {
2565                        Long current = rc.messageReferences.get(sequenceId);
2566                        if (current == null) {
2567                            current = new Long(0);
2568                        }
2569
2570                        // We always add a trailing empty entry for the next position to start from
2571                        // so we need to ensure we don't count that as a message reference on reload.
2572                        if (!sequenceId.equals(lastPendingAck)) {
2573                            current = current.longValue() + 1;
2574                        } else {
2575                            current = Long.valueOf(0L);
2576                        }
2577
2578                        rc.messageReferences.put(sequenceId, current);
2579                    }
2580                }
2581            }
2582
2583            // Configure the subscription cache
2584            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2585                Entry<String, LastAck> entry = iterator.next();
2586                rc.subscriptionCache.add(entry.getKey());
2587            }
2588
2589            if (rc.orderIndex.nextMessageId == 0) {
2590                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
2591                if (!rc.subscriptionAcks.isEmpty(tx)) {
2592                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
2593                        Entry<String, LastAck> entry = iterator.next();
2594                        rc.orderIndex.nextMessageId =
2595                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
2596                    }
2597                }
2598            } else {
2599                // update based on ackPositions for unmatched, last entry is always the next
2600                if (!rc.messageReferences.isEmpty()) {
2601                    Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
2602                    rc.orderIndex.nextMessageId =
2603                            Math.max(rc.orderIndex.nextMessageId, nextMessageId);
2604                }
2605            }
2606        }
2607
2608        if (metadata.version < VERSION) {
2609            // store again after upgrade
2610            metadata.destinations.put(tx, key, rc);
2611        }
2612        return rc;
2613    }
2614
2615    /**
2616     * Clear the counter for the destination, if one exists.
2617     *
2618     * @param kahaDestination
2619     */
2620    protected void clearStoreStats(KahaDestination kahaDestination) {
2621        String key = key(kahaDestination);
2622        MessageStoreStatistics storeStats = getStoreStats(key);
2623        MessageStoreSubscriptionStatistics subStats = getSubStats(key);
2624        if (storeStats != null) {
2625            storeStats.reset();
2626        }
2627        if (subStats != null) {
2628            subStats.reset();
2629        }
2630    }
2631
2632    /**
2633     * Update MessageStoreStatistics
2634     *
2635     * @param kahaDestination
2636     * @param size
2637     */
2638    protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
2639        incrementAndAddSizeToStoreStat(key(kahaDestination), size);
2640    }
2641
2642    protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
2643        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
2644        if (storeStats != null) {
2645            storeStats.getMessageCount().increment();
2646            if (size > 0) {
2647                storeStats.getMessageSize().addSize(size);
2648            }
2649        }
2650    }
2651
2652    protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
2653        decrementAndSubSizeToStoreStat(key(kahaDestination), size);
2654    }
2655
2656    protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
2657        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
2658        if (storeStats != null) {
2659            storeStats.getMessageCount().decrement();
2660            if (size > 0) {
2661                storeStats.getMessageSize().addSize(-size);
2662            }
2663        }
2664    }
2665
2666    protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) {
2667        incrementAndAddSizeToStoreStat(key(kahaDestination), subKey, size);
2668    }
2669
2670    protected void incrementAndAddSizeToStoreStat(String kahaDestKey, String subKey, long size) {
2671        if (enableSubscriptionStatistics) {
2672            MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey);
2673            if (subStats != null && subKey != null) {
2674                subStats.getMessageCount(subKey).increment();
2675                if (size > 0) {
2676                    subStats.getMessageSize(subKey).addSize(size);
2677                }
2678            }
2679        }
2680    }
2681
2682
2683    protected void decrementAndSubSizeToStoreStat(String kahaDestKey, String subKey, long size) {
2684        if (enableSubscriptionStatistics) {
2685            MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey);
2686            if (subStats != null && subKey != null) {
2687                subStats.getMessageCount(subKey).decrement();
2688                if (size > 0) {
2689                    subStats.getMessageSize(subKey).addSize(-size);
2690                }
2691            }
2692        }
2693    }
2694
2695    protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) {
2696        decrementAndSubSizeToStoreStat(key(kahaDestination), subKey, size);
2697    }
2698
2699    /**
2700     * This is a map to cache MessageStores for a specific
2701     * KahaDestination key
2702     */
2703    protected final ConcurrentMap<String, MessageStore> storeCache =
2704            new ConcurrentHashMap<>();
2705
2706    /**
2707     * Locate the storeMessageSize counter for this KahaDestination
2708     */
2709    protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
2710        MessageStoreStatistics storeStats = null;
2711        try {
2712            MessageStore messageStore = storeCache.get(kahaDestKey);
2713            if (messageStore != null) {
2714                storeStats = messageStore.getMessageStoreStatistics();
2715            }
2716        } catch (Exception e1) {
2717             LOG.error("Getting size counter of destination failed", e1);
2718        }
2719
2720        return storeStats;
2721    }
2722
2723    protected MessageStoreSubscriptionStatistics getSubStats(String kahaDestKey) {
2724        MessageStoreSubscriptionStatistics subStats = null;
2725        try {
2726            MessageStore messageStore = storeCache.get(kahaDestKey);
2727            if (messageStore instanceof TopicMessageStore) {
2728                subStats = ((TopicMessageStore)messageStore).getMessageStoreSubStatistics();
2729            }
2730        } catch (Exception e1) {
2731             LOG.error("Getting size counter of destination failed", e1);
2732        }
2733
2734        return subStats;
2735    }
2736
2737    /**
2738     * Determine whether this Destination matches the DestinationType
2739     *
2740     * @param destination
2741     * @param type
2742     * @return
2743     */
2744    protected boolean matchType(Destination destination,
2745            KahaDestination.DestinationType type) {
2746        if (destination instanceof Topic
2747                && type.equals(KahaDestination.DestinationType.TOPIC)) {
2748            return true;
2749        } else if (destination instanceof Queue
2750                && type.equals(KahaDestination.DestinationType.QUEUE)) {
2751            return true;
2752        }
2753        return false;
2754    }
2755
2756    class LocationSizeMarshaller implements Marshaller<Location> {
2757
2758        public LocationSizeMarshaller() {
2759
2760        }
2761
2762        @Override
2763        public Location readPayload(DataInput dataIn) throws IOException {
2764            Location rc = new Location();
2765            rc.setDataFileId(dataIn.readInt());
2766            rc.setOffset(dataIn.readInt());
2767            if (metadata.version >= 6) {
2768                rc.setSize(dataIn.readInt());
2769            }
2770            return rc;
2771        }
2772
2773        @Override
2774        public void writePayload(Location object, DataOutput dataOut)
2775                throws IOException {
2776            dataOut.writeInt(object.getDataFileId());
2777            dataOut.writeInt(object.getOffset());
2778            dataOut.writeInt(object.getSize());
2779        }
2780
2781        @Override
2782        public int getFixedSize() {
2783            return 12;
2784        }
2785
2786        @Override
2787        public Location deepCopy(Location source) {
2788            return new Location(source);
2789        }
2790
2791        @Override
2792        public boolean isDeepCopySupported() {
2793            return true;
2794        }
2795    }
2796
2797    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
2798        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2799        if (sequences == null) {
2800            sequences = new SequenceSet();
2801            sequences.add(messageSequence);
2802            sd.ackPositions.add(tx, subscriptionKey, sequences);
2803        } else {
2804            sequences.add(messageSequence);
2805            sd.ackPositions.put(tx, subscriptionKey, sequences);
2806        }
2807
2808        Long count = sd.messageReferences.get(messageSequence);
2809        if (count == null) {
2810            count = Long.valueOf(0L);
2811        }
2812        count = count.longValue() + 1;
2813        sd.messageReferences.put(messageSequence, count);
2814    }
2815
2816    // new sub is interested in potentially all existing messages
2817    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2818        SequenceSet allOutstanding = new SequenceSet();
2819        Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
2820        while (iterator.hasNext()) {
2821            SequenceSet set = iterator.next().getValue();
2822            for (Long entry : set) {
2823                allOutstanding.add(entry);
2824            }
2825        }
2826        sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
2827
2828        for (Long ackPosition : allOutstanding) {
2829            Long count = sd.messageReferences.get(ackPosition);
2830
2831            // There might not be a reference if the ackLocation was the last
2832            // one which is a placeholder for the next incoming message and
2833            // no value was added to the message references table.
2834            if (count != null) {
2835                count = count.longValue() + 1;
2836                sd.messageReferences.put(ackPosition, count);
2837            }
2838        }
2839    }
2840
2841    // on a new message add, all existing subs are interested in this message
2842    private void addAckLocationForNewMessage(Transaction tx, KahaDestination kahaDest,
2843            StoredDestination sd, Long messageSequence) throws IOException {
2844        for(String subscriptionKey : sd.subscriptionCache) {
2845            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2846            if (sequences == null) {
2847                sequences = new SequenceSet();
2848                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2849                sd.ackPositions.add(tx, subscriptionKey, sequences);
2850            } else {
2851                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2852                sd.ackPositions.put(tx, subscriptionKey, sequences);
2853            }
2854
2855            MessageKeys key = sd.orderIndex.get(tx, messageSequence);
2856            incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey,
2857                    key.location.getSize());
2858
2859            Long count = sd.messageReferences.get(messageSequence);
2860            if (count == null) {
2861                count = Long.valueOf(0L);
2862            }
2863            count = count.longValue() + 1;
2864            sd.messageReferences.put(messageSequence, count);
2865            sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L));
2866        }
2867    }
2868
2869    private void removeAckLocationsForSub(KahaSubscriptionCommand command,
2870            Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2871        if (!sd.ackPositions.isEmpty(tx)) {
2872            SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
2873            if (sequences == null || sequences.isEmpty()) {
2874                return;
2875            }
2876
2877            ArrayList<Long> unreferenced = new ArrayList<>();
2878
2879            for(Long sequenceId : sequences) {
2880                Long references = sd.messageReferences.get(sequenceId);
2881                if (references != null) {
2882                    references = references.longValue() - 1;
2883
2884                    if (references.longValue() > 0) {
2885                        sd.messageReferences.put(sequenceId, references);
2886                    } else {
2887                        sd.messageReferences.remove(sequenceId);
2888                        unreferenced.add(sequenceId);
2889                    }
2890                }
2891            }
2892
2893            for(Long sequenceId : unreferenced) {
2894                // Find all the entries that need to get deleted.
2895                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
2896                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
2897
2898                // Do the actual deletes.
2899                for (Entry<Long, MessageKeys> entry : deletes) {
2900                    sd.locationIndex.remove(tx, entry.getValue().location);
2901                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2902                    sd.orderIndex.remove(tx, entry.getKey());
2903                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
2904                }
2905            }
2906        }
2907    }
2908
2909    /**
2910     * @param tx
2911     * @param sd
2912     * @param subscriptionKey
2913     * @param messageSequence
2914     * @throws IOException
2915     */
2916    private void removeAckLocation(KahaRemoveMessageCommand command,
2917            Transaction tx, StoredDestination sd, String subscriptionKey,
2918            Long messageSequence) throws IOException {
2919        // Remove the sub from the previous location set..
2920        if (messageSequence != null) {
2921            SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
2922            if (range != null && !range.isEmpty()) {
2923                range.remove(messageSequence);
2924                if (!range.isEmpty()) {
2925                    sd.ackPositions.put(tx, subscriptionKey, range);
2926                } else {
2927                    sd.ackPositions.remove(tx, subscriptionKey);
2928                }
2929
2930                MessageKeys key = sd.orderIndex.get(tx, messageSequence);
2931                decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey,
2932                        key.location.getSize());
2933
2934                // Check if the message is reference by any other subscription.
2935                Long count = sd.messageReferences.get(messageSequence);
2936                if (count != null) {
2937                    long references = count.longValue() - 1;
2938                    if (references > 0) {
2939                        sd.messageReferences.put(messageSequence, Long.valueOf(references));
2940                        return;
2941                    } else {
2942                        sd.messageReferences.remove(messageSequence);
2943                    }
2944                }
2945
2946                // Find all the entries that need to get deleted.
2947                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
2948                sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2949
2950                // Do the actual deletes.
2951                for (Entry<Long, MessageKeys> entry : deletes) {
2952                    sd.locationIndex.remove(tx, entry.getValue().location);
2953                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2954                    sd.orderIndex.remove(tx, entry.getKey());
2955                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
2956                }
2957            }
2958        }
2959    }
2960
2961    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2962        return sd.subscriptionAcks.get(tx, subscriptionKey);
2963    }
2964
2965    protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2966        if (sd.ackPositions != null) {
2967            SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2968            if (messageSequences != null) {
2969                long result = messageSequences.rangeSize();
2970                // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2971                return result > 0 ? result - 1 : 0;
2972            }
2973        }
2974
2975        return 0;
2976    }
2977
2978    protected long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2979        long locationSize = 0;
2980
2981        if (sd.ackPositions != null) {
2982            //grab the messages attached to this subscription
2983            SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2984
2985            if (messageSequences != null) {
2986                Sequence head = messageSequences.getHead();
2987                if (head != null) {
2988                    //get an iterator over the order index starting at the first unacked message
2989                    //and go over each message to add up the size
2990                    Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
2991                            new MessageOrderCursor(head.getFirst()));
2992
2993                    while (iterator.hasNext()) {
2994                        Entry<Long, MessageKeys> entry = iterator.next();
2995                        locationSize += entry.getValue().location.getSize();
2996                    }
2997                }
2998            }
2999        }
3000
3001        return locationSize;
3002    }
3003
3004    protected String key(KahaDestination destination) {
3005        return destination.getType().getNumber() + ":" + destination.getName();
3006    }
3007
3008    // /////////////////////////////////////////////////////////////////
3009    // Transaction related implementation methods.
3010    // /////////////////////////////////////////////////////////////////
    // Operations of transactions in flight, keyed by transaction id. Entries are
    // created on demand by getInflightTx(), which synchronizes on this map.
    @SuppressWarnings("rawtypes")
    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<>();
    // NOTE(review): presumably the operations of transactions that reached the
    // prepared state; it is populated outside this section - confirm.
    @SuppressWarnings("rawtypes")
    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<>();
    // Producer keys added by trackRecoveredAcks() and removed by forgetRecoveredAcks().
    protected final Set<String> ackedAndPrepared = new HashSet<>();
    // Producer keys added by forgetRecoveredAcks() when it is called with rollback == true.
    protected final Set<String> rolledBackAcks = new HashSet<>();
3017
3018    // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
3019    // till then they are skipped by the store.
3020    // 'at most once' XA guarantee
3021    public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
3022        this.indexLock.writeLock().lock();
3023        try {
3024            for (MessageAck ack : acks) {
3025                ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
3026            }
3027        } finally {
3028            this.indexLock.writeLock().unlock();
3029        }
3030    }
3031
3032    public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
3033        if (acks != null) {
3034            this.indexLock.writeLock().lock();
3035            try {
3036                for (MessageAck ack : acks) {
3037                    final String id = ack.getLastMessageId().toProducerKey();
3038                    ackedAndPrepared.remove(id);
3039                    if (rollback) {
3040                        rolledBackAcks.add(id);
3041                    }
3042                }
3043            } finally {
3044                this.indexLock.writeLock().unlock();
3045            }
3046        }
3047    }
3048
3049    @SuppressWarnings("rawtypes")
3050    private List<Operation> getInflightTx(KahaTransactionInfo info) {
3051        TransactionId key = TransactionIdConversion.convert(info);
3052        List<Operation> tx;
3053        synchronized (inflightTransactions) {
3054            tx = inflightTransactions.get(key);
3055            if (tx == null) {
3056                tx = Collections.synchronizedList(new ArrayList<Operation>());
3057                inflightTransactions.put(key, tx);
3058            }
3059        }
3060        return tx;
3061    }
3062
3063    @SuppressWarnings("unused")
3064    private TransactionId key(KahaTransactionInfo transactionInfo) {
3065        return TransactionIdConversion.convert(transactionInfo);
3066    }
3067
3068    abstract class Operation <T extends JournalCommand<T>> {
3069        final T command;
3070        final Location location;
3071
3072        public Operation(T command, Location location) {
3073            this.command = command;
3074            this.location = location;
3075        }
3076
3077        public Location getLocation() {
3078            return location;
3079        }
3080
3081        public T getCommand() {
3082            return command;
3083        }
3084
3085        abstract public void execute(Transaction tx) throws IOException;
3086    }
3087
3088    class AddOperation extends Operation<KahaAddMessageCommand> {
3089        final IndexAware runWithIndexLock;
3090        public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) {
3091            super(command, location);
3092            this.runWithIndexLock = runWithIndexLock;
3093        }
3094
3095        @Override
3096        public void execute(Transaction tx) throws IOException {
3097            long seq = updateIndex(tx, command, location);
3098            if (runWithIndexLock != null) {
3099                runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
3100            }
3101        }
3102    }
3103
3104    class RemoveOperation extends Operation<KahaRemoveMessageCommand> {
3105
3106        public RemoveOperation(KahaRemoveMessageCommand command, Location location) {
3107            super(command, location);
3108        }
3109
3110        @Override
3111        public void execute(Transaction tx) throws IOException {
3112            updateIndex(tx, command, location);
3113        }
3114    }
3115
3116    // /////////////////////////////////////////////////////////////////
3117    // Initialization related implementation methods.
3118    // /////////////////////////////////////////////////////////////////
3119
3120    private PageFile createPageFile() throws IOException {
3121        if (indexDirectory == null) {
3122            indexDirectory = directory;
3123        }
3124        IOHelper.mkdirs(indexDirectory);
3125        PageFile index = new PageFile(indexDirectory, "db");
3126        index.setEnableWriteThread(isEnableIndexWriteAsync());
3127        index.setWriteBatchSize(getIndexWriteBatchSize());
3128        index.setPageCacheSize(indexCacheSize);
3129        index.setUseLFRUEviction(isUseIndexLFRUEviction());
3130        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
3131        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
3132        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
3133        index.setEnablePageCaching(isEnableIndexPageCaching());
3134        return index;
3135    }
3136
3137    private Journal createJournal() throws IOException {
3138        Journal manager = new Journal();
3139        manager.setDirectory(directory);
3140        manager.setMaxFileLength(getJournalMaxFileLength());
3141        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
3142        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
3143        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
3144        manager.setArchiveDataLogs(isArchiveDataLogs());
3145        manager.setSizeAccumulator(journalSize);
3146        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
3147        manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
3148        manager.setPreallocationStrategy(
3149                Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
3150        manager.setJournalDiskSyncStrategy(journalDiskSyncStrategy);
3151        if (getDirectoryArchive() != null) {
3152            IOHelper.mkdirs(getDirectoryArchive());
3153            manager.setDirectoryArchive(getDirectoryArchive());
3154        }
3155        return manager;
3156    }
3157
3158    private Metadata createMetadata() {
3159        Metadata md = new Metadata();
3160        md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth());
3161        md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack());
3162        return md;
3163    }
3164
    /**
     * Hook to be implemented by subclasses.
     * NOTE(review): presumably applies store specific configuration to the
     * metadata; neither an implementation nor a call site is visible in this
     * part of the file - confirm.
     */
    protected abstract void configureMetadata();
3166
    /**
     * @return the journal write batch size; {@code createJournal()} passes it
     *         to {@code Journal.setWriteBatchSize}
     */
    public int getJournalMaxWriteBatchSize() {
        return journalMaxWriteBatchSize;
    }
3170
    /**
     * @param journalMaxWriteBatchSize the journal write batch size to store
     */
    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
    }
3174
    /**
     * @return the store directory; used as the journal directory and as the
     *         fallback index directory
     */
    public File getDirectory() {
        return directory;
    }
3178
    /**
     * @param directory the store directory to use
     */
    public void setDirectory(File directory) {
        this.directory = directory;
    }
3182
    /**
     * @return the current value of the {@code deleteAllMessages} flag
     */
    public boolean isDeleteAllMessages() {
        return deleteAllMessages;
    }
3186
    /**
     * @param deleteAllMessages the new value of the {@code deleteAllMessages} flag
     */
    public void setDeleteAllMessages(boolean deleteAllMessages) {
        this.deleteAllMessages = deleteAllMessages;
    }
3190
    /**
     * Stores the index write batch size. Note that the backing field is
     * itself named {@code setIndexWriteBatchSize}.
     *
     * @param setIndexWriteBatchSize the index write batch size to store
     */
    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
    }
3194
    /**
     * @return the index write batch size; {@code createPageFile()} passes it
     *         to {@code PageFile.setWriteBatchSize}
     */
    public int getIndexWriteBatchSize() {
        return setIndexWriteBatchSize;
    }
3198
    /**
     * @param enableIndexWriteAsync the new value of the {@code enableIndexWriteAsync} flag
     */
    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
        this.enableIndexWriteAsync = enableIndexWriteAsync;
    }
3202
    /**
     * @return the {@code enableIndexWriteAsync} flag; {@code createPageFile()}
     *         passes it to {@code PageFile.setEnableWriteThread}
     */
    boolean isEnableIndexWriteAsync() {
        return enableIndexWriteAsync;
    }
3206
3207    /**
3208     * @deprecated use {@link #getJournalDiskSyncStrategyEnum} or {@link #getJournalDiskSyncStrategy} instead
3209     * @return
3210     */
3211    @Deprecated
3212    public boolean isEnableJournalDiskSyncs() {
3213        return journalDiskSyncStrategy == JournalDiskSyncStrategy.ALWAYS;
3214    }
3215
3216    /**
3217     * @deprecated use {@link #setEnableJournalDiskSyncs} instead
3218     * @param syncWrites
3219     */
3220    @Deprecated
3221    public void setEnableJournalDiskSyncs(boolean syncWrites) {
3222        if (syncWrites) {
3223            journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
3224        } else {
3225            journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER;
3226        }
3227    }
3228
    /**
     * @return the journal disk sync strategy as its enum constant
     */
    public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() {
        return journalDiskSyncStrategy;
    }
3232
    /**
     * @return the name of the journal disk sync strategy enum constant
     */
    public String getJournalDiskSyncStrategy() {
        return journalDiskSyncStrategy.name();
    }
3236
3237    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
3238        this.journalDiskSyncStrategy = JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase());
3239    }
3240
    /**
     * @return the current value of {@code journalDiskSyncInterval}
     */
    public long getJournalDiskSyncInterval() {
        return journalDiskSyncInterval;
    }
3244
    /**
     * @param journalDiskSyncInterval the new value of {@code journalDiskSyncInterval}
     */
    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
        this.journalDiskSyncInterval = journalDiskSyncInterval;
    }
3248
    /**
     * @return the current value of {@code checkpointInterval}
     */
    public long getCheckpointInterval() {
        return checkpointInterval;
    }
3252
    /**
     * @param checkpointInterval the new value of {@code checkpointInterval}
     */
    public void setCheckpointInterval(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }
3256
    /**
     * @return the current value of {@code cleanupInterval}
     */
    public long getCleanupInterval() {
        return cleanupInterval;
    }
3260
    /**
     * @param cleanupInterval the new value of {@code cleanupInterval}
     */
    public void setCleanupInterval(long cleanupInterval) {
        this.cleanupInterval = cleanupInterval;
    }
3264
    /**
     * @param journalMaxFileLength the maximum journal file length to store
     */
    public void setJournalMaxFileLength(int journalMaxFileLength) {
        this.journalMaxFileLength = journalMaxFileLength;
    }
3268
    /**
     * @return the maximum journal file length; {@code createJournal()} passes
     *         it to {@code Journal.setMaxFileLength}
     */
    public int getJournalMaxFileLength() {
        return journalMaxFileLength;
    }
3272
    /**
     * Forwards the value to the producer sequence id tracker of the current
     * metadata.
     *
     * @param maxFailoverProducersToTrack the maximum number of producers to track
     */
    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
    }
3276
    /**
     * @return the maximum number of producers to track, as reported by the
     *         producer sequence id tracker of the current metadata
     */
    public int getMaxFailoverProducersToTrack() {
        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
    }
3280
    /**
     * Forwards the value to the producer sequence id tracker of the current
     * metadata.
     *
     * @param failoverProducersAuditDepth the audit depth to apply
     */
    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
    }
3284
    /**
     * @return the audit depth reported by the producer sequence id tracker of
     *         the current metadata
     */
    public int getFailoverProducersAuditDepth() {
        return this.metadata.producerSequenceIdTracker.getAuditDepth();
    }
3288
3289    public PageFile getPageFile() throws IOException {
3290        if (pageFile == null) {
3291            pageFile = createPageFile();
3292        }
3293        return pageFile;
3294    }
3295
3296    public Journal getJournal() throws IOException {
3297        if (journal == null) {
3298            journal = createJournal();
3299        }
3300        return journal;
3301    }
3302
    /**
     * @return the current metadata instance of this store
     */
    protected Metadata getMetadata() {
        return metadata;
    }
3306
    /**
     * @return the current value of the {@code failIfDatabaseIsLocked} flag
     */
    public boolean isFailIfDatabaseIsLocked() {
        return failIfDatabaseIsLocked;
    }
3310
    /**
     * @param failIfDatabaseIsLocked the new value of the {@code failIfDatabaseIsLocked} flag
     */
    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
    }
3314
    /**
     * @return the current value of the {@code ignoreMissingJournalfiles} flag
     */
    public boolean isIgnoreMissingJournalfiles() {
        return ignoreMissingJournalfiles;
    }
3318
    /**
     * @param ignoreMissingJournalfiles the new value of the {@code ignoreMissingJournalfiles} flag
     */
    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
    }
3322
    /**
     * @return the index cache size; {@code createPageFile()} passes it to
     *         {@code PageFile.setPageCacheSize}
     */
    public int getIndexCacheSize() {
        return indexCacheSize;
    }
3326
    /**
     * @param indexCacheSize the index cache size to store
     */
    public void setIndexCacheSize(int indexCacheSize) {
        this.indexCacheSize = indexCacheSize;
    }
3330
    /**
     * @return the {@code checkForCorruptJournalFiles} flag; in
     *         {@code createJournal()} it enables the startup corruption check
     *         and also forces journal checksums on
     */
    public boolean isCheckForCorruptJournalFiles() {
        return checkForCorruptJournalFiles;
    }
3334
    /**
     * @param checkForCorruptJournalFiles the new value of the {@code checkForCorruptJournalFiles} flag
     */
    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
    }
3338
    /**
     * @return the {@code checksumJournalFiles} flag; {@code createJournal()}
     *         enables journal checksums when this or
     *         {@code checkForCorruptJournalFiles} is set
     */
    public boolean isChecksumJournalFiles() {
        return checksumJournalFiles;
    }
3342
    /**
     * @param checksumJournalFiles the new value of the {@code checksumJournalFiles} flag
     */
    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
        this.checksumJournalFiles = checksumJournalFiles;
    }
3346
    /**
     * Stores the broker service reference handed to this store.
     *
     * @param brokerService the broker service to keep
     */
    @Override
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }
3351
3352    /**
3353     * @return the archiveDataLogs
3354     */
3355    public boolean isArchiveDataLogs() {
3356        return this.archiveDataLogs;
3357    }
3358
3359    /**
3360     * @param archiveDataLogs the archiveDataLogs to set
3361     */
3362    public void setArchiveDataLogs(boolean archiveDataLogs) {
3363        this.archiveDataLogs = archiveDataLogs;
3364    }
3365
3366    /**
3367     * @return the directoryArchive
3368     */
3369    public File getDirectoryArchive() {
3370        return this.directoryArchive;
3371    }
3372
3373    /**
3374     * @param directoryArchive the directoryArchive to set
3375     */
3376    public void setDirectoryArchive(File directoryArchive) {
3377        this.directoryArchive = directoryArchive;
3378    }
3379
    /**
     * @return the current value of the {@code archiveCorruptedIndex} flag
     */
    public boolean isArchiveCorruptedIndex() {
        return archiveCorruptedIndex;
    }
3383
    /**
     * @param archiveCorruptedIndex the new value of the {@code archiveCorruptedIndex} flag
     */
    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
        this.archiveCorruptedIndex = archiveCorruptedIndex;
    }
3387
    /**
     * @return the index LFU eviction factor; {@code createPageFile()} passes
     *         it to {@code PageFile.setLFUEvictionFactor}
     */
    public float getIndexLFUEvictionFactor() {
        return indexLFUEvictionFactor;
    }
3391
    /**
     * @param indexLFUEvictionFactor the index LFU eviction factor to store
     */
    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
    }
3395
    /**
     * @return the {@code useIndexLFRUEviction} flag; {@code createPageFile()}
     *         passes it to {@code PageFile.setUseLFRUEviction}
     */
    public boolean isUseIndexLFRUEviction() {
        return useIndexLFRUEviction;
    }
3399
    /**
     * @param useIndexLFRUEviction the new value of the {@code useIndexLFRUEviction} flag
     */
    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
        this.useIndexLFRUEviction = useIndexLFRUEviction;
    }
3403
    /**
     * @param enableIndexDiskSyncs the new value of the {@code enableIndexDiskSyncs} flag
     */
    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
    }
3407
    /**
     * @param enableIndexRecoveryFile the new value of the {@code enableIndexRecoveryFile} flag
     */
    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
    }
3411
    /**
     * @param enableIndexPageCaching the new value of the {@code enableIndexPageCaching} flag
     */
    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
        this.enableIndexPageCaching = enableIndexPageCaching;
    }
3415
    /**
     * @return the {@code enableIndexDiskSyncs} flag; {@code createPageFile()}
     *         passes it to {@code PageFile.setEnableDiskSyncs}
     */
    public boolean isEnableIndexDiskSyncs() {
        return enableIndexDiskSyncs;
    }
3419
    /**
     * @return the {@code enableIndexRecoveryFile} flag; {@code createPageFile()}
     *         passes it to {@code PageFile.setEnableRecoveryFile}
     */
    public boolean isEnableIndexRecoveryFile() {
        return enableIndexRecoveryFile;
    }
3423
    /**
     * @return the {@code enableIndexPageCaching} flag; {@code createPageFile()}
     *         passes it to {@code PageFile.setEnablePageCaching}
     */
    public boolean isEnableIndexPageCaching() {
        return enableIndexPageCaching;
    }
3427
3428    // /////////////////////////////////////////////////////////////////
3429    // Internal conversion methods.
3430    // /////////////////////////////////////////////////////////////////
3431
3432    class MessageOrderCursor{
3433        long defaultCursorPosition;
3434        long lowPriorityCursorPosition;
3435        long highPriorityCursorPosition;
3436        MessageOrderCursor(){
3437        }
3438
3439        MessageOrderCursor(long position){
3440            this.defaultCursorPosition=position;
3441            this.lowPriorityCursorPosition=position;
3442            this.highPriorityCursorPosition=position;
3443        }
3444
3445        MessageOrderCursor(MessageOrderCursor other){
3446            this.defaultCursorPosition=other.defaultCursorPosition;
3447            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3448            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3449        }
3450
3451        MessageOrderCursor copy() {
3452            return new MessageOrderCursor(this);
3453        }
3454
3455        void reset() {
3456            this.defaultCursorPosition=0;
3457            this.highPriorityCursorPosition=0;
3458            this.lowPriorityCursorPosition=0;
3459        }
3460
3461        void increment() {
3462            if (defaultCursorPosition!=0) {
3463                defaultCursorPosition++;
3464            }
3465            if (highPriorityCursorPosition!=0) {
3466                highPriorityCursorPosition++;
3467            }
3468            if (lowPriorityCursorPosition!=0) {
3469                lowPriorityCursorPosition++;
3470            }
3471        }
3472
3473        @Override
3474        public String toString() {
3475           return "MessageOrderCursor:[def:" + defaultCursorPosition
3476                   + ", low:" + lowPriorityCursorPosition
3477                   + ", high:" +  highPriorityCursorPosition + "]";
3478        }
3479
3480        public void sync(MessageOrderCursor other) {
3481            this.defaultCursorPosition=other.defaultCursorPosition;
3482            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3483            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3484        }
3485    }
3486
3487    class MessageOrderIndex {
3488        static final byte HI = 9;
3489        static final byte LO = 0;
3490        static final byte DEF = 4;
3491
3492        long nextMessageId;
3493        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
3494        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
3495        BTreeIndex<Long, MessageKeys> highPriorityIndex;
3496        final MessageOrderCursor cursor = new MessageOrderCursor();
3497        Long lastDefaultKey;
3498        Long lastHighKey;
3499        Long lastLowKey;
3500        byte lastGetPriority;
3501        final List<Long> pendingAdditions = new LinkedList<>();
3502        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
3503
3504        MessageKeys remove(Transaction tx, Long key) throws IOException {
3505            MessageKeys result = defaultPriorityIndex.remove(tx, key);
3506            if (result == null && highPriorityIndex!=null) {
3507                result = highPriorityIndex.remove(tx, key);
3508                if (result ==null && lowPriorityIndex!=null) {
3509                    result = lowPriorityIndex.remove(tx, key);
3510                }
3511            }
3512            return result;
3513        }
3514
3515        void load(Transaction tx) throws IOException {
3516            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3517            defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3518            defaultPriorityIndex.load(tx);
3519            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3520            lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3521            lowPriorityIndex.load(tx);
3522            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3523            highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3524            highPriorityIndex.load(tx);
3525        }
3526
3527        void allocate(Transaction tx) throws IOException {
3528            defaultPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
3529            if (metadata.version >= 2) {
3530                lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
3531                highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
3532            }
3533        }
3534
3535        void configureLast(Transaction tx) throws IOException {
3536            // Figure out the next key using the last entry in the destination.
3537            TreeSet<Long> orderedSet = new TreeSet<>();
3538
3539            addLast(orderedSet, highPriorityIndex, tx);
3540            addLast(orderedSet, defaultPriorityIndex, tx);
3541            addLast(orderedSet, lowPriorityIndex, tx);
3542
3543            if (!orderedSet.isEmpty()) {
3544                nextMessageId = orderedSet.last() + 1;
3545            }
3546        }
3547
3548        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
3549            if (index != null) {
3550                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
3551                if (lastEntry != null) {
3552                    orderedSet.add(lastEntry.getKey());
3553                }
3554            }
3555        }
3556
3557        void clear(Transaction tx) throws IOException {
3558            this.remove(tx);
3559            this.resetCursorPosition();
3560            this.allocate(tx);
3561            this.load(tx);
3562            this.configureLast(tx);
3563        }
3564
3565        void remove(Transaction tx) throws IOException {
3566            defaultPriorityIndex.clear(tx);
3567            defaultPriorityIndex.unload(tx);
3568            tx.free(defaultPriorityIndex.getPageId());
3569            if (lowPriorityIndex != null) {
3570                lowPriorityIndex.clear(tx);
3571                lowPriorityIndex.unload(tx);
3572
3573                tx.free(lowPriorityIndex.getPageId());
3574            }
3575            if (highPriorityIndex != null) {
3576                highPriorityIndex.clear(tx);
3577                highPriorityIndex.unload(tx);
3578                tx.free(highPriorityIndex.getPageId());
3579            }
3580        }
3581
3582        void resetCursorPosition() {
3583            this.cursor.reset();
3584            lastDefaultKey = null;
3585            lastHighKey = null;
3586            lastLowKey = null;
3587        }
3588
3589        void setBatch(Transaction tx, Long sequence) throws IOException {
3590            if (sequence != null) {
3591                Long nextPosition = new Long(sequence.longValue() + 1);
3592                lastDefaultKey = sequence;
3593                cursor.defaultCursorPosition = nextPosition.longValue();
3594                lastHighKey = sequence;
3595                cursor.highPriorityCursorPosition = nextPosition.longValue();
3596                lastLowKey = sequence;
3597                cursor.lowPriorityCursorPosition = nextPosition.longValue();
3598            }
3599        }
3600
3601        void setBatch(Transaction tx, LastAck last) throws IOException {
3602            setBatch(tx, last.lastAckedSequence);
3603            if (cursor.defaultCursorPosition == 0
3604                    && cursor.highPriorityCursorPosition == 0
3605                    && cursor.lowPriorityCursorPosition == 0) {
3606                long next = last.lastAckedSequence + 1;
3607                switch (last.priority) {
3608                    case DEF:
3609                        cursor.defaultCursorPosition = next;
3610                        cursor.highPriorityCursorPosition = next;
3611                        break;
3612                    case HI:
3613                        cursor.highPriorityCursorPosition = next;
3614                        break;
3615                    case LO:
3616                        cursor.lowPriorityCursorPosition = next;
3617                        cursor.defaultCursorPosition = next;
3618                        cursor.highPriorityCursorPosition = next;
3619                        break;
3620                }
3621            }
3622        }
3623
3624        void stoppedIterating() {
3625            if (lastDefaultKey!=null) {
3626                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
3627            }
3628            if (lastHighKey!=null) {
3629                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
3630            }
3631            if (lastLowKey!=null) {
3632                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
3633            }
3634            lastDefaultKey = null;
3635            lastHighKey = null;
3636            lastLowKey = null;
3637        }
3638
3639        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
3640                throws IOException {
3641            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
3642                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
3643            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
3644                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
3645            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
3646                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
3647            }
3648        }
3649
3650        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
3651                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
3652
3653            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null);
3654            deletes.add(iterator.next());
3655        }
3656
3657        long getNextMessageId() {
3658            return nextMessageId++;
3659        }
3660
3661        void revertNextMessageId() {
3662            nextMessageId--;
3663        }
3664
3665        MessageKeys get(Transaction tx, Long key) throws IOException {
3666            MessageKeys result = defaultPriorityIndex.get(tx, key);
3667            if (result == null) {
3668                result = highPriorityIndex.get(tx, key);
3669                if (result == null) {
3670                    result = lowPriorityIndex.get(tx, key);
3671                    lastGetPriority = LO;
3672                } else {
3673                    lastGetPriority = HI;
3674                }
3675            } else {
3676                lastGetPriority = DEF;
3677            }
3678            return result;
3679        }
3680
3681        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
3682            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
3683                return defaultPriorityIndex.put(tx, key, value);
3684            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
3685                return highPriorityIndex.put(tx, key, value);
3686            } else {
3687                return lowPriorityIndex.put(tx, key, value);
3688            }
3689        }
3690
3691        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
3692            return new MessageOrderIterator(tx,cursor,this);
3693        }
3694
3695        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
3696            return new MessageOrderIterator(tx,m,this);
3697        }
3698
3699        public byte lastGetPriority() {
3700            return lastGetPriority;
3701        }
3702
3703        public boolean alreadyDispatched(Long sequence) {
3704            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
3705                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
3706                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
3707        }
3708
3709        public void trackPendingAdd(Long seq) {
3710            synchronized (pendingAdditions) {
3711                pendingAdditions.add(seq);
3712            }
3713        }
3714
3715        public void trackPendingAddComplete(Long seq) {
3716            synchronized (pendingAdditions) {
3717                pendingAdditions.remove(seq);
3718            }
3719        }
3720
3721        public Long minPendingAdd() {
3722            synchronized (pendingAdditions) {
3723                if (!pendingAdditions.isEmpty()) {
3724                    return pendingAdditions.get(0);
3725                } else {
3726                    return null;
3727                }
3728            }
3729        }
3730
3731        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
3732            Iterator<Entry<Long, MessageKeys>>currentIterator;
3733            final Iterator<Entry<Long, MessageKeys>>highIterator;
3734            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
3735            final Iterator<Entry<Long, MessageKeys>>lowIterator;
3736
3737            MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException {
3738                Long pendingAddLimiter = messageOrderIndex.minPendingAdd();
3739                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter);
3740                if (highPriorityIndex != null) {
3741                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter);
3742                } else {
3743                    this.highIterator = null;
3744                }
3745                if (lowPriorityIndex != null) {
3746                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter);
3747                } else {
3748                    this.lowIterator = null;
3749                }
3750            }
3751
3752            @Override
3753            public boolean hasNext() {
3754                if (currentIterator == null) {
3755                    if (highIterator != null) {
3756                        if (highIterator.hasNext()) {
3757                            currentIterator = highIterator;
3758                            return currentIterator.hasNext();
3759                        }
3760                        if (defaultIterator.hasNext()) {
3761                            currentIterator = defaultIterator;
3762                            return currentIterator.hasNext();
3763                        }
3764                        if (lowIterator.hasNext()) {
3765                            currentIterator = lowIterator;
3766                            return currentIterator.hasNext();
3767                        }
3768                        return false;
3769                    } else {
3770                        currentIterator = defaultIterator;
3771                        return currentIterator.hasNext();
3772                    }
3773                }
3774                if (highIterator != null) {
3775                    if (currentIterator.hasNext()) {
3776                        return true;
3777                    }
3778                    if (currentIterator == highIterator) {
3779                        if (defaultIterator.hasNext()) {
3780                            currentIterator = defaultIterator;
3781                            return currentIterator.hasNext();
3782                        }
3783                        if (lowIterator.hasNext()) {
3784                            currentIterator = lowIterator;
3785                            return currentIterator.hasNext();
3786                        }
3787                        return false;
3788                    }
3789
3790                    if (currentIterator == defaultIterator) {
3791                        if (lowIterator.hasNext()) {
3792                            currentIterator = lowIterator;
3793                            return currentIterator.hasNext();
3794                        }
3795                        return false;
3796                    }
3797                }
3798                return currentIterator.hasNext();
3799            }
3800
3801            @Override
3802            public Entry<Long, MessageKeys> next() {
3803                Entry<Long, MessageKeys> result = currentIterator.next();
3804                if (result != null) {
3805                    Long key = result.getKey();
3806                    if (highIterator != null) {
3807                        if (currentIterator == defaultIterator) {
3808                            lastDefaultKey = key;
3809                        } else if (currentIterator == highIterator) {
3810                            lastHighKey = key;
3811                        } else {
3812                            lastLowKey = key;
3813                        }
3814                    } else {
3815                        lastDefaultKey = key;
3816                    }
3817                }
3818                return result;
3819            }
3820
3821            @Override
3822            public void remove() {
3823                throw new UnsupportedOperationException();
3824            }
3825        }
3826    }
3827
3828    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
3829        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
3830
3831        @Override
3832        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
3833            ByteArrayOutputStream baos = new ByteArrayOutputStream();
3834            ObjectOutputStream oout = new ObjectOutputStream(baos);
3835            oout.writeObject(object);
3836            oout.flush();
3837            oout.close();
3838            byte[] data = baos.toByteArray();
3839            dataOut.writeInt(data.length);
3840            dataOut.write(data);
3841        }
3842
3843        @Override
3844        @SuppressWarnings("unchecked")
3845        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
3846            int dataLen = dataIn.readInt();
3847            byte[] data = new byte[dataLen];
3848            dataIn.readFully(data);
3849            ByteArrayInputStream bais = new ByteArrayInputStream(data);
3850            ObjectInputStream oin = new ObjectInputStream(bais);
3851            try {
3852                return (HashSet<String>) oin.readObject();
3853            } catch (ClassNotFoundException cfe) {
3854                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
3855                ioe.initCause(cfe);
3856                throw ioe;
3857            }
3858        }
3859    }
3860
3861    public File getIndexDirectory() {
3862        return indexDirectory;
3863    }
3864
3865    public void setIndexDirectory(File indexDirectory) {
3866        this.indexDirectory = indexDirectory;
3867    }
3868
3869    interface IndexAware {
3870        public void sequenceAssignedWithIndexLocked(long index);
3871    }
3872
3873    public String getPreallocationScope() {
3874        return preallocationScope;
3875    }
3876
3877    public void setPreallocationScope(String preallocationScope) {
3878        this.preallocationScope = preallocationScope;
3879    }
3880
3881    public String getPreallocationStrategy() {
3882        return preallocationStrategy;
3883    }
3884
3885    public void setPreallocationStrategy(String preallocationStrategy) {
3886        this.preallocationStrategy = preallocationStrategy;
3887    }
3888
3889    public int getCompactAcksAfterNoGC() {
3890        return compactAcksAfterNoGC;
3891    }
3892
3893    /**
3894     * Sets the number of GC cycles where no journal logs were removed before an attempt to
3895     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
3896     * <p>
3897     * A value of -1 will disable this feature.
3898     *
3899     * @param compactAcksAfterNoGC
3900     *      Number of empty GC cycles before we rewrite old ACKS.
3901     */
3902    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
3903        this.compactAcksAfterNoGC = compactAcksAfterNoGC;
3904    }
3905
3906    /**
3907     * Returns whether Ack compaction will ignore that the store is still growing
3908     * and run more often.
3909     *
3910     * @return the compactAcksIgnoresStoreGrowth current value.
3911     */
3912    public boolean isCompactAcksIgnoresStoreGrowth() {
3913        return compactAcksIgnoresStoreGrowth;
3914    }
3915
3916    /**
     * Configure if Ack compaction will occur regardless of continued growth of the
     * journal logs, meaning that the store has not run out of space yet.  Because the
     * compaction operation can be costly this value is defaulted to off and the Ack
     * compaction is only done when it seems that the store cannot grow any larger.
3921     *
3922     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
3923     */
3924    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
3925        this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
3926    }
3927
3928    /**
3929     * Returns whether Ack compaction is enabled
3930     *
3931     * @return enableAckCompaction
3932     */
3933    public boolean isEnableAckCompaction() {
3934        return enableAckCompaction;
3935    }
3936
3937    /**
3938     * Configure if the Ack compaction task should be enabled to run
3939     *
     * @param enableAckCompaction whether the Ack compaction task should be enabled
3941     */
3942    public void setEnableAckCompaction(boolean enableAckCompaction) {
3943        this.enableAckCompaction = enableAckCompaction;
3944    }
3945
3946    /**
3947     * @return
3948     */
3949    public boolean isEnableSubscriptionStatistics() {
3950        return enableSubscriptionStatistics;
3951    }
3952
3953    /**
3954     * Enable caching statistics for each subscription to allow non-blocking
3955     * retrieval of metrics.  This could incur some overhead to compute if there are a lot
3956     * of subscriptions.
3957     *
     * @param enableSubscriptionStatistics whether per-subscription statistics caching should be enabled
3959     */
3960    public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) {
3961        this.enableSubscriptionStatistics = enableSubscriptionStatistics;
3962    }
3963}