/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.DEFAULT_WAL_SPLIT_TO_HFILE;
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;

/**
 * This class is responsible for splitting up a bunch of regionserver write-ahead log (WAL) files
 * that are no longer being written to, into new files, one per region, so the edits can be
 * replayed when the regions are reopened. The old log files are deleted once splitting finishes.
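 * <p>
 * A minimal usage sketch, as driven by tools and tests through the static helper below; the
 * {@code rootDir}, {@code logDir}, {@code oldLogDir}, {@code walFS}, {@code conf} and
 * {@code walFactory} variables are assumed to be set up by the caller:
 * <pre>
 *   List&lt;Path&gt; recoveredEdits =
 *       WALSplitter.split(rootDir, logDir, oldLogDir, walFS, conf, walFactory);
 * </pre>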
 */
@InterfaceAudience.Private
public class WALSplitter {
  private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);

  /** By default we retry errors in splitting, rather than skipping. */
  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

  // Parameters for split process
  protected final Path walDir;
  protected final FileSystem walFS;
  protected final Configuration conf;

  // Major subcomponents of the split process.
  // These are separated into inner classes to make testing easier.
  OutputSink outputSink;
  private EntryBuffers entryBuffers;

  private SplitLogWorkerCoordination splitLogWorkerCoordination;
  private final WALFactory walFactory;

  private MonitoredTask status;

  // For checking the latest flushed sequence id
  protected final LastSequenceId sequenceIdChecker;

  // Map encodedRegionName -> lastFlushedSequenceId
  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>();

  // Map encodedRegionName -> maxSeqIdInStores
  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();

  // the file being split currently
  private FileStatus fileBeingSplit;

  private final String tmpDirName;

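  /** Whether to bound the number of writers opened for sinking recovered edits. Default false. */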
  public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
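  /** Heap used to buffer WAL entries before they are written out. Default 128 MB. */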
  public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
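  /** Number of writer threads used by the output sink. Default 3. */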
  public final static String SPLIT_WAL_WRITER_THREADS =
      "hbase.regionserver.hlog.splitlog.writer.threads";

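  /**
   * Package-private constructor, exposed for testing. Makes a defensive copy of the passed-in
   * {@link Configuration} and picks the {@link EntryBuffers} and {@link OutputSink}
   * implementations based on {@code WAL_SPLIT_TO_HFILE} and
   * {@link #SPLIT_WRITER_CREATION_BOUNDED}.
   */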
  @VisibleForTesting
  WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
    LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
    this.conf = HBaseConfiguration.create(conf);
    String codecClassName =
        conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
    this.walDir = walDir;
    this.walFS = walFS;
    this.sequenceIdChecker = idChecker;
    this.splitLogWorkerCoordination = splitLogWorkerCoordination;

    this.walFactory = factory;
    PipelineController controller = new PipelineController();
    this.tmpDirName =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);

    // Whether we should limit the number of writers opened for sinking recovered edits
    boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
    boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
    long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
    int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);

    if (splitToHFile) {
      entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
      outputSink =
          new BoundedRecoveredHFilesOutputSink(this, controller, entryBuffers, numWriterThreads);
    } else if (splitWriterCreationBounded) {
      entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
      outputSink =
          new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
    } else {
      entryBuffers = new EntryBuffers(controller, bufferSize);
      outputSink = new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
    }
  }

  WALFactory getWalFactory() {
    return this.walFactory;
  }

  FileStatus getFileBeingSplit() {
    return fileBeingSplit;
  }

  String getTmpDirName() {
    return this.tmpDirName;
  }

  Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() {
    return regionMaxSeqIdInStores;
  }

  /**
   * Splits a WAL file into each region's recovered-edits directory.
   * This is the main entry point for distributed log splitting from SplitLogWorker.
   * <p>
   * If the log file has N regions then N recovered.edits files will be produced.
   * <p>
   * @return false if it was interrupted by the progressable reporter.
   */
  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
      SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
      throws IOException {
    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker,
        splitLogWorkerCoordination);
    return s.splitLogFile(logfile, reporter);
  }

  // A wrapper to split one log folder using the method used by distributed
  // log splitting. Used by tools and unit tests. It should be package private.
  // It is public only because TestWALObserver is in a different package,
  // which uses this method to do log splitting.
  @VisibleForTesting
  public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
      FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException {
    final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
        Collections.singletonList(logDir), null);
    List<Path> splits = new ArrayList<>();
    if (ArrayUtils.isNotEmpty(logfiles)) {
      for (FileStatus logfile: logfiles) {
        WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null);
        if (s.splitLogFile(logfile, null)) {
          finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
          if (s.outputSink.splits != null) {
            splits.addAll(s.outputSink.splits);
          }
        }
      }
    }
    if (!walFS.delete(logDir, true)) {
      throw new IOException("Unable to delete src dir: " + logDir);
    }
    return splits;
  }

  /**
   * WAL splitting implementation. Splits one log file.
   * @param logfile should be an actual log file.
   */
  @VisibleForTesting
  boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
    Preconditions.checkState(status == null);
    Preconditions.checkArgument(logfile.isFile(),
        "passed in file status is for something other than a regular file.");
    boolean isCorrupted = false;
    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
      SPLIT_SKIP_ERRORS_DEFAULT);
    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
    Path logPath = logfile.getPath();
    boolean outputSinkStarted = false;
    boolean progressFailed = false;
    int editsCount = 0;
    int editsSkipped = 0;

    status = TaskMonitor.get().createStatus(
          "Splitting log file " + logfile.getPath() + " into a temporary staging area.");
    Reader logFileReader = null;
    this.fileBeingSplit = logfile;
    long startTS = EnvironmentEdgeManager.currentTime();
    try {
      long logLength = logfile.getLen();
      LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength),
          logLength);
      status.setStatus("Opening log file");
      if (reporter != null && !reporter.progress()) {
        progressFailed = true;
        return false;
      }
      logFileReader = getReader(logfile, skipErrors, reporter);
      if (logFileReader == null) {
        LOG.warn("Nothing to split in WAL={}", logPath);
        return true;
      }
      long openCost = EnvironmentEdgeManager.currentTime() - startTS;
      LOG.info("Open WAL={} cost {} ms", logPath, openCost);
      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
      int numOpenedFilesLastCheck = 0;
      outputSink.setReporter(reporter);
      outputSink.startWriterThreads();
      outputSinkStarted = true;
      Entry entry;
      Long lastFlushedSequenceId = -1L;
      startTS = EnvironmentEdgeManager.currentTime();
      while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
        byte[] region = entry.getKey().getEncodedRegionName();
        String encodedRegionNameAsStr = Bytes.toString(region);
        lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
        if (lastFlushedSequenceId == null) {
          if (sequenceIdChecker != null) {
            RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
            Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
            for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
              maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
                storeSeqId.getSequenceId());
            }
            regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
            lastFlushedSequenceId = ids.getLastFlushedSequenceId();
            if (LOG.isDebugEnabled()) {
              LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
                  TextFormat.shortDebugString(ids));
            }
          }
          if (lastFlushedSequenceId == null) {
            lastFlushedSequenceId = -1L;
          }
          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
        }
        if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
          editsSkipped++;
          continue;
        }
        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
          editsSkipped++;
          continue;
        }
        entryBuffers.appendEntry(entry);
        editsCount++;
        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
        // If sufficient edits have passed, check if we should report progress.
        if (editsCount % interval == 0
            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
          numOpenedFilesLastCheck = this.getNumOpenWriters();
          String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits()))
              + " edits, skipped " + editsSkipped + " edits.";
          status.setStatus("Split " + countsStr);
          if (reporter != null && !reporter.progress()) {
            progressFailed = true;
            return false;
          }
        }
      }
    } catch (InterruptedException ie) {
      IOException iie = new InterruptedIOException();
      iie.initCause(ie);
      throw iie;
    } catch (CorruptedLogFileException e) {
      LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
      if (splitLogWorkerCoordination != null) {
        // Some tests pass in a null splitLogWorkerCoordination.
        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
      } else {
        // for tests only
        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
      }
      isCorrupted = true;
    } catch (IOException e) {
      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
      throw e;
    } finally {
      LOG.debug("Finishing writing output logs and closing down");
      try {
        if (null != logFileReader) {
          logFileReader.close();
        }
      } catch (IOException exception) {
        LOG.warn("Could not close WAL reader", exception);
      }
      try {
        if (outputSinkStarted) {
          // Set progressFailed to true in advance; the next statement overwrites it, but if
          // close() throws, progressFailed is left holding the correct value.
          progressFailed = true;
          progressFailed = outputSink.close() == null;
        }
      } finally {
        long processCost = EnvironmentEdgeManager.currentTime() - startTS;
        // See if length got updated post lease recovery
        String msg = "Processed " + editsCount + " edits across " +
            outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost +
            " ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" +
            StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() +
            ", corrupted=" + isCorrupted + ", progress failed=" + progressFailed;
        LOG.info(msg);
        status.markComplete(msg);
      }
    }
    return !progressFailed;
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   */
  private Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
      throws IOException, CorruptedLogFileException {
    Path path = file.getPath();
    long length = file.getLen();
    Reader in;

    // Check for possibly empty file. With appends, currently Hadoop reports a
    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
    // HDFS-878 is committed.
    if (length <= 0) {
      LOG.warn("File {} might still be open, length is 0", path);
    }

    try {
      FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter);
      try {
        in = getReader(path, reporter);
      } catch (EOFException e) {
        if (length <= 0) {
          // TODO should we ignore an empty, not-last log file if skip.errors
          // is false? Either way, the caller should decide what to do. E.g.
          // ignore if this is the last log in sequence.
          // TODO is this scenario still possible if the log has been
          // recovered (i.e. closed)
          LOG.warn("Could not open {} for reading. File is empty", path, e);
        }
        // EOFException being ignored
        return null;
      }
    } catch (IOException e) {
      if (e instanceof FileNotFoundException) {
        // A wal file may not exist anymore. Nothing can be recovered so move on
        LOG.warn("File {} does not exist anymore", path, e);
        return null;
      }
      if (!skipErrors || e instanceof InterruptedIOException) {
        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
      }
      throw new CorruptedLogFileException("skipErrors=true Could not open wal "
        + path + " ignoring", e);
    }
    return in;
  }

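  /**
   * Read the next entry from the given WAL reader.
   * @return the next entry, or null on EOF or on a parse/checksum error that is safe to skip
   * @throws CorruptedLogFileException if skipErrors is true and the entry could not be read
   * @throws IOException if skipErrors is false and a non-benign read error occurs
   */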
  private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
      throws CorruptedLogFileException, IOException {
    try {
      return in.next();
    } catch (EOFException eof) {
      // truncated files are expected if a RS crashes (see HBASE-2643)
      LOG.info("EOF from wal {}. Continuing.", path);
      return null;
    } catch (IOException e) {
      // If the IOE resulted from bad file format,
      // then this problem is idempotent and retrying won't help
      if (e.getCause() != null && (e.getCause() instanceof ParseException
          || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
        LOG.warn("Parse exception from wal {}. Continuing", path, e);
        return null;
      }
      if (!skipErrors) {
        throw e;
      }
      throw new CorruptedLogFileException("skipErrors=true Ignoring exception"
        + " while parsing wal " + path + ". Marking as corrupted", e);
    }
  }

  /**
   * Create a new {@link WALProvider.Writer} for writing log splits.
   * @return a new Writer instance, caller should close
   */
  protected WALProvider.Writer createWriter(Path logfile) throws IOException {
    return walFactory.createRecoveredEditsWriter(walFS, logfile);
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   * @return new Reader instance, caller should close
   */
  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
    return walFactory.createReader(walFS, curLogFile, reporter);
  }

  /**
   * Get the number of currently open writers.
   */
  private int getNumOpenWriters() {
    int result = 0;
    if (this.outputSink != null) {
      result += this.outputSink.getNumOpenWriters();
    }
    return result;
  }

  /**
   * Contains some methods to control WAL-entries producer / consumer interactions
   */
  public static class PipelineController {
    // If an exception is thrown by one of the other threads, it will be
    // stored here.
    AtomicReference<Throwable> thrown = new AtomicReference<>();

    // Wait/notify for when data has been produced by the reader thread,
    // consumed by the writer threads, or an exception occurred
    final Object dataAvailable = new Object();

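    /** Record the first error thrown by a writer thread; subsequent errors are ignored. */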
    void writerThreadError(Throwable t) {
      thrown.compareAndSet(null, t);
    }

    /**
     * Check for errors in the writer threads. If any is found, rethrow it.
     */
    void checkForErrors() throws IOException {
      Throwable thrown = this.thrown.get();
      if (thrown == null) {
        return;
      }
      if (thrown instanceof IOException) {
        throw new IOException(thrown);
      } else {
        throw new RuntimeException(thrown);
      }
    }
  }

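  /**
   * Thrown when a WAL file cannot be opened or parsed. When skip-errors mode is enabled, the
   * offending file is marked as corrupted instead of failing the whole split.
   */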
  static class CorruptedLogFileException extends Exception {
    private static final long serialVersionUID = 1L;

    CorruptedLogFileException(String s) {
      super(s);
    }

    /**
     * CorruptedLogFileException with cause
     *
     * @param message the message for this exception
     * @param cause the cause for this exception
     */
    CorruptedLogFileException(String message, Throwable cause) {
      super(message, cause);
    }
  }
}