/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;

import com.lmax.disruptor.RingBuffer;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.management.MemoryType;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
 * This is done internal to the implementation.
 * <p>
 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
 * A bunch of work in the below is done keeping account of these region sequence ids -- what is
 * flushed out to hfiles, and what is yet in WAL and in memory only.
 * <p>
 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
 * (smaller) than the most-recent flush.
 * <p>
 * To read a WAL, call
 * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}.
 * <h2>Failure Semantic</h2> If an exception occurs on append or sync, roll the WAL because the
 * current WAL is now a lame duck; any more appends or syncs will also fail with the same original
 * exception. If we have made successful appends to the WAL and we are then unable to sync them,
 * our current semantic is to return an error to the client saying that the appends failed, but
 * also to abort the current context, usually the hosting server. We need to replay the WALs. <br>
 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client
 * that the append failed. <br>
 * TODO: replication may pick up these last edits though they have been marked as failed append
 * (Need to keep our own file lengths, not rely on HDFS).
 */
@InterfaceAudience.Private
public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {

  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);

  protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";
  protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
  protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
  protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
  protected static final String SLOW_SYNC_ROLL_THRESHOLD =
    "hbase.regionserver.wal.slowsync.roll.threshold";
  protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
  protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =
    "hbase.regionserver.wal.slowsync.roll.interval.ms";
  protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute

  protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
  protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min

  public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";

  public static final String MAX_LOGS = "hbase.regionserver.maxlogs";

  public static final String RING_BUFFER_SLOT_COUNT =
    "hbase.regionserver.wal.disruptor.event.count";

  /**
   * file system instance
   */
  protected final FileSystem fs;

  /**
   * WAL directory, where all WAL files would be placed.
   */
  protected final Path walDir;

  /**
   * dir path where old logs are kept.
   */
  protected final Path walArchiveDir;

  /**
   * Matches just those wal files that belong to this wal instance.
   */
  protected final PathFilter ourFiles;

  /**
   * Prefix of a WAL file, usually the region server name it is hosted on.
   */
  protected final String walFilePrefix;

  /**
   * Suffix included on generated wal file names
   */
  protected final String walFileSuffix;

  /**
   * Prefix used when checking for wal membership.
   */
  protected final String prefixPathStr;

  protected final WALCoprocessorHost coprocessorHost;

  /**
   * conf object
   */
  protected final Configuration conf;

  /** Listeners that are called on WAL events. */
  protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();

  /**
   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
   * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
   * facility for answering questions such as "Is it safe to GC a WAL?".
   */
  protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();

  protected final long slowSyncNs, rollOnSyncNs;
  protected final int slowSyncRollThreshold;
  protected final int slowSyncCheckInterval;
  protected final AtomicInteger slowSyncCount = new AtomicInteger();

  private final long walSyncTimeoutNs;

  // If > than this size, roll the log.
  protected final long logrollsize;

  /**
   * Block size to use writing files.
   */
  protected final long blocksize;

  /*
   * If there are more than this many logs, force a flush of the oldest region so that its oldest
   * edit goes to disk. If there are too many and we crash, replaying will take forever. Keep the
   * number of logs tidy.
   */
  protected final int maxLogs;

  protected final boolean useHsync;

  /**
   * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
   * is held. We don't just use synchronized because that results in bogus and tedious findbugs
   * warning when it thinks synchronized controls writer thread safety. It is held when we are
   * actually rolling the log. It is checked when we are looking to see if we should roll the log or
   * not.
   */
  protected final ReentrantLock rollWriterLock = new ReentrantLock(true);

  // The timestamp (in ms) when the log file was created.
  protected final AtomicLong filenum = new AtomicLong(-1);

  // Number of transactions in the current Wal.
  protected final AtomicInteger numEntries = new AtomicInteger(0);

  /**
   * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass
   * WALEdits to a background consumer thread, and the transaction id is the sequence number of the
   * corresponding entry in the queue.
   */
  protected volatile long highestUnsyncedTxid = -1;

  /**
   * Updated to the transaction id of the last successful sync call. This can be less than
   * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in
   * for it.
   */
  protected final AtomicLong highestSyncedTxid = new AtomicLong(0);

  /**
   * The total size of wal
   */
  protected final AtomicLong totalLogSize = new AtomicLong(0);
  /**
   * Current log file.
   */
  volatile W writer;

  // Last time to check low replication on hlog's pipeline
  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();

  // Last time we asked to roll the log due to a slow sync
  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();

  protected volatile boolean closed = false;

  protected final AtomicBoolean shutdown = new AtomicBoolean(false);
  /**
   * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
   * an IllegalArgumentException if used to compare paths from different wals.
   */
  final Comparator<Path> LOG_NAME_COMPARATOR =
    (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));

  private static final class WalProps {

    /**
     * Map the encoded region name to the highest sequence id.
     * <p/>Contains all the regions it has an entry for.
     */
    public final Map<byte[], Long> encodedName2HighestSequenceId;

    /**
     * The log file size. Notice that the size may not be accurate if we do asynchronous close in
     * sub classes.
     */
    public final long logSize;

    public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
      this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
      this.logSize = logSize;
    }
  }

  /**
   * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
   * (contained in the log file name).
   */
  protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
    new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);

  /**
   * Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures.
   * Thread local is used so JVM can GC the terminated thread for us. See HBASE-21228.
   */
  private final ThreadLocal<SyncFuture> cachedSyncFutures;

  /**
   * The class name of the runtime implementation, used as prefix for logging/tracing.
   * <p>
   * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here,
   * refer to HBASE-17676 for more details
   * </p>
   */
  protected final String implClassName;

  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);

  public long getFilenum() {
    return this.filenum.get();
  }

  /**
   * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}). This helper
   * method returns the creation timestamp from a given log file. It extracts the timestamp assuming
   * the filename is created with the {@link #computeFilename(long filenum)} method.
   * @return timestamp, as in the log file name.
   */
  protected long getFileNumFromFileName(Path fileName) {
    checkNotNull(fileName, "file name can't be null");
    if (!ourFiles.accept(fileName)) {
      throw new IllegalArgumentException(
          "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
    }
    final String fileNameString = fileName.toString();
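    // Strip the directory + prefix from the front and the suffix from the end; what remains in
    // between is the numeric creation timestamp.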
    String chompedPath = fileNameString.substring(prefixPathStr.length(),
      (fileNameString.length() - walFileSuffix.length()));
    return Long.parseLong(chompedPath);
  }

  private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
    Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf);
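    // Heuristic: size the default maxLogs so the rolled WALs can cover roughly twice the global
    // memstore size.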
    return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
  }

  // must be power of 2
  protected final int getPreallocatedEventCount() {
    // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
    // be stuck and make no progress if the buffer is filled with appends only and there is no
    // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
    // before they return.
    int preallocatedEventCount =
      this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
    checkArgument(preallocatedEventCount >= 0, RING_BUFFER_SLOT_COUNT + " must be > 0");
    int floor = Integer.highestOneBit(preallocatedEventCount);
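    // If not already a power of two, round up to the next power of two (capped at 1 << 30); the
    // disruptor ring buffer requires a power-of-two slot count.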
    if (floor == preallocatedEventCount) {
      return floor;
    }
    // max capacity is 1 << 30
    if (floor >= 1 << 29) {
      return 1 << 30;
    }
    return floor << 1;
  }

  protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir,
      final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
      final boolean failIfWALExists, final String prefix, final String suffix)
      throws FailedLogCloseException, IOException {
    this.fs = fs;
    this.walDir = new Path(rootDir, logDir);
    this.walArchiveDir = new Path(rootDir, archiveDir);
    this.conf = conf;

    if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
      throw new IOException("Unable to mkdir " + walDir);
    }

    if (!fs.exists(this.walArchiveDir)) {
      if (!fs.mkdirs(this.walArchiveDir)) {
        throw new IOException("Unable to mkdir " + this.walArchiveDir);
      }
    }

    // If prefix is null||empty then just name it wal
    this.walFilePrefix =
      prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
    // we only correctly differentiate suffixes when numeric ones start with '.'
    if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
        "' but instead was '" + suffix + "'");
    }
    // Now that it exists, set the storage policy for the entire directory of wal files related to
    // this FSHLog instance
    String storagePolicy =
        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
    CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);
    this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
    this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();

    this.ourFiles = new PathFilter() {
      @Override
      public boolean accept(final Path fileName) {
        // The path should start with dir/<prefix> and end with our suffix
        final String fileNameString = fileName.toString();
        if (!fileNameString.startsWith(prefixPathStr)) {
          return false;
        }
        if (walFileSuffix.isEmpty()) {
          // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
          return org.apache.commons.lang3.StringUtils
              .isNumeric(fileNameString.substring(prefixPathStr.length()));
        } else if (!fileNameString.endsWith(walFileSuffix)) {
          return false;
        }
        return true;
      }
    };

    if (failIfWALExists) {
      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
      if (null != walFiles && 0 != walFiles.length) {
        throw new IOException("Target WAL already exists within directory " + walDir);
      }
    }

    // Register listeners. TODO: Should this exist anymore? We have CPs?
    if (listeners != null) {
      for (WALActionsListener i : listeners) {
        registerWALActionsListener(i);
      }
    }
    this.coprocessorHost = new WALCoprocessorHost(this, conf);

    // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block
    // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost
    // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of
    // the block size but experience from the field has it that this was not enough time for the
    // roll to happen before end-of-block. So the new accounting makes WALs of about the same
    // size as those made in hbase-1 (to prevent surprise), we now have default block size as
    // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally
    // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
    this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
    float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
    this.logrollsize = (long) (this.blocksize * multiplier);
    this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));

    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
      StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
      walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir +
      ", maxLogs=" + this.maxLogs);
    this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
      conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)));
    this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS,
      DEFAULT_ROLL_ON_SYNC_TIME_MS));
    this.slowSyncRollThreshold = conf.getInt(SLOW_SYNC_ROLL_THRESHOLD,
      DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
    this.slowSyncCheckInterval = conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS,
      DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
    this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
      conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)));
    this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
      @Override
      protected SyncFuture initialValue() {
        return new SyncFuture();
      }
    };
    this.implClassName = getClass().getSimpleName();
    this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
  }

  /**
   * Used to initialize the WAL. Usually just call rollWriter to create the first log writer.
   */
  public void init() throws IOException {
    rollWriter();
  }

  @Override
  public void registerWALActionsListener(WALActionsListener listener) {
    this.listeners.add(listener);
  }

  @Override
  public boolean unregisterWALActionsListener(WALActionsListener listener) {
    return this.listeners.remove(listener);
  }

  @Override
  public WALCoprocessorHost getCoprocessorHost() {
    return coprocessorHost;
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
  }

  @Override
  public void completeCacheFlush(byte[] encodedRegionName) {
    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
  }

  @Override
  public void abortCacheFlush(byte[] encodedRegionName) {
    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
    // Used by tests. Deprecated as too subtle for general usage.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
    // This method is used by tests and for figuring out whether we should flush because our
    // sequenceids are too old. It is also used for reporting to the master our oldest sequenceid,
    // which is used to figure out what edits can be skipped during log recovery.
    // getEarliestMemStoreSequenceId from this.sequenceIdAccounting looks first in
    // flushingOldestStoreSequenceIds, the currently flushing sequence ids, and if anything is
    // found there, returns it. This is the right thing to do for reporting oldest sequenceids to
    // the master; we won't skip edits if we crash during the flush. For figuring out what to
    // flush, we might get requeued if our sequence id is old even though we are currently
    // flushing. This may mean we do too much flushing.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
  }

  @Override
  public byte[][] rollWriter() throws FailedLogCloseException, IOException {
    return rollWriter(false);
  }

  /**
   * This is a convenience method that computes a new filename with a given file-number.
   * @param filenum to use
   * @return Path
   */
  protected Path computeFilename(final long filenum) {
    if (filenum < 0) {
      throw new RuntimeException("WAL file number can't be < 0");
    }
    String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;
    return new Path(walDir, child);
  }

  /**
   * This is a convenience method that computes a new filename using the current WAL file-number.
   * @return Path
   */
  public Path getCurrentFileName() {
    return computeFilename(this.filenum.get());
  }

  /**
   * retrieve the next path to use for writing. Increments the internal filenum.
   */
  private Path getNewPath() throws IOException {
    this.filenum.set(System.currentTimeMillis());
    Path newPath = getCurrentFileName();
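    // Very unlikely, but if a file with this timestamp already exists (e.g. two rolls within the
    // same millisecond), bump the filenum until we find an unused name.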
    while (fs.exists(newPath)) {
      this.filenum.incrementAndGet();
      newPath = getCurrentFileName();
    }
    return newPath;
  }

  @VisibleForTesting
  Path getOldPath() {
    long currentFilenum = this.filenum.get();
    Path oldPath = null;
    if (currentFilenum > 0) {
      // ComputeFilename will take care of meta wal filename
      oldPath = computeFilename(currentFilenum);
    } // I presume if currentFilenum is <= 0, this is the first file, so null for oldPath is fine?
    return oldPath;
  }

  /**
   * Tell listeners about pre log roll.
   */
  private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
      throws IOException {
    coprocessorHost.preWALRoll(oldPath, newPath);

    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogRoll(oldPath, newPath);
      }
    }
  }

  /**
   * Tell listeners about post log roll.
   */
  private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
      throws IOException {
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogRoll(oldPath, newPath);
      }
    }

    coprocessorHost.postWALRoll(oldPath, newPath);
  }

  // public only until class moves to o.a.h.h.wal
  /** @return the number of rolled log files */
  public int getNumRolledLogFiles() {
    return walFile2Props.size();
  }

  // public only until class moves to o.a.h.h.wal
  /** @return the number of log files in use */
  public int getNumLogFiles() {
    // +1 for the current in-use log
    return getNumRolledLogFiles() + 1;
  }

  /**
   * If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed,
   * check the first (oldest) WAL, and return those regions which should be flushed so that
   * it can be let-go/'archived'.
   * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
   */
  byte[][] findRegionsToForceFlush() throws IOException {
    byte[][] regions = null;
    int logCount = getNumRolledLogFiles();
    if (logCount > this.maxLogs && logCount > 0) {
      Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
      regions =
        this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
    }
    if (regions != null) {
      StringBuilder sb = new StringBuilder();
      for (int i = 0; i < regions.length; i++) {
        if (i > 0) {
          sb.append(", ");
        }
        sb.append(Bytes.toStringBinary(regions[i]));
      }
643      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
644        "; forcing flush of " + regions.length + " regions(s): " + sb.toString());
    }
    return regions;
  }

  /**
   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
   */
  private void cleanOldLogs() throws IOException {
    List<Pair<Path, Long>> logsToArchive = null;
    // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
    // are older than what is currently in memory, the WAL can be GC'd.
    for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
      Path log = e.getKey();
      Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
      if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
        if (logsToArchive == null) {
          logsToArchive = new ArrayList<>();
        }
        logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
        if (LOG.isTraceEnabled()) {
          LOG.trace("WAL file ready for archiving " + log);
        }
      }
    }
    if (logsToArchive != null) {
      for (Pair<Path, Long> logAndSize : logsToArchive) {
        this.totalLogSize.addAndGet(-logAndSize.getSecond());
        archiveLogFile(logAndSize.getFirst());
        this.walFile2Props.remove(logAndSize.getFirst());
      }
    }
  }

  /*
   * only public so WALSplitter can use.
   * @return archived location of a WAL file with the given path p
   */
  public static Path getWALArchivePath(Path archiveDir, Path p) {
    return new Path(archiveDir, p.getName());
  }

  private void archiveLogFile(final Path p) throws IOException {
    Path newPath = getWALArchivePath(this.walArchiveDir, p);
    // Tell our listeners that a log is going to be archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogArchive(p, newPath);
      }
    }
    LOG.info("Archiving " + p + " to " + newPath);
    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
      throw new IOException("Unable to rename " + p + " to " + newPath);
    }
    // Tell our listeners that a log has been archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogArchive(p, newPath);
      }
    }
  }

  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
    int oldNumEntries = this.numEntries.getAndSet(0);
    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
    if (oldPath != null) {
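      // Remember the highest sequence id per region written to the old file; cleanOldLogs() uses
      // this to decide when the old file can safely be archived.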
      this.walFile2Props.put(oldPath,
        new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
      this.totalLogSize.addAndGet(oldFileLen);
      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
        CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
        newPathString);
    } else {
      LOG.info("New WAL {}", newPathString);
    }
  }

  /**
   * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.
   * <p/>
   * <ul>
   * <li>In the case of creating a new WAL, oldPath will be null.</li>
   * <li>In the case of rolling over from one file to the next, none of the parameters will be null.
   * </li>
   * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
   * null.</li>
   * </ul>
   * @param oldPath may be null
   * @param newPath may be null
   * @param nextWriter may be null
   * @return the passed in <code>newPath</code>
   * @throws IOException if there is a problem flushing or closing the underlying FS
   */
  @VisibleForTesting
  Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
    try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) {
      doReplaceWriter(oldPath, newPath, nextWriter);
      return newPath;
    }
  }

  protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
    // Now we have published the ringbuffer, halt the current thread until we get an answer back.
    try {
      if (syncFuture != null) {
        if (closed) {
          throw new IOException("WAL has been closed");
        } else {
          syncFuture.get(walSyncTimeoutNs);
        }
      }
    } catch (TimeoutIOException tioe) {
      // SyncFutures are reused per thread. If a TimeoutIOException happens, the ringbuffer may
      // still refer to this one, so if this thread reused it the next time it could get a wrong
      // result. Drop it from the cache instead.
      this.cachedSyncFutures.remove();
      throw tioe;
    } catch (InterruptedException ie) {
      LOG.warn("Interrupted", ie);
      throw convertInterruptedExceptionToIOException(ie);
    } catch (ExecutionException e) {
      throw ensureIOException(e.getCause());
    }
  }

  private static IOException ensureIOException(final Throwable t) {
    return (t instanceof IOException) ? (IOException) t : new IOException(t);
  }

  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
    Thread.currentThread().interrupt();
    IOException ioe = new InterruptedIOException();
    ioe.initCause(ie);
    return ioe;
  }

  @Override
  public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
    rollWriterLock.lock();
    try {
      // Return if nothing to flush.
      if (!force && this.writer != null && this.numEntries.get() <= 0) {
        return null;
      }
      byte[][] regionsToFlush = null;
      if (this.closed) {
        LOG.debug("WAL closed. Skipping rolling of writer");
        return regionsToFlush;
      }
      try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
        Path oldPath = getOldPath();
        Path newPath = getNewPath();
        // Any exception from here on is catastrophic, non-recoverable so we currently abort.
        W nextWriter = this.createWriterInstance(newPath);
        tellListenersAboutPreLogRoll(oldPath, newPath);
        // NewPath could be equal to oldPath if replaceWriter fails.
        newPath = replaceWriter(oldPath, newPath, nextWriter);
        tellListenersAboutPostLogRoll(oldPath, newPath);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Create new " + implClassName + " writer with pipeline: " +
            Arrays.toString(getPipeline()));
        }
        // We got a new writer, so reset the slow sync count
        lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
        slowSyncCount.set(0);
        // Can we delete any of the old log files?
        if (getNumRolledLogFiles() > 0) {
          cleanOldLogs();
          regionsToFlush = findRegionsToForceFlush();
        }
      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
        // If the underlying FileSystem can't do what we ask, treat as IO failure so
        // we'll abort.
        throw new IOException(
            "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
            exception);
      }
      return regionsToFlush;
    } finally {
      rollWriterLock.unlock();
    }
  }

  // public only until class moves to o.a.h.h.wal
  /** @return the size of log files in use */
  public long getLogFileSize() {
    return this.totalLogSize.get();
  }

  // public only until class moves to o.a.h.h.wal
  public void requestLogRoll() {
    requestLogRoll(ERROR);
  }

  /**
   * Get the backing files associated with this WAL.
   * @return may be null if there are no files.
   */
  @VisibleForTesting
  FileStatus[] getFiles() throws IOException {
    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
  }

  @Override
  public void shutdown() throws IOException {
    if (!shutdown.compareAndSet(false, true)) {
      return;
    }
    closed = true;
    // Tell our listeners that the log is closing
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.logCloseRequested();
      }
    }
    rollWriterLock.lock();
    try {
      doShutdown();
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public void close() throws IOException {
    shutdown();
    final FileStatus[] files = getFiles();
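    // Archive any WAL files still under the WAL directory so nothing is left behind after close.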
    if (null != files && 0 != files.length) {
      for (FileStatus file : files) {
        Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
        // Tell our listeners that a log is going to be archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.preLogArchive(file.getPath(), p);
          }
        }

        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
        }
        // Tell our listeners that a log was archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.postLogArchive(file.getPath(), p);
          }
        }
      }
      LOG.debug(
        "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
    }
    LOG.info("Closed WAL: " + toString());
  }

  /**
   * Updates the sequence number of a specific store. Depending on the flag, it either replaces the
   * current sequence number only if the given one is bigger, or replaces it unconditionally even
   * if the given one is lower than the existing one.
   */
  @Override
  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
      boolean onlyIfGreater) {
    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
  }

  protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
    return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
  }

  protected boolean isLogRollRequested() {
    return rollRequested.get();
  }

  protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
    // If we have already requested a roll, don't do it again
    // And only set rollRequested to true when there is a registered listener
    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
      for (WALActionsListener i : this.listeners) {
        i.logRollRequested(reason);
      }
    }
  }

  long getUnflushedEntriesCount() {
    long highestSynced = this.highestSyncedTxid.get();
    long highestUnsynced = this.highestUnsyncedTxid;
    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
  }

  boolean isUnflushedEntries() {
    return getUnflushedEntriesCount() > 0;
  }

  /**
   * Exposed for testing only. Used for tricks like halting the ring buffer appending.
   */
  @VisibleForTesting
  protected void atHeadOfRingBufferEventHandlerAppend() {
    // Noop
  }

  protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
    atHeadOfRingBufferEventHandlerAppend();
    long start = EnvironmentEdgeManager.currentTime();
    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
    long regionSequenceId = entry.getKey().getSequenceId();

    // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
    // region sequence id only, a region edit/sequence id that is not associated with an actual
    // edit. It has to go through all the rigmarole to be sure we have the right ordering.
    if (entry.getEdit().isEmpty()) {
      return false;
    }

    // Coprocessor hook.
    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    if (!listeners.isEmpty()) {
      for (WALActionsListener i : listeners) {
        i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit());
      }
    }
    doAppend(writer, entry);
    assert highestUnsyncedTxid < entry.getTxid();
    highestUnsyncedTxid = entry.getTxid();
    if (entry.isCloseRegion()) {
      // let's clean all the records of this region
      sequenceIdAccounting.onRegionClose(encodedRegionName);
    } else {
      sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
        entry.isInMemStore());
    }
    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    // Update metrics.
    postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
    numEntries.incrementAndGet();
    return true;
  }

  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
    long len = 0;
    if (!listeners.isEmpty()) {
      for (Cell cell : e.getEdit().getCells()) {
        len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
      }
      for (WALActionsListener listener : listeners) {
        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
      }
    }
    return len;
  }

  protected final void postSync(final long timeInNanos, final int handlerSyncs) {
    if (timeInNanos > this.slowSyncNs) {
      String msg = new StringBuilder().append("Slow sync cost: ")
          .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos))
          .append(" ms, current pipeline: ")
          .append(Arrays.toString(getPipeline())).toString();
      TraceUtil.addTimelineAnnotation(msg);
      LOG.info(msg);
      // A single sync took too long.
      // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
      // effects. Here we have a single data point that indicates we should take immediate
      // action, so do so.
      if (timeInNanos > this.rollOnSyncNs) {
        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time=" +
          TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold=" +
          TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: " +
          Arrays.toString(getPipeline()));
        requestLogRoll(SLOW_SYNC);
      }
      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
    }
    if (!listeners.isEmpty()) {
      for (WALActionsListener listener : listeners) {
        listener.postSync(timeInNanos, handlerSyncs);
      }
    }
  }

  protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
    WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
    throws IOException {
    if (this.closed) {
      throw new IOException(
        "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
    }
    MutableLong txidHolder = new MutableLong();
    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
      txidHolder.setValue(ringBuffer.next());
    });
    long txid = txidHolder.longValue();
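    // Capture the current ServerCall, if this append is running inside an RPC that carries a cell
    // payload; it is handed to the FSWALEntry below together with the edit.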
    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
    try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
      entry.stampRegionSequenceId(we);
      ringBuffer.get(txid).load(entry);
    } finally {
      ringBuffer.publish(txid);
    }
    return txid;
  }

  @Override
  public String toString() {
    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
  }

  /**
   * if the given {@code path} is being written currently, then return its length.
   * <p>
   * This is used by replication to prevent replicating unacked log entries. See
   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
   */
  @Override
  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
    rollWriterLock.lock();
    try {
      Path currentPath = getOldPath();
      if (path.equals(currentPath)) {
        W writer = this.writer;
        return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
      } else {
        return OptionalLong.empty();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
    return append(info, key, edits, true);
  }

  @Override
  public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
    throws IOException {
    return append(info, key, edits, false);
  }

  /**
   * Append a set of edits to the WAL.
   * <p/>
   * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
   * have its region edit/sequence id assigned else it messes up our unification of mvcc and
   * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
   * <p/>
   * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
   * transaction by calling 'begin', wherein we assign this update a sequenceid. At assignment
   * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
   * 'complete' this mvcc transaction by calling
   * MultiVersionConcurrencyControl#complete(...) or a variant, otherwise mvcc will get stuck. Do it
   * in the finally of a try/finally block within which this append lives and any subsequent
   * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the
   * passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not
   * immediately available on return from this method. It WILL be available subsequent to a sync of
   * this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
   * @param info the regioninfo associated with append
   * @param key Modified by this call; we add to it this edits region edit/sequence id.
   * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
   *          sequence id that is after all currently appended edits.
   * @param inMemstore Always true except for case where we are writing a region event meta
   *          marker edit, for example, a compaction completion record into the WAL or noting a
   *          Region Open event. In these cases the entry is just so we can finish an unfinished
   *          compaction after a crash when the new Server reads the WAL on recovery, etc. These
   *          transition event 'Markers' do not go via the memstore. When memstore is false,
   *          we presume a Marker event edit.
   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
   *         in it.
   */
  protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
      throws IOException;

  protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;

  protected abstract W createWriterInstance(Path path)
      throws IOException, CommonFSUtils.StreamLacksCapabilityException;

  /**
   * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer
   * will begin to work before returning from this method. If we clear the flag after returning from
   * this call, we may miss a roll request. The implementation class should choose a proper place to
   * clear the {@link #rollRequested} flag so we do not miss a roll request, typically before you
   * start writing to the new writer.
   */
  protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
      throws IOException;

  protected abstract void doShutdown() throws IOException;

  protected abstract boolean doCheckLogLowReplication();

  /**
   * @return true if we exceeded the slow sync roll threshold over the last check
   *              interval
   */
  protected boolean doCheckSlowSync() {
    boolean result = false;
    long now = EnvironmentEdgeManager.currentTime();
    long elapsedTime = now - lastTimeCheckSlowSync;
    if (elapsedTime >= slowSyncCheckInterval) {
      if (slowSyncCount.get() >= slowSyncRollThreshold) {
        if (elapsedTime >= (2 * slowSyncCheckInterval)) {
          // If two or more slowSyncCheckInterval have elapsed this is a corner case
          // where a train of slow syncs almost triggered us but then there was a long
          // interval from then until the one more that pushed us over. If so, we
          // should do nothing and let the count reset.
          if (LOG.isDebugEnabled()) {
            LOG.debug("checkSlowSync triggered but we decided to ignore it; " +
                "count=" + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
                ", elapsedTime=" + elapsedTime +  " ms, slowSyncCheckInterval=" +
                slowSyncCheckInterval + " ms");
          }
          // Fall through to count reset below
        } else {
          LOG.warn("Requesting log roll because we exceeded slow sync threshold; count=" +
            slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
            ", current pipeline: " + Arrays.toString(getPipeline()));
          result = true;
        }
      }
      lastTimeCheckSlowSync = now;
      slowSyncCount.set(0);
    }
    return result;
  }

  public void checkLogLowReplication(long checkInterval) {
    long now = EnvironmentEdgeManager.currentTime();
    if (now - lastTimeCheckLowReplication < checkInterval) {
      return;
    }
    // Will return immediately if we are in the middle of a WAL log roll currently.
    if (!rollWriterLock.tryLock()) {
      return;
    }
    try {
      lastTimeCheckLowReplication = now;
      if (doCheckLogLowReplication()) {
        requestLogRoll(LOW_REPLICATION);
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  /**
   * This method gets the pipeline for the current WAL.
   */
  @VisibleForTesting
  abstract DatanodeInfo[] getPipeline();

  /**
   * This method gets the datanode replication count for the current WAL.
   */
  @VisibleForTesting
  abstract int getLogReplication();

  private static void split(final Configuration conf, final Path p) throws IOException {
    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
    if (!fs.exists(p)) {
      throw new FileNotFoundException(p.toString());
    }
    if (!fs.getFileStatus(p).isDirectory()) {
      throw new IOException(p + " is not a directory");
    }

    final Path baseDir = CommonFSUtils.getWALRootDir(conf);
    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
      AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
      archiveDir = new Path(archiveDir, p.getName());
    }
    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
  }

  private static void usage() {
    System.err.println("Usage: AbstractFSWAL <ARGS>");
    System.err.println("Arguments:");
    System.err.println(" --dump  Dump a textual representation of one or more passed files");
    System.err.println("         For example: " +
      "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
    System.err.println(" --split Split the passed directory of WAL logs");
    System.err.println(
      "         For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
  }

  /**
   * Pass one or more log file names and it will either dump out a text version on
   * <code>stdout</code> or split the specified log files.
   */
  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      usage();
      System.exit(-1);
    }
    // either dump using the WALPrettyPrinter or split, depending on args
    if (args[0].compareTo("--dump") == 0) {
      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
    } else if (args[0].compareTo("--perf") == 0) {
      LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:");
      LOG.error(HBaseMarkers.FATAL,
        "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
      System.exit(-1);
    } else if (args[0].compareTo("--split") == 0) {
      Configuration conf = HBaseConfiguration.create();
      for (int i = 1; i < args.length; i++) {
        try {
          Path logPath = new Path(args[i]);
          CommonFSUtils.setFsDefault(conf, logPath);
          split(conf, logPath);
        } catch (IOException t) {
          t.printStackTrace(System.err);
          System.exit(-1);
        }
      }
    } else {
      usage();
      System.exit(-1);
    }
  }
}