001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
021import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
022import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
023import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.WAL_IMPL;
024import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
025import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
026import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
027
028import com.lmax.disruptor.RingBuffer;
029import io.opentelemetry.api.trace.Span;
030import java.io.FileNotFoundException;
031import java.io.IOException;
032import java.io.InterruptedIOException;
033import java.lang.management.MemoryType;
034import java.net.URLEncoder;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Comparator;
038import java.util.List;
039import java.util.Map;
040import java.util.OptionalLong;
041import java.util.Set;
042import java.util.concurrent.ConcurrentHashMap;
043import java.util.concurrent.ConcurrentNavigableMap;
044import java.util.concurrent.ConcurrentSkipListMap;
045import java.util.concurrent.CopyOnWriteArrayList;
046import java.util.concurrent.ExecutionException;
047import java.util.concurrent.ExecutorService;
048import java.util.concurrent.Executors;
049import java.util.concurrent.TimeUnit;
050import java.util.concurrent.atomic.AtomicBoolean;
051import java.util.concurrent.atomic.AtomicInteger;
052import java.util.concurrent.atomic.AtomicLong;
053import java.util.concurrent.locks.ReentrantLock;
054import org.apache.commons.lang3.mutable.MutableLong;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.FileStatus;
057import org.apache.hadoop.fs.FileSystem;
058import org.apache.hadoop.fs.Path;
059import org.apache.hadoop.fs.PathFilter;
060import org.apache.hadoop.hbase.Abortable;
061import org.apache.hadoop.hbase.Cell;
062import org.apache.hadoop.hbase.HBaseConfiguration;
063import org.apache.hadoop.hbase.HConstants;
064import org.apache.hadoop.hbase.PrivateCellUtil;
065import org.apache.hadoop.hbase.client.RegionInfo;
066import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
067import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
068import org.apache.hadoop.hbase.ipc.RpcServer;
069import org.apache.hadoop.hbase.ipc.ServerCall;
070import org.apache.hadoop.hbase.log.HBaseMarkers;
071import org.apache.hadoop.hbase.regionserver.HRegion;
072import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
073import org.apache.hadoop.hbase.trace.TraceUtil;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.CommonFSUtils;
076import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
077import org.apache.hadoop.hbase.util.Pair;
078import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
079import org.apache.hadoop.hbase.wal.WAL;
080import org.apache.hadoop.hbase.wal.WALEdit;
081import org.apache.hadoop.hbase.wal.WALFactory;
082import org.apache.hadoop.hbase.wal.WALKeyImpl;
083import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
084import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
085import org.apache.hadoop.hbase.wal.WALSplitter;
086import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
087import org.apache.hadoop.util.StringUtils;
088import org.apache.yetus.audience.InterfaceAudience;
089import org.slf4j.Logger;
090import org.slf4j.LoggerFactory;
091
092import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
093
094/**
095 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
096 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
097 * This is done internal to the implementation.
098 * <p>
099 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
100 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
101 * A bunch of work in the below is done keeping account of these region sequence ids -- what is
102 * flushed out to hfiles, and what is yet in WAL and in memory only.
103 * <p>
104 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
105 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
106 * (smaller) than the most-recent flush.
107 * <p>
 * To read a WAL, call
 * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}.
 * <h2>Failure Semantic</h2> If an exception occurs on append or sync, roll the WAL because the
 * current WAL is now a lame duck; any more appends or syncs will also fail with the same original
 * exception. If we have made successful appends to the WAL and are then unable to sync them, our
 * current semantic is to return an error to the client saying the appends failed, but also to
 * abort the current context, usually the hosting server. We need to replay the WALs. <br>
115 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client
116 * that the append failed. <br>
117 * TODO: replication may pick up these last edits though they have been marked as failed append
118 * (Need to keep our own file lengths, not rely on HDFS).
119 */
120@InterfaceAudience.Private
121public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
122
123  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);
124
125  protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";
126  protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
127  protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
128  protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
129  protected static final String SLOW_SYNC_ROLL_THRESHOLD =
130    "hbase.regionserver.wal.slowsync.roll.threshold";
131  protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
132  protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =
133    "hbase.regionserver.wal.slowsync.roll.interval.ms";
134  protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute
135
136  protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
137  protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
138
139  public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";
140
141  public static final String MAX_LOGS = "hbase.regionserver.maxlogs";
142
143  public static final String RING_BUFFER_SLOT_COUNT =
144    "hbase.regionserver.wal.disruptor.event.count";
145
146  /**
147   * file system instance
148   */
149  protected final FileSystem fs;
150
151  /**
152   * WAL directory, where all WAL files would be placed.
153   */
154  protected final Path walDir;
155
156  /**
157   * dir path where old logs are kept.
158   */
159  protected final Path walArchiveDir;
160
161  /**
162   * Matches just those wal files that belong to this wal instance.
163   */
164  protected final PathFilter ourFiles;
165
166  /**
167   * Prefix of a WAL file, usually the region server name it is hosted on.
168   */
169  protected final String walFilePrefix;
170
171  /**
172   * Suffix included on generated wal file names
173   */
174  protected final String walFileSuffix;
175
176  /**
177   * Prefix used when checking for wal membership.
178   */
179  protected final String prefixPathStr;
180
181  protected final WALCoprocessorHost coprocessorHost;
182
183  /**
184   * conf object
185   */
186  protected final Configuration conf;
187
188  protected final Abortable abortable;
189
190  /** Listeners that are called on WAL events. */
191  protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
192
193  /** Tracks the logs in the process of being closed. */
194  protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();
195
196  /**
197   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
198   * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
199   * facility for answering questions such as "Is it safe to GC a WAL?".
200   */
201  protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
202
203  protected final long slowSyncNs, rollOnSyncNs;
204  protected final int slowSyncRollThreshold;
205  protected final int slowSyncCheckInterval;
206  protected final AtomicInteger slowSyncCount = new AtomicInteger();
207
208  private final long walSyncTimeoutNs;
209
210  // If > than this size, roll the log.
211  protected final long logrollsize;
212
213  /**
214   * Block size to use writing files.
215   */
216  protected final long blocksize;
217
218  /*
219   * If more than this many logs, force flush of oldest region to oldest edit goes to disk. If too
220   * many and we crash, then will take forever replaying. Keep the number of logs tidy.
221   */
222  protected final int maxLogs;
223
224  protected final boolean useHsync;
225
226  /**
227   * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
228   * is held. We don't just use synchronized because that results in bogus and tedious findbugs
229   * warning when it thinks synchronized controls writer thread safety. It is held when we are
230   * actually rolling the log. It is checked when we are looking to see if we should roll the log or
231   * not.
232   */
233  protected final ReentrantLock rollWriterLock = new ReentrantLock(true);
234
235  // The timestamp (in ms) when the log file was created.
236  protected final AtomicLong filenum = new AtomicLong(-1);
237
238  // Number of transactions in the current Wal.
239  protected final AtomicInteger numEntries = new AtomicInteger(0);
240
241  /**
242   * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass
243   * WALEdit to background consumer thread, and the transaction id is the sequence number of the
244   * corresponding entry in queue.
245   */
246  protected volatile long highestUnsyncedTxid = -1;
247
248  /**
249   * Updated to the transaction id of the last successful sync call. This can be less than
250   * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in
251   * for it.
252   */
253  protected final AtomicLong highestSyncedTxid = new AtomicLong(0);
254
255  /**
256   * The total size of wal
257   */
258  protected final AtomicLong totalLogSize = new AtomicLong(0);
259  /**
260   * Current log file.
261   */
262  volatile W writer;
263
264  // Last time to check low replication on hlog's pipeline
265  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
266
267  // Last time we asked to roll the log due to a slow sync
268  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
269
270  protected volatile boolean closed = false;
271
272  protected final AtomicBoolean shutdown = new AtomicBoolean(false);
273  /**
274   * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
275   * an IllegalArgumentException if used to compare paths from different wals.
276   */
277  final Comparator<Path> LOG_NAME_COMPARATOR =
278    (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
279
280  private static final class WalProps {
281
282    /**
283     * Map the encoded region name to the highest sequence id.
284     * <p/>
285     * Contains all the regions it has an entry for.
286     */
287    public final Map<byte[], Long> encodedName2HighestSequenceId;
288
289    /**
290     * The log file size. Notice that the size may not be accurate if we do asynchronous close in
291     * sub classes.
292     */
293    public final long logSize;
294
295    public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
296      this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
297      this.logSize = logSize;
298    }
299  }
300
301  /**
302   * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
303   * (contained in the log file name).
304   */
305  protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
306    new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
307
308  /**
309   * A cache of sync futures reused by threads.
310   */
311  protected final SyncFutureCache syncFutureCache;
312
313  /**
314   * The class name of the runtime implementation, used as prefix for logging/tracing.
315   * <p>
316   * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here,
317   * refer to HBASE-17676 for more details
318   * </p>
319   */
320  protected final String implClassName;
321
322  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
323
324  private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor(
325    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Log-Archiver-%d").build());
326
327  private final int archiveRetries;
328
329  public long getFilenum() {
330    return this.filenum.get();
331  }
332
333  /**
334   * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}. This helper
335   * method returns the creation timestamp from a given log file. It extracts the timestamp assuming
336   * the filename is created with the {@link #computeFilename(long filenum)} method.
337   * @return timestamp, as in the log file name.
338   */
339  protected long getFileNumFromFileName(Path fileName) {
340    checkNotNull(fileName, "file name can't be null");
341    if (!ourFiles.accept(fileName)) {
342      throw new IllegalArgumentException(
343        "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
344    }
345    final String fileNameString = fileName.toString();
346    String chompedPath = fileNameString.substring(prefixPathStr.length(),
347      (fileNameString.length() - walFileSuffix.length()));
348    return Long.parseLong(chompedPath);
349  }
350
351  private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
352    Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf);
353    return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
354  }
355
356  // must be power of 2
357  protected final int getPreallocatedEventCount() {
358    // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
359    // be stuck and make no progress if the buffer is filled with appends only and there is no
360    // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
361    // before they return.
362    int preallocatedEventCount = this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
363    checkArgument(preallocatedEventCount >= 0, RING_BUFFER_SLOT_COUNT + " must > 0");
364    int floor = Integer.highestOneBit(preallocatedEventCount);
365    if (floor == preallocatedEventCount) {
366      return floor;
367    }
368    // max capacity is 1 << 30
369    if (floor >= 1 << 29) {
370      return 1 << 30;
371    }
372    return floor << 1;
373  }
374
375  protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir,
376    final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
377    final boolean failIfWALExists, final String prefix, final String suffix)
378    throws FailedLogCloseException, IOException {
379    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
380  }
381
  /**
   * Creates the WAL and archive directories if they are missing, then initializes the following
   * from {@code conf}: file naming, listeners, the coprocessor host and the roll/sync settings.
   * The first writer is not created here; {@link #init()} creates it by rolling the writer.
   * @param fs file system hosting the WAL files
   * @param abortable asked to abort when archiving a WAL file keeps failing; may be null
   * @param rootDir parent of {@code logDir} and {@code archiveDir}
   * @param logDir name of the WAL directory under {@code rootDir}
   * @param archiveDir name of the WAL archive directory under {@code rootDir}
   * @param conf configuration to read WAL settings from
   * @param listeners listeners to register; may be null
   * @param failIfWALExists when true, fail if the WAL directory already holds files of this WAL
   * @param prefix WAL file name prefix (URL-encoded); "wal" is used when null or empty
   * @param suffix WAL file name suffix (URL-encoded); when non-empty it must start with
   *          {@code WAL_FILE_NAME_DELIMITER}
   * @throws IOException if a directory cannot be created, or a file of this WAL already exists and
   *           {@code failIfWALExists} is set
   * @throws IllegalArgumentException if a non-empty {@code suffix} does not start with the
   *           delimiter
   */
  protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,
    final String logDir, final String archiveDir, final Configuration conf,
    final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
    final String suffix) throws FailedLogCloseException, IOException {
    this.fs = fs;
    this.walDir = new Path(rootDir, logDir);
    this.walArchiveDir = new Path(rootDir, archiveDir);
    this.conf = conf;
    this.abortable = abortable;

    // Make sure both the WAL directory and the archive directory exist.
    if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
      throw new IOException("Unable to mkdir " + walDir);
    }

    if (!fs.exists(this.walArchiveDir)) {
      if (!fs.mkdirs(this.walArchiveDir)) {
        throw new IOException("Unable to mkdir " + this.walArchiveDir);
      }
    }

    // If prefix is null||empty then just name it wal
    this.walFilePrefix =
      prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
    // we only correctly differentiate suffices when numeric ones start with '.'
    // NOTE(review): this validation runs only after the directories above have been created.
    if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER
        + "' but instead was '" + suffix + "'");
    }
    // Now that it exists, set the storage policy for the entire directory of wal files related to
    // this FSHLog instance
    String storagePolicy =
      conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
    CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);
    this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
    this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();

    // Membership test for this WAL's files: the path must start with prefixPathStr and either end
    // with the suffix or, when there is no suffix, have an all-numeric remainder.
    this.ourFiles = new PathFilter() {
      @Override
      public boolean accept(final Path fileName) {
        // The path should start with dir/<prefix> and end with our suffix
        final String fileNameString = fileName.toString();
        if (!fileNameString.startsWith(prefixPathStr)) {
          return false;
        }
        if (walFileSuffix.isEmpty()) {
          // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
          return org.apache.commons.lang3.StringUtils
            .isNumeric(fileNameString.substring(prefixPathStr.length()));
        } else if (!fileNameString.endsWith(walFileSuffix)) {
          return false;
        }
        return true;
      }
    };

    // Refuse to start in a directory that already contains files of this WAL, if so requested.
    if (failIfWALExists) {
      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
      if (null != walFiles && 0 != walFiles.length) {
        throw new IOException("Target WAL already exists within directory " + walDir);
      }
    }

    // Register listeners. TODO: Should this exist anymore? We have CPs?
    if (listeners != null) {
      for (WALActionsListener i : listeners) {
        registerWALActionsListener(i);
      }
    }
    this.coprocessorHost = new WALCoprocessorHost(this, conf);

    // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block
    // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost
    // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of
    // the block size but experience from the field has it that this was not enough time for the
    // roll to happen before end-of-block. So the new accounting makes WALs of about the same
    // size as those made in hbase-1 (to prevent surprise), we now have default block size as
    // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally
    // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
    this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
    float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
    this.logrollsize = (long) (this.blocksize * multiplier);
    this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));

    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
      + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix="
      + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir
      + ", maxLogs=" + this.maxLogs);
    // The legacy hbase.regionserver.hlog.* keys serve as fallbacks when the new keys are unset.
    this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
      conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)));
    this.rollOnSyncNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS));
    this.slowSyncRollThreshold =
      conf.getInt(SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
    this.slowSyncCheckInterval =
      conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
    this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
      conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)));
    this.syncFutureCache = new SyncFutureCache(conf);
    this.implClassName = getClass().getSimpleName();
    this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
    // Number of extra attempts archive() makes when archiving a WAL file fails.
    archiveRetries = this.conf.getInt("hbase.regionserver.logroll.archive.retries", 0);
  }
