001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
021import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
022import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
023
024import com.lmax.disruptor.RingBuffer;
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.lang.management.MemoryType;
029import java.net.URLEncoder;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Comparator;
033import java.util.List;
034import java.util.Map;
035import java.util.OptionalLong;
036import java.util.Set;
037import java.util.concurrent.ConcurrentNavigableMap;
038import java.util.concurrent.ConcurrentSkipListMap;
039import java.util.concurrent.CopyOnWriteArrayList;
040import java.util.concurrent.ExecutionException;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicBoolean;
043import java.util.concurrent.atomic.AtomicInteger;
044import java.util.concurrent.atomic.AtomicLong;
045import java.util.concurrent.locks.ReentrantLock;
046import org.apache.commons.lang3.mutable.MutableLong;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.FileStatus;
049import org.apache.hadoop.fs.FileSystem;
050import org.apache.hadoop.fs.Path;
051import org.apache.hadoop.fs.PathFilter;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.HBaseConfiguration;
054import org.apache.hadoop.hbase.HConstants;
055import org.apache.hadoop.hbase.PrivateCellUtil;
056import org.apache.hadoop.hbase.client.RegionInfo;
057import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
058import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
059import org.apache.hadoop.hbase.ipc.RpcServer;
060import org.apache.hadoop.hbase.ipc.ServerCall;
061import org.apache.hadoop.hbase.log.HBaseMarkers;
062import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
063import org.apache.hadoop.hbase.trace.TraceUtil;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.CommonFSUtils;
066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
067import org.apache.hadoop.hbase.util.FSUtils;
068import org.apache.hadoop.hbase.util.Pair;
069import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
070import org.apache.hadoop.hbase.wal.WAL;
071import org.apache.hadoop.hbase.wal.WALEdit;
072import org.apache.hadoop.hbase.wal.WALFactory;
073import org.apache.hadoop.hbase.wal.WALKeyImpl;
074import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
075import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
076import org.apache.hadoop.hbase.wal.WALSplitter;
077import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
078import org.apache.hadoop.util.StringUtils;
079import org.apache.htrace.core.TraceScope;
080import org.apache.yetus.audience.InterfaceAudience;
081import org.slf4j.Logger;
082import org.slf4j.LoggerFactory;
083
084import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
085
086/**
087 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
088 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
089 * This is done internal to the implementation.
090 * <p>
091 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
092 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
093 * A bunch of work in the below is done keeping account of these region sequence ids -- what is
094 * flushed out to hfiles, and what is yet in WAL and in memory only.
095 * <p>
096 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
097 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
098 * (smaller) than the most-recent flush.
099 * <p>
 * To read a WAL, call
 * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}.
102 * <h2>Failure Semantic</h2> If an exception on append or sync, roll the WAL because the current WAL
103 * is now a lame duck; any more appends or syncs will fail also with the same original exception. If
104 * we have made successful appends to the WAL and we then are unable to sync them, our current
105 * semantic is to return error to the client that the appends failed but also to abort the current
106 * context, usually the hosting server. We need to replay the WALs. <br>
107 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client
108 * that the append failed. <br>
109 * TODO: replication may pick up these last edits though they have been marked as failed append
110 * (Need to keep our own file lengths, not rely on HDFS).
111 */
112@InterfaceAudience.Private
113public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
114
115  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);
116
117  protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
118
119  private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
120
121  /**
122   * file system instance
123   */
124  protected final FileSystem fs;
125
126  /**
127   * WAL directory, where all WAL files would be placed.
128   */
129  protected final Path walDir;
130
131  /**
132   * dir path where old logs are kept.
133   */
134  protected final Path walArchiveDir;
135
136  /**
137   * Matches just those wal files that belong to this wal instance.
138   */
139  protected final PathFilter ourFiles;
140
141  /**
142   * Prefix of a WAL file, usually the region server name it is hosted on.
143   */
144  protected final String walFilePrefix;
145
146  /**
147   * Suffix included on generated wal file names
148   */
149  protected final String walFileSuffix;
150
151  /**
152   * Prefix used when checking for wal membership.
153   */
154  protected final String prefixPathStr;
155
156  protected final WALCoprocessorHost coprocessorHost;
157
158  /**
159   * conf object
160   */
161  protected final Configuration conf;
162
163  /** Listeners that are called on WAL events. */
164  protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
165
166  /**
167   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
168   * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
169   * facility for answering questions such as "Is it safe to GC a WAL?".
170   */
171  protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
172
173  protected final long slowSyncNs;
174
175  private final long walSyncTimeoutNs;
176
177  // If > than this size, roll the log.
178  protected final long logrollsize;
179
180  /**
181   * Block size to use writing files.
182   */
183  protected final long blocksize;
184
185  /*
186   * If more than this many logs, force flush of oldest region to oldest edit goes to disk. If too
187   * many and we crash, then will take forever replaying. Keep the number of logs tidy.
188   */
189  protected final int maxLogs;
190
191  /**
192   * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
193   * is held. We don't just use synchronized because that results in bogus and tedious findbugs
194   * warning when it thinks synchronized controls writer thread safety. It is held when we are
195   * actually rolling the log. It is checked when we are looking to see if we should roll the log or
196   * not.
197   */
198  protected final ReentrantLock rollWriterLock = new ReentrantLock(true);
199
200  // The timestamp (in ms) when the log file was created.
201  protected final AtomicLong filenum = new AtomicLong(-1);
202
203  // Number of transactions in the current Wal.
204  protected final AtomicInteger numEntries = new AtomicInteger(0);
205
206  /**
207   * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass
208   * WALEdit to background consumer thread, and the transaction id is the sequence number of the
209   * corresponding entry in queue.
210   */
211  protected volatile long highestUnsyncedTxid = -1;
212
213  /**
214   * Updated to the transaction id of the last successful sync call. This can be less than
215   * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in
216   * for it.
217   */
218  protected final AtomicLong highestSyncedTxid = new AtomicLong(0);
219
220  /**
221   * The total size of wal
222   */
223  protected final AtomicLong totalLogSize = new AtomicLong(0);
224  /**
225   * Current log file.
226   */
227  volatile W writer;
228
229  // Last time to check low replication on hlog's pipeline
230  private long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
231
232  protected volatile boolean closed = false;
233
234  protected final AtomicBoolean shutdown = new AtomicBoolean(false);
235  /**
236   * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
237   * an IllegalArgumentException if used to compare paths from different wals.
238   */
239  final Comparator<Path> LOG_NAME_COMPARATOR =
240    (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
241
242  private static final class WalProps {
243
244    /**
245     * Map the encoded region name to the highest sequence id. Contain all the regions it has
246     * entries of
247     */
248    public final Map<byte[], Long> encodedName2HighestSequenceId;
249
250    /**
251     * The log file size. Notice that the size may not be accurate if we do asynchronous close in
252     * sub classes.
253     */
254    public final long logSize;
255
256    public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
257      this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
258      this.logSize = logSize;
259    }
260  }
261
262  /**
263   * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
264   * (contained in the log file name).
265   */
266  protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
267    new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
268
269  /**
270   * Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures.
271   * Thread local is used so JVM can GC the terminated thread for us. See HBASE-21228
272   * <p>
273   */
274  private final ThreadLocal<SyncFuture> cachedSyncFutures;
275
276  /**
277   * The class name of the runtime implementation, used as prefix for logging/tracing.
278   * <p>
279   * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here,
280   * refer to HBASE-17676 for more details
281   * </p>
282   */
283  protected final String implClassName;
284
285  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
286
287  public long getFilenum() {
288    return this.filenum.get();
289  }
290
291  /**
292   * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}. This helper
293   * method returns the creation timestamp from a given log file. It extracts the timestamp assuming
294   * the filename is created with the {@link #computeFilename(long filenum)} method.
295   * @return timestamp, as in the log file name.
296   */
297  protected long getFileNumFromFileName(Path fileName) {
298    checkNotNull(fileName, "file name can't be null");
299    if (!ourFiles.accept(fileName)) {
300      throw new IllegalArgumentException(
301          "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
302    }
303    final String fileNameString = fileName.toString();
304    String chompedPath = fileNameString.substring(prefixPathStr.length(),
305      (fileNameString.length() - walFileSuffix.length()));
306    return Long.parseLong(chompedPath);
307  }
308
309  private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
310    Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf);
311    return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
312  }
313
314  // must be power of 2
315  protected final int getPreallocatedEventCount() {
316    // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
317    // be stuck and make no progress if the buffer is filled with appends only and there is no
318    // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
319    // before they return.
320    int preallocatedEventCount =
321      this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
322    checkArgument(preallocatedEventCount >= 0,
323      "hbase.regionserver.wal.disruptor.event.count must > 0");
324    int floor = Integer.highestOneBit(preallocatedEventCount);
325    if (floor == preallocatedEventCount) {
326      return floor;
327    }
328    // max capacity is 1 << 30
329    if (floor >= 1 << 29) {
330      return 1 << 30;
331    }
332    return floor << 1;
333  }
334
335  protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir,
336      final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
337      final boolean failIfWALExists, final String prefix, final String suffix)
338      throws FailedLogCloseException, IOException {
339    this.fs = fs;
340    this.walDir = new Path(rootDir, logDir);
341    this.walArchiveDir = new Path(rootDir, archiveDir);
342    this.conf = conf;
343
344    if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
345      throw new IOException("Unable to mkdir " + walDir);
346    }
347
348    if (!fs.exists(this.walArchiveDir)) {
349      if (!fs.mkdirs(this.walArchiveDir)) {
350        throw new IOException("Unable to mkdir " + this.walArchiveDir);
351      }
352    }
353
354    // If prefix is null||empty then just name it wal
355    this.walFilePrefix =
356      prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
357    // we only correctly differentiate suffices when numeric ones start with '.'
358    if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
359      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
360        "' but instead was '" + suffix + "'");
361    }
362    // Now that it exists, set the storage policy for the entire directory of wal files related to
363    // this FSHLog instance
364    String storagePolicy =
365        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
366    CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);
367    this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
368    this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
369
370    this.ourFiles = new PathFilter() {
371      @Override
372      public boolean accept(final Path fileName) {
373        // The path should start with dir/<prefix> and end with our suffix
374        final String fileNameString = fileName.toString();
375        if (!fileNameString.startsWith(prefixPathStr)) {
376          return false;
377        }
378        if (walFileSuffix.isEmpty()) {
379          // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
380          return org.apache.commons.lang3.StringUtils
381              .isNumeric(fileNameString.substring(prefixPathStr.length()));
382        } else if (!fileNameString.endsWith(walFileSuffix)) {
383          return false;
384        }
385        return true;
386      }
387    };
388
389    if (failIfWALExists) {
390      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
391      if (null != walFiles && 0 != walFiles.length) {
392        throw new IOException("Target WAL already exists within directory " + walDir);
393      }
394    }
395
396    // Register listeners. TODO: Should this exist anymore? We have CPs?
397    if (listeners != null) {
398      for (WALActionsListener i : listeners) {
399        registerWALActionsListener(i);
400      }
401    }
402    this.coprocessorHost = new WALCoprocessorHost(this, conf);
403
404    // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block
405    // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost
406    // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of
407    // the block size but experience from the field has it that this was not enough time for the
408    // roll to happen before end-of-block. So the new accounting makes WALs of about the same
409    // size as those made in hbase-1 (to prevent surprise), we now have default block size as
410    // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally
411    // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
412    this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
413    float multiplier = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.5f);
414    this.logrollsize = (long)(this.blocksize * multiplier);
415    this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
416      Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
417
418    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
419      StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
420      walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
421    this.slowSyncNs = TimeUnit.MILLISECONDS
422        .toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS));
423    this.walSyncTimeoutNs = TimeUnit.MILLISECONDS
424        .toNanos(conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS));
425    this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
426      @Override
427      protected SyncFuture initialValue() {
428        return new SyncFuture();
429      }
430    };
431    this.implClassName = getClass().getSimpleName();
432  }
433
434  @Override
435  public void registerWALActionsListener(WALActionsListener listener) {
436    this.listeners.add(listener);
437  }
438
439  @Override
440  public boolean unregisterWALActionsListener(WALActionsListener listener) {
441    return this.listeners.remove(listener);
442  }
443
444  @Override
445  public WALCoprocessorHost getCoprocessorHost() {
446    return coprocessorHost;
447  }
448
449  @Override
450  public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
451    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
452  }
453
454  @Override
455  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
456    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
457  }
458
459  @Override
460  public void completeCacheFlush(byte[] encodedRegionName) {
461    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
462  }
463
464  @Override
465  public void abortCacheFlush(byte[] encodedRegionName) {
466    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
467  }
468
469  @Override
470  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
471    // Used by tests. Deprecated as too subtle for general usage.
472    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
473  }
474
475  @Override
476  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
477    // This method is used by tests and for figuring if we should flush or not because our
478    // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use
479    // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId
480    // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the
481    // currently flushing sequence ids, and if anything found there, it is returning these. This is
482    // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if
483    // we crash during the flush. For figuring what to flush, we might get requeued if our sequence
484    // id is old even though we are currently flushing. This may mean we do too much flushing.
485    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
486  }
487
488  @Override
489  public byte[][] rollWriter() throws FailedLogCloseException, IOException {
490    return rollWriter(false);
491  }
492
493  /**
494   * This is a convenience method that computes a new filename with a given file-number.
495   * @param filenum to use
496   * @return Path
497   */
498  protected Path computeFilename(final long filenum) {
499    if (filenum < 0) {
500      throw new RuntimeException("WAL file number can't be < 0");
501    }
502    String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;
503    return new Path(walDir, child);
504  }
505
506  /**
507   * This is a convenience method that computes a new filename with a given using the current WAL
508   * file-number
509   * @return Path
510   */
511  public Path getCurrentFileName() {
512    return computeFilename(this.filenum.get());
513  }
514
515  /**
516   * retrieve the next path to use for writing. Increments the internal filenum.
517   */
518  private Path getNewPath() throws IOException {
519    this.filenum.set(System.currentTimeMillis());
520    Path newPath = getCurrentFileName();
521    while (fs.exists(newPath)) {
522      this.filenum.incrementAndGet();
523      newPath = getCurrentFileName();
524    }
525    return newPath;
526  }
527
528  @VisibleForTesting
529  Path getOldPath() {
530    long currentFilenum = this.filenum.get();
531    Path oldPath = null;
532    if (currentFilenum > 0) {
533      // ComputeFilename will take care of meta wal filename
534      oldPath = computeFilename(currentFilenum);
535    } // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine?
536    return oldPath;
537  }
538
539  /**
540   * Tell listeners about pre log roll.
541   */
542  private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
543      throws IOException {
544    coprocessorHost.preWALRoll(oldPath, newPath);
545
546    if (!this.listeners.isEmpty()) {
547      for (WALActionsListener i : this.listeners) {
548        i.preLogRoll(oldPath, newPath);
549      }
550    }
551  }
552
553  /**
554   * Tell listeners about post log roll.
555   */
556  private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
557      throws IOException {
558    if (!this.listeners.isEmpty()) {
559      for (WALActionsListener i : this.listeners) {
560        i.postLogRoll(oldPath, newPath);
561      }
562    }
563
564    coprocessorHost.postWALRoll(oldPath, newPath);
565  }
566
567  // public only until class moves to o.a.h.h.wal
568  /** @return the number of rolled log files */
569  public int getNumRolledLogFiles() {
570    return walFile2Props.size();
571  }
572
573  // public only until class moves to o.a.h.h.wal
574  /** @return the number of log files in use */
575  public int getNumLogFiles() {
576    // +1 for current use log
577    return getNumRolledLogFiles() + 1;
578  }
579
580  /**
581   * If the number of un-archived WAL files is greater than maximum allowed, check the first
582   * (oldest) WAL file, and returns those regions which should be flushed so that it can be
583   * archived.
584   * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
585   */
586  byte[][] findRegionsToForceFlush() throws IOException {
587    byte[][] regions = null;
588    int logCount = getNumRolledLogFiles();
589    if (logCount > this.maxLogs && logCount > 0) {
590      Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
591      regions =
592        this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
593    }
594    if (regions != null) {
595      StringBuilder sb = new StringBuilder();
596      for (int i = 0; i < regions.length; i++) {
597        if (i > 0) {
598          sb.append(", ");
599        }
600        sb.append(Bytes.toStringBinary(regions[i]));
601      }
602      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
603        "; forcing flush of " + regions.length + " regions(s): " + sb.toString());
604    }
605    return regions;
606  }
607
608  /**
609   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
610   */
611  private void cleanOldLogs() throws IOException {
612    List<Pair<Path, Long>> logsToArchive = null;
613    // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
614    // are older than what is currently in memory, the WAL can be GC'd.
615    for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
616      Path log = e.getKey();
617      Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
618      if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
619        if (logsToArchive == null) {
620          logsToArchive = new ArrayList<>();
621        }
622        logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
623        if (LOG.isTraceEnabled()) {
624          LOG.trace("WAL file ready for archiving " + log);
625        }
626      }
627    }
628    if (logsToArchive != null) {
629      for (Pair<Path, Long> logAndSize : logsToArchive) {
630        this.totalLogSize.addAndGet(-logAndSize.getSecond());
631        archiveLogFile(logAndSize.getFirst());
632        this.walFile2Props.remove(logAndSize.getFirst());
633      }
634    }
635  }
636
637  /*
638   * only public so WALSplitter can use.
639   * @return archived location of a WAL file with the given path p
640   */
641  public static Path getWALArchivePath(Path archiveDir, Path p) {
642    return new Path(archiveDir, p.getName());
643  }
644
645  private void archiveLogFile(final Path p) throws IOException {
646    Path newPath = getWALArchivePath(this.walArchiveDir, p);
647    // Tell our listeners that a log is going to be archived.
648    if (!this.listeners.isEmpty()) {
649      for (WALActionsListener i : this.listeners) {
650        i.preLogArchive(p, newPath);
651      }
652    }
653    LOG.info("Archiving " + p + " to " + newPath);
654    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
655      throw new IOException("Unable to rename " + p + " to " + newPath);
656    }
657    // Tell our listeners that a log has been archived.
658    if (!this.listeners.isEmpty()) {
659      for (WALActionsListener i : this.listeners) {
660        i.postLogArchive(p, newPath);
661      }
662    }
663  }
664
665  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
666    int oldNumEntries = this.numEntries.getAndSet(0);
667    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
668    if (oldPath != null) {
669      this.walFile2Props.put(oldPath,
670        new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
671      this.totalLogSize.addAndGet(oldFileLen);
672      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
673        CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
674        newPathString);
675    } else {
676      LOG.info("New WAL {}", newPathString);
677    }
678  }
679
680  /**
681   * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.
682   * <p/>
683   * <ul>
684   * <li>In the case of creating a new WAL, oldPath will be null.</li>
685   * <li>In the case of rolling over from one file to the next, none of the parameters will be null.
686   * </li>
687   * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
688   * null.</li>
689   * </ul>
690   * @param oldPath may be null
691   * @param newPath may be null
692   * @param nextWriter may be null
693   * @return the passed in <code>newPath</code>
694   * @throws IOException if there is a problem flushing or closing the underlying FS
695   */
696  @VisibleForTesting
697  Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
698    try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) {
699      doReplaceWriter(oldPath, newPath, nextWriter);
700      return newPath;
701    }
702  }
703
704  protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
705    // Now we have published the ringbuffer, halt the current thread until we get an answer back.
706    try {
707      if (syncFuture != null) {
708        if (closed) {
709          throw new IOException("WAL has been closed");
710        } else {
711          syncFuture.get(walSyncTimeoutNs);
712        }
713      }
714    } catch (TimeoutIOException tioe) {
715      // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
716      // still refer to it, so if this thread use it next time may get a wrong
717      // result.
718      this.cachedSyncFutures.remove();
719      throw tioe;
720    } catch (InterruptedException ie) {
721      LOG.warn("Interrupted", ie);
722      throw convertInterruptedExceptionToIOException(ie);
723    } catch (ExecutionException e) {
724      throw ensureIOException(e.getCause());
725    }
726  }
727
728  private static IOException ensureIOException(final Throwable t) {
729    return (t instanceof IOException) ? (IOException) t : new IOException(t);
730  }
731
732  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
733    Thread.currentThread().interrupt();
734    IOException ioe = new InterruptedIOException();
735    ioe.initCause(ie);
736    return ioe;
737  }
738
739  @Override
740  public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
741    rollWriterLock.lock();
742    try {
743      // Return if nothing to flush.
744      if (!force && this.writer != null && this.numEntries.get() <= 0) {
745        return null;
746      }
747      byte[][] regionsToFlush = null;
748      if (this.closed) {
749        LOG.debug("WAL closed. Skipping rolling of writer");
750        return regionsToFlush;
751      }
752      try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
753        Path oldPath = getOldPath();
754        Path newPath = getNewPath();
755        // Any exception from here on is catastrophic, non-recoverable so we currently abort.
756        W nextWriter = this.createWriterInstance(newPath);
757        tellListenersAboutPreLogRoll(oldPath, newPath);
758        // NewPath could be equal to oldPath if replaceWriter fails.
759        newPath = replaceWriter(oldPath, newPath, nextWriter);
760        tellListenersAboutPostLogRoll(oldPath, newPath);
761        if (LOG.isDebugEnabled()) {
762          LOG.debug("Create new " + implClassName + " writer with pipeline: " +
763            Arrays.toString(getPipeline()));
764        }
765        // Can we delete any of the old log files?
766        if (getNumRolledLogFiles() > 0) {
767          cleanOldLogs();
768          regionsToFlush = findRegionsToForceFlush();
769        }
770      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
771        // If the underlying FileSystem can't do what we ask, treat as IO failure so
772        // we'll abort.
773        throw new IOException(
774            "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
775            exception);
776      }
777      return regionsToFlush;
778    } finally {
779      rollWriterLock.unlock();
780    }
781  }
782
783  // public only until class moves to o.a.h.h.wal
784  /** @return the size of log files in use */
785  public long getLogFileSize() {
786    return this.totalLogSize.get();
787  }
788
789  // public only until class moves to o.a.h.h.wal
790  public void requestLogRoll() {
791    requestLogRoll(false);
792  }
793
794  /**
795   * Get the backing files associated with this WAL.
796   * @return may be null if there are no files.
797   */
798  @VisibleForTesting
799  FileStatus[] getFiles() throws IOException {
800    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
801  }
802
803  @Override
804  public void shutdown() throws IOException {
805    if (!shutdown.compareAndSet(false, true)) {
806      return;
807    }
808    closed = true;
809    // Tell our listeners that the log is closing
810    if (!this.listeners.isEmpty()) {
811      for (WALActionsListener i : this.listeners) {
812        i.logCloseRequested();
813      }
814    }
815    rollWriterLock.lock();
816    try {
817      doShutdown();
818    } finally {
819      rollWriterLock.unlock();
820    }
821  }
822
823  @Override
824  public void close() throws IOException {
825    shutdown();
826    final FileStatus[] files = getFiles();
827    if (null != files && 0 != files.length) {
828      for (FileStatus file : files) {
829        Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
830        // Tell our listeners that a log is going to be archived.
831        if (!this.listeners.isEmpty()) {
832          for (WALActionsListener i : this.listeners) {
833            i.preLogArchive(file.getPath(), p);
834          }
835        }
836
837        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
838          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
839        }
840        // Tell our listeners that a log was archived.
841        if (!this.listeners.isEmpty()) {
842          for (WALActionsListener i : this.listeners) {
843            i.postLogArchive(file.getPath(), p);
844          }
845        }
846      }
847      LOG.debug(
848        "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
849    }
850    LOG.info("Closed WAL: " + toString());
851  }
852
853  /**
854   * updates the sequence number of a specific store. depending on the flag: replaces current seq
855   * number if the given seq id is bigger, or even if it is lower than existing one
856   * @param encodedRegionName
857   * @param familyName
858   * @param sequenceid
859   * @param onlyIfGreater
860   */
861  @Override
862  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
863      boolean onlyIfGreater) {
864    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
865  }
866
867  protected final SyncFuture getSyncFuture(long sequence) {
868    return cachedSyncFutures.get().reset(sequence);
869  }
870
871  protected boolean isLogRollRequested() {
872    return rollRequested.get();
873  }
874
875  protected final void requestLogRoll(boolean tooFewReplicas) {
876    // If we have already requested a roll, don't do it again
877    // And only set rollRequested to true when there is a registered listener
878    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
879      for (WALActionsListener i : this.listeners) {
880        i.logRollRequested(tooFewReplicas);
881      }
882    }
883  }
884
885  long getUnflushedEntriesCount() {
886    long highestSynced = this.highestSyncedTxid.get();
887    long highestUnsynced = this.highestUnsyncedTxid;
888    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
889  }
890
891  boolean isUnflushedEntries() {
892    return getUnflushedEntriesCount() > 0;
893  }
894
895  /**
896   * Exposed for testing only. Use to tricks like halt the ring buffer appending.
897   */
898  @VisibleForTesting
899  protected void atHeadOfRingBufferEventHandlerAppend() {
900    // Noop
901  }
902
  /**
   * Writes one entry to {@code writer} and runs the hooks and bookkeeping around it.
   * <p>
   * Steps, in order:
   * <ol>
   * <li>the {@link #atHeadOfRingBufferEventHandlerAppend()} test hook;</li>
   * <li>an early exit for entries whose edit is empty;</li>
   * <li>coprocessor {@code preWALWrite} and listener {@code visitLogEntryBeforeWrite};</li>
   * <li>{@link #doAppend};</li>
   * <li>updating {@code highestUnsyncedTxid} and {@code sequenceIdAccounting};</li>
   * <li>coprocessor {@code postWALWrite};</li>
   * <li>listener {@code postAppend} metrics;</li>
   * <li>incrementing {@code numEntries}.</li>
   * </ol>
   * @param writer the writer to append to
   * @param entry the entry to append
   * @return {@code false} if the entry's edit was empty and nothing was written, {@code true}
   *         otherwise
   * @throws IOException propagated from {@link #doAppend} or from the hooks/listeners called here
   */
  protected final boolean append(W writer, FSWALEntry entry) throws IOException {
    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
    atHeadOfRingBufferEventHandlerAppend();
    // Start of the append timing reported to listeners through postAppend below.
    long start = EnvironmentEdgeManager.currentTime();
    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
    long regionSequenceId = entry.getKey().getSequenceId();

    // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
    // region sequence id only, a region edit/sequence id that is not associated with an actual
    // edit. It has to go through all the rigmarole to be sure we have the right ordering.
    if (entry.getEdit().isEmpty()) {
      return false;
    }

    // Coprocessor hook.
    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    if (!listeners.isEmpty()) {
      for (WALActionsListener i : listeners) {
        i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit());
      }
    }
    doAppend(writer, entry);
    // Appended txids must be strictly increasing; remember the newest one as not yet synced.
    assert highestUnsyncedTxid < entry.getTxid();
    highestUnsyncedTxid = entry.getTxid();
    sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
      entry.isInMemStore());
    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    // Update metrics.
    postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
    numEntries.incrementAndGet();
    return true;
  }
