/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages DataFiles: the rolling set of numbered, append-only journal files on disk.
 */
public class Journal {
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    private static final int PREALLOC_CHUNK_SIZE = 1024 * 1024;

    // RECORD_HEAD_SPACE = 4 byte length + 1 byte type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // A batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
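    //
    // Full on-disk layout of a batch control record, as defined by the constants
    // that follow:
    //
    //   [4 byte size][1 byte type = 2]["WRITE BATCH" magic][4 byte batch size][8 byte Adler32 checksum]
    //
    // followed by the batch data itself.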
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
    public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
    public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
    public static final byte EOF_EOT = '4';
    public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();

    private ScheduledExecutorService scheduler;

    // Tackles corruption found when the checksum is disabled or the corruption is zero-filled; minimizes data loss.
    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
        DataFile dataFile = getDataFile(recoveryPosition);
        // with corruption on recovery we have no faith in the content - skip to the next batch record or eof
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
            LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);

            // skip corruption on getNextLocation
            recoveryPosition.setOffset((int) sequence.getLast() + 1);
            recoveryPosition.setSize(-1);

            dataFile.corruptedBlocks.add(sequence);
        } catch (IOException e) {
            // best-effort scan; recovery continues past the corrupt range
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
    }

    public DataFileAccessorPool getAccessorPool() {
        return accessorPool;
    }

    public enum PreallocationStrategy {
        SPARSE_FILE,      // write EOF records at the head and tail of the file only
        OS_KERNEL_COPY,   // copy a preallocated template file through the kernel
        ZEROS,            // zero the entire file with a single buffer write
        CHUNKED_ZEROS;    // zero the file in PREALLOC_CHUNK_SIZE blocks
    }

    public enum PreallocationScope {
        ENTIRE_JOURNAL,
        ENTIRE_JOURNAL_ASYNC,
        NONE;
    }

    public enum JournalDiskSyncStrategy {
        ALWAYS,
        PERIODIC,
        NEVER;
    }

    private static byte[] createBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    private static byte[] createEmptyBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            os.writeInt(0);
            os.writeLong(0L);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create empty batch control record header.", e);
        }
    }

    private static byte[] createEofBatchAndLocationRecord() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(EOF_INT);
            os.writeByte(EOF_EOT);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create eof header.", e);
        }
    }
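    // The EOF record is RECORD_HEAD_SPACE (5) bytes: the EOF_INT marker followed by
    // the EOF_EOT byte. Sparse preallocation relies on this size when it writes the
    // trailing marker at maxFileLength - 5.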
    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive;
    private boolean directoryArchiveOverridden = false;

    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected FileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected ScheduledFuture<?> cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    protected boolean enableAsyncDiskSync = true;
    private int nextDataFileId = 1;
    private final Object dataFileIdLock = new Object();
    private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
    private volatile DataFile nextDataFile;

    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
    protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
    private File osKernelCopyTemplateFile = null;
    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;

    public interface DataFileRemovedListener {
        void fileRemoved(DataFile datafile);
    }

    private DataFileRemovedListener dataFileRemovedListener;

    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;

        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
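        // Scan the journal directory for existing data files named
        // filePrefix + <id> + fileSuffix; anything else is ignored.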
        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length() - fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                    continue;
                } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if (isCheckForCorruptionOnStartup()) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) {
            // create a template file that will be used to pre-allocate the journal files
            if (osKernelCopyTemplateFile == null) {
                osKernelCopyTemplateFile = createJournalTemplateFile();
            }
        }

        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread schedulerThread = new Thread(r);
                schedulerThread.setName("ActiveMQ Journal Scheduled executor");
                schedulerThread.setDaemon(true);
                return schedulerThread;
            }
        });

        // init current write file
        if (dataFiles.isEmpty()) {
            nextDataFileId = 1;
            rotateWriteFile();
        } else {
            currentDataFile.set(dataFiles.getTail());
            nextDataFileId = currentDataFile.get().dataFileId + 1;
        }

        if (lastAppendLocation.get() == null) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        // ensure we don't report unused space of last journal file in size metric
        int lastFileLength = dataFiles.getTail().getLength();
        if (totalLength.get() > lastFileLength && lastAppendLocation.get().getOffset() > 0) {
            totalLength.addAndGet(lastAppendLocation.get().getOffset() - lastFileLength);
        }

        cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                cleanup();
            }
        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);

        long end = System.currentTimeMillis();
        LOG.trace("Startup took: " + (end - start) + " ms");
    }

    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {

        if (PreallocationScope.NONE != preallocationScope) {

            if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                doPreallocationKernelCopy(file);
            } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
                doPreallocationZeros(file);
            } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
                doPreallocationChunkedZeros(file);
            } else {
                doPreallocationSparseFile(file);
            }
        }
    }
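    // Illustrative configuration of the preallocation behavior (hypothetical
    // caller code, shown for reference only; all setters exist on this class):
    //
    //   Journal journal = new Journal();
    //   journal.setDirectory(new File("activemq-data"));
    //   journal.setPreallocationScope(PreallocationScope.ENTIRE_JOURNAL_ASYNC);
    //   journal.setPreallocationStrategy(PreallocationStrategy.CHUNKED_ZEROS);
    //   journal.start();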
    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
        final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
        try {
            FileChannel channel = file.getChannel();
            channel.position(0);
            channel.write(journalEof);
            channel.position(maxFileLength - 5); // room for the trailing EOF record (EOF_RECORD.length == 5)
            journalEof.rewind();
            channel.write(journalEof);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with sparse file", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with sparse file", e);
        }
    }

    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
        ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
        buffer.put(EOF_RECORD);
        buffer.rewind();
        try {
            FileChannel channel = file.getChannel();
            channel.write(buffer);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros", e);
        }
    }

    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
        // try-with-resources ensures the template file handle is closed even on error
        try (RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw")) {
            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with kernel copy", ignored);
        } catch (FileNotFoundException e) {
            LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
        }
    }

    private File createJournalTemplateFile() {
        String fileName = "db-log.template";
        File rc = new File(directory, fileName);
        try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw")) {
            templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
            templateRaf.setLength(maxFileLength);
            templateRaf.getChannel().force(true);
        } catch (IOException e) {
            // report against rc: osKernelCopyTemplateFile is still null when this runs
            LOG.error("Could not create the journal template file " + rc.getAbsolutePath(), e);
        }
        return rc;
    }

    private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {

        ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE);
        buffer.put(EOF_RECORD);
        buffer.rewind();

        try {
            FileChannel channel = file.getChannel();

            int remLen = maxFileLength;
            while (remLen > 0) {
                if (remLen < buffer.remaining()) {
                    buffer.limit(remLen);
                }
                int writeLen = channel.write(buffer);
                remLen -= writeLen;
                buffer.rewind();
            }

            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
        }
    }
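    // Note on the zeroing strategies above: ZEROS allocates a single buffer of
    // maxFileLength bytes, while CHUNKED_ZEROS bounds memory use by reusing one
    // PREALLOC_CHUNK_SIZE (1 MiB) buffer per write.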
    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
        int firstBatchRecordSize = -1;
        if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
            Location location = new Location();
            location.setDataFileId(dataFile.getDataFileId());
            location.setOffset(0);

            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                firstBatchRecordSize = checkBatchRecord(reader, location.getOffset());
            } catch (Exception ignored) {
                // an unreadable first record means the file is not an unused preallocated file
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }
        }
        return firstBatchRecordSize == 0;
    }

    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            while (true) {
                int size = checkBatchRecord(reader, location.getOffset());
                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
                    if (size == 0) {
                        // eof batch record
                        break;
                    }
                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                } else {

                    // Perhaps it's just some corruption... scan through the
                    // file to find the next valid batch record. We
                    // may have subsequent valid batch records.
                    int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
                    if (nextOffset >= 0) {
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }

        } catch (IOException e) {
            // a read failure ends the scan; the location reached so far is the recovery point
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if (!dataFile.corruptedBlocks.isEmpty()) {
            // Is the end of the data file corrupted?
            if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }

    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        byte[] data = new byte[1024 * 4];
        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));

        int pos = 0;
        while (true) {
            pos = bs.indexOf(header, pos);
            if (pos >= 0) {
                return offset + pos;
            } else {
                // need to load the next data chunk in..
                if (bs.length != data.length) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                // overlap reads by the header length so a header spanning two chunks is still found
                offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
                bs = new ByteSequence(data, 0, reader.read(offset, data));
                pos = 0;
            }
        }
    }
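    /**
     * Validates the batch control record at the given offset.
     *
     * @return the batch size in bytes, {@code 0} for an EOF record, or {@code -1}
     *         if the record header or (when checksums are enabled) its checksum is invalid
     */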
    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        byte[] controlRecord = new byte[BATCH_CONTROL_RECORD_SIZE];

        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord)) {

            reader.readFully(offset, controlRecord);

            // check for journal eof
            if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) {
                // eof batch
                return 0;
            }

            // Assert that it's a batch record.
            for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
                if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                    return -1;
                }
            }

            int size = controlIs.readInt();
            if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
                return -1;
            }

            if (isChecksum()) {

                long expectedChecksum = controlIs.readLong();
                if (expectedChecksum == 0) {
                    // Checksumming was not enabled when the record was stored,
                    // so we can't validate the record.
                    return size;
                }

                byte[] data = new byte[size];
                reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);

                Checksum checksum = new Adler32();
                checksum.update(data, 0, data.length);

                if (expectedChecksum != checksum.getValue()) {
                    return -1;
                }
            }
            return size;
        }
    }

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    public long length() {
        return totalLength.get();
    }

    private void rotateWriteFile() throws IOException {
        synchronized (dataFileIdLock) {
            DataFile dataFile = nextDataFile;
            if (dataFile == null) {
                dataFile = newDataFile();
            }
            synchronized (currentDataFile) {
                fileMap.put(dataFile.getDataFileId(), dataFile);
                fileByFileMap.put(dataFile.getFile(), dataFile);
                dataFiles.addLast(dataFile);
                currentDataFile.set(dataFile);
            }
            nextDataFile = null;
        }
        if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
            preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
        }
    }

    private final Runnable preAllocateNextDataFileTask = new Runnable() {
        @Override
        public void run() {
            if (nextDataFile == null) {
                synchronized (dataFileIdLock) {
                    try {
                        nextDataFile = newDataFile();
                    } catch (IOException e) {
                        LOG.warn("Failed to proactively allocate data file", e);
                    }
                }
            }
        }
    };

    private volatile Future<?> preAllocateNextDataFileFuture;

    private DataFile newDataFile() throws IOException {
        int nextNum = nextDataFileId++;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum);
        preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
        return nextWriteFile;
    }
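    /**
     * Reserves the next data file id and registers a new, empty data file. When other
     * files exist, the reserved file is linked before the current tail rather than
     * becoming the write file.
     */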
    public DataFile reserveDataFile() {
        synchronized (dataFileIdLock) {
            int nextNum = nextDataFileId++;
            File file = getFile(nextNum);
            DataFile reservedDataFile = new DataFile(file, nextNum);
            synchronized (currentDataFile) {
                fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
                fileByFileMap.put(file, reservedDataFile);
                if (dataFiles.isEmpty()) {
                    dataFiles.addLast(reservedDataFile);
                } else {
                    dataFiles.getTail().linkBefore(reservedDataFile);
                }
            }
            return reservedDataFile;
        }
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }

    DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = null;
        synchronized (currentDataFile) {
            dataFile = fileMap.get(key);
        }
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    public void close() throws IOException {
        synchronized (this) {
            if (!started) {
                return;
            }
            cleanupTask.cancel(true);
            if (preAllocateNextDataFileFuture != null) {
                preAllocateNextDataFileFuture.cancel(true);
            }
            ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
            accessorPool.close();
        }
        // the appender can be calling back into the journal, blocking a close (AMQ-5620)
        appender.close();
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            dataFiles.clear();
            lastAppendLocation.set(null);
            started = false;
        }
    }

    public synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (DataFile dataFile : fileMap.values()) {
            result &= dataFile.delete();
        }

        if (preAllocateNextDataFileFuture != null) {
            preAllocateNextDataFileFuture.cancel(true);
        }
        synchronized (dataFileIdLock) {
            if (nextDataFile != null) {
                nextDataFile.delete();
                nextDataFile = null;
            }
        }

        totalLength.set(0);
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            lastAppendLocation.set(null);
            dataFiles = new LinkedNodeList<DataFile>();
        }
        // reopen open file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }
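    /**
     * Removes (and archives, when archiveDataLogs is set) the given data files.
     * The file currently being appended to, and any later files, are skipped.
     */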
    public void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if (key >= lastAppendLocation.get().getDataFileId()) {
                continue;
            }
            DataFile dataFile = null;
            synchronized (currentDataFile) {
                dataFile = fileMap.remove(key);
                if (dataFile != null) {
                    fileByFileMap.remove(dataFile.getFile());
                    dataFile.unlink();
                }
            }
            if (dataFile != null) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        totalLength.addAndGet(-dataFile.getLength());
        if (archiveDataLogs) {
            File directoryArchive = getDirectoryArchive();
            if (directoryArchive.exists()) {
                LOG.debug("Archive directory exists: {}", directoryArchive);
            } else {
                if (directoryArchive.isAbsolute() && LOG.isDebugEnabled()) {
                    LOG.debug("Archive directory [{}] does not exist - creating it now",
                            directoryArchive.getAbsolutePath());
                }
                IOHelper.mkdirs(directoryArchive);
            }
            LOG.debug("Moving data file {} to {} ", dataFile, directoryArchive.getCanonicalPath());
            dataFile.move(directoryArchive);
            LOG.debug("Successfully moved data file");
        } else {
            LOG.debug("Deleting data file: {}", dataFile);
            if (dataFile.delete()) {
                LOG.debug("Discarded data file: {}", dataFile);
            } else {
                LOG.warn("Failed to discard data file : {}", dataFile.getFile());
            }
        }
        if (dataFileRemovedListener != null) {
            dataFileRemovedListener.fileRemoved(dataFile);
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }
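    /**
     * Returns the location of the next user record after {@code location} (or the first
     * user record when {@code location} is null), transparently skipping corrupted
     * ranges and jumping to the next data file on an EOF record. Returns {@code null}
     * when the end of the journal is reached.
     */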
    public Location getNextLocation(Location location) throws IOException, IllegalStateException {
        return getNextLocation(location, null);
    }

    public Location getNextLocation(Location location, Location limit) throws IOException, IllegalStateException {
        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = null;
                    synchronized (currentDataFile) {
                        head = dataFiles.getHead();
                    }
                    if (head == null) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    if (location.getSize() == -1) {
                        cur = new Location(location);
                    } else {
                        cur = new Location(location);
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file?
            if (dataFile.getLength() <= cur.getOffset()) {
                synchronized (currentDataFile) {
                    dataFile = dataFile.getNext();
                }
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                    if (limit != null && cur.compareTo(limit) >= 0) {
                        LOG.trace("reached limit: {} at: {}", limit, cur);
                        return null;
                    }
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } catch (EOFException eof) {
                LOG.trace("EOF on next: " + location + ", cur: " + cur);
                throw eof;
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset());
            if (corruptedRange != null) {
                // skip corruption
                cur.setSize((int) corruptedRange.range());
            } else if ((cur.getSize() == EOF_INT && cur.getType() == EOF_EOT) ||
                    (cur.getType() == 0 && cur.getSize() == 0)) {
                // eof - jump to next datafile
                // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
                // replay of existing journals
                // possibly journal is larger than maxFileLength after config change
                cur.setSize(EOF_RECORD.length);
                cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }

    public ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        return appender.storeItem(data, Location.USER_TYPE, sync);
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        return appender.storeItem(data, Location.USER_TYPE, onComplete);
    }

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }
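    // Illustrative round trip through the three methods above (hypothetical
    // caller code, for reference only):
    //
    //   Location loc = journal.write(payload, true);   // durable, synchronous append
    //   ByteSequence readBack = journal.read(loc);     // random access by location
    //   journal.update(loc, replacement, true);        // in-place overwrite at loc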
    public PreallocationStrategy getPreallocationStrategy() {
        return preallocationStrategy;
    }

    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    public PreallocationScope getPreallocationScope() {
        return preallocationScope;
    }

    public void setPreallocationScope(PreallocationScope preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        if (!directoryArchiveOverridden && (directoryArchive == null)) {
            // create the directoryArchive relative to the journal location
            directoryArchive = new File(directory.getAbsolutePath() + File.separator + DEFAULT_ARCHIVE_DIRECTORY);
        }
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        directoryArchiveOverridden = true;
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public DataFile getDataFileById(int dataFileId) {
        synchronized (currentDataFile) {
            return fileMap.get(Integer.valueOf(dataFileId));
        }
    }

    public DataFile getCurrentDataFile(int capacity) throws IOException {
        // First just acquire the currentDataFile lock and return if no rotation is needed
        synchronized (currentDataFile) {
            if (currentDataFile.get().getLength() + capacity < maxFileLength) {
                return currentDataFile.get();
            }
        }

        // AMQ-6545 - if rotation is needed, acquire dataFileIdLock first to prevent
        // deadlocks, then re-check whether rotation is still needed
        synchronized (dataFileIdLock) {
            synchronized (currentDataFile) {
                if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
                    rotateWriteFile();
                }
                return currentDataFile.get();
            }
        }
    }

    public Integer getCurrentDataFileId() {
        synchronized (currentDataFile) {
            return currentDataFile.get().getDataFileId();
        }
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        synchronized (currentDataFile) {
            return fileByFileMap.keySet();
        }
    }

    public Map<Integer, DataFile> getFileMap() {
        synchronized (currentDataFile) {
            return new TreeMap<Integer, DataFile>(fileMap);
        }
    }

    public long getDiskSize() {
        return totalLength.get();
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
        return journalDiskSyncStrategy;
    }

    public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
    }

    public boolean isJournalDiskSyncPeriodic() {
        return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
    }

    public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
        this.dataFileRemovedListener = dataFileRemovedListener;
    }
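    // The two nested classes below model in-flight appends: WriteCommand is a queued
    // append (location, payload, sync flag or completion callback), and WriteKey is
    // the (file, offset) identity used as the key of the inflightWrites map.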
    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }

    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int) (file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey) obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}