001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
021import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
022import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
023import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
024import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
025import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
026
027import com.lmax.disruptor.RingBuffer;
028import java.io.FileNotFoundException;
029import java.io.IOException;
030import java.io.InterruptedIOException;
031import java.lang.management.MemoryType;
032import java.net.URLEncoder;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Comparator;
036import java.util.List;
037import java.util.Map;
038import java.util.OptionalLong;
039import java.util.Set;
040import java.util.concurrent.ConcurrentNavigableMap;
041import java.util.concurrent.ConcurrentSkipListMap;
042import java.util.concurrent.CopyOnWriteArrayList;
043import java.util.concurrent.ExecutionException;
044import java.util.concurrent.ExecutorService;
045import java.util.concurrent.Executors;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.atomic.AtomicBoolean;
048import java.util.concurrent.atomic.AtomicInteger;
049import java.util.concurrent.atomic.AtomicLong;
050import java.util.concurrent.locks.ReentrantLock;
051import org.apache.commons.lang3.mutable.MutableLong;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.fs.FileStatus;
054import org.apache.hadoop.fs.FileSystem;
055import org.apache.hadoop.fs.Path;
056import org.apache.hadoop.fs.PathFilter;
057import org.apache.hadoop.hbase.Abortable;
058import org.apache.hadoop.hbase.Cell;
059import org.apache.hadoop.hbase.HBaseConfiguration;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.PrivateCellUtil;
062import org.apache.hadoop.hbase.client.RegionInfo;
063import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
064import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
065import org.apache.hadoop.hbase.ipc.RpcServer;
066import org.apache.hadoop.hbase.ipc.ServerCall;
067import org.apache.hadoop.hbase.log.HBaseMarkers;
068import org.apache.hadoop.hbase.regionserver.HRegion;
069import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
070import org.apache.hadoop.hbase.trace.TraceUtil;
071import org.apache.hadoop.hbase.util.Bytes;
072import org.apache.hadoop.hbase.util.CommonFSUtils;
073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
074import org.apache.hadoop.hbase.util.Pair;
075import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
076import org.apache.hadoop.hbase.wal.WAL;
077import org.apache.hadoop.hbase.wal.WALEdit;
078import org.apache.hadoop.hbase.wal.WALFactory;
079import org.apache.hadoop.hbase.wal.WALKeyImpl;
080import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
081import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
082import org.apache.hadoop.hbase.wal.WALSplitter;
083import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
084import org.apache.hadoop.util.StringUtils;
085import org.apache.htrace.core.TraceScope;
086import org.apache.yetus.audience.InterfaceAudience;
087import org.slf4j.Logger;
088import org.slf4j.LoggerFactory;
089
090import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
091
092/**
093 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
094 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
 * This is done internally by the implementation.
096 * <p>
097 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
098 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
099 * A bunch of work in the below is done keeping account of these region sequence ids -- what is
100 * flushed out to hfiles, and what is yet in WAL and in memory only.
101 * <p>
102 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
103 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
104 * (smaller) than the most-recent flush.
105 * <p>
 * To read a WAL, call
 * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}.
 * <h2>Failure Semantics</h2> If an exception occurs on append or sync, we roll the WAL because
 * the current WAL is now a lame duck; any further appends or syncs will also fail with the same
 * original exception. If we have made successful appends to the WAL and are then unable to sync
 * them, our current semantic is to return an error to the client indicating that the appends
 * failed, but also to abort the current context, usually the hosting server. We need to replay
 * the WALs. <br>
113 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client
114 * that the append failed. <br>
115 * TODO: replication may pick up these last edits though they have been marked as failed append
116 * (Need to keep our own file lengths, not rely on HDFS).
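 * <p>
 * A rough usage sketch (illustrative only; in practice WAL instances are created via
 * {@link WALFactory} and managed by the region server, and the names below are placeholders):
 * <pre>{@code
 * WAL wal = ...; // obtained from a WALProvider/WALFactory
 * long txid = wal.appendData(regionInfo, walKey, edits); // queue an edit
 * wal.sync(txid); // block until the edit is durable
 * wal.rollWriter(); // start a new WAL file
 * wal.shutdown(); // stop the WAL subsystem
 * }</pre>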
117 */
118@InterfaceAudience.Private
119public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
120
121  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);
122
  protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";
124  protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
125  protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
126  protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
127  protected static final String SLOW_SYNC_ROLL_THRESHOLD =
128    "hbase.regionserver.wal.slowsync.roll.threshold";
129  protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
130  protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =
131    "hbase.regionserver.wal.slowsync.roll.interval.ms";
132  protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute
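  // Illustrative tuning sketch (example values, not recommendations): raising
  // hbase.regionserver.wal.slowsync.ms to 250 while lowering
  // hbase.regionserver.wal.slowsync.roll.threshold to 50 would tolerate slower individual syncs
  // but still request a roll after a shorter train of them within one check interval.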
133
134  protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
135  protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
136
137  public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";
138
139  public static final String MAX_LOGS = "hbase.regionserver.maxlogs";
140
141  public static final String RING_BUFFER_SLOT_COUNT =
142    "hbase.regionserver.wal.disruptor.event.count";
143
144  /**
145   * file system instance
146   */
147  protected final FileSystem fs;
148
149  /**
150   * WAL directory, where all WAL files would be placed.
151   */
152  protected final Path walDir;
153
154  /**
155   * dir path where old logs are kept.
156   */
157  protected final Path walArchiveDir;
158
159  /**
160   * Matches just those wal files that belong to this wal instance.
161   */
162  protected final PathFilter ourFiles;
163
164  /**
165   * Prefix of a WAL file, usually the region server name it is hosted on.
166   */
167  protected final String walFilePrefix;
168
169  /**
170   * Suffix included on generated wal file names
171   */
172  protected final String walFileSuffix;
173
174  /**
175   * Prefix used when checking for wal membership.
176   */
177  protected final String prefixPathStr;
178
179  protected final WALCoprocessorHost coprocessorHost;
180
181  /**
182   * conf object
183   */
184  protected final Configuration conf;
185
186  protected final Abortable abortable;
187
188  /** Listeners that are called on WAL events. */
189  protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
190
191  /**
192   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
193   * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
194   * facility for answering questions such as "Is it safe to GC a WAL?".
195   */
196  protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
197
198  protected final long slowSyncNs, rollOnSyncNs;
199  protected final int slowSyncRollThreshold;
200  protected final int slowSyncCheckInterval;
201  protected final AtomicInteger slowSyncCount = new AtomicInteger();
202
203  private final long walSyncTimeoutNs;
204
205  // If > than this size, roll the log.
206  protected final long logrollsize;
207
208  /**
209   * Block size to use writing files.
210   */
211  protected final long blocksize;
212
  /**
   * If there are more than this many WAL files, force a flush of the oldest region so that its
   * oldest edit goes to disk and the oldest WAL can be archived. If we accumulate too many WALs
   * and then crash, replay will take forever. Keep the number of logs tidy.
   */
217  protected final int maxLogs;
218
219  protected final boolean useHsync;
220
221  /**
222   * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
   * is held. We don't just use synchronized because that results in bogus and tedious findbugs
   * warnings when it thinks synchronized controls writer thread safety. It is held when we are
225   * actually rolling the log. It is checked when we are looking to see if we should roll the log or
226   * not.
227   */
228  protected final ReentrantLock rollWriterLock = new ReentrantLock(true);
229
230  // The timestamp (in ms) when the log file was created.
231  protected final AtomicLong filenum = new AtomicLong(-1);
232
  // Number of transactions in the current WAL.
234  protected final AtomicInteger numEntries = new AtomicInteger(0);
235
236  /**
   * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass
   * WALEdit to a background consumer thread, and the transaction id is the sequence number of the
239   * corresponding entry in queue.
240   */
241  protected volatile long highestUnsyncedTxid = -1;
242
243  /**
244   * Updated to the transaction id of the last successful sync call. This can be less than
245   * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in
246   * for it.
247   */
248  protected final AtomicLong highestSyncedTxid = new AtomicLong(0);
249
250  /**
   * The total size of WAL files.
252   */
253  protected final AtomicLong totalLogSize = new AtomicLong(0);
254  /**
255   * Current log file.
256   */
257  volatile W writer;
258
  // Last time we checked for low replication on the WAL pipeline
260  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
261
262  // Last time we asked to roll the log due to a slow sync
263  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
264
265  protected volatile boolean closed = false;
266
267  protected final AtomicBoolean shutdown = new AtomicBoolean(false);
268  /**
269   * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
270   * an IllegalArgumentException if used to compare paths from different wals.
271   */
272  final Comparator<Path> LOG_NAME_COMPARATOR =
273    (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
274
275  private static final class WalProps {
276
277    /**
     * Map from encoded region name to the highest sequence id appended for that region.
     * <p/>
     * Contains an entry for every region that has edits in this WAL file.
280     */
281    public final Map<byte[], Long> encodedName2HighestSequenceId;
282
283    /**
284     * The log file size. Notice that the size may not be accurate if we do asynchronous close in
     * subclasses.
286     */
287    public final long logSize;
288
289    public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
290      this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
291      this.logSize = logSize;
292    }
293  }
294
295  /**
296   * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
297   * (contained in the log file name).
298   */
299  protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
300    new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
301
302  /**
   * Map of {@link SyncFuture}s owned by Thread objects. Used so we can reuse SyncFutures. A
   * thread local is used so the JVM can GC the SyncFuture of a terminated thread for us. See
   * HBASE-21228.
306   */
307  private final ThreadLocal<SyncFuture> cachedSyncFutures;
308
309  /**
310   * The class name of the runtime implementation, used as prefix for logging/tracing.
311   * <p>
312   * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here,
313   * refer to HBASE-17676 for more details
314   * </p>
315   */
316  protected final String implClassName;
317
318  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
319
320  private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor(
321    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Log-Archiver-%d").build());
322
323  private final int archiveRetries;
324
325  public long getFilenum() {
326    return this.filenum.get();
327  }
328
329  /**
   * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}). This helper
331   * method returns the creation timestamp from a given log file. It extracts the timestamp assuming
332   * the filename is created with the {@link #computeFilename(long filenum)} method.
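   * <p>
   * For example (illustrative names): with prefix {@code wal} and an empty suffix, a file named
   * {@code wal.1588230741000} in the WAL directory yields {@code 1588230741000}.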
333   * @return timestamp, as in the log file name.
334   */
335  protected long getFileNumFromFileName(Path fileName) {
336    checkNotNull(fileName, "file name can't be null");
337    if (!ourFiles.accept(fileName)) {
338      throw new IllegalArgumentException(
339          "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
340    }
341    final String fileNameString = fileName.toString();
342    String chompedPath = fileNameString.substring(prefixPathStr.length(),
343      (fileNameString.length() - walFileSuffix.length()));
344    return Long.parseLong(chompedPath);
345  }
346
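  /**
   * Illustrative sizing (example numbers only): with a 4 GB global memstore and a 128 MB log
   * roll size this returns (4 GB * 2) / 128 MB = 64, i.e. up to 64 WAL files, subject to the
   * floor of 32 applied by the caller.
   */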
347  private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
348    Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf);
349    return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
350  }
351
352  // must be power of 2
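  // Illustrative rounding: the default 1024 * 16 = 16384 is already a power of two and is
  // returned as-is; a configured value of 17000 would round up to 32768.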
353  protected final int getPreallocatedEventCount() {
354    // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
355    // be stuck and make no progress if the buffer is filled with appends only and there is no
356    // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
357    // before they return.
358    int preallocatedEventCount =
359      this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
    checkArgument(preallocatedEventCount > 0, RING_BUFFER_SLOT_COUNT + " must be > 0");
361    int floor = Integer.highestOneBit(preallocatedEventCount);
362    if (floor == preallocatedEventCount) {
363      return floor;
364    }
365    // max capacity is 1 << 30
366    if (floor >= 1 << 29) {
367      return 1 << 30;
368    }
369    return floor << 1;
370  }
371
372  protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir,
373      final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
374      final boolean failIfWALExists, final String prefix, final String suffix)
375      throws FailedLogCloseException, IOException {
376    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
377  }
378
379  protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,
380      final String logDir, final String archiveDir, final Configuration conf,
381      final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
382      final String suffix)
383      throws FailedLogCloseException, IOException {
384    this.fs = fs;
385    this.walDir = new Path(rootDir, logDir);
386    this.walArchiveDir = new Path(rootDir, archiveDir);
387    this.conf = conf;
388    this.abortable = abortable;
389
390    if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
391      throw new IOException("Unable to mkdir " + walDir);
392    }
393
394    if (!fs.exists(this.walArchiveDir)) {
395      if (!fs.mkdirs(this.walArchiveDir)) {
396        throw new IOException("Unable to mkdir " + this.walArchiveDir);
397      }
398    }
399
    // If prefix is null or empty then just name it wal
401    this.walFilePrefix =
402      prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
    // we only correctly differentiate suffixes when numeric ones start with '.'
404    if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
405      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
406        "' but instead was '" + suffix + "'");
407    }
408    // Now that it exists, set the storage policy for the entire directory of wal files related to
409    // this FSHLog instance
410    String storagePolicy =
411        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
412    CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);
413    this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
414    this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
415
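    // Illustrative matches (assuming prefix "wal" and suffix ".meta"): the filter accepts
    // "<walDir>/wal.1588230741000.meta" but rejects "<walDir>/wal.1588230741000" (missing
    // suffix) and "<walDir>/other.1588230741000.meta" (wrong prefix).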
416    this.ourFiles = new PathFilter() {
417      @Override
418      public boolean accept(final Path fileName) {
419        // The path should start with dir/<prefix> and end with our suffix
420        final String fileNameString = fileName.toString();
421        if (!fileNameString.startsWith(prefixPathStr)) {
422          return false;
423        }
424        if (walFileSuffix.isEmpty()) {
425          // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
426          return org.apache.commons.lang3.StringUtils
427              .isNumeric(fileNameString.substring(prefixPathStr.length()));
428        } else if (!fileNameString.endsWith(walFileSuffix)) {
429          return false;
430        }
431        return true;
432      }
433    };
434
435    if (failIfWALExists) {
436      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
437      if (null != walFiles && 0 != walFiles.length) {
438        throw new IOException("Target WAL already exists within directory " + walDir);
439      }
440    }
441
442    // Register listeners. TODO: Should this exist anymore? We have CPs?
443    if (listeners != null) {
444      for (WALActionsListener i : listeners) {
445        registerWALActionsListener(i);
446      }
447    }
448    this.coprocessorHost = new WALCoprocessorHost(this, conf);
449
450    // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block
451    // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost
452    // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of
453    // the block size but experience from the field has it that this was not enough time for the
454    // roll to happen before end-of-block. So the new accounting makes WALs of about the same
455    // size as those made in hbase-1 (to prevent surprise), we now have default block size as
456    // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally
457    // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
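    // Illustrative numbers (assuming a 128 MB DFS default block size): the WAL block size
    // defaults to 256 MB, so with the default 0.5 multiplier we roll at roughly 128 MB.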
458    this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
459    float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
460    this.logrollsize = (long) (this.blocksize * multiplier);
461    this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
462
463    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
464      StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
465      walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir +
466      ", maxLogs=" + this.maxLogs);
467    this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
468      conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)));
469    this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS,
470      DEFAULT_ROLL_ON_SYNC_TIME_MS));
471    this.slowSyncRollThreshold = conf.getInt(SLOW_SYNC_ROLL_THRESHOLD,
472      DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
473    this.slowSyncCheckInterval = conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS,
474      DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
475    this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
476      conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)));
477    this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
478      @Override
479      protected SyncFuture initialValue() {
480        return new SyncFuture();
481      }
482    };
483    this.implClassName = getClass().getSimpleName();
484    this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
485    archiveRetries = this.conf.getInt("hbase.regionserver.logroll.archive.retries", 0);
486  }
487
488  /**
489   * Used to initialize the WAL. Usually just call rollWriter to create the first log writer.
490   */
491  public void init() throws IOException {
492    rollWriter();
493  }
494
495  @Override
496  public void registerWALActionsListener(WALActionsListener listener) {
497    this.listeners.add(listener);
498  }
499
500  @Override
501  public boolean unregisterWALActionsListener(WALActionsListener listener) {
502    return this.listeners.remove(listener);
503  }
504
505  @Override
506  public WALCoprocessorHost getCoprocessorHost() {
507    return coprocessorHost;
508  }
509
510  @Override
511  public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
512    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
513  }
514
515  @Override
516  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
517    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
518  }
519
520  @Override
521  public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
522    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
523  }
524
525  @Override
526  public void abortCacheFlush(byte[] encodedRegionName) {
527    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
528  }
529
530  @Override
531  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
532    // Used by tests. Deprecated as too subtle for general usage.
533    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
534  }
535
536  @Override
537  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
538    // This method is used by tests and for figuring if we should flush or not because our
539    // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use
540    // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId
541    // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the
542    // currently flushing sequence ids, and if anything found there, it is returning these. This is
543    // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if
544    // we crash during the flush. For figuring what to flush, we might get requeued if our sequence
545    // id is old even though we are currently flushing. This may mean we do too much flushing.
546    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
547  }
548
549  @Override
550  public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
551    return rollWriter(false);
552  }
553
554  /**
555   * This is a convenience method that computes a new filename with a given file-number.
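   * <p>
   * For example (illustrative): with prefix {@code wal} and suffix {@code .meta},
   * {@code computeFilename(1588230741000)} returns a path ending in
   * {@code wal.1588230741000.meta}.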
556   * @param filenum to use
557   * @return Path
558   */
559  protected Path computeFilename(final long filenum) {
560    if (filenum < 0) {
561      throw new RuntimeException("WAL file number can't be < 0");
562    }
563    String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;
564    return new Path(walDir, child);
565  }
566
567  /**
   * This is a convenience method that computes a new filename using the current WAL file-number.
570   * @return Path
571   */
572  public Path getCurrentFileName() {
573    return computeFilename(this.filenum.get());
574  }
575
576  /**
577   * retrieve the next path to use for writing. Increments the internal filenum.
578   */
579  private Path getNewPath() throws IOException {
580    this.filenum.set(System.currentTimeMillis());
581    Path newPath = getCurrentFileName();
582    while (fs.exists(newPath)) {
583      this.filenum.incrementAndGet();
584      newPath = getCurrentFileName();
585    }
586    return newPath;
587  }
588
589  Path getOldPath() {
590    long currentFilenum = this.filenum.get();
591    Path oldPath = null;
592    if (currentFilenum > 0) {
593      // ComputeFilename will take care of meta wal filename
594      oldPath = computeFilename(currentFilenum);
    } // If currentFilenum is <= 0, this is the first file, so returning null for oldPath is fine.
596    return oldPath;
597  }
598
599  /**
600   * Tell listeners about pre log roll.
601   */
602  private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
603      throws IOException {
604    coprocessorHost.preWALRoll(oldPath, newPath);
605
606    if (!this.listeners.isEmpty()) {
607      for (WALActionsListener i : this.listeners) {
608        i.preLogRoll(oldPath, newPath);
609      }
610    }
611  }
612
613  /**
614   * Tell listeners about post log roll.
615   */
616  private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
617      throws IOException {
618    if (!this.listeners.isEmpty()) {
619      for (WALActionsListener i : this.listeners) {
620        i.postLogRoll(oldPath, newPath);
621      }
622    }
623
624    coprocessorHost.postWALRoll(oldPath, newPath);
625  }
626
627  // public only until class moves to o.a.h.h.wal
628  /** @return the number of rolled log files */
629  public int getNumRolledLogFiles() {
630    return walFile2Props.size();
631  }
632
633  // public only until class moves to o.a.h.h.wal
634  /** @return the number of log files in use */
635  public int getNumLogFiles() {
    // +1 for the current log in use
637    return getNumRolledLogFiles() + 1;
638  }
639
640  /**
   * If the number of un-archived WAL files ('live' WALs) is greater than the maximum allowed,
   * check the first (oldest) WAL and return those regions which should be flushed so that it can
   * be let go/'archived'.
   * @return stores of regions (encodedRegionNames) to flush in order to archive the oldest WAL
   *         file.
645   */
646  Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
647    Map<byte[], List<byte[]>> regions = null;
648    int logCount = getNumRolledLogFiles();
649    if (logCount > this.maxLogs && logCount > 0) {
650      Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
651      regions =
652        this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
653    }
654    if (regions != null) {
      List<String> listForPrint = new ArrayList<>();
656      for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
657        StringBuilder families = new StringBuilder();
658        for (int i = 0; i < r.getValue().size(); i++) {
659          if (i > 0) {
660            families.append(",");
661          }
662          families.append(Bytes.toString(r.getValue().get(i)));
663        }
664        listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
665      }
666      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
667        "; forcing (partial) flush of " + regions.size() + " region(s): " +
668        StringUtils.join(",", listForPrint));
669    }
670    return regions;
671  }
672
673  /**
674   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
675   */
676  private void cleanOldLogs() throws IOException {
677    List<Pair<Path, Long>> logsToArchive = null;
678    // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
679    // are older than what is currently in memory, the WAL can be GC'd.
680    for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
681      Path log = e.getKey();
682      Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
683      if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
684        if (logsToArchive == null) {
685          logsToArchive = new ArrayList<>();
686        }
687        logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
688        if (LOG.isTraceEnabled()) {
689          LOG.trace("WAL file ready for archiving " + log);
690        }
691      }
692    }
693
694    if (logsToArchive != null) {
695      final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
696      // make it async
697      for (Pair<Path, Long> log : localLogsToArchive) {
698        logArchiveExecutor.execute(() -> {
699          archive(log);
700        });
701        this.walFile2Props.remove(log.getFirst());
702      }
703    }
704  }
705
706  protected void archive(final Pair<Path, Long> log) {
707    int retry = 1;
708    while (true) {
709      try {
710        archiveLogFile(log.getFirst());
711        totalLogSize.addAndGet(-log.getSecond());
712        // successful
713        break;
714      } catch (Throwable e) {
715        if (retry > archiveRetries) {
716          LOG.error("Failed log archiving for the log {},", log.getFirst(), e);
717          if (this.abortable != null) {
718            this.abortable.abort("Failed log archiving", e);
719            break;
720          }
721        } else {
722          LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry,
723            e);
724        }
725        retry++;
726      }
727    }
728  }
729
  /**
   * Only public so WALSplitter can use it.
   * @return archived location of a WAL file with the given path p
   */
734  public static Path getWALArchivePath(Path archiveDir, Path p) {
735    return new Path(archiveDir, p.getName());
736  }
737
738  protected void archiveLogFile(final Path p) throws IOException {
739    Path newPath = getWALArchivePath(this.walArchiveDir, p);
740    // Tell our listeners that a log is going to be archived.
741    if (!this.listeners.isEmpty()) {
742      for (WALActionsListener i : this.listeners) {
743        i.preLogArchive(p, newPath);
744      }
745    }
746    LOG.info("Archiving " + p + " to " + newPath);
747    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
748      throw new IOException("Unable to rename " + p + " to " + newPath);
749    }
750    // Tell our listeners that a log has been archived.
751    if (!this.listeners.isEmpty()) {
752      for (WALActionsListener i : this.listeners) {
753        i.postLogArchive(p, newPath);
754      }
755    }
756  }
757
758  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
759    int oldNumEntries = this.numEntries.getAndSet(0);
760    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
761    if (oldPath != null) {
762      this.walFile2Props.put(oldPath,
763        new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
764      this.totalLogSize.addAndGet(oldFileLen);
765      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
766        CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
767        newPathString);
768    } else {
769      LOG.info("New WAL {}", newPathString);
770    }
771  }
772
773  /**
774   * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.
775   * <p/>
776   * <ul>
777   * <li>In the case of creating a new WAL, oldPath will be null.</li>
778   * <li>In the case of rolling over from one file to the next, none of the parameters will be null.
779   * </li>
780   * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
781   * null.</li>
782   * </ul>
783   * @param oldPath may be null
784   * @param newPath may be null
785   * @param nextWriter may be null
786   * @return the passed in <code>newPath</code>
787   * @throws IOException if there is a problem flushing or closing the underlying FS
788   */
789  Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
    try (TraceScope scope = TraceUtil.createTrace("FSHLog.replaceWriter")) {
791      doReplaceWriter(oldPath, newPath, nextWriter);
792      return newPath;
793    }
794  }
795
796  protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
797    // Now we have published the ringbuffer, halt the current thread until we get an answer back.
798    try {
799      if (syncFuture != null) {
800        if (closed) {
801          throw new IOException("WAL has been closed");
802        } else {
803          syncFuture.get(walSyncTimeoutNs);
804        }
805      }
806    } catch (TimeoutIOException tioe) {
807      // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
808      // still refer to it, so if this thread use it next time may get a wrong
809      // result.
810      this.cachedSyncFutures.remove();
811      throw tioe;
812    } catch (InterruptedException ie) {
813      LOG.warn("Interrupted", ie);
814      throw convertInterruptedExceptionToIOException(ie);
815    } catch (ExecutionException e) {
816      throw ensureIOException(e.getCause());
817    }
818  }
819
820  private static IOException ensureIOException(final Throwable t) {
821    return (t instanceof IOException) ? (IOException) t : new IOException(t);
822  }
823
824  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
825    Thread.currentThread().interrupt();
826    IOException ioe = new InterruptedIOException();
827    ioe.initCause(ie);
828    return ioe;
829  }
830
831  @Override
832  public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
833    rollWriterLock.lock();
834    try {
835      // Return if nothing to flush.
836      if (!force && this.writer != null && this.numEntries.get() <= 0) {
837        return null;
838      }
839      Map<byte[], List<byte[]>> regionsToFlush = null;
840      if (this.closed) {
841        LOG.debug("WAL closed. Skipping rolling of writer");
842        return regionsToFlush;
843      }
844      try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
845        Path oldPath = getOldPath();
846        Path newPath = getNewPath();
847        // Any exception from here on is catastrophic, non-recoverable so we currently abort.
848        W nextWriter = this.createWriterInstance(newPath);
849        tellListenersAboutPreLogRoll(oldPath, newPath);
850        // NewPath could be equal to oldPath if replaceWriter fails.
851        newPath = replaceWriter(oldPath, newPath, nextWriter);
852        tellListenersAboutPostLogRoll(oldPath, newPath);
853        if (LOG.isDebugEnabled()) {
854          LOG.debug("Create new " + implClassName + " writer with pipeline: " +
855            Arrays.toString(getPipeline()));
856        }
857        // We got a new writer, so reset the slow sync count
858        lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
859        slowSyncCount.set(0);
860        // Can we delete any of the old log files?
861        if (getNumRolledLogFiles() > 0) {
862          cleanOldLogs();
863          regionsToFlush = findRegionsToForceFlush();
864        }
865      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
866        // If the underlying FileSystem can't do what we ask, treat as IO failure so
867        // we'll abort.
868        throw new IOException(
869            "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
870            exception);
871      }
872      return regionsToFlush;
873    } finally {
874      rollWriterLock.unlock();
875    }
876  }
877
878  // public only until class moves to o.a.h.h.wal
879  /** @return the size of log files in use */
880  public long getLogFileSize() {
881    return this.totalLogSize.get();
882  }
883
884  // public only until class moves to o.a.h.h.wal
885  public void requestLogRoll() {
886    requestLogRoll(ERROR);
887  }
888
889  /**
890   * Get the backing files associated with this WAL.
891   * @return may be null if there are no files.
892   */
893  FileStatus[] getFiles() throws IOException {
894    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
895  }
896
897  @Override
898  public void shutdown() throws IOException {
899    if (!shutdown.compareAndSet(false, true)) {
900      return;
901    }
902    closed = true;
903    // Tell our listeners that the log is closing
904    if (!this.listeners.isEmpty()) {
905      for (WALActionsListener i : this.listeners) {
906        i.logCloseRequested();
907      }
908    }
909    rollWriterLock.lock();
910    try {
911      doShutdown();
912      if (logArchiveExecutor != null) {
913        logArchiveExecutor.shutdownNow();
914      }
915    } finally {
916      rollWriterLock.unlock();
917    }
918  }
919
920  @Override
921  public void close() throws IOException {
922    shutdown();
923    final FileStatus[] files = getFiles();
924    if (null != files && 0 != files.length) {
925      for (FileStatus file : files) {
926        Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
927        // Tell our listeners that a log is going to be archived.
928        if (!this.listeners.isEmpty()) {
929          for (WALActionsListener i : this.listeners) {
930            i.preLogArchive(file.getPath(), p);
931          }
932        }
933
934        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
935          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
936        }
937        // Tell our listeners that a log was archived.
938        if (!this.listeners.isEmpty()) {
939          for (WALActionsListener i : this.listeners) {
940            i.postLogArchive(file.getPath(), p);
941          }
942        }
943      }
944      LOG.debug(
945        "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
946    }
947    LOG.info("Closed WAL: " + toString());
948  }
949
950  /**
   * Updates the sequence number of a specific store. Depending on the onlyIfGreater flag, it
   * replaces the current sequence number only if the given seq id is bigger, or unconditionally,
   * even if it is lower than the existing one.
953   */
954  @Override
955  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
956      boolean onlyIfGreater) {
957    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
958  }
959
960  protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
961    return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
962  }
963
964  protected boolean isLogRollRequested() {
965    return rollRequested.get();
966  }
967
968  protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
969    // If we have already requested a roll, don't do it again
970    // And only set rollRequested to true when there is a registered listener
971    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
972      for (WALActionsListener i : this.listeners) {
973        i.logRollRequested(reason);
974      }
975    }
976  }
977
978  long getUnflushedEntriesCount() {
979    long highestSynced = this.highestSyncedTxid.get();
980    long highestUnsynced = this.highestUnsyncedTxid;
981    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
982  }
983
984  boolean isUnflushedEntries() {
985    return getUnflushedEntriesCount() > 0;
986  }
987
988  /**
   * Exposed for testing only. Used for tricks like halting ring buffer appending.
990   */
991  protected void atHeadOfRingBufferEventHandlerAppend() {
992    // Noop
993  }
994
995  protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
996    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
997    atHeadOfRingBufferEventHandlerAppend();
998    long start = EnvironmentEdgeManager.currentTime();
999    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
1000    long regionSequenceId = entry.getKey().getSequenceId();
1001
1002    // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
1003    // region sequence id only, a region edit/sequence id that is not associated with an actual
1004    // edit. It has to go through all the rigmarole to be sure we have the right ordering.
1005    if (entry.getEdit().isEmpty()) {
1006      return false;
1007    }
1008
1009    // Coprocessor hook.
1010    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
1011    if (!listeners.isEmpty()) {
1012      for (WALActionsListener i : listeners) {
1013        i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit());
1014      }
1015    }
1016    doAppend(writer, entry);
1017    assert highestUnsyncedTxid < entry.getTxid();
1018    highestUnsyncedTxid = entry.getTxid();
1019    if (entry.isCloseRegion()) {
1020      // let's clean all the records of this region
1021      sequenceIdAccounting.onRegionClose(encodedRegionName);
1022    } else {
1023      sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
1024        entry.isInMemStore());
1025    }
1026    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
1027    // Update metrics.
1028    postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
1029    numEntries.incrementAndGet();
1030    return true;
1031  }
1032
1033  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
1034    long len = 0;
1035    if (!listeners.isEmpty()) {
1036      for (Cell cell : e.getEdit().getCells()) {
1037        len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
1038      }
1039      for (WALActionsListener listener : listeners) {
1040        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
1041      }
1042    }
1043    return len;
1044  }
1045
1046  protected final void postSync(final long timeInNanos, final int handlerSyncs) {
1047    if (timeInNanos > this.slowSyncNs) {
1048      String msg = new StringBuilder().append("Slow sync cost: ")
1049          .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos))
1050          .append(" ms, current pipeline: ")
1051          .append(Arrays.toString(getPipeline())).toString();
1052      TraceUtil.addTimelineAnnotation(msg);
1053      LOG.info(msg);
1054      // A single sync took too long.
1055      // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
1056      // effects. Here we have a single data point that indicates we should take immediate
1057      // action, so do so.
1058      if (timeInNanos > this.rollOnSyncNs) {
1059        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time=" +
1060          TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold=" +
1061          TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: " +
1062          Arrays.toString(getPipeline()));
1063        requestLogRoll(SLOW_SYNC);
1064      }
1065      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
1066    }
1067    if (!listeners.isEmpty()) {
1068      for (WALActionsListener listener : listeners) {
1069        listener.postSync(timeInNanos, handlerSyncs);
1070      }
1071    }
1072  }
1073
1074  protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
1075    WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
1076    throws IOException {
1077    if (this.closed) {
1078      throw new IOException(
1079        "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
1080    }
1081    MutableLong txidHolder = new MutableLong();
1082    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
1083      txidHolder.setValue(ringBuffer.next());
1084    });
1085    long txid = txidHolder.longValue();
1086    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
1087      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
1088    try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
1089      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
1090      entry.stampRegionSequenceId(we);
1091      ringBuffer.get(txid).load(entry);
1092    } finally {
1093      ringBuffer.publish(txid);
1094    }
1095    return txid;
1096  }
1097
1098  @Override
1099  public String toString() {
1100    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
1101  }
1102
1103  /**
   * If the given {@code path} is currently being written, return its length.
1105   * <p>
1106   * This is used by replication to prevent replicating unacked log entries. See
1107   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
1108   */
1109  @Override
1110  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
1111    rollWriterLock.lock();
1112    try {
1113      Path currentPath = getOldPath();
1114      if (path.equals(currentPath)) {
1115        W writer = this.writer;
1116        return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
1117      } else {
1118        return OptionalLong.empty();
1119      }
1120    } finally {
1121      rollWriterLock.unlock();
1122    }
1123  }
1124
1125  @Override
1126  public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1127    return append(info, key, edits, true);
1128  }
1129
1130  @Override
1131  public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
1132    throws IOException {
1133    return append(info, key, edits, false);
1134  }
1135
1136  /**
1137   * Append a set of edits to the WAL.
1138   * <p/>
1139   * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
1140   * have its region edit/sequence id assigned else it messes up our unification of mvcc and
1141   * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
1142   * <p/>
   * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
   * transaction by calling 'begin' wherein we assign this update a sequenceid. At assignment
   * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
   * 'complete' this mvcc transaction by calling
   * MultiVersionConcurrencyControl#complete(...) or a variant, otherwise mvcc will get stuck. Do it
1148   * in the finally of a try/finally block within which this append lives and any subsequent
1149   * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the
1150   * passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not
1151   * immediately available on return from this method. It WILL be available subsequent to a sync of
1152   * this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
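   * <p>
   * A sketch of the expected calling pattern (illustrative, not verbatim HBase code; the
   * {@code wal}, {@code regionInfo}, {@code walKey}, {@code edits} and {@code mvcc} names are
   * placeholders):
   * <pre>{@code
   * long txid = wal.appendData(regionInfo, walKey, edits);
   * try {
   *   wal.sync(txid); // make the append durable
   *   // ... apply the edits to the memstore ...
   * } finally {
   *   MultiVersionConcurrencyControl.WriteEntry we = walKey.getWriteEntry();
   *   if (we != null) {
   *     mvcc.complete(we); // never skip this or mvcc will get stuck
   *   }
   * }
   * }</pre>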
1153   * @param info the regioninfo associated with append
1154   * @param key Modified by this call; we add to it this edits region edit/sequence id.
1155   * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
1156   *          sequence id that is after all currently appended edits.
1157   * @param inMemstore Always true except for case where we are writing a region event meta
1158   *          marker edit, for example, a compaction completion record into the WAL or noting a
1159   *          Region Open event. In these cases the entry is just so we can finish an unfinished
1160   *          compaction after a crash when the new Server reads the WAL on recovery, etc. These
1161   *          transition event 'Markers' do not go via the memstore. When memstore is false,
1162   *          we presume a Marker event edit.
1163   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
1164   *         in it.
1165   */
1166  protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
1167      throws IOException;
1168
1169  protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
1170
1171  protected abstract W createWriterInstance(Path path)
1172      throws IOException, CommonFSUtils.StreamLacksCapabilityException;
1173
1174  /**
1175   * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer
1176   * will begin to work before returning from this method. If we clear the flag after returning from
1177   * this call, we may miss a roll request. The implementation class should choose a proper place to
1178   * clear the {@link #rollRequested} flag so we do not miss a roll request, typically before you
1179   * start writing to the new writer.
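   * <p>
   * A minimal ordering sketch (illustrative; the helper names are placeholders):
   * <pre>{@code
   * // inside doReplaceWriter(oldPath, newPath, nextWriter):
   * closeCurrentWriter();     // drain outstanding syncs and close the old writer
   * rollRequested.set(false); // clear BEFORE the new writer starts taking appends
   * this.writer = nextWriter; // install the new writer
   * }</pre>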
1180   */
1181  protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
1182      throws IOException;
1183
1184  protected abstract void doShutdown() throws IOException;
1185
1186  protected abstract boolean doCheckLogLowReplication();
1187
1188  /**
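   * Check whether the cumulative slow sync count over the last check interval warrants a log
   * roll. For example (illustrative, using the defaults): 100 or more slow syncs observed within
   * a single 60 second check interval trigger a roll request, while the same count spread across
   * two or more intervals only resets the counter.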
   * @return true if we exceeded the slow sync roll threshold over the last check interval
1191   */
1192  protected boolean doCheckSlowSync() {
1193    boolean result = false;
1194    long now = EnvironmentEdgeManager.currentTime();
1195    long elapsedTime = now - lastTimeCheckSlowSync;
1196    if (elapsedTime >= slowSyncCheckInterval) {
1197      if (slowSyncCount.get() >= slowSyncRollThreshold) {
1198        if (elapsedTime >= (2 * slowSyncCheckInterval)) {
1199          // If two or more slowSyncCheckInterval have elapsed this is a corner case
1200          // where a train of slow syncs almost triggered us but then there was a long
1201          // interval from then until the one more that pushed us over. If so, we
1202          // should do nothing and let the count reset.
1203          if (LOG.isDebugEnabled()) {
1204            LOG.debug("checkSlowSync triggered but we decided to ignore it; " +
1205                "count=" + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
                ", elapsedTime=" + elapsedTime + " ms, slowSyncCheckInterval=" +
1207                slowSyncCheckInterval + " ms");
1208          }
1209          // Fall through to count reset below
1210        } else {
1211          LOG.warn("Requesting log roll because we exceeded slow sync threshold; count=" +
1212            slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
1213            ", current pipeline: " + Arrays.toString(getPipeline()));
1214          result = true;
1215        }
1216      }
1217      lastTimeCheckSlowSync = now;
1218      slowSyncCount.set(0);
1219    }
1220    return result;
1221  }
1222
1223  public void checkLogLowReplication(long checkInterval) {
1224    long now = EnvironmentEdgeManager.currentTime();
1225    if (now - lastTimeCheckLowReplication < checkInterval) {
1226      return;
1227    }
    // Will return immediately if we are currently in the middle of a WAL log roll.
1229    if (!rollWriterLock.tryLock()) {
1230      return;
1231    }
1232    try {
1233      lastTimeCheckLowReplication = now;
1234      if (doCheckLogLowReplication()) {
1235        requestLogRoll(LOW_REPLICATION);
1236      }
1237    } finally {
1238      rollWriterLock.unlock();
1239    }
1240  }
1241
1242  /**
1243   * This method gets the pipeline for the current WAL.
1244   */
1245  abstract DatanodeInfo[] getPipeline();
1246
1247  /**
1248   * This method gets the datanode replication count for the current WAL.
1249   */
1250  abstract int getLogReplication();
1251
1252  private static void split(final Configuration conf, final Path p) throws IOException {
1253    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
1254    if (!fs.exists(p)) {
1255      throw new FileNotFoundException(p.toString());
1256    }
1257    if (!fs.getFileStatus(p).isDirectory()) {
1258      throw new IOException(p + " is not a directory");
1259    }
1260
1261    final Path baseDir = CommonFSUtils.getWALRootDir(conf);
1262    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1263    if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
1264      AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
1265      archiveDir = new Path(archiveDir, p.getName());
1266    }
1267    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
1268  }
1269
1270  private static void usage() {
1271    System.err.println("Usage: AbstractFSWAL <ARGS>");
1272    System.err.println("Arguments:");
1273    System.err.println(" --dump  Dump textual representation of passed one or more files");
1274    System.err.println("         For example: " +
1275      "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
1276    System.err.println(" --split Split the passed directory of WAL logs");
1277    System.err.println(
1278      "         For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
1279  }
1280
1281  /**
1282   * Pass one or more log file names and it will either dump out a text version on
1283   * <code>stdout</code> or split the specified log files.
1284   */
1285  public static void main(String[] args) throws IOException {
1286    if (args.length < 2) {
1287      usage();
1288      System.exit(-1);
1289    }
1290    // either dump using the WALPrettyPrinter or split, depending on args
1291    if (args[0].compareTo("--dump") == 0) {
1292      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
1293    } else if (args[0].compareTo("--perf") == 0) {
1294      LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:");
1295      LOG.error(HBaseMarkers.FATAL,
1296        "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
1297      System.exit(-1);
1298    } else if (args[0].compareTo("--split") == 0) {
1299      Configuration conf = HBaseConfiguration.create();
1300      for (int i = 1; i < args.length; i++) {
1301        try {
1302          Path logPath = new Path(args[i]);
1303          CommonFSUtils.setFsDefault(conf, logPath);
1304          split(conf, logPath);
1305        } catch (IOException t) {
1306          t.printStackTrace(System.err);
1307          System.exit(-1);
1308        }
1309      }
1310    } else {
1311      usage();
1312      System.exit(-1);
1313    }
1314  }
1315}