001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.EOFException;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.text.ParseException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.TreeMap;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.atomic.AtomicReference;
032import edu.umd.cs.findbugs.annotations.Nullable;
033import org.apache.commons.lang3.ArrayUtils;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileStatus;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
042import org.apache.hadoop.hbase.master.SplitLogManager;
043import org.apache.hadoop.hbase.monitoring.MonitoredTask;
044import org.apache.hadoop.hbase.monitoring.TaskMonitor;
045import org.apache.hadoop.hbase.procedure2.util.StringUtils;
046import org.apache.hadoop.hbase.regionserver.LastSequenceId;
047import org.apache.hadoop.hbase.regionserver.RegionServerServices;
048import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.CancelableProgressable;
051import org.apache.hadoop.hbase.util.CommonFSUtils;
052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
053import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
054import org.apache.hadoop.hbase.wal.WAL.Entry;
055import org.apache.hadoop.hbase.wal.WAL.Reader;
056import org.apache.hadoop.ipc.RemoteException;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
061import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
062import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
065import javax.validation.constraints.Null;
066
067/**
068 * Split RegionServer WAL files. Splits the WAL into new files,
069 * one per region, to be picked up on Region reopen. Deletes the split WAL when finished.
070 * Create an instance and call {@link #splitWAL(FileStatus, CancelableProgressable)} per file or
071 * use static helper methods.
072 */
073@InterfaceAudience.Private
074public class WALSplitter {
075  private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);
076  public static final String SPLIT_SKIP_ERRORS_KEY = "hbase.hlog.split.skip.errors";
077
078  /**
079   * By default we retry errors in splitting, rather than skipping.
080   */
081  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
082
083  // Parameters for split process
084  protected final Path walRootDir;
085  protected final FileSystem walFS;
086  protected final Configuration conf;
087  final Path rootDir;
088  final FileSystem rootFS;
089  final RegionServerServices rsServices;
090
091  // Major subcomponents of the split process.
092  // These are separated into inner classes to make testing easier.
093  OutputSink outputSink;
094  private EntryBuffers entryBuffers;
095
096  /**
097   * Coordinator for split log. Used by the zk-based log splitter.
098   * Not used by the procedure v2-based log splitter.
099   */
100  private SplitLogWorkerCoordination splitLogWorkerCoordination;
101
102  private final WALFactory walFactory;
103
104  // For checking the latest flushed sequence id
105  protected final LastSequenceId sequenceIdChecker;
106
107  // Map encodedRegionName -> lastFlushedSequenceId
108  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>();
109
110  // Map encodedRegionName -> maxSeqIdInStores
111  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();
112
113  // the file being split currently
114  private FileStatus fileBeingSplit;
115
116  private final String tmpDirName;
117
118  /**
119   * Split WAL directly to hfiles instead of into intermediary 'recovered.edits' files.
120   */
121  public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
122  public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
123
124  /**
125   * True if we are to run with bounded amount of writers rather than let the count blossom.
126   * Default is 'false'. Does not apply if you have set 'hbase.wal.split.to.hfile' as that
127   * is always bounded. Only applies when you are doing recovery to 'recovered.edits'
128   * files (the old default). Bounded writing tends to have higher throughput.
129   */
130  public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
131
132  public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
133  public final static String SPLIT_WAL_WRITER_THREADS =
134    "hbase.regionserver.hlog.splitlog.writer.threads";
135
136  private final int numWriterThreads;
137  private final long bufferSize;
138  private final boolean splitWriterCreationBounded;
139  private final boolean hfile;
140  private final boolean skipErrors;
141
  /**
   * Construct a splitter with no sequence-id checker, no zk coordination and no region server
   * services; used by tools and tests that split WALs outside a live region server.
   */
  WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir,
      FileSystem walFS, Path rootDir, FileSystem rootFS) {
    this(factory, conf, walRootDir, walFS, rootDir, rootFS, null, null, null);
  }
