/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;

/**
 * Split RegionServer WAL files. Splits the WAL into new files,
 * one per region, to be picked up on Region reopen. Deletes the split WAL when finished.
 * Create an instance and call {@link #splitWAL(FileStatus, CancelableProgressable)} per file or
 * use static helper methods.
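 * <p>A minimal usage sketch via the static {@link #split(Path, Path, Path, FileSystem,
 * Configuration, WALFactory)} helper; the paths and factory id below are illustrative only:
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * FileSystem walFS = FileSystem.get(conf);
 * Path walRootDir = new Path("/hbase");                    // hypothetical WAL root
 * Path walsDir = new Path(walRootDir, "WALs/rs1,16020,1"); // one server's WAL dir
 * Path archiveDir = new Path(walRootDir, "oldWALs");       // hypothetical archive dir
 * WALFactory factory = new WALFactory(conf, "split-example");
 * List<Path> recoveredEdits =
 *   WALSplitter.split(walRootDir, walsDir, archiveDir, walFS, conf, factory);
 * }</pre>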
 */
@InterfaceAudience.Private
public class WALSplitter {
  private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);
  public static final String SPLIT_SKIP_ERRORS_KEY = "hbase.hlog.split.skip.errors";

  /**
   * By default we retry errors in splitting, rather than skipping.
   */
  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

  // Parameters for split process
  protected final Path walRootDir;
  protected final FileSystem walFS;
  protected final Configuration conf;
  final Path rootDir;
  final FileSystem rootFS;
  final RegionServerServices rsServices;

  // Major subcomponents of the split process.
  // These are separated into inner classes to make testing easier.
  OutputSink outputSink;
  private EntryBuffers entryBuffers;

  /**
   * Coordinator for split log. Used by the zk-based log splitter.
   * Not used by the procedure v2-based log splitter.
   */
  private SplitLogWorkerCoordination splitLogWorkerCoordination;

  private final WALFactory walFactory;

  // For checking the latest flushed sequence id
  protected final LastSequenceId sequenceIdChecker;

  // Map encodedRegionName -> lastFlushedSequenceId
  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>();

  // Map encodedRegionName -> maxSeqIdInStores
  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();

  // the file being split currently
  private FileStatus fileBeingSplit;

  private final String tmpDirName;

  /**
   * Split WAL directly to hfiles instead of into intermediary 'recovered.edits' files.
   */
  public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
  public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;

  /**
   * True if we are to run with a bounded number of writers rather than letting the count
   * blossom. Default is 'false'. Does not apply if you have set 'hbase.wal.split.to.hfile'
   * as that is always bounded. Only applies when you are doing recovery to 'recovered.edits'
   * files (the old default). Bounded writing tends to have higher throughput.
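   * <p>For example, a configuration sketch enabling bounded recovered.edits writing (or,
   * alternatively, splitting straight to hfiles):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
   * // or: conf.setBoolean(WALSplitter.WAL_SPLIT_TO_HFILE, true);
   * }</pre>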
   */
  public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";

  public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
  public final static String SPLIT_WAL_WRITER_THREADS =
    "hbase.regionserver.hlog.splitlog.writer.threads";

  private final int numWriterThreads;
  private final long bufferSize;
  private final boolean splitWriterCreationBounded;
  private final boolean hfile;
  private final boolean skipErrors;

  WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir,
      FileSystem walFS, Path rootDir, FileSystem rootFS) {
    this(factory, conf, walRootDir, walFS, rootDir, rootFS, null, null, null);
  }

  WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir,
      FileSystem walFS, Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
      SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) {
    this.conf = HBaseConfiguration.create(conf);
    String codecClassName =
      conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
    this.walRootDir = walRootDir;
    this.walFS = walFS;
    this.rootDir = rootDir;
    this.rootFS = rootFS;
    this.sequenceIdChecker = idChecker;
    this.splitLogWorkerCoordination = splitLogWorkerCoordination;
    this.rsServices = rsServices;
    this.walFactory = factory;
    this.tmpDirName =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    // Whether we limit the number of writers opened for sinking recovered edits
    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
    this.bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
    this.numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
    this.hfile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
    this.skipErrors = conf.getBoolean(SPLIT_SKIP_ERRORS_KEY, SPLIT_SKIP_ERRORS_DEFAULT);
  }

  WALFactory getWalFactory() {
    return this.walFactory;
  }

  FileStatus getFileBeingSplit() {
    return fileBeingSplit;
  }

  String getTmpDirName() {
    return this.tmpDirName;
  }

  Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() {
    return regionMaxSeqIdInStores;
  }

  /**
   * Splits a WAL file.
   * Used by the old {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} and tests.
   * Not used by the new procedure-based WAL splitter.
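   * <p>A rough calling sketch (in tests the coordination and region server services may be
   * null; the argument names here are illustrative):
   * <pre>{@code
   * boolean finished = WALSplitter.splitLogFile(walDir, walFileStatus, walFS, conf,
   *   reporter, idChecker, coordination, walFactory, rsServices);
   * }</pre>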
   *
   * @return false if it was interrupted by the progressable.
   */
  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
      SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory,
      RegionServerServices rsServices) throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem rootFS = rootDir.getFileSystem(conf);
    WALSplitter splitter = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
      splitLogWorkerCoordination, rsServices);
    // splitWAL returns a data structure with whether split is finished and if the file is corrupt.
    // We don't need to propagate corruption flag here because it is propagated by the
    // SplitLogWorkerCoordination.
    return splitter.splitWAL(logfile, reporter).isFinished();
  }

  /**
   * Split a folder of WAL files. Delete the directory when done.
   * Used by tools and unit tests. It would be package-private, except that
   * TestWALObserver lives in a different package and uses this method to do
   * log splitting.
   * @return List of output files created by the split.
   */
  public static List<Path> split(Path walRootDir, Path walsDir, Path archiveDir, FileSystem walFS,
      Configuration conf, final WALFactory factory) throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem rootFS = rootDir.getFileSystem(conf);
    WALSplitter splitter = new WALSplitter(factory, conf, walRootDir, walFS, rootDir, rootFS);
    final List<FileStatus> wals =
      SplitLogManager.getFileList(conf, Collections.singletonList(walsDir), null);
    List<Path> splits = new ArrayList<>();
    if (!wals.isEmpty()) {
      for (FileStatus wal: wals) {
        SplitWALResult splitWALResult = splitter.splitWAL(wal, null);
        if (splitWALResult.isFinished()) {
          WALSplitUtil.archive(wal.getPath(), splitWALResult.isCorrupt(), archiveDir, walFS, conf);
          // splitter.outputSink.splits is marked final, so no null check is needed.
          splits.addAll(splitter.outputSink.splits);
        }
      }
    }
    if (!walFS.delete(walsDir, true)) {
      throw new IOException("Unable to delete src dir " + walsDir);
    }
    return splits;
  }

  /**
   * Data structure returned as result by {@link #splitWAL(FileStatus, CancelableProgressable)}.
   * Test {@link #isFinished()} to see if we are done with the WAL and {@link #isCorrupt()} for
   * whether the WAL is corrupt.
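   * <p>For example (sketch):
   * <pre>{@code
   * SplitWALResult result = splitter.splitWAL(walStatus, null);
   * if (result.isFinished() && !result.isCorrupt()) {
   *   // The WAL was fully split and is safe to archive.
   * }
   * }</pre>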
   */
  static final class SplitWALResult {
    private final boolean finished;
    private final boolean corrupt;

    private SplitWALResult(boolean finished, boolean corrupt) {
      this.finished = finished;
      this.corrupt = corrupt;
    }

    public boolean isFinished() {
      return finished;
    }

    public boolean isCorrupt() {
      return corrupt;
    }
  }

  /**
   * Set up the output sink and entry buffers ahead of splitting the WAL. The sink type is
   * chosen by configuration: splitting to hfiles takes precedence, then bounded
   * recovered.edits writing, else the unbounded default.
   */
  private void createOutputSinkAndEntryBuffers() {
    PipelineController controller = new PipelineController();
    if (this.hfile) {
      this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize);
      this.outputSink = new BoundedRecoveredHFilesOutputSink(this, controller,
        this.entryBuffers, this.numWriterThreads);
    } else if (this.splitWriterCreationBounded) {
      this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize);
      this.outputSink = new BoundedRecoveredEditsOutputSink(this, controller,
        this.entryBuffers, this.numWriterThreads);
    } else {
      this.entryBuffers = new EntryBuffers(controller, this.bufferSize);
      this.outputSink = new RecoveredEditsOutputSink(this, controller,
        this.entryBuffers, this.numWriterThreads);
    }
  }

  /**
   * WAL splitting implementation, splits one WAL file.
   * @param walStatus should be for an actual WAL file.
   */
  SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) throws IOException {
    Path wal = walStatus.getPath();
    Preconditions.checkArgument(walStatus.isFile(), "Not a regular file " + wal.toString());
    boolean corrupt = false;
    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
    boolean outputSinkStarted = false;
    boolean cancelled = false;
    int editsCount = 0;
    int editsSkipped = 0;
    MonitoredTask status =
      TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area.");
    status.enableStatusJournal(true);
    Reader walReader = null;
    this.fileBeingSplit = walStatus;
    long startTS = EnvironmentEdgeManager.currentTime();
    long length = walStatus.getLen();
    String lengthStr = StringUtils.humanSize(length);
    createOutputSinkAndEntryBuffers();
    try {
      String logStr = "Splitting " + wal + ", size=" + lengthStr + " (" + length + " bytes)";
      LOG.info(logStr);
      status.setStatus(logStr);
      if (cancel != null && !cancel.progress()) {
        cancelled = true;
        return new SplitWALResult(false, corrupt);
      }
      walReader = getReader(walStatus, this.skipErrors, cancel);
      if (walReader == null) {
        LOG.warn("Nothing in {}; empty?", wal);
        return new SplitWALResult(true, corrupt);
      }
      LOG.info("Open {} took {}ms", wal, EnvironmentEdgeManager.currentTime() - startTS);
      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
      int numOpenedFilesLastCheck = 0;
      outputSink.setReporter(cancel);
      outputSink.setStatus(status);
      outputSink.startWriterThreads();
      outputSinkStarted = true;
      Entry entry;
      startTS = EnvironmentEdgeManager.currentTime();
      while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) {
        byte[] region = entry.getKey().getEncodedRegionName();
        String encodedRegionNameAsStr = Bytes.toString(region);
        Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
        if (lastFlushedSequenceId == null) {
          if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(),
              encodedRegionNameAsStr))) {
            // The region directory itself is not present in the FS. This indicates that
            // the region/table is already removed. We can just skip all the edits for this
            // region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits
            // will get skipped by the seqId check below.
            // See more details at https://issues.apache.org/jira/browse/HBASE-24189
            LOG.info("{} no longer in filesystem; skipping all edits.", encodedRegionNameAsStr);
            lastFlushedSequenceId = Long.MAX_VALUE;
          } else {
            if (sequenceIdChecker != null) {
              RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
              Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
              for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
                maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
                    storeSeqId.getSequenceId());
              }
              regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
              lastFlushedSequenceId = ids.getLastFlushedSequenceId();
              if (LOG.isDebugEnabled()) {
                LOG.debug("Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
                    + TextFormat.shortDebugString(ids));
              }
            }
            if (lastFlushedSequenceId == null) {
              lastFlushedSequenceId = -1L;
            }
          }
          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
        }
        editsCount++;
        if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
          editsSkipped++;
          continue;
        }
        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
          editsSkipped++;
          continue;
        }
        entryBuffers.appendEntry(entry);
        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
        // If sufficient edits have passed, check if we should report progress.
        if (editsCount % interval == 0
            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
          numOpenedFilesLastCheck = this.getNumOpenWriters();
          String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits()))
              + " edits, skipped " + editsSkipped + " edits.";
          status.setStatus("Split " + countsStr);
          if (cancel != null && !cancel.progress()) {
            cancelled = true;
            return new SplitWALResult(false, corrupt);
          }
        }
      }
    } catch (InterruptedException ie) {
      IOException iie = new InterruptedIOException();
      iie.initCause(ie);
      throw iie;
    } catch (CorruptedLogFileException e) {
      LOG.warn("Could not parse, corrupt WAL={}", wal, e);
      // If splitLogWorkerCoordination is set, this is old-school zk-coordinated splitting, so
      // update zk. Otherwise, it is the newer procedure-based WAL split, which has no zk component.
      if (this.splitLogWorkerCoordination != null) {
        // Some tests pass in a null coordination manager, hence the null check above.
        splitLogWorkerCoordination.markCorrupted(walRootDir, wal.getName(), walFS);
      }
      corrupt = true;
    } catch (IOException e) {
      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
      throw e;
    } finally {
      final String log = "Finishing writing output for " + wal + " so closing down";
      LOG.debug(log);
      status.setStatus(log);
      try {
        if (null != walReader) {
          walReader.close();
        }
      } catch (IOException exception) {
        LOG.warn("Could not close {} reader", wal, exception);
      }
      try {
        if (outputSinkStarted) {
          // Set cancelled to true as the immediately following statement will reset its value.
          // If close() throws an exception, cancelled will have the right value.
          cancelled = true;
          cancelled = outputSink.close() == null;
        }
      } finally {
        long processCost = EnvironmentEdgeManager.currentTime() - startTS;
        // See if length got updated post lease recovery
        String msg = "Processed " + editsCount + " edits across " +
          outputSink.getNumberOfRecoveredRegions() + " Regions in " + processCost +
          " ms; skipped=" + editsSkipped + "; WAL=" + wal + ", size=" + lengthStr +
          ", length=" + length + ", corrupted=" + corrupt + ", cancelled=" + cancelled;
        LOG.info(msg);
        status.markComplete(msg);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Completed split of {}, journal: {}", wal, status.prettyPrintJournal());
        }
      }
    }
    return new SplitWALResult(!cancelled, corrupt);
  }

  private boolean isRegionDirPresentUnderRoot(TableName tn, String region) throws IOException {
    return this.rootFS.exists(CommonFSUtils.getRegionDir(this.rootDir, tn, region));
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   * @return null if the file has zero length or cannot be found.
   */
  protected Reader getReader(FileStatus walStatus, boolean skipErrors, CancelableProgressable cancel)
      throws IOException, CorruptedLogFileException {
    Path path = walStatus.getPath();
    long length = walStatus.getLen();
    Reader in;

    // Check for possibly empty file. With appends, currently Hadoop reports a
    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
    // HDFS-878 is committed.
    if (length <= 0) {
      LOG.warn("File {} might be still open, length is 0", path);
    }

    try {
      RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, cancel);
      try {
        in = getReader(path, cancel);
      } catch (EOFException e) {
        if (length <= 0) {
          // TODO: should we ignore an empty, not-last log file if skip.errors
          // is false? Either way, the caller should decide what to do, e.g.
          // ignore it if this is the last log in the sequence.
          // TODO: is this scenario still possible if the log has been
          // recovered (i.e. closed)?
          LOG.warn("Could not open {} for reading. File is empty", path, e);
        }
        // EOFException being ignored
        return null;
      }
    } catch (IOException e) {
      if (e instanceof FileNotFoundException) {
        // A wal file may not exist anymore. Nothing can be recovered so move on
        LOG.warn("File {} does not exist anymore", path, e);
        return null;
      }
      if (!skipErrors || e instanceof InterruptedIOException) {
        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
      }
      throw new CorruptedLogFileException("skipErrors=true; could not open " + path +
        ", skipping", e);
    }
    return in;
  }

  private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
      throws CorruptedLogFileException, IOException {
    try {
      return in.next();
    } catch (EOFException eof) {
      // truncated files are expected if a RS crashes (see HBASE-2643)
      LOG.info("EOF from {}; continuing.", path);
      return null;
    } catch (IOException e) {
      // If the IOE resulted from bad file format,
      // then this problem is idempotent and retrying won't help
      if (e.getCause() != null && (e.getCause() instanceof ParseException
          || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
        LOG.warn("Parse exception from {}; continuing", path, e);
        return null;
      }
      if (!skipErrors) {
        throw e;
      }
      throw new CorruptedLogFileException("skipErrors=true; ignoring exception"
        + " while parsing wal " + path + ". Marking as corrupted", e);
    }
  }

  /**
   * Create a new {@link WALProvider.Writer} for writing log splits.
   * @return a new Writer instance; the caller should close it
   */
  protected WALProvider.Writer createWriter(Path logfile) throws IOException {
    return walFactory.createRecoveredEditsWriter(walFS, logfile);
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   * @return a new Reader instance; the caller should close it
   */
  private Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
    return walFactory.createReader(walFS, curLogFile, reporter);
  }

  /**
   * Get the number of currently open writers
   */
  private int getNumOpenWriters() {
    int result = 0;
    if (this.outputSink != null) {
      result += this.outputSink.getNumOpenWriters();
    }
    return result;
  }

  /**
   * Contains some methods to control WAL-entries producer / consumer interactions
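   * <p>A sketch of the intended interaction (thread roles are illustrative):
   * <pre>{@code
   * PipelineController controller = new PipelineController();
   * // In a writer thread: record the first failure seen.
   * try {
   *   // ... write entries ...
   * } catch (Throwable t) {
   *   controller.writerThreadError(t);
   * }
   * // In the coordinating thread: rethrow any recorded failure.
   * controller.checkForErrors();
   * }</pre>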
   */
  public static class PipelineController {
    // If an exception is thrown by one of the other threads, it will be
    // stored here.
    AtomicReference<Throwable> thrown = new AtomicReference<>();

    // Wait/notify for when entries have been produced by the reader thread,
    // consumed by the writer threads, or an exception has occurred
    final Object dataAvailable = new Object();

    void writerThreadError(Throwable t) {
      thrown.compareAndSet(null, t);
    }

    /**
     * Check for errors in the writer threads. If any is found, rethrow it.
     */
    void checkForErrors() throws IOException {
      Throwable thrown = this.thrown.get();
      if (thrown == null) {
        return;
      }
      this.thrown.set(null);
      if (thrown instanceof IOException) {
        throw new IOException(thrown);
      } else {
        throw new RuntimeException(thrown);
      }
    }
  }

  static class CorruptedLogFileException extends Exception {
    private static final long serialVersionUID = 1L;

    CorruptedLogFileException(String s) {
      super(s);
    }

    /**
     * CorruptedLogFileException with cause
     *
     * @param message the message for this exception
     * @param cause the cause for this exception
     */
    CorruptedLogFileException(String message, Throwable cause) {
      super(message, cause);
    }
  }
}