935
936  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
937    long len = 0;
938    if (!listeners.isEmpty()) {
939      for (Cell cell : e.getEdit().getCells()) {
940        len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
941      }
942      for (WALActionsListener listener : listeners) {
943        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
944      }
945    }
946    return len;
947  }
948
949  protected final void postSync(final long timeInNanos, final int handlerSyncs) {
950    if (timeInNanos > this.slowSyncNs) {
951      String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000)
952          .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString();
953      TraceUtil.addTimelineAnnotation(msg);
954      LOG.info(msg);
955    }
956    if (!listeners.isEmpty()) {
957      for (WALActionsListener listener : listeners) {
958        listener.postSync(timeInNanos, handlerSyncs);
959      }
960    }
961  }
962
963  protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
964      WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
965      throws IOException {
966    if (this.closed) {
967      throw new IOException(
968          "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
969    }
970    MutableLong txidHolder = new MutableLong();
971    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
972      txidHolder.setValue(ringBuffer.next());
973    });
974    long txid = txidHolder.longValue();
975    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
976      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
977    try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
978      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
979      entry.stampRegionSequenceId(we);
980      ringBuffer.get(txid).load(entry);
981    } finally {
982      ringBuffer.publish(txid);
983    }
984    return txid;
985  }
986
987  @Override
988  public String toString() {
989    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
990  }
991
992  /**
993   * if the given {@code path} is being written currently, then return its length.
994   * <p>
995   * This is used by replication to prevent replicating unacked log entries. See
996   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
997   */
998  @Override
999  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
1000    rollWriterLock.lock();
1001    try {
1002      Path currentPath = getOldPath();
1003      if (path.equals(currentPath)) {
1004        W writer = this.writer;
1005        return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
1006      } else {
1007        return OptionalLong.empty();
1008      }
1009    } finally {
1010      rollWriterLock.unlock();
1011    }
1012  }
1013
1014  /**
1015   * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
1016   * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment
1017   * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
1018   * 'complete' the transaction this mvcc transaction by calling
1019   * MultiVersionConcurrencyControl#complete(...) or a variant otherwise mvcc will get stuck. Do it
1020   * in the finally of a try/finally block within which this append lives and any subsequent
1021   * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the
1022   * passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not
1023   * immediately available on return from this method. It WILL be available subsequent to a sync of
1024   * this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
1025   */
1026  @Override
1027  public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
1028      throws IOException;
1029
  /**
   * Implementation-specific write of {@code entry} to {@code writer}. Called from
   * {@code append(W, FSWALEntry)} after the coprocessor {@code preWALWrite} hook and the listener
   * {@code visitLogEntryBeforeWrite} callbacks have run.
   * @param writer the writer to write to
   * @param entry the entry to write
   * @throws IOException if the write fails
   */
  protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
