/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.WAL_IMPL;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;

import com.lmax.disruptor.RingBuffer;
import io.opentelemetry.api.trace.Span;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.management.MemoryType;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
 * This is done internal to the implementation.
 * <p>
 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles),
 * a WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence
 * id. Much of the work below is accounting of these region sequence ids -- what has been flushed
 * out to hfiles, and what is still only in the WAL and in memory.
 * <p>
 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
 * (smaller) than the most-recent flush.
 * <p>
 * To read a WAL, call
 * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}.
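 * <p>
 * A minimal read sketch (illustrative only; it assumes an existing WAL file {@code Path} named
 * {@code walPath} and a {@link WALFactory} instance named {@code walFactory}):
 *
 * <pre>
 * {@code
 * try (WAL.Reader reader = walFactory.createReader(fs, walPath)) {
 *   for (WAL.Entry entry; (entry = reader.next()) != null;) {
 *     // entry.getKey() carries region/sequence id metadata; entry.getEdit() carries the cells
 *   }
 * }
 * }
 * </pre>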
 * <h2>Failure Semantics</h2> If an exception occurs on append or sync, we roll the WAL because the
 * current WAL is now a lame duck; any further appends or syncs will also fail with the same
 * original exception. If we have made successful appends to the WAL and we then are unable to sync
 * them, our current semantic is to return an error to the client that the appends failed, but also
 * to abort the current context, usually the hosting server. We need to replay the WALs. <br>
 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged the
 * client that the append failed. <br>
 * TODO: replication may pick up these last edits though they have been marked as failed append
 * (Need to keep our own file lengths, not rely on HDFS).
 */
@InterfaceAudience.Private
public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);

  private static final String SURVIVED_TOO_LONG_SEC_KEY = "hbase.regionserver.wal.too.old.sec";
  private static final int SURVIVED_TOO_LONG_SEC_DEFAULT = 900;
  /** Don't log blocking regions more frequently than this. */
  private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5);

  protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";
  protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
  protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
  protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
  protected static final String SLOW_SYNC_ROLL_THRESHOLD =
    "hbase.regionserver.wal.slowsync.roll.threshold";
  protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
  protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =
    "hbase.regionserver.wal.slowsync.roll.interval.ms";
  protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute

  public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
  protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min

  public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";

  public static final String MAX_LOGS = "hbase.regionserver.maxlogs";

  public static final String RING_BUFFER_SLOT_COUNT =
    "hbase.regionserver.wal.disruptor.event.count";

  public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms";
  public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000;
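
  // A hedged tuning sketch (values are illustrative, not recommendations): the keys above can be
  // set programmatically or in hbase-site.xml, e.g.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(SLOW_SYNC_TIME_MS, 100);      // warn when a sync takes longer than 100 ms
  //   conf.setInt(ROLL_ON_SYNC_TIME_MS, 10000); // request a roll after a single 10 s sync
  //   conf.setInt(MAX_LOGS, 32);                // cap on un-archived WAL files before forced flush
  //   conf.setLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS);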

  /**
   * file system instance
   */
  protected final FileSystem fs;

  /**
   * WAL directory, where all WAL files would be placed.
   */
  protected final Path walDir;

  /**
   * dir path where old logs are kept.
   */
  protected final Path walArchiveDir;

  /**
   * Matches just those wal files that belong to this wal instance.
   */
  protected final PathFilter ourFiles;

  /**
   * Prefix of a WAL file, usually the region server name it is hosted on.
   */
  protected final String walFilePrefix;

  /**
   * Suffix included on generated wal file names
   */
  protected final String walFileSuffix;

  /**
   * Prefix used when checking for wal membership.
   */
  protected final String prefixPathStr;

  protected final WALCoprocessorHost coprocessorHost;

  /**
   * conf object
   */
  protected final Configuration conf;

  protected final Abortable abortable;

  /** Listeners that are called on WAL events. */
  protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();

  /** Tracks the logs in the process of being closed. */
  protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();

  /**
   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
   * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
   * facility for answering questions such as "Is it safe to GC a WAL?".
   */
  protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();

  /** A slow sync is logged; a very slow sync causes the WAL to be rolled. */
  protected final long slowSyncNs, rollOnSyncNs;
  protected final int slowSyncRollThreshold;
  protected final int slowSyncCheckInterval;
  protected final AtomicInteger slowSyncCount = new AtomicInteger();

  private final long walSyncTimeoutNs;

  private final long walTooOldNs;

  // If > than this size, roll the log.
  protected final long logrollsize;

  /**
   * Block size to use writing files.
   */
  protected final long blocksize;

  /*
   * If there are more than this many WAL files, force a flush of the oldest regions so that their
   * oldest edits are moved to disk and the oldest WALs can be archived. If there are too many WALs
   * and we crash, replay will take forever. Keep the number of logs tidy.
   */
  protected final int maxLogs;

  protected final boolean useHsync;

  /**
   * This lock makes sure only one log roll runs at a time. Should not be taken while any other
   * lock is held. We don't just use synchronized because that results in bogus and tedious
   * findbugs warnings when it thinks synchronized controls writer thread safety. It is held when
   * we are actually rolling the log. It is checked when we are looking to see if we should roll
   * the log or not.
   */
  protected final ReentrantLock rollWriterLock = new ReentrantLock(true);

  // The timestamp (in ms) when the log file was created.
  protected final AtomicLong filenum = new AtomicLong(-1);

  // Number of transactions in the current WAL.
  protected final AtomicInteger numEntries = new AtomicInteger(0);

  /**
   * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to
   * pass WALEdits to a background consumer thread, and the transaction id is the sequence number
   * of the corresponding entry in the queue.
   */
  protected volatile long highestUnsyncedTxid = -1;

  /**
   * Updated to the transaction id of the last successful sync call. This can be less than
   * {@link #highestUnsyncedTxid} for the case where we have an append for which a sync has not
   * yet come in.
   */
  protected final AtomicLong highestSyncedTxid = new AtomicLong(0);

  /**
   * The total size of the WAL files.
   */
  protected final AtomicLong totalLogSize = new AtomicLong(0);
  /**
   * Current log file.
   */
  volatile W writer;

  // Last time we checked for low replication on the WAL pipeline
  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();

  // Last time we asked to roll the log due to a slow sync
  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();

  protected volatile boolean closed = false;

  protected final AtomicBoolean shutdown = new AtomicBoolean(false);

  protected final long walShutdownTimeout;

  private long nextLogTooOldNs = System.nanoTime();

  /**
   * WAL Comparator; it compares the timestamp (log filenum) present in the log file name. Throws
   * an IllegalArgumentException if used to compare paths from different wals.
   */
  final Comparator<Path> LOG_NAME_COMPARATOR =
    (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));

  private static final class WalProps {

    /**
     * Map the encoded region name to the highest sequence id.
     * <p/>
     * Contains all the regions it has an entry for.
     */
    public final Map<byte[], Long> encodedName2HighestSequenceId;

    /**
     * The log file size. Notice that the size may not be accurate if we do asynchronous close in
     * subclasses.
     */
    public final long logSize;

    /**
     * The nanoTime of the log roll, used to determine how much time has passed since the roll.
     */
    public final long rollTimeNs;

    public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
      this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
      this.logSize = logSize;
      this.rollTimeNs = System.nanoTime();
    }
  }

  /**
   * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
   * (contained in the log file name).
   */
  protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
    new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);

  /**
   * A cache of sync futures reused by threads.
   */
  protected final SyncFutureCache syncFutureCache;

  /**
   * The class name of the runtime implementation, used as a prefix for logging/tracing.
   * <p>
   * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it
   * here; refer to HBASE-17676 for more details.
   * </p>
   */
  protected final String implClassName;

  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);

  // Run in the caller's thread if we get a rejected execution exception, to avoid aborting the
  // region server in that case. Usually this should not happen, but let's make it more robust.
  private final ExecutorService logArchiveExecutor =
    new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(),
      new ThreadPoolExecutor.CallerRunsPolicy());

  private final int archiveRetries;

  public long getFilenum() {
    return this.filenum.get();
  }

  /**
   * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}). This helper
   * method returns the creation timestamp from a given log file. It extracts the timestamp
   * assuming the filename is created with the {@link #computeFilename(long filenum)} method.
   * @return timestamp, as in the log file name.
   */
  protected long getFileNumFromFileName(Path fileName) {
    checkNotNull(fileName, "file name can't be null");
    if (!ourFiles.accept(fileName)) {
      throw new IllegalArgumentException(
        "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
    }
    final String fileNameString = fileName.toString();
    String chompedPath = fileNameString.substring(prefixPathStr.length(),
      (fileNameString.length() - walFileSuffix.length()));
    return Long.parseLong(chompedPath);
  }

  private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
    Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf);
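    // Illustrative arithmetic (not a recommendation): with a 4 GiB global memstore and a 128 MiB
    // log roll size, (4 GiB * 2) / 128 MiB = 64, so up to 64 WAL files are allowed before forced
    // flushing kicks in (subject to the Math.max(32, ...) floor applied by the caller).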
    return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
  }

  // must be a power of 2
  protected final int getPreallocatedEventCount() {
    // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
    // be stuck and make no progress if the buffer is filled with appends only and there is no
    // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
    // before they return.
    int preallocatedEventCount = this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
    checkArgument(preallocatedEventCount > 0, RING_BUFFER_SLOT_COUNT + " must be > 0");
    int floor = Integer.highestOneBit(preallocatedEventCount);
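    // Illustrative example: a configured value of 20000 has highestOneBit 16384; since that is
    // less than the requested count and well under 1 << 29, we return the next power of two,
    // 32768.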
    if (floor == preallocatedEventCount) {
      return floor;
    }
    // max capacity is 1 << 30
    if (floor >= 1 << 29) {
      return 1 << 30;
    }
    return floor << 1;
  }

  protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir,
    final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
    final boolean failIfWALExists, final String prefix, final String suffix)
    throws FailedLogCloseException, IOException {
    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
  }

  protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,
    final String logDir, final String archiveDir, final Configuration conf,
    final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
    final String suffix) throws FailedLogCloseException, IOException {
    this.fs = fs;
    this.walDir = new Path(rootDir, logDir);
    this.walArchiveDir = new Path(rootDir, archiveDir);
    this.conf = conf;
    this.abortable = abortable;

    if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
      throw new IOException("Unable to mkdir " + walDir);
    }

    if (!fs.exists(this.walArchiveDir)) {
      if (!fs.mkdirs(this.walArchiveDir)) {
        throw new IOException("Unable to mkdir " + this.walArchiveDir);
      }
    }

    // If prefix is null||empty then just name it wal
    this.walFilePrefix =
      prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
    // we only correctly differentiate suffixes when numeric ones start with '.'
    if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER
        + "' but instead was '" + suffix + "'");
    }
    // Now that it exists, set the storage policy for the entire directory of wal files related to
    // this FSHLog instance
    String storagePolicy =
      conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
    CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);
    this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
    this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();

    this.ourFiles = new PathFilter() {
      @Override
      public boolean accept(final Path fileName) {
        // The path should start with dir/<prefix> and end with our suffix
        final String fileNameString = fileName.toString();
        if (!fileNameString.startsWith(prefixPathStr)) {
          return false;
        }
        if (walFileSuffix.isEmpty()) {
          // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
          return org.apache.commons.lang3.StringUtils
            .isNumeric(fileNameString.substring(prefixPathStr.length()));
        } else if (!fileNameString.endsWith(walFileSuffix)) {
          return false;
        }
        return true;
      }
    };

    if (failIfWALExists) {
      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
      if (null != walFiles && 0 != walFiles.length) {
        throw new IOException("Target WAL already exists within directory " + walDir);
      }
    }

    // Register listeners. TODO: Should this exist anymore? We have CPs?
    if (listeners != null) {
      for (WALActionsListener i : listeners) {
        registerWALActionsListener(i);
      }
    }
    this.coprocessorHost = new WALCoprocessorHost(this, conf);

    // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block
    // size should make it so WAL rolls before we get to the end-of-block (block transitions cost
    // some latency). In hbase-1 we did this differently: we scheduled a roll when we hit 95% of
    // the block size, but experience from the field has it that this was not enough time for the
    // roll to happen before end-of-block. To keep WALs of about the same size as those made in
    // hbase-1 (to prevent surprise), the default WAL block size is now 2 times the DFS default:
    // 2 * DFS default block size rolling at 50% full will generally make logs of similar size to
    // 1 * DFS default block size rolling at 95% full. See HBASE-19148.
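    // Illustrative arithmetic under those defaults: with a 128 MiB DFS block size the WAL block
    // size defaults to 256 MiB, and the default 0.5 multiplier below yields a roll size of
    // 128 MiB.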
    this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
    float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
    this.logrollsize = (long) (this.blocksize * multiplier);
    this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));

    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
      + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix="
      + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir
      + ", maxLogs=" + this.maxLogs);
    this.slowSyncNs =
      TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_TIME_MS));
    this.rollOnSyncNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS));
    this.slowSyncRollThreshold =
      conf.getInt(SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
    this.slowSyncCheckInterval =
      conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
    this.walSyncTimeoutNs =
      TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS));
    this.syncFutureCache = new SyncFutureCache(conf);
    this.implClassName = getClass().getSimpleName();
    this.walTooOldNs = TimeUnit.SECONDS
      .toNanos(conf.getInt(SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
    this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
    archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
    this.walShutdownTimeout =
      conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS);
  }

  /**
   * Used to initialize the WAL. Usually just call rollWriter to create the first log writer.
   */
  public void init() throws IOException {
    rollWriter();
  }

  @Override
  public void registerWALActionsListener(WALActionsListener listener) {
    this.listeners.add(listener);
  }

  @Override
  public boolean unregisterWALActionsListener(WALActionsListener listener) {
    return this.listeners.remove(listener);
  }

  @Override
  public WALCoprocessorHost getCoprocessorHost() {
    return coprocessorHost;
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
  }

  @Override
  public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
  }

  @Override
  public void abortCacheFlush(byte[] encodedRegionName) {
    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
    // Used by tests. Deprecated as too subtle for general usage.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
    // This method is used by tests and for figuring if we should flush or not because our
    // sequenceids are too old. It is also used when reporting to the master our oldest sequenceid
    // for use in figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId
    // from this.sequenceIdAccounting looks first in flushingOldestStoreSequenceIds, the
    // currently flushing sequence ids, and if anything is found there, returns it. This is
    // the right thing to do when reporting oldest sequenceids to the master; we won't skip edits
    // if we crash during the flush. For figuring what to flush, we might get requeued if our
    // sequence id is old even though we are currently flushing. This may mean we do too much
    // flushing.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
  }

  @Override
  public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
    return rollWriter(false);
  }

  @Override
  public final void sync() throws IOException {
    sync(useHsync);
  }

  @Override
  public final void sync(long txid) throws IOException {
    sync(txid, useHsync);
  }

  @Override
  public final void sync(boolean forceSync) throws IOException {
    TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync"));
  }

  @Override
  public final void sync(long txid, boolean forceSync) throws IOException {
    TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync"));
  }

  protected abstract void doSync(boolean forceSync) throws IOException;

  protected abstract void doSync(long txid, boolean forceSync) throws IOException;

  /**
   * This is a convenience method that computes a new filename with a given file-number.
   * @param filenum to use
   */
  protected Path computeFilename(final long filenum) {
    if (filenum < 0) {
      throw new RuntimeException("WAL file number can't be < 0");
    }
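    // The generated name is <prefix><delimiter><filenum><suffix>, e.g. (illustrative)
    // "server%2C16020%2C1600000000000.1600000001000" under walDir, with an empty suffix.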
    String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;
    return new Path(walDir, child);
  }

  /**
   * This is a convenience method that computes a new filename using the current WAL file-number.
   */
  public Path getCurrentFileName() {
    return computeFilename(this.filenum.get());
  }

  /**
   * Retrieve the next path to use for writing. Increments the internal filenum.
   */
  private Path getNewPath() throws IOException {
    this.filenum.set(Math.max(getFilenum() + 1, EnvironmentEdgeManager.currentTime()));
    Path newPath = getCurrentFileName();
    return newPath;
  }

  public Path getOldPath() {
    long currentFilenum = this.filenum.get();
    Path oldPath = null;
    if (currentFilenum > 0) {
      // ComputeFilename will take care of meta wal filename
      oldPath = computeFilename(currentFilenum);
    } // I presume if currentFilenum is <= 0, this is the first file and null for oldPath is fine?
    return oldPath;
  }

  /**
   * Tell listeners about pre log roll.
   */
  private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
    throws IOException {
    coprocessorHost.preWALRoll(oldPath, newPath);

    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogRoll(oldPath, newPath);
      }
    }
  }

  /**
   * Tell listeners about post log roll.
   */
  private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
    throws IOException {
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogRoll(oldPath, newPath);
      }
    }

    coprocessorHost.postWALRoll(oldPath, newPath);
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the number of rolled log files */
  public int getNumRolledLogFiles() {
    return walFile2Props.size();
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the number of log files in use */
  public int getNumLogFiles() {
    // +1 for current use log
    return getNumRolledLogFiles() + 1;
  }

  /**
   * If the number of un-archived WAL files ('live' WALs) is greater than the maximum allowed,
   * check the first (oldest) WAL, and return those regions which should be flushed so that it can
   * be let go/'archived'.
   * @return stores of regions (encodedRegionNames) to flush in order to archive the oldest WAL
   *         file.
   */
  Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
    Map<byte[], List<byte[]>> regions = null;
    int logCount = getNumRolledLogFiles();
    if (logCount > this.maxLogs && logCount > 0) {
      Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
      regions =
        this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
    }
    if (regions != null) {
      List<String> listForPrint = new ArrayList<>();
      for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
        StringBuilder families = new StringBuilder();
        for (int i = 0; i < r.getValue().size(); i++) {
          if (i > 0) {
            families.append(",");
          }
          families.append(Bytes.toString(r.getValue().get(i)));
        }
        listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
      }
      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs
        + "; forcing (partial) flush of " + regions.size() + " region(s): "
        + StringUtils.join(",", listForPrint));
    }
    return regions;
  }

  /**
   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
   */
  private void cleanOldLogs() throws IOException {
    List<Pair<Path, Long>> logsToArchive = null;
    long now = System.nanoTime();
    boolean mayLogTooOld = nextLogTooOldNs <= now;
    ArrayList<byte[]> regionsBlockingWal = null;
    // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
    // are older than what is currently in memory, the WAL can be GC'd.
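    // Illustrative example: a WAL whose highest sequence ids are {regionA=5, regionB=7} can be
    // archived once regionA has flushed past 5 and regionB past 7, because every edit it holds is
    // then persisted elsewhere.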
    for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
      Path log = e.getKey();
      ArrayList<byte[]> regionsBlockingThisWal = null;
      long ageNs = now - e.getValue().rollTimeNs;
      if (ageNs > walTooOldNs) {
        if (mayLogTooOld && regionsBlockingWal == null) {
          regionsBlockingWal = new ArrayList<>();
        }
        regionsBlockingThisWal = regionsBlockingWal;
      }
      Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
      if (this.sequenceIdAccounting.areAllLower(sequenceNums, regionsBlockingThisWal)) {
        if (logsToArchive == null) {
          logsToArchive = new ArrayList<>();
        }
        logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
        if (LOG.isTraceEnabled()) {
          LOG.trace("WAL file ready for archiving " + log);
        }
      } else if (regionsBlockingThisWal != null) {
        StringBuilder sb = new StringBuilder(log.toString()).append(" has not been archived for ")
          .append(TimeUnit.NANOSECONDS.toSeconds(ageNs)).append(" seconds; blocked by: ");
        boolean isFirst = true;
        for (byte[] region : regionsBlockingThisWal) {
          if (!isFirst) {
            sb.append("; ");
          }
          isFirst = false;
          sb.append(Bytes.toString(region));
        }
        LOG.warn(sb.toString());
        nextLogTooOldNs = now + SURVIVED_TOO_LONG_LOG_INTERVAL_NS;
        regionsBlockingThisWal.clear();
      }
    }

    if (logsToArchive != null) {
      final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
      // make it async
      for (Pair<Path, Long> log : localLogsToArchive) {
        logArchiveExecutor.execute(() -> {
          archive(log);
        });
        this.walFile2Props.remove(log.getFirst());
      }
    }
  }

  protected void archive(final Pair<Path, Long> log) {
    totalLogSize.addAndGet(-log.getSecond());
    int retry = 1;
    while (true) {
      try {
        archiveLogFile(log.getFirst());
        // successful
        break;
      } catch (Throwable e) {
        if (retry > archiveRetries) {
          LOG.error("Failed log archiving for the log {},", log.getFirst(), e);
          if (this.abortable != null) {
            this.abortable.abort("Failed log archiving", e);
            break;
          }
        } else {
          LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, e);
        }
        retry++;
      }
    }
  }

  /**
   * Only public so WALSplitter can use it.
   * @return archived location of a WAL file with the given path p
   */
  public static Path getWALArchivePath(Path archiveDir, Path p) {
    return new Path(archiveDir, p.getName());
  }

  protected void archiveLogFile(final Path p) throws IOException {
    Path newPath = getWALArchivePath(this.walArchiveDir, p);
    // Tell our listeners that a log is going to be archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogArchive(p, newPath);
      }
    }
    LOG.info("Archiving " + p + " to " + newPath);
    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
      throw new IOException("Unable to rename " + p + " to " + newPath);
    }
    // Tell our listeners that a log has been archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogArchive(p, newPath);
      }
    }
  }

  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
    int oldNumEntries = this.numEntries.getAndSet(0);
    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
    if (oldPath != null) {
      this.walFile2Props.put(oldPath,
        new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
      this.totalLogSize.addAndGet(oldFileLen);
      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
        CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
        newPathString);
    } else {
      LOG.info("New WAL {}", newPathString);
    }
  }

  private Span createSpan(String name) {
    return TraceUtil.createSpan(name).setAttribute(WAL_IMPL, implClassName);
  }

  /**
   * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.
   * <p/>
   * <ul>
   * <li>In the case of creating a new WAL, oldPath will be null.</li>
   * <li>In the case of rolling over from one file to the next, none of the parameters will be null.
   * </li>
   * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
   * null.</li>
   * </ul>
   * @param oldPath    may be null
   * @param newPath    may be null
   * @param nextWriter may be null
   * @return the passed in <code>newPath</code>
   * @throws IOException if there is a problem flushing or closing the underlying FS
   */
  Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
    return TraceUtil.trace(() -> {
      doReplaceWriter(oldPath, newPath, nextWriter);
      return newPath;
    }, () -> createSpan("WAL.replaceWriter"));
  }

  protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
    // Now that we have published to the ring buffer, halt the current thread until we get an
    // answer back.
    try {
      if (syncFuture != null) {
        if (closed) {
          throw new IOException("WAL has been closed");
        } else {
          syncFuture.get(walSyncTimeoutNs);
        }
      }
    } catch (TimeoutIOException tioe) {
      throw new WALSyncTimeoutIOException(tioe);
    } catch (InterruptedException ie) {
      LOG.warn("Interrupted", ie);
      throw convertInterruptedExceptionToIOException(ie);
    } catch (ExecutionException e) {
      throw ensureIOException(e.getCause());
    }
  }

  private static IOException ensureIOException(final Throwable t) {
    return (t instanceof IOException) ? (IOException) t : new IOException(t);
  }

  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
    Thread.currentThread().interrupt();
    IOException ioe = new InterruptedIOException();
    ioe.initCause(ie);
    return ioe;
  }

  private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException {
    rollWriterLock.lock();
    try {
      if (this.closed) {
        throw new WALClosedException("WAL has been closed");
      }
      // Return if nothing to flush.
      if (!force && this.writer != null && this.numEntries.get() <= 0) {
        return null;
      }
      Map<byte[], List<byte[]>> regionsToFlush = null;
      try {
        Path oldPath = getOldPath();
        Path newPath = getNewPath();
        // Any exception from here on is catastrophic, non-recoverable so we currently abort.
        W nextWriter = this.createWriterInstance(newPath);
        tellListenersAboutPreLogRoll(oldPath, newPath);
        // NewPath could be equal to oldPath if replaceWriter fails.
        newPath = replaceWriter(oldPath, newPath, nextWriter);
        tellListenersAboutPostLogRoll(oldPath, newPath);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Create new " + implClassName + " writer with pipeline: "
            + Arrays.toString(getPipeline()));
        }
        // We got a new writer, so reset the slow sync count
        lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
        slowSyncCount.set(0);
        // Can we delete any of the old log files?
        if (getNumRolledLogFiles() > 0) {
          cleanOldLogs();
          regionsToFlush = findRegionsToForceFlush();
        }
      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
        // If the underlying FileSystem can't do what we ask, treat as IO failure so
        // we'll abort.
        throw new IOException(
          "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
          exception);
      }
      return regionsToFlush;
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
    return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter"));
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the size of log files in use */
  public long getLogFileSize() {
    return this.totalLogSize.get();
  }

  // public only until class moves to o.a.h.h.wal
  public void requestLogRoll() {
    requestLogRoll(ERROR);
  }

  /**
   * Get the backing files associated with this WAL.
   * @return may be null if there are no files.
   */
  FileStatus[] getFiles() throws IOException {
    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
  }

  @Override
  public void shutdown() throws IOException {
    if (!shutdown.compareAndSet(false, true)) {
      return;
    }
    closed = true;
    // Tell our listeners that the log is closing
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.logCloseRequested();
      }
    }

    ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build());

    Future<Void> future = shutdownExecutor.submit(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) {
          try {
            doShutdown();
            if (syncFutureCache != null) {
              syncFutureCache.clear();
            }
          } finally {
            rollWriterLock.unlock();
          }
        } else {
          throw new IOException("Waiting for rollWriterLock timeout");
        }
        return null;
      }
    });
    shutdownExecutor.shutdown();

    try {
      future.get(walShutdownTimeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
    } catch (TimeoutException e) {
      throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
        + " the shutdown of WAL doesn't complete! Please check the status of underlying "
        + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
        + "\"", e);
    } catch (ExecutionException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      } else {
        throw new IOException(e.getCause());
      }
    } finally {
      // in shutdown we may call cleanOldLogs so shutdown this executor in the end.
      // In sync replication implementation, we may shutdown a WAL without shutting down the whole
      // region server, if we shutdown this executor earlier we may get reject execution exception
      // and abort the region server
      logArchiveExecutor.shutdown();
    }
  }

  @Override
  public void close() throws IOException {
    shutdown();
    final FileStatus[] files = getFiles();
    if (null != files && 0 != files.length) {
      for (FileStatus file : files) {
        Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
        // Tell our listeners that a log is going to be archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.preLogArchive(file.getPath(), p);
          }
        }

        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
        }
        // Tell our listeners that a log was archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.postLogArchive(file.getPath(), p);
          }
        }
      }
      LOG.debug(
        "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
    }
    LOG.info("Closed WAL: " + toString());
  }

  /** Returns number of WALs currently in the process of closing. */
  public int getInflightWALCloseCount() {
    return inflightWALClosures.size();
  }

  /**
   * Updates the sequence number of a specific store. Depending on the flag, replaces the current
   * sequence number only if the given one is bigger, or sets it unconditionally even if it is
   * lower than the existing one.
   */
  @Override
  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
    boolean onlyIfGreater) {
    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
  }

  protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
    return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
  }

  protected boolean isLogRollRequested() {
    return rollRequested.get();
  }

  protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
    // If we have already requested a roll, don't do it again
    // And only set rollRequested to true when there is a registered listener
    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
      for (WALActionsListener i : this.listeners) {
        i.logRollRequested(reason);
      }
    }
  }

  long getUnflushedEntriesCount() {
    long highestSynced = this.highestSyncedTxid.get();
    long highestUnsynced = this.highestUnsyncedTxid;
    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
  }

  boolean isUnflushedEntries() {
    return getUnflushedEntriesCount() > 0;
  }

  /**
   * Exposed for testing only. Used for tricks like halting the ring buffer appending.
   */
  protected void atHeadOfRingBufferEventHandlerAppend() {
    // Noop
  }

  protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
    atHeadOfRingBufferEventHandlerAppend();
    long start = EnvironmentEdgeManager.currentTime();
    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
    long regionSequenceId = entry.getKey().getSequenceId();

    // If the edits are empty, there is nothing to append. They may be empty when we are looking
    // for a region sequence id only, a region edit/sequence id that is not associated with an
    // actual edit. It still has to go through all the rigmarole to be sure we have the right
    // ordering.
    if (entry.getEdit().isEmpty()) {
      return false;
    }

    // Coprocessor hook.
    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    if (!listeners.isEmpty()) {
      for (WALActionsListener i : listeners) {
        i.visitLogEntryBeforeWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
      }
    }
    doAppend(writer, entry);
    assert highestUnsyncedTxid < entry.getTxid();
    highestUnsyncedTxid = entry.getTxid();
    if (entry.isCloseRegion()) {
      // let's clean all the records of this region
      sequenceIdAccounting.onRegionClose(encodedRegionName);
    } else {
      sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
        entry.isInMemStore());
    }
    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    // Update metrics.
    postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
    numEntries.incrementAndGet();
    return true;
  }

  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
    long len = 0;
    if (!listeners.isEmpty()) {
      for (Cell cell : e.getEdit().getCells()) {
        len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
      }
      for (WALActionsListener listener : listeners) {
        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
      }
    }
    return len;
  }

  protected final void postSync(long timeInNanos, int handlerSyncs) {
    if (timeInNanos > this.slowSyncNs) {
      String msg = new StringBuilder().append("Slow sync cost: ")
        .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)).append(" ms, current pipeline: ")
        .append(Arrays.toString(getPipeline())).toString();
      LOG.info(msg);
      if (timeInNanos > this.rollOnSyncNs) {
        // A single sync took too long.
        // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
        // effects. Here we have a single data point that indicates we should take immediate
        // action, so do so.
        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time="
          + TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold="
          + TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: "
          + Arrays.toString(getPipeline()));
        requestLogRoll(SLOW_SYNC);
      }
      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
    }
    if (!listeners.isEmpty()) {
      for (WALActionsListener listener : listeners) {
        listener.postSync(timeInNanos, handlerSyncs);
      }
    }
  }

  protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
    WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException {
    if (this.closed) {
      throw new IOException(
        "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
    }
    MutableLong txidHolder = new MutableLong();
    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
      txidHolder.setValue(ringBuffer.next());
    });
    long txid = txidHolder.longValue();
    ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
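    // Publish in the finally block even if loading the entry fails; otherwise the ring buffer
    // consumer would wait forever on a sequence that is never published.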
    try {
      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
      entry.stampRegionSequenceId(we);
      ringBuffer.get(txid).load(entry);
    } finally {
      ringBuffer.publish(txid);
    }
    return txid;
  }

  @Override
  public String toString() {
    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
  }

  /**
   * If the given {@code path} is currently being written, return its length.
   * <p>
   * This is used by replication to prevent replicating unacked log entries. See
   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
   */
  @Override
  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
    rollWriterLock.lock();
    try {
      Path currentPath = getOldPath();
      if (path.equals(currentPath)) {
        // Currently active path.
        W writer = this.writer;
        return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
      } else {
        W temp = inflightWALClosures.get(path.getName());
        if (temp != null) {
          // In the process of being closed, trailer bytes may or may not be flushed.
          // Ensuring that we read all the bytes in a file is critical for correctness of tailing
          // use cases like replication, see HBASE-25924/HBASE-25932.
          return OptionalLong.of(temp.getSyncedLength());
        }
        // Log rolled successfully.
        return OptionalLong.empty();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
    return TraceUtil.trace(() -> append(info, key, edits, true),
      () -> createSpan("WAL.appendData"));
  }

  @Override
  public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
    return TraceUtil.trace(() -> append(info, key, edits, false),
      () -> createSpan("WAL.appendMarker"));
  }

  /**
   * Append a set of edits to the WAL.
   * <p/>
   * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
   * have its region edit/sequence id assigned else it messes up our unification of mvcc and
   * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
   * <p/>
   * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
   * transaction by calling 'begin', in which we assign this update a sequenceid. At assignment
   * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
   * 'complete' this mvcc transaction by calling MultiVersionConcurrencyControl#complete(...) or a
   * variant, otherwise mvcc will get stuck. Do it in the finally of a try/finally block within
   * which this append lives and any subsequent operations like sync or update of memstore, etc.
   * Get the WriteEntry to pass mvcc out of the passed in WALKey <code>walKey</code> parameter. Be
   * warned that the WriteEntry is not immediately available on return from this method. It WILL
   * be available subsequent to a sync of this append; otherwise, you will just have to wait on
   * the WriteEntry to get filled in.
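   * <p/>
   * A hedged sketch of the calling pattern described above (illustrative only; assumes the usual
   * {@code WALKeyImpl#getMvcc()}/{@code getWriteEntry()} plumbing):
   *
   * <pre>
   * {@code
   * long txid = wal.appendData(regionInfo, walKey, edits);
   * try {
   *   wal.sync(txid);
   *   // apply the edits to the memstore here
   * } finally {
   *   walKey.getMvcc().completeAndWait(walKey.getWriteEntry());
   * }
   * }
   * </pre>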
   * @param info       the regioninfo associated with append
   * @param key        Modified by this call; we add to it this edits region edit/sequence id.
   * @param edits      Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
   *                   sequence id that is after all currently appended edits.
   * @param inMemstore Always true except for case where we are writing a region event meta marker
   *                   edit, for example, a compaction completion record into the WAL or noting a
   *                   Region Open event. In these cases the entry is just so we can finish an
   *                   unfinished compaction after a crash when the new Server reads the WAL on
   *                   recovery, etc. These transition event 'Markers' do not go via the memstore.
   *                   When memstore is false, we presume a Marker event edit.
   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
   *         in it.
   */
  protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
    throws IOException;

  protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;

  protected abstract W createWriterInstance(Path path)
    throws IOException, CommonFSUtils.StreamLacksCapabilityException;

  /**
   * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer
   * will begin to work before returning from this method. If we clear the flag after returning from
   * this call, we may miss a roll request. The implementation class should choose a proper place to
   * clear the {@link #rollRequested} flag so we do not miss a roll request, typically before you
   * start writing to the new writer.
   */
  protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
    throws IOException;

  protected abstract void doShutdown() throws IOException;

  protected abstract boolean doCheckLogLowReplication();

  /** Returns true if we exceeded the slow sync roll threshold over the last check interval */
  protected boolean doCheckSlowSync() {
    boolean result = false;
    long now = EnvironmentEdgeManager.currentTime();
    long elapsedTime = now - lastTimeCheckSlowSync;
    if (elapsedTime >= slowSyncCheckInterval) {
      if (slowSyncCount.get() >= slowSyncRollThreshold) {
        if (elapsedTime >= (2 * slowSyncCheckInterval)) {
          // If two or more slowSyncCheckIntervals have elapsed, this is a corner case
          // where a train of slow syncs almost triggered us but then there was a long
          // interval from then until the one more that pushed us over. If so, we
          // should do nothing and let the count reset.
          if (LOG.isDebugEnabled()) {
            LOG.debug("checkSlowSync triggered but we decided to ignore it; " + "count="
              + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", elapsedTime="
              + elapsedTime + " ms, slowSyncCheckInterval=" + slowSyncCheckInterval + " ms");
          }
          // Fall through to count reset below
        } else {
          LOG.warn("Requesting log roll because we exceeded slow sync threshold; count="
            + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", current pipeline: "
            + Arrays.toString(getPipeline()));
          result = true;
        }
      }
      lastTimeCheckSlowSync = now;
      slowSyncCount.set(0);
    }
    return result;
  }

  public void checkLogLowReplication(long checkInterval) {
    long now = EnvironmentEdgeManager.currentTime();
    if (now - lastTimeCheckLowReplication < checkInterval) {
      return;
    }
    // Will return immediately if we are in the middle of a WAL log roll currently.
    if (!rollWriterLock.tryLock()) {
      return;
    }
    try {
      lastTimeCheckLowReplication = now;
      if (doCheckLogLowReplication()) {
        requestLogRoll(LOW_REPLICATION);
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  /**
   * This method gets the pipeline for the current WAL.
   */
  abstract DatanodeInfo[] getPipeline();

  /**
   * This method gets the datanode replication count for the current WAL.
   */
  abstract int getLogReplication();

  private static void split(final Configuration conf, final Path p) throws IOException {
    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
    if (!fs.exists(p)) {
      throw new FileNotFoundException(p.toString());
    }
    if (!fs.getFileStatus(p).isDirectory()) {
      throw new IOException(p + " is not a directory");
    }

    final Path baseDir = CommonFSUtils.getWALRootDir(conf);
    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    if (
      conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
        AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)
    ) {
      archiveDir = new Path(archiveDir, p.getName());
    }
    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
  }

  private static void usage() {
    System.err.println("Usage: AbstractFSWAL <ARGS>");
    System.err.println("Arguments:");
    System.err.println(" --dump  Dump textual representation of passed one or more files");
    System.err.println("         For example: "
      + "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
    System.err.println(" --split Split the passed directory of WAL logs");
    System.err.println(
      "         For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
  }

  /**
   * Pass one or more log file names and it will either dump out a text version on
   * <code>stdout</code> or split the specified log files.
   */
  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      usage();
      System.exit(-1);
    }
    // either dump using the WALPrettyPrinter or split, depending on args
    if (args[0].compareTo("--dump") == 0) {
      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
    } else if (args[0].compareTo("--perf") == 0) {
      LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:");
      LOG.error(HBaseMarkers.FATAL,
        "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
      System.exit(-1);
    } else if (args[0].compareTo("--split") == 0) {
      Configuration conf = HBaseConfiguration.create();
      for (int i = 1; i < args.length; i++) {
        try {
          Path logPath = new Path(args[i]);
          CommonFSUtils.setFsDefault(conf, logPath);
          split(conf, logPath);
        } catch (IOException t) {
          t.printStackTrace(System.err);
          System.exit(-1);
        }
      }
    } else {
      usage();
      System.exit(-1);
    }
  }
}