484
485  /**
486   * Used to initialize the WAL. Usually just call rollWriter to create the first log writer.
487   */
488  public void init() throws IOException {
489    rollWriter();
490  }
491
492  @Override
493  public void registerWALActionsListener(WALActionsListener listener) {
494    this.listeners.add(listener);
495  }
496
497  @Override
498  public boolean unregisterWALActionsListener(WALActionsListener listener) {
499    return this.listeners.remove(listener);
500  }
501
502  @Override
503  public WALCoprocessorHost getCoprocessorHost() {
504    return coprocessorHost;
505  }
506
507  @Override
508  public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
509    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
510  }
511
512  @Override
513  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
514    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
515  }
516
517  @Override
518  public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
519    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
520  }
521
522  @Override
523  public void abortCacheFlush(byte[] encodedRegionName) {
524    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
525  }
526
527  @Override
528  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
529    // Used by tests. Deprecated as too subtle for general usage.
530    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
531  }
532
533  @Override
534  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
535    // This method is used by tests and for figuring if we should flush or not because our
536    // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use
537    // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId
538    // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the
539    // currently flushing sequence ids, and if anything found there, it is returning these. This is
540    // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if
541    // we crash during the flush. For figuring what to flush, we might get requeued if our sequence
542    // id is old even though we are currently flushing. This may mean we do too much flushing.
543    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
544  }
545
546  @Override
547  public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
548    return rollWriter(false);
549  }
550
551  @Override
552  public final void sync() throws IOException {
553    sync(useHsync);
554  }
555
556  @Override
557  public final void sync(long txid) throws IOException {
558    sync(txid, useHsync);
559  }
560
561  @Override
562  public final void sync(boolean forceSync) throws IOException {
563    TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync"));
564  }
565
566  @Override
567  public final void sync(long txid, boolean forceSync) throws IOException {
568    TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync"));
569  }
570
571  protected abstract void doSync(boolean forceSync) throws IOException;
572
573  protected abstract void doSync(long txid, boolean forceSync) throws IOException;
574
575  /**
576   * This is a convenience method that computes a new filename with a given file-number.
577   * @param filenum to use n
578   */
579  protected Path computeFilename(final long filenum) {
580    if (filenum < 0) {
581      throw new RuntimeException("WAL file number can't be < 0");
582    }
583    String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;
584    return new Path(walDir, child);
585  }
586
587  /**
588   * This is a convenience method that computes a new filename with a given using the current WAL
589   * file-number n
590   */
591  public Path getCurrentFileName() {
592    return computeFilename(this.filenum.get());
593  }
594
595  /**
596   * retrieve the next path to use for writing. Increments the internal filenum.
597   */
598  private Path getNewPath() throws IOException {
599    this.filenum.set(Math.max(getFilenum() + 1, EnvironmentEdgeManager.currentTime()));
600    Path newPath = getCurrentFileName();
601    return newPath;
602  }
603
604  Path getOldPath() {
605    long currentFilenum = this.filenum.get();
606    Path oldPath = null;
607    if (currentFilenum > 0) {
608      // ComputeFilename will take care of meta wal filename
609      oldPath = computeFilename(currentFilenum);
610    } // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine?
611    return oldPath;
612  }
613
614  /**
615   * Tell listeners about pre log roll.
616   */
617  private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
618    throws IOException {
619    coprocessorHost.preWALRoll(oldPath, newPath);
620
621    if (!this.listeners.isEmpty()) {
622      for (WALActionsListener i : this.listeners) {
623        i.preLogRoll(oldPath, newPath);
624      }
625    }
626  }
627
628  /**
629   * Tell listeners about post log roll.
630   */
631  private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
632    throws IOException {
633    if (!this.listeners.isEmpty()) {
634      for (WALActionsListener i : this.listeners) {
635        i.postLogRoll(oldPath, newPath);
636      }
637    }
638
639    coprocessorHost.postWALRoll(oldPath, newPath);
640  }
641
642  // public only until class moves to o.a.h.h.wal
643  /** Returns the number of rolled log files */
644  public int getNumRolledLogFiles() {
645    return walFile2Props.size();
646  }
647
648  // public only until class moves to o.a.h.h.wal
649  /** Returns the number of log files in use */
650  public int getNumLogFiles() {
651    // +1 for current use log
652    return getNumRolledLogFiles() + 1;
653  }
654
655  /**
656   * If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed, check the
657   * first (oldest) WAL, and return those regions which should be flushed so that it can be
658   * let-go/'archived'.
659   * @return stores of regions (encodedRegionNames) to flush in order to archive oldest WAL file.
660   */
661  Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
662    Map<byte[], List<byte[]>> regions = null;
663    int logCount = getNumRolledLogFiles();
664    if (logCount > this.maxLogs && logCount > 0) {
665      Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
666      regions =
667        this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
668    }
669    if (regions != null) {
670      List<String> listForPrint = new ArrayList<>();
671      for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
672        StringBuilder families = new StringBuilder();
673        for (int i = 0; i < r.getValue().size(); i++) {
674          if (i > 0) {
675            families.append(",");
676          }
677          families.append(Bytes.toString(r.getValue().get(i)));
678        }
679        listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
680      }
681      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs
682        + "; forcing (partial) flush of " + regions.size() + " region(s): "
683        + StringUtils.join(",", listForPrint));
684    }
685    return regions;
686  }
687
688  /**
689   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
690   */
691  private void cleanOldLogs() throws IOException {
692    List<Pair<Path, Long>> logsToArchive = null;
693    // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
694    // are older than what is currently in memory, the WAL can be GC'd.
695    for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
696      Path log = e.getKey();
697      Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
698      if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
699        if (logsToArchive == null) {
700          logsToArchive = new ArrayList<>();
701        }
702        logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
703        if (LOG.isTraceEnabled()) {
704          LOG.trace("WAL file ready for archiving " + log);
705        }
706      }
707    }
708
709    if (logsToArchive != null) {
710      final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
711      // make it async
712      for (Pair<Path, Long> log : localLogsToArchive) {
713        logArchiveExecutor.execute(() -> {
714          archive(log);
715        });
716        this.walFile2Props.remove(log.getFirst());
717      }
718    }
719  }
720
721  protected void archive(final Pair<Path, Long> log) {
722    totalLogSize.addAndGet(-log.getSecond());
723    int retry = 1;
724    while (true) {
725      try {
726        archiveLogFile(log.getFirst());
727        // successful
728        break;
729      } catch (Throwable e) {
730        if (retry > archiveRetries) {
731          LOG.error("Failed log archiving for the log {},", log.getFirst(), e);
732          if (this.abortable != null) {
733            this.abortable.abort("Failed log archiving", e);
734            break;
735          }
736        } else {
737          LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, e);
738        }
739        retry++;
740      }
741    }
742  }
743
744  /*
745   * only public so WALSplitter can use.
746   * @return archived location of a WAL file with the given path p
747   */
748  public static Path getWALArchivePath(Path archiveDir, Path p) {
749    return new Path(archiveDir, p.getName());
750  }
751
752  protected void archiveLogFile(final Path p) throws IOException {
753    Path newPath = getWALArchivePath(this.walArchiveDir, p);
754    // Tell our listeners that a log is going to be archived.
755    if (!this.listeners.isEmpty()) {
756      for (WALActionsListener i : this.listeners) {
757        i.preLogArchive(p, newPath);
758      }
759    }
760    LOG.info("Archiving " + p + " to " + newPath);
761    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
762      throw new IOException("Unable to rename " + p + " to " + newPath);
763    }
764    // Tell our listeners that a log has been archived.
765    if (!this.listeners.isEmpty()) {
766      for (WALActionsListener i : this.listeners) {
767        i.postLogArchive(p, newPath);
768      }
769    }
770  }
771
772  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
773    int oldNumEntries = this.numEntries.getAndSet(0);
774    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
775    if (oldPath != null) {
776      this.walFile2Props.put(oldPath,
777        new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
778      this.totalLogSize.addAndGet(oldFileLen);
779      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
780        CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
781        newPathString);
782    } else {
783      LOG.info("New WAL {}", newPathString);
784    }
785  }
786
787  private Span createSpan(String name) {
788    return TraceUtil.createSpan(name).setAttribute(WAL_IMPL, implClassName);
789  }
790
791  /**
792   * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.
793   * <p/>
794   * <ul>
795   * <li>In the case of creating a new WAL, oldPath will be null.</li>
796   * <li>In the case of rolling over from one file to the next, none of the parameters will be null.
797   * </li>
798   * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
799   * null.</li>
800   * </ul>
801   * @param oldPath    may be null
802   * @param newPath    may be null
803   * @param nextWriter may be null
804   * @return the passed in <code>newPath</code>
805   * @throws IOException if there is a problem flushing or closing the underlying FS
806   */
807  Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
808    return TraceUtil.trace(() -> {
809      doReplaceWriter(oldPath, newPath, nextWriter);
810      return newPath;
811    }, () -> createSpan("WAL.replaceWriter"));
812  }
813
814  protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
815    // Now we have published the ringbuffer, halt the current thread until we get an answer back.
816    try {
817      if (syncFuture != null) {
818        if (closed) {
819          throw new IOException("WAL has been closed");
820        } else {
821          syncFuture.get(walSyncTimeoutNs);
822        }
823      }
824    } catch (TimeoutIOException tioe) {
825      throw tioe;
826    } catch (InterruptedException ie) {
827      LOG.warn("Interrupted", ie);
828      throw convertInterruptedExceptionToIOException(ie);
829    } catch (ExecutionException e) {
830      throw ensureIOException(e.getCause());
831    }
832  }
833
834  private static IOException ensureIOException(final Throwable t) {
835    return (t instanceof IOException) ? (IOException) t : new IOException(t);
836  }
837
838  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
839    Thread.currentThread().interrupt();
840    IOException ioe = new InterruptedIOException();
841    ioe.initCause(ie);
842    return ioe;
843  }
844
845  private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException {
846    rollWriterLock.lock();
847    try {
848      // Return if nothing to flush.
849      if (!force && this.writer != null && this.numEntries.get() <= 0) {
850        return null;
851      }
852      Map<byte[], List<byte[]>> regionsToFlush = null;
853      if (this.closed) {
854        LOG.debug("WAL closed. Skipping rolling of writer");
855        return regionsToFlush;
856      }
857      try {
858        Path oldPath = getOldPath();
859        Path newPath = getNewPath();
860        // Any exception from here on is catastrophic, non-recoverable so we currently abort.
861        W nextWriter = this.createWriterInstance(newPath);
862        tellListenersAboutPreLogRoll(oldPath, newPath);
863        // NewPath could be equal to oldPath if replaceWriter fails.
864        newPath = replaceWriter(oldPath, newPath, nextWriter);
865        tellListenersAboutPostLogRoll(oldPath, newPath);
866        if (LOG.isDebugEnabled()) {
867          LOG.debug("Create new " + implClassName + " writer with pipeline: "
868            + Arrays.toString(getPipeline()));
869        }
870        // We got a new writer, so reset the slow sync count
871        lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
872        slowSyncCount.set(0);
873        // Can we delete any of the old log files?
874        if (getNumRolledLogFiles() > 0) {
875          cleanOldLogs();
876          regionsToFlush = findRegionsToForceFlush();
877        }
878      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
879        // If the underlying FileSystem can't do what we ask, treat as IO failure so
880        // we'll abort.
881        throw new IOException(
882          "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
883          exception);
884      }
885      return regionsToFlush;
886    } finally {
887      rollWriterLock.unlock();
888    }
889  }
890
891  @Override
892  public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
893    return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter"));
894  }
895
896  // public only until class moves to o.a.h.h.wal
897  /** Returns the size of log files in use */
898  public long getLogFileSize() {
899    return this.totalLogSize.get();
900  }
901
902  // public only until class moves to o.a.h.h.wal
903  public void requestLogRoll() {
904    requestLogRoll(ERROR);
905  }
906
907  /**
908   * Get the backing files associated with this WAL.
909   * @return may be null if there are no files.
910   */
911  FileStatus[] getFiles() throws IOException {
912    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
913  }
914
915  @Override
916  public void shutdown() throws IOException {
917    if (!shutdown.compareAndSet(false, true)) {
918      return;
919    }
920    closed = true;
921    // Tell our listeners that the log is closing
922    if (!this.listeners.isEmpty()) {
923      for (WALActionsListener i : this.listeners) {
924        i.logCloseRequested();
925      }
926    }
927    rollWriterLock.lock();
928    try {
929      doShutdown();
930      if (syncFutureCache != null) {
931        syncFutureCache.clear();
932      }
933      if (logArchiveExecutor != null) {
934        logArchiveExecutor.shutdownNow();
935      }
936    } finally {
937      rollWriterLock.unlock();
938    }
939  }
940
941  @Override
942  public void close() throws IOException {
943    shutdown();
944    final FileStatus[] files = getFiles();
945    if (null != files && 0 != files.length) {
946      for (FileStatus file : files) {
947        Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
948        // Tell our listeners that a log is going to be archived.
949        if (!this.listeners.isEmpty()) {
950          for (WALActionsListener i : this.listeners) {
951            i.preLogArchive(file.getPath(), p);
952          }
953        }
954
955        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
956          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
957        }
958        // Tell our listeners that a log was archived.
959        if (!this.listeners.isEmpty()) {
960          for (WALActionsListener i : this.listeners) {
961            i.postLogArchive(file.getPath(), p);
962          }
963        }
964      }
965      LOG.debug(
966        "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
967    }
968    LOG.info("Closed WAL: " + toString());
969  }
970
971  /** Returns number of WALs currently in the process of closing. */
972  public int getInflightWALCloseCount() {
973    return inflightWALClosures.size();
974  }
975
976  /**
977   * updates the sequence number of a specific store. depending on the flag: replaces current seq
978   * number if the given seq id is bigger, or even if it is lower than existing one
979   */
980  @Override
981  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
982    boolean onlyIfGreater) {
983    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
984  }
985
986  protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
987    return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
988  }
989
990  protected boolean isLogRollRequested() {
991    return rollRequested.get();
992  }
993
994  protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
995    // If we have already requested a roll, don't do it again
996    // And only set rollRequested to true when there is a registered listener
997    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
998      for (WALActionsListener i : this.listeners) {
999        i.logRollRequested(reason);
1000      }
1001    }
1002  }
1003
1004  long getUnflushedEntriesCount() {
1005    long highestSynced = this.highestSyncedTxid.get();
1006    long highestUnsynced = this.highestUnsyncedTxid;
1007    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
1008  }
1009
1010  boolean isUnflushedEntries() {
1011    return getUnflushedEntriesCount() > 0;
1012  }
1013
1014  /**
1015   * Exposed for testing only. Use to tricks like halt the ring buffer appending.
1016   */
1017  protected void atHeadOfRingBufferEventHandlerAppend() {
1018    // Noop
1019  }
1020
  /**
   * Appends one entry taken off the ring buffer to {@code writer}.
   * <p/>
   * An entry with an empty edit is not written and {@code false} is returned. Otherwise the steps
   * run in this order:
   * <ol>
   * <li>the coprocessor {@code preWALWrite} hook runs;</li>
   * <li>every listener's {@code visitLogEntryBeforeWrite} runs;</li>
   * <li>the entry is written via {@link #doAppend};</li>
   * <li>{@code highestUnsyncedTxid} advances to the entry's txid;</li>
   * <li>sequence id accounting is updated, or cleared for the region on a close-region entry;</li>
   * <li>the {@code postWALWrite} hook runs, followed by listener {@code postAppend} callbacks
   * (via {@link #postAppend});</li>
   * <li>the per-file entry counter is incremented.</li>
   * </ol>
   * @param writer the writer to append to
   * @param entry  the entry to append
   * @return true if the entry was written, false if its edit was empty
   * @throws IOException propagated from {@link #doAppend} or the other steps above
   */
  protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
    // Test hook; runs before anything else.
    atHeadOfRingBufferEventHandlerAppend();
    // Note: these are captured even for empty edits, which return early below.
    long start = EnvironmentEdgeManager.currentTime();
    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
    long regionSequenceId = entry.getKey().getSequenceId();

    // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
    // region sequence id only, a region edit/sequence id that is not associated with an actual
    // edit. It has to go through all the rigmarole to be sure we have the right ordering.
    if (entry.getEdit().isEmpty()) {
      return false;
    }

    // Coprocessor hook.
    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    if (!listeners.isEmpty()) {
      for (WALActionsListener i : listeners) {
        i.visitLogEntryBeforeWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
      }
    }
    doAppend(writer, entry);
    // Txids are expected to arrive here in strictly increasing order.
    assert highestUnsyncedTxid < entry.getTxid();
    highestUnsyncedTxid = entry.getTxid();
    if (entry.isCloseRegion()) {
      // let's clean all the records of this region
      sequenceIdAccounting.onRegionClose(encodedRegionName);
    } else {
      sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
        entry.isInMemStore());
    }
    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    // Update metrics.
    postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
    // Counted per file; reset to 0 when the writer is rolled (logRollAndSetupWalProps).
    numEntries.incrementAndGet();
    return true;
  }
