/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;

/**
 * Split RegionServer WAL files. Splits the WAL into new files,
 * one per region, to be picked up on Region reopen. Deletes the split WAL when finished.
 * See {@link #split(Path, Path, Path, FileSystem, Configuration, WALFactory)} or
 * {@link #splitLogFile(Path, FileStatus, FileSystem, Configuration, CancelableProgressable,
 *   LastSequenceId, SplitLogWorkerCoordination, WALFactory, RegionServerServices)} for
 *   the entry points.
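 * <p>A rough usage sketch for the directory-level entry point. The paths and the
 * WALFactory id below are illustrative assumptions, not values mandated by this class:
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Path walDir = new Path("/hbase/WALs");                        // hypothetical WAL root
 * Path logDir = new Path(walDir, "rs1,16020,1600000000000-splitting"); // one server's WALs
 * Path oldLogDir = new Path("/hbase/oldWALs");
 * FileSystem walFS = walDir.getFileSystem(conf);
 * WALFactory factory = new WALFactory(conf, "wal-split-example");
 * List<Path> recoveredEdits =
 *     WALSplitter.split(walDir, logDir, oldLogDir, walFS, conf, factory);
 * }</pre>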
 */
@InterfaceAudience.Private
public class WALSplitter {
  private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);

  /** By default we retry errors in splitting, rather than skipping. */
  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

  // Parameters for split process
  protected final Path walDir;
  protected final FileSystem walFS;
  protected final Configuration conf;
  final Path rootDir;
  final FileSystem rootFS;
  final RegionServerServices rsServices;

  // Major subcomponents of the split process.
  // These are separated into inner classes to make testing easier.
  OutputSink outputSink;
  private EntryBuffers entryBuffers;

  /**
   * Coordinator for split log. Used by the zk-based log splitter.
   * Not used by the procedure v2-based log splitter.
   */
  private SplitLogWorkerCoordination splitLogWorkerCoordination;

  private final WALFactory walFactory;

  private MonitoredTask status;

  // For checking the latest flushed sequence id
  protected final LastSequenceId sequenceIdChecker;

  // Map encodedRegionName -> lastFlushedSequenceId
  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>();

  // Map encodedRegionName -> maxSeqIdInStores
  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();

  // The file currently being split
  private FileStatus fileBeingSplit;

  private final String tmpDirName;

  /**
   * Split WAL directly to hfiles instead of into intermediary 'recovered.edits' files.
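   * <p>A minimal sketch of enabling this mode on a {@link Configuration} that will be
   * handed to the splitting machinery:
   * <pre>{@code
   * conf.setBoolean(WALSplitter.WAL_SPLIT_TO_HFILE, true);
   * }</pre>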
   */
  public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
  public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;

  /**
   * True if we are to run with a bounded number of writers rather than letting the count grow
   * unchecked. Default is 'false'. Does not apply if you have set 'hbase.wal.split.to.hfile',
   * as that path is always bounded. Only applies when you are recovering to 'recovered.edits'
   * files (the old default). Bounded writing tends to have higher throughput.
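   * <p>A minimal sketch of opting in, again just an illustration of setting the flag on a
   * {@link Configuration}:
   * <pre>{@code
   * conf.setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
   * }</pre>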
   */
  public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";

  public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
  public final static String SPLIT_WAL_WRITER_THREADS =
      "hbase.regionserver.hlog.splitlog.writer.threads";

  @VisibleForTesting
  WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
      Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
      SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) {
    this.conf = HBaseConfiguration.create(conf);
    String codecClassName =
        conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
    this.walDir = walDir;
    this.walFS = walFS;
    this.rootDir = rootDir;
    this.rootFS = rootFS;
    this.sequenceIdChecker = idChecker;
    this.splitLogWorkerCoordination = splitLogWorkerCoordination;
    this.rsServices = rsServices;
    this.walFactory = factory;
    PipelineController controller = new PipelineController();
    this.tmpDirName =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);

    // Whether we limit the number of writers opened for sinking recovered edits
    boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
    boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
    long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
    int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);

    if (splitToHFile) {
      entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
      outputSink =
          new BoundedRecoveredHFilesOutputSink(this, controller, entryBuffers, numWriterThreads);
    } else if (splitWriterCreationBounded) {
      entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
      outputSink =
          new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
    } else {
      entryBuffers = new EntryBuffers(controller, bufferSize);
      outputSink = new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
    }
  }

  WALFactory getWalFactory() {
    return this.walFactory;
  }

  FileStatus getFileBeingSplit() {
    return fileBeingSplit;
  }

  String getTmpDirName() {
    return this.tmpDirName;
  }

  Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() {
    return regionMaxSeqIdInStores;
  }

  /**
   * Splits a WAL file.
   * @return false if splitting is interrupted by the {@link CancelableProgressable} reporter.
   */
  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
      SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory,
      RegionServerServices rsServices) throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem rootFS = rootDir.getFileSystem(conf);
    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
        splitLogWorkerCoordination, rsServices);
    return s.splitLogFile(logfile, reporter);
  }

  /**
   * Split a folder of WAL files. Delete the directory when done.
   * Used by tools and unit tests. It should be package private,
   * but it is public only because TestWALObserver lives in a different package
   * and uses this method to do log splitting.
   * @return List of output files created by the split.
   */
  @VisibleForTesting
  public static List<Path> split(Path walDir, Path logDir, Path oldLogDir, FileSystem walFS,
      Configuration conf, final WALFactory factory) throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem rootFS = rootDir.getFileSystem(conf);
    final FileStatus[] logfiles =
        SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null);
    List<Path> splits = new ArrayList<>();
    if (ArrayUtils.isNotEmpty(logfiles)) {
      for (FileStatus logfile : logfiles) {
        WALSplitter s =
            new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, null, null, null);
        if (s.splitLogFile(logfile, null)) {
          finishSplitLogFile(walDir, oldLogDir, logfile.getPath(), conf);
          if (s.outputSink.splits != null) {
            splits.addAll(s.outputSink.splits);
          }
        }
      }
    }
    if (!walFS.delete(logDir, true)) {
      throw new IOException("Unable to delete src dir: " + logDir);
    }
    return splits;
  }

  /**
   * WAL splitting implementation, splits one log file.
   * @param logfile should be an actual log file.
   */
  @VisibleForTesting
  boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
    Preconditions.checkState(status == null);
    Preconditions.checkArgument(logfile.isFile(),
        "passed in file status is for something other than a regular file.");
    boolean isCorrupted = false;
    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
      SPLIT_SKIP_ERRORS_DEFAULT);
    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
    Path logPath = logfile.getPath();
    boolean outputSinkStarted = false;
    boolean progressFailed = false;
    int editsCount = 0;
    int editsSkipped = 0;

    status = TaskMonitor.get().createStatus(
          "Splitting log file " + logfile.getPath() + " into a temporary staging area.");
    status.enableStatusJournal(true);
    Reader logFileReader = null;
    this.fileBeingSplit = logfile;
    long startTS = EnvironmentEdgeManager.currentTime();
    try {
      long logLength = logfile.getLen();
      LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength),
          logLength);
      status.setStatus("Opening log file " + logPath);
      if (reporter != null && !reporter.progress()) {
        progressFailed = true;
        return false;
      }
      logFileReader = getReader(logfile, skipErrors, reporter);
      if (logFileReader == null) {
        LOG.warn("Nothing to split in WAL={}", logPath);
        return true;
      }
      long openCost = EnvironmentEdgeManager.currentTime() - startTS;
      LOG.info("Open WAL={} cost {} ms", logPath, openCost);
      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
      int numOpenedFilesLastCheck = 0;
      outputSink.setReporter(reporter);
      outputSink.setStatus(status);
      outputSink.startWriterThreads();
      outputSinkStarted = true;
      Entry entry;
      Long lastFlushedSequenceId = -1L;
      startTS = EnvironmentEdgeManager.currentTime();
      while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
        byte[] region = entry.getKey().getEncodedRegionName();
        String encodedRegionNameAsStr = Bytes.toString(region);
        lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
        if (lastFlushedSequenceId == null) {
          if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(),
              encodedRegionNameAsStr))) {
            // The region directory itself is not present in the FS. This indicates that
            // the region/table is already removed. We can just skip all the edits for this
            // region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits
            // will get skipped by the seqId check below.
            // See more details at https://issues.apache.org/jira/browse/HBASE-24189
            LOG.info("{} no longer available in the FS. Skipping all edits for this region.",
                encodedRegionNameAsStr);
            lastFlushedSequenceId = Long.MAX_VALUE;
          } else {
            if (sequenceIdChecker != null) {
              RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
              Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
              for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
                maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
                    storeSeqId.getSequenceId());
              }
              regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
              lastFlushedSequenceId = ids.getLastFlushedSequenceId();
              if (LOG.isDebugEnabled()) {
                LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
                    + TextFormat.shortDebugString(ids));
              }
            }
            if (lastFlushedSequenceId == null) {
              lastFlushedSequenceId = -1L;
            }
          }
          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
        }
        if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
          editsSkipped++;
          continue;
        }
        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
          editsSkipped++;
          continue;
        }
        entryBuffers.appendEntry(entry);
        editsCount++;
        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
        // If sufficient edits have passed, check if we should report progress.
        if (editsCount % interval == 0
            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
          numOpenedFilesLastCheck = this.getNumOpenWriters();
          String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits()))
              + " edits, skipped " + editsSkipped + " edits.";
          status.setStatus("Split " + countsStr);
          if (reporter != null && !reporter.progress()) {
            progressFailed = true;
            return false;
          }
        }
      }
    } catch (InterruptedException ie) {
      IOException iie = new InterruptedIOException();
      iie.initCause(ie);
      throw iie;
    } catch (CorruptedLogFileException e) {
      LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
      if (splitLogWorkerCoordination != null) {
        // Some tests pass in a null splitLogWorkerCoordination.
        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
      } else {
        // For tests only.
        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
      }
      isCorrupted = true;
    } catch (IOException e) {
      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
      throw e;
    } finally {
      final String log = "Finishing writing output logs and closing down";
      LOG.debug(log);
      status.setStatus(log);
      try {
        if (null != logFileReader) {
          logFileReader.close();
        }
      } catch (IOException exception) {
        LOG.warn("Could not close WAL reader", exception);
      }
      try {
        if (outputSinkStarted) {
          // Set progressFailed to true before closing: if outputSink.close() throws, the
          // assignment below never runs and progressFailed keeps the correct (true) value.
          progressFailed = true;
          progressFailed = outputSink.close() == null;
        }
      } finally {
        long processCost = EnvironmentEdgeManager.currentTime() - startTS;
        // See if length got updated post lease recovery
        String msg = "Processed " + editsCount + " edits across " +
            outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost +
            " ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" +
            StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() +
            ", corrupted=" + isCorrupted + ", progress failed=" + progressFailed;
        LOG.info(msg);
        status.markComplete(msg);
        if (LOG.isDebugEnabled()) {
          LOG.debug("WAL split completed for {} , Journal Log: {}", logPath,
            status.prettyPrintJournal());
        }
      }
    }
    return !progressFailed;
  }

  private boolean isRegionDirPresentUnderRoot(TableName tableName, String regionName)
      throws IOException {
    Path regionDirPath = CommonFSUtils.getRegionDir(this.rootDir, tableName, regionName);
    return this.rootFS.exists(regionDirPath);
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   */
  private Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
      throws IOException, CorruptedLogFileException {
    Path path = file.getPath();
    long length = file.getLen();
    Reader in;

    // Check for possibly empty file. With appends, currently Hadoop reports a
    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
    // HDFS-878 is committed.
    if (length <= 0) {
      LOG.warn("File {} might still be open, length is 0", path);
    }

    try {
      RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, reporter);
      try {
        in = getReader(path, reporter);
      } catch (EOFException e) {
        if (length <= 0) {
          // TODO should we ignore an empty, not-last log file if skip.errors
          // is false? Either way, the caller should decide what to do. E.g.
          // ignore if this is the last log in sequence.
          // TODO is this scenario still possible if the log has been
          // recovered (i.e. closed)?
          LOG.warn("Could not open {} for reading. File is empty", path, e);
        }
        // EOFException being ignored
        return null;
      }
    } catch (IOException e) {
      if (e instanceof FileNotFoundException) {
        // A wal file may not exist anymore. Nothing can be recovered so move on
        LOG.warn("File {} does not exist anymore", path, e);
        return null;
      }
      if (!skipErrors || e instanceof InterruptedIOException) {
        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
      }
      throw new CorruptedLogFileException("skipErrors=true Could not open wal "
        + path + " ignoring", e);
    }
    return in;
  }

  private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
      throws CorruptedLogFileException, IOException {
    try {
      return in.next();
    } catch (EOFException eof) {
      // truncated files are expected if a RS crashes (see HBASE-2643)
      LOG.info("EOF from wal {}. Continuing.", path);
      return null;
    } catch (IOException e) {
      // If the IOE resulted from bad file format,
      // then this problem is idempotent and retrying won't help
      if (e.getCause() != null && (e.getCause() instanceof ParseException
          || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
        LOG.warn("Parse exception from wal {}. Continuing", path, e);
        return null;
      }
      if (!skipErrors) {
        throw e;
      }
      throw new CorruptedLogFileException("skipErrors=true Ignoring exception"
        + " while parsing wal " + path + ". Marking as corrupted", e);
    }
  }

  /**
   * Create a new {@link WALProvider.Writer} for writing log splits.
   * @return a new Writer instance, caller should close
   */
  protected WALProvider.Writer createWriter(Path logfile) throws IOException {
    return walFactory.createRecoveredEditsWriter(walFS, logfile);
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   * @return new Reader instance, caller should close
   */
  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
    return walFactory.createReader(walFS, curLogFile, reporter);
  }

  /**
   * Get the number of currently open writers.
   */
  private int getNumOpenWriters() {
    int result = 0;
    if (this.outputSink != null) {
      result += this.outputSink.getNumOpenWriters();
    }
    return result;
  }

  /**
   * Contains methods to control interactions between the WAL-entry producer (the thread reading
   * the WAL being split) and the consumer writer threads.
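   * <p>A sketched interaction, not a verbatim excerpt of the writer threads in this file:
   * <pre>{@code
   * PipelineController controller = new PipelineController();
   * // Writer-thread side: record the first failure seen.
   * try {
   *   // ... write out a batch of recovered edits ...
   * } catch (Throwable t) {
   *   controller.writerThreadError(t);
   * }
   * // Controlling side: surface any recorded failure as an exception.
   * controller.checkForErrors();
   * }</pre>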
   */
  public static class PipelineController {
    // If an exception is thrown by one of the other threads, it will be
    // stored here.
    AtomicReference<Throwable> thrown = new AtomicReference<>();

    // Wait/notify for when data has been produced by the reader thread,
    // consumed by the writer threads, or an exception occurred
    final Object dataAvailable = new Object();

    void writerThreadError(Throwable t) {
      thrown.compareAndSet(null, t);
    }

    /**
     * Check for errors in the writer threads. If any is found, rethrow it.
     */
    void checkForErrors() throws IOException {
      Throwable thrown = this.thrown.get();
      if (thrown == null) {
        return;
      }
      this.thrown.set(null);
      if (thrown instanceof IOException) {
        throw new IOException(thrown);
      } else {
        throw new RuntimeException(thrown);
      }
    }
  }

  static class CorruptedLogFileException extends Exception {
    private static final long serialVersionUID = 1L;

    CorruptedLogFileException(String s) {
      super(s);
    }

    /**
     * CorruptedLogFileException with cause
     *
     * @param message the message for this exception
     * @param cause the cause for this exception
     */
    CorruptedLogFileException(String message, Throwable cause) {
      super(message, cause);
    }
  }
}