001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.EOFException; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.text.ParseException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.Map; 029import java.util.TreeMap; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.atomic.AtomicReference; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; 040import org.apache.hadoop.hbase.master.SplitLogManager; 041import org.apache.hadoop.hbase.monitoring.MonitoredTask; 042import org.apache.hadoop.hbase.monitoring.TaskMonitor; 043import org.apache.hadoop.hbase.procedure2.util.StringUtils; 044import org.apache.hadoop.hbase.regionserver.LastSequenceId; 045import org.apache.hadoop.hbase.regionserver.RegionServerServices; 046import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.CancelableProgressable; 049import org.apache.hadoop.hbase.util.CommonFSUtils; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; 052import org.apache.hadoop.hbase.wal.WAL.Entry; 053import org.apache.hadoop.hbase.wal.WAL.Reader; 054import org.apache.hadoop.ipc.RemoteException; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 061 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; 064 065/** 066 * Split RegionServer WAL files. Splits the WAL into new files, 067 * one per region, to be picked up on Region reopen. Deletes the split WAL when finished. 068 * Create an instance and call {@link #splitWAL(FileStatus, CancelableProgressable)} per file or 069 * use static helper methods. 070 */ 071@InterfaceAudience.Private 072public class WALSplitter { 073 private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class); 074 public static final String SPLIT_SKIP_ERRORS_KEY = "hbase.hlog.split.skip.errors"; 075 076 /** 077 * By default we retry errors in splitting, rather than skipping. 078 */ 079 public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false; 080 081 // Parameters for split process 082 protected final Path walRootDir; 083 protected final FileSystem walFS; 084 protected final Configuration conf; 085 final Path rootDir; 086 final FileSystem rootFS; 087 final RegionServerServices rsServices; 088 089 // Major subcomponents of the split process. 090 // These are separated into inner classes to make testing easier. 091 OutputSink outputSink; 092 private EntryBuffers entryBuffers; 093 094 /** 095 * Coordinator for split log. Used by the zk-based log splitter. 096 * Not used by the procedure v2-based log splitter. 097 */ 098 private SplitLogWorkerCoordination splitLogWorkerCoordination; 099 100 private final WALFactory walFactory; 101 102 // For checking the latest flushed sequence id 103 protected final LastSequenceId sequenceIdChecker; 104 105 // Map encodedRegionName -> lastFlushedSequenceId 106 protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>(); 107 108 // Map encodedRegionName -> maxSeqIdInStores 109 protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>(); 110 111 // the file being split currently 112 private FileStatus fileBeingSplit; 113 114 private final String tmpDirName; 115 116 /** 117 * Split WAL directly to hfiles instead of into intermediary 'recovered.edits' files. 118 */ 119 public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile"; 120 public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false; 121 122 /** 123 * True if we are to run with bounded amount of writers rather than let the count blossom. 124 * Default is 'false'. Does not apply if you have set 'hbase.wal.split.to.hfile' as that 125 * is always bounded. Only applies when you are doing recovery to 'recovered.edits' 126 * files (the old default). Bounded writing tends to have higher throughput. 127 */ 128 public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded"; 129 130 public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize"; 131 public final static String SPLIT_WAL_WRITER_THREADS = 132 "hbase.regionserver.hlog.splitlog.writer.threads"; 133 134 private final int numWriterThreads; 135 private final long bufferSize; 136 private final boolean splitWriterCreationBounded; 137 private final boolean hfile; 138 private final boolean skipErrors; 139 140 WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir, 141 FileSystem walFS, Path rootDir, FileSystem rootFS) { 142 this(factory, conf, walRootDir, walFS, rootDir, rootFS, null, null, null); 143 } 144 145 WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir, 146 FileSystem walFS, Path rootDir, FileSystem rootFS, LastSequenceId idChecker, 147 SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) { 148 this.conf = HBaseConfiguration.create(conf); 149 String codecClassName = 150 conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); 151 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); 152 this.walRootDir = walRootDir; 153 this.walFS = walFS; 154 this.rootDir = rootDir; 155 this.rootFS = rootFS; 156 this.sequenceIdChecker = idChecker; 157 this.splitLogWorkerCoordination = splitLogWorkerCoordination; 158 this.rsServices = rsServices; 159 this.walFactory = factory; 160 this.tmpDirName = 161 conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); 162 // if we limit the number of writers opened for sinking recovered edits 163 this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false); 164 this.bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024); 165 this.numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3); 166 this.hfile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE); 167 this.skipErrors = conf.getBoolean(SPLIT_SKIP_ERRORS_KEY, SPLIT_SKIP_ERRORS_DEFAULT); 168 } 169 170 WALFactory getWalFactory() { 171 return this.walFactory; 172 } 173 174 FileStatus getFileBeingSplit() { 175 return fileBeingSplit; 176 } 177 178 String getTmpDirName() { 179 return this.tmpDirName; 180 } 181 182 Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() { 183 return regionMaxSeqIdInStores; 184 } 185 186 /** 187 * Splits a WAL file. 188 * Used by old {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} and tests. 189 * Not used by new procedure-based WAL splitter. 190 * 191 * @return false if it is interrupted by the progress-able. 192 */ 193 public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, 194 Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, 195 SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory, 196 RegionServerServices rsServices) throws IOException { 197 Path rootDir = CommonFSUtils.getRootDir(conf); 198 FileSystem rootFS = rootDir.getFileSystem(conf); 199 WALSplitter splitter = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker, 200 splitLogWorkerCoordination, rsServices); 201 // splitWAL returns a data structure with whether split is finished and if the file is corrupt. 202 // We don't need to propagate corruption flag here because it is propagated by the 203 // SplitLogWorkerCoordination. 204 return splitter.splitWAL(logfile, reporter).isFinished(); 205 } 206 207 /** 208 * Split a folder of WAL files. Delete the directory when done. 209 * Used by tools and unit tests. It should be package private. 210 * It is public only because TestWALObserver is in a different package, 211 * which uses this method to do log splitting. 212 * @return List of output files created by the split. 213 */ 214 public static List<Path> split(Path walRootDir, Path walsDir, Path archiveDir, FileSystem walFS, 215 Configuration conf, final WALFactory factory) throws IOException { 216 Path rootDir = CommonFSUtils.getRootDir(conf); 217 FileSystem rootFS = rootDir.getFileSystem(conf); 218 WALSplitter splitter = new WALSplitter(factory, conf, walRootDir, walFS, rootDir, rootFS); 219 final List<FileStatus> wals = 220 SplitLogManager.getFileList(conf, Collections.singletonList(walsDir), null); 221 List<Path> splits = new ArrayList<>(); 222 if (!wals.isEmpty()) { 223 for (FileStatus wal: wals) { 224 SplitWALResult splitWALResult = splitter.splitWAL(wal, null); 225 if (splitWALResult.isFinished()) { 226 WALSplitUtil.archive(wal.getPath(), splitWALResult.isCorrupt(), archiveDir, walFS, conf); 227 //splitter.outputSink.splits is mark as final, do not need null check 228 splits.addAll(splitter.outputSink.splits); 229 } 230 } 231 } 232 if (!walFS.delete(walsDir, true)) { 233 throw new IOException("Unable to delete src dir " + walsDir); 234 } 235 return splits; 236 } 237 238 /** 239 * Data structure returned as result by #splitWAL(FileStatus, CancelableProgressable). 240 * Test {@link #isFinished()} to see if we are done with the WAL and {@link #isCorrupt()} for if 241 * the WAL is corrupt. 242 */ 243 static final class SplitWALResult { 244 private final boolean finished; 245 private final boolean corrupt; 246 247 private SplitWALResult(boolean finished, boolean corrupt) { 248 this.finished = finished; 249 this.corrupt = corrupt; 250 } 251 252 public boolean isFinished() { 253 return finished; 254 } 255 256 public boolean isCorrupt() { 257 return corrupt; 258 } 259 } 260 261 /** 262 * Setup the output sinks and entry buffers ahead of splitting WAL. 263 */ 264 private void createOutputSinkAndEntryBuffers() { 265 PipelineController controller = new PipelineController(); 266 if (this.hfile) { 267 this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize); 268 this.outputSink = new BoundedRecoveredHFilesOutputSink(this, controller, 269 this.entryBuffers, this.numWriterThreads); 270 } else if (this.splitWriterCreationBounded) { 271 this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize); 272 this.outputSink = new BoundedRecoveredEditsOutputSink(this, controller, 273 this.entryBuffers, this.numWriterThreads); 274 } else { 275 this.entryBuffers = new EntryBuffers(controller, this.bufferSize); 276 this.outputSink = new RecoveredEditsOutputSink(this, controller, 277 this.entryBuffers, this.numWriterThreads); 278 } 279 } 280 281 /** 282 * WAL splitting implementation, splits one WAL file. 283 * @param walStatus should be for an actual WAL file. 284 */ 285 SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) throws IOException { 286 Path wal = walStatus.getPath(); 287 Preconditions.checkArgument(walStatus.isFile(), "Not a regular file " + wal.toString()); 288 boolean corrupt = false; 289 int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024); 290 boolean outputSinkStarted = false; 291 boolean cancelled = false; 292 int editsCount = 0; 293 int editsSkipped = 0; 294 MonitoredTask status = 295 TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area."); 296 status.enableStatusJournal(true); 297 Reader walReader = null; 298 this.fileBeingSplit = walStatus; 299 long startTS = EnvironmentEdgeManager.currentTime(); 300 long length = walStatus.getLen(); 301 String lengthStr = StringUtils.humanSize(length); 302 createOutputSinkAndEntryBuffers(); 303 try { 304 String logStr = "Splitting " + wal + ", size=" + lengthStr + " (" + length + "bytes)"; 305 LOG.info(logStr); 306 status.setStatus(logStr); 307 if (cancel != null && !cancel.progress()) { 308 cancelled = true; 309 return new SplitWALResult(false, corrupt); 310 } 311 walReader = getReader(walStatus, this.skipErrors, cancel); 312 if (walReader == null) { 313 LOG.warn("Nothing in {}; empty?", wal); 314 return new SplitWALResult(true, corrupt); 315 } 316 LOG.info("Open {} took {}ms", wal, EnvironmentEdgeManager.currentTime() - startTS); 317 int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); 318 int numOpenedFilesLastCheck = 0; 319 outputSink.setReporter(cancel); 320 outputSink.setStatus(status); 321 outputSink.startWriterThreads(); 322 outputSinkStarted = true; 323 Entry entry; 324 startTS = EnvironmentEdgeManager.currentTime(); 325 while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) { 326 byte[] region = entry.getKey().getEncodedRegionName(); 327 String encodedRegionNameAsStr = Bytes.toString(region); 328 Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); 329 if (lastFlushedSequenceId == null) { 330 if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(), 331 encodedRegionNameAsStr))) { 332 // The region directory itself is not present in the FS. This indicates that 333 // the region/table is already removed. We can just skip all the edits for this 334 // region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits 335 // will get skipped by the seqId check below. 336 // See more details at https://issues.apache.org/jira/browse/HBASE-24189 337 LOG.info("{} no longer in filesystem; skipping all edits.", encodedRegionNameAsStr); 338 lastFlushedSequenceId = Long.MAX_VALUE; 339 } else { 340 if (sequenceIdChecker != null) { 341 RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region); 342 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 343 for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) { 344 maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(), 345 storeSeqId.getSequenceId()); 346 } 347 regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores); 348 lastFlushedSequenceId = ids.getLastFlushedSequenceId(); 349 if (LOG.isDebugEnabled()) { 350 LOG.debug("Last flushed sequenceid for " + encodedRegionNameAsStr + ": " 351 + TextFormat.shortDebugString(ids)); 352 } 353 } 354 if (lastFlushedSequenceId == null) { 355 lastFlushedSequenceId = -1L; 356 } 357 } 358 lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId); 359 } 360 editsCount++; 361 if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) { 362 editsSkipped++; 363 continue; 364 } 365 // Don't send Compaction/Close/Open region events to recovered edit type sinks. 366 if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) { 367 editsSkipped++; 368 continue; 369 } 370 entryBuffers.appendEntry(entry); 371 int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck; 372 // If sufficient edits have passed, check if we should report progress. 373 if (editsCount % interval == 0 374 || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) { 375 numOpenedFilesLastCheck = this.getNumOpenWriters(); 376 String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits())) 377 + " edits, skipped " + editsSkipped + " edits."; 378 status.setStatus("Split " + countsStr); 379 if (cancel != null && !cancel.progress()) { 380 cancelled = true; 381 return new SplitWALResult(false, corrupt); 382 } 383 } 384 } 385 } catch (InterruptedException ie) { 386 IOException iie = new InterruptedIOException(); 387 iie.initCause(ie); 388 throw iie; 389 } catch (CorruptedLogFileException e) { 390 LOG.warn("Could not parse, corrupt WAL={}", wal, e); 391 // If splitLogWorkerCoordination, then its old-school zk-coordinated splitting so update 392 // zk. Otherwise, it is the newer procedure-based WAL split which has no zk component. 393 if (this.splitLogWorkerCoordination != null) { 394 // Some tests pass in a csm of null. 395 splitLogWorkerCoordination.markCorrupted(walRootDir, wal.getName(), walFS); 396 } 397 corrupt = true; 398 } catch (IOException e) { 399 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 400 throw e; 401 } finally { 402 final String log = "Finishing writing output for " + wal + " so closing down"; 403 LOG.debug(log); 404 status.setStatus(log); 405 try { 406 if (null != walReader) { 407 walReader.close(); 408 } 409 } catch (IOException exception) { 410 LOG.warn("Could not close {} reader", wal, exception); 411 } 412 try { 413 if (outputSinkStarted) { 414 // Set cancelled to true as the immediate following statement will reset its value. 415 // If close() throws an exception, cancelled will have the right value 416 cancelled = true; 417 cancelled = outputSink.close() == null; 418 } 419 } finally { 420 long processCost = EnvironmentEdgeManager.currentTime() - startTS; 421 // See if length got updated post lease recovery 422 String msg = "Processed " + editsCount + " edits across " + 423 outputSink.getNumberOfRecoveredRegions() + " Regions in " + processCost + 424 " ms; skipped=" + editsSkipped + "; WAL=" + wal + ", size=" + lengthStr + 425 ", length=" + length + ", corrupted=" + corrupt + ", cancelled=" + cancelled; 426 LOG.info(msg); 427 status.markComplete(msg); 428 if (LOG.isDebugEnabled()) { 429 LOG.debug("Completed split of {}, journal: {}", wal, status.prettyPrintJournal()); 430 } 431 } 432 } 433 return new SplitWALResult(!cancelled, corrupt); 434 } 435 436 private boolean isRegionDirPresentUnderRoot(TableName tn, String region) throws IOException { 437 return this.rootFS.exists(CommonFSUtils.getRegionDir(this.rootDir, tn, region)); 438 } 439 440 /** 441 * Create a new {@link Reader} for reading logs to split. 442 * @return Returns null if file has length zero or file can't be found. 443 */ 444 protected Reader getReader(FileStatus walStatus, boolean skipErrors, CancelableProgressable cancel) 445 throws IOException, CorruptedLogFileException { 446 Path path = walStatus.getPath(); 447 long length = walStatus.getLen(); 448 Reader in; 449 450 // Check for possibly empty file. With appends, currently Hadoop reports a 451 // zero length even if the file has been sync'd. Revisit if HDFS-376 or 452 // HDFS-878 is committed. 453 if (length <= 0) { 454 LOG.warn("File {} might be still open, length is 0", path); 455 } 456 457 try { 458 RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, cancel); 459 try { 460 in = getReader(path, cancel); 461 } catch (EOFException e) { 462 if (length <= 0) { 463 // TODO should we ignore an empty, not-last log file if skip.errors 464 // is false? Either way, the caller should decide what to do. E.g. 465 // ignore if this is the last log in sequence. 466 // TODO is this scenario still possible if the log has been 467 // recovered (i.e. closed) 468 LOG.warn("Could not open {} for reading. File is empty", path, e); 469 } 470 // EOFException being ignored 471 return null; 472 } 473 } catch (IOException e) { 474 if (e instanceof FileNotFoundException) { 475 // A wal file may not exist anymore. Nothing can be recovered so move on 476 LOG.warn("File {} does not exist anymore", path, e); 477 return null; 478 } 479 if (!skipErrors || e instanceof InterruptedIOException) { 480 throw e; // Don't mark the file corrupted if interrupted, or not skipErrors 481 } 482 throw new CorruptedLogFileException("skipErrors=true; could not open " + path + 483 ", skipping", e); 484 } 485 return in; 486 } 487 488 private Entry getNextLogLine(Reader in, Path path, boolean skipErrors) 489 throws CorruptedLogFileException, IOException { 490 try { 491 return in.next(); 492 } catch (EOFException eof) { 493 // truncated files are expected if a RS crashes (see HBASE-2643) 494 LOG.info("EOF from {}; continuing.", path); 495 return null; 496 } catch (IOException e) { 497 // If the IOE resulted from bad file format, 498 // then this problem is idempotent and retrying won't help 499 if (e.getCause() != null && (e.getCause() instanceof ParseException 500 || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) { 501 LOG.warn("Parse exception from {}; continuing", path, e); 502 return null; 503 } 504 if (!skipErrors) { 505 throw e; 506 } 507 throw new CorruptedLogFileException("skipErrors=true Ignoring exception" 508 + " while parsing wal " + path + ". Marking as corrupted", e); 509 } 510 } 511 512 /** 513 * Create a new {@link WALProvider.Writer} for writing log splits. 514 * @return a new Writer instance, caller should close 515 */ 516 protected WALProvider.Writer createWriter(Path logfile) throws IOException { 517 return walFactory.createRecoveredEditsWriter(walFS, logfile); 518 } 519 520 /** 521 * Create a new {@link Reader} for reading logs to split. 522 * @return new Reader instance, caller should close 523 */ 524 private Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException { 525 return walFactory.createReader(walFS, curLogFile, reporter); 526 } 527 528 /** 529 * Get current open writers 530 */ 531 private int getNumOpenWriters() { 532 int result = 0; 533 if (this.outputSink != null) { 534 result += this.outputSink.getNumOpenWriters(); 535 } 536 return result; 537 } 538 539 /** 540 * Contains some methods to control WAL-entries producer / consumer interactions 541 */ 542 public static class PipelineController { 543 // If an exception is thrown by one of the other threads, it will be 544 // stored here. 545 AtomicReference<Throwable> thrown = new AtomicReference<>(); 546 547 // Wait/notify for when data has been produced by the writer thread, 548 // consumed by the reader thread, or an exception occurred 549 final Object dataAvailable = new Object(); 550 551 void writerThreadError(Throwable t) { 552 thrown.compareAndSet(null, t); 553 } 554 555 /** 556 * Check for errors in the writer threads. If any is found, rethrow it. 557 */ 558 void checkForErrors() throws IOException { 559 Throwable thrown = this.thrown.get(); 560 if (thrown == null) { 561 return; 562 } 563 this.thrown.set(null); 564 if (thrown instanceof IOException) { 565 throw new IOException(thrown); 566 } else { 567 throw new RuntimeException(thrown); 568 } 569 } 570 } 571 572 static class CorruptedLogFileException extends Exception { 573 private static final long serialVersionUID = 1L; 574 575 CorruptedLogFileException(String s) { 576 super(s); 577 } 578 579 /** 580 * CorruptedLogFileException with cause 581 * 582 * @param message the message for this exception 583 * @param cause the cause for this exception 584 */ 585 CorruptedLogFileException(String message, Throwable cause) { 586 super(message, cause); 587 } 588 } 589}