001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.scheduler;
018
019import org.apache.activemq.util.IOHelper;
020import org.apache.activemq.util.ServiceStopper;
021import org.apache.activemq.util.ServiceSupport;
022import org.apache.kahadb.index.BTreeIndex;
023import org.apache.kahadb.journal.Journal;
024import org.apache.kahadb.journal.Location;
025import org.apache.kahadb.page.Page;
026import org.apache.kahadb.page.PageFile;
027import org.apache.kahadb.page.Transaction;
028import org.apache.kahadb.util.ByteSequence;
029import org.apache.kahadb.util.IntegerMarshaller;
030import org.apache.kahadb.util.LockFile;
031import org.apache.kahadb.util.StringMarshaller;
032import org.apache.kahadb.util.VariableMarshaller;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import java.io.DataInput;
037import java.io.DataOutput;
038import java.io.File;
039import java.io.IOException;
040import java.util.ArrayList;
041import java.util.HashMap;
042import java.util.HashSet;
043import java.util.Iterator;
044import java.util.List;
045import java.util.Map;
046import java.util.Map.Entry;
047import java.util.Set;
048
049public class JobSchedulerStore extends ServiceSupport {
050    static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
051    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052
053    public static final int CLOSED_STATE = 1;
054    public static final int OPEN_STATE = 2;
055
056    private File directory;
057    PageFile pageFile;
058    private Journal journal;
059    private LockFile lockFile;
060    private boolean failIfDatabaseIsLocked;
061    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
062    private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
063    private boolean enableIndexWriteAsync = false;
064    // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
065    MetaData metaData = new MetaData(this);
066    final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
067    Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
068
069    protected class MetaData {
070        protected MetaData(JobSchedulerStore store) {
071            this.store = store;
072        }
073        private final JobSchedulerStore store;
074        Page<MetaData> page;
075        BTreeIndex<Integer, Integer> journalRC;
076        BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
077
078        void createIndexes(Transaction tx) throws IOException {
079            this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
080            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
081        }
082
083        void load(Transaction tx) throws IOException {
084            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
085            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
086            this.storedSchedulers.load(tx);
087            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
088            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
089            this.journalRC.load(tx);
090        }
091
092        void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
093            for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
094                Entry<String, JobSchedulerImpl> entry = i.next();
095                entry.getValue().load(tx);
096                schedulers.put(entry.getKey(), entry.getValue());
097            }
098        }
099
100        public void read(DataInput is) throws IOException {
101            this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
102            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
103            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
104            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
105            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
106            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
107        }
108
109        public void write(DataOutput os) throws IOException {
110            os.writeLong(this.storedSchedulers.getPageId());
111            os.writeLong(this.journalRC.getPageId());
112
113        }
114    }
115
116    class MetaDataMarshaller extends VariableMarshaller<MetaData> {
117        private final JobSchedulerStore store;
118
119        MetaDataMarshaller(JobSchedulerStore store) {
120            this.store = store;
121        }
122        public MetaData readPayload(DataInput dataIn) throws IOException {
123            MetaData rc = new MetaData(this.store);
124            rc.read(dataIn);
125            return rc;
126        }
127
128        public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
129            object.write(dataOut);
130        }
131    }
132
133    class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
134        public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
135            List<JobLocation> result = new ArrayList<JobLocation>();
136            int size = dataIn.readInt();
137            for (int i = 0; i < size; i++) {
138                JobLocation jobLocation = new JobLocation();
139                jobLocation.readExternal(dataIn);
140                result.add(jobLocation);
141            }
142            return result;
143        }
144
145        public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
146            dataOut.writeInt(value.size());
147            for (JobLocation jobLocation : value) {
148                jobLocation.writeExternal(dataOut);
149            }
150        }
151    }
152
153    class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
154        private final JobSchedulerStore store;
155        JobSchedulerMarshaller(JobSchedulerStore store) {
156            this.store = store;
157        }
158        public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
159            JobSchedulerImpl result = new JobSchedulerImpl(this.store);
160            result.read(dataIn);
161            return result;
162        }
163
164        public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
165            js.write(dataOut);
166        }
167    }
168
169    public File getDirectory() {
170        return directory;
171    }
172
173    public void setDirectory(File directory) {
174        this.directory = directory;
175    }
176    
177    public long size() {
178        if ( !isStarted() ) {
179            return 0;
180        }
181        try {
182            return journal.getDiskSize() + pageFile.getDiskSize();
183        } catch (IOException e) {
184            throw new RuntimeException(e);
185        }
186    }
187
188    public JobScheduler getJobScheduler(final String name) throws Exception {
189        JobSchedulerImpl result = this.schedulers.get(name);
190        if (result == null) {
191            final JobSchedulerImpl js = new JobSchedulerImpl(this);
192            js.setName(name);
193            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
194                public void execute(Transaction tx) throws IOException {
195                    js.createIndexes(tx);
196                    js.load(tx);
197                    metaData.storedSchedulers.put(tx, name, js);
198                }
199            });
200            result = js;
201            this.schedulers.put(name, js);
202            if (isStarted()) {
203                result.start();
204            }
205            this.pageFile.flush();
206        }
207        return result;
208    }
209
210    synchronized public boolean removeJobScheduler(final String name) throws Exception {
211        boolean result = false;
212        final JobSchedulerImpl js = this.schedulers.remove(name);
213        result = js != null;
214        if (result) {
215            js.stop();
216            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
217                public void execute(Transaction tx) throws IOException {
218                    metaData.storedSchedulers.remove(tx, name);
219                    js.destroy(tx);
220                }
221            });
222        }
223        return result;
224    }
225
226    @Override
227    protected synchronized void doStart() throws Exception {
228        if (this.directory == null) {
229            this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
230        }
231        IOHelper.mkdirs(this.directory);
232        lock();
233        this.journal = new Journal();
234        this.journal.setDirectory(directory);
235        this.journal.setMaxFileLength(getJournalMaxFileLength());
236        this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
237        this.journal.start();
238        this.pageFile = new PageFile(directory, "scheduleDB");
239        this.pageFile.setWriteBatchSize(1);
240        this.pageFile.load();
241
242        this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
243            public void execute(Transaction tx) throws IOException {
244                if (pageFile.getPageCount() == 0) {
245                    Page<MetaData> page = tx.allocate();
246                    assert page.getPageId() == 0;
247                    page.set(metaData);
248                    metaData.page = page;
249                    metaData.createIndexes(tx);
250                    tx.store(metaData.page, metaDataMarshaller, true);
251
252                } else {
253                    Page<MetaData> page = tx.load(0, metaDataMarshaller);
254                    metaData = page.get();
255                    metaData.page = page;
256                }
257                metaData.load(tx);
258                metaData.loadScheduler(tx, schedulers);
259                for (JobSchedulerImpl js :schedulers.values()) {
260                    try {
261                        js.start();
262                    } catch (Exception e) {
263                        JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e);
264                    }
265               }
266            }
267        });
268
269        this.pageFile.flush();
270        LOG.info(this + " started");
271    }
272    
273    @Override
274    protected synchronized void doStop(ServiceStopper stopper) throws Exception {
275        for (JobSchedulerImpl js : this.schedulers.values()) {
276            js.stop();
277        }
278        if (this.pageFile != null) {
279            this.pageFile.unload();
280        }
281        if (this.journal != null) {
282            journal.close();
283        }
284        if (this.lockFile != null) {
285            this.lockFile.unlock();
286        }
287        this.lockFile = null;
288        LOG.info(this + " stopped");
289
290    }
291
292    synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
293        int logId = location.getDataFileId();
294        Integer val = this.metaData.journalRC.get(tx, logId);
295        int refCount = val != null ? val.intValue() + 1 : 1;
296        this.metaData.journalRC.put(tx, logId, refCount);
297
298    }
299
300    synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
301        int logId = location.getDataFileId();
302        int refCount = this.metaData.journalRC.get(tx, logId);
303        refCount--;
304        if (refCount <= 0) {
305            this.metaData.journalRC.remove(tx, logId);
306            Set<Integer> set = new HashSet<Integer>();
307            set.add(logId);
308            this.journal.removeDataFiles(set);
309        } else {
310            this.metaData.journalRC.put(tx, logId, refCount);
311        }
312
313    }
314
315    synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
316        ByteSequence result = null;
317        result = this.journal.read(location);
318        return result;
319    }
320
321    synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
322        return this.journal.write(payload, sync);
323    }
324
325    private void lock() throws IOException {
326        if (lockFile == null) {
327            File lockFileName = new File(directory, "lock");
328            lockFile = new LockFile(lockFileName, true);
329            if (failIfDatabaseIsLocked) {
330                lockFile.lock();
331            } else {
332                while (true) {
333                    try {
334                        lockFile.lock();
335                        break;
336                    } catch (IOException e) {
337                        LOG.info("Database " + lockFileName + " is locked... waiting "
338                                + (DATABASE_LOCKED_WAIT_DELAY / 1000)
339                                + " seconds for the database to be unlocked. Reason: " + e);
340                        try {
341                            Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
342                        } catch (InterruptedException e1) {
343                        }
344                    }
345                }
346            }
347        }
348    }
349
350    PageFile getPageFile() {
351        this.pageFile.isLoaded();
352        return this.pageFile;
353    }
354
355    public boolean isFailIfDatabaseIsLocked() {
356        return failIfDatabaseIsLocked;
357    }
358
359    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
360        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
361    }
362
363    public int getJournalMaxFileLength() {
364        return journalMaxFileLength;
365    }
366
367    public void setJournalMaxFileLength(int journalMaxFileLength) {
368        this.journalMaxFileLength = journalMaxFileLength;
369    }
370
371    public int getJournalMaxWriteBatchSize() {
372        return journalMaxWriteBatchSize;
373    }
374
375    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
376        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
377    }
378
379    public boolean isEnableIndexWriteAsync() {
380        return enableIndexWriteAsync;
381    }
382
383    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
384        this.enableIndexWriteAsync = enableIndexWriteAsync;
385    }
386
387    @Override
388    public String toString() {
389        return "JobSchedulerStore:" + this.directory;
390    }
391
392}