001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
021import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
022import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
023import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
024import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.WAL_IMPL;
025import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
026import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
027import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
028import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
029
030import com.lmax.disruptor.RingBuffer;
031import com.lmax.disruptor.Sequence;
032import com.lmax.disruptor.Sequencer;
033import io.opentelemetry.api.trace.Span;
034import java.io.FileNotFoundException;
035import java.io.IOException;
036import java.io.InterruptedIOException;
037import java.lang.management.MemoryType;
038import java.net.URLEncoder;
039import java.nio.charset.StandardCharsets;
040import java.util.ArrayDeque;
041import java.util.ArrayList;
042import java.util.Arrays;
043import java.util.Comparator;
044import java.util.Deque;
045import java.util.Iterator;
046import java.util.List;
047import java.util.Map;
048import java.util.OptionalLong;
049import java.util.Set;
050import java.util.SortedSet;
051import java.util.TreeSet;
052import java.util.concurrent.Callable;
053import java.util.concurrent.CompletableFuture;
054import java.util.concurrent.ConcurrentHashMap;
055import java.util.concurrent.ConcurrentNavigableMap;
056import java.util.concurrent.ConcurrentSkipListMap;
057import java.util.concurrent.CopyOnWriteArrayList;
058import java.util.concurrent.ExecutionException;
059import java.util.concurrent.ExecutorService;
060import java.util.concurrent.Executors;
061import java.util.concurrent.Future;
062import java.util.concurrent.LinkedBlockingQueue;
063import java.util.concurrent.ThreadPoolExecutor;
064import java.util.concurrent.TimeUnit;
065import java.util.concurrent.TimeoutException;
066import java.util.concurrent.atomic.AtomicBoolean;
067import java.util.concurrent.atomic.AtomicInteger;
068import java.util.concurrent.atomic.AtomicLong;
069import java.util.concurrent.locks.Condition;
070import java.util.concurrent.locks.Lock;
071import java.util.concurrent.locks.ReentrantLock;
072import java.util.function.Supplier;
073import org.apache.commons.lang3.mutable.MutableLong;
074import org.apache.hadoop.conf.Configuration;
075import org.apache.hadoop.fs.FileStatus;
076import org.apache.hadoop.fs.FileSystem;
077import org.apache.hadoop.fs.Path;
078import org.apache.hadoop.fs.PathFilter;
079import org.apache.hadoop.hbase.Abortable;
080import org.apache.hadoop.hbase.Cell;
081import org.apache.hadoop.hbase.HBaseConfiguration;
082import org.apache.hadoop.hbase.HConstants;
083import org.apache.hadoop.hbase.PrivateCellUtil;
084import org.apache.hadoop.hbase.client.ConnectionUtils;
085import org.apache.hadoop.hbase.client.RegionInfo;
086import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
087import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
088import org.apache.hadoop.hbase.ipc.RpcServer;
089import org.apache.hadoop.hbase.ipc.ServerCall;
090import org.apache.hadoop.hbase.log.HBaseMarkers;
091import org.apache.hadoop.hbase.regionserver.HRegion;
092import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
093import org.apache.hadoop.hbase.trace.TraceUtil;
094import org.apache.hadoop.hbase.util.Bytes;
095import org.apache.hadoop.hbase.util.CommonFSUtils;
096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
097import org.apache.hadoop.hbase.util.Pair;
098import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
099import org.apache.hadoop.hbase.wal.WAL;
100import org.apache.hadoop.hbase.wal.WALEdit;
101import org.apache.hadoop.hbase.wal.WALFactory;
102import org.apache.hadoop.hbase.wal.WALKeyImpl;
103import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
104import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
105import org.apache.hadoop.hbase.wal.WALSplitter;
106import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
107import org.apache.hadoop.util.StringUtils;
108import org.apache.yetus.audience.InterfaceAudience;
109import org.slf4j.Logger;
110import org.slf4j.LoggerFactory;
111
112import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
113import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
114import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
115
116/**
 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
 * This is done internally by the implementation.
 * <p>
 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
 * Much of the work below is done keeping account of these region sequence ids -- what has been
 * flushed out to hfiles, and what is still only in the WAL and in memory.
125 * <p>
126 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
127 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
128 * (smaller) than the most-recent flush.
129 * <p>
 * To read a WAL, call {@link WALFactory#createStreamReader(FileSystem, Path)} for a one-way read,
 * or call {@link WALFactory#createTailingReader(FileSystem, Path, Configuration, long)} for
 * replication, where we may want to tail the active WAL file.
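 * <p>
 * For illustration, a one-way read of a closed WAL file could look roughly like the sketch below.
 * This is a hedged example, not a reference: {@code walFactory}, {@code fs} and {@code walPath}
 * are assumed to be an existing {@link WALFactory}, file system and WAL file path, and the exact
 * reader type and iteration contract are those documented on {@link WALFactory}.
 *
 * <pre>{@code
 * try (var reader = walFactory.createStreamReader(fs, walPath)) {
 *   for (WAL.Entry entry; (entry = reader.next()) != null;) {
 *     // entry.getKey() carries region/sequence id metadata, entry.getEdit() the cells
 *   }
 * }
 * }</pre>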
 * <h2>Failure Semantics</h2> If an exception occurs on append or sync, roll the WAL because the
 * current WAL is now a lame duck; any further appends or syncs will also fail with the same
 * original exception. If we have made successful appends to the WAL and we are then unable to sync
 * them, our current semantics are to return an error to the client indicating that the appends
 * failed, but also to abort the current context, usually the hosting server. The WALs then need to
 * be replayed. <br>
138 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client
139 * that the append failed. <br>
140 * TODO: replication may pick up these last edits though they have been marked as failed append
141 * (Need to keep our own file lengths, not rely on HDFS).
142 */
143@InterfaceAudience.Private
144public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
145  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);
146
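  // Order sync futures by txid; ties are broken by identity hash code so that distinct SyncFuture
  // instances with the same txid can coexist in the sorted set of syncFutures below.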
147  private static final Comparator<SyncFuture> SEQ_COMPARATOR =
148    Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode);
149
150  private static final String SURVIVED_TOO_LONG_SEC_KEY = "hbase.regionserver.wal.too.old.sec";
151  private static final int SURVIVED_TOO_LONG_SEC_DEFAULT = 900;
152  /** Don't log blocking regions more frequently than this. */
153  private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5);
154
155  protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";
156  protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
157  protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
158  protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
159  protected static final String SLOW_SYNC_ROLL_THRESHOLD =
160    "hbase.regionserver.wal.slowsync.roll.threshold";
161  protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
162  protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =
163    "hbase.regionserver.wal.slowsync.roll.interval.ms";
164  protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute
165
166  public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
167  protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
168
169  public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";
170
171  public static final String MAX_LOGS = "hbase.regionserver.maxlogs";
172
173  public static final String RING_BUFFER_SLOT_COUNT =
174    "hbase.regionserver.wal.disruptor.event.count";
175
176  public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms";
177  public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000;
178
179  public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
180  public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
181
182  public static final String WAL_AVOID_LOCAL_WRITES_KEY =
183    "hbase.regionserver.wal.avoid-local-writes";
184  public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false;
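
  // For illustration only (hedged example, not part of this class): these keys are plain
  // Configuration entries, so a test or tool could tune them programmatically, e.g.
  //   conf.setInt(SLOW_SYNC_ROLL_THRESHOLD, 200);
  //   conf.setLong(WAL_BATCH_SIZE, 128 * 1024);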
185
186  /**
187   * file system instance
188   */
189  protected final FileSystem fs;
190
191  /**
192   * WAL directory, where all WAL files would be placed.
193   */
194  protected final Path walDir;
195
196  private final FileSystem remoteFs;
197
198  private final Path remoteWALDir;
199
200  /**
201   * dir path where old logs are kept.
202   */
203  protected final Path walArchiveDir;
204
205  /**
206   * Matches just those wal files that belong to this wal instance.
207   */
208  protected final PathFilter ourFiles;
209
210  /**
211   * Prefix of a WAL file, usually the region server name it is hosted on.
212   */
213  protected final String walFilePrefix;
214
215  /**
216   * Suffix included on generated wal file names
217   */
218  protected final String walFileSuffix;
219
220  /**
221   * Prefix used when checking for wal membership.
222   */
223  protected final String prefixPathStr;
224
225  protected final WALCoprocessorHost coprocessorHost;
226
227  /**
228   * conf object
229   */
230  protected final Configuration conf;
231
232  protected final Abortable abortable;
233
234  /** Listeners that are called on WAL events. */
235  protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
236
237  /** Tracks the logs in the process of being closed. */
238  protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();
239
240  /**
241   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
242   * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
243   * facility for answering questions such as "Is it safe to GC a WAL?".
244   */
245  protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
246
  /** A slow sync will be logged; a very slow sync will cause the WAL to be rolled. */
248  protected final long slowSyncNs, rollOnSyncNs;
249  protected final int slowSyncRollThreshold;
250  protected final int slowSyncCheckInterval;
251  protected final AtomicInteger slowSyncCount = new AtomicInteger();
252
253  private final long walSyncTimeoutNs;
254
255  private final long walTooOldNs;
256
257  // If > than this size, roll the log.
258  protected final long logrollsize;
259
260  /**
261   * Block size to use writing files.
262   */
263  protected final long blocksize;
264
265  /*
   * If there are more than this many logs, force a flush of the oldest region so that its oldest
   * edit goes to disk. If there are too many and we crash, replaying will take forever. Keep the
   * number of logs tidy.
268   */
269  protected final int maxLogs;
270
271  protected final boolean useHsync;
272
273  /**
274   * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
275   * is held. We don't just use synchronized because that results in bogus and tedious findbugs
276   * warning when it thinks synchronized controls writer thread safety. It is held when we are
277   * actually rolling the log. It is checked when we are looking to see if we should roll the log or
278   * not.
279   */
280  protected final ReentrantLock rollWriterLock = new ReentrantLock(true);
281
282  // The timestamp (in ms) when the log file was created.
283  protected final AtomicLong filenum = new AtomicLong(-1);
284
  // Number of transactions in the current WAL.
286  protected final AtomicInteger numEntries = new AtomicInteger(0);
287
288  /**
289   * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass
   * WALEdits to a background consumer thread, and the transaction id is the sequence number of the
   * corresponding entry in the queue.
292   */
293  protected volatile long highestUnsyncedTxid = -1;
294
295  /**
296   * Updated to the transaction id of the last successful sync call. This can be less than
   * {@link #highestUnsyncedTxid} in the case where we have an append for which a sync has not yet
   * come in.
299   */
300  protected final AtomicLong highestSyncedTxid = new AtomicLong(0);
301
302  /**
303   * The total size of wal
304   */
305  protected final AtomicLong totalLogSize = new AtomicLong(0);
306  /**
307   * Current log file.
308   */
309  volatile W writer;
310
  // Last time we checked for low replication on the WAL's pipeline
312  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
313
314  // Last time we asked to roll the log due to a slow sync
315  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
316
317  protected volatile boolean closed = false;
318
319  protected final AtomicBoolean shutdown = new AtomicBoolean(false);
320
321  protected final long walShutdownTimeout;
322
323  private long nextLogTooOldNs = System.nanoTime();
324
325  /**
326   * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
327   * an IllegalArgumentException if used to compare paths from different wals.
328   */
329  final Comparator<Path> LOG_NAME_COMPARATOR =
330    (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
331
332  private static final class WALProps {
333
334    /**
335     * Map the encoded region name to the highest sequence id.
336     * <p/>
     * Contains an entry for every region that has edits in this WAL file.
338     */
339    private final Map<byte[], Long> encodedName2HighestSequenceId;
340
341    /**
342     * The log file size. Notice that the size may not be accurate if we do asynchronous close in
343     * subclasses.
344     */
345    private final long logSize;
346
347    /**
     * The nanoTime of the log roll, used to determine how much time has elapsed since the roll.
349     */
350    private final long rollTimeNs;
351
352    /**
353     * If we do asynchronous close in subclasses, it is possible that when adding WALProps to the
354     * rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file,
355     * for safety.
356     */
357    private volatile boolean closed = false;
358
359    WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
360      this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
361      this.logSize = logSize;
362      this.rollTimeNs = System.nanoTime();
363    }
364  }
365
366  /**
367   * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
368   * (contained in the log file name).
369   */
370  protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props =
371    new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
372
373  /**
374   * A cache of sync futures reused by threads.
375   */
376  protected final SyncFutureCache syncFutureCache;
377
378  /**
379   * The class name of the runtime implementation, used as prefix for logging/tracing.
380   * <p>
381   * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here,
382   * refer to HBASE-17676 for more details
383   * </p>
384   */
385  protected final String implClassName;
386
387  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
388
389  protected final ExecutorService closeExecutor = Executors.newCachedThreadPool(
390    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
391
  // Run in the caller's thread if we get a rejected execution exception, to avoid aborting the
  // region server in that case. Usually this should not happen, but let's make it more robust.
394  private final ExecutorService logArchiveExecutor =
395    new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),
396      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(),
397      new ThreadPoolExecutor.CallerRunsPolicy());
398
399  private final int archiveRetries;
400
401  protected ExecutorService consumeExecutor;
402
403  private final Lock consumeLock = new ReentrantLock();
404
405  protected final Runnable consumer = this::consume;
406
407  // check if there is already a consumer task in the event loop's task queue
408  protected Supplier<Boolean> hasConsumerTask;
409
410  private static final int MAX_EPOCH = 0x3FFFFFFF;
  // The lowest bit is waitingRoll, which means a new writer has been created and we are waiting
  // for the old writer to be closed.
  // The second-lowest bit is writerBroken, which means the current writer is broken and rollWriter
  // is needed.
  // All other bits are the epoch number of the current writer; this is used to detect whether the
  // writer is still the same one as when the sync was issued.
  // Notice that modification of this field is only allowed under the protection of consumeLock.
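  // For illustration only (hypothetical decode, matching the layout described above):
  //   boolean waitingRoll = (epochAndState & 1) != 0;
  //   boolean writerBroken = (epochAndState & 2) != 0;
  //   int epoch = epochAndState >>> 2;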
418  private volatile int epochAndState;
419
420  private boolean readyForRolling;
421
422  private final Condition readyForRollingCond = consumeLock.newCondition();
423
424  private final RingBuffer<RingBufferTruck> waitingConsumePayloads;
425
426  private final Sequence waitingConsumePayloadsGatingSequence;
427
428  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);
429
430  private final long batchSize;
431
432  protected final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();
433
434  protected final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();
435
436  protected final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);
437
438  // the highest txid of WAL entries being processed
439  protected long highestProcessedAppendTxid;
440
441  // file length when we issue last sync request on the writer
442  private long fileLengthAtLastSync;
443
444  private long highestProcessedAppendTxidAtLastSync;
445
446  private int waitOnShutdownInSeconds;
447
448  private String waitOnShutdownInSecondsConfigKey;
449
450  protected boolean shouldShutDownConsumeExecutorWhenClose = true;
451
452  private volatile boolean skipRemoteWAL = false;
453
454  private volatile boolean markerEditOnly = false;
455
456  public long getFilenum() {
457    return this.filenum.get();
458  }
459
460  /**
   * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}). This helper
462   * method returns the creation timestamp from a given log file. It extracts the timestamp assuming
463   * the filename is created with the {@link #computeFilename(long filenum)} method.
464   * @return timestamp, as in the log file name.
465   */
466  protected long getFileNumFromFileName(Path fileName) {
467    checkNotNull(fileName, "file name can't be null");
468    if (!ourFiles.accept(fileName)) {
469      throw new IllegalArgumentException(
470        "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
471    }
472    final String fileNameString = fileName.toString();
473    String chompedPath = fileNameString.substring(prefixPathStr.length(),
474      (fileNameString.length() - walFileSuffix.length()));
475    return Long.parseLong(chompedPath);
476  }
477
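  // Worked example (hypothetical numbers): with a 4 GB global memstore size and a 128 MB log roll
  // size, this returns (4 GB * 2) / 128 MB = 64 log files.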
478  private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
479    Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf);
480    return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
481  }
482
483  // must be power of 2
484  protected final int getPreallocatedEventCount() {
485    // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
486    // be stuck and make no progress if the buffer is filled with appends only and there is no
487    // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
488    // before they return.
489    int preallocatedEventCount = this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
    checkArgument(preallocatedEventCount > 0, RING_BUFFER_SLOT_COUNT + " must be > 0");
491    int floor = Integer.highestOneBit(preallocatedEventCount);
492    if (floor == preallocatedEventCount) {
493      return floor;
494    }
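    // Otherwise round up to the next power of two; e.g. (hypothetical) a configured value of 20000
    // has highestOneBit 16384, so we return 32768.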
495    // max capacity is 1 << 30
496    if (floor >= 1 << 29) {
497      return 1 << 30;
498    }
499    return floor << 1;
500  }
501
502  protected final void setWaitOnShutdownInSeconds(int waitOnShutdownInSeconds,
503    String waitOnShutdownInSecondsConfigKey) {
504    this.waitOnShutdownInSeconds = waitOnShutdownInSeconds;
505    this.waitOnShutdownInSecondsConfigKey = waitOnShutdownInSecondsConfigKey;
506  }
507
508  protected final void createSingleThreadPoolConsumeExecutor(String walType, final Path rootDir,
509    final String prefix) {
510    ThreadPoolExecutor threadPool =
511      new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
512        new ThreadFactoryBuilder().setNameFormat(walType + "-%d-" + rootDir.toString() + "-prefix:"
513          + (prefix == null ? "default" : prefix).replace("%", "%%")).setDaemon(true).build());
514    hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
515    consumeExecutor = threadPool;
516    this.shouldShutDownConsumeExecutorWhenClose = true;
517  }
518
519  protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,
520    final String logDir, final String archiveDir, final Configuration conf,
521    final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
522    final String suffix, FileSystem remoteFs, Path remoteWALDir)
523    throws FailedLogCloseException, IOException {
524    this.fs = fs;
525    this.walDir = new Path(rootDir, logDir);
526    this.walArchiveDir = new Path(rootDir, archiveDir);
527    this.conf = conf;
528    this.abortable = abortable;
529    this.remoteFs = remoteFs;
530    this.remoteWALDir = remoteWALDir;
531
532    if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
533      throw new IOException("Unable to mkdir " + walDir);
534    }
535
536    if (!fs.exists(this.walArchiveDir)) {
537      if (!fs.mkdirs(this.walArchiveDir)) {
538        throw new IOException("Unable to mkdir " + this.walArchiveDir);
539      }
540    }
541
    // If prefix is null or empty then just name it "wal"
543    this.walFilePrefix = prefix == null || prefix.isEmpty()
544      ? "wal"
545      : URLEncoder.encode(prefix, StandardCharsets.UTF_8.name());
    // we only correctly differentiate suffixes when numeric ones start with '.'
547    if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
548      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER
549        + "' but instead was '" + suffix + "'");
550    }
    // Now that it exists, set the storage policy for the entire directory of wal files related to
    // this WAL instance
    String storagePolicy =
      conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
    CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);
    this.walFileSuffix =
      (suffix == null) ? "" : URLEncoder.encode(suffix, StandardCharsets.UTF_8.name());
557    this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
558
559    this.ourFiles = new PathFilter() {
560      @Override
561      public boolean accept(final Path fileName) {
562        // The path should start with dir/<prefix> and end with our suffix
563        final String fileNameString = fileName.toString();
564        if (!fileNameString.startsWith(prefixPathStr)) {
565          return false;
566        }
567        if (walFileSuffix.isEmpty()) {
568          // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
569          return org.apache.commons.lang3.StringUtils
570            .isNumeric(fileNameString.substring(prefixPathStr.length()));
571        } else if (!fileNameString.endsWith(walFileSuffix)) {
572          return false;
573        }
574        return true;
575      }
576    };
577
578    if (failIfWALExists) {
579      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
580      if (null != walFiles && 0 != walFiles.length) {
581        throw new IOException("Target WAL already exists within directory " + walDir);
582      }
583    }
584
585    // Register listeners. TODO: Should this exist anymore? We have CPs?
586    if (listeners != null) {
587      for (WALActionsListener i : listeners) {
588        registerWALActionsListener(i);
589      }
590    }
591    this.coprocessorHost = new WALCoprocessorHost(this, conf);
592
593    // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block
594    // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost
595    // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of
596    // the block size but experience from the field has it that this was not enough time for the
    // roll to happen before end-of-block. So that the new accounting makes WALs of about the same
    // size as those made in hbase-1 (to prevent surprise), we now default the block size to
    // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally
    // make logs of similar size to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
601    this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
602    float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
603    this.logrollsize = (long) (this.blocksize * multiplier);
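    // Worked example based on the comment above (hypothetical numbers): with a 128 MB DFS default
    // block size, the WAL block size defaults to 256 MB, and the default 0.5 multiplier gives a
    // 128 MB roll size.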
604    this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
605
606    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
607      + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix="
608      + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir
609      + ", maxLogs=" + this.maxLogs);
610    this.slowSyncNs =
611      TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_TIME_MS));
612    this.rollOnSyncNs = TimeUnit.MILLISECONDS
613      .toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS));
614    this.slowSyncRollThreshold =
615      conf.getInt(SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
616    this.slowSyncCheckInterval =
617      conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
618    this.walSyncTimeoutNs =
619      TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS));
620    this.syncFutureCache = new SyncFutureCache(conf);
621    this.implClassName = getClass().getSimpleName();
622    this.walTooOldNs = TimeUnit.SECONDS
623      .toNanos(conf.getInt(SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
624    this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
625    archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
626    this.walShutdownTimeout =
627      conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS);
628
    // Ring buffer size must be a power of two; getPreallocatedEventCount() reads
    // RING_BUFFER_SLOT_COUNT and rounds it accordingly.
    int preallocatedEventCount = getPreallocatedEventCount();
631    waitingConsumePayloads =
632      RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
633    waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
634    waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);
635
    // increase the ring buffer sequence so our txid starts from 1
637    waitingConsumePayloads.publish(waitingConsumePayloads.next());
638    waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());
639
640    batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
641  }
642
643  /**
644   * Used to initialize the WAL. Usually just call rollWriter to create the first log writer.
645   */
646  @Override
647  public void init() throws IOException {
648    rollWriter();
649  }
650
651  @Override
652  public void registerWALActionsListener(WALActionsListener listener) {
653    this.listeners.add(listener);
654  }
655
656  @Override
657  public boolean unregisterWALActionsListener(WALActionsListener listener) {
658    return this.listeners.remove(listener);
659  }
660
661  @Override
662  public WALCoprocessorHost getCoprocessorHost() {
663    return coprocessorHost;
664  }
665
666  @Override
667  public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
668    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
669  }
670
671  @Override
672  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
673    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
674  }
675
676  @Override
677  public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
678    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
679  }
680
681  @Override
682  public void abortCacheFlush(byte[] encodedRegionName) {
683    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
684  }
685
686  @Override
687  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
688    // Used by tests. Deprecated as too subtle for general usage.
689    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
690  }
691
692  @Override
693  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
    // This method is used by tests and for figuring out whether we should flush because our
    // sequenceids are too old. It is also used for reporting our oldest sequenceid to the master,
    // which uses it to figure out what edits can be skipped during log recovery.
    // getEarliestMemStoreSequenceId from this.sequenceIdAccounting looks first in
    // flushingOldestStoreSequenceIds, the currently flushing sequence ids, and if anything is
    // found there, it returns that. This is the right thing to do when reporting oldest
    // sequenceids to the master; we won't skip edits if we crash during the flush. For figuring
    // out what to flush, we might get requeued if our sequence id is old even though we are
    // currently flushing. This may mean we do too much flushing.
702    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
703  }
704
705  @Override
706  public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
707    return rollWriter(false);
708  }
709
710  @Override
711  public final void sync() throws IOException {
712    sync(useHsync);
713  }
714
715  @Override
716  public final void sync(long txid) throws IOException {
717    sync(txid, useHsync);
718  }
719
720  @Override
721  public final void sync(boolean forceSync) throws IOException {
722    TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync"));
723  }
724
725  @Override
726  public final void sync(long txid, boolean forceSync) throws IOException {
727    TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync"));
728  }
729
730  /**
731   * This is a convenience method that computes a new filename with a given file-number.
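   * The returned path is {@code new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum
   * + walFileSuffix)}, i.e. the prefix, the delimiter, the file number (a creation timestamp in
   * ms) and the suffix concatenated, under the WAL directory.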
732   * @param filenum to use
733   */
734  protected Path computeFilename(final long filenum) {
735    if (filenum < 0) {
736      throw new RuntimeException("WAL file number can't be < 0");
737    }
738    String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;
739    return new Path(walDir, child);
740  }
741
742  /**
   * This is a convenience method that computes the current filename using the current WAL
   * file-number.
745   */
746  public Path getCurrentFileName() {
747    return computeFilename(this.filenum.get());
748  }
749
750  /**
751   * retrieve the next path to use for writing. Increments the internal filenum.
752   */
753  private Path getNewPath() throws IOException {
754    this.filenum.set(Math.max(getFilenum() + 1, EnvironmentEdgeManager.currentTime()));
755    Path newPath = getCurrentFileName();
756    return newPath;
757  }
758
759  public Path getOldPath() {
760    long currentFilenum = this.filenum.get();
761    Path oldPath = null;
762    if (currentFilenum > 0) {
763      // ComputeFilename will take care of meta wal filename
764      oldPath = computeFilename(currentFilenum);
    } // I presume if currentFilenum is <= 0, this is the first file and null for oldPath is fine?
766    return oldPath;
767  }
768
769  /**
770   * Tell listeners about pre log roll.
771   */
772  private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
773    throws IOException {
774    coprocessorHost.preWALRoll(oldPath, newPath);
775
776    if (!this.listeners.isEmpty()) {
777      for (WALActionsListener i : this.listeners) {
778        i.preLogRoll(oldPath, newPath);
779      }
780    }
781  }
782
783  /**
784   * Tell listeners about post log roll.
785   */
786  private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
787    throws IOException {
788    if (!this.listeners.isEmpty()) {
789      for (WALActionsListener i : this.listeners) {
790        i.postLogRoll(oldPath, newPath);
791      }
792    }
793
794    coprocessorHost.postWALRoll(oldPath, newPath);
795  }
796
797  // public only until class moves to o.a.h.h.wal
798  /** Returns the number of rolled log files */
799  public int getNumRolledLogFiles() {
800    return walFile2Props.size();
801  }
802
803  // public only until class moves to o.a.h.h.wal
804  /** Returns the number of log files in use */
805  public int getNumLogFiles() {
806    // +1 for current use log
807    return getNumRolledLogFiles() + 1;
808  }
809
810  /**
   * If the number of un-archived WAL files ('live' WALs) is greater than the maximum allowed,
   * check the first (oldest) WAL, and return those regions which should be flushed so that it can
   * be let-go/'archived'.
814   * @return stores of regions (encodedRegionNames) to flush in order to archive the oldest WAL file
815   */
816  Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
817    Map<byte[], List<byte[]>> regions = null;
818    int logCount = getNumRolledLogFiles();
819    if (logCount > this.maxLogs && logCount > 0) {
820      Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry();
821      regions =
822        this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
823    }
824    if (regions != null) {
825      List<String> listForPrint = new ArrayList<>();
826      for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
827        StringBuilder families = new StringBuilder();
828        for (int i = 0; i < r.getValue().size(); i++) {
829          if (i > 0) {
830            families.append(",");
831          }
832          families.append(Bytes.toString(r.getValue().get(i)));
833        }
834        listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
835      }
836      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs
837        + "; forcing (partial) flush of " + regions.size() + " region(s): "
838        + StringUtils.join(",", listForPrint));
839    }
840    return regions;
841  }
842
843  /**
844   * Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file.
845   */
846  private void markClosedAndClean(Path path) {
847    WALProps props = walFile2Props.get(path);
    // typically this should not be null, but there is no big issue if it is already null, so
    // let's make the code more robust
850    if (props != null) {
851      props.closed = true;
852      cleanOldLogs();
853    }
854  }
855
856  /**
857   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
858   * <p/>
   * Use synchronized because we may call this method from different threads, normally when
   * replacing the writer, and since closing the writer may now be asynchronous, we will also call
   * this method in the closeExecutor, right after we actually close a WAL writer.
862   */
863  private synchronized void cleanOldLogs() {
864    List<Pair<Path, Long>> logsToArchive = null;
865    long now = System.nanoTime();
866    boolean mayLogTooOld = nextLogTooOldNs <= now;
867    ArrayList<byte[]> regionsBlockingWal = null;
868    // For each log file, look at its Map of regions to the highest sequence id; if all sequence ids
869    // are older than what is currently in memory, the WAL can be GC'd.
870    for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) {
871      if (!e.getValue().closed) {
872        LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey());
873        continue;
874      }
875      Path log = e.getKey();
876      ArrayList<byte[]> regionsBlockingThisWal = null;
877      long ageNs = now - e.getValue().rollTimeNs;
878      if (ageNs > walTooOldNs) {
879        if (mayLogTooOld && regionsBlockingWal == null) {
880          regionsBlockingWal = new ArrayList<>();
881        }
882        regionsBlockingThisWal = regionsBlockingWal;
883      }
884      Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
885      if (this.sequenceIdAccounting.areAllLower(sequenceNums, regionsBlockingThisWal)) {
886        if (logsToArchive == null) {
887          logsToArchive = new ArrayList<>();
888        }
889        logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
890        if (LOG.isTraceEnabled()) {
891          LOG.trace("WAL file ready for archiving " + log);
892        }
893      } else if (regionsBlockingThisWal != null) {
894        StringBuilder sb = new StringBuilder(log.toString()).append(" has not been archived for ")
895          .append(TimeUnit.NANOSECONDS.toSeconds(ageNs)).append(" seconds; blocked by: ");
896        boolean isFirst = true;
897        for (byte[] region : regionsBlockingThisWal) {
898          if (!isFirst) {
899            sb.append("; ");
900          }
901          isFirst = false;
902          sb.append(Bytes.toString(region));
903        }
904        LOG.warn(sb.toString());
905        nextLogTooOldNs = now + SURVIVED_TOO_LONG_LOG_INTERVAL_NS;
906        regionsBlockingThisWal.clear();
907      }
908    }
909
910    if (logsToArchive != null) {
911      final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
912      // make it async
913      for (Pair<Path, Long> log : localLogsToArchive) {
914        logArchiveExecutor.execute(() -> {
915          archive(log);
916        });
917        this.walFile2Props.remove(log.getFirst());
918      }
919    }
920  }
921
922  protected void archive(final Pair<Path, Long> log) {
923    totalLogSize.addAndGet(-log.getSecond());
924    int retry = 1;
925    while (true) {
926      try {
927        archiveLogFile(log.getFirst());
928        // successful
929        break;
930      } catch (Throwable e) {
931        if (retry > archiveRetries) {
          LOG.error("Failed log archiving for the log {}", log.getFirst(), e);
933          if (this.abortable != null) {
934            this.abortable.abort("Failed log archiving", e);
935            break;
936          }
937        } else {
938          LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, e);
939        }
940        retry++;
941      }
942    }
943  }
944
  /**
   * Only public so WALSplitter can use it.
   * @return archived location of a WAL file with the given path p
   */
949  public static Path getWALArchivePath(Path archiveDir, Path p) {
950    return new Path(archiveDir, p.getName());
951  }
952
953  protected void archiveLogFile(final Path p) throws IOException {
954    Path newPath = getWALArchivePath(this.walArchiveDir, p);
955    // Tell our listeners that a log is going to be archived.
956    if (!this.listeners.isEmpty()) {
957      for (WALActionsListener i : this.listeners) {
958        i.preLogArchive(p, newPath);
959      }
960    }
961    LOG.info("Archiving " + p + " to " + newPath);
962    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
963      throw new IOException("Unable to rename " + p + " to " + newPath);
964    }
965    // Tell our listeners that a log has been archived.
966    if (!this.listeners.isEmpty()) {
967      for (WALActionsListener i : this.listeners) {
968        i.postLogArchive(p, newPath);
969      }
970    }
971  }
972
973  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
974    int oldNumEntries = this.numEntries.getAndSet(0);
975    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
976    if (oldPath != null) {
977      this.walFile2Props.put(oldPath,
978        new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
979      this.totalLogSize.addAndGet(oldFileLen);
980      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
981        CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
982        newPathString);
983    } else {
984      LOG.info("New WAL {}", newPathString);
985    }
986  }
987
988  private Span createSpan(String name) {
989    return TraceUtil.createSpan(name).setAttribute(WAL_IMPL, implClassName);
990  }
991
992  /**
   * Cleans up the current writer, closing it, and then puts in place the passed in
   * {@code nextWriter}.
994   * <p/>
995   * <ul>
996   * <li>In the case of creating a new WAL, oldPath will be null.</li>
997   * <li>In the case of rolling over from one file to the next, none of the parameters will be null.
998   * </li>
999   * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
1000   * null.</li>
1001   * </ul>
1002   * @param oldPath    may be null
1003   * @param newPath    may be null
1004   * @param nextWriter may be null
1005   * @return the passed in <code>newPath</code>
1006   * @throws IOException if there is a problem flushing or closing the underlying FS
1007   */
1008  Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
1009    return TraceUtil.trace(() -> {
1010      doReplaceWriter(oldPath, newPath, nextWriter);
1011      return newPath;
1012    }, () -> createSpan("WAL.replaceWriter"));
1013  }
1014
1015  protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
1016    // Now we have published the ringbuffer, halt the current thread until we get an answer back.
1017    try {
1018      if (syncFuture != null) {
1019        if (closed) {
1020          throw new IOException("WAL has been closed");
1021        } else {
1022          syncFuture.get(walSyncTimeoutNs);
1023        }
1024      }
1025    } catch (TimeoutIOException tioe) {
1026      throw new WALSyncTimeoutIOException(tioe);
1027    } catch (InterruptedException ie) {
1028      LOG.warn("Interrupted", ie);
1029      throw convertInterruptedExceptionToIOException(ie);
1030    } catch (ExecutionException e) {
1031      throw ensureIOException(e.getCause());
1032    }
1033  }
1034
1035  private static IOException ensureIOException(final Throwable t) {
1036    return (t instanceof IOException) ? (IOException) t : new IOException(t);
1037  }
1038
1039  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
1040    Thread.currentThread().interrupt();
1041    IOException ioe = new InterruptedIOException();
1042    ioe.initCause(ie);
1043    return ioe;
1044  }
1045
1046  private W createCombinedWriter(W localWriter, Path localPath)
1047    throws IOException, CommonFSUtils.StreamLacksCapabilityException {
    // Retry forever if we cannot create the remote writer, to prevent aborting the RS due to a log
    // rolling error, unless skipRemoteWAL is set to true.
1050    // TODO: since for now we only have one thread doing log rolling, this may block the rolling for
1051    // other wals
1052    Path remoteWAL = new Path(remoteWALDir, localPath.getName());
1053    for (int retry = 0;; retry++) {
1054      if (skipRemoteWAL) {
1055        return localWriter;
1056      }
1057      W remoteWriter;
1058      try {
1059        remoteWriter = createWriterInstance(remoteFs, remoteWAL);
1060      } catch (IOException e) {
1061        LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e);
1062        try {
1063          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
1064        } catch (InterruptedException ie) {
1065          // restore the interrupt state
1066          Thread.currentThread().interrupt();
1067          // must close local writer here otherwise no one will close it for us
1068          Closeables.close(localWriter, true);
1069          throw (IOException) new InterruptedIOException().initCause(ie);
1070        }
1071        continue;
1072      }
1073      return createCombinedWriter(localWriter, remoteWriter);
1074    }
1075  }
1076
1077  private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException {
1078    rollWriterLock.lock();
1079    try {
1080      if (this.closed) {
1081        throw new WALClosedException("WAL has been closed");
1082      }
1083      // Return if nothing to flush.
1084      if (!force && this.writer != null && this.numEntries.get() <= 0) {
1085        return null;
1086      }
1087      Map<byte[], List<byte[]>> regionsToFlush = null;
1088      try {
1089        Path oldPath = getOldPath();
1090        Path newPath = getNewPath();
1091        // Any exception from here on is catastrophic, non-recoverable, so we currently abort.
1092        W nextWriter = this.createWriterInstance(fs, newPath);
1093        if (remoteFs != null) {
1094          // create a remote wal if necessary
1095          nextWriter = createCombinedWriter(nextWriter, newPath);
1096        }
1097        tellListenersAboutPreLogRoll(oldPath, newPath);
1098        // NewPath could be equal to oldPath if replaceWriter fails.
1099        newPath = replaceWriter(oldPath, newPath, nextWriter);
1100        tellListenersAboutPostLogRoll(oldPath, newPath);
1101        if (LOG.isDebugEnabled()) {
1102          LOG.debug("Create new " + implClassName + " writer with pipeline: "
1103            + Arrays.toString(getPipeline()));
1104        }
1105        // We got a new writer, so reset the slow sync count
1106        lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
1107        slowSyncCount.set(0);
1108        // Can we delete any of the old log files?
1109        if (getNumRolledLogFiles() > 0) {
1110          cleanOldLogs();
1111          regionsToFlush = findRegionsToForceFlush();
1112        }
1113      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
1114        // If the underlying FileSystem can't do what we ask, treat as IO failure, so
1115        // we'll abort.
1116        throw new IOException(
1117          "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
1118          exception);
1119      }
1120      return regionsToFlush;
1121    } finally {
1122      rollWriterLock.unlock();
1123    }
1124  }
1125
1126  @Override
1127  public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
1128    return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter"));
1129  }
1130
1131  // public only until class moves to o.a.h.h.wal
1132  /** Returns the size of log files in use */
1133  public long getLogFileSize() {
1134    return this.totalLogSize.get();
1135  }
1136
1137  // public only until class moves to o.a.h.h.wal
1138  public void requestLogRoll() {
1139    requestLogRoll(ERROR);
1140  }
1141
1142  /**
1143   * Get the backing files associated with this WAL.
1144   * @return may be null if there are no files.
1145   */
1146  FileStatus[] getFiles() throws IOException {
1147    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
1148  }
1149
1150  @Override
1151  public void shutdown() throws IOException {
1152    if (!shutdown.compareAndSet(false, true)) {
1153      return;
1154    }
1155    closed = true;
1156    // Tell our listeners that the log is closing
1157    if (!this.listeners.isEmpty()) {
1158      for (WALActionsListener i : this.listeners) {
1159        i.logCloseRequested();
1160      }
1161    }
1162
1163    ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor(
1164      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build());
1165
1166    Future<Void> future = shutdownExecutor.submit(new Callable<Void>() {
1167      @Override
1168      public Void call() throws Exception {
        if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
1170          try {
1171            doShutdown();
1172            if (syncFutureCache != null) {
1173              syncFutureCache.clear();
1174            }
1175          } finally {
1176            rollWriterLock.unlock();
1177          }
1178        } else {
          throw new IOException("Timed out waiting for rollWriterLock");
1180        }
1181        return null;
1182      }
1183    });
1184    shutdownExecutor.shutdown();
1185
1186    try {
1187      future.get(walShutdownTimeout, TimeUnit.MILLISECONDS);
1188    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted while waiting for WAL shutdown");
    } catch (TimeoutException e) {
      throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
        + " the WAL shutdown has not completed! Please check the status of the underlying "
1193        + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
1194        + "\"", e);
1195    } catch (ExecutionException e) {
1196      if (e.getCause() instanceof IOException) {
1197        throw (IOException) e.getCause();
1198      } else {
1199        throw new IOException(e.getCause());
1200      }
1201    } finally {
      // In shutdown, we may call cleanOldLogs, so shut down this executor at the end.
      // In the sync replication implementation, we may shut down a WAL without shutting down the
      // whole region server; if we shut down this executor earlier we may get a rejected execution
      // exception and abort the region server.
1206      logArchiveExecutor.shutdown();
1207    }
    // We also need to wait for logArchive to finish if we want a graceful shutdown, as we may
    // still have some pending archiving tasks, and in close we may archive all the remaining WAL
    // files; there could be a race if we do not wait for the background archive tasks to finish.
1212    try {
1213      if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
        throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
          + " the WAL shutdown has not completed! Please check the status of the underlying "
          + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
          + "\"");
      }
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted while waiting for WAL shutdown");
1221    }
1222  }
1223
1224  @Override
1225  public void close() throws IOException {
1226    shutdown();
1227    final FileStatus[] files = getFiles();
1228    if (null != files && 0 != files.length) {
1229      for (FileStatus file : files) {
1230        Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
1231        // Tell our listeners that a log is going to be archived.
1232        if (!this.listeners.isEmpty()) {
1233          for (WALActionsListener i : this.listeners) {
1234            i.preLogArchive(file.getPath(), p);
1235          }
1236        }
1237
1238        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
1239          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
1240        }
1241        // Tell our listeners that a log was archived.
1242        if (!this.listeners.isEmpty()) {
1243          for (WALActionsListener i : this.listeners) {
1244            i.postLogArchive(file.getPath(), p);
1245          }
1246        }
1247      }
1248      LOG.debug(
1249        "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
1250    }
1251    LOG.info("Closed WAL: " + toString());
1252  }
1253
1254  /** Returns number of WALs currently in the process of closing. */
1255  public int getInflightWALCloseCount() {
1256    return inflightWALClosures.size();
1257  }
1258
1259  /**
   * Updates the sequence number of a specific store. Depending on the {@code onlyIfGreater} flag,
   * the current sequence number is replaced only if the given sequence id is bigger, or
   * unconditionally (even if it is lower than the existing one).
1262   */
1263  @Override
1264  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
1265    boolean onlyIfGreater) {
1266    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
1267  }
1268
1269  protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
1270    return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
1271  }
1272
1273  protected boolean isLogRollRequested() {
1274    return rollRequested.get();
1275  }
1276
1277  protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
1278    // If we have already requested a roll, don't do it again
1279    // And only set rollRequested to true when there is a registered listener
1280    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
1281      for (WALActionsListener i : this.listeners) {
1282        i.logRollRequested(reason);
1283      }
1284    }
1285  }
1286
1287  long getUnflushedEntriesCount() {
1288    long highestSynced = this.highestSyncedTxid.get();
1289    long highestUnsynced = this.highestUnsyncedTxid;
1290    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
1291  }
1292
1293  boolean isUnflushedEntries() {
1294    return getUnflushedEntriesCount() > 0;
1295  }
1296
1297  /**
   * Exposed for testing only. Used for tricks like halting the ring buffer appending.
1299   */
1300  protected void atHeadOfRingBufferEventHandlerAppend() {
1301    // Noop
1302  }
1303
1304  protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
1305    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
1306    atHeadOfRingBufferEventHandlerAppend();
1307    long start = EnvironmentEdgeManager.currentTime();
1308    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
1309    long regionSequenceId = entry.getKey().getSequenceId();
1310
    // If the edit is empty there is nothing to append. The edit may be empty when we only want a
    // region sequence id, i.e. a region edit/sequence id that is not associated with an actual
    // edit. It still has to go through all the rigmarole so we are sure we have the right ordering.
1314    if (entry.getEdit().isEmpty()) {
1315      return false;
1316    }
1317
1318    // Coprocessor hook.
1319    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
1320    if (!listeners.isEmpty()) {
1321      for (WALActionsListener i : listeners) {
1322        i.visitLogEntryBeforeWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
1323      }
1324    }
1325    doAppend(writer, entry);
1326    assert highestUnsyncedTxid < entry.getTxid();
1327    highestUnsyncedTxid = entry.getTxid();
1328    if (entry.isCloseRegion()) {
1329      // let's clean all the records of this region
1330      sequenceIdAccounting.onRegionClose(encodedRegionName);
1331    } else {
1332      sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
1333        entry.isInMemStore());
1334    }
1335    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
1336    // Update metrics.
1337    postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
1338    numEntries.incrementAndGet();
1339    return true;
1340  }
1341
1342  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
1343    long len = 0;
1344    if (!listeners.isEmpty()) {
1345      for (Cell cell : e.getEdit().getCells()) {
1346        len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
1347      }
1348      for (WALActionsListener listener : listeners) {
1349        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
1350      }
1351    }
1352    return len;
1353  }
1354
1355  protected final void postSync(long timeInNanos, int handlerSyncs) {
1356    if (timeInNanos > this.slowSyncNs) {
1357      String msg = new StringBuilder().append("Slow sync cost: ")
1358        .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)).append(" ms, current pipeline: ")
1359        .append(Arrays.toString(getPipeline())).toString();
1360      LOG.info(msg);
1361      if (timeInNanos > this.rollOnSyncNs) {
1362        // A single sync took too long.
1363        // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
1364        // effects. Here we have a single data point that indicates we should take immediate
1365        // action, so do so.
1366        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time="
1367          + TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold="
1368          + TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: "
1369          + Arrays.toString(getPipeline()));
1370        requestLogRoll(SLOW_SYNC);
1371      }
1372      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
1373    }
1374    if (!listeners.isEmpty()) {
1375      for (WALActionsListener listener : listeners) {
1376        listener.postSync(timeInNanos, handlerSyncs);
1377      }
1378    }
1379  }
1380
1381  protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
1382    WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException {
1383    if (this.closed) {
1384      throw new IOException(
1385        "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
1386    }
1387    MutableLong txidHolder = new MutableLong();
1388    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
1389      txidHolder.setValue(ringBuffer.next());
1390    });
1391    long txid = txidHolder.longValue();
1392    ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
1393    try {
1394      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
1395      entry.stampRegionSequenceId(we);
1396      ringBuffer.get(txid).load(entry);
1397    } finally {
1398      ringBuffer.publish(txid);
1399    }
1400    return txid;
1401  }
1402
1403  @Override
1404  public String toString() {
1405    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
1406  }
1407
1408  /**
   * If the given {@code path} is currently being written to, return its length.
1410   * <p>
1411   * This is used by replication to prevent replicating unacked log entries. See
1412   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
1413   */
1414  @Override
1415  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
1416    rollWriterLock.lock();
1417    try {
1418      Path currentPath = getOldPath();
1419      if (path.equals(currentPath)) {
1420        // Currently active path.
1421        W writer = this.writer;
1422        return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
1423      } else {
1424        W temp = inflightWALClosures.get(path.getName());
1425        if (temp != null) {
1426          // In the process of being closed, trailer bytes may or may not be flushed.
1427          // Ensuring that we read all the bytes in a file is critical for correctness of tailing
1428          // use cases like replication, see HBASE-25924/HBASE-25932.
1429          return OptionalLong.of(temp.getSyncedLength());
1430        }
1431        // Log rolled successfully.
1432        return OptionalLong.empty();
1433      }
1434    } finally {
1435      rollWriterLock.unlock();
1436    }
1437  }
1438
1439  @Override
1440  public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1441    return TraceUtil.trace(() -> append(info, key, edits, true),
1442      () -> createSpan("WAL.appendData"));
1443  }
1444
1445  @Override
1446  public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1447    return TraceUtil.trace(() -> append(info, key, edits, false),
1448      () -> createSpan("WAL.appendMarker"));
1449  }
1450
1451  /**
1452   * Helper that marks the future as DONE and offers it back to the cache.
1453   */
1454  protected void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
1455    future.done(txid, t);
1456    syncFutureCache.offer(future);
1457  }
1458
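  // Note: epochAndState packs three pieces of state into one int: bit 0 is the waitingRoll flag,
  // bit 1 is the writerBroken flag, and the remaining higher bits hold the epoch, as decoded by
  // the three helpers below. For example, a value of 0b1011 decodes to epoch=2, writerBroken=true,
  // waitingRoll=true.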
1459  private static boolean waitingRoll(int epochAndState) {
1460    return (epochAndState & 1) != 0;
1461  }
1462
1463  private static boolean writerBroken(int epochAndState) {
1464    return ((epochAndState >>> 1) & 1) != 0;
1465  }
1466
1467  private static int epoch(int epochAndState) {
1468    return epochAndState >>> 2;
1469  }
1470
1471  // return whether we have successfully set readyForRolling to true.
1472  private boolean trySetReadyForRolling() {
    // Check without holding the lock first. Usually we will just return here.
    // waitingRoll is volatile and unackedAppends is only accessed inside the event loop, so it is
    // safe to check them outside the consumeLock.
1476    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
1477      return false;
1478    }
1479    consumeLock.lock();
1480    try {
1481      // 1. a roll is requested
      // 2. all outgoing entries have been acked (we have confirmed this above).
1483      if (waitingRoll(epochAndState)) {
1484        readyForRolling = true;
1485        readyForRollingCond.signalAll();
1486        return true;
1487      } else {
1488        return false;
1489      }
1490    } finally {
1491      consumeLock.unlock();
1492    }
1493  }
1494
1495  private void syncFailed(long epochWhenSync, Throwable error) {
1496    LOG.warn("sync failed", error);
1497    this.onException(epochWhenSync, error);
1498  }
1499
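  // Marks the current writer as broken (unless the epoch shows it has already been rolled),
  // requeues the unacked appends onto toWriteAppends so they will be retried with the next writer,
  // and requests a log roll if one is not already in progress.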
1500  private void onException(long epochWhenSync, Throwable error) {
1501    boolean shouldRequestLogRoll = true;
1502    consumeLock.lock();
1503    try {
1504      int currentEpochAndState = epochAndState;
1505      if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
        // Either this is not the current writer (meaning we have already rolled it), or it is
        // still the current writer but we have already marked it as broken and requested a roll.
1509        return;
1510      }
1511      this.epochAndState = currentEpochAndState | 0b10;
1512      if (waitingRoll(currentEpochAndState)) {
1513        readyForRolling = true;
1514        readyForRollingCond.signalAll();
        // This means we are already in the middle of a rollWriter call, so just tell the roller
        // thread that it can continue without requesting an extra log roll.
1517        shouldRequestLogRoll = false;
1518      }
1519    } finally {
1520      consumeLock.unlock();
1521    }
1522    for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
1523      toWriteAppends.addFirst(iter.next());
1524    }
1525    highestUnsyncedTxid = highestSyncedTxid.get();
1526    if (shouldRequestLogRoll) {
1527      // request a roll.
1528      requestLogRoll(ERROR);
1529    }
1530  }
1531
1532  private void syncCompleted(long epochWhenSync, W writer, long processedTxid, long startTimeNs) {
    // Please see the last several comments on HBASE-22761: it is possible that we get a
    // syncCompleted which acks a previous sync request after we have received a syncFailed on the
    // same writer. So here we also check the epoch and state; if the epoch has already changed,
    // i.e. we have already rolled the writer, or the writer is already broken, we just skip, to
    // avoid messing up the state or accidentally releasing some WAL entries and causing data
    // corruption.
    // The syncCompleted call is on the critical write path, so we should try our best to make it
    // fast. That is why we do not hold consumeLock here. It is safe because there are only 3
    // possible situations:
    // 1. In the normal case, the only place where we change epochAndState is when rolling the
    // writer. Before the roll actually happens, we only set the waitingRoll bit, which is a
    // different bit from writerBroken, and when we actually change the epoch we can be sure that
    // there is no outgoing sync request. So we will always pass the check here and there is no
    // problem.
    // 2. The writer is broken, but we have not called syncFailed yet. In this case, since
    // syncFailed and syncCompleted are executed in the same thread, we face the same situation
    // as #1.
    // 3. The writer is broken, and syncFailed has been called. When we arrive here, there are
    // only 2 possibilities:
    // a. we arrive before we actually roll the writer, then we will find out the writer is broken
    // and give up.
    // b. we arrive after we actually roll the writer, then we will find out the epoch has changed
    // and give up.
    // For both #a and #b, we do not need to hold the consumeLock as we always update the
    // epochAndState as a whole.
    // So in general, for all the cases above, we do not need to hold the consumeLock.
1558    int epochAndState = this.epochAndState;
1559    if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) {
      LOG.warn("Got a sync complete call after the writer was rolled or marked broken, skipping");
1561      return;
1562    }
1563
1564    if (processedTxid < highestSyncedTxid.get()) {
1565      return;
1566    }
1567    highestSyncedTxid.set(processedTxid);
1568    for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
1569      FSWALEntry entry = iter.next();
1570      if (entry.getTxid() <= processedTxid) {
1571        entry.release();
1572        iter.remove();
1573      } else {
1574        break;
1575      }
1576    }
1577    postSync(System.nanoTime() - startTimeNs, finishSync());
    /**
     * This call is here to stay compatible with the original logic of {@link FSHLog}.
     */
1581    checkSlowSyncCount();
1582    if (trySetReadyForRolling()) {
      // A log roll is in progress and we have just signaled that we are ready for it, so there is
      // no need to check for log rolling; the writer will be closed soon.
1585      return;
1586    }
1587    // If we haven't already requested a roll, check if we have exceeded logrollsize
1588    if (!isLogRollRequested() && writer.getLength() > logrollsize) {
1589      if (LOG.isDebugEnabled()) {
1590        LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength()
1591          + ", logrollsize=" + logrollsize);
1592      }
1593      requestLogRoll(SIZE);
1594    }
1595  }
1596
  // Find all the sync futures between these two txids to see if we need to issue an hsync; if
  // there are no sync futures, just use the default sync mode.
1599  private boolean isHsync(long beginTxid, long endTxid) {
1600    SortedSet<SyncFuture> futures = syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
1601      new SyncFuture().reset(endTxid + 1, false));
1602    if (futures.isEmpty()) {
1603      return useHsync;
1604    }
1605    for (SyncFuture future : futures) {
1606      if (future.isForceSync()) {
1607        return true;
1608      }
1609    }
1610    return false;
1611  }
1612
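  // Issues a writer sync covering everything appended since the last sync. The completion callback
  // either reports the failure for this epoch (syncFailed) or marks txids up to the highest
  // processed append txid as synced (syncCompleted).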
1613  private void sync(W writer) {
1614    fileLengthAtLastSync = writer.getLength();
1615    long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
1616    boolean shouldUseHsync =
1617      isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
1618    highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
1619    final long startTimeNs = System.nanoTime();
1620    final long epoch = (long) epochAndState >>> 2L;
1621    addListener(doWriterSync(writer, shouldUseHsync, currentHighestProcessedAppendTxid),
1622      (result, error) -> {
1623        if (error != null) {
1624          syncFailed(epoch, error);
1625        } else {
1626          long syncedTxid = getSyncedTxid(currentHighestProcessedAppendTxid, result);
1627          syncCompleted(epoch, writer, syncedTxid, startTimeNs);
1628        }
1629      }, consumeExecutor);
1630  }
1631
  /**
   * This method adapts both {@link FSHLog} and {@link AsyncFSWAL}. For {@link AsyncFSWAL}, we use
   * the value of {@link AbstractFSWAL#highestProcessedAppendTxid} at the point we call
   * {@link AsyncFSWAL#doWriterSync} as the successful syncedTxid. For {@link FSHLog}, because we
   * use multi-threaded {@code SyncRunner}s, we use the result of the {@link CompletableFuture} as
   * the successful syncedTxid.
   */
1639  protected long getSyncedTxid(long processedTxid, long completableFutureResult) {
1640    return processedTxid;
1641  }
1642
1643  protected abstract CompletableFuture<Long> doWriterSync(W writer, boolean shouldUseHsync,
    long txidWhenSync);
1645
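  // Completes and removes all pending sync futures whose txid is less than or equal to the given
  // txid, returning how many were finished.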
1646  private int finishSyncLowerThanTxid(long txid) {
1647    int finished = 0;
1648    for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
1649      SyncFuture sync = iter.next();
1650      if (sync.getTxid() <= txid) {
1651        markFutureDoneAndOffer(sync, txid, null);
1652        iter.remove();
1653        finished++;
1654      } else {
1655        break;
1656      }
1657    }
1658    return finished;
1659  }
1660
1661  // try advancing the highestSyncedTxid as much as possible
1662  private int finishSync() {
1663    if (unackedAppends.isEmpty()) {
1664      // All outstanding appends have been acked.
1665      if (toWriteAppends.isEmpty()) {
1666        // Also no appends that wait to be written out, then just finished all pending syncs.
1667        long maxSyncTxid = highestSyncedTxid.get();
1668        for (SyncFuture sync : syncFutures) {
1669          maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
1670          markFutureDoneAndOffer(sync, maxSyncTxid, null);
1671        }
1672        highestSyncedTxid.set(maxSyncTxid);
1673        int finished = syncFutures.size();
1674        syncFutures.clear();
1675        return finished;
1676      } else {
1677        // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so
1678        // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between
1679        // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
1680        long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
1681        assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
1682        long doneTxid = lowestUnprocessedAppendTxid - 1;
1683        highestSyncedTxid.set(doneTxid);
1684        return finishSyncLowerThanTxid(doneTxid);
1685      }
1686    } else {
1687      // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
1688      // first unacked append minus 1.
1689      long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
1690      long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
1691      highestSyncedTxid.set(doneTxid);
1692      return finishSyncLowerThanTxid(doneTxid);
1693    }
1694  }
1695
1696  // confirm non-empty before calling
1697  private static long getLastTxid(Deque<FSWALEntry> queue) {
1698    return queue.peekLast().getTxid();
1699  }
1700
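  // Drains toWriteAppends into the writer, tracking the successfully appended entries in
  // unackedAppends, and issues a sync once the bytes written since the last sync reach batchSize.
  // If nothing new was written, it only advances the synced txid and completes pending sync
  // futures.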
1701  private void appendAndSync() throws IOException {
1702    final W writer = this.writer;
    // A sync request may not have been queued yet when we issued the last sync, so check here to
    // see if we can finish some sync futures.
1705    finishSync();
1706    long newHighestProcessedAppendTxid = -1L;
    // This is used to avoid calling peekLast on unackedAppends every time; appendAndSync is single
    // threaded, so this saves us some cycles.
1709    boolean addedToUnackedAppends = false;
1710    for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
1711      FSWALEntry entry = iter.next();
      /**
       * For {@link FSHLog} this may throw an IOException, but for {@link AsyncFSWAL} it will not
       * throw any IOException.
       */
1716      boolean appended = appendEntry(writer, entry);
1717      newHighestProcessedAppendTxid = entry.getTxid();
1718      iter.remove();
1719      if (appended) {
        // This is possible: when we fail to sync, we add the unackedAppends back to
        // toWriteAppends, so here we may get an entry which is already in unackedAppends.
1722        if (
1723          addedToUnackedAppends || unackedAppends.isEmpty()
1724            || getLastTxid(unackedAppends) < entry.getTxid()
1725        ) {
1726          unackedAppends.addLast(entry);
1727          addedToUnackedAppends = true;
1728        }
        // See HBASE-25905: here we need to make sure that we always write out all the entries in
        // unackedAppends. The code in the consume method assumes that the entries in
        // unackedAppends have all been sent out, so if there is a roll request and unackedAppends
        // is not empty, we can just return, as later there will be a syncCompleted call to clear
        // unackedAppends, or a syncFailed to move us to another state.
        // There could be other ways to fix this, such as changing the logic in the consume method,
        // but that would break the assumption and then (may) lead to a big refactoring. So let's
        // fix it this way first; we can optimize later.
1737        if (
1738          writer.getLength() - fileLengthAtLastSync >= batchSize
1739            && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))
1740        ) {
1741          break;
1742        }
1743      }
1744    }
1745    // if we have a newer transaction id, update it.
1746    // otherwise, use the previous transaction id.
1747    if (newHighestProcessedAppendTxid > 0) {
1748      highestProcessedAppendTxid = newHighestProcessedAppendTxid;
1749    } else {
1750      newHighestProcessedAppendTxid = highestProcessedAppendTxid;
1751    }
1752
1753    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
      // Sync because we have reached the batch size limit.
1755      sync(writer);
1756      return;
1757    }
1758    if (writer.getLength() == fileLengthAtLastSync) {
      // We haven't written anything out, so just advance highestSyncedTxid since we may only have
      // stamped some region sequence ids.
1761      if (unackedAppends.isEmpty()) {
1762        highestSyncedTxid.set(highestProcessedAppendTxid);
1763        finishSync();
1764        trySetReadyForRolling();
1765      }
1766      return;
1767    }
    // Reaching here means we have some unsynced data but have not reached the batch size yet.
    // We do not issue a sync directly, even if there are sync requests, because there may be new
    // data in the ring buffer. Just return here and let the caller decide whether to issue a sync.
1772  }
1773
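  // The single-threaded consumer task: under consumeLock it first handles the broken-writer and
  // waiting-for-roll states, then drains published trucks from the ring buffer into toWriteAppends
  // and syncFutures, calls appendAndSync, and finally decides whether to reschedule itself or
  // issue a trailing sync.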
1774  private void consume() {
1775    consumeLock.lock();
1776    try {
1777      int currentEpochAndState = epochAndState;
1778      if (writerBroken(currentEpochAndState)) {
1779        return;
1780      }
1781      if (waitingRoll(currentEpochAndState)) {
1782        if (writer.getLength() > fileLengthAtLastSync) {
1783          // issue a sync
1784          sync(writer);
1785        } else {
1786          if (unackedAppends.isEmpty()) {
1787            readyForRolling = true;
1788            readyForRollingCond.signalAll();
1789          }
1790        }
1791        return;
1792      }
1793    } finally {
1794      consumeLock.unlock();
1795    }
1796    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
1797    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
1798        <= cursorBound; nextCursor++) {
1799      if (!waitingConsumePayloads.isPublished(nextCursor)) {
1800        break;
1801      }
1802      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
1803      switch (truck.type()) {
1804        case APPEND:
1805          toWriteAppends.addLast(truck.unloadAppend());
1806          break;
1807        case SYNC:
1808          syncFutures.add(truck.unloadSync());
1809          break;
1810        default:
1811          LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
1812          break;
1813      }
1814      waitingConsumePayloadsGatingSequence.set(nextCursor);
1815    }
1816
    /**
     * This check is here to stay compatible with the original logic of {@link AsyncFSWAL}.
     */
1820    if (markerEditOnly) {
1821      drainNonMarkerEditsAndFailSyncs();
1822    }
1823    try {
1824      appendAndSync();
1825    } catch (IOException exception) {
      /**
       * For {@link FSHLog} we may catch an IOException here, but for {@link AsyncFSWAL} the code
       * does not reach this point.
       */
1830      LOG.error("appendAndSync throws IOException.", exception);
1831      onAppendEntryFailed(exception);
1832      return;
1833    }
1834    if (hasConsumerTask.get()) {
1835      return;
1836    }
1837    if (toWriteAppends.isEmpty()) {
1838      if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
1839        consumerScheduled.set(false);
        // Recheck here since in appendAndSync we do not hold the consumeLock. Things may happen
        // like:
        // 1. we check the cursor, no new entry;
        // 2. someone publishes a new entry to the ring buffer, sees that consumerScheduled is
        // true, and gives up scheduling the consumer task;
        // 3. we set consumerScheduled to false and also give up scheduling the consumer task.
1846        if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
          // We will give up consuming, so if there is some unsynced data we need to issue a sync.
1848          if (
1849            writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty()
1850              && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync
1851          ) {
1852            // no new data in the ringbuffer and we have at least one sync request
1853            sync(writer);
1854          }
1855          return;
1856        } else {
1857          // maybe someone has grabbed this before us
1858          if (!consumerScheduled.compareAndSet(false, true)) {
1859            return;
1860          }
1861        }
1862      }
1863    }
1864    // reschedule if we still have something to write.
1865    consumeExecutor.execute(consumer);
1866  }
1867
1868  private boolean shouldScheduleConsumer() {
1869    int currentEpochAndState = epochAndState;
1870    if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
1871      return false;
1872    }
1873    return consumerScheduled.compareAndSet(false, true);
1874  }
1875
1876  /**
1877   * Append a set of edits to the WAL.
1878   * <p/>
1879   * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
1880   * have its region edit/sequence id assigned else it messes up our unification of mvcc and
1881   * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
1882   * <p/>
   * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
   * transaction by calling 'begin', wherein we assign this update a sequenceid. At assignment
   * time, we stamp all the passed in Cells inside the WALEdit with their sequenceId. You must
   * 'complete' this mvcc transaction by calling MultiVersionConcurrencyControl#complete(...) or a
   * variant, otherwise mvcc will get stuck. Do it in the finally of a try/finally block within
   * which this append lives, along with any subsequent operations like sync or update of memstore,
   * etc. Get the WriteEntry used to pass mvcc context from the passed in WALKey <code>key</code>
   * parameter. Be warned that the WriteEntry is not immediately available on return from this
   * method. It WILL be available subsequent to a sync of this append; otherwise, you will just
   * have to wait on the WriteEntry to get filled in.
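   * <p/>
   * A rough sketch of the expected calling pattern (the local variable names here are illustrative
   * only, not taken from this class):
   *
   * <pre>
   * long txid = wal.appendData(regionInfo, walKey, edits);
   * try {
   *   wal.sync(txid);
   *   // ... apply the edits to the memstore, etc. ...
   * } finally {
   *   MultiVersionConcurrencyControl.WriteEntry we = walKey.getWriteEntry();
   *   if (we != null) {
   *     walKey.getMvcc().complete(we);
   *   }
   * }
   * </pre>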
1893   * @param hri        the regioninfo associated with append
1894   * @param key        Modified by this call; we add to it this edits region edit/sequence id.
1895   * @param edits      Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
1896   *                   sequence id that is after all currently appended edits.
1897   * @param inMemstore Always true except for case where we are writing a region event meta marker
1898   *                   edit, for example, a compaction completion record into the WAL or noting a
1899   *                   Region Open event. In these cases the entry is just so we can finish an
1900   *                   unfinished compaction after a crash when the new Server reads the WAL on
1901   *                   recovery, etc. These transition event 'Markers' do not go via the memstore.
   *                   When inMemstore is false, we presume a marker event edit.
1903   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
1904   *         in it.
1905   */
1906  protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
1907    throws IOException {
1908    if (markerEditOnly && !edits.isMetaEdit()) {
1909      throw new IOException("WAL is closing, only marker edit is allowed");
1910    }
1911    long txid =
1912      stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
1913    if (shouldScheduleConsumer()) {
1914      consumeExecutor.execute(consumer);
1915    }
1916    return txid;
1917  }
1918
1919  protected void doSync(boolean forceSync) throws IOException {
1920    long txid = waitingConsumePayloads.next();
1921    SyncFuture future;
1922    try {
1923      future = getSyncFuture(txid, forceSync);
1924      RingBufferTruck truck = waitingConsumePayloads.get(txid);
1925      truck.load(future);
1926    } finally {
1927      waitingConsumePayloads.publish(txid);
1928    }
1929    if (shouldScheduleConsumer()) {
1930      consumeExecutor.execute(consumer);
1931    }
1932    blockOnSync(future);
1933  }
1934
1935  protected void doSync(long txid, boolean forceSync) throws IOException {
1936    if (highestSyncedTxid.get() >= txid) {
1937      return;
1938    }
1939    // here we do not use ring buffer sequence as txid
1940    long sequence = waitingConsumePayloads.next();
1941    SyncFuture future;
1942    try {
1943      future = getSyncFuture(txid, forceSync);
1944      RingBufferTruck truck = waitingConsumePayloads.get(sequence);
1945      truck.load(future);
1946    } finally {
1947      waitingConsumePayloads.publish(sequence);
1948    }
1949    if (shouldScheduleConsumer()) {
1950      consumeExecutor.execute(consumer);
1951    }
1952    blockOnSync(future);
1953  }
1954
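  // Called when only marker edits are allowed (see skipRemoteWAL). If a non-marker edit is still
  // queued, releases and drops the queued appends up to and including the last non-marker edit,
  // releases all unacked appends, and fails the sync futures whose txid is below that of the first
  // remaining edit (or all of them if none remain).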
1955  private void drainNonMarkerEditsAndFailSyncs() {
1956    if (toWriteAppends.isEmpty()) {
1957      return;
1958    }
1959    boolean hasNonMarkerEdits = false;
1960    Iterator<FSWALEntry> iter = toWriteAppends.descendingIterator();
1961    while (iter.hasNext()) {
1962      FSWALEntry entry = iter.next();
1963      if (!entry.getEdit().isMetaEdit()) {
1964        entry.release();
1965        hasNonMarkerEdits = true;
1966        break;
1967      }
1968    }
1969    if (hasNonMarkerEdits) {
1970      for (;;) {
1971        iter.remove();
1972        if (!iter.hasNext()) {
1973          break;
1974        }
1975        iter.next().release();
1976      }
1977      for (FSWALEntry entry : unackedAppends) {
1978        entry.release();
1979      }
1980      unackedAppends.clear();
      // Fail the sync futures whose txid is below that of the first remaining edit; if no edits
      // remain, fail all the sync futures.
1983      long txid = toWriteAppends.isEmpty() ? Long.MAX_VALUE : toWriteAppends.peek().getTxid();
1984      IOException error = new IOException("WAL is closing, only marker edit is allowed");
1985      for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) {
1986        SyncFuture future = syncIter.next();
1987        if (future.getTxid() < txid) {
1988          markFutureDoneAndOffer(future, future.getTxid(), error);
1989          syncIter.remove();
1990        } else {
1991          break;
1992        }
1993      }
1994    }
1995  }
1996
1997  protected abstract W createWriterInstance(FileSystem fs, Path path)
1998    throws IOException, CommonFSUtils.StreamLacksCapabilityException;
1999
2000  protected abstract W createCombinedWriter(W localWriter, W remoteWriter);
2001
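  // Sets the waitingRoll flag and schedules the consumer, then blocks until the consumer signals
  // readyForRolling, i.e. all outstanding appends have been acked or the writer has failed, so the
  // current writer can be safely replaced or closed.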
2002  protected final void waitForSafePoint() {
2003    consumeLock.lock();
2004    try {
2005      int currentEpochAndState = epochAndState;
2006      if (writerBroken(currentEpochAndState) || this.writer == null) {
2007        return;
2008      }
2009      consumerScheduled.set(true);
2010      epochAndState = currentEpochAndState | 1;
2011      readyForRolling = false;
2012      consumeExecutor.execute(consumer);
2013      while (!readyForRolling) {
2014        readyForRollingCond.awaitUninterruptibly();
2015      }
2016    } finally {
2017      consumeLock.unlock();
2018    }
2019  }
2020
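  // Closes the given writer asynchronously on closeExecutor, tracking it in inflightWALClosures
  // while the close is in progress so that tailing readers can still query its synced length.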
2021  protected final void closeWriter(W writer, Path path) {
2022    inflightWALClosures.put(path.getName(), writer);
2023    closeExecutor.execute(() -> {
2024      try {
2025        writer.close();
2026      } catch (IOException e) {
2027        LOG.warn("close old writer failed", e);
2028      } finally {
        // Call this even if the above close fails; there is no other chance to mark the file as
        // closed, and doing it here will not cause big problems.
2031        markClosedAndClean(path);
2032        inflightWALClosures.remove(path.getName());
2033      }
2034    });
2035  }
2036
2037  /**
   * Note that you need to clear the {@link #rollRequested} flag in this method, as the new writer
   * will begin to work before this method returns. If we cleared the flag after returning from
   * this call, we might miss a roll request. The implementation class should choose a proper place
   * to clear the {@link #rollRequested} flag, so we do not miss a roll request; typically this is
   * done just before you start writing to the new writer.
2043   */
2044  protected void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
2045    Preconditions.checkNotNull(nextWriter);
2046    waitForSafePoint();
    /**
     * For {@link FSHLog}, this shuts down the {@link FSHLog.SyncRunner} threads.
     */
2050    doCleanUpResources();
    // We call rollWriter in the init method to create the first writer, and there the previous
    // writer is obviously null, so we need this null check. The reason we must call
    // logRollAndSetupWalProps before closeWriter is that markClosedAndClean is called after the
    // writer is closed asynchronously, and we need to make sure the WALProps has been put into
    // walFile2Props before markClosedAndClean is called.
2056    if (writer != null) {
2057      long oldFileLen = writer.getLength();
2058      logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
2059      closeWriter(writer, oldPath);
2060    } else {
2061      logRollAndSetupWalProps(oldPath, newPath, 0);
2062    }
2063    this.writer = nextWriter;
    /**
     * This is used by {@link AsyncFSWAL} and {@link FSHLog} to set the underlying filesystem
     * output after the writer is replaced.
     */
2068    onWriterReplaced(nextWriter);
2069    this.fileLengthAtLastSync = nextWriter.getLength();
2070    this.highestProcessedAppendTxidAtLastSync = 0L;
2071    consumeLock.lock();
2072    try {
2073      consumerScheduled.set(true);
2074      int currentEpoch = epochAndState >>> 2;
2075      int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
2076      // set a new epoch and also clear waitingRoll and writerBroken
2077      this.epochAndState = nextEpoch << 2;
2078      // Reset rollRequested status
2079      rollRequested.set(false);
2080      consumeExecutor.execute(consumer);
2081    } finally {
2082      consumeLock.unlock();
2083    }
2084  }
2085
2086  protected abstract void onWriterReplaced(W nextWriter);
2087
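  // Shuts down the WAL: waits for a safe point, closes the current writer asynchronously and waits
  // (bounded) for the close to finish, then drains any pending sync requests from the ring buffer
  // and fails them with a "WAL has been closed" exception.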
2088  protected void doShutdown() throws IOException {
2089    waitForSafePoint();
    /**
     * For {@link FSHLog}, this shuts down the {@link FSHLog.SyncRunner} threads.
     */
2093    doCleanUpResources();
2094    if (this.writer != null) {
2095      closeWriter(this.writer, getOldPath());
2096      this.writer = null;
2097    }
2098    closeExecutor.shutdown();
2099    try {
2100      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
          + " the close of the async writer has not completed."
          + " Please check the status of the underlying filesystem"
          + " or increase the wait time via the config \"" + this.waitOnShutdownInSecondsConfigKey
          + "\"");
2106      }
2107    } catch (InterruptedException e) {
2108      LOG.error("The wait for close of async writer is interrupted");
2109      Thread.currentThread().interrupt();
2110    }
2111    IOException error = new IOException("WAL has been closed");
2112    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
2113    // drain all the pending sync requests
2114    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
2115        <= cursorBound; nextCursor++) {
2116      if (!waitingConsumePayloads.isPublished(nextCursor)) {
2117        break;
2118      }
2119      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
2120      switch (truck.type()) {
2121        case SYNC:
2122          syncFutures.add(truck.unloadSync());
2123          break;
2124        default:
2125          break;
2126      }
2127    }
2128    // and fail them
2129    syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
2130    if (this.shouldShutDownConsumeExecutorWhenClose) {
2131      consumeExecutor.shutdown();
2132    }
2133  }
2134
  protected void doCleanUpResources() {
  }
2137
2138  protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
2139
2140  /**
2141   * This method gets the pipeline for the current WAL.
2142   */
2143  abstract DatanodeInfo[] getPipeline();
2144
2145  /**
2146   * This method gets the datanode replication count for the current WAL.
2147   */
2148  abstract int getLogReplication();
2149
2150  protected abstract boolean doCheckLogLowReplication();
2151
2152  protected boolean isWriterBroken() {
2153    return writerBroken(epochAndState);
2154  }
2155
2156  private void onAppendEntryFailed(IOException exception) {
2157    LOG.warn("append entry failed", exception);
2158    final long currentEpoch = (long) epochAndState >>> 2L;
2159    this.onException(currentEpoch, exception);
2160  }
2161
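  // Hook called from syncCompleted; subclasses (e.g. FSHLog) may override it to react to an
  // accumulation of slow syncs. No-op by default.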
2162  protected void checkSlowSyncCount() {
2163  }
2164
2165  /** Returns true if we exceeded the slow sync roll threshold over the last check interval */
2166  protected boolean doCheckSlowSync() {
2167    boolean result = false;
2168    long now = EnvironmentEdgeManager.currentTime();
2169    long elapsedTime = now - lastTimeCheckSlowSync;
2170    if (elapsedTime >= slowSyncCheckInterval) {
2171      if (slowSyncCount.get() >= slowSyncRollThreshold) {
2172        if (elapsedTime >= (2 * slowSyncCheckInterval)) {
2173          // If two or more slowSyncCheckInterval have elapsed this is a corner case
2174          // where a train of slow syncs almost triggered us but then there was a long
2175          // interval from then until the one more that pushed us over. If so, we
2176          // should do nothing and let the count reset.
2177          if (LOG.isDebugEnabled()) {
2178            LOG.debug("checkSlowSync triggered but we decided to ignore it; " + "count="
2179              + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", elapsedTime="
2180              + elapsedTime + " ms, slowSyncCheckInterval=" + slowSyncCheckInterval + " ms");
2181          }
2182          // Fall through to count reset below
2183        } else {
2184          LOG.warn("Requesting log roll because we exceeded slow sync threshold; count="
2185            + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", current pipeline: "
2186            + Arrays.toString(getPipeline()));
2187          result = true;
2188        }
2189      }
2190      lastTimeCheckSlowSync = now;
2191      slowSyncCount.set(0);
2192    }
2193    return result;
2194  }
2195
2196  public void checkLogLowReplication(long checkInterval) {
2197    long now = EnvironmentEdgeManager.currentTime();
2198    if (now - lastTimeCheckLowReplication < checkInterval) {
2199      return;
2200    }
2201    // Will return immediately if we are in the middle of a WAL log roll currently.
2202    if (!rollWriterLock.tryLock()) {
2203      return;
2204    }
2205    try {
2206      lastTimeCheckLowReplication = now;
2207      if (doCheckLogLowReplication()) {
2208        requestLogRoll(LOW_REPLICATION);
2209      }
2210    } finally {
2211      rollWriterLock.unlock();
2212    }
2213  }
2214
  // Allow temporarily skipping the creation of the remote writer. When we fail to write to the
  // remote dfs cluster, we need to reopen the regions and switch to using the original wal writer.
  // But we need to write a close marker when closing a region, and if that fails, the whole rs
  // will abort. So here we need to skip the creation of the remote writer and make it possible to
  // write the region close marker.
  // Setting markerEditOnly to true is for transitioning from A to S, where we need to give up
  // writing any pending wal entries as they will be discarded. The remote cluster will replicate
  // the correct data back later. We still need to allow writing marker edits, such as the close
  // region event, so that a region can be closed.
2224  @Override
2225  public void skipRemoteWAL(boolean markerEditOnly) {
2226    if (markerEditOnly) {
2227      this.markerEditOnly = true;
2228    }
2229    this.skipRemoteWAL = true;
2230  }
2231
2232  private static void split(final Configuration conf, final Path p) throws IOException {
2233    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
2234    if (!fs.exists(p)) {
2235      throw new FileNotFoundException(p.toString());
2236    }
2237    if (!fs.getFileStatus(p).isDirectory()) {
2238      throw new IOException(p + " is not a directory");
2239    }
2240
2241    final Path baseDir = CommonFSUtils.getWALRootDir(conf);
2242    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
2243    if (
2244      conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
2245        AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)
2246    ) {
2247      archiveDir = new Path(archiveDir, p.getName());
2248    }
2249    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
2250  }
2251
2252  W getWriter() {
2253    return this.writer;
2254  }
2255
2256  private static void usage() {
2257    System.err.println("Usage: AbstractFSWAL <ARGS>");
2258    System.err.println("Arguments:");
2259    System.err.println(" --dump  Dump textual representation of passed one or more files");
2260    System.err.println("         For example: "
2261      + "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
2262    System.err.println(" --split Split the passed directory of WAL logs");
2263    System.err.println(
2264      "         For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
2265  }
2266
2267  /**
2268   * Pass one or more log file names, and it will either dump out a text version on
2269   * <code>stdout</code> or split the specified log files.
2270   */
2271  public static void main(String[] args) throws IOException {
2272    if (args.length < 2) {
2273      usage();
2274      System.exit(-1);
2275    }
2276    // either dump using the WALPrettyPrinter or split, depending on args
2277    if (args[0].compareTo("--dump") == 0) {
2278      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
2279    } else if (args[0].compareTo("--perf") == 0) {
2280      LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:");
2281      LOG.error(HBaseMarkers.FATAL,
2282        "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
2283      System.exit(-1);
2284    } else if (args[0].compareTo("--split") == 0) {
2285      Configuration conf = HBaseConfiguration.create();
2286      for (int i = 1; i < args.length; i++) {
2287        try {
2288          Path logPath = new Path(args[i]);
2289          CommonFSUtils.setFsDefault(conf, logPath);
2290          split(conf, logPath);
2291        } catch (IOException t) {
2292          t.printStackTrace(System.err);
2293          System.exit(-1);
2294        }
2295      }
2296    } else {
2297      usage();
2298      System.exit(-1);
2299    }
2300  }
2301}