1058
1059  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
1060    long len = 0;
1061    if (!listeners.isEmpty()) {
1062      for (Cell cell : e.getEdit().getCells()) {
1063        len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
1064      }
1065      for (WALActionsListener listener : listeners) {
1066        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
1067      }
1068    }
1069    return len;
1070  }
1071
1072  protected final void postSync(final long timeInNanos, final int handlerSyncs) {
1073    if (timeInNanos > this.slowSyncNs) {
1074      String msg = new StringBuilder().append("Slow sync cost: ")
1075        .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)).append(" ms, current pipeline: ")
1076        .append(Arrays.toString(getPipeline())).toString();
1077      LOG.info(msg);
1078      // A single sync took too long.
1079      // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
1080      // effects. Here we have a single data point that indicates we should take immediate
1081      // action, so do so.
1082      if (timeInNanos > this.rollOnSyncNs) {
1083        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time="
1084          + TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold="
1085          + TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: "
1086          + Arrays.toString(getPipeline()));
1087        requestLogRoll(SLOW_SYNC);
1088      }
1089      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
1090    }
1091    if (!listeners.isEmpty()) {
1092      for (WALActionsListener listener : listeners) {
1093        listener.postSync(timeInNanos, handlerSyncs);
1094      }
1095    }
1096  }
1097
  /**
   * Claims the next ring-buffer slot for an edit, stamps the entry with its region sequence id,
   * and publishes the slot.
   * <p/>
   * The ring-buffer sequence (used as the txid) is claimed from inside the MVCC {@code begin}
   * callback on {@code key}'s MVCC. The slot is published in a {@code finally} block, so a claimed
   * sequence is published even if building, stamping or loading the entry throws.
   * NOTE(review): presumably because a claimed but never-published sequence would stall the
   * consumer side; confirm against Disruptor semantics.
   * @param hri        region the edit belongs to
   * @param key        WAL key whose MVCC starts the write
   * @param edits      edits to append
   * @param inMemstore passed through to the {@link FSWALEntry}
   * @param ringBuffer ring buffer to claim from and publish to
   * @return the txid (ring-buffer sequence) assigned to this append
   * @throws IOException if the WAL is already closed
   */
  protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
    WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException {
    if (this.closed) {
      throw new IOException(
        "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
    }
    MutableLong txidHolder = new MutableLong();
    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
      txidHolder.setValue(ringBuffer.next());
    });
    long txid = txidHolder.longValue();
    // Attach the current RPC call only if it is a ServerCall carrying a cell scanner; else null.
    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
    try {
      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
      entry.stampRegionSequenceId(we);
      ringBuffer.get(txid).load(entry);
    } finally {
      // Always publish a claimed sequence, even on failure above.
      ringBuffer.publish(txid);
    }
    return txid;
  }
