001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.kaha.impl.async;
018
019import java.io.IOException;
020
021import org.apache.activeio.journal.InvalidRecordLocationException;
022import org.apache.activeio.journal.Journal;
023import org.apache.activeio.journal.JournalEventListener;
024import org.apache.activeio.journal.RecordLocation;
025import org.apache.activeio.packet.ByteArrayPacket;
026import org.apache.activeio.packet.Packet;
027import org.apache.activemq.util.ByteSequence;
028
029/**
030 * Provides a Journal Facade to the DataManager.
031 * 
032 * 
033 */
034public final class JournalFacade implements Journal {
035
036    private final AsyncDataManager dataManager;
037
038    public static class RecordLocationFacade implements RecordLocation {
039        private final Location location;
040
041        public RecordLocationFacade(Location location) {
042            this.location = location;
043        }
044
045        public Location getLocation() {
046            return location;
047        }
048
049        public int compareTo(Object o) {
050            RecordLocationFacade rlf = (RecordLocationFacade)o;
051            int rc = location.compareTo(rlf.location);
052            return rc;
053        }
054    }
055
056    public JournalFacade(AsyncDataManager dataManager) {
057        this.dataManager = dataManager;
058    }
059
060    private static RecordLocation convertToRecordLocation(Location location) {
061        if (location == null) {
062            return null;
063        }
064        return new RecordLocationFacade(location);
065    }
066
067    private static Location convertFromRecordLocation(RecordLocation location) {
068
069        if (location == null) {
070            return null;
071        }
072
073        return ((RecordLocationFacade)location).getLocation();
074    }
075
076    public void close() throws IOException {
077        dataManager.close();
078    }
079
080    public RecordLocation getMark() throws IllegalStateException {
081        return convertToRecordLocation(dataManager.getMark());
082    }
083
084    public RecordLocation getNextRecordLocation(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException {
085        return convertToRecordLocation(dataManager.getNextLocation(convertFromRecordLocation(location)));
086    }
087
088    public Packet read(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException {
089        ByteSequence rc = dataManager.read(convertFromRecordLocation(location));
090        if (rc == null) {
091            return null;
092        }
093        return new ByteArrayPacket(rc.getData(), rc.getOffset(), rc.getLength());
094    }
095
096    public void setJournalEventListener(JournalEventListener listener) throws IllegalStateException {
097    }
098
099    public void setMark(RecordLocation location, boolean sync) throws InvalidRecordLocationException, IOException, IllegalStateException {
100        dataManager.setMark(convertFromRecordLocation(location), sync);
101    }
102
103    public RecordLocation write(Packet packet, boolean sync) throws IOException, IllegalStateException {
104        org.apache.activeio.packet.ByteSequence data = packet.asByteSequence();
105        ByteSequence sequence = new ByteSequence(data.getData(), data.getOffset(), data.getLength());
106        return convertToRecordLocation(dataManager.write(sequence, sync));
107    }
108    
109    public RecordLocation write(Packet packet, Runnable onComplete) throws IOException, IllegalStateException {
110        org.apache.activeio.packet.ByteSequence data = packet.asByteSequence();
111        ByteSequence sequence = new ByteSequence(data.getData(), data.getOffset(), data.getLength());
112        return convertToRecordLocation(dataManager.write(sequence, onComplete));
113    }
114
115}