001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.EOFException;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.text.ParseException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.TreeMap;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.atomic.AtomicReference;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileStatus;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
040import org.apache.hadoop.hbase.master.SplitLogManager;
041import org.apache.hadoop.hbase.monitoring.MonitoredTask;
042import org.apache.hadoop.hbase.monitoring.TaskMonitor;
043import org.apache.hadoop.hbase.procedure2.util.StringUtils;
044import org.apache.hadoop.hbase.regionserver.LastSequenceId;
045import org.apache.hadoop.hbase.regionserver.RegionServerServices;
046import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.CancelableProgressable;
049import org.apache.hadoop.hbase.util.CommonFSUtils;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
052import org.apache.hadoop.hbase.wal.WAL.Entry;
053import org.apache.hadoop.ipc.RemoteException;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
059import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
060
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
063
064/**
065 * Split RegionServer WAL files. Splits the WAL into new files, one per region, to be picked up on
066 * Region reopen. Deletes the split WAL when finished. Create an instance and call
067 * {@link #splitWAL(FileStatus, CancelableProgressable)} per file or use static helper methods.
068 */
069@InterfaceAudience.Private
070public class WALSplitter {
071  private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);
072  public static final String SPLIT_SKIP_ERRORS_KEY = "hbase.hlog.split.skip.errors";
073
074  /**
075   * By default we retry errors in splitting, rather than skipping.
076   */
077  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
078
079  // Parameters for split process
080  protected final Path walRootDir;
081  protected final FileSystem walFS;
082  protected final Configuration conf;
083  final Path rootDir;
084  final FileSystem rootFS;
085  final RegionServerServices rsServices;
086
087  // Major subcomponents of the split process.
088  // These are separated into inner classes to make testing easier.
089  OutputSink outputSink;
090  private EntryBuffers entryBuffers;
091
092  /**
093   * Coordinator for split log. Used by the zk-based log splitter. Not used by the procedure
094   * v2-based log splitter.
095   */
096  private SplitLogWorkerCoordination splitLogWorkerCoordination;
097
098  private final WALFactory walFactory;
099
100  // For checking the latest flushed sequence id
101  protected final LastSequenceId sequenceIdChecker;
102
103  // Map encodedRegionName -> lastFlushedSequenceId
104  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>();
105
106  // Map encodedRegionName -> maxSeqIdInStores
107  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();
108
109  // the file being split currently
110  private FileStatus fileBeingSplit;
111
112  private final String tmpDirName;
113
114  /**
115   * Split WAL directly to hfiles instead of into intermediary 'recovered.edits' files.
116   */
117  public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
118  public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
119
120  /**
121   * True if we are to run with bounded amount of writers rather than let the count blossom. Default
122   * is 'false'. Does not apply if you have set 'hbase.wal.split.to.hfile' as that is always
123   * bounded. Only applies when you are doing recovery to 'recovered.edits' files (the old default).
124   * Bounded writing tends to have higher throughput.
125   */
126  public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
127
128  public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
129  public final static String SPLIT_WAL_WRITER_THREADS =
130    "hbase.regionserver.hlog.splitlog.writer.threads";
131
132  private final int numWriterThreads;
133  private final long bufferSize;
134  private final boolean splitWriterCreationBounded;
135  private final boolean hfile;
136  private final boolean skipErrors;
137
138  WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir, FileSystem walFS,
139    Path rootDir, FileSystem rootFS) {
140    this(factory, conf, walRootDir, walFS, rootDir, rootFS, null, null, null);
141  }
142
143  WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir, FileSystem walFS,
144    Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
145    SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) {
146    this.conf = HBaseConfiguration.create(conf);
147    String codecClassName =
148      conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
149    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
150    this.walRootDir = walRootDir;
151    this.walFS = walFS;
152    this.rootDir = rootDir;
153    this.rootFS = rootFS;
154    this.sequenceIdChecker = idChecker;
155    this.splitLogWorkerCoordination = splitLogWorkerCoordination;
156    this.rsServices = rsServices;
157    this.walFactory = factory;
158    this.tmpDirName =
159      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
160    // if we limit the number of writers opened for sinking recovered edits
161    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
162    this.bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
163    this.numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
164    this.hfile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
165    this.skipErrors = conf.getBoolean(SPLIT_SKIP_ERRORS_KEY, SPLIT_SKIP_ERRORS_DEFAULT);
166  }
167
168  WALFactory getWalFactory() {
169    return this.walFactory;
170  }
171
172  FileStatus getFileBeingSplit() {
173    return fileBeingSplit;
174  }
175
176  String getTmpDirName() {
177    return this.tmpDirName;
178  }
179
180  Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() {
181    return regionMaxSeqIdInStores;
182  }
183
184  /**
185   * Splits a WAL file. Used by old {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} and
186   * tests. Not used by new procedure-based WAL splitter.
187   * @return false if it is interrupted by the progress-able.
188   */
189  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
190    Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
191    SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory,
192    RegionServerServices rsServices) throws IOException {
193    Path rootDir = CommonFSUtils.getRootDir(conf);
194    FileSystem rootFS = rootDir.getFileSystem(conf);
195    WALSplitter splitter = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
196      splitLogWorkerCoordination, rsServices);
197    // splitWAL returns a data structure with whether split is finished and if the file is corrupt.
198    // We don't need to propagate corruption flag here because it is propagated by the
199    // SplitLogWorkerCoordination.
200    return splitter.splitWAL(logfile, reporter).isFinished();
201  }
202
203  /**
204   * Split a folder of WAL files. Delete the directory when done. Used by tools and unit tests. It
205   * should be package private. It is public only because TestWALObserver is in a different package,
206   * which uses this method to do log splitting.
207   * @return List of output files created by the split.
208   */
209  public static List<Path> split(Path walRootDir, Path walsDir, Path archiveDir, FileSystem walFS,
210    Configuration conf, final WALFactory factory) throws IOException {
211    Path rootDir = CommonFSUtils.getRootDir(conf);
212    FileSystem rootFS = rootDir.getFileSystem(conf);
213    WALSplitter splitter = new WALSplitter(factory, conf, walRootDir, walFS, rootDir, rootFS);
214    final List<FileStatus> wals =
215      SplitLogManager.getFileList(conf, Collections.singletonList(walsDir), null);
216    List<Path> splits = new ArrayList<>();
217    if (!wals.isEmpty()) {
218      for (FileStatus wal : wals) {
219        SplitWALResult splitWALResult = splitter.splitWAL(wal, null);
220        if (splitWALResult.isFinished()) {
221          WALSplitUtil.archive(wal.getPath(), splitWALResult.isCorrupt(), archiveDir, walFS, conf);
222          // splitter.outputSink.splits is mark as final, do not need null check
223          splits.addAll(splitter.outputSink.splits);
224        }
225      }
226    }
227    if (!walFS.delete(walsDir, true)) {
228      throw new IOException("Unable to delete src dir " + walsDir);
229    }
230    return splits;
231  }
232
233  /**
234   * Data structure returned as result by #splitWAL(FileStatus, CancelableProgressable). Test
235   * {@link #isFinished()} to see if we are done with the WAL and {@link #isCorrupt()} for if the
236   * WAL is corrupt.
237   */
238  static final class SplitWALResult {
239    private final boolean finished;
240    private final boolean corrupt;
241
242    private SplitWALResult(boolean finished, boolean corrupt) {
243      this.finished = finished;
244      this.corrupt = corrupt;
245    }
246
247    public boolean isFinished() {
248      return finished;
249    }
250
251    public boolean isCorrupt() {
252      return corrupt;
253    }
254  }
255
256  /**
257   * Setup the output sinks and entry buffers ahead of splitting WAL.
258   */
259  private void createOutputSinkAndEntryBuffers() {
260    PipelineController controller = new PipelineController();
261    if (this.hfile) {
262      this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize);
263      this.outputSink = new BoundedRecoveredHFilesOutputSink(this, controller, this.entryBuffers,
264        this.numWriterThreads);
265    } else if (this.splitWriterCreationBounded) {
266      this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize);
267      this.outputSink = new BoundedRecoveredEditsOutputSink(this, controller, this.entryBuffers,
268        this.numWriterThreads);
269    } else {
270      this.entryBuffers = new EntryBuffers(controller, this.bufferSize);
271      this.outputSink =
272        new RecoveredEditsOutputSink(this, controller, this.entryBuffers, this.numWriterThreads);
273    }
274  }
275
276  /**
277   * WAL splitting implementation, splits one WAL file.
278   * @param walStatus should be for an actual WAL file.
279   */
280  SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) throws IOException {
281    Path wal = walStatus.getPath();
282    Preconditions.checkArgument(walStatus.isFile(), "Not a regular file " + wal.toString());
283    boolean corrupt = false;
284    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
285    boolean outputSinkStarted = false;
286    boolean cancelled = false;
287    int editsCount = 0;
288    int editsSkipped = 0;
289    MonitoredTask status = TaskMonitor.get()
290      .createStatus("Splitting " + wal + " to temporary staging area.", false, true);
291    WALStreamReader walReader = null;
292    this.fileBeingSplit = walStatus;
293    long startTS = EnvironmentEdgeManager.currentTime();
294    long length = walStatus.getLen();
295    String lengthStr = StringUtils.humanSize(length);
296    createOutputSinkAndEntryBuffers();
297    try {
298      String logStr = "Splitting " + wal + ", size=" + lengthStr + " (" + length + "bytes)";
299      LOG.info(logStr);
300      status.setStatus(logStr);
301      if (cancel != null && !cancel.progress()) {
302        cancelled = true;
303        return new SplitWALResult(false, corrupt);
304      }
305      walReader = getReader(walStatus, this.skipErrors, cancel);
306      if (walReader == null) {
307        LOG.warn("Nothing in {}; empty?", wal);
308        return new SplitWALResult(true, corrupt);
309      }
310      LOG.info("Open {} took {}ms", wal, EnvironmentEdgeManager.currentTime() - startTS);
311      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
312      int numOpenedFilesLastCheck = 0;
313      outputSink.setReporter(cancel);
314      outputSink.setStatus(status);
315      outputSink.startWriterThreads();
316      outputSinkStarted = true;
317      Entry entry;
318      startTS = EnvironmentEdgeManager.currentTime();
319      while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) {
320        if (WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
321          // Skip processing the replication marker edits.
322          if (LOG.isDebugEnabled()) {
323            LOG.debug("Ignoring Replication marker edits.");
324          }
325          continue;
326        }
327        byte[] region = entry.getKey().getEncodedRegionName();
328        String encodedRegionNameAsStr = Bytes.toString(region);
329        Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
330        if (lastFlushedSequenceId == null) {
331          if (
332            !(isRegionDirPresentUnderRoot(entry.getKey().getTableName(), encodedRegionNameAsStr))
333          ) {
334            // The region directory itself is not present in the FS. This indicates that
335            // the region/table is already removed. We can just skip all the edits for this
336            // region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits
337            // will get skipped by the seqId check below.
338            // See more details at https://issues.apache.org/jira/browse/HBASE-24189
339            LOG.info("{} no longer in filesystem; skipping all edits.", encodedRegionNameAsStr);
340            lastFlushedSequenceId = Long.MAX_VALUE;
341          } else {
342            if (sequenceIdChecker != null) {
343              RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
344              Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
345              for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
346                maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
347                  storeSeqId.getSequenceId());
348              }
349              regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
350              lastFlushedSequenceId = ids.getLastFlushedSequenceId();
351              if (LOG.isDebugEnabled()) {
352                LOG.debug("Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
353                  + TextFormat.shortDebugString(ids));
354              }
355            }
356            if (lastFlushedSequenceId == null) {
357              lastFlushedSequenceId = -1L;
358            }
359          }
360          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
361        }
362        editsCount++;
363        if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
364          editsSkipped++;
365          continue;
366        }
367        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
368        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
369          editsSkipped++;
370          continue;
371        }
372        entryBuffers.appendEntry(entry);
373        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
374        // If sufficient edits have passed, check if we should report progress.
375        if (
376          editsCount % interval == 0 || moreWritersFromLastCheck > numOpenedFilesBeforeReporting
377        ) {
378          numOpenedFilesLastCheck = this.getNumOpenWriters();
379          String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits()))
380            + " edits, skipped " + editsSkipped + " edits.";
381          status.setStatus("Split " + countsStr);
382          if (cancel != null && !cancel.progress()) {
383            cancelled = true;
384            return new SplitWALResult(false, corrupt);
385          }
386        }
387      }
388    } catch (InterruptedException ie) {
389      IOException iie = new InterruptedIOException();
390      iie.initCause(ie);
391      throw iie;
392    } catch (CorruptedLogFileException e) {
393      LOG.warn("Could not parse, corrupt WAL={}", wal, e);
394      // If splitLogWorkerCoordination, then its old-school zk-coordinated splitting so update
395      // zk. Otherwise, it is the newer procedure-based WAL split which has no zk component.
396      if (this.splitLogWorkerCoordination != null) {
397        // Some tests pass in a csm of null.
398        splitLogWorkerCoordination.markCorrupted(walRootDir, wal.getName(), walFS);
399      }
400      corrupt = true;
401    } catch (IOException e) {
402      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
403      throw e;
404    } finally {
405      final String log = "Finishing writing output for " + wal + " so closing down";
406      LOG.debug(log);
407      status.setStatus(log);
408      if (null != walReader) {
409        walReader.close();
410      }
411      try {
412        if (outputSinkStarted) {
413          // Set cancelled to true as the immediate following statement will reset its value.
414          // If close() throws an exception, cancelled will have the right value
415          cancelled = true;
416          cancelled = outputSink.close() == null;
417        }
418      } finally {
419        long processCost = EnvironmentEdgeManager.currentTime() - startTS;
420        // See if length got updated post lease recovery
421        String msg = "Processed " + editsCount + " edits across "
422          + outputSink.getNumberOfRecoveredRegions() + " Regions in " + processCost
423          + " ms; skipped=" + editsSkipped + "; WAL=" + wal + ", size=" + lengthStr + ", length="
424          + length + ", corrupted=" + corrupt + ", cancelled=" + cancelled;
425        LOG.info(msg);
426        status.markComplete(msg);
427        if (LOG.isDebugEnabled()) {
428          LOG.debug("Completed split of {}, journal: {}", wal, status.prettyPrintJournal());
429        }
430      }
431    }
432    return new SplitWALResult(!cancelled, corrupt);
433  }
434
435  private boolean isRegionDirPresentUnderRoot(TableName tn, String region) throws IOException {
436    return this.rootFS.exists(CommonFSUtils.getRegionDir(this.rootDir, tn, region));
437  }
438
439  /**
440   * Create a new {@link WALStreamReader} for reading logs to split.
441   * @return Returns null if file has length zero or file can't be found.
442   */
443  protected WALStreamReader getReader(FileStatus walStatus, boolean skipErrors,
444    CancelableProgressable cancel) throws IOException, CorruptedLogFileException {
445    Path path = walStatus.getPath();
446    long length = walStatus.getLen();
447    WALStreamReader in;
448
449    // Check for possibly empty file. With appends, currently Hadoop reports a
450    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
451    // HDFS-878 is committed.
452    if (length <= 0) {
453      LOG.warn("File {} might be still open, length is 0", path);
454    }
455
456    try {
457      RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, cancel);
458      try {
459        in = getReader(path, cancel);
460      } catch (EOFException e) {
461        if (length <= 0) {
462          // TODO should we ignore an empty, not-last log file if skip.errors
463          // is false? Either way, the caller should decide what to do. E.g.
464          // ignore if this is the last log in sequence.
465          // TODO is this scenario still possible if the log has been
466          // recovered (i.e. closed)
467          LOG.warn("Could not open {} for reading. File is empty", path, e);
468        }
469        // EOFException being ignored
470        return null;
471      }
472    } catch (IOException e) {
473      if (e instanceof FileNotFoundException) {
474        // A wal file may not exist anymore. Nothing can be recovered so move on
475        LOG.warn("File {} does not exist anymore", path, e);
476        return null;
477      }
478      if (!skipErrors || e instanceof InterruptedIOException) {
479        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
480      }
481      throw new CorruptedLogFileException("skipErrors=true; could not open " + path + ", skipping",
482        e);
483    }
484    return in;
485  }
486
487  private Entry getNextLogLine(WALStreamReader in, Path path, boolean skipErrors)
488    throws CorruptedLogFileException, IOException {
489    try {
490      return in.next();
491    } catch (EOFException eof) {
492      // truncated files are expected if a RS crashes (see HBASE-2643)
493      LOG.info("EOF from {}; continuing.", path);
494      return null;
495    } catch (IOException e) {
496      // If the IOE resulted from bad file format,
497      // then this problem is idempotent and retrying won't help
498      if (
499        e.getCause() != null && (e.getCause() instanceof ParseException
500          || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)
501      ) {
502        LOG.warn("Parse exception from {}; continuing", path, e);
503        return null;
504      }
505      if (!skipErrors) {
506        throw e;
507      }
508      throw new CorruptedLogFileException("skipErrors=true Ignoring exception"
509        + " while parsing wal " + path + ". Marking as corrupted", e);
510    }
511  }
512
513  /**
514   * Create a new {@link WALProvider.Writer} for writing log splits.
515   * @return a new Writer instance, caller should close
516   */
517  protected WALProvider.Writer createWriter(Path logfile) throws IOException {
518    return walFactory.createRecoveredEditsWriter(walFS, logfile);
519  }
520
521  /**
522   * Create a new {@link WALStreamReader} for reading logs to split.
523   * @return new Reader instance, caller should close
524   */
525  private WALStreamReader getReader(Path curLogFile, CancelableProgressable reporter)
526    throws IOException {
527    return walFactory.createStreamReader(walFS, curLogFile, reporter);
528  }
529
530  /**
531   * Get current open writers
532   */
533  private int getNumOpenWriters() {
534    int result = 0;
535    if (this.outputSink != null) {
536      result += this.outputSink.getNumOpenWriters();
537    }
538    return result;
539  }
540
541  /**
542   * Contains some methods to control WAL-entries producer / consumer interactions
543   */
544  public static class PipelineController {
545    // If an exception is thrown by one of the other threads, it will be
546    // stored here.
547    AtomicReference<Throwable> thrown = new AtomicReference<>();
548
549    // Wait/notify for when data has been produced by the writer thread,
550    // consumed by the reader thread, or an exception occurred
551    final Object dataAvailable = new Object();
552
553    void writerThreadError(Throwable t) {
554      thrown.compareAndSet(null, t);
555    }
556
557    /**
558     * Check for errors in the writer threads. If any is found, rethrow it.
559     */
560    void checkForErrors() throws IOException {
561      Throwable thrown = this.thrown.get();
562      if (thrown == null) {
563        return;
564      }
565      if (thrown instanceof IOException) {
566        throw new IOException(thrown);
567      } else {
568        throw new RuntimeException(thrown);
569      }
570    }
571  }
572
573  static class CorruptedLogFileException extends Exception {
574    private static final long serialVersionUID = 1L;
575
576    CorruptedLogFileException(String s) {
577      super(s);
578    }
579
580    /**
581     * CorruptedLogFileException with cause
582     * @param message the message for this exception
583     * @param cause   the cause for this exception
584     */
585    CorruptedLogFileException(String message, Throwable cause) {
586      super(message, cause);
587    }
588  }
589}