1120
1121  @Override
1122  public String toString() {
1123    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
1124  }
1125
1126  /**
1127   * if the given {@code path} is being written currently, then return its length.
1128   * <p>
1129   * This is used by replication to prevent replicating unacked log entries. See
1130   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
1131   */
1132  @Override
1133  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
1134    rollWriterLock.lock();
1135    try {
1136      Path currentPath = getOldPath();
1137      if (path.equals(currentPath)) {
1138        // Currently active path.
1139        W writer = this.writer;
1140        return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
1141      } else {
1142        W temp = inflightWALClosures.get(path.getName());
1143        if (temp != null) {
1144          // In the process of being closed, trailer bytes may or may not be flushed.
1145          // Ensuring that we read all the bytes in a file is critical for correctness of tailing
1146          // use cases like replication, see HBASE-25924/HBASE-25932.
1147          return OptionalLong.of(temp.getSyncedLength());
1148        }
1149        // Log rolled successfully.
1150        return OptionalLong.empty();
1151      }
1152    } finally {
1153      rollWriterLock.unlock();
1154    }
1155  }
1156
1157  @Override
1158  public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1159    return TraceUtil.trace(() -> append(info, key, edits, true),
1160      () -> createSpan("WAL.appendData"));
1161  }
1162
1163  @Override
1164  public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1165    return TraceUtil.trace(() -> append(info, key, edits, false),
1166      () -> createSpan("WAL.appendMarker"));
1167  }
1168
1169  /**
1170   * Append a set of edits to the WAL.
1171   * <p/>
1172   * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
1173   * have its region edit/sequence id assigned else it messes up our unification of mvcc and
1174   * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
1175   * <p/>
1176   * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
1177   * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment
1178   * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
1179   * 'complete' the transaction this mvcc transaction by calling
1180   * MultiVersionConcurrencyControl#complete(...) or a variant otherwise mvcc will get stuck. Do it
1181   * in the finally of a try/finally block within which this append lives and any subsequent
1182   * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the
1183   * passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not
1184   * immediately available on return from this method. It WILL be available subsequent to a sync of
1185   * this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
1186   * @param info       the regioninfo associated with append
1187   * @param key        Modified by this call; we add to it this edits region edit/sequence id.
1188   * @param edits      Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
1189   *                   sequence id that is after all currently appended edits.
1190   * @param inMemstore Always true except for case where we are writing a region event meta marker
1191   *                   edit, for example, a compaction completion record into the WAL or noting a
1192   *                   Region Open event. In these cases the entry is just so we can finish an
1193   *                   unfinished compaction after a crash when the new Server reads the WAL on
1194   *                   recovery, etc. These transition event 'Markers' do not go via the memstore.
1195   *                   When memstore is false, we presume a Marker event edit.
1196   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
1197   *         in it.
1198   */
1199  protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
1200    throws IOException;
1201
  /**
   * Writes {@code entry} to {@code writer}. Called from {@link #appendEntry} for non-empty edits,
   * after the pre-write coprocessor hook and listeners have run.
   */
  protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;

  /**
   * Creates a writer for a new WAL file at {@code path}. Called while rolling, with
   * {@code rollWriterLock} held.
   * @throws CommonFSUtils.StreamLacksCapabilityException if the stream lacks a required
   *                                                      capability; the roll reports this as
   *                                                      an IOException
   */
  protected abstract W createWriterInstance(Path path)
    throws IOException, CommonFSUtils.StreamLacksCapabilityException;

  /**
   * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer
   * will begin to work before returning from this method. If we clear the flag after returning from
   * this call, we may miss a roll request. The implementation class should choose a proper place to
   * clear the {@link #rollRequested} flag so we do not miss a roll request, typically before you
   * start writing to the new writer.
   */
  protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
    throws IOException;

  /**
   * Implementation-specific shutdown. Called from {@link #shutdown()} (at most once there), with
   * {@code rollWriterLock} held.
   */
  protected abstract void doShutdown() throws IOException;

  /**
   * Returns true if the current WAL's replication is low enough that a roll should be requested.
   * Called from {@link #checkLogLowReplication(long)} with {@code rollWriterLock} held.
   */
  protected abstract boolean doCheckLogLowReplication();