1031
  /**
   * Creates a new writer for the WAL file at {@code path}. {@code rollWriter} uses this to create
   * the next writer; a {@code StreamLacksCapabilityException} thrown here is converted by
   * {@code rollWriter} into an {@link IOException}.
   * @param path location of the new WAL file
   * @return the new writer
   * @throws IOException if the writer cannot be created
   * @throws CommonFSUtils.StreamLacksCapabilityException if the FileSystem's output stream lacks a
   *           required capability
   */
  protected abstract W createWriterInstance(Path path)
      throws IOException, CommonFSUtils.StreamLacksCapabilityException;
1034
1035  /**
1036   * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer
1037   * will begin to work before returning from this method. If we clear the flag after returning from
1038   * this call, we may miss a roll request. The implementation class should choose a proper place to
1039   * clear the {@link #rollRequested} flag so we do not miss a roll request, typically before you
1040   * start writing to the new writer.
1041   */
1042  protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
1043      throws IOException;
1044
  /**
   * Implementation-specific shutdown. Called once from {@link #shutdown()}, after the WAL has been
   * marked closed and listeners notified, while {@code rollWriterLock} is held.
   * @throws IOException if shutting down fails
   */
  protected abstract void doShutdown() throws IOException;
1046
  /**
   * Checks the current WAL for low replication. Called from {@code checkLogLowReplication} while
   * {@code rollWriterLock} is held.
   * @return {@code true} to request a log roll flagged as caused by too few replicas
   */
  protected abstract boolean doCheckLogLowReplication();
