001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.kaha.impl.container;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.Collection;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.Set;
025
026import org.apache.activemq.kaha.ContainerId;
027import org.apache.activemq.kaha.IndexMBean;
028import org.apache.activemq.kaha.MapContainer;
029import org.apache.activemq.kaha.Marshaller;
030import org.apache.activemq.kaha.RuntimeStoreException;
031import org.apache.activemq.kaha.Store;
032import org.apache.activemq.kaha.StoreEntry;
033import org.apache.activemq.kaha.StoreLocation;
034import org.apache.activemq.kaha.impl.DataManager;
035import org.apache.activemq.kaha.impl.data.Item;
036import org.apache.activemq.kaha.impl.index.Index;
037import org.apache.activemq.kaha.impl.index.IndexItem;
038import org.apache.activemq.kaha.impl.index.IndexLinkedList;
039import org.apache.activemq.kaha.impl.index.IndexManager;
040import org.apache.activemq.kaha.impl.index.VMIndex;
041import org.apache.activemq.kaha.impl.index.hash.HashIndex;
042import org.apache.activemq.util.IOHelper;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * Implementation of a MapContainer
048 * 
049 * 
050 */
051public final class MapContainerImpl extends BaseContainerImpl implements MapContainer {
052
053    private static final Logger LOG = LoggerFactory.getLogger(MapContainerImpl.class);
054    protected Index index;
055    protected Marshaller keyMarshaller = Store.OBJECT_MARSHALLER;
056    protected Marshaller valueMarshaller = Store.OBJECT_MARSHALLER;
057    protected File directory;
058    private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
059    private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
060    private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
061    private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
062    private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
063
064    public MapContainerImpl(File directory, ContainerId id, IndexItem root, IndexManager indexManager,
065                            DataManager dataManager, boolean persistentIndex) {
066        super(id, root, indexManager, dataManager, persistentIndex);
067        this.directory = directory;
068    }
069
070    public synchronized void init() {
071        super.init();
072        if (index == null) {
073            if (persistentIndex) {
074                String name = containerId.getDataContainerName() + "_" + containerId.getKey();
075                try {
076                    HashIndex hashIndex = new HashIndex(directory, name, indexManager);
077                    hashIndex.setNumberOfBins(getIndexBinSize());
078                    hashIndex.setKeySize(getIndexKeySize());
079                    hashIndex.setPageSize(getIndexPageSize());
080                    hashIndex.setMaximumCapacity(getIndexMaxBinSize());
081                    hashIndex.setLoadFactor(getIndexLoadFactor());
082                    this.index = hashIndex;
083                } catch (IOException e) {
084                    LOG.error("Failed to create HashIndex", e);
085                    throw new RuntimeException(e);
086                }
087            } else {
088                this.index = new VMIndex(indexManager);
089            }
090        }
091        index.setKeyMarshaller(keyMarshaller);
092    }
093
094    /*
095     * (non-Javadoc)
096     * 
097     * @see org.apache.activemq.kaha.MapContainer#load()
098     */
099    public synchronized void load() {
100        checkClosed();
101        if (!loaded) {
102            if (!loaded) {
103                loaded = true;
104                try {
105                    init();
106                    index.load();
107                    long nextItem = root.getNextItem();
108                    while (nextItem != Item.POSITION_NOT_SET) {
109                        IndexItem item = indexManager.getIndex(nextItem);
110                        StoreLocation data = item.getKeyDataItem();
111                        Object key = dataManager.readItem(keyMarshaller, data);
112                        if (index.isTransient()) {
113                            index.store(key, item);
114                        }
115                        indexList.add(item);
116                        nextItem = item.getNextItem();
117                    }
118                } catch (IOException e) {
119                    LOG.error("Failed to load container " + getId(), e);
120                    throw new RuntimeStoreException(e);
121                }
122            }
123        }
124    }
125
126    /*
127     * (non-Javadoc)
128     * 
129     * @see org.apache.activemq.kaha.MapContainer#unload()
130     */
131    public synchronized void unload() {
132        checkClosed();
133        if (loaded) {
134            loaded = false;
135            try {
136                index.unload();
137            } catch (IOException e) {
138                LOG.warn("Failed to unload the index", e);
139            }
140            indexList.clear();
141        }
142    }
143
144    public synchronized void delete() {
145        unload();
146        try {
147            index.delete();
148        } catch (IOException e) {
149            LOG.warn("Failed to unload the index", e);
150        }
151    }
152
153
154    public synchronized void setKeyMarshaller(Marshaller keyMarshaller) {
155        checkClosed();
156        this.keyMarshaller = keyMarshaller;
157        if (index != null) {
158            index.setKeyMarshaller(keyMarshaller);
159        }
160    }
161
162    public synchronized void setValueMarshaller(Marshaller valueMarshaller) {
163        checkClosed();
164        this.valueMarshaller = valueMarshaller;
165    }
166
167    /*
168     * (non-Javadoc)
169     * 
170     * @see org.apache.activemq.kaha.MapContainer#size()
171     */
172    public synchronized int size() {
173        load();
174        return indexList.size();
175    }
176
177    /*
178     * (non-Javadoc)
179     * 
180     * @see org.apache.activemq.kaha.MapContainer#isEmpty()
181     */
182    public synchronized boolean isEmpty() {
183        load();
184        return indexList.isEmpty();
185    }
186
187    /*
188     * (non-Javadoc)
189     * 
190     * @see org.apache.activemq.kaha.MapContainer#containsKey(java.lang.Object)
191     */
192    public synchronized boolean containsKey(Object key) {
193        load();
194        try {
195            return index.containsKey(key);
196        } catch (IOException e) {
197            LOG.error("Failed trying to find key: " + key, e);
198            throw new RuntimeException(e);
199        }
200    }
201
202    /*
203     * (non-Javadoc)
204     * 
205     * @see org.apache.activemq.kaha.MapContainer#get(java.lang.Object)
206     */
207    public synchronized Object get(Object key) {
208        load();
209        Object result = null;
210        StoreEntry item = null;
211        try {
212            item = index.get(key);
213        } catch (IOException e) {
214            LOG.error("Failed trying to get key: " + key, e);
215            throw new RuntimeException(e);
216        }
217        if (item != null) {
218            result = getValue(item);
219        }
220        return result;
221    }
222
223    /**
224     * Get the StoreEntry associated with the key
225     * 
226     * @param key
227     * @return the StoreEntry
228     */
229    public synchronized StoreEntry getEntry(Object key) {
230        load();
231        StoreEntry item = null;
232        try {
233            item = index.get(key);
234        } catch (IOException e) {
235            LOG.error("Failed trying to get key: " + key, e);
236            throw new RuntimeException(e);
237        }
238        return item;
239    }
240
241    /*
242     * (non-Javadoc)
243     * 
244     * @see org.apache.activemq.kaha.MapContainer#containsValue(java.lang.Object)
245     */
246    public synchronized boolean containsValue(Object o) {
247        load();
248        boolean result = false;
249        if (o != null) {
250            IndexItem item = indexList.getFirst();
251            while (item != null) {
252                Object value = getValue(item);
253                if (value != null && value.equals(o)) {
254                    result = true;
255                    break;
256                }
257                item = indexList.getNextEntry(item);
258            }
259        }
260        return result;
261    }
262
263    /*
264     * (non-Javadoc)
265     * 
266     * @see org.apache.activemq.kaha.MapContainer#putAll(java.util.Map)
267     */
268    public synchronized void putAll(Map t) {
269        load();
270        if (t != null) {
271            for (Iterator i = t.entrySet().iterator(); i.hasNext();) {
272                Map.Entry entry = (Map.Entry)i.next();
273                put(entry.getKey(), entry.getValue());
274            }
275        }
276    }
277
278    /*
279     * (non-Javadoc)
280     * 
281     * @see org.apache.activemq.kaha.MapContainer#keySet()
282     */
283    public synchronized Set keySet() {
284        load();
285        return new ContainerKeySet(this);
286    }
287
288    /*
289     * (non-Javadoc)
290     * 
291     * @see org.apache.activemq.kaha.MapContainer#values()
292     */
293    public synchronized Collection values() {
294        load();
295        return new ContainerValueCollection(this);
296    }
297
298    /*
299     * (non-Javadoc)
300     * 
301     * @see org.apache.activemq.kaha.MapContainer#entrySet()
302     */
303    public synchronized Set entrySet() {
304        load();
305        return new ContainerEntrySet(this);
306    }
307
308    /*
309     * (non-Javadoc)
310     * 
311     * @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object,
312     *      java.lang.Object)
313     */
314    public synchronized Object put(Object key, Object value) {
315        load();
316        Object result = remove(key);
317        IndexItem item = write(key, value);
318        try {
319            index.store(key, item);
320        } catch (IOException e) {
321            LOG.error("Failed trying to insert key: " + key, e);
322            throw new RuntimeException(e);
323        }
324        indexList.add(item);
325        return result;
326    }
327
328    /*
329     * (non-Javadoc)
330     * 
331     * @see org.apache.activemq.kaha.MapContainer#remove(java.lang.Object)
332     */
333    public synchronized Object remove(Object key) {
334        load();
335        try {
336            Object result = null;
337            IndexItem item = (IndexItem)index.remove(key);
338            if (item != null) {
339                // refresh the index
340                item = (IndexItem)indexList.refreshEntry(item);
341                result = getValue(item);
342                IndexItem prev = indexList.getPrevEntry(item);
343                IndexItem next = indexList.getNextEntry(item);
344                indexList.remove(item);
345                delete(item, prev, next);
346            }
347            return result;
348        } catch (IOException e) {
349            LOG.error("Failed trying to remove key: " + key, e);
350            throw new RuntimeException(e);
351        }
352    }
353
354    public synchronized boolean removeValue(Object o) {
355        load();
356        boolean result = false;
357        if (o != null) {
358            IndexItem item = indexList.getFirst();
359            while (item != null) {
360                Object value = getValue(item);
361                if (value != null && value.equals(o)) {
362                    result = true;
363                    // find the key
364                    Object key = getKey(item);
365                    if (key != null) {
366                        remove(key);
367                    }
368                    break;
369                }
370                item = indexList.getNextEntry(item);
371            }
372        }
373        return result;
374    }
375
376    protected synchronized void remove(IndexItem item) {
377        Object key = getKey(item);
378        if (key != null) {
379            remove(key);
380        }
381    }
382
383    /*
384     * (non-Javadoc)
385     * 
386     * @see org.apache.activemq.kaha.MapContainer#clear()
387     */
388    public synchronized void clear() {
389        checkClosed();
390        loaded = true;
391        init();
392        if (index != null) {
393            try {
394                index.clear();
395            } catch (IOException e) {
396                LOG.error("Failed trying clear index", e);
397                throw new RuntimeException(e);
398            }
399        }
400        super.clear();
401        doClear();
402    }
403
404    /**
405     * Add an entry to the Store Map
406     * 
407     * @param key
408     * @param value
409     * @return the StoreEntry associated with the entry
410     */
411    public synchronized StoreEntry place(Object key, Object value) {
412        load();
413        try {
414            remove(key);
415            IndexItem item = write(key, value);
416            index.store(key, item);
417            indexList.add(item);
418            return item;
419        } catch (IOException e) {
420            LOG.error("Failed trying to place key: " + key, e);
421            throw new RuntimeException(e);
422        }
423    }
424
425    /**
426     * Remove an Entry from ther Map
427     * 
428     * @param entry
429     * @throws IOException
430     */
431    public synchronized void remove(StoreEntry entry) {
432        load();
433        IndexItem item = (IndexItem)entry;
434        if (item != null) {
435            Object key = getKey(item);
436            try {
437                index.remove(key);
438            } catch (IOException e) {
439                LOG.error("Failed trying to remove entry: " + entry, e);
440                throw new RuntimeException(e);
441            }
442            IndexItem prev = indexList.getPrevEntry(item);
443            IndexItem next = indexList.getNextEntry(item);
444            indexList.remove(item);
445            delete(item, prev, next);
446        }
447    }
448
449    public synchronized StoreEntry getFirst() {
450        load();
451        return indexList.getFirst();
452    }
453
454    public synchronized StoreEntry getLast() {
455        load();
456        return indexList.getLast();
457    }
458
459    public synchronized StoreEntry getNext(StoreEntry entry) {
460        load();
461        IndexItem item = (IndexItem)entry;
462        return indexList.getNextEntry(item);
463    }
464
465    public synchronized StoreEntry getPrevious(StoreEntry entry) {
466        load();
467        IndexItem item = (IndexItem)entry;
468        return indexList.getPrevEntry(item);
469    }
470
471    public synchronized StoreEntry refresh(StoreEntry entry) {
472        load();
473        return indexList.getEntry(entry);
474    }
475
476    /**
477     * Get the value from it's location
478     * 
479     * @param item
480     * @return the value associated with the store entry
481     */
482    public synchronized Object getValue(StoreEntry item) {
483        load();
484        Object result = null;
485        if (item != null) {
486            try {
487                // ensure this value is up to date
488                // item=indexList.getEntry(item);
489                StoreLocation data = item.getValueDataItem();
490                result = dataManager.readItem(valueMarshaller, data);
491            } catch (IOException e) {
492                LOG.error("Failed to get value for " + item, e);
493                throw new RuntimeStoreException(e);
494            }
495        }
496        return result;
497    }
498
499    /**
500     * Get the Key object from it's location
501     * 
502     * @param item
503     * @return the Key Object associated with the StoreEntry
504     */
505    public synchronized Object getKey(StoreEntry item) {
506        load();
507        Object result = null;
508        if (item != null) {
509            try {
510                StoreLocation data = item.getKeyDataItem();
511                result = dataManager.readItem(keyMarshaller, data);
512            } catch (IOException e) {
513                LOG.error("Failed to get key for " + item, e);
514                throw new RuntimeStoreException(e);
515            }
516        }
517        return result;
518    }
519
520    protected IndexLinkedList getItemList() {
521        return indexList;
522    }
523
524    protected synchronized IndexItem write(Object key, Object value) {
525        IndexItem index = null;
526        try {
527            index = indexManager.createNewIndex();
528            StoreLocation data = dataManager.storeDataItem(keyMarshaller, key);
529            index.setKeyData(data);
530
531            if (value != null) {
532                data = dataManager.storeDataItem(valueMarshaller, value);
533                index.setValueData(data);
534            }
535            IndexItem prev = indexList.getLast();
536            prev = prev != null ? prev : indexList.getRoot();
537            IndexItem next = indexList.getNextEntry(prev);
538            prev.setNextItem(index.getOffset());
539            index.setPreviousItem(prev.getOffset());
540            updateIndexes(prev);
541            if (next != null) {
542                next.setPreviousItem(index.getOffset());
543                index.setNextItem(next.getOffset());
544                updateIndexes(next);
545            }
546            storeIndex(index);
547        } catch (IOException e) {
548            LOG.error("Failed to write " + key + " , " + value, e);
549            throw new RuntimeStoreException(e);
550        }
551        return index;
552    }
553
554    public int getIndexBinSize() {
555        return indexBinSize;
556    }
557
558    public void setIndexBinSize(int indexBinSize) {
559        this.indexBinSize = indexBinSize;
560    }
561
562    public int getIndexKeySize() {
563        return indexKeySize;
564    }
565
566    public void setIndexKeySize(int indexKeySize) {
567        this.indexKeySize = indexKeySize;
568    }
569
570    public int getIndexPageSize() {
571        return indexPageSize;
572    }
573
574    public void setIndexPageSize(int indexPageSize) {
575        this.indexPageSize = indexPageSize;
576    }
577    
578    public int getIndexLoadFactor() {
579        return indexLoadFactor;
580    }
581
582    public void setIndexLoadFactor(int loadFactor) {
583        this.indexLoadFactor = loadFactor;
584    }
585
586  
587    public IndexMBean getIndexMBean() {
588      return (IndexMBean) index;
589    }
590    public int getIndexMaxBinSize() {
591        return indexMaxBinSize;
592    }
593
594    public void setIndexMaxBinSize(int maxBinSize) {
595        this.indexMaxBinSize = maxBinSize;
596    }
597   
598
599   
600    public String toString() {
601        load();
602        StringBuffer buf = new StringBuffer();
603        buf.append("{");
604        Iterator i = entrySet().iterator();
605        boolean hasNext = i.hasNext();
606        while (hasNext) {
607            Map.Entry e = (Entry) i.next();
608            Object key = e.getKey();
609            Object value = e.getValue();
610            buf.append(key);
611            buf.append("=");
612
613            buf.append(value);
614            hasNext = i.hasNext();
615            if (hasNext)
616                buf.append(", ");
617        }
618        buf.append("}");
619        return buf.toString();
620    }    
621}