001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.EOFException; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.text.ParseException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.Map; 029import java.util.TreeMap; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.atomic.AtomicReference; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; 040import org.apache.hadoop.hbase.master.SplitLogManager; 041import org.apache.hadoop.hbase.monitoring.MonitoredTask; 042import org.apache.hadoop.hbase.monitoring.TaskMonitor; 043import org.apache.hadoop.hbase.procedure2.util.StringUtils; 044import org.apache.hadoop.hbase.regionserver.LastSequenceId; 045import org.apache.hadoop.hbase.regionserver.RegionServerServices; 046import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.CancelableProgressable; 049import org.apache.hadoop.hbase.util.CommonFSUtils; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; 052import org.apache.hadoop.hbase.wal.WAL.Entry; 053import org.apache.hadoop.hbase.wal.WAL.Reader; 054import org.apache.hadoop.ipc.RemoteException; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 061 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; 064 065/** 066 * Split RegionServer WAL files. Splits the WAL into new files, one per region, to be picked up on 067 * Region reopen. Deletes the split WAL when finished. Create an instance and call 068 * {@link #splitWAL(FileStatus, CancelableProgressable)} per file or use static helper methods. 069 */ 070@InterfaceAudience.Private 071public class WALSplitter { 072 private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class); 073 public static final String SPLIT_SKIP_ERRORS_KEY = "hbase.hlog.split.skip.errors"; 074 075 /** 076 * By default we retry errors in splitting, rather than skipping. 077 */ 078 public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false; 079 080 // Parameters for split process 081 protected final Path walRootDir; 082 protected final FileSystem walFS; 083 protected final Configuration conf; 084 final Path rootDir; 085 final FileSystem rootFS; 086 final RegionServerServices rsServices; 087 088 // Major subcomponents of the split process. 089 // These are separated into inner classes to make testing easier. 090 OutputSink outputSink; 091 private EntryBuffers entryBuffers; 092 093 /** 094 * Coordinator for split log. Used by the zk-based log splitter. Not used by the procedure 095 * v2-based log splitter. 096 */ 097 private SplitLogWorkerCoordination splitLogWorkerCoordination; 098 099 private final WALFactory walFactory; 100 101 // For checking the latest flushed sequence id 102 protected final LastSequenceId sequenceIdChecker; 103 104 // Map encodedRegionName -> lastFlushedSequenceId 105 protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>(); 106 107 // Map encodedRegionName -> maxSeqIdInStores 108 protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>(); 109 110 // the file being split currently 111 private FileStatus fileBeingSplit; 112 113 private final String tmpDirName; 114 115 /** 116 * Split WAL directly to hfiles instead of into intermediary 'recovered.edits' files. 117 */ 118 public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile"; 119 public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false; 120 121 /** 122 * True if we are to run with bounded amount of writers rather than let the count blossom. Default 123 * is 'false'. Does not apply if you have set 'hbase.wal.split.to.hfile' as that is always 124 * bounded. Only applies when you are doing recovery to 'recovered.edits' files (the old default). 125 * Bounded writing tends to have higher throughput. 126 */ 127 public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded"; 128 129 public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize"; 130 public final static String SPLIT_WAL_WRITER_THREADS = 131 "hbase.regionserver.hlog.splitlog.writer.threads"; 132 133 private final int numWriterThreads; 134 private final long bufferSize; 135 private final boolean splitWriterCreationBounded; 136 private final boolean hfile; 137 private final boolean skipErrors; 138 139 WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir, FileSystem walFS, 140 Path rootDir, FileSystem rootFS) { 141 this(factory, conf, walRootDir, walFS, rootDir, rootFS, null, null, null); 142 } 143 144 WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir, FileSystem walFS, 145 Path rootDir, FileSystem rootFS, LastSequenceId idChecker, 146 SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) { 147 this.conf = HBaseConfiguration.create(conf); 148 String codecClassName = 149 conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); 150 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); 151 this.walRootDir = walRootDir; 152 this.walFS = walFS; 153 this.rootDir = rootDir; 154 this.rootFS = rootFS; 155 this.sequenceIdChecker = idChecker; 156 this.splitLogWorkerCoordination = splitLogWorkerCoordination; 157 this.rsServices = rsServices; 158 this.walFactory = factory; 159 this.tmpDirName = 160 conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); 161 // if we limit the number of writers opened for sinking recovered edits 162 this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false); 163 this.bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024); 164 this.numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3); 165 this.hfile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE); 166 this.skipErrors = conf.getBoolean(SPLIT_SKIP_ERRORS_KEY, SPLIT_SKIP_ERRORS_DEFAULT); 167 } 168 169 WALFactory getWalFactory() { 170 return this.walFactory; 171 } 172 173 FileStatus getFileBeingSplit() { 174 return fileBeingSplit; 175 } 176 177 String getTmpDirName() { 178 return this.tmpDirName; 179 } 180 181 Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() { 182 return regionMaxSeqIdInStores; 183 } 184 185 /** 186 * Splits a WAL file. Used by old {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} and 187 * tests. Not used by new procedure-based WAL splitter. 188 * @return false if it is interrupted by the progress-able. 189 */ 190 public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, 191 Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, 192 SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory, 193 RegionServerServices rsServices) throws IOException { 194 Path rootDir = CommonFSUtils.getRootDir(conf); 195 FileSystem rootFS = rootDir.getFileSystem(conf); 196 WALSplitter splitter = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker, 197 splitLogWorkerCoordination, rsServices); 198 // splitWAL returns a data structure with whether split is finished and if the file is corrupt. 199 // We don't need to propagate corruption flag here because it is propagated by the 200 // SplitLogWorkerCoordination. 201 return splitter.splitWAL(logfile, reporter).isFinished(); 202 } 203 204 /** 205 * Split a folder of WAL files. Delete the directory when done. Used by tools and unit tests. It 206 * should be package private. It is public only because TestWALObserver is in a different package, 207 * which uses this method to do log splitting. 208 * @return List of output files created by the split. 209 */ 210 public static List<Path> split(Path walRootDir, Path walsDir, Path archiveDir, FileSystem walFS, 211 Configuration conf, final WALFactory factory) throws IOException { 212 Path rootDir = CommonFSUtils.getRootDir(conf); 213 FileSystem rootFS = rootDir.getFileSystem(conf); 214 WALSplitter splitter = new WALSplitter(factory, conf, walRootDir, walFS, rootDir, rootFS); 215 final List<FileStatus> wals = 216 SplitLogManager.getFileList(conf, Collections.singletonList(walsDir), null); 217 List<Path> splits = new ArrayList<>(); 218 if (!wals.isEmpty()) { 219 for (FileStatus wal : wals) { 220 SplitWALResult splitWALResult = splitter.splitWAL(wal, null); 221 if (splitWALResult.isFinished()) { 222 WALSplitUtil.archive(wal.getPath(), splitWALResult.isCorrupt(), archiveDir, walFS, conf); 223 // splitter.outputSink.splits is mark as final, do not need null check 224 splits.addAll(splitter.outputSink.splits); 225 } 226 } 227 } 228 if (!walFS.delete(walsDir, true)) { 229 throw new IOException("Unable to delete src dir " + walsDir); 230 } 231 return splits; 232 } 233 234 /** 235 * Data structure returned as result by #splitWAL(FileStatus, CancelableProgressable). Test 236 * {@link #isFinished()} to see if we are done with the WAL and {@link #isCorrupt()} for if the 237 * WAL is corrupt. 238 */ 239 static final class SplitWALResult { 240 private final boolean finished; 241 private final boolean corrupt; 242 243 private SplitWALResult(boolean finished, boolean corrupt) { 244 this.finished = finished; 245 this.corrupt = corrupt; 246 } 247 248 public boolean isFinished() { 249 return finished; 250 } 251 252 public boolean isCorrupt() { 253 return corrupt; 254 } 255 } 256 257 /** 258 * Setup the output sinks and entry buffers ahead of splitting WAL. 259 */ 260 private void createOutputSinkAndEntryBuffers() { 261 PipelineController controller = new PipelineController(); 262 if (this.hfile) { 263 this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize); 264 this.outputSink = new BoundedRecoveredHFilesOutputSink(this, controller, this.entryBuffers, 265 this.numWriterThreads); 266 } else if (this.splitWriterCreationBounded) { 267 this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize); 268 this.outputSink = new BoundedRecoveredEditsOutputSink(this, controller, this.entryBuffers, 269 this.numWriterThreads); 270 } else { 271 this.entryBuffers = new EntryBuffers(controller, this.bufferSize); 272 this.outputSink = 273 new RecoveredEditsOutputSink(this, controller, this.entryBuffers, this.numWriterThreads); 274 } 275 } 276 277 /** 278 * WAL splitting implementation, splits one WAL file. 279 * @param walStatus should be for an actual WAL file. 280 */ 281 SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) throws IOException { 282 Path wal = walStatus.getPath(); 283 Preconditions.checkArgument(walStatus.isFile(), "Not a regular file " + wal.toString()); 284 boolean corrupt = false; 285 int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024); 286 boolean outputSinkStarted = false; 287 boolean cancelled = false; 288 int editsCount = 0; 289 int editsSkipped = 0; 290 MonitoredTask status = 291 TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area.", true); 292 Reader walReader = null; 293 this.fileBeingSplit = walStatus; 294 long startTS = EnvironmentEdgeManager.currentTime(); 295 long length = walStatus.getLen(); 296 String lengthStr = StringUtils.humanSize(length); 297 createOutputSinkAndEntryBuffers(); 298 try { 299 String logStr = "Splitting " + wal + ", size=" + lengthStr + " (" + length + "bytes)"; 300 LOG.info(logStr); 301 status.setStatus(logStr); 302 if (cancel != null && !cancel.progress()) { 303 cancelled = true; 304 return new SplitWALResult(false, corrupt); 305 } 306 walReader = getReader(walStatus, this.skipErrors, cancel); 307 if (walReader == null) { 308 LOG.warn("Nothing in {}; empty?", wal); 309 return new SplitWALResult(true, corrupt); 310 } 311 LOG.info("Open {} took {}ms", wal, EnvironmentEdgeManager.currentTime() - startTS); 312 int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); 313 int numOpenedFilesLastCheck = 0; 314 outputSink.setReporter(cancel); 315 outputSink.setStatus(status); 316 outputSink.startWriterThreads(); 317 outputSinkStarted = true; 318 Entry entry; 319 startTS = EnvironmentEdgeManager.currentTime(); 320 while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) { 321 byte[] region = entry.getKey().getEncodedRegionName(); 322 String encodedRegionNameAsStr = Bytes.toString(region); 323 Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); 324 if (lastFlushedSequenceId == null) { 325 if ( 326 !(isRegionDirPresentUnderRoot(entry.getKey().getTableName(), encodedRegionNameAsStr)) 327 ) { 328 // The region directory itself is not present in the FS. This indicates that 329 // the region/table is already removed. We can just skip all the edits for this 330 // region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits 331 // will get skipped by the seqId check below. 332 // See more details at https://issues.apache.org/jira/browse/HBASE-24189 333 LOG.info("{} no longer in filesystem; skipping all edits.", encodedRegionNameAsStr); 334 lastFlushedSequenceId = Long.MAX_VALUE; 335 } else { 336 if (sequenceIdChecker != null) { 337 RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region); 338 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 339 for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) { 340 maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(), 341 storeSeqId.getSequenceId()); 342 } 343 regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores); 344 lastFlushedSequenceId = ids.getLastFlushedSequenceId(); 345 if (LOG.isDebugEnabled()) { 346 LOG.debug("Last flushed sequenceid for " + encodedRegionNameAsStr + ": " 347 + TextFormat.shortDebugString(ids)); 348 } 349 } 350 if (lastFlushedSequenceId == null) { 351 lastFlushedSequenceId = -1L; 352 } 353 } 354 lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId); 355 } 356 editsCount++; 357 if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) { 358 editsSkipped++; 359 continue; 360 } 361 // Don't send Compaction/Close/Open region events to recovered edit type sinks. 362 if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) { 363 editsSkipped++; 364 continue; 365 } 366 entryBuffers.appendEntry(entry); 367 int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck; 368 // If sufficient edits have passed, check if we should report progress. 369 if ( 370 editsCount % interval == 0 || moreWritersFromLastCheck > numOpenedFilesBeforeReporting 371 ) { 372 numOpenedFilesLastCheck = this.getNumOpenWriters(); 373 String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits())) 374 + " edits, skipped " + editsSkipped + " edits."; 375 status.setStatus("Split " + countsStr); 376 if (cancel != null && !cancel.progress()) { 377 cancelled = true; 378 return new SplitWALResult(false, corrupt); 379 } 380 } 381 } 382 } catch (InterruptedException ie) { 383 IOException iie = new InterruptedIOException(); 384 iie.initCause(ie); 385 throw iie; 386 } catch (CorruptedLogFileException e) { 387 LOG.warn("Could not parse, corrupt WAL={}", wal, e); 388 // If splitLogWorkerCoordination, then its old-school zk-coordinated splitting so update 389 // zk. Otherwise, it is the newer procedure-based WAL split which has no zk component. 390 if (this.splitLogWorkerCoordination != null) { 391 // Some tests pass in a csm of null. 392 splitLogWorkerCoordination.markCorrupted(walRootDir, wal.getName(), walFS); 393 } 394 corrupt = true; 395 } catch (IOException e) { 396 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 397 throw e; 398 } finally { 399 final String log = "Finishing writing output for " + wal + " so closing down"; 400 LOG.debug(log); 401 status.setStatus(log); 402 try { 403 if (null != walReader) { 404 walReader.close(); 405 } 406 } catch (IOException exception) { 407 LOG.warn("Could not close {} reader", wal, exception); 408 } 409 try { 410 if (outputSinkStarted) { 411 // Set cancelled to true as the immediate following statement will reset its value. 412 // If close() throws an exception, cancelled will have the right value 413 cancelled = true; 414 cancelled = outputSink.close() == null; 415 } 416 } finally { 417 long processCost = EnvironmentEdgeManager.currentTime() - startTS; 418 // See if length got updated post lease recovery 419 String msg = "Processed " + editsCount + " edits across " 420 + outputSink.getNumberOfRecoveredRegions() + " Regions in " + processCost 421 + " ms; skipped=" + editsSkipped + "; WAL=" + wal + ", size=" + lengthStr + ", length=" 422 + length + ", corrupted=" + corrupt + ", cancelled=" + cancelled; 423 LOG.info(msg); 424 status.markComplete(msg); 425 if (LOG.isDebugEnabled()) { 426 LOG.debug("Completed split of {}, journal: {}", wal, status.prettyPrintJournal()); 427 } 428 } 429 } 430 return new SplitWALResult(!cancelled, corrupt); 431 } 432 433 private boolean isRegionDirPresentUnderRoot(TableName tn, String region) throws IOException { 434 return this.rootFS.exists(CommonFSUtils.getRegionDir(this.rootDir, tn, region)); 435 } 436 437 /** 438 * Create a new {@link Reader} for reading logs to split. 439 * @return Returns null if file has length zero or file can't be found. 440 */ 441 protected Reader getReader(FileStatus walStatus, boolean skipErrors, 442 CancelableProgressable cancel) throws IOException, CorruptedLogFileException { 443 Path path = walStatus.getPath(); 444 long length = walStatus.getLen(); 445 Reader in; 446 447 // Check for possibly empty file. With appends, currently Hadoop reports a 448 // zero length even if the file has been sync'd. Revisit if HDFS-376 or 449 // HDFS-878 is committed. 450 if (length <= 0) { 451 LOG.warn("File {} might be still open, length is 0", path); 452 } 453 454 try { 455 RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, cancel); 456 try { 457 in = getReader(path, cancel); 458 } catch (EOFException e) { 459 if (length <= 0) { 460 // TODO should we ignore an empty, not-last log file if skip.errors 461 // is false? Either way, the caller should decide what to do. E.g. 462 // ignore if this is the last log in sequence. 463 // TODO is this scenario still possible if the log has been 464 // recovered (i.e. closed) 465 LOG.warn("Could not open {} for reading. File is empty", path, e); 466 } 467 // EOFException being ignored 468 return null; 469 } 470 } catch (IOException e) { 471 if (e instanceof FileNotFoundException) { 472 // A wal file may not exist anymore. Nothing can be recovered so move on 473 LOG.warn("File {} does not exist anymore", path, e); 474 return null; 475 } 476 if (!skipErrors || e instanceof InterruptedIOException) { 477 throw e; // Don't mark the file corrupted if interrupted, or not skipErrors 478 } 479 throw new CorruptedLogFileException("skipErrors=true; could not open " + path + ", skipping", 480 e); 481 } 482 return in; 483 } 484 485 private Entry getNextLogLine(Reader in, Path path, boolean skipErrors) 486 throws CorruptedLogFileException, IOException { 487 try { 488 return in.next(); 489 } catch (EOFException eof) { 490 // truncated files are expected if a RS crashes (see HBASE-2643) 491 LOG.info("EOF from {}; continuing.", path); 492 return null; 493 } catch (IOException e) { 494 // If the IOE resulted from bad file format, 495 // then this problem is idempotent and retrying won't help 496 if ( 497 e.getCause() != null && (e.getCause() instanceof ParseException 498 || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException) 499 ) { 500 LOG.warn("Parse exception from {}; continuing", path, e); 501 return null; 502 } 503 if (!skipErrors) { 504 throw e; 505 } 506 throw new CorruptedLogFileException("skipErrors=true Ignoring exception" 507 + " while parsing wal " + path + ". Marking as corrupted", e); 508 } 509 } 510 511 /** 512 * Create a new {@link WALProvider.Writer} for writing log splits. 513 * @return a new Writer instance, caller should close 514 */ 515 protected WALProvider.Writer createWriter(Path logfile) throws IOException { 516 return walFactory.createRecoveredEditsWriter(walFS, logfile); 517 } 518 519 /** 520 * Create a new {@link Reader} for reading logs to split. 521 * @return new Reader instance, caller should close 522 */ 523 private Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException { 524 return walFactory.createReader(walFS, curLogFile, reporter); 525 } 526 527 /** 528 * Get current open writers 529 */ 530 private int getNumOpenWriters() { 531 int result = 0; 532 if (this.outputSink != null) { 533 result += this.outputSink.getNumOpenWriters(); 534 } 535 return result; 536 } 537 538 /** 539 * Contains some methods to control WAL-entries producer / consumer interactions 540 */ 541 public static class PipelineController { 542 // If an exception is thrown by one of the other threads, it will be 543 // stored here. 544 AtomicReference<Throwable> thrown = new AtomicReference<>(); 545 546 // Wait/notify for when data has been produced by the writer thread, 547 // consumed by the reader thread, or an exception occurred 548 final Object dataAvailable = new Object(); 549 550 void writerThreadError(Throwable t) { 551 thrown.compareAndSet(null, t); 552 } 553 554 /** 555 * Check for errors in the writer threads. If any is found, rethrow it. 556 */ 557 void checkForErrors() throws IOException { 558 Throwable thrown = this.thrown.get(); 559 if (thrown == null) { 560 return; 561 } 562 this.thrown.set(null); 563 if (thrown instanceof IOException) { 564 throw new IOException(thrown); 565 } else { 566 throw new RuntimeException(thrown); 567 } 568 } 569 } 570 571 static class CorruptedLogFileException extends Exception { 572 private static final long serialVersionUID = 1L; 573 574 CorruptedLogFileException(String s) { 575 super(s); 576 } 577 578 /** 579 * CorruptedLogFileException with cause 580 * @param message the message for this exception 581 * @param cause the cause for this exception 582 */ 583 CorruptedLogFileException(String message, Throwable cause) { 584 super(message, cause); 585 } 586 } 587}