1048
1049  public void checkLogLowReplication(long checkInterval) {
1050    long now = EnvironmentEdgeManager.currentTime();
1051    if (now - lastTimeCheckLowReplication < checkInterval) {
1052      return;
1053    }
1054    // Will return immediately if we are in the middle of a WAL log roll currently.
1055    if (!rollWriterLock.tryLock()) {
1056      return;
1057    }
1058    try {
1059      lastTimeCheckLowReplication = now;
1060      if (doCheckLogLowReplication()) {
1061        requestLogRoll(true);
1062      }
1063    } finally {
1064      rollWriterLock.unlock();
1065    }
1066  }
1067
1068  /**
1069   * This method gets the pipeline for the current WAL.
1070   */
1071  @VisibleForTesting
1072  abstract DatanodeInfo[] getPipeline();
1073
1074  /**
1075   * This method gets the datanode replication count for the current WAL.
1076   */
1077  @VisibleForTesting
1078  abstract int getLogReplication();
1079
1080  private static void split(final Configuration conf, final Path p) throws IOException {
1081    FileSystem fs = FSUtils.getWALFileSystem(conf);
1082    if (!fs.exists(p)) {
1083      throw new FileNotFoundException(p.toString());
1084    }
1085    if (!fs.getFileStatus(p).isDirectory()) {
1086      throw new IOException(p + " is not a directory");
1087    }
1088
1089    final Path baseDir = FSUtils.getWALRootDir(conf);
1090    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1091    if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
1092      AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
1093      archiveDir = new Path(archiveDir, p.getName());
1094    }
1095    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
1096  }
1097
1098  private static void usage() {
1099    System.err.println("Usage: AbstractFSWAL <ARGS>");
1100    System.err.println("Arguments:");
1101    System.err.println(" --dump  Dump textual representation of passed one or more files");
1102    System.err.println("         For example: " +
1103      "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
1104    System.err.println(" --split Split the passed directory of WAL logs");
1105    System.err.println(
1106      "         For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
1107  }
1108
1109  /**
1110   * Pass one or more log file names and it will either dump out a text version on
1111   * <code>stdout</code> or split the specified log files.
1112   */
1113  public static void main(String[] args) throws IOException {
1114    if (args.length < 2) {
1115      usage();
1116      System.exit(-1);
1117    }
1118    // either dump using the WALPrettyPrinter or split, depending on args
1119    if (args[0].compareTo("--dump") == 0) {
1120      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
1121    } else if (args[0].compareTo("--perf") == 0) {
1122      LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:");
1123      LOG.error(HBaseMarkers.FATAL,
1124        "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
1125      System.exit(-1);
1126    } else if (args[0].compareTo("--split") == 0) {
1127      Configuration conf = HBaseConfiguration.create();
1128      for (int i = 1; i < args.length; i++) {
1129        try {
1130          Path logPath = new Path(args[i]);
1131          FSUtils.setFsDefault(conf, logPath);
1132          split(conf, logPath);
1133        } catch (IOException t) {
1134          t.printStackTrace(System.err);
1135          System.exit(-1);
1136        }
1137      }
1138    } else {
1139      usage();
1140      System.exit(-1);
1141    }
1142  }
1143}