/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;

/**
 * This class is responsible for splitting up a bunch of regionserver commit log
 * files that are no longer being written to, into new files, one per region, for
 * recovering data on startup. Delete the old log files when finished.
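 * <p>
 * A minimal usage sketch via the static helper below; the directory paths and the
 * {@code WALFactory} instance are placeholders supplied by the caller, not defaults:
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * FileSystem walFS = FileSystem.get(conf);
 * // rootDir, logDir, oldLogDir and walFactory are illustrative; callers obtain them
 * // from their own cluster layout and WAL setup.
 * List&lt;Path&gt; recoveredEdits =
 *     WALSplitter.split(rootDir, logDir, oldLogDir, walFS, conf, walFactory);
 * </pre>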
 */
@InterfaceAudience.Private
public class WALSplitter {
  private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);

  /** By default we retry errors in splitting, rather than skipping. */
  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

  // Parameters for split process
  protected final Path walDir;
  protected final FileSystem walFS;
  protected final Configuration conf;

  // Major subcomponents of the split process.
  // These are separated into inner classes to make testing easier.
  OutputSink outputSink;
  private EntryBuffers entryBuffers;

  private SplitLogWorkerCoordination splitLogWorkerCoordination;
  private final WALFactory walFactory;

  private MonitoredTask status;

  // For checking the latest flushed sequence id
  protected final LastSequenceId sequenceIdChecker;

  // Map encodedRegionName -> lastFlushedSequenceId
  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>();

  // Map encodedRegionName -> maxSeqIdInStores
  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();

  // the file being split currently
  private FileStatus fileBeingSplit;

  // if we limit the number of writers opened for sinking recovered edits
  private final boolean splitWriterCreationBounded;

  public static final String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";

  @VisibleForTesting
  WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
      LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
    this.conf = HBaseConfiguration.create(conf);
    String codecClassName =
        conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
    this.walDir = walDir;
    this.walFS = walFS;
    this.sequenceIdChecker = idChecker;
    this.splitLogWorkerCoordination = splitLogWorkerCoordination;

    this.walFactory = factory;
    PipelineController controller = new PipelineController();

    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);

    entryBuffers = new EntryBuffers(controller,
        this.conf.getLong("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
        splitWriterCreationBounded);

    int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
    if (splitWriterCreationBounded) {
      outputSink =
          new BoundedLogWriterCreationOutputSink(this, controller, entryBuffers, numWriterThreads);
    } else {
      outputSink =
          new LogRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
    }
  }

  WALFactory getWalFactory() {
    return this.walFactory;
  }

  FileStatus getFileBeingSplit() {
    return fileBeingSplit;
  }

  Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() {
    return regionMaxSeqIdInStores;
  }

  /**
   * Splits a WAL file into per-region recovered-edits directories.
   * This is the main entry point for distributed log splitting from SplitLogWorker.
   * <p>
   * If the log file has N regions then N recovered.edits files will be produced.
   * <p>
   * @return false if splitting was interrupted by the progressable (reporter).
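   * <p>
   * Illustrative direct invocation (all arguments are placeholders; the reporter,
   * sequence-id checker and coordination may be {@code null}, as in the tool/test code path):
   * <pre>
   * boolean finished = WALSplitter.splitLogFile(walDir, logfile, walFS, conf,
   *     null, null, null, walFactory);
   * </pre>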
   */
  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
      SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
      throws IOException {
    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker,
        splitLogWorkerCoordination);
    return s.splitLogFile(logfile, reporter);
  }

  // A wrapper to split one log folder using the method used by distributed
  // log splitting. Used by tools and unit tests. It should be package private.
  // It is public only because TestWALObserver is in a different package,
  // which uses this method to do log splitting.
  @VisibleForTesting
  public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
      FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException {
    final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
        Collections.singletonList(logDir), null);
    List<Path> splits = new ArrayList<>();
    if (ArrayUtils.isNotEmpty(logfiles)) {
      for (FileStatus logfile: logfiles) {
        WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null);
        if (s.splitLogFile(logfile, null)) {
          finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
          if (s.outputSink.splits != null) {
            splits.addAll(s.outputSink.splits);
          }
        }
      }
    }
    if (!walFS.delete(logDir, true)) {
      throw new IOException("Unable to delete src dir: " + logDir);
    }
    return splits;
  }

  /**
   * log splitting implementation, splits one log file.
   * @param logfile should be an actual log file.
   */
  @VisibleForTesting
  boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
    Preconditions.checkState(status == null);
    Preconditions.checkArgument(logfile.isFile(),
        "passed in file status is for something other than a regular file.");
    boolean isCorrupted = false;
    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
      SPLIT_SKIP_ERRORS_DEFAULT);
    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
    Path logPath = logfile.getPath();
    boolean outputSinkStarted = false;
    boolean progress_failed = false;
    int editsCount = 0;
    int editsSkipped = 0;

    status = TaskMonitor.get().createStatus(
          "Splitting log file " + logfile.getPath() + " into a temporary staging area.");
    Reader logFileReader = null;
    this.fileBeingSplit = logfile;
    long startTS = EnvironmentEdgeManager.currentTime();
    try {
      long logLength = logfile.getLen();
      LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength),
          logLength);
      status.setStatus("Opening log file");
      if (reporter != null && !reporter.progress()) {
        progress_failed = true;
        return false;
      }
      logFileReader = getReader(logfile, skipErrors, reporter);
      if (logFileReader == null) {
        LOG.warn("Nothing to split in WAL={}", logPath);
        return true;
      }
      long openCost = EnvironmentEdgeManager.currentTime() - startTS;
      LOG.info("Open WAL={} cost {} ms", logPath, openCost);
      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
      int numOpenedFilesLastCheck = 0;
      outputSink.setReporter(reporter);
      outputSink.startWriterThreads();
      outputSinkStarted = true;
      Entry entry;
      Long lastFlushedSequenceId = -1L;
      startTS = EnvironmentEdgeManager.currentTime();
      while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
        byte[] region = entry.getKey().getEncodedRegionName();
        String encodedRegionNameAsStr = Bytes.toString(region);
        lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
        if (lastFlushedSequenceId == null) {
          if (sequenceIdChecker != null) {
            RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
            Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
            for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
              maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
                storeSeqId.getSequenceId());
            }
            regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
            lastFlushedSequenceId = ids.getLastFlushedSequenceId();
            if (LOG.isDebugEnabled()) {
              LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
                  TextFormat.shortDebugString(ids));
            }
          }
          if (lastFlushedSequenceId == null) {
            lastFlushedSequenceId = -1L;
          }
          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
        }
        if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
          editsSkipped++;
          continue;
        }
        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
          editsSkipped++;
          continue;
        }
        entryBuffers.appendEntry(entry);
        editsCount++;
        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
        // If sufficient edits have passed, check if we should report progress.
        if (editsCount % interval == 0
            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
          numOpenedFilesLastCheck = this.getNumOpenWriters();
          String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
              + " edits, skipped " + editsSkipped + " edits.";
          status.setStatus("Split " + countsStr);
          if (reporter != null && !reporter.progress()) {
            progress_failed = true;
            return false;
          }
        }
      }
    } catch (InterruptedException ie) {
      IOException iie = new InterruptedIOException();
      iie.initCause(ie);
      throw iie;
    } catch (CorruptedLogFileException e) {
      LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
      if (splitLogWorkerCoordination != null) {
        // Some tests pass in a null coordination manager.
        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
      } else {
        // for tests only
        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
      }
      isCorrupted = true;
    } catch (IOException e) {
      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
      throw e;
    } finally {
      LOG.debug("Finishing writing output logs and closing down");
      try {
        if (null != logFileReader) {
          logFileReader.close();
        }
      } catch (IOException exception) {
        LOG.warn("Could not close WAL reader", exception);
      }
      try {
        if (outputSinkStarted) {
          // Set progress_failed to true before finishWritingAndClose(): if it throws, the
          // flag keeps the correct (failed) value; otherwise the next statement resets it.
          progress_failed = true;
          progress_failed = outputSink.finishWritingAndClose() == null;
        }
      } finally {
        long processCost = EnvironmentEdgeManager.currentTime() - startTS;
        // See if length got updated post lease recovery
        String msg = "Processed " + editsCount + " edits across " +
            outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost +
            " ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" +
            StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() +
            ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
        LOG.info(msg);
        status.markComplete(msg);
      }
    }
    return !progress_failed;
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   */
  protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
      throws IOException, CorruptedLogFileException {
    Path path = file.getPath();
    long length = file.getLen();
    Reader in;

    // Check for possibly empty file. With appends, currently Hadoop reports a
    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
    // HDFS-878 is committed.
    if (length <= 0) {
      LOG.warn("File {} might still be open, length is 0", path);
    }

    try {
      FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter);
      try {
        in = getReader(path, reporter);
      } catch (EOFException e) {
        if (length <= 0) {
          // TODO should we ignore an empty, not-last log file if skip.errors
          // is false? Either way, the caller should decide what to do. E.g.
          // ignore if this is the last log in sequence.
          // TODO is this scenario still possible if the log has been
          // recovered (i.e. closed)
          LOG.warn("Could not open {} for reading. File is empty", path, e);
        }
        // EOFException being ignored
        return null;
      }
    } catch (IOException e) {
      if (e instanceof FileNotFoundException) {
        // A wal file may not exist anymore. Nothing can be recovered so move on
        LOG.warn("File {} does not exist anymore", path, e);
        return null;
      }
      if (!skipErrors || e instanceof InterruptedIOException) {
        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
      }
      CorruptedLogFileException t =
        new CorruptedLogFileException("skipErrors=true Could not open wal " +
            path + " ignoring");
      t.initCause(e);
      throw t;
    }
    return in;
  }

  private static Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
      throws CorruptedLogFileException, IOException {
    try {
      return in.next();
    } catch (EOFException eof) {
      // truncated files are expected if a RS crashes (see HBASE-2643)
      LOG.info("EOF from wal {}. Continuing.", path);
      return null;
    } catch (IOException e) {
      // If the IOE resulted from bad file format,
      // then this problem is idempotent and retrying won't help
      if (e.getCause() != null && (e.getCause() instanceof ParseException
          || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
        LOG.warn("Parse exception from wal {}. Continuing", path, e);
        return null;
      }
      if (!skipErrors) {
        throw e;
      }
      CorruptedLogFileException t =
          new CorruptedLogFileException("skipErrors=true Ignoring exception while parsing wal "
              + path + ". Marking as corrupted");
      t.initCause(e);
      throw t;
    }
  }

  /**
   * Create a new {@link WALProvider.Writer} for writing log splits.
   * @return a new Writer instance, caller should close
   */
  protected WALProvider.Writer createWriter(Path logfile) throws IOException {
    return walFactory.createRecoveredEditsWriter(walFS, logfile);
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   * @return new Reader instance, caller should close
   */
  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
    return walFactory.createReader(walFS, curLogFile, reporter);
  }

  /**
   * Get the number of currently open writers.
   */
  private int getNumOpenWriters() {
    int result = 0;
    if (this.outputSink != null) {
      result += this.outputSink.getNumOpenWriters();
    }
    return result;
  }

  /**
   * Contains some methods to control WAL-entries producer / consumer interactions
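   * <p>
   * A sketch of the intended interaction (illustrative; the thread bodies are elided):
   * <pre>
   * PipelineController controller = new PipelineController();
   * // In a writer thread, record any failure:
   * //   catch (Throwable t) { controller.writerThreadError(t); }
   * // In the coordinating thread, periodically rethrow a recorded failure:
   * controller.checkForErrors();
   * </pre>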
   */
  public static class PipelineController {
    // If an exception is thrown by one of the other threads, it will be
    // stored here.
    AtomicReference<Throwable> thrown = new AtomicReference<>();

    // Wait/notify for when data has been produced by the reader thread,
    // consumed by the writer threads, or an exception occurred
    final Object dataAvailable = new Object();

    void writerThreadError(Throwable t) {
      thrown.compareAndSet(null, t);
    }

    /**
     * Check for errors in the writer threads. If any is found, rethrow it.
     */
    void checkForErrors() throws IOException {
      Throwable thrown = this.thrown.get();
      if (thrown == null) {
        return;
      }
      if (thrown instanceof IOException) {
        throw new IOException(thrown);
      } else {
        throw new RuntimeException(thrown);
      }
    }
  }

  /**
   * A buffer of some number of edits for a given region.
   * This accumulates edits and also provides a memory optimization in order to
   * share a single byte array instance for the table and region name.
   * Also tracks memory usage of the accumulated edits.
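   * <p>
   * Illustrative accounting (the {@code entry} below stands for a WAL {@link Entry}
   * obtained elsewhere):
   * <pre>
   * RegionEntryBuffer buf = new RegionEntryBuffer(tableName, encodedRegionName);
   * long added = buf.appendEntry(entry);   // heap bytes attributed to this entry
   * long total = buf.heapSize();           // accumulated heap usage of buffered edits
   * </pre>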
   */
  public static class RegionEntryBuffer implements HeapSize {
    long heapInBuffer = 0;
    List<Entry> entryBuffer;
    TableName tableName;
    byte[] encodedRegionName;

    RegionEntryBuffer(TableName tableName, byte[] region) {
      this.tableName = tableName;
      this.encodedRegionName = region;
      this.entryBuffer = new ArrayList<>();
    }

    long appendEntry(Entry entry) {
      internify(entry);
      entryBuffer.add(entry);
      long incrHeap = entry.getEdit().heapSize() +
        ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers
        0; // TODO linkedlist entry
      heapInBuffer += incrHeap;
      return incrHeap;
    }

    private void internify(Entry entry) {
      WALKeyImpl k = entry.getKey();
      k.internTableName(this.tableName);
      k.internEncodedRegionName(this.encodedRegionName);
    }

    @Override
    public long heapSize() {
      return heapInBuffer;
    }

    public byte[] getEncodedRegionName() {
      return encodedRegionName;
    }

    public List<Entry> getEntryBuffer() {
      return entryBuffer;
    }

    public TableName getTableName() {
      return tableName;
    }
  }

  /**
   * Wraps the actual writer which writes data out, and collects related statistics.
   */
  public abstract static class SinkWriter {
    /* Count of edits written to this path */
    long editsWritten = 0;
    /* Count of edits skipped to this path */
    long editsSkipped = 0;
    /* Number of nanos spent writing to this log */
    long nanosSpent = 0;

    void incrementEdits(int edits) {
      editsWritten += edits;
    }

    void incrementSkippedEdits(int skipped) {
      editsSkipped += skipped;
    }

    void incrementNanoTime(long nanos) {
      nanosSpent += nanos;
    }
  }

  /**
   * Private data structure that wraps a Writer and its Path, also collecting statistics about the
   * data written to this output.
   */
  static final class WriterAndPath extends SinkWriter {
    final Path path;
    final Writer writer;
    final long minLogSeqNum;

    WriterAndPath(final Path path, final Writer writer, final long minLogSeqNum) {
      this.path = path;
      this.writer = writer;
      this.minLogSeqNum = minLogSeqNum;
    }
  }

  static class CorruptedLogFileException extends Exception {
    private static final long serialVersionUID = 1L;

    CorruptedLogFileException(String s) {
      super(s);
    }
  }
}