1220
1221  /** Returns true if we exceeded the slow sync roll threshold over the last check interval */
1222  protected boolean doCheckSlowSync() {
1223    boolean result = false;
1224    long now = EnvironmentEdgeManager.currentTime();
1225    long elapsedTime = now - lastTimeCheckSlowSync;
1226    if (elapsedTime >= slowSyncCheckInterval) {
1227      if (slowSyncCount.get() >= slowSyncRollThreshold) {
1228        if (elapsedTime >= (2 * slowSyncCheckInterval)) {
1229          // If two or more slowSyncCheckInterval have elapsed this is a corner case
1230          // where a train of slow syncs almost triggered us but then there was a long
1231          // interval from then until the one more that pushed us over. If so, we
1232          // should do nothing and let the count reset.
1233          if (LOG.isDebugEnabled()) {
1234            LOG.debug("checkSlowSync triggered but we decided to ignore it; " + "count="
1235              + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", elapsedTime="
1236              + elapsedTime + " ms, slowSyncCheckInterval=" + slowSyncCheckInterval + " ms");
1237          }
1238          // Fall through to count reset below
1239        } else {
1240          LOG.warn("Requesting log roll because we exceeded slow sync threshold; count="
1241            + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", current pipeline: "
1242            + Arrays.toString(getPipeline()));
1243          result = true;
1244        }
1245      }
1246      lastTimeCheckSlowSync = now;
1247      slowSyncCount.set(0);
1248    }
1249    return result;
1250  }
1251
1252  public void checkLogLowReplication(long checkInterval) {
1253    long now = EnvironmentEdgeManager.currentTime();
1254    if (now - lastTimeCheckLowReplication < checkInterval) {
1255      return;
1256    }
1257    // Will return immediately if we are in the middle of a WAL log roll currently.
1258    if (!rollWriterLock.tryLock()) {
1259      return;
1260    }
1261    try {
1262      lastTimeCheckLowReplication = now;
1263      if (doCheckLogLowReplication()) {
1264        requestLogRoll(LOW_REPLICATION);
1265      }
1266    } finally {
1267      rollWriterLock.unlock();
1268    }
1269  }
1270
1271  /**
1272   * This method gets the pipeline for the current WAL.
1273   */
1274  abstract DatanodeInfo[] getPipeline();
1275
1276  /**
1277   * This method gets the datanode replication count for the current WAL.
1278   */
1279  abstract int getLogReplication();
1280
1281  private static void split(final Configuration conf, final Path p) throws IOException {
1282    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
1283    if (!fs.exists(p)) {
1284      throw new FileNotFoundException(p.toString());
1285    }
1286    if (!fs.getFileStatus(p).isDirectory()) {
1287      throw new IOException(p + " is not a directory");
1288    }
1289
1290    final Path baseDir = CommonFSUtils.getWALRootDir(conf);
1291    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1292    if (
1293      conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
1294        AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)
1295    ) {
1296      archiveDir = new Path(archiveDir, p.getName());
1297    }
1298    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
1299  }
1300
1301  private static void usage() {
1302    System.err.println("Usage: AbstractFSWAL <ARGS>");
1303    System.err.println("Arguments:");
1304    System.err.println(" --dump  Dump textual representation of passed one or more files");
1305    System.err.println("         For example: "
1306      + "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
1307    System.err.println(" --split Split the passed directory of WAL logs");
1308    System.err.println(
1309      "         For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
1310  }
1311
1312  /**
1313   * Pass one or more log file names and it will either dump out a text version on
1314   * <code>stdout</code> or split the specified log files.
1315   */
1316  public static void main(String[] args) throws IOException {
1317    if (args.length < 2) {
1318      usage();
1319      System.exit(-1);
1320    }
1321    // either dump using the WALPrettyPrinter or split, depending on args
1322    if (args[0].compareTo("--dump") == 0) {
1323      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
1324    } else if (args[0].compareTo("--perf") == 0) {
1325      LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:");
1326      LOG.error(HBaseMarkers.FATAL,
1327        "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
1328      System.exit(-1);
1329    } else if (args[0].compareTo("--split") == 0) {
1330      Configuration conf = HBaseConfiguration.create();
1331      for (int i = 1; i < args.length; i++) {
1332        try {
1333          Path logPath = new Path(args[i]);
1334          CommonFSUtils.setFsDefault(conf, logPath);
1335          split(conf, logPath);
1336        } catch (IOException t) {
1337          t.printStackTrace(System.err);
1338          System.exit(-1);
1339        }
1340      }
1341    } else {
1342      usage();
1343      System.exit(-1);
1344    }
1345  }
1346}