146
  /**
   * Constructor. Works on a private copy of the passed {@code conf}; the copy's RPC codec
   * property is pointed at the configured WAL cell codec class.
   * @param idChecker used to look up last-flushed sequence ids per region; may be null
   * @param splitLogWorkerCoordination zk-based coordination; null for procedure-based splitting
   * @param rsServices may be null when invoked from tools or tests
   */
  @VisibleForTesting
  WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir,
      FileSystem walFS, Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
      SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) {
    // Private copy because we mutate it (RPC codec) just below.
    this.conf = HBaseConfiguration.create(conf);
    String codecClassName =
      conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
    this.walRootDir = walRootDir;
    this.walFS = walFS;
    this.rootDir = rootDir;
    this.rootFS = rootFS;
    this.sequenceIdChecker = idChecker;
    this.splitLogWorkerCoordination = splitLogWorkerCoordination;
    this.rsServices = rsServices;
    this.walFactory = factory;
    this.tmpDirName =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    // if we limit the number of writers opened for sinking recovered edits
    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
    this.bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
    this.numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
    this.hfile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
    this.skipErrors = conf.getBoolean(SPLIT_SKIP_ERRORS_KEY, SPLIT_SKIP_ERRORS_DEFAULT);
  }
172
173  WALFactory getWalFactory() {
174    return this.walFactory;
175  }
176
177  FileStatus getFileBeingSplit() {
178    return fileBeingSplit;
179  }
180
181  String getTmpDirName() {
182    return this.tmpDirName;
183  }
184
185  Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() {
186    return regionMaxSeqIdInStores;
187  }
188
189  /**
190   * Splits a WAL file.
191   * Used by old {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} and tests.
192   * Not used by new procedure-based WAL splitter.
193   *
194   * @return false if it is interrupted by the progress-able.
195   */
196  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
197      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
198      SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory,
199      RegionServerServices rsServices) throws IOException {
200    Path rootDir = CommonFSUtils.getRootDir(conf);
201    FileSystem rootFS = rootDir.getFileSystem(conf);
202    WALSplitter splitter = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
203      splitLogWorkerCoordination, rsServices);
204    // splitWAL returns a data structure with whether split is finished and if the file is corrupt.
205    // We don't need to propagate corruption flag here because it is propagated by the
206    // SplitLogWorkerCoordination.
207    return splitter.splitWAL(logfile, reporter).isFinished();
208  }
209
210  /**
211   * Split a folder of WAL files. Delete the directory when done.
212   * Used by tools and unit tests. It should be package private.
213   * It is public only because TestWALObserver is in a different package,
214   * which uses this method to do log splitting.
215   * @return List of output files created by the split.
216   */
217  @VisibleForTesting
218  public static List<Path> split(Path walRootDir, Path walsDir, Path archiveDir, FileSystem walFS,
219      Configuration conf, final WALFactory factory) throws IOException {
220    Path rootDir = CommonFSUtils.getRootDir(conf);
221    FileSystem rootFS = rootDir.getFileSystem(conf);
222    WALSplitter splitter = new WALSplitter(factory, conf, walRootDir, walFS, rootDir, rootFS);
223    final FileStatus[] wals =
224      SplitLogManager.getFileList(conf, Collections.singletonList(walsDir), null);
225    List<Path> splits = new ArrayList<>();
226    if (ArrayUtils.isNotEmpty(wals)) {
227      for (FileStatus wal: wals) {
228        SplitWALResult splitWALResult = splitter.splitWAL(wal, null);
229        if (splitWALResult.isFinished()) {
230          WALSplitUtil.archive(wal.getPath(), splitWALResult.isCorrupt(), archiveDir, walFS, conf);
231          if (splitter.outputSink.splits != null) {
232            splits.addAll(splitter.outputSink.splits);
233          }
234        }
235      }
236    }
237    if (!walFS.delete(walsDir, true)) {
238      throw new IOException("Unable to delete src dir " + walsDir);
239    }
240    return splits;
241  }
242
243  /**
244   * Data structure returned as result by #splitWAL(FileStatus, CancelableProgressable).
245   * Test {@link #isFinished()} to see if we are done with the WAL and {@link #isCorrupt()} for if
246   * the WAL is corrupt.
247   */
248  static final class SplitWALResult {
249    private final boolean finished;
250    private final boolean corrupt;
251
252    private SplitWALResult(boolean finished, boolean corrupt) {
253      this.finished = finished;
254      this.corrupt = corrupt;
255    }
256
257    public boolean isFinished() {
258      return finished;
259    }
260
261    public boolean isCorrupt() {
262      return corrupt;
263    }
264  }
265
266  /**
267   * Setup the output sinks and entry buffers ahead of splitting WAL.
268   */
269  private void createOutputSinkAndEntryBuffers() {
270    PipelineController controller = new PipelineController();
271    if (this.hfile) {
272      this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize);
273      this.outputSink = new BoundedRecoveredHFilesOutputSink(this, controller,
274        this.entryBuffers, this.numWriterThreads);
275    } else if (this.splitWriterCreationBounded) {
276      this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize);
277      this.outputSink = new BoundedRecoveredEditsOutputSink(this, controller,
278        this.entryBuffers, this.numWriterThreads);
279    } else {
280      this.entryBuffers = new EntryBuffers(controller, this.bufferSize);
281      this.outputSink = new RecoveredEditsOutputSink(this, controller,
282        this.entryBuffers, this.numWriterThreads);
283    }
284  }
285
286  /**
287   * WAL splitting implementation, splits one WAL file.
288   * @param walStatus should be for an actual WAL file.
289   */
290  @VisibleForTesting
291  SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) throws IOException {
292    Path wal = walStatus.getPath();
293    Preconditions.checkArgument(walStatus.isFile(), "Not a regular file " + wal.toString());
294    boolean corrupt = false;
295    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
296    boolean outputSinkStarted = false;
297    boolean cancelled = false;
298    int editsCount = 0;
299    int editsSkipped = 0;
300    MonitoredTask status =
301      TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area.");
302    status.enableStatusJournal(true);
303    Reader walReader = null;
304    this.fileBeingSplit = walStatus;
305    long startTS = EnvironmentEdgeManager.currentTime();
306    long length = walStatus.getLen();
307    String lengthStr = StringUtils.humanSize(length);
308    createOutputSinkAndEntryBuffers();
309    try {
310      String logStr = "Splitting " + wal + ", size=" + lengthStr + " (" + length + "bytes)";
311      LOG.info(logStr);
312      status.setStatus(logStr);
313      if (cancel != null && !cancel.progress()) {
314        cancelled = true;
315        return new SplitWALResult(false, corrupt);
316      }
317      walReader = getReader(walStatus, this.skipErrors, cancel);
318      if (walReader == null) {
319        LOG.warn("Nothing in {}; empty?", wal);
320        return new SplitWALResult(true, corrupt);
321      }
322      LOG.info("Open {} took {}ms", wal, EnvironmentEdgeManager.currentTime() - startTS);
323      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
324      int numOpenedFilesLastCheck = 0;
325      outputSink.setReporter(cancel);
326      outputSink.setStatus(status);
327      outputSink.startWriterThreads();
328      outputSinkStarted = true;
329      Entry entry;
330      startTS = EnvironmentEdgeManager.currentTime();
331      while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) {
332        byte[] region = entry.getKey().getEncodedRegionName();
333        String encodedRegionNameAsStr = Bytes.toString(region);
334        Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
335        if (lastFlushedSequenceId == null) {
336          if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(),
337              encodedRegionNameAsStr))) {
338            // The region directory itself is not present in the FS. This indicates that
339            // the region/table is already removed. We can just skip all the edits for this
340            // region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits
341            // will get skipped by the seqId check below.
342            // See more details at https://issues.apache.org/jira/browse/HBASE-24189
343            LOG.info("{} no longer in filesystem; skipping all edits.", encodedRegionNameAsStr);
344            lastFlushedSequenceId = Long.MAX_VALUE;
345          } else {
346            if (sequenceIdChecker != null) {
347              RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
348              Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
349              for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
350                maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
351                    storeSeqId.getSequenceId());
352              }
353              regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
354              lastFlushedSequenceId = ids.getLastFlushedSequenceId();
355              if (LOG.isDebugEnabled()) {
356                LOG.debug("Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
357                    + TextFormat.shortDebugString(ids));
358              }
359            }
360            if (lastFlushedSequenceId == null) {
361              lastFlushedSequenceId = -1L;
362            }
363          }
364          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
365        }
366        editsCount++;
367        if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
368          editsSkipped++;
369          continue;
370        }
371        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
372        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
373          editsSkipped++;
374          continue;
375        }
376        entryBuffers.appendEntry(entry);
377        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
378        // If sufficient edits have passed, check if we should report progress.
379        if (editsCount % interval == 0
380            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
381          numOpenedFilesLastCheck = this.getNumOpenWriters();
382          String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits()))
383              + " edits, skipped " + editsSkipped + " edits.";
384          status.setStatus("Split " + countsStr);
385          if (cancel != null && !cancel.progress()) {
386            cancelled = true;
387            return new SplitWALResult(false, corrupt);
388          }
389        }
390      }
391    } catch (InterruptedException ie) {
392      IOException iie = new InterruptedIOException();
393      iie.initCause(ie);
394      throw iie;
395    } catch (CorruptedLogFileException e) {
396      LOG.warn("Could not parse, corrupt WAL={}", wal, e);
397      // If splitLogWorkerCoordination, then its old-school zk-coordinated splitting so update
398      // zk. Otherwise, it is the newer procedure-based WAL split which has no zk component.
399      if (this.splitLogWorkerCoordination != null) {
400        // Some tests pass in a csm of null.
401        splitLogWorkerCoordination.markCorrupted(walRootDir, wal.getName(), walFS);
402      }
403      corrupt = true;
404    } catch (IOException e) {
405      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
406      throw e;
407    } finally {
408      final String log = "Finishing writing output for " + wal + " so closing down";
409      LOG.debug(log);
410      status.setStatus(log);
411      try {
412        if (null != walReader) {
413          walReader.close();
414        }
415      } catch (IOException exception) {
416        LOG.warn("Could not close {} reader", wal, exception);
417      }
418      try {
419        if (outputSinkStarted) {
420          // Set cancelled to true as the immediate following statement will reset its value.
421          // If close() throws an exception, cancelled will have the right value
422          cancelled = true;
423          cancelled = outputSink.close() == null;
424        }
425      } finally {
426        long processCost = EnvironmentEdgeManager.currentTime() - startTS;
427        // See if length got updated post lease recovery
428        String msg = "Processed " + editsCount + " edits across " +
429          outputSink.getNumberOfRecoveredRegions() + " Regions in " + processCost +
430          " ms; skipped=" + editsSkipped + "; WAL=" + wal + ", size=" + lengthStr +
431          ", length=" + length + ", corrupted=" + corrupt + ", cancelled=" + cancelled;
432        LOG.info(msg);
433        status.markComplete(msg);
434        if (LOG.isDebugEnabled()) {
435          LOG.debug("Completed split of {}, journal: {}", wal, status.prettyPrintJournal());
436        }
437      }
438    }
439    return new SplitWALResult(!cancelled, corrupt);
440  }
441
442  private boolean isRegionDirPresentUnderRoot(TableName tn, String region) throws IOException {
443    return this.rootFS.exists(CommonFSUtils.getRegionDir(this.rootDir, tn, region));
444  }
445
446  /**
447   * Create a new {@link Reader} for reading logs to split.
448   * @return Returns null if file has length zero or file can't be found.
449   */
450  protected Reader getReader(FileStatus walStatus, boolean skipErrors, CancelableProgressable cancel)
451      throws IOException, CorruptedLogFileException {
452    Path path = walStatus.getPath();
453    long length = walStatus.getLen();
454    Reader in;
455
456    // Check for possibly empty file. With appends, currently Hadoop reports a
457    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
458    // HDFS-878 is committed.
459    if (length <= 0) {
460      LOG.warn("File {} might be still open, length is 0", path);
461    }
462
463    try {
464      RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, cancel);
465      try {
466        in = getReader(path, cancel);
467      } catch (EOFException e) {
468        if (length <= 0) {
469          // TODO should we ignore an empty, not-last log file if skip.errors
470          // is false? Either way, the caller should decide what to do. E.g.
471          // ignore if this is the last log in sequence.
472          // TODO is this scenario still possible if the log has been
473          // recovered (i.e. closed)
474          LOG.warn("Could not open {} for reading. File is empty", path, e);
475        }
476        // EOFException being ignored
477        return null;
478      }
479    } catch (IOException e) {
480      if (e instanceof FileNotFoundException) {
481        // A wal file may not exist anymore. Nothing can be recovered so move on
482        LOG.warn("File {} does not exist anymore", path, e);
483        return null;
484      }
485      if (!skipErrors || e instanceof InterruptedIOException) {
486        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
487      }
488      throw new CorruptedLogFileException("skipErrors=true; could not open " + path +
489        ", skipping", e);
490    }
491    return in;
492  }
493
494  private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
495      throws CorruptedLogFileException, IOException {
496    try {
497      return in.next();
498    } catch (EOFException eof) {
499      // truncated files are expected if a RS crashes (see HBASE-2643)
500      LOG.info("EOF from {}; continuing.", path);
501      return null;
502    } catch (IOException e) {
503      // If the IOE resulted from bad file format,
504      // then this problem is idempotent and retrying won't help
505      if (e.getCause() != null && (e.getCause() instanceof ParseException
506          || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
507        LOG.warn("Parse exception from {}; continuing", path, e);
508        return null;
509      }
510      if (!skipErrors) {
511        throw e;
512      }
513      throw new CorruptedLogFileException("skipErrors=true Ignoring exception"
514        + " while parsing wal " + path + ". Marking as corrupted", e);
515    }
516  }
517
518  /**
519   * Create a new {@link WALProvider.Writer} for writing log splits.
520   * @return a new Writer instance, caller should close
521   */
522  protected WALProvider.Writer createWriter(Path logfile) throws IOException {
523    return walFactory.createRecoveredEditsWriter(walFS, logfile);
524  }
525
526  /**
527   * Create a new {@link Reader} for reading logs to split.
528   * @return new Reader instance, caller should close
529   */
530  private Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
531    return walFactory.createReader(walFS, curLogFile, reporter);
532  }
533
534  /**
535   * Get current open writers
536   */
537  private int getNumOpenWriters() {
538    int result = 0;
539    if (this.outputSink != null) {
540      result += this.outputSink.getNumOpenWriters();
541    }
542    return result;
543  }
544
545  /**
546   * Contains some methods to control WAL-entries producer / consumer interactions
547   */
548  public static class PipelineController {
549    // If an exception is thrown by one of the other threads, it will be
550    // stored here.
551    AtomicReference<Throwable> thrown = new AtomicReference<>();
552
553    // Wait/notify for when data has been produced by the writer thread,
554    // consumed by the reader thread, or an exception occurred
555    final Object dataAvailable = new Object();
556
557    void writerThreadError(Throwable t) {
558      thrown.compareAndSet(null, t);
559    }
560
561    /**
562     * Check for errors in the writer threads. If any is found, rethrow it.
563     */
564    void checkForErrors() throws IOException {
565      Throwable thrown = this.thrown.get();
566      if (thrown == null) {
567        return;
568      }
569      if (thrown instanceof IOException) {
570        throw new IOException(thrown);
571      } else {
572        throw new RuntimeException(thrown);
573      }
574    }
575  }
576
577  static class CorruptedLogFileException extends Exception {
578    private static final long serialVersionUID = 1L;
579
580    CorruptedLogFileException(String s) {
581      super(s);
582    }
583
584    /**
585     * CorruptedLogFileException with cause
586     *
587     * @param message the message for this exception
588     * @param cause the cause for this exception
589     */
590    CorruptedLogFileException(String message, Throwable cause) {
591      super(message, cause);
592    }
593  }
594}