001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.EOFException; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.text.ParseException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.Map; 029import java.util.TreeMap; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.atomic.AtomicReference; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; 040import org.apache.hadoop.hbase.master.SplitLogManager; 041import org.apache.hadoop.hbase.monitoring.MonitoredTask; 042import org.apache.hadoop.hbase.monitoring.TaskMonitor; 043import org.apache.hadoop.hbase.procedure2.util.StringUtils; 044import org.apache.hadoop.hbase.regionserver.LastSequenceId; 045import org.apache.hadoop.hbase.regionserver.RegionServerServices; 046import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.CancelableProgressable; 049import org.apache.hadoop.hbase.util.CommonFSUtils; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; 052import org.apache.hadoop.hbase.wal.WAL.Entry; 053import org.apache.hadoop.ipc.RemoteException; 054import org.apache.yetus.audience.InterfaceAudience; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 059import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 060 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; 063 064/** 065 * Split RegionServer WAL files. Splits the WAL into new files, one per region, to be picked up on 066 * Region reopen. Deletes the split WAL when finished. Create an instance and call 067 * {@link #splitWAL(FileStatus, CancelableProgressable)} per file or use static helper methods. 068 */ 069@InterfaceAudience.Private 070public class WALSplitter { 071 private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class); 072 public static final String SPLIT_SKIP_ERRORS_KEY = "hbase.hlog.split.skip.errors"; 073 074 /** 075 * By default we retry errors in splitting, rather than skipping. 076 */ 077 public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false; 078 079 // Parameters for split process 080 protected final Path walRootDir; 081 protected final FileSystem walFS; 082 protected final Configuration conf; 083 final Path rootDir; 084 final FileSystem rootFS; 085 final RegionServerServices rsServices; 086 087 // Major subcomponents of the split process. 088 // These are separated into inner classes to make testing easier. 089 OutputSink outputSink; 090 private EntryBuffers entryBuffers; 091 092 /** 093 * Coordinator for split log. Used by the zk-based log splitter. Not used by the procedure 094 * v2-based log splitter. 095 */ 096 private SplitLogWorkerCoordination splitLogWorkerCoordination; 097 098 private final WALFactory walFactory; 099 100 // For checking the latest flushed sequence id 101 protected final LastSequenceId sequenceIdChecker; 102 103 // Map encodedRegionName -> lastFlushedSequenceId 104 protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>(); 105 106 // Map encodedRegionName -> maxSeqIdInStores 107 protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>(); 108 109 // the file being split currently 110 private FileStatus fileBeingSplit; 111 112 private final String tmpDirName; 113 114 /** 115 * Split WAL directly to hfiles instead of into intermediary 'recovered.edits' files. 116 */ 117 public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile"; 118 public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false; 119 120 /** 121 * True if we are to run with bounded amount of writers rather than let the count blossom. Default 122 * is 'false'. Does not apply if you have set 'hbase.wal.split.to.hfile' as that is always 123 * bounded. Only applies when you are doing recovery to 'recovered.edits' files (the old default). 124 * Bounded writing tends to have higher throughput. 125 */ 126 public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded"; 127 128 public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize"; 129 public final static String SPLIT_WAL_WRITER_THREADS = 130 "hbase.regionserver.hlog.splitlog.writer.threads"; 131 132 private final int numWriterThreads; 133 private final long bufferSize; 134 private final boolean splitWriterCreationBounded; 135 private final boolean hfile; 136 private final boolean skipErrors; 137 138 WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir, FileSystem walFS, 139 Path rootDir, FileSystem rootFS) { 140 this(factory, conf, walRootDir, walFS, rootDir, rootFS, null, null, null); 141 } 142 143 WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir, FileSystem walFS, 144 Path rootDir, FileSystem rootFS, LastSequenceId idChecker, 145 SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) { 146 this.conf = HBaseConfiguration.create(conf); 147 String codecClassName = 148 conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); 149 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); 150 this.walRootDir = walRootDir; 151 this.walFS = walFS; 152 this.rootDir = rootDir; 153 this.rootFS = rootFS; 154 this.sequenceIdChecker = idChecker; 155 this.splitLogWorkerCoordination = splitLogWorkerCoordination; 156 this.rsServices = rsServices; 157 this.walFactory = factory; 158 this.tmpDirName = 159 conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); 160 // if we limit the number of writers opened for sinking recovered edits 161 this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false); 162 this.bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024); 163 this.numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3); 164 this.hfile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE); 165 this.skipErrors = conf.getBoolean(SPLIT_SKIP_ERRORS_KEY, SPLIT_SKIP_ERRORS_DEFAULT); 166 } 167 168 WALFactory getWalFactory() { 169 return this.walFactory; 170 } 171 172 FileStatus getFileBeingSplit() { 173 return fileBeingSplit; 174 } 175 176 String getTmpDirName() { 177 return this.tmpDirName; 178 } 179 180 Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() { 181 return regionMaxSeqIdInStores; 182 } 183 184 /** 185 * Splits a WAL file. Used by old {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} and 186 * tests. Not used by new procedure-based WAL splitter. 187 * @return false if it is interrupted by the progress-able. 188 */ 189 public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, 190 Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, 191 SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory, 192 RegionServerServices rsServices) throws IOException { 193 Path rootDir = CommonFSUtils.getRootDir(conf); 194 FileSystem rootFS = rootDir.getFileSystem(conf); 195 WALSplitter splitter = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker, 196 splitLogWorkerCoordination, rsServices); 197 // splitWAL returns a data structure with whether split is finished and if the file is corrupt. 198 // We don't need to propagate corruption flag here because it is propagated by the 199 // SplitLogWorkerCoordination. 200 return splitter.splitWAL(logfile, reporter).isFinished(); 201 } 202 203 /** 204 * Split a folder of WAL files. Delete the directory when done. Used by tools and unit tests. It 205 * should be package private. It is public only because TestWALObserver is in a different package, 206 * which uses this method to do log splitting. 207 * @return List of output files created by the split. 208 */ 209 public static List<Path> split(Path walRootDir, Path walsDir, Path archiveDir, FileSystem walFS, 210 Configuration conf, final WALFactory factory) throws IOException { 211 Path rootDir = CommonFSUtils.getRootDir(conf); 212 FileSystem rootFS = rootDir.getFileSystem(conf); 213 WALSplitter splitter = new WALSplitter(factory, conf, walRootDir, walFS, rootDir, rootFS); 214 final List<FileStatus> wals = 215 SplitLogManager.getFileList(conf, Collections.singletonList(walsDir), null); 216 List<Path> splits = new ArrayList<>(); 217 if (!wals.isEmpty()) { 218 for (FileStatus wal : wals) { 219 SplitWALResult splitWALResult = splitter.splitWAL(wal, null); 220 if (splitWALResult.isFinished()) { 221 WALSplitUtil.archive(wal.getPath(), splitWALResult.isCorrupt(), archiveDir, walFS, conf); 222 // splitter.outputSink.splits is mark as final, do not need null check 223 splits.addAll(splitter.outputSink.splits); 224 } 225 } 226 } 227 if (!walFS.delete(walsDir, true)) { 228 throw new IOException("Unable to delete src dir " + walsDir); 229 } 230 return splits; 231 } 232 233 /** 234 * Data structure returned as result by #splitWAL(FileStatus, CancelableProgressable). Test 235 * {@link #isFinished()} to see if we are done with the WAL and {@link #isCorrupt()} for if the 236 * WAL is corrupt. 237 */ 238 static final class SplitWALResult { 239 private final boolean finished; 240 private final boolean corrupt; 241 242 private SplitWALResult(boolean finished, boolean corrupt) { 243 this.finished = finished; 244 this.corrupt = corrupt; 245 } 246 247 public boolean isFinished() { 248 return finished; 249 } 250 251 public boolean isCorrupt() { 252 return corrupt; 253 } 254 } 255 256 /** 257 * Setup the output sinks and entry buffers ahead of splitting WAL. 258 */ 259 private void createOutputSinkAndEntryBuffers() { 260 PipelineController controller = new PipelineController(); 261 if (this.hfile) { 262 this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize); 263 this.outputSink = new BoundedRecoveredHFilesOutputSink(this, controller, this.entryBuffers, 264 this.numWriterThreads); 265 } else if (this.splitWriterCreationBounded) { 266 this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize); 267 this.outputSink = new BoundedRecoveredEditsOutputSink(this, controller, this.entryBuffers, 268 this.numWriterThreads); 269 } else { 270 this.entryBuffers = new EntryBuffers(controller, this.bufferSize); 271 this.outputSink = 272 new RecoveredEditsOutputSink(this, controller, this.entryBuffers, this.numWriterThreads); 273 } 274 } 275 276 /** 277 * WAL splitting implementation, splits one WAL file. 278 * @param walStatus should be for an actual WAL file. 279 */ 280 SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) throws IOException { 281 Path wal = walStatus.getPath(); 282 Preconditions.checkArgument(walStatus.isFile(), "Not a regular file " + wal.toString()); 283 boolean corrupt = false; 284 int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024); 285 boolean outputSinkStarted = false; 286 boolean cancelled = false; 287 int editsCount = 0; 288 int editsSkipped = 0; 289 MonitoredTask status = TaskMonitor.get() 290 .createStatus("Splitting " + wal + " to temporary staging area.", false, true); 291 WALStreamReader walReader = null; 292 this.fileBeingSplit = walStatus; 293 long startTS = EnvironmentEdgeManager.currentTime(); 294 long length = walStatus.getLen(); 295 String lengthStr = StringUtils.humanSize(length); 296 createOutputSinkAndEntryBuffers(); 297 try { 298 String logStr = "Splitting " + wal + ", size=" + lengthStr + " (" + length + "bytes)"; 299 LOG.info(logStr); 300 status.setStatus(logStr); 301 if (cancel != null && !cancel.progress()) { 302 cancelled = true; 303 return new SplitWALResult(false, corrupt); 304 } 305 walReader = getReader(walStatus, this.skipErrors, cancel); 306 if (walReader == null) { 307 LOG.warn("Nothing in {}; empty?", wal); 308 return new SplitWALResult(true, corrupt); 309 } 310 LOG.info("Open {} took {}ms", wal, EnvironmentEdgeManager.currentTime() - startTS); 311 int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); 312 int numOpenedFilesLastCheck = 0; 313 outputSink.setReporter(cancel); 314 outputSink.setStatus(status); 315 outputSink.startWriterThreads(); 316 outputSinkStarted = true; 317 Entry entry; 318 startTS = EnvironmentEdgeManager.currentTime(); 319 while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) { 320 if (WALEdit.isReplicationMarkerEdit(entry.getEdit())) { 321 // Skip processing the replication marker edits. 322 if (LOG.isDebugEnabled()) { 323 LOG.debug("Ignoring Replication marker edits."); 324 } 325 continue; 326 } 327 byte[] region = entry.getKey().getEncodedRegionName(); 328 String encodedRegionNameAsStr = Bytes.toString(region); 329 Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); 330 if (lastFlushedSequenceId == null) { 331 if ( 332 !(isRegionDirPresentUnderRoot(entry.getKey().getTableName(), encodedRegionNameAsStr)) 333 ) { 334 // The region directory itself is not present in the FS. This indicates that 335 // the region/table is already removed. We can just skip all the edits for this 336 // region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits 337 // will get skipped by the seqId check below. 338 // See more details at https://issues.apache.org/jira/browse/HBASE-24189 339 LOG.info("{} no longer in filesystem; skipping all edits.", encodedRegionNameAsStr); 340 lastFlushedSequenceId = Long.MAX_VALUE; 341 } else { 342 if (sequenceIdChecker != null) { 343 RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region); 344 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 345 for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) { 346 maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(), 347 storeSeqId.getSequenceId()); 348 } 349 regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores); 350 lastFlushedSequenceId = ids.getLastFlushedSequenceId(); 351 if (LOG.isDebugEnabled()) { 352 LOG.debug("Last flushed sequenceid for " + encodedRegionNameAsStr + ": " 353 + TextFormat.shortDebugString(ids)); 354 } 355 } 356 if (lastFlushedSequenceId == null) { 357 lastFlushedSequenceId = -1L; 358 } 359 } 360 lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId); 361 } 362 editsCount++; 363 if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) { 364 editsSkipped++; 365 continue; 366 } 367 // Don't send Compaction/Close/Open region events to recovered edit type sinks. 368 if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) { 369 editsSkipped++; 370 continue; 371 } 372 entryBuffers.appendEntry(entry); 373 int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck; 374 // If sufficient edits have passed, check if we should report progress. 375 if ( 376 editsCount % interval == 0 || moreWritersFromLastCheck > numOpenedFilesBeforeReporting 377 ) { 378 numOpenedFilesLastCheck = this.getNumOpenWriters(); 379 String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits())) 380 + " edits, skipped " + editsSkipped + " edits."; 381 status.setStatus("Split " + countsStr); 382 if (cancel != null && !cancel.progress()) { 383 cancelled = true; 384 return new SplitWALResult(false, corrupt); 385 } 386 } 387 } 388 } catch (InterruptedException ie) { 389 IOException iie = new InterruptedIOException(); 390 iie.initCause(ie); 391 throw iie; 392 } catch (CorruptedLogFileException e) { 393 LOG.warn("Could not parse, corrupt WAL={}", wal, e); 394 // If splitLogWorkerCoordination, then its old-school zk-coordinated splitting so update 395 // zk. Otherwise, it is the newer procedure-based WAL split which has no zk component. 396 if (this.splitLogWorkerCoordination != null) { 397 // Some tests pass in a csm of null. 398 splitLogWorkerCoordination.markCorrupted(walRootDir, wal.getName(), walFS); 399 } 400 corrupt = true; 401 } catch (IOException e) { 402 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 403 throw e; 404 } finally { 405 final String log = "Finishing writing output for " + wal + " so closing down"; 406 LOG.debug(log); 407 status.setStatus(log); 408 if (null != walReader) { 409 walReader.close(); 410 } 411 try { 412 if (outputSinkStarted) { 413 // Set cancelled to true as the immediate following statement will reset its value. 414 // If close() throws an exception, cancelled will have the right value 415 cancelled = true; 416 cancelled = outputSink.close() == null; 417 } 418 } finally { 419 long processCost = EnvironmentEdgeManager.currentTime() - startTS; 420 // See if length got updated post lease recovery 421 String msg = "Processed " + editsCount + " edits across " 422 + outputSink.getNumberOfRecoveredRegions() + " Regions in " + processCost 423 + " ms; skipped=" + editsSkipped + "; WAL=" + wal + ", size=" + lengthStr + ", length=" 424 + length + ", corrupted=" + corrupt + ", cancelled=" + cancelled; 425 LOG.info(msg); 426 status.markComplete(msg); 427 if (LOG.isDebugEnabled()) { 428 LOG.debug("Completed split of {}, journal: {}", wal, status.prettyPrintJournal()); 429 } 430 } 431 } 432 return new SplitWALResult(!cancelled, corrupt); 433 } 434 435 private boolean isRegionDirPresentUnderRoot(TableName tn, String region) throws IOException { 436 return this.rootFS.exists(CommonFSUtils.getRegionDir(this.rootDir, tn, region)); 437 } 438 439 /** 440 * Create a new {@link WALStreamReader} for reading logs to split. 441 * @return Returns null if file has length zero or file can't be found. 442 */ 443 protected WALStreamReader getReader(FileStatus walStatus, boolean skipErrors, 444 CancelableProgressable cancel) throws IOException, CorruptedLogFileException { 445 Path path = walStatus.getPath(); 446 long length = walStatus.getLen(); 447 WALStreamReader in; 448 449 // Check for possibly empty file. With appends, currently Hadoop reports a 450 // zero length even if the file has been sync'd. Revisit if HDFS-376 or 451 // HDFS-878 is committed. 452 if (length <= 0) { 453 LOG.warn("File {} might be still open, length is 0", path); 454 } 455 456 try { 457 RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, cancel); 458 try { 459 in = getReader(path, cancel); 460 } catch (EOFException e) { 461 if (length <= 0) { 462 // TODO should we ignore an empty, not-last log file if skip.errors 463 // is false? Either way, the caller should decide what to do. E.g. 464 // ignore if this is the last log in sequence. 465 // TODO is this scenario still possible if the log has been 466 // recovered (i.e. closed) 467 LOG.warn("Could not open {} for reading. File is empty", path, e); 468 } 469 // EOFException being ignored 470 return null; 471 } 472 } catch (IOException e) { 473 if (e instanceof FileNotFoundException) { 474 // A wal file may not exist anymore. Nothing can be recovered so move on 475 LOG.warn("File {} does not exist anymore", path, e); 476 return null; 477 } 478 if (!skipErrors || e instanceof InterruptedIOException) { 479 throw e; // Don't mark the file corrupted if interrupted, or not skipErrors 480 } 481 throw new CorruptedLogFileException("skipErrors=true; could not open " + path + ", skipping", 482 e); 483 } 484 return in; 485 } 486 487 private Entry getNextLogLine(WALStreamReader in, Path path, boolean skipErrors) 488 throws CorruptedLogFileException, IOException { 489 try { 490 return in.next(); 491 } catch (EOFException eof) { 492 // truncated files are expected if a RS crashes (see HBASE-2643) 493 LOG.info("EOF from {}; continuing.", path); 494 return null; 495 } catch (IOException e) { 496 // If the IOE resulted from bad file format, 497 // then this problem is idempotent and retrying won't help 498 if ( 499 e.getCause() != null && (e.getCause() instanceof ParseException 500 || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException) 501 ) { 502 LOG.warn("Parse exception from {}; continuing", path, e); 503 return null; 504 } 505 if (!skipErrors) { 506 throw e; 507 } 508 throw new CorruptedLogFileException("skipErrors=true Ignoring exception" 509 + " while parsing wal " + path + ". Marking as corrupted", e); 510 } 511 } 512 513 /** 514 * Create a new {@link WALProvider.Writer} for writing log splits. 515 * @return a new Writer instance, caller should close 516 */ 517 protected WALProvider.Writer createWriter(Path logfile) throws IOException { 518 return walFactory.createRecoveredEditsWriter(walFS, logfile); 519 } 520 521 /** 522 * Create a new {@link WALStreamReader} for reading logs to split. 523 * @return new Reader instance, caller should close 524 */ 525 private WALStreamReader getReader(Path curLogFile, CancelableProgressable reporter) 526 throws IOException { 527 return walFactory.createStreamReader(walFS, curLogFile, reporter); 528 } 529 530 /** 531 * Get current open writers 532 */ 533 private int getNumOpenWriters() { 534 int result = 0; 535 if (this.outputSink != null) { 536 result += this.outputSink.getNumOpenWriters(); 537 } 538 return result; 539 } 540 541 /** 542 * Contains some methods to control WAL-entries producer / consumer interactions 543 */ 544 public static class PipelineController { 545 // If an exception is thrown by one of the other threads, it will be 546 // stored here. 547 AtomicReference<Throwable> thrown = new AtomicReference<>(); 548 549 // Wait/notify for when data has been produced by the writer thread, 550 // consumed by the reader thread, or an exception occurred 551 final Object dataAvailable = new Object(); 552 553 void writerThreadError(Throwable t) { 554 thrown.compareAndSet(null, t); 555 } 556 557 /** 558 * Check for errors in the writer threads. If any is found, rethrow it. 559 */ 560 void checkForErrors() throws IOException { 561 Throwable thrown = this.thrown.get(); 562 if (thrown == null) { 563 return; 564 } 565 this.thrown.set(null); 566 if (thrown instanceof IOException) { 567 throw new IOException(thrown); 568 } else { 569 throw new RuntimeException(thrown); 570 } 571 } 572 } 573 574 static class CorruptedLogFileException extends Exception { 575 private static final long serialVersionUID = 1L; 576 577 CorruptedLogFileException(String s) { 578 super(s); 579 } 580 581 /** 582 * CorruptedLogFileException with cause 583 * @param message the message for this exception 584 * @param cause the cause for this exception 585 */ 586 CorruptedLogFileException(String message, Throwable cause) { 587 super(message, cause); 588 } 589 } 590}