/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.WAL_IMPL;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import io.opentelemetry.api.trace.Span;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.management.MemoryType;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
 * This is done internal to the implementation.
 * <p>
 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
 * A bunch of work in the below is done keeping account of these region sequence ids -- what is
 * flushed out to hfiles, and what is yet in WAL and in memory only.
 * <p>
 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
 * (smaller) than the most-recent flush.
 * <p>
 * To read a WAL, call {@link WALFactory#createStreamReader(FileSystem, Path)} for a one-way read,
 * or call {@link WALFactory#createTailingReader(FileSystem, Path, Configuration, long)} for
 * replication, where we may want to tail the active WAL file.
 * <h2>Failure Semantic</h2> If there is an exception on append or sync, we roll the WAL because the
 * current WAL is now a lame duck; any more appends or syncs will fail also with the same original
 * exception. If we have made successful appends to the WAL and we then are unable to sync them, our
 * current semantic is to return error to the client that the appends failed but also to abort the
 * current context, usually the hosting server. We need to replay the WALs. <br>
 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged the client
 * that the append failed. <br>
 * TODO: replication may pick up these last edits though they have been marked as failed append
 * (Need to keep our own file lengths, not rely on HDFS).
 */
@InterfaceAudience.Private
public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);

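  /**
   * Orders sync futures by txid, breaking ties by identity hash code so that distinct futures with
   * the same txid can coexist in the {@code syncFutures} sorted set below.
   */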
  private static final Comparator<SyncFuture> SEQ_COMPARATOR =
    Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode);

  private static final String SURVIVED_TOO_LONG_SEC_KEY = "hbase.regionserver.wal.too.old.sec";
  private static final int SURVIVED_TOO_LONG_SEC_DEFAULT = 900;
  /** Don't log blocking regions more frequently than this. */
  private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5);

  protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";
  protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
  protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
  protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
  protected static final String SLOW_SYNC_ROLL_THRESHOLD =
    "hbase.regionserver.wal.slowsync.roll.threshold";
  protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
  protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =
    "hbase.regionserver.wal.slowsync.roll.interval.ms";
  protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute

  public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
  protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min

  public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";

  public static final String MAX_LOGS = "hbase.regionserver.maxlogs";

  public static final String RING_BUFFER_SLOT_COUNT =
    "hbase.regionserver.wal.disruptor.event.count";

  public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms";
  public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000;

  public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
  public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

  public static final String WAL_AVOID_LOCAL_WRITES_KEY =
    "hbase.regionserver.wal.avoid-local-writes";
  public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false;

  /**
   * file system instance
   */
  protected final FileSystem fs;

  /**
   * WAL directory, where all WAL files would be placed.
   */
  protected final Path walDir;

  private final FileSystem remoteFs;

  private final Path remoteWALDir;

  /**
   * dir path where old logs are kept.
   */
  protected final Path walArchiveDir;

  /**
   * Matches just those wal files that belong to this wal instance.
   */
  protected final PathFilter ourFiles;

  /**
   * Prefix of a WAL file, usually the region server name it is hosted on.
   */
  protected final String walFilePrefix;

  /**
   * Suffix included on generated wal file names
   */
  protected final String walFileSuffix;

  /**
   * Prefix used when checking for wal membership.
   */
  protected final String prefixPathStr;

  protected final WALCoprocessorHost coprocessorHost;

  /**
   * conf object
   */
  protected final Configuration conf;

  protected final Abortable abortable;

  /** Listeners that are called on WAL events. */
  protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();

  /** Tracks the logs in the process of being closed. */
  protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();

  /**
   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
   * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
   * facility for answering questions such as "Is it safe to GC a WAL?".
   */
  protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();

  /** A slow sync will be logged; a very slow sync will cause the WAL to be rolled. */
  protected final long slowSyncNs, rollOnSyncNs;
  protected final int slowSyncRollThreshold;
  protected final int slowSyncCheckInterval;
  protected final AtomicInteger slowSyncCount = new AtomicInteger();

  private final long walSyncTimeoutNs;

  private final long walTooOldNs;

  // If bigger than this size, roll the log.
  protected final long logrollsize;

  /**
   * Block size to use writing files.
   */
  protected final long blocksize;

  /*
   * If more than this many logs, force a flush of the oldest region so that its oldest edit goes
   * to disk. If there are too many logs and we crash, replay will take forever. Keep the number of
   * logs tidy.
   */
  protected final int maxLogs;

  protected final boolean useHsync;

  /**
   * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
   * is held. We don't just use synchronized because that results in bogus and tedious findbugs
   * warnings when it thinks synchronized controls writer thread safety. It is held when we are
   * actually rolling the log. It is checked when we are looking to see if we should roll the log or
   * not.
   */
  protected final ReentrantLock rollWriterLock = new ReentrantLock(true);

  // The timestamp (in ms) when the log file was created.
  protected final AtomicLong filenum = new AtomicLong(-1);

  // Number of transactions in the current WAL.
  protected final AtomicInteger numEntries = new AtomicInteger(0);

  /**
   * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass
   * a WALEdit to the background consumer thread, and the transaction id is the sequence number of
   * the corresponding entry in the queue.
   */
  protected volatile long highestUnsyncedTxid = -1;

  /**
   * Updated to the transaction id of the last successful sync call. This can be less than
   * {@link #highestUnsyncedTxid} for the case where we have an append for which a sync has not yet
   * come in.
   */
  protected final AtomicLong highestSyncedTxid = new AtomicLong(0);

  /**
   * The total size of the rolled WAL files that have not yet been archived.
   */
  protected final AtomicLong totalLogSize = new AtomicLong(0);
  /**
   * Current log file.
   */
  volatile W writer;

  // Last time we checked for low replication on the WAL pipeline
  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();

  // Last time we asked to roll the log due to a slow sync
  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();

  protected volatile boolean closed = false;

  protected final AtomicBoolean shutdown = new AtomicBoolean(false);

  protected final long walShutdownTimeout;

  private long nextLogTooOldNs = System.nanoTime();

  /**
   * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
   * an IllegalArgumentException if used to compare paths from different wals.
   */
  final Comparator<Path> LOG_NAME_COMPARATOR =
    (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));

  private static final class WALProps {

    /**
     * Map the encoded region name to the highest sequence id.
     * <p/>
     * Contains all the regions it has an entry for.
     */
    private final Map<byte[], Long> encodedName2HighestSequenceId;

    /**
     * The log file size. Notice that the size may not be accurate if we do asynchronous close in
     * subclasses.
     */
    private final long logSize;

    /**
     * The nanoTime of the log rolling, used to determine the time interval that has passed since.
     */
    private final long rollTimeNs;

    /**
     * If we do asynchronous close in subclasses, it is possible that when adding WALProps to the
     * rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file,
     * for safety.
     */
    private volatile boolean closed = false;

    WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
      this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
      this.logSize = logSize;
      this.rollTimeNs = System.nanoTime();
    }
  }

  /**
   * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
   * (contained in the log file name).
   */
  protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props =
    new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);

  /**
   * A cache of sync futures reused by threads.
   */
  protected final SyncFutureCache syncFutureCache;

  /**
   * The class name of the runtime implementation, used as prefix for logging/tracing.
   * <p>
   * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here,
   * refer to HBASE-17676 for more details
   * </p>
   */
  protected final String implClassName;

  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);

  protected final ExecutorService closeExecutor = Executors.newCachedThreadPool(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

  // Run the archiving in the caller's thread if we get a rejected execution exception, to avoid
  // aborting the region server. Usually this should not happen but let's make it more robust.
  private final ExecutorService logArchiveExecutor =
    new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(),
      new ThreadPoolExecutor.CallerRunsPolicy());

  private final int archiveRetries;

  protected ExecutorService consumeExecutor;

  private final Lock consumeLock = new ReentrantLock();

  protected final Runnable consumer = this::consume;

  // check if there is already a consumer task in the event loop's task queue
  protected Supplier<Boolean> hasConsumerTask;

  private static final int MAX_EPOCH = 0x3FFFFFFF;
  // The lowest bit is waitingRoll, which means a new writer has been created and we are waiting for
  // the old writer to be closed.
  // The second-lowest bit is writerBroken, which means the current writer is broken and a
  // rollWriter is needed.
  // All other bits are the epoch number of the current writer; this is used to detect whether the
  // writer is still the same one when you issue the sync.
  // Notice that modification to this field is only allowed under the protection of consumeLock.
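  // In other words, the layout from the most to the least significant bits is:
  // [epoch (30 bits)][writerBroken (1 bit)][waitingRoll (1 bit)]; MAX_EPOCH above is the largest
  // value the 30-bit epoch can hold.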
  private volatile int epochAndState;

  private boolean readyForRolling;

  private final Condition readyForRollingCond = consumeLock.newCondition();

  private final RingBuffer<RingBufferTruck> waitingConsumePayloads;

  private final Sequence waitingConsumePayloadsGatingSequence;

  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);

  private final long batchSize;

  protected final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();

  protected final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();

  protected final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);

  // the highest txid of WAL entries being processed
  protected long highestProcessedAppendTxid;

  // file length when we issued the last sync request on the writer
  private long fileLengthAtLastSync;

  private long highestProcessedAppendTxidAtLastSync;

  private int waitOnShutdownInSeconds;

  private String waitOnShutdownInSecondsConfigKey;

  protected boolean shouldShutDownConsumeExecutorWhenClose = true;

  private volatile boolean skipRemoteWAL = false;

  private volatile boolean markerEditOnly = false;

  public long getFilenum() {
    return this.filenum.get();
  }

  /**
   * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}). This helper
   * method returns the creation timestamp from a given log file. It extracts the timestamp assuming
   * the filename is created with the {@link #computeFilename(long filenum)} method.
   * @return timestamp, as in the log file name.
   */
  protected long getFileNumFromFileName(Path fileName) {
    checkNotNull(fileName, "file name can't be null");
    if (!ourFiles.accept(fileName)) {
      throw new IllegalArgumentException(
        "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
    }
    final String fileNameString = fileName.toString();
    String chompedPath = fileNameString.substring(prefixPathStr.length(),
      (fileNameString.length() - walFileSuffix.length()));
    return Long.parseLong(chompedPath);
  }

  private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
    checkArgument(logRollSize > 0,
      "The log roll size cannot be zero or negative when calculating max log files, "
        + "current value is " + logRollSize);
    Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf);
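    // Allow enough rolled WALs to hold roughly twice the global memstore size worth of edits
    // before the maxlogs limit starts forcing flushes.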
    return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
  }

  // must be power of 2
  protected final int getPreallocatedEventCount() {
    // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
    // be stuck and make no progress if the buffer is filled with appends only and there is no
    // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
    // before they return.
    int preallocatedEventCount = this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
    checkArgument(preallocatedEventCount > 0, RING_BUFFER_SLOT_COUNT + " must be > 0");
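    // Round the configured value up to the nearest power of two (the ring buffer requires a
    // power-of-two size); 'floor' is the highest power of two <= the configured value.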
    int floor = Integer.highestOneBit(preallocatedEventCount);
    if (floor == preallocatedEventCount) {
      return floor;
    }
    // max capacity is 1 << 30
    if (floor >= 1 << 29) {
      return 1 << 30;
    }
    return floor << 1;
  }

  protected final void setWaitOnShutdownInSeconds(int waitOnShutdownInSeconds,
    String waitOnShutdownInSecondsConfigKey) {
    this.waitOnShutdownInSeconds = waitOnShutdownInSeconds;
    this.waitOnShutdownInSecondsConfigKey = waitOnShutdownInSecondsConfigKey;
  }

  protected final void createSingleThreadPoolConsumeExecutor(String walType, final Path rootDir,
    final String prefix) {
    ThreadPoolExecutor threadPool =
      new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
        new ThreadFactoryBuilder().setNameFormat(walType + "-%d-" + rootDir.toString() + "-prefix:"
          + (prefix == null ? "default" : prefix).replace("%", "%%")).setDaemon(true).build());
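    // A consumer task is already pending if the head of this single-threaded pool's queue is our
    // consumer runnable.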
    hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
    consumeExecutor = threadPool;
    this.shouldShutDownConsumeExecutorWhenClose = true;
  }

  protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,
    final String logDir, final String archiveDir, final Configuration conf,
    final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
    final String suffix, FileSystem remoteFs, Path remoteWALDir)
    throws FailedLogCloseException, IOException {
    this.fs = fs;
    this.walDir = new Path(rootDir, logDir);
    this.walArchiveDir = new Path(rootDir, archiveDir);
    this.conf = conf;
    this.abortable = abortable;
    this.remoteFs = remoteFs;
    this.remoteWALDir = remoteWALDir;

    if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
      throw new IOException("Unable to mkdir " + walDir);
    }

    if (!fs.exists(this.walArchiveDir)) {
      if (!fs.mkdirs(this.walArchiveDir)) {
        throw new IOException("Unable to mkdir " + this.walArchiveDir);
      }
    }

    // If prefix is null||empty then just name it wal
    this.walFilePrefix = prefix == null || prefix.isEmpty()
      ? "wal"
      : URLEncoder.encode(prefix, StandardCharsets.UTF_8.name());
    // we only correctly differentiate suffixes when numeric ones start with '.'
    if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER
        + "' but instead was '" + suffix + "'");
    }
    // Now that it exists, set the storage policy for the entire directory of wal files related to
    // this FSHLog instance
    String storagePolicy =
      conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
    CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);
    this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
    this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();

    this.ourFiles = new PathFilter() {
      @Override
      public boolean accept(final Path fileName) {
        // The path should start with dir/<prefix> and end with our suffix
        final String fileNameString = fileName.toString();
        if (!fileNameString.startsWith(prefixPathStr)) {
          return false;
        }
        if (walFileSuffix.isEmpty()) {
          // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
          return org.apache.commons.lang3.StringUtils
            .isNumeric(fileNameString.substring(prefixPathStr.length()));
        } else if (!fileNameString.endsWith(walFileSuffix)) {
          return false;
        }
        return true;
      }
    };

    if (failIfWALExists) {
      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
      if (null != walFiles && 0 != walFiles.length) {
        throw new IOException("Target WAL already exists within directory " + walDir);
      }
    }

    // Register listeners. TODO: Should this exist anymore? We have CPs?
    if (listeners != null) {
      for (WALActionsListener i : listeners) {
        registerWALActionsListener(i);
      }
    }
    this.coprocessorHost = new WALCoprocessorHost(this, conf);

    // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block
    // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost
    // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of
    // the block size but experience from the field has it that this was not enough time for the
    // roll to happen before end-of-block. So that the new accounting makes WALs of about the same
    // size as those made in hbase-1 (to prevent surprise), we now have the default block size as
    // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally
    // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
    this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
    float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
    this.logrollsize = (long) (this.blocksize * multiplier);
    this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));

    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
      + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix="
      + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir
      + ", maxLogs=" + this.maxLogs);
    this.slowSyncNs =
      TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_TIME_MS));
    this.rollOnSyncNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS));
    this.slowSyncRollThreshold =
      conf.getInt(SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
    this.slowSyncCheckInterval =
      conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
    this.walSyncTimeoutNs =
      TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS));
    this.syncFutureCache = new SyncFutureCache(conf);
    this.implClassName = getClass().getSimpleName();
    this.walTooOldNs = TimeUnit.SECONDS
      .toNanos(conf.getInt(SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
    this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
    archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
    this.walShutdownTimeout =
      conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS);

    int preallocatedEventCount = conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
    waitingConsumePayloads =
      RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
    waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);

    // increase the ringbuffer sequence so our txid starts from 1
    waitingConsumePayloads.publish(waitingConsumePayloads.next());
    waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());

    batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
  }

  /**
   * Used to initialize the WAL. Usually just call rollWriter to create the first log writer.
   */
  @Override
  public void init() throws IOException {
    rollWriter();
  }

  @Override
  public void registerWALActionsListener(WALActionsListener listener) {
    this.listeners.add(listener);
  }

  @Override
  public boolean unregisterWALActionsListener(WALActionsListener listener) {
    return this.listeners.remove(listener);
  }

  @Override
  public WALCoprocessorHost getCoprocessorHost() {
    return coprocessorHost;
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
  }

  @Override
  public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
  }

  @Override
  public void abortCacheFlush(byte[] encodedRegionName) {
    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
    // Used by tests. Deprecated as too subtle for general usage.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
    // This method is used by tests and for figuring if we should flush or not because our
    // sequenceids are too old. It is also used for reporting our oldest sequenceid to the master,
    // which uses it to figure what edits can be skipped during log recovery.
    // getEarliestMemStoreSequenceId from this.sequenceIdAccounting looks first in
    // flushingOldestStoreSequenceIds, the currently flushing sequence ids, and if anything is found
    // there, returns it. This is the right thing to do for reporting the oldest sequenceids to the
    // master; we won't skip edits if we crash during the flush. For figuring what to flush, we
    // might get requeued if our sequence id is old even though we are currently flushing. This may
    // mean we do too much flushing.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
  }

  @Override
  public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
    return rollWriter(false);
  }

  @Override
  public final void sync() throws IOException {
    sync(useHsync);
  }

  @Override
  public final void sync(long txid) throws IOException {
    sync(txid, useHsync);
  }

  @Override
  public final void sync(boolean forceSync) throws IOException {
    TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync"));
  }

  @Override
  public final void sync(long txid, boolean forceSync) throws IOException {
    TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync"));
  }

  /**
   * This is a convenience method that computes a new filename with a given file-number.
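   * <p>
   * For example, with a (hypothetical) prefix of {@code host%2C16020%2C1690000000000} and suffix
   * {@code .meta}, filenum {@code 1690000001234} yields the child name
   * {@code host%2C16020%2C1690000000000.1690000001234.meta} under {@link #walDir}.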
   * @param filenum to use
   */
  protected Path computeFilename(final long filenum) {
    if (filenum < 0) {
      throw new RuntimeException("WAL file number can't be < 0");
    }
    String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;
    return new Path(walDir, child);
  }

  /**
   * This is a convenience method that computes a new filename using the current WAL file-number.
   */
  public Path getCurrentFileName() {
    return computeFilename(this.filenum.get());
  }

  /**
   * Retrieve the next path to use for writing. Increments the internal filenum.
   */
  private Path getNewPath() throws IOException {
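    // Use the current time as the new filenum, but never go backwards or collide with the previous
    // filenum, even if the system clock moves backwards.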
    this.filenum.set(Math.max(getFilenum() + 1, EnvironmentEdgeManager.currentTime()));
    Path newPath = getCurrentFileName();
    return newPath;
  }

  public Path getOldPath() {
    long currentFilenum = this.filenum.get();
    Path oldPath = null;
    if (currentFilenum > 0) {
      // ComputeFilename will take care of meta wal filename
      oldPath = computeFilename(currentFilenum);
    } // I presume if currentFilenum is <= 0, this is the first file and null for oldPath is fine?
    return oldPath;
  }

  /**
   * Tell listeners about pre log roll.
   */
  private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
    throws IOException {
    coprocessorHost.preWALRoll(oldPath, newPath);

    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogRoll(oldPath, newPath);
      }
    }
  }

  /**
   * Tell listeners about post log roll.
   */
  private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
    throws IOException {
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogRoll(oldPath, newPath);
      }
    }

    coprocessorHost.postWALRoll(oldPath, newPath);
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the number of rolled log files */
  public int getNumRolledLogFiles() {
    return walFile2Props.size();
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the number of log files in use */
  public int getNumLogFiles() {
    // +1 for the current WAL in use
    return getNumRolledLogFiles() + 1;
  }

  /**
   * If the number of un-archived WAL files ('live' WALs) is greater than the maximum allowed, check
   * the first (oldest) WAL, and return those regions which should be flushed so that it can be
   * let-go/'archived'.
   * @return stores of regions (encodedRegionNames) to flush in order to archive the oldest WAL file
   */
  Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
    Map<byte[], List<byte[]>> regions = null;
    int logCount = getNumRolledLogFiles();
    if (logCount > this.maxLogs && logCount > 0) {
      Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry();
      regions =
        this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
    }
    if (regions != null) {
      List<String> listForPrint = new ArrayList<>();
      for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
        StringBuilder families = new StringBuilder();
        for (int i = 0; i < r.getValue().size(); i++) {
          if (i > 0) {
            families.append(",");
          }
          families.append(Bytes.toString(r.getValue().get(i)));
        }
        listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
      }
      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs
        + "; forcing (partial) flush of " + regions.size() + " region(s): "
        + StringUtils.join(",", listForPrint));
    }
    return regions;
  }

  /**
   * Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file.
   */
  private void markClosedAndClean(Path path) {
    WALProps props = walFile2Props.get(path);
    // Typically this should not be null, but there is no big issue if it is already null, so
    // let's make the code more robust.
    if (props != null) {
      props.closed = true;
      cleanOldLogs();
    }
  }

  /**
   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
   * <p/>
   * Use synchronized because we may call this method in different threads, normally when replacing
   * the writer, and since closing the writer may now be asynchronous, we will also call this method
   * in the closeExecutor, right after we actually close a WAL writer.
   */
  private synchronized void cleanOldLogs() {
    List<Pair<Path, Long>> logsToArchive = null;
    long now = System.nanoTime();
    boolean mayLogTooOld = nextLogTooOldNs <= now;
    ArrayList<byte[]> regionsBlockingWal = null;
    // For each log file, look at its Map of regions to the highest sequence id; if all sequence ids
    // are older than what is currently in memory, the WAL can be GC'd.
    for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) {
      if (!e.getValue().closed) {
        LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey());
        continue;
      }
      Path log = e.getKey();
      ArrayList<byte[]> regionsBlockingThisWal = null;
      long ageNs = now - e.getValue().rollTimeNs;
      if (ageNs > walTooOldNs) {
        if (mayLogTooOld && regionsBlockingWal == null) {
          regionsBlockingWal = new ArrayList<>();
        }
        regionsBlockingThisWal = regionsBlockingWal;
      }
      Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
      if (this.sequenceIdAccounting.areAllLower(sequenceNums, regionsBlockingThisWal)) {
        if (logsToArchive == null) {
          logsToArchive = new ArrayList<>();
        }
        logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
        if (LOG.isTraceEnabled()) {
          LOG.trace("WAL file ready for archiving " + log);
        }
      } else if (regionsBlockingThisWal != null) {
        StringBuilder sb = new StringBuilder(log.toString()).append(" has not been archived for ")
          .append(TimeUnit.NANOSECONDS.toSeconds(ageNs)).append(" seconds; blocked by: ");
        boolean isFirst = true;
        for (byte[] region : regionsBlockingThisWal) {
          if (!isFirst) {
            sb.append("; ");
          }
          isFirst = false;
          sb.append(Bytes.toString(region));
        }
        LOG.warn(sb.toString());
        nextLogTooOldNs = now + SURVIVED_TOO_LONG_LOG_INTERVAL_NS;
        regionsBlockingThisWal.clear();
      }
    }

    if (logsToArchive != null) {
      final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
      // make it async
      for (Pair<Path, Long> log : localLogsToArchive) {
        logArchiveExecutor.execute(() -> {
          archive(log);
        });
        this.walFile2Props.remove(log.getFirst());
      }
    }
  }

  protected void archive(final Pair<Path, Long> log) {
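    // Retry the archival up to archiveRetries times; if it still fails, abort the server when an
    // Abortable was supplied, otherwise keep retrying.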
    totalLogSize.addAndGet(-log.getSecond());
    int retry = 1;
    while (true) {
      try {
        archiveLogFile(log.getFirst());
        // successful
        break;
      } catch (Throwable e) {
        if (retry > archiveRetries) {
          LOG.error("Failed log archiving for the log {}", log.getFirst(), e);
          if (this.abortable != null) {
            this.abortable.abort("Failed log archiving", e);
            break;
          }
        } else {
          LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, e);
        }
        retry++;
      }
    }
  }

  /*
   * only public so WALSplitter can use it.
   * @return archived location of a WAL file with the given path p
   */
  public static Path getWALArchivePath(Path archiveDir, Path p) {
    return new Path(archiveDir, p.getName());
  }

  protected void archiveLogFile(final Path p) throws IOException {
    Path newPath = getWALArchivePath(this.walArchiveDir, p);
    // Tell our listeners that a log is going to be archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogArchive(p, newPath);
      }
    }
    LOG.info("Archiving " + p + " to " + newPath);
    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
      throw new IOException("Unable to rename " + p + " to " + newPath);
    }
    // Tell our listeners that a log has been archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogArchive(p, newPath);
      }
    }
  }

  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
    int oldNumEntries = this.numEntries.getAndSet(0);
    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
    if (oldPath != null) {
      this.walFile2Props.put(oldPath,
        new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
      this.totalLogSize.addAndGet(oldFileLen);
      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
        CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
        newPathString);
    } else {
      LOG.info("New WAL {}", newPathString);
    }
  }

  private Span createSpan(String name) {
    return TraceUtil.createSpan(name).setAttribute(WAL_IMPL, implClassName);
  }

  /**
   * Cleans up the current writer, closing it, and then puts in place the passed in
   * {@code nextWriter}.
   * <p/>
   * <ul>
   * <li>In the case of creating a new WAL, oldPath will be null.</li>
   * <li>In the case of rolling over from one file to the next, none of the parameters will be null.
   * </li>
   * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
   * null.</li>
   * </ul>
   * @param oldPath    may be null
   * @param newPath    may be null
   * @param nextWriter may be null
   * @return the passed in <code>newPath</code>
   * @throws IOException if there is a problem flushing or closing the underlying FS
   */
  Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
    return TraceUtil.trace(() -> {
      doReplaceWriter(oldPath, newPath, nextWriter);
      return newPath;
    }, () -> createSpan("WAL.replaceWriter"));
  }

  protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
    // Now we have published the ringbuffer, halt the current thread until we get an answer back.
    try {
      if (syncFuture != null) {
        if (closed) {
          throw new IOException("WAL has been closed");
        } else {
          syncFuture.get(walSyncTimeoutNs);
        }
      }
    } catch (TimeoutIOException tioe) {
      throw new WALSyncTimeoutIOException(tioe);
    } catch (InterruptedException ie) {
      LOG.warn("Interrupted", ie);
      throw convertInterruptedExceptionToIOException(ie);
    } catch (ExecutionException e) {
      throw ensureIOException(e.getCause());
    }
  }

  private static IOException ensureIOException(final Throwable t) {
    return (t instanceof IOException) ? (IOException) t : new IOException(t);
  }

  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
    Thread.currentThread().interrupt();
    IOException ioe = new InterruptedIOException();
    ioe.initCause(ie);
    return ioe;
  }

  private W createCombinedWriter(W localWriter, Path localPath)
    throws IOException, CommonFSUtils.StreamLacksCapabilityException {
    // Retry forever if we cannot create the remote writer, to prevent aborting the RS due to a log
    // rolling error, unless skipRemoteWAL is set to true.
    // TODO: since for now we only have one thread doing log rolling, this may block the rolling for
    // other wals
    Path remoteWAL = new Path(remoteWALDir, localPath.getName());
    for (int retry = 0;; retry++) {
      if (skipRemoteWAL) {
        return localWriter;
      }
      W remoteWriter;
      try {
        remoteWriter = createWriterInstance(remoteFs, remoteWAL);
      } catch (IOException e) {
        LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e);
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          // restore the interrupt state
          Thread.currentThread().interrupt();
          // must close local writer here otherwise no one will close it for us
          Closeables.close(localWriter, true);
          throw (IOException) new InterruptedIOException().initCause(ie);
        }
        continue;
      }
      return createCombinedWriter(localWriter, remoteWriter);
    }
  }

  private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException {
    rollWriterLock.lock();
    try {
      if (this.closed) {
        throw new WALClosedException("WAL has been closed");
      }
      // Return if nothing to flush.
      if (!force && this.writer != null && this.numEntries.get() <= 0) {
        return null;
      }
      Map<byte[], List<byte[]>> regionsToFlush = null;
      try {
        Path oldPath = getOldPath();
        Path newPath = getNewPath();
        // Any exception from here on is catastrophic, non-recoverable, so we currently abort.
        W nextWriter = this.createWriterInstance(fs, newPath);
        if (remoteFs != null) {
          // create a remote wal if necessary
          nextWriter = createCombinedWriter(nextWriter, newPath);
        }
        tellListenersAboutPreLogRoll(oldPath, newPath);
        // NewPath could be equal to oldPath if replaceWriter fails.
        newPath = replaceWriter(oldPath, newPath, nextWriter);
        tellListenersAboutPostLogRoll(oldPath, newPath);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Create new " + implClassName + " writer with pipeline: "
            + Arrays.toString(getPipeline()));
        }
        // We got a new writer, so reset the slow sync count
        lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
        slowSyncCount.set(0);
        // Can we delete any of the old log files?
        if (getNumRolledLogFiles() > 0) {
          cleanOldLogs();
          regionsToFlush = findRegionsToForceFlush();
        }
      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
        // If the underlying FileSystem can't do what we ask, treat as IO failure, so
        // we'll abort.
        throw new IOException(
          "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
          exception);
      }
      return regionsToFlush;
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
    return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter"));
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the size of log files in use */
  public long getLogFileSize() {
    return this.totalLogSize.get();
  }

  // public only until class moves to o.a.h.h.wal
  public void requestLogRoll() {
    requestLogRoll(ERROR);
  }

  /**
   * Get the backing files associated with this WAL.
   * @return may be null if there are no files.
   */
  FileStatus[] getFiles() throws IOException {
    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
  }

  @Override
  public void shutdown() throws IOException {
    if (!shutdown.compareAndSet(false, true)) {
      return;
    }
    closed = true;
    // Tell our listeners that the log is closing
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.logCloseRequested();
      }
    }

    ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build());

    Future<Void> future = shutdownExecutor.submit(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
          try {
            doShutdown();
            if (syncFutureCache != null) {
              syncFutureCache.clear();
            }
          } finally {
            rollWriterLock.unlock();
          }
        } else {
          throw new IOException("Waiting for rollWriterLock timeout");
        }
        return null;
      }
    });
    shutdownExecutor.shutdown();

    try {
      future.get(walShutdownTimeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted while waiting for WAL shutdown");
    } catch (TimeoutException e) {
      throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
        + " the shutdown of WAL doesn't complete! Please check the status of underlying "
        + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
        + "\"", e);
    } catch (ExecutionException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      } else {
        throw new IOException(e.getCause());
      }
    } finally {
      // In shutdown, we may call cleanOldLogs, so shut down this executor at the end.
      // In the sync replication implementation, we may shut down a WAL without shutting down the
      // whole region server; if we shut down this executor earlier we may get a rejected execution
      // exception and abort the region server.
      logArchiveExecutor.shutdown();
    }
    // We also need to wait for log archiving to finish if we want a graceful shutdown, as we may
    // still have some pending archiving tasks not finished yet, and in close we may archive all the
    // remaining WAL files; there could be a race if we do not wait for the background archive tasks
    // to finish.
    try {
      if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
        throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
          + " the shutdown of WAL doesn't complete! Please check the status of underlying "
          + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
          + "\"");
      }
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted while waiting for WAL shutdown");
    }
  }

  @Override
  public void close() throws IOException {
    shutdown();
    final FileStatus[] files = getFiles();
    if (null != files && 0 != files.length) {
      for (FileStatus file : files) {
        Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
        // Tell our listeners that a log is going to be archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.preLogArchive(file.getPath(), p);
          }
        }

        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
        }
        // Tell our listeners that a log was archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.postLogArchive(file.getPath(), p);
          }
        }
      }
      LOG.debug(
        "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
    }
    LOG.info("Closed WAL: " + toString());
  }

  /** Returns number of WALs currently in the process of closing. */
  public int getInflightWALCloseCount() {
    return inflightWALClosures.size();
  }

  /**
   * Updates the sequence number of a specific store. Depending on the flag, it replaces the current
   * sequence number only if the given one is bigger, or unconditionally, even if it is lower than
   * the existing one.
   */
  @Override
  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
    boolean onlyIfGreater) {
    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
  }

  protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
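    // Reuse a cached SyncFuture when possible (resetting it for this txid) rather than allocating a
    // new one for every sync request.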
1273    return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
1274  }
1275
1276  protected boolean isLogRollRequested() {
1277    return rollRequested.get();
1278  }
1279
1280  protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
1281    // If we have already requested a roll, don't do it again
1282    // And only set rollRequested to true when there is a registered listener
1283    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
1284      for (WALActionsListener i : this.listeners) {
1285        i.logRollRequested(reason);
1286      }
1287    }
1288  }
1289
1290  long getUnflushedEntriesCount() {
1291    long highestSynced = this.highestSyncedTxid.get();
1292    long highestUnsynced = this.highestUnsyncedTxid;
1293    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
1294  }
1295
1296  boolean isUnflushedEntries() {
1297    return getUnflushedEntriesCount() > 0;
1298  }
1299
1300  /**
1301   * Exposed for testing only. Use to tricks like halt the ring buffer appending.
1302   */
1303  protected void atHeadOfRingBufferEventHandlerAppend() {
1304    // Noop
1305  }
1306
1307  protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
1308    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
1309    atHeadOfRingBufferEventHandlerAppend();
1310    long start = EnvironmentEdgeManager.currentTime();
1311    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
1312    long regionSequenceId = entry.getKey().getSequenceId();
1313
1314    // If the edit is empty there is nothing to append. Edits may be empty when we only want a
1315    // region sequence id, i.e. a region edit/sequence id that is not associated with an actual
1316    // edit; it still has to go through all the rigmarole to be sure we have the right ordering.
1317    if (entry.getEdit().isEmpty()) {
1318      return false;
1319    }
1320
1321    // Coprocessor hook.
1322    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
1323    if (!listeners.isEmpty()) {
1324      for (WALActionsListener i : listeners) {
1325        i.visitLogEntryBeforeWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
1326      }
1327    }
1328    doAppend(writer, entry);
1329    assert highestUnsyncedTxid < entry.getTxid();
1330    highestUnsyncedTxid = entry.getTxid();
1331    if (entry.isCloseRegion()) {
1332      // let's clean all the records of this region
1333      sequenceIdAccounting.onRegionClose(encodedRegionName);
1334    } else {
1335      sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
1336        entry.isInMemStore());
1337    }
1338    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
1339    // Update metrics.
1340    postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
1341    numEntries.incrementAndGet();
1342    return true;
1343  }
1344
1345  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
1346    long len = 0;
1347    if (!listeners.isEmpty()) {
1348      for (Cell cell : e.getEdit().getCells()) {
1349        len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
1350      }
1351      for (WALActionsListener listener : listeners) {
1352        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
1353      }
1354    }
1355    return len;
1356  }
1357
1358  protected final void postSync(long timeInNanos, int handlerSyncs) {
1359    if (timeInNanos > this.slowSyncNs) {
1360      String msg = new StringBuilder().append("Slow sync cost: ")
1361        .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)).append(" ms, current pipeline: ")
1362        .append(Arrays.toString(getPipeline())).toString();
1363      LOG.info(msg);
1364      if (timeInNanos > this.rollOnSyncNs) {
1365        // A single sync took too long.
1366        // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
1367        // effects. Here we have a single data point that indicates we should take immediate
1368        // action, so do so.
1369        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time="
1370          + TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold="
1371          + TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: "
1372          + Arrays.toString(getPipeline()));
1373        requestLogRoll(SLOW_SYNC);
1374      }
1375      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
1376    }
1377    if (!listeners.isEmpty()) {
1378      for (WALActionsListener listener : listeners) {
1379        listener.postSync(timeInNanos, handlerSyncs);
1380      }
1381    }
1382  }
1383
1384  protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
1385    WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException {
1386    if (this.closed) {
1387      throw new IOException(
1388        "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
1389    }
1390    MutableLong txidHolder = new MutableLong();
1391    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
1392      txidHolder.setValue(ringBuffer.next());
1393    });
1394    long txid = txidHolder.longValue();
1395    ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
1396    try {
1397      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
1398      entry.stampRegionSequenceId(we);
1399      ringBuffer.get(txid).load(entry);
1400    } finally {
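      // Always publish the sequence we claimed from the ring buffer, even if loading the entry
      // failed; leaving a claimed slot unpublished would stall the consumer forever.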
1401      ringBuffer.publish(txid);
1402    }
1403    return txid;
1404  }
1405
1406  @Override
1407  public String toString() {
1408    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
1409  }
1410
1411  /**
1412   * If the given {@code path} is currently being written, return its length.
1413   * <p>
1414   * This is used by replication to prevent replicating unacked log entries. See
1415   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
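   * <p>
   * A hedged sketch of how a tailing reader might use this (the surrounding variables are
   * hypothetical and not the actual replication code):
   *
   * <pre>
   * OptionalLong writingLength = wal.getLogFileSizeIfBeingWritten(walPath);
   * // If the file is still being written, only read up to the synced length; bytes beyond it may
   * // not have been acked yet.
   * long safeLength = writingLength.orElse(lengthFromFileStatus);
   * </pre>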
1416   */
1417  @Override
1418  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
1419    rollWriterLock.lock();
1420    try {
1421      Path currentPath = getOldPath();
1422      if (path.equals(currentPath)) {
1423        // Currently active path.
1424        W writer = this.writer;
1425        return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
1426      } else {
1427        W temp = inflightWALClosures.get(path.getName());
1428        if (temp != null) {
1429          // In the process of being closed, trailer bytes may or may not be flushed.
1430          // Ensuring that we read all the bytes in a file is critical for correctness of tailing
1431          // use cases like replication, see HBASE-25924/HBASE-25932.
1432          return OptionalLong.of(temp.getSyncedLength());
1433        }
1434        // Log rolled successfully.
1435        return OptionalLong.empty();
1436      }
1437    } finally {
1438      rollWriterLock.unlock();
1439    }
1440  }
1441
1442  @Override
1443  public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1444    return TraceUtil.trace(() -> append(info, key, edits, true),
1445      () -> createSpan("WAL.appendData"));
1446  }
1447
1448  @Override
1449  public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1450    return TraceUtil.trace(() -> append(info, key, edits, false),
1451      () -> createSpan("WAL.appendMarker"));
1452  }
1453
1454  /**
1455   * Helper that marks the future as DONE and offers it back to the cache.
1456   */
1457  protected void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
1458    future.done(txid, t);
1459    syncFutureCache.offer(future);
1460  }
1461
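  // The epochAndState field packs three pieces of information into one int, as decoded by the
  // three helpers below: bit 0 is the waitingRoll flag, bit 1 is the writerBroken flag, and the
  // remaining higher bits hold the epoch, which is bumped every time the writer is replaced.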
1462  private static boolean waitingRoll(int epochAndState) {
1463    return (epochAndState & 1) != 0;
1464  }
1465
1466  private static boolean writerBroken(int epochAndState) {
1467    return ((epochAndState >>> 1) & 1) != 0;
1468  }
1469
1470  private static int epoch(int epochAndState) {
1471    return epochAndState >>> 2;
1472  }
1473
1474  // return whether we have successfully set readyForRolling to true.
1475  private boolean trySetReadyForRolling() {
1476    // Check without holding lock first. Usually we will just return here.
1477    // waitingRoll is volatile and unackedAppends is only accessed inside the event loop, so it is
1478    // safe to check them outside the consumeLock.
1479    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
1480      return false;
1481    }
1482    consumeLock.lock();
1483    try {
1484      // 1. a roll is requested
1485      // 2. all out-going entries have been acked (we have confirmed this above).
1486      if (waitingRoll(epochAndState)) {
1487        readyForRolling = true;
1488        readyForRollingCond.signalAll();
1489        return true;
1490      } else {
1491        return false;
1492      }
1493    } finally {
1494      consumeLock.unlock();
1495    }
1496  }
1497
1498  private void syncFailed(long epochWhenSync, Throwable error) {
1499    LOG.warn("sync failed", error);
1500    this.onException(epochWhenSync, error);
1501  }
1502
1503  private void onException(long epochWhenSync, Throwable error) {
1504    boolean shouldRequestLogRoll = true;
1505    consumeLock.lock();
1506    try {
1507      int currentEpochAndState = epochAndState;
1508      if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
1509        // This is not the previous writer, which means we have already rolled the writer; or this
1510        // is still the current writer, but we have already marked it as broken and requested a
1511        // roll.
1512        return;
1513      }
1514      this.epochAndState = currentEpochAndState | 0b10;
1515      if (waitingRoll(currentEpochAndState)) {
1516        readyForRolling = true;
1517        readyForRollingCond.signalAll();
1518        // this means we are already in the middle of a rollWriter call, so just tell the roller
1519        // thread that it can continue without requesting an extra log roll.
1520        shouldRequestLogRoll = false;
1521      }
1522    } finally {
1523      consumeLock.unlock();
1524    }
1525    for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
1526      toWriteAppends.addFirst(iter.next());
1527    }
1528    highestUnsyncedTxid = highestSyncedTxid.get();
1529    if (shouldRequestLogRoll) {
1530      // request a roll.
1531      requestLogRoll(ERROR);
1532    }
1533  }
1534
1535  private void syncCompleted(long epochWhenSync, W writer, long processedTxid, long startTimeNs) {
1536    // Please see the last several comments on HBASE-22761, it is possible that we get a
1537    // syncCompleted which acks a previous sync request after we received a syncFailed on the same
1538    // writer. So here we will also check on the epoch and state, if the epoch has already been
1539    // changed, i.e., we have already rolled the writer, or the writer is already broken, we should
1540    // just skip here, to avoid messing up the state or accidentally releasing some WAL entries and
1541    // causing data corruption.
1542    // The syncCompleted call is on the critical write path, so we should try our best to make it
1543    // fast. So here we do not hold consumeLock, to increase performance. It is safe because
1544    // there are only 3 possible situations:
1545    // 1. In the normal case, the only place where we change epochAndState is when rolling the
1546    // writer. Before the roll actually happens, we only change the state to waitingRoll, which is a
1547    // different bit from writerBroken, and when we actually change the epoch we make sure there is
1548    // no outgoing sync request. So we will always pass the check here and there is no problem.
1549    // 2. The writer is broken, but we have not called syncFailed yet. In this case, since
1550    // syncFailed and syncCompleted are executed in the same thread, we will just face the same
1551    // situation as in #1.
1552    // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are
1553    // only 2 possible situations:
1554    // a. we arrive before we actually roll the writer, then we will find out the writer is broken
1555    // and give up.
1556    // b. we arrive after we actually roll the writer, then we will find out the epoch is changed
1557    // and give up.
1558    // For both #a and #b, we do not need to hold the consumeLock as we will always update the
1559    // epochAndState as a whole.
1560    // So in general, for all the cases above, we do not need to hold the consumeLock.
1561    int epochAndState = this.epochAndState;
1562    if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) {
1563      LOG.warn("Got a sync complete call after the writer is broken, skip");
1564      return;
1565    }
1566
1567    if (processedTxid < highestSyncedTxid.get()) {
1568      return;
1569    }
1570    highestSyncedTxid.set(processedTxid);
1571    for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
1572      FSWALEntry entry = iter.next();
1573      if (entry.getTxid() <= processedTxid) {
1574        entry.release();
1575        iter.remove();
1576      } else {
1577        break;
1578      }
1579    }
1580    postSync(System.nanoTime() - startTimeNs, finishSync());
1581    /**
1582     * This call is here to stay compatible with the original logic of {@link FSHLog}.
1583     */
1584    checkSlowSyncCount();
1585    if (trySetReadyForRolling()) {
1586      // we have just finished a roll, so there is no need to check for log rolling; the writer
1587      // will be closed soon.
1588      return;
1589    }
1590    // If we haven't already requested a roll, check if we have exceeded logrollsize
1591    if (!isLogRollRequested() && writer.getLength() > logrollsize) {
1592      if (LOG.isDebugEnabled()) {
1593        LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength()
1594          + ", logrollsize=" + logrollsize);
1595      }
1596      requestLogRoll(SIZE);
1597    }
1598  }
1599
1600  // Find all the sync futures between these two txids to see if we need to issue an hsync; if
1601  // there are no sync futures, just use the default.
1602  private boolean isHsync(long beginTxid, long endTxid) {
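    // The subSet below is inclusive of beginTxid and exclusive of endTxid + 1, i.e. it covers the
    // sync futures whose txids fall in [beginTxid, endTxid].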
1603    SortedSet<SyncFuture> futures = syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
1604      new SyncFuture().reset(endTxid + 1, false));
1605    if (futures.isEmpty()) {
1606      return useHsync;
1607    }
1608    for (SyncFuture future : futures) {
1609      if (future.isForceSync()) {
1610        return true;
1611      }
1612    }
1613    return false;
1614  }
1615
1616  private void sync(W writer) {
1617    fileLengthAtLastSync = writer.getLength();
1618    long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
1619    boolean shouldUseHsync =
1620      isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
1621    highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
1622    final long startTimeNs = System.nanoTime();
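    // Capture the epoch before handing off to the async sync, so syncFailed/syncCompleted can tell
    // whether the writer has been rolled or marked broken by the time they run.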
1623    final long epoch = (long) epochAndState >>> 2L;
1624    addListener(doWriterSync(writer, shouldUseHsync, currentHighestProcessedAppendTxid),
1625      (result, error) -> {
1626        if (error != null) {
1627          syncFailed(epoch, error);
1628        } else {
1629          long syncedTxid = getSyncedTxid(currentHighestProcessedAppendTxid, result);
1630          syncCompleted(epoch, writer, syncedTxid, startTimeNs);
1631        }
1632      }, consumeExecutor);
1633  }
1634
1635  /**
1636   * This method adapts both {@link FSHLog} and {@link AsyncFSWAL}. For {@link AsyncFSWAL}, we use
1637   * {@link AbstractFSWAL#highestProcessedAppendTxid} at the point we call the
1638   * {@link AsyncFSWAL#doWriterSync} method as the successful syncedTxid. For {@link FSHLog}, because
1639   * we use multi-threaded {@code SyncRunner}s, we use the result of the {@link CompletableFuture}
1640   * as the successful syncedTxid.
1641   */
1642  protected long getSyncedTxid(long processedTxid, long completableFutureResult) {
1643    return processedTxid;
1644  }
1645
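  /**
   * Ask the concrete implementation to persist everything written so far, using hsync rather than
   * hflush when {@code shouldUseHsync} is true. How the completed value of the returned future is
   * interpreted is implementation specific; see {@link #getSyncedTxid(long, long)}.
   */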
1646  protected abstract CompletableFuture<Long> doWriterSync(W writer, boolean shouldUseHsync,
1647    long txidWhenSync);
1648
1649  private int finishSyncLowerThanTxid(long txid) {
1650    int finished = 0;
1651    for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
1652      SyncFuture sync = iter.next();
1653      if (sync.getTxid() <= txid) {
1654        markFutureDoneAndOffer(sync, txid, null);
1655        iter.remove();
1656        finished++;
1657      } else {
1658        break;
1659      }
1660    }
1661    return finished;
1662  }
1663
1664  // try advancing the highestSyncedTxid as much as possible
1665  private int finishSync() {
1666    if (unackedAppends.isEmpty()) {
1667      // All outstanding appends have been acked.
1668      if (toWriteAppends.isEmpty()) {
1669        // There are also no appends waiting to be written out, so just finish all pending syncs.
1670        long maxSyncTxid = highestSyncedTxid.get();
1671        for (SyncFuture sync : syncFutures) {
1672          maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
1673          markFutureDoneAndOffer(sync, maxSyncTxid, null);
1674        }
1675        highestSyncedTxid.set(maxSyncTxid);
1676        int finished = syncFutures.size();
1677        syncFutures.clear();
1678        return finished;
1679      } else {
1680        // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so
1681        // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between
1682        // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
1683        long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
1684        assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
1685        long doneTxid = lowestUnprocessedAppendTxid - 1;
1686        highestSyncedTxid.set(doneTxid);
1687        return finishSyncLowerThanTxid(doneTxid);
1688      }
1689    } else {
1690      // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
1691      // first unacked append minus 1.
1692      long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
1693      long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
1694      highestSyncedTxid.set(doneTxid);
1695      return finishSyncLowerThanTxid(doneTxid);
1696    }
1697  }
1698
1699  // confirm non-empty before calling
1700  private static long getLastTxid(Deque<FSWALEntry> queue) {
1701    return queue.peekLast().getTxid();
1702  }
1703
1704  private void appendAndSync() throws IOException {
1705    final W writer = this.writer;
1706    // A sync request may not have been queued yet when we issued the last sync, so check here to
1707    // see if we can finish some.
1708    finishSync();
1709    long newHighestProcessedAppendTxid = -1L;
1710    // This is used to avoid calling peekLast every time on unackedAppends; appendAndSync is single
1711    // threaded, so this could save us some cycles
1712    boolean addedToUnackedAppends = false;
1713    for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
1714      FSWALEntry entry = iter.next();
1715      /**
1716       * For {@link FSHLog}, this may throw an IOException, but for {@link AsyncFSWAL} it will not
1717       * throw any IOException.
1718       */
1719      boolean appended = appendEntry(writer, entry);
1720      newHighestProcessedAppendTxid = entry.getTxid();
1721      iter.remove();
1722      if (appended) {
1723        // This is possible because when we fail to sync we add the unackedAppends back to
1724        // toWriteAppends, so here we may get an entry which is already in unackedAppends.
1725        if (
1726          addedToUnackedAppends || unackedAppends.isEmpty()
1727            || getLastTxid(unackedAppends) < entry.getTxid()
1728        ) {
1729          unackedAppends.addLast(entry);
1730          addedToUnackedAppends = true;
1731        }
1732        // See HBASE-25905: here we need to make sure that we always write all the entries in
1733        // unackedAppends out. The code in the consume method assumes that the entries in
1734        // unackedAppends have all been sent out, so if there is a roll request and unackedAppends is
1735        // not empty, we can just return, as later there will be a syncCompleted call to clear the
1736        // unackedAppends, or a syncFailed to lead us to another state.
1737        // There could be other ways to fix this, such as changing the logic in the consume method,
1738        // but that would break the assumption and then (may) lead to a big refactoring. So let's use
1739        // this fix first and optimize later.
1740        if (
1741          writer.getLength() - fileLengthAtLastSync >= batchSize
1742            && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))
1743        ) {
1744          break;
1745        }
1746      }
1747    }
1748    // if we have a newer transaction id, update it.
1749    // otherwise, use the previous transaction id.
1750    if (newHighestProcessedAppendTxid > 0) {
1751      highestProcessedAppendTxid = newHighestProcessedAppendTxid;
1752    } else {
1753      newHighestProcessedAppendTxid = highestProcessedAppendTxid;
1754    }
1755
1756    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
1757      // sync because buffer size limit.
1758      sync(writer);
1759      return;
1760    }
1761    if (writer.getLength() == fileLengthAtLastSync) {
1762      // we haven't written anything out, just advance highestSyncedTxid since we may only have
1763      // stamped some region sequence ids.
1764      if (unackedAppends.isEmpty()) {
1765        highestSyncedTxid.set(highestProcessedAppendTxid);
1766        finishSync();
1767        trySetReadyForRolling();
1768      }
1769      return;
1770    }
1771    // reach here means that we have some unsynced data but haven't reached the batch size yet,
1772    // but we will not issue a sync directly here even if there are sync requests because we may
1773    // have some new data in the ringbuffer, so let's just return here and delay the decision of
1774    // whether to issue a sync in the caller method.
1775  }
1776
1777  private void consume() {
1778    consumeLock.lock();
1779    try {
1780      int currentEpochAndState = epochAndState;
1781      if (writerBroken(currentEpochAndState)) {
1782        return;
1783      }
1784      if (waitingRoll(currentEpochAndState)) {
1785        if (writer.getLength() > fileLengthAtLastSync) {
1786          // issue a sync
1787          sync(writer);
1788        } else {
1789          if (unackedAppends.isEmpty()) {
1790            readyForRolling = true;
1791            readyForRollingCond.signalAll();
1792          }
1793        }
1794        return;
1795      }
1796    } finally {
1797      consumeLock.unlock();
1798    }
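    // Drain everything published to the ring buffer since the last consume run: appends go to
    // toWriteAppends and sync requests go to syncFutures.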
1799    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
1800    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
1801        <= cursorBound; nextCursor++) {
1802      if (!waitingConsumePayloads.isPublished(nextCursor)) {
1803        break;
1804      }
1805      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
1806      switch (truck.type()) {
1807        case APPEND:
1808          toWriteAppends.addLast(truck.unloadAppend());
1809          break;
1810        case SYNC:
1811          syncFutures.add(truck.unloadSync());
1812          break;
1813        default:
1814          LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
1815          break;
1816      }
1817      waitingConsumePayloadsGatingSequence.set(nextCursor);
1818    }
1819
1820    /**
1821     * This check is here to stay compatible with the original logic of {@link AsyncFSWAL}.
1822     */
1823    if (markerEditOnly) {
1824      drainNonMarkerEditsAndFailSyncs();
1825    }
1826    try {
1827      appendAndSync();
1828    } catch (IOException exception) {
1829      /**
1830       * For {@link FSHLog}, we may catch an IOException here, but for {@link AsyncFSWAL} the code
1831       * does not reach this branch.
1832       */
1833      LOG.error("appendAndSync throws IOException.", exception);
1834      onAppendEntryFailed(exception);
1835      return;
1836    }
1837    if (hasConsumerTask.get()) {
1838      return;
1839    }
1840    if (toWriteAppends.isEmpty()) {
1841      if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
1842        consumerScheduled.set(false);
1843        // recheck here since in appendAndSync we do not hold the consumeLock. Things may
1844        // happen like:
1845        // 1. we check the cursor and see no new entry
1846        // 2. someone publishes a new entry to the ringbuffer, sees consumerScheduled is true and
1847        // gives up scheduling the consumer task
1848        // 3. we set consumerScheduled to false and also give up scheduling the consumer task
1849        if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
1850          // we will give up consuming, so if there is some unsynced data we need to issue a sync.
1851          if (
1852            writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty()
1853              && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync
1854          ) {
1855            // no new data in the ringbuffer and we have at least one sync request
1856            sync(writer);
1857          }
1858          return;
1859        } else {
1860          // maybe someone has grabbed this before us
1861          if (!consumerScheduled.compareAndSet(false, true)) {
1862            return;
1863          }
1864        }
1865      }
1866    }
1867    // reschedule if we still have something to write.
1868    consumeExecutor.execute(consumer);
1869  }
1870
1871  private boolean shouldScheduleConsumer() {
1872    int currentEpochAndState = epochAndState;
1873    if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
1874      return false;
1875    }
1876    return consumerScheduled.compareAndSet(false, true);
1877  }
1878
1879  /**
1880   * Append a set of edits to the WAL.
1881   * <p/>
1882   * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
1883   * have its region edit/sequence id assigned else it messes up our unification of mvcc and
1884   * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
1885   * <p/>
1886   * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
1887   * transaction by calling 'begin', wherein we assign this update a sequenceid. At assignment
1888   * time, we stamp all the passed in Cells inside the WALEdit with their sequenceId. You must
1889   * 'complete' this mvcc transaction by calling MultiVersionConcurrencyControl#complete(...) or a
1890   * variant, otherwise mvcc will get stuck. Do it in the finally of a try/finally block within
1891   * which this append lives together with any subsequent operations like sync or update of
1892   * memstore, etc. Get the WriteEntry used to pass mvcc out of the passed in WALKey
1893   * <code>walKey</code> parameter. Be warned that the WriteEntry is not immediately available on
1894   * return from this method. It WILL be available subsequent to a sync of this append; otherwise,
1895   * you will just have to wait on the WriteEntry to get filled in.
1896   * @param hri        the regioninfo associated with append
1897   * @param key        Modified by this call; we add to it this edit's region edit/sequence id.
1898   * @param edits      Edits to append. MAY CONTAIN NO EDITS for the case where we want to get an
1899   *                   edit sequence id that is after all currently appended edits.
1900   * @param inMemstore Always true except for the case where we are writing a region event meta
1901   *                   marker edit, for example, a compaction completion record into the WAL or
1902   *                   noting a Region Open event. In these cases the entry is just so we can
1903   *                   finish an unfinished compaction after a crash when the new Server reads the
1904   *                   WAL on recovery, etc. These transition event 'Markers' do not go via the
1905   *                   memstore. When inMemstore is false, we presume a Marker event edit.
1906   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
1907   *         in it.
1908   */
1909  protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
1910    throws IOException {
1911    if (markerEditOnly && !edits.isMetaEdit()) {
1912      throw new IOException("WAL is closing, only marker edit is allowed");
1913    }
1914    long txid =
1915      stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
1916    if (shouldScheduleConsumer()) {
1917      consumeExecutor.execute(consumer);
1918    }
1919    return txid;
1920  }
1921
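  // Claim a slot in the ring buffer, load a SyncFuture into it using the slot's sequence as the
  // txid, make sure a consumer task is scheduled, then block until the sync completes.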
1922  protected void doSync(boolean forceSync) throws IOException {
1923    long txid = waitingConsumePayloads.next();
1924    SyncFuture future;
1925    try {
1926      future = getSyncFuture(txid, forceSync);
1927      RingBufferTruck truck = waitingConsumePayloads.get(txid);
1928      truck.load(future);
1929    } finally {
1930      waitingConsumePayloads.publish(txid);
1931    }
1932    if (shouldScheduleConsumer()) {
1933      consumeExecutor.execute(consumer);
1934    }
1935    blockOnSync(future);
1936  }
1937
1938  protected void doSync(long txid, boolean forceSync) throws IOException {
1939    if (highestSyncedTxid.get() >= txid) {
1940      return;
1941    }
1942    // here we do not use ring buffer sequence as txid
1943    long sequence = waitingConsumePayloads.next();
1944    SyncFuture future;
1945    try {
1946      future = getSyncFuture(txid, forceSync);
1947      RingBufferTruck truck = waitingConsumePayloads.get(sequence);
1948      truck.load(future);
1949    } finally {
1950      waitingConsumePayloads.publish(sequence);
1951    }
1952    if (shouldScheduleConsumer()) {
1953      consumeExecutor.execute(consumer);
1954    }
1955    blockOnSync(future);
1956  }
1957
1958  private void drainNonMarkerEditsAndFailSyncs() {
1959    if (toWriteAppends.isEmpty()) {
1960      return;
1961    }
1962    boolean hasNonMarkerEdits = false;
1963    Iterator<FSWALEntry> iter = toWriteAppends.descendingIterator();
1964    while (iter.hasNext()) {
1965      FSWALEntry entry = iter.next();
1966      if (!entry.getEdit().isMetaEdit()) {
1967        entry.release();
1968        hasNonMarkerEdits = true;
1969        break;
1970      }
1971    }
1972    if (hasNonMarkerEdits) {
1973      for (;;) {
1974        iter.remove();
1975        if (!iter.hasNext()) {
1976          break;
1977        }
1978        iter.next().release();
1979      }
1980      for (FSWALEntry entry : unackedAppends) {
1981        entry.release();
1982      }
1983      unackedAppends.clear();
1984      // fail the sync futures whose txid is below that of the first remaining edit; if there is
1985      // none, fail all the sync futures.
1986      long txid = toWriteAppends.isEmpty() ? Long.MAX_VALUE : toWriteAppends.peek().getTxid();
1987      IOException error = new IOException("WAL is closing, only marker edit is allowed");
1988      for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) {
1989        SyncFuture future = syncIter.next();
1990        if (future.getTxid() < txid) {
1991          markFutureDoneAndOffer(future, future.getTxid(), error);
1992          syncIter.remove();
1993        } else {
1994          break;
1995        }
1996      }
1997    }
1998  }
1999
2000  protected abstract W createWriterInstance(FileSystem fs, Path path)
2001    throws IOException, CommonFSUtils.StreamLacksCapabilityException;
2002
2003  protected abstract W createCombinedWriter(W localWriter, W remoteWriter);
2004
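  // Set the waitingRoll bit and block until the consumer signals readyForRolling, i.e. all
  // unacked appends have been flushed out, so the current writer can be safely replaced or closed.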
2005  protected final void waitForSafePoint() {
2006    consumeLock.lock();
2007    try {
2008      int currentEpochAndState = epochAndState;
2009      if (writerBroken(currentEpochAndState) || this.writer == null) {
2010        return;
2011      }
2012      consumerScheduled.set(true);
2013      epochAndState = currentEpochAndState | 1;
2014      readyForRolling = false;
2015      consumeExecutor.execute(consumer);
2016      while (!readyForRolling) {
2017        readyForRollingCond.awaitUninterruptibly();
2018      }
2019    } finally {
2020      consumeLock.unlock();
2021    }
2022  }
2023
2024  protected final void closeWriter(W writer, Path path) {
2025    inflightWALClosures.put(path.getName(), writer);
2026    closeExecutor.execute(() -> {
2027      try {
2028        writer.close();
2029      } catch (IOException e) {
2030        LOG.warn("close old writer failed", e);
2031      } finally {
2032        // Call this even if the above close fails, as there is no other chance for us to set the
2033        // closed flag to true, and it will not cause big problems.
2034        markClosedAndClean(path);
2035        inflightWALClosures.remove(path.getName());
2036      }
2037    });
2038  }
2039
2040  /**
2041   * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer
2042   * will begin to work before returning from this method. If we clear the flag after returning from
2043   * this call, we may miss a roll request. The implementation class should choose a proper place to
2044   * clear the {@link #rollRequested} flag, so we do not miss a roll request, typically before you
2045   * start writing to the new writer.
2046   */
2047  protected void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
2048    Preconditions.checkNotNull(nextWriter);
2049    waitForSafePoint();
2050    /**
2051     * For {@link FSHLog}, this will shut down the {@link FSHLog.SyncRunner} threads.
2052     */
2053    doCleanUpResources();
2054    // We call rollWriter in the init method, where we want to create the first writer and
2055    // obviously the previous writer is null, so here we need this null check. The reason we must
2056    // call logRollAndSetupWalProps before closeWriter is that markClosedAndClean is called after
2057    // closing the writer asynchronously, so we need to make sure the WALProps is put into
2058    // walFile2Props before we call markClosedAndClean.
2059    if (writer != null) {
2060      long oldFileLen = writer.getLength();
2061      logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
2062      closeWriter(writer, oldPath);
2063    } else {
2064      logRollAndSetupWalProps(oldPath, newPath, 0);
2065    }
2066    this.writer = nextWriter;
2067    /**
2068     * This is used by {@link AsyncFSWAL} and {@link FSHLog} to set the underlying filesystem
2069     * output after the writer is replaced.
2070     */
2071    onWriterReplaced(nextWriter);
2072    this.fileLengthAtLastSync = nextWriter.getLength();
2073    this.highestProcessedAppendTxidAtLastSync = 0L;
2074    consumeLock.lock();
2075    try {
2076      consumerScheduled.set(true);
2077      int currentEpoch = epochAndState >>> 2;
2078      int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
2079      // set a new epoch and also clear waitingRoll and writerBroken
2080      this.epochAndState = nextEpoch << 2;
2081      // Reset rollRequested status
2082      rollRequested.set(false);
2083      consumeExecutor.execute(consumer);
2084    } finally {
2085      consumeLock.unlock();
2086    }
2087  }
2088
2089  protected abstract void onWriterReplaced(W nextWriter);
2090
2091  protected void doShutdown() throws IOException {
2092    waitForSafePoint();
2093    /**
2094     * For {@link FSHLog}, this will shut down the {@link FSHLog.SyncRunner} threads.
2095     */
2096    doCleanUpResources();
2097    if (this.writer != null) {
2098      closeWriter(this.writer, getOldPath());
2099      this.writer = null;
2100    }
2101    closeExecutor.shutdown();
2102    try {
2103      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
2104        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
2105          + " the close of the async writer has not completed."
2106          + " Please check the status of the underlying filesystem"
2107          + " or increase the wait time via the config \"" + this.waitOnShutdownInSecondsConfigKey
2108          + "\"");
2109      }
2110    } catch (InterruptedException e) {
2111      LOG.error("The wait for close of async writer is interrupted");
2112      Thread.currentThread().interrupt();
2113    }
2114    IOException error = new IOException("WAL has been closed");
2115    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
2116    // drain all the pending sync requests
2117    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
2118        <= cursorBound; nextCursor++) {
2119      if (!waitingConsumePayloads.isPublished(nextCursor)) {
2120        break;
2121      }
2122      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
2123      switch (truck.type()) {
2124        case SYNC:
2125          syncFutures.add(truck.unloadSync());
2126          break;
2127        default:
2128          break;
2129      }
2130    }
2131    // and fail them
2132    syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
2133    if (this.shouldShutDownConsumeExecutorWhenClose) {
2134      consumeExecutor.shutdown();
2135    }
2136  }
2137
2138  protected void doCleanUpResources() {
2139  }
2140
2141  protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
2142
2143  /**
2144   * This method gets the pipeline for the current WAL.
2145   */
2146  abstract DatanodeInfo[] getPipeline();
2147
2148  /**
2149   * This method gets the datanode replication count for the current WAL.
2150   */
2151  abstract int getLogReplication();
2152
2153  protected abstract boolean doCheckLogLowReplication();
2154
2155  protected boolean isWriterBroken() {
2156    return writerBroken(epochAndState);
2157  }
2158
2159  private void onAppendEntryFailed(IOException exception) {
2160    LOG.warn("append entry failed", exception);
2161    final long currentEpoch = (long) epochAndState >>> 2L;
2162    this.onException(currentEpoch, exception);
2163  }
2164
2165  protected void checkSlowSyncCount() {
2166  }
2167
2168  /** Returns true if we exceeded the slow sync roll threshold over the last check interval */
2169  protected boolean doCheckSlowSync() {
2170    boolean result = false;
2171    long now = EnvironmentEdgeManager.currentTime();
2172    long elapsedTime = now - lastTimeCheckSlowSync;
2173    if (elapsedTime >= slowSyncCheckInterval) {
2174      if (slowSyncCount.get() >= slowSyncRollThreshold) {
2175        if (elapsedTime >= (2 * slowSyncCheckInterval)) {
2176          // If two or more slowSyncCheckInterval have elapsed this is a corner case
2177          // where a train of slow syncs almost triggered us but then there was a long
2178          // interval from then until the one more that pushed us over. If so, we
2179          // should do nothing and let the count reset.
2180          if (LOG.isDebugEnabled()) {
2181            LOG.debug("checkSlowSync triggered but we decided to ignore it; " + "count="
2182              + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", elapsedTime="
2183              + elapsedTime + " ms, slowSyncCheckInterval=" + slowSyncCheckInterval + " ms");
2184          }
2185          // Fall through to count reset below
2186        } else {
2187          LOG.warn("Requesting log roll because we exceeded slow sync threshold; count="
2188            + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", current pipeline: "
2189            + Arrays.toString(getPipeline()));
2190          result = true;
2191        }
2192      }
2193      lastTimeCheckSlowSync = now;
2194      slowSyncCount.set(0);
2195    }
2196    return result;
2197  }
2198
2199  public void checkLogLowReplication(long checkInterval) {
2200    long now = EnvironmentEdgeManager.currentTime();
2201    if (now - lastTimeCheckLowReplication < checkInterval) {
2202      return;
2203    }
2204    // Return immediately if we are currently in the middle of a WAL roll.
2205    if (!rollWriterLock.tryLock()) {
2206      return;
2207    }
2208    try {
2209      lastTimeCheckLowReplication = now;
2210      if (doCheckLogLowReplication()) {
2211        requestLogRoll(LOW_REPLICATION);
2212      }
2213    } finally {
2214      rollWriterLock.unlock();
2215    }
2216  }
2217
2218  // Allow temporarily skipping the creation of remote writer. When failing to write to the remote
2219  // dfs cluster, we need to reopen the regions and switch to use the original wal writer. But we
2220  // need to write a close marker when closing a region, and if it fails, the whole rs will abort.
2221  // So here we need to skip the creation of remote writer and make it possible to write the region
2222  // close marker.
2223  // Setting markerEditOnly to true is for transitioning from A to S, where we need to give up
2224  // writing any pending wal entries as they will be discarded. The remote cluster will replicate
2225  // the correct data back later. We still need to allow writing marker edits, such as the close
2226  // region event, so that a region can still be closed.
2227  @Override
2228  public void skipRemoteWAL(boolean markerEditOnly) {
2229    if (markerEditOnly) {
2230      this.markerEditOnly = true;
2231    }
2232    this.skipRemoteWAL = true;
2233  }
2234
2235  private static void split(final Configuration conf, final Path p) throws IOException {
2236    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
2237    if (!fs.exists(p)) {
2238      throw new FileNotFoundException(p.toString());
2239    }
2240    if (!fs.getFileStatus(p).isDirectory()) {
2241      throw new IOException(p + " is not a directory");
2242    }
2243
2244    final Path baseDir = CommonFSUtils.getWALRootDir(conf);
2245    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
2246    if (
2247      conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
2248        AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)
2249    ) {
2250      archiveDir = new Path(archiveDir, p.getName());
2251    }
2252    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
2253  }
2254
2255  W getWriter() {
2256    return this.writer;
2257  }
2258
2259  private static void usage() {
2260    System.err.println("Usage: AbstractFSWAL <ARGS>");
2261    System.err.println("Arguments:");
2262    System.err.println(" --dump  Dump a textual representation of one or more passed files");
2263    System.err.println("         For example: "
2264      + "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
2265    System.err.println(" --split Split the passed directory of WAL logs");
2266    System.err.println(
2267      "         For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
2268  }
2269
2270  /**
2271   * Pass one or more log file names, and it will either dump out a text version on
2272   * <code>stdout</code> or split the specified log files.
2273   */
2274  public static void main(String[] args) throws IOException {
2275    if (args.length < 2) {
2276      usage();
2277      System.exit(-1);
2278    }
2279    // either dump using the WALPrettyPrinter or split, depending on args
2280    if (args[0].compareTo("--dump") == 0) {
2281      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
2282    } else if (args[0].compareTo("--perf") == 0) {
2283      LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:");
2284      LOG.error(HBaseMarkers.FATAL,
2285        "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
2286      System.exit(-1);
2287    } else if (args[0].compareTo("--split") == 0) {
2288      Configuration conf = HBaseConfiguration.create();
2289      for (int i = 1; i < args.length; i++) {
2290        try {
2291          Path logPath = new Path(args[i]);
2292          CommonFSUtils.setFsDefault(conf, logPath);
2293          split(conf, logPath);
2294        } catch (IOException t) {
2295          t.printStackTrace(System.err);
2296          System.exit(-1);
2297        }
2298      }
2299    } else {
2300      usage();
2301      System.exit(-1);
2302    }
2303  }
2304}