001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Comparator;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.LinkedList;
028import java.util.Set;
029import java.util.concurrent.LinkedTransferQueue;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicLong;
033import java.util.concurrent.atomic.AtomicReference;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FSDataOutputStream;
038import org.apache.hadoop.fs.FileAlreadyExistsException;
039import org.apache.hadoop.fs.FileStatus;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.fs.PathFilter;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.log.HBaseMarkers;
046import org.apache.hadoop.hbase.procedure2.Procedure;
047import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
048import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
049import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
050import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
051import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
052import org.apache.hadoop.hbase.procedure2.util.StringUtils;
053import org.apache.hadoop.hbase.util.CommonFSUtils;
054import org.apache.hadoop.hbase.util.Threads;
055import org.apache.hadoop.ipc.RemoteException;
056import org.apache.yetus.audience.InterfaceAudience;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
061import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;
062
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
064
065/**
066 * WAL implementation of the ProcedureStore.
067 * <p/>
068 * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()},
069 * then {@link #load(ProcedureLoader)}.
070 * <p/>
071 * In {@link #recoverLease()}, we will get the lease by closing all the existing wal files(by
072 * calling recoverFileLease), and creating a new wal writer. And we will also get the list of all
073 * the old wal files.
074 * <p/>
075 * FIXME: notice that the current recover lease implementation is problematic, it can not deal with
076 * the races if there are two master both wants to acquire the lease...
077 * <p/>
078 * In {@link #load(ProcedureLoader)} method, we will load all the active procedures. See the
079 * comments of this method for more details.
080 * <p/>
 * The actual logging approach is similar to our FileSystem-based WAL implementation on the RS
 * side. There is a {@link #slots} array, which is more like a ring buffer: the insert, update
 * and delete methods put their data into {@link #slots} and wait. A background sync thread (see
 * the {@link #syncLoop()} method) takes the data from {@link #slots}, writes it to the
 * FileSystem, and notifies the callers that the write has finished.
086 * <p/>
087 * TODO: try using disruptor to increase performance and simplify the logic?
088 * <p/>
089 * The {@link #storeTracker} keeps track of the modified procedures in the newest wal file, which is
090 * also the one being written currently. And the deleted bits in it are for all the procedures, not
091 * only the ones in the newest wal file. And when rolling a log, we will first store it in the
092 * trailer of the current wal file, and then reset its modified bits, so that it can start to track
093 * the modified procedures for the new wal file.
094 * <p/>
095 * The {@link #holdingCleanupTracker} is used to test whether we are safe to delete the oldest wal
096 * file. When there are log rolling and there are more than 1 wal files, we will make use of it. It
097 * will first be initialized to the oldest file's tracker(which is stored in the trailer), using the
098 * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merge it
099 * with the tracker of every newer wal files, using the
100 * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}.
 * If we find out that all the modified procedures for the oldest wal file are modified or deleted
 * in newer wal files, then we can delete it. This is because every time we call
 * {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)}, we
 * persist the full state of a Procedure, so the earlier wal records for this procedure can all be
 * deleted.
107 * @see ProcedureWALPrettyPrinter for printing content of a single WAL.
108 * @see #main(String[]) to parse a directory of MasterWALProcs.
109 */
110@InterfaceAudience.Private
111public class WALProcedureStore extends ProcedureStoreBase {
  /** Logger used by this store. */
  private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class);
  // NOTE(review): presumably the file-name prefix of the procedure WAL files; the code that uses
  // it is not part of this section - confirm.
  public static final String LOG_PREFIX = "pv2-";
  /** Used to construct the name of the log directory for master procedures */
  public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
116
117
  /**
   * Callback supplied by the owner of the store to recover the lease of a WAL file.
   * <p/>
   * As described in the class comment, {@code recoverFileLease} is called on the existing wal
   * files while acquiring the lease in {@link #recoverLease()}.
   */
  public interface LeaseRecovery {
    /**
     * Recovers the lease of the given file.
     * @param fs the file system the file lives on
     * @param path the file whose lease has to be recovered
     * @throws IOException if the lease can not be recovered
     */
    void recoverFileLease(FileSystem fs, Path path) throws IOException;
  }
121
  // Configuration keys and their defaults. Unless noted otherwise, each value is read once from
  // the Configuration in start(int) and cached in the instance field of the matching name.

  // NOTE(review): presumably the number of WAL files above which a warning is logged; the code
  // that consumes walCountWarnThreshold is not part of this section - confirm.
  public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY =
    "hbase.procedure.store.wal.warn.threshold";
  private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10;

  // Read directly in tryCleanupLogsOnLoad(): when false, load() does not try to clean up the old
  // WALs once loading has finished.
  public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY =
    "hbase.procedure.store.wal.exec.cleanup.on.load";
  private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = true;

  // NOTE(review): the consumers of the next four settings (retries before a roll, wait before a
  // roll, roll retries, max rolls after sync failures) are not part of this section; only
  // waitBeforeRoll is visible here, used as the sleep between lease recovery attempts in
  // recoverLease().
  public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.max.retries.before.roll";
  private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;

  public static final String WAIT_BEFORE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.wait.before.roll";
  private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;

  public static final String ROLL_RETRIES_CONF_KEY =
    "hbase.procedure.store.wal.max.roll.retries";
  private static final int DEFAULT_ROLL_RETRIES = 3;

  public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.sync.failure.roll.max";
  private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;

  // Interval, in milliseconds, of the periodic roll (see periodicRollMsec).
  public static final String PERIODIC_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.periodic.roll.msec";
  private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h

  // Upper bound, in milliseconds, on how long the sync loop waits for the slots to fill up
  // before it flushes what it has.
  public static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
  private static final int DEFAULT_SYNC_WAIT_MSEC = 100;

  // NOTE(review): presumably selects hsync over hflush when syncing the stream; the code that
  // consumes useHsync is not part of this section - confirm.
  public static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
  private static final boolean DEFAULT_USE_HSYNC = true;

  // NOTE(review): presumably the WAL size that triggers a roll; the code that consumes
  // rollThreshold is not part of this section - confirm.
  public static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
  private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M

  // Capacity of the circular queue of SyncMetrics kept for the web UI.
  public static final String STORE_WAL_SYNC_STATS_COUNT =
      "hbase.procedure.store.wal.sync.stats.count";
  private static final int DEFAULT_SYNC_STATS_COUNT = 10;
162
  // The WAL files known to this store; the last element is the one currently being written.
  private final LinkedList<ProcedureWALFile> logs = new LinkedList<>();
  // See the class comment for the role of the two trackers.
  private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker();
  private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
  // Lock shared by the writers (pushData) and the sync thread (syncLoop).
  private final ReentrantLock lock = new ReentrantLock();
  // Awaited by the sync thread while there is no data; signalled by pushData when the first slot
  // is filled or the slots are full, and by sendStopSignal().
  private final Condition waitCond = lock.newCondition();
  // Awaited by the sync thread while it gives the writers a chance to fill the remaining slots;
  // signalled by pushData when the slots are full.
  private final Condition slotCond = lock.newCondition();
  // Awaited by the writers in pushData until their data has been synced; signalled by
  // sendStopSignal() (the other signallers are not part of this section).
  private final Condition syncCond = lock.newCondition();

  // Set once in the constructor.
  private final LeaseRecovery leaseRecovery;
  private final Configuration conf;
  private final FileSystem fs;
  private final Path walDir;
  private final Path walArchiveDir;
  private final boolean enforceStreamCapability;

  // First failure recorded by the sync loop; non-null means the sync has been aborted.
  private final AtomicReference<Throwable> syncException = new AtomicReference<>();
  // True until load() has completed successfully; set back to true by start() and stop().
  private final AtomicBoolean loading = new AtomicBoolean(true);
  // True while the sync thread is writing the slots out.
  private final AtomicBoolean inSync = new AtomicBoolean(false);
  // Running total of the sizes returned by syncSlots().
  private final AtomicLong totalSynced = new AtomicLong(0);
  // NOTE(review): presumably the timestamp of the last roll; it is not written in this section.
  private final AtomicLong lastRollTs = new AtomicLong(0);
  // Incremented by the sync loop after every sync; writers wait for it to change.
  private final AtomicLong syncId = new AtomicLong(0);

  // Pool of reusable ByteSlots, see acquireSlot()/releaseSlot().
  private LinkedTransferQueue<ByteSlot> slotsCache = null;
  // Created lazily by load() when a WAL is reported as corrupted; null otherwise.
  private Set<ProcedureWALFile> corruptedLogs = null;
  // NOTE(review): presumably the output stream of the current WAL; not used in this section.
  private FSDataOutputStream stream = null;
  // Number of slots the sync loop waits for before syncing, see setRunningProcedureCount(int).
  private int runningProcCount = 1;
  // Log id assigned by recoverLease(); also returned by pushData.
  private long flushLogId = 0;
  // Snapshot of runningProcCount taken by the sync loop for the current batch.
  private int syncMaxSlot = 1;
  // Number of slots filled for the current batch.
  private int slotIndex = 0;
  private Thread syncThread;
  // Entries queued for the next sync, filled by pushData.
  private ByteSlot[] slots;

  // Tunings, read from the Configuration in start(int).
  private int walCountWarnThreshold;
  private int maxRetriesBeforeRoll;
  private int maxSyncFailureRoll;
  private int waitBeforeRoll;
  private int rollRetries;
  private int periodicRollMsec;
  private long rollThreshold;
  private boolean useHsync;
  private int syncWaitMsec;

  // Variables used for UI display
  private CircularFifoQueue<SyncMetrics> syncMetricsQueue;
207
208  public static class SyncMetrics {
209    private long timestamp;
210    private long syncWaitMs;
211    private long totalSyncedBytes;
212    private int syncedEntries;
213    private float syncedPerSec;
214
215    public long getTimestamp() {
216      return timestamp;
217    }
218
219    public long getSyncWaitMs() {
220      return syncWaitMs;
221    }
222
223    public long getTotalSyncedBytes() {
224      return totalSyncedBytes;
225    }
226
227    public long getSyncedEntries() {
228      return syncedEntries;
229    }
230
231    public float getSyncedPerSec() {
232      return syncedPerSec;
233    }
234  }
235
  /**
   * Creates a store that keeps its WALs in the {@link #MASTER_PROCEDURE_LOGDIR} directory under
   * the WAL root dir and uses the {@link HConstants#HREGION_OLDLOGDIR_NAME} directory under the
   * same root as the archive dir.
   * @param conf the configuration, also used to locate the WAL root dir
   * @param leaseRecovery the callback used to recover the lease of the WAL files
   * @throws IOException if the directories can not be resolved or created
   */
  public WALProcedureStore(final Configuration conf, final LeaseRecovery leaseRecovery)
      throws IOException {
    this(conf,
        new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
        new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME),
        leaseRecovery);
  }
243
244  @VisibleForTesting
245  public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir,
246      final LeaseRecovery leaseRecovery) throws IOException {
247    this.conf = conf;
248    this.leaseRecovery = leaseRecovery;
249    this.walDir = walDir;
250    this.walArchiveDir = walArchiveDir;
251    this.fs = CommonFSUtils.getWALFileSystem(conf);
252    this.enforceStreamCapability = conf.getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE,
253        true);
254
255    // Create the log directory for the procedure store
256    if (!fs.exists(walDir)) {
257      if (!fs.mkdirs(walDir)) {
258        throw new IOException("Unable to mkdir " + walDir);
259      }
260    }
261    // Now that it exists, set the log policy
262    String storagePolicy =
263        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
264    CommonFSUtils.setStoragePolicy(fs, walDir, storagePolicy);
265
266    // Create archive dir up front. Rename won't work w/o it up on HDFS.
267    if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
268      if (this.fs.mkdirs(this.walArchiveDir)) {
269        LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir);
270      } else {
271        LOG.warn("Failed create of {}", this.walArchiveDir);
272      }
273    }
274  }
275
276  @Override
277  public void start(int numSlots) throws IOException {
278    if (!setRunning(true)) {
279      return;
280    }
281
282    // Init buffer slots
283    loading.set(true);
284    runningProcCount = numSlots;
285    syncMaxSlot = numSlots;
286    slots = new ByteSlot[numSlots];
287    slotsCache = new LinkedTransferQueue<>();
288    while (slotsCache.size() < numSlots) {
289      slotsCache.offer(new ByteSlot());
290    }
291
292    // Tunings
293    walCountWarnThreshold =
294      conf.getInt(WAL_COUNT_WARN_THRESHOLD_CONF_KEY, DEFAULT_WAL_COUNT_WARN_THRESHOLD);
295    maxRetriesBeforeRoll =
296      conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
297    maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
298    waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
299    rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
300    rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
301    periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
302    syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
303    useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
304
305    // WebUI
306    syncMetricsQueue = new CircularFifoQueue<>(
307      conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT));
308
309    // Init sync thread
310    syncThread = new Thread("WALProcedureStoreSyncThread") {
311      @Override
312      public void run() {
313        try {
314          syncLoop();
315        } catch (Throwable e) {
316          LOG.error("Got an exception from the sync-loop", e);
317          if (!isSyncAborted()) {
318            sendAbortProcessSignal();
319          }
320        }
321      }
322    };
323    syncThread.start();
324  }
325
326  @Override
327  public void stop(final boolean abort) {
328    if (!setRunning(false)) {
329      return;
330    }
331
332    LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort +
333      (isSyncAborted() ? " (self aborting)" : ""));
334    sendStopSignal();
335    if (!isSyncAborted()) {
336      try {
337        while (syncThread.isAlive()) {
338          sendStopSignal();
339          syncThread.join(250);
340        }
341      } catch (InterruptedException e) {
342        LOG.warn("join interrupted", e);
343        Thread.currentThread().interrupt();
344      }
345    }
346
347    // Close the writer
348    closeCurrentLogStream(abort);
349
350    // Close the old logs
351    // they should be already closed, this is just in case the load fails
352    // and we call start() and then stop()
353    for (ProcedureWALFile log: logs) {
354      log.close();
355    }
356    logs.clear();
357    loading.set(true);
358  }
359
360  private void sendStopSignal() {
361    if (lock.tryLock()) {
362      try {
363        waitCond.signalAll();
364        syncCond.signalAll();
365      } finally {
366        lock.unlock();
367      }
368    }
369  }
370
371  @Override
372  public int getNumThreads() {
373    return slots == null ? 0 : slots.length;
374  }
375
376  @Override
377  public int setRunningProcedureCount(final int count) {
378    this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length;
379    return this.runningProcCount;
380  }
381
  /**
   * @return the tracker used by this store; this is the live instance, not a copy
   */
  public ProcedureStoreTracker getStoreTracker() {
    return storeTracker;
  }
385
386  public ArrayList<ProcedureWALFile> getActiveLogs() {
387    lock.lock();
388    try {
389      return new ArrayList<>(logs);
390    } finally {
391      lock.unlock();
392    }
393  }
394
  /**
   * @return the WAL files reported as corrupted during {@link #load(ProcedureLoader)}; this is
   *         the live set, and it is null when no file has been reported as corrupted
   */
  public Set<ProcedureWALFile> getCorruptedLogs() {
    return corruptedLogs;
  }
398
  /**
   * Acquires the lease on the store by initializing the old logs and creating a new log, and
   * keeps retrying for as long as the store is running whenever it detects that someone else is
   * working on the same directory. If the store stops running the loop ends without reporting
   * that the lease has not been acquired.
   * @throws IOException on file system errors other than the ones that trigger a retry
   */
  @Override
  public void recoverLease() throws IOException {
    lock.lock();
    try {
      LOG.debug("Starting WAL Procedure Store lease recovery");
      boolean afterFirstAttempt = false;
      while (isRunning()) {
        // Don't sleep before first attempt
        if (afterFirstAttempt) {
          LOG.trace("Sleep {} ms after first lease recovery attempt.",
              waitBeforeRoll);
          Threads.sleepWithoutInterrupt(waitBeforeRoll);
        } else {
          afterFirstAttempt = true;
        }
        FileStatus[] oldLogs = getLogFiles();
        // Get Log-MaxID and recover lease on old logs
        try {
          flushLogId = initOldLogs(oldLogs);
        } catch (FileNotFoundException e) {
          LOG.warn("Someone else is active and deleted logs. retrying.", e);
          continue;
        }

        // Create new state-log
        if (!rollWriter(flushLogId + 1)) {
          // someone else has already created this log
          // NOTE(review): the log we tried to create is flushLogId + 1; unless rollWriter updates
          // flushLogId even when it fails, the id logged here is the previous one - confirm.
          LOG.debug("Someone else has already created log {}. Retrying.", flushLogId);
          continue;
        }

        // We have the lease on the log
        // List the directory again: a log with an id greater than ours means that someone else
        // is writing too, so drop the log we have just created and start over.
        oldLogs = getLogFiles();
        if (getMaxLogId(oldLogs) > flushLogId) {
          LOG.debug("Someone else created new logs. Expected maxLogId < {}", flushLogId);
          logs.getLast().removeFile(this.walArchiveDir);
          continue;
        }

        LOG.debug("Lease acquired for flushLogId={}", flushLogId);
        break;
      }
    } finally {
      lock.unlock();
    }
  }
445
  /**
   * Replays the old logs, i.e. all the logs but the current one, and hands the procedures found
   * in them to the given loader. Once the load succeeds the store leaves the "loading" state and
   * tries to clean up the logs which are not needed anymore.
   * @param loader the receiver of the loaded procedures and of the max procedure id
   * @throws IllegalStateException if there are no logs, i.e. {@link #recoverLease()} has not
   *           been called
   * @throws IOException if the old logs can not be loaded
   */
  @Override
  public void load(ProcedureLoader loader) throws IOException {
    lock.lock();
    try {
      if (logs.isEmpty()) {
        throw new IllegalStateException("recoverLease() must be called before loading data");
      }

      // Nothing to do, If we have only the current log.
      if (logs.size() == 1) {
        LOG.debug("No state logs to replay.");
        loader.setMaxProcId(0);
        loading.set(false);
        return;
      }

      // Load the old logs
      // The iteration goes from the newest log to the oldest one.
      Iterator<ProcedureWALFile> it = logs.descendingIterator();
      it.next(); // Skip the current log

      // The adapter below forwards everything to the caller's loader, except for the corrupted
      // WALs, which are only recorded in corruptedLogs.
      ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {

        @Override
        public void setMaxProcId(long maxProcId) {
          loader.setMaxProcId(maxProcId);
        }

        @Override
        public void load(ProcedureIterator procIter) throws IOException {
          loader.load(procIter);
        }

        @Override
        public void handleCorrupted(ProcedureIterator procIter) throws IOException {
          loader.handleCorrupted(procIter);
        }

        @Override
        public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
          // The set is created lazily, on the first corrupted WAL.
          if (corruptedLogs == null) {
            corruptedLogs = new HashSet<>();
          }
          corruptedLogs.add(log);
          // TODO: sideline corrupted log
        }
      });
      // if we fail when loading, we should prevent persisting the storeTracker later in the stop
      // method. As it may happen that, we have finished constructing the modified and deleted bits,
      // but before we call resetModified, we fail, then if we persist the storeTracker then when
      // restarting, we will consider that all procedures have been included in this file and delete
      // all the previous files. Obviously this not correct. So here we will only set loading to
      // false when we successfully loaded all the procedures, and when closing we will skip
      // persisting the store tracker. And also, this will prevent the sync thread to do
      // periodicRoll, where we may also clean old logs.
      loading.set(false);
      // try to cleanup inactive wals and complete the operation
      buildHoldingCleanupTracker();
      tryCleanupLogsOnLoad();
    } finally {
      lock.unlock();
    }
  }
508
509  private void tryCleanupLogsOnLoad() {
510    // nothing to cleanup.
511    if (logs.size() <= 1) {
512      return;
513    }
514
515    // the config says to not cleanup wals on load.
516    if (!conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY,
517      DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY)) {
518      LOG.debug("WALs cleanup on load is not enabled: " + getActiveLogs());
519      return;
520    }
521
522    try {
523      periodicRoll();
524    } catch (IOException e) {
525      LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e);
526    }
527  }
528
529  @Override
530  public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
531    if (LOG.isTraceEnabled()) {
532      LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
533    }
534
535    ByteSlot slot = acquireSlot();
536    try {
537      // Serialize the insert
538      long[] subProcIds = null;
539      if (subprocs != null) {
540        ProcedureWALFormat.writeInsert(slot, proc, subprocs);
541        subProcIds = new long[subprocs.length];
542        for (int i = 0; i < subprocs.length; ++i) {
543          subProcIds[i] = subprocs[i].getProcId();
544        }
545      } else {
546        assert !proc.hasParent();
547        ProcedureWALFormat.writeInsert(slot, proc);
548      }
549
550      // Push the transaction data and wait until it is persisted
551      pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
552    } catch (IOException e) {
553      // We are not able to serialize the procedure.
554      // this is a code error, and we are not able to go on.
555      LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedure: proc=" +
556          proc + ", subprocs=" + Arrays.toString(subprocs), e);
557      throw new RuntimeException(e);
558    } finally {
559      releaseSlot(slot);
560    }
561  }
562
563  @Override
564  public void insert(Procedure<?>[] procs) {
565    if (LOG.isTraceEnabled()) {
566      LOG.trace("Insert " + Arrays.toString(procs));
567    }
568
569    ByteSlot slot = acquireSlot();
570    try {
571      // Serialize the insert
572      long[] procIds = new long[procs.length];
573      for (int i = 0; i < procs.length; ++i) {
574        assert !procs[i].hasParent();
575        procIds[i] = procs[i].getProcId();
576        ProcedureWALFormat.writeInsert(slot, procs[i]);
577      }
578
579      // Push the transaction data and wait until it is persisted
580      pushData(PushType.INSERT, slot, Procedure.NO_PROC_ID, procIds);
581    } catch (IOException e) {
582      // We are not able to serialize the procedure.
583      // this is a code error, and we are not able to go on.
584      LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedure: " +
585          Arrays.toString(procs), e);
586      throw new RuntimeException(e);
587    } finally {
588      releaseSlot(slot);
589    }
590  }
591
592  @Override
593  public void update(Procedure<?> proc) {
594    if (LOG.isTraceEnabled()) {
595      LOG.trace("Update " + proc);
596    }
597
598    ByteSlot slot = acquireSlot();
599    try {
600      // Serialize the update
601      ProcedureWALFormat.writeUpdate(slot, proc);
602
603      // Push the transaction data and wait until it is persisted
604      pushData(PushType.UPDATE, slot, proc.getProcId(), null);
605    } catch (IOException e) {
606      // We are not able to serialize the procedure.
607      // this is a code error, and we are not able to go on.
608      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e);
609      throw new RuntimeException(e);
610    } finally {
611      releaseSlot(slot);
612    }
613  }
614
615  @Override
616  public void delete(long procId) {
617    LOG.trace("Delete {}", procId);
618    ByteSlot slot = acquireSlot();
619    try {
620      // Serialize the delete
621      ProcedureWALFormat.writeDelete(slot, procId);
622
623      // Push the transaction data and wait until it is persisted
624      pushData(PushType.DELETE, slot, procId, null);
625    } catch (IOException e) {
626      // We are not able to serialize the procedure.
627      // this is a code error, and we are not able to go on.
628      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + procId, e);
629      throw new RuntimeException(e);
630    } finally {
631      releaseSlot(slot);
632    }
633  }
634
635  @Override
636  public void delete(Procedure<?> proc, long[] subProcIds) {
637    assert proc != null : "expected a non-null procedure";
638    assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds";
639    if (LOG.isTraceEnabled()) {
640      LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds));
641    }
642
643    ByteSlot slot = acquireSlot();
644    try {
645      // Serialize the delete
646      ProcedureWALFormat.writeDelete(slot, proc, subProcIds);
647
648      // Push the transaction data and wait until it is persisted
649      pushData(PushType.DELETE, slot, proc.getProcId(), subProcIds);
650    } catch (IOException e) {
651      // We are not able to serialize the procedure.
652      // this is a code error, and we are not able to go on.
653      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e);
654      throw new RuntimeException(e);
655    } finally {
656      releaseSlot(slot);
657    }
658  }
659
660  @Override
661  public void delete(final long[] procIds, final int offset, final int count) {
662    if (count == 0) {
663      return;
664    }
665
666    if (offset == 0 && count == procIds.length) {
667      delete(procIds);
668    } else if (count == 1) {
669      delete(procIds[offset]);
670    } else {
671      delete(Arrays.copyOfRange(procIds, offset, offset + count));
672    }
673  }
674
675  private void delete(long[] procIds) {
676    if (LOG.isTraceEnabled()) {
677      LOG.trace("Delete " + Arrays.toString(procIds));
678    }
679
680    final ByteSlot slot = acquireSlot();
681    try {
682      // Serialize the delete
683      for (int i = 0; i < procIds.length; ++i) {
684        ProcedureWALFormat.writeDelete(slot, procIds[i]);
685      }
686
687      // Push the transaction data and wait until it is persisted
688      pushData(PushType.DELETE, slot, Procedure.NO_PROC_ID, procIds);
689    } catch (IOException e) {
690      // We are not able to serialize the procedure.
691      // this is a code error, and we are not able to go on.
692      LOG.error("Unable to serialize the procedures: " + Arrays.toString(procIds), e);
693      throw new RuntimeException(e);
694    } finally {
695      releaseSlot(slot);
696    }
697  }
698
699  private ByteSlot acquireSlot() {
700    ByteSlot slot = slotsCache.poll();
701    return slot != null ? slot : new ByteSlot();
702  }
703
  /**
   * Resets the given slot and puts it back into the cache so that it can be reused.
   * @param slot the slot to give back
   */
  private void releaseSlot(final ByteSlot slot) {
    slot.reset();
    slotsCache.offer(slot);
  }
708
  /** The kind of entry handed to pushData; it selects how the trackers get updated. */
  private enum PushType { INSERT, UPDATE, DELETE }
710
  /**
   * Queues a serialized entry for the sync thread and blocks until the sync id changes, i.e.
   * until the batch the entry belongs to has been synced, or until the store stops running.
   * @param type the kind of entry, used to update the trackers
   * @param slot the serialized entry
   * @param procId the id of the procedure the entry is about, or {@link Procedure#NO_PROC_ID}
   * @param subProcIds the ids of the other procedures covered by the entry, may be null
   * @return the value of {@code flushLogId} at the time the entry has been queued
   * @throws RuntimeException if the store is not running, if there are no logs, if the sync has
   *           been aborted, or if the calling thread gets interrupted while waiting
   */
  private long pushData(final PushType type, final ByteSlot slot,
      final long procId, final long[] subProcIds) {
    // These two checks are done before taking the lock.
    if (!isRunning()) {
      throw new RuntimeException("the store must be running before inserting data");
    }
    if (logs.isEmpty()) {
      throw new RuntimeException("recoverLease() must be called before inserting data");
    }

    long logId = -1;
    lock.lock();
    try {
      // Wait for the sync to be completed
      while (true) {
        if (!isRunning()) {
          throw new RuntimeException("store no longer running");
        } else if (isSyncAborted()) {
          throw new RuntimeException("sync aborted", syncException.get());
        } else if (inSync.get()) {
          // A sync is in progress, wait for it to finish.
          syncCond.await();
        } else if (slotIndex >= syncMaxSlot) {
          // No free slot: tell the sync thread that the slots are full and wait.
          slotCond.signal();
          syncCond.await();
        } else {
          break;
        }
      }

      // Remember the current sync id: once it changes our batch has been synced.
      final long pushSyncId = syncId.get();
      updateStoreTracker(type, procId, subProcIds);
      slots[slotIndex++] = slot;
      logId = flushLogId;

      // Notify that there is new data
      if (slotIndex == 1) {
        waitCond.signal();
      }

      // Notify that the slots are full
      if (slotIndex == syncMaxSlot) {
        waitCond.signal();
        slotCond.signal();
      }

      // Block until the sync thread bumps the sync id or the store is stopped.
      while (pushSyncId == syncId.get() && isRunning()) {
        syncCond.await();
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag and signal an abort before giving up.
      Thread.currentThread().interrupt();
      sendAbortProcessSignal();
      throw new RuntimeException(e);
    } finally {
      lock.unlock();
      // Throwing from the finally block replaces any exception already in flight, so an aborted
      // sync is always the failure reported to the caller.
      if (isSyncAborted()) {
        throw new RuntimeException("sync aborted", syncException.get());
      }
    }
    return logId;
  }
770
771  private void updateStoreTracker(final PushType type,
772      final long procId, final long[] subProcIds) {
773    switch (type) {
774      case INSERT:
775        if (subProcIds == null) {
776          storeTracker.insert(procId);
777        } else if (procId == Procedure.NO_PROC_ID) {
778          storeTracker.insert(subProcIds);
779        } else {
780          storeTracker.insert(procId, subProcIds);
781          holdingCleanupTracker.setDeletedIfModified(procId);
782        }
783        break;
784      case UPDATE:
785        storeTracker.update(procId);
786        holdingCleanupTracker.setDeletedIfModified(procId);
787        break;
788      case DELETE:
789        if (subProcIds != null && subProcIds.length > 0) {
790          storeTracker.delete(subProcIds);
791          holdingCleanupTracker.setDeletedIfModified(subProcIds);
792        } else {
793          storeTracker.delete(procId);
794          holdingCleanupTracker.setDeletedIfModified(procId);
795        }
796        break;
797      default:
798        throw new RuntimeException("invalid push type " + type);
799    }
800  }
801
802  private boolean isSyncAborted() {
803    return syncException.get() != null;
804  }
805
  /**
   * Main loop of the sync worker: waits for slots to be filled and then flushes them to the
   * current log stream through {@code syncSlots()}, for as long as {@code isRunning()} is true.
   * <p>
   * {@code lock} is acquired once before the loop and released in the outer {@code finally}.
   * NOTE(review): {@code waitCond}, {@code slotCond} and {@code syncCond} are presumably
   * conditions created from {@code lock}, in which case the lock is released while awaiting;
   * confirm against their declarations.
   * <p>
   * Any failure is stored in {@code syncException} (only if none was recorded before), followed
   * by {@code sendAbortProcessSignal()}, and is then rethrown, which ends the loop.
   * @throws Throwable whatever the loop body threw, including {@link InterruptedException}
   */
  private void syncLoop() throws Throwable {
    // Value of totalSynced after the most recent successful sync; only used for trace/metrics.
    long totalSyncedToStore = 0;
    inSync.set(false);
    lock.lock();
    try {
      while (isRunning()) {
        try {
          // Wait until new data is available
          if (slotIndex == 0) {
            // Nothing is pending: use the idle time for the periodic roll, unless still loading.
            if (!loading.get()) {
              periodicRoll();
            }

            if (LOG.isTraceEnabled()) {
              float rollTsSec = getMillisFromLastRoll() / 1000.0f;
              LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
                        StringUtils.humanSize(totalSynced.get()),
                        StringUtils.humanSize(totalSynced.get() / rollTsSec)));
            }

            // Wait at most until the next periodic roll is due.
            waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
            if (slotIndex == 0) {
              // no data.. probably a stop() or a periodic roll
              continue;
            }
          }
          // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
          syncMaxSlot = runningProcCount;
          assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot;
          final long syncWaitSt = System.currentTimeMillis();
          if (slotIndex != syncMaxSlot) {
            slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
          }

          // Figures below describe the wait that just ended and the throughput since last roll.
          final long currentTs = System.currentTimeMillis();
          final long syncWaitMs = currentTs - syncWaitSt;
          final float rollSec = getMillisFromLastRoll() / 1000.0f;
          final float syncedPerSec = totalSyncedToStore / rollSec;
          if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) {
            LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
                      StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
                      StringUtils.humanSize(totalSyncedToStore),
                      StringUtils.humanSize(syncedPerSec)));
          }

          // update webui circular buffers (TODO: get rid of allocations)
          final SyncMetrics syncMetrics = new SyncMetrics();
          syncMetrics.timestamp = currentTs;
          syncMetrics.syncWaitMs = syncWaitMs;
          syncMetrics.syncedEntries = slotIndex;
          syncMetrics.totalSyncedBytes = totalSyncedToStore;
          syncMetrics.syncedPerSec = syncedPerSec;
          syncMetricsQueue.add(syncMetrics);

          // sync
          // NOTE(review): if syncSlots() throws, inSync is left set to true.
          inSync.set(true);
          long slotSize = syncSlots();
          // Account the flushed bytes on the newest log and on the running total.
          logs.getLast().addToSize(slotSize);
          totalSyncedToStore = totalSynced.addAndGet(slotSize);
          // All pending slots are considered consumed; start filling from the first one again.
          slotIndex = 0;
          inSync.set(false);
          syncId.incrementAndGet();
        } catch (InterruptedException e) {
          // Restore the interrupt flag before recording the failure and rethrowing.
          Thread.currentThread().interrupt();
          syncException.compareAndSet(null, e);
          sendAbortProcessSignal();
          throw e;
        } catch (Throwable t) {
          // Only the first failure is kept in syncException.
          syncException.compareAndSet(null, t);
          sendAbortProcessSignal();
          throw t;
        } finally {
          // Runs after every iteration, successful or not (including the 'continue' above).
          syncCond.signalAll();
        }
      }
    } finally {
      lock.unlock();
    }
  }
885
886  public ArrayList<SyncMetrics> getSyncMetrics() {
887    lock.lock();
888    try {
889      return new ArrayList<>(syncMetricsQueue);
890    } finally {
891      lock.unlock();
892    }
893  }
894
895  private long syncSlots() throws Throwable {
896    int retry = 0;
897    int logRolled = 0;
898    long totalSynced = 0;
899    do {
900      try {
901        totalSynced = syncSlots(stream, slots, 0, slotIndex);
902        break;
903      } catch (Throwable e) {
904        LOG.warn("unable to sync slots, retry=" + retry);
905        if (++retry >= maxRetriesBeforeRoll) {
906          if (logRolled >= maxSyncFailureRoll && isRunning()) {
907            LOG.error("Sync slots after log roll failed, abort.", e);
908            throw e;
909          }
910
911          if (!rollWriterWithRetries()) {
912            throw e;
913          }
914
915          logRolled++;
916          retry = 0;
917        }
918      }
919    } while (isRunning());
920    return totalSynced;
921  }
922
923  protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots,
924      final int offset, final int count) throws IOException {
925    long totalSynced = 0;
926    for (int i = 0; i < count; ++i) {
927      final ByteSlot data = slots[offset + i];
928      data.writeTo(stream);
929      totalSynced += data.size();
930    }
931
932    syncStream(stream);
933    sendPostSyncSignal();
934
935    if (LOG.isTraceEnabled()) {
936      LOG.trace("Sync slots=" + count + '/' + syncMaxSlot +
937                ", flushed=" + StringUtils.humanSize(totalSynced));
938    }
939    return totalSynced;
940  }
941
942  protected void syncStream(final FSDataOutputStream stream) throws IOException {
943    if (useHsync) {
944      stream.hsync();
945    } else {
946      stream.hflush();
947    }
948  }
949
950  private boolean rollWriterWithRetries() {
951    for (int i = 0; i < rollRetries && isRunning(); ++i) {
952      if (i > 0) {
953        Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
954      }
955
956      try {
957        if (rollWriter()) {
958          return true;
959        }
960      } catch (IOException e) {
961        LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
962      }
963    }
964    LOG.error(HBaseMarkers.FATAL, "Unable to roll the log");
965    return false;
966  }
967
968  private boolean tryRollWriter() {
969    try {
970      return rollWriter();
971    } catch (IOException e) {
972      LOG.warn("Unable to roll the log", e);
973      return false;
974    }
975  }
976
977  public long getMillisToNextPeriodicRoll() {
978    if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
979      return periodicRollMsec - getMillisFromLastRoll();
980    }
981    return Long.MAX_VALUE;
982  }
983
984  public long getMillisFromLastRoll() {
985    return (System.currentTimeMillis() - lastRollTs.get());
986  }
987
988  @VisibleForTesting
989  void periodicRollForTesting() throws IOException {
990    lock.lock();
991    try {
992      periodicRoll();
993    } finally {
994      lock.unlock();
995    }
996  }
997
998  @VisibleForTesting
999  public boolean rollWriterForTesting() throws IOException {
1000    lock.lock();
1001    try {
1002      return rollWriter();
1003    } finally {
1004      lock.unlock();
1005    }
1006  }
1007
1008  @VisibleForTesting
1009  void removeInactiveLogsForTesting() throws Exception {
1010    lock.lock();
1011    try {
1012      removeInactiveLogs();
1013    } finally  {
1014      lock.unlock();
1015    }
1016  }
1017
1018  private void periodicRoll() throws IOException {
1019    if (storeTracker.isEmpty()) {
1020      LOG.trace("no active procedures");
1021      tryRollWriter();
1022      removeAllLogs(flushLogId - 1, "no active procedures");
1023    } else {
1024      if (storeTracker.isAllModified()) {
1025        LOG.trace("all the active procedures are in the latest log");
1026        removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log");
1027      }
1028
1029      // if the log size has exceeded the roll threshold
1030      // or the periodic roll timeout is expired, try to roll the wal.
1031      if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
1032        tryRollWriter();
1033      }
1034
1035      removeInactiveLogs();
1036    }
1037  }
1038
1039  private boolean rollWriter() throws IOException {
1040    if (!isRunning()) {
1041      return false;
1042    }
1043
1044    // Create new state-log
1045    if (!rollWriter(flushLogId + 1)) {
1046      LOG.warn("someone else has already created log {}", flushLogId);
1047      return false;
1048    }
1049
1050    // We have the lease on the log,
1051    // but we should check if someone else has created new files
1052    if (getMaxLogId(getLogFiles()) > flushLogId) {
1053      LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId);
1054      logs.getLast().removeFile(this.walArchiveDir);
1055      return false;
1056    }
1057
1058    // We have the lease on the log
1059    return true;
1060  }
1061
1062  @VisibleForTesting
1063  boolean rollWriter(long logId) throws IOException {
1064    assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
1065    assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
1066
1067    ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
1068      .setVersion(ProcedureWALFormat.HEADER_VERSION)
1069      .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
1070      .setMinProcId(storeTracker.getActiveMinProcId())
1071      .setLogId(logId)
1072      .build();
1073
1074    FSDataOutputStream newStream = null;
1075    Path newLogFile = null;
1076    long startPos = -1;
1077    newLogFile = getLogFilePath(logId);
1078    try {
1079      newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
1080    } catch (FileAlreadyExistsException e) {
1081      LOG.error("Log file with id={} already exists", logId, e);
1082      return false;
1083    } catch (RemoteException re) {
1084      LOG.warn("failed to create log file with id={}", logId, re);
1085      return false;
1086    }
1087    // After we create the stream but before we attempt to use it at all
1088    // ensure that we can provide the level of data safety we're configured
1089    // to provide.
1090    final String durability = useHsync ? "hsync" : "hflush";
1091    if (enforceStreamCapability && !(CommonFSUtils.hasCapability(newStream, durability))) {
1092      throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
1093          " for proper operation during component failures, but the underlying filesystem does " +
1094          "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY +
1095          "' to set the desired level of robustness and ensure the config value of '" +
1096          CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it.");
1097    }
1098    try {
1099      ProcedureWALFormat.writeHeader(newStream, header);
1100      startPos = newStream.getPos();
1101    } catch (IOException ioe) {
1102      LOG.warn("Encountered exception writing header", ioe);
1103      newStream.close();
1104      return false;
1105    }
1106
1107    closeCurrentLogStream(false);
1108
1109    storeTracker.resetModified();
1110    stream = newStream;
1111    flushLogId = logId;
1112    totalSynced.set(0);
1113    long rollTs = System.currentTimeMillis();
1114    lastRollTs.set(rollTs);
1115    logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs));
1116
1117    // if it's the first next WAL being added, build the holding cleanup tracker
1118    if (logs.size() == 2) {
1119      buildHoldingCleanupTracker();
1120    } else if (logs.size() > walCountWarnThreshold) {
1121      LOG.warn("procedure WALs count={} above the warning threshold {}. check running procedures" +
1122        " to see if something is stuck.", logs.size(), walCountWarnThreshold);
1123      // This is just like what we have done at RS side when there are too many wal files. For RS,
1124      // if there are too many wal files, we will find out the wal entries in the oldest file, and
1125      // tell the upper layer to flush these regions so the wal entries will be useless and then we
1126      // can delete the wal file. For WALProcedureStore, the assumption is that, if all the
1127      // procedures recorded in a proc wal file are modified or deleted in a new proc wal file, then
1128      // we are safe to delete it. So here if there are too many proc wal files, we will find out
1129      // the procedure ids in the oldest file, which are neither modified nor deleted in newer proc
1130      // wal files, and tell upper layer to update the state of these procedures to the newest proc
1131      // wal file(by calling ProcedureStore.update), then we are safe to delete the oldest proc wal
1132      // file.
1133      sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds());
1134    }
1135
1136    LOG.info("Rolled new Procedure Store WAL, id={}", logId);
1137    return true;
1138  }
1139
1140  private void closeCurrentLogStream(boolean abort) {
1141    if (stream == null || logs.isEmpty()) {
1142      return;
1143    }
1144
1145    try {
1146      ProcedureWALFile log = logs.getLast();
1147      // If the loading flag is true, it usually means that we fail when loading procedures, so we
1148      // should not persist the store tracker, as its state may not be correct.
1149      if (!loading.get()) {
1150        log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
1151        log.updateLocalTracker(storeTracker);
1152        if (!abort) {
1153          long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
1154          log.addToSize(trailerSize);
1155        }
1156      }
1157    } catch (IOException e) {
1158      LOG.warn("Unable to write the trailer", e);
1159    }
1160    try {
1161      stream.close();
1162    } catch (IOException e) {
1163      LOG.error("Unable to close the stream", e);
1164    }
1165    stream = null;
1166  }
1167
1168  // ==========================================================================
1169  //  Log Files cleaner helpers
1170  // ==========================================================================
1171  private void removeInactiveLogs() throws IOException {
1172    // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
1173    // once there is nothing olding the oldest WAL we can remove it.
1174    while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
1175      LOG.info("Remove the oldest log {}", logs.getFirst());
1176      removeLogFile(logs.getFirst(), walArchiveDir);
1177      buildHoldingCleanupTracker();
1178    }
1179
1180    // TODO: In case we are holding up a lot of logs for long time we should
1181    // rewrite old procedures (in theory parent procs) to the new WAL.
1182  }
1183
1184  private void buildHoldingCleanupTracker() {
1185    if (logs.size() <= 1) {
1186      // we only have one wal, so nothing to do
1187      holdingCleanupTracker.reset();
1188      return;
1189    }
1190
1191    // compute the holding tracker.
1192    // - the first WAL is used for the 'updates'
1193    // - the global tracker will be used to determine whether a procedure has been deleted
1194    // - other trackers will be used to determine whether a procedure has been updated, as a deleted
1195    // procedure can always be detected by checking the global tracker, we can save the deleted
1196    // checks when applying other trackers
1197    holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
1198    holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker);
1199    // the logs is a linked list, so avoid calling get(index) on it.
1200    Iterator<ProcedureWALFile> iter = logs.iterator();
1201    // skip the tracker for the first file when creating the iterator.
1202    iter.next();
1203    ProcedureStoreTracker tracker = iter.next().getTracker();
1204    // testing iter.hasNext after calling iter.next to skip applying the tracker for last file,
1205    // which is just the storeTracker above.
1206    while (iter.hasNext()) {
1207      holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker);
1208      if (holdingCleanupTracker.isEmpty()) {
1209        break;
1210      }
1211      tracker = iter.next().getTracker();
1212    }
1213  }
1214
1215  /**
1216   * Remove all logs with logId <= {@code lastLogId}.
1217   */
1218  private void removeAllLogs(long lastLogId, String why) {
1219    if (logs.size() <= 1) {
1220      return;
1221    }
1222
1223    LOG.info("Remove all state logs with ID less than {}, since {}", lastLogId, why);
1224
1225    boolean removed = false;
1226    while (logs.size() > 1) {
1227      ProcedureWALFile log = logs.getFirst();
1228      if (lastLogId < log.getLogId()) {
1229        break;
1230      }
1231      removeLogFile(log, walArchiveDir);
1232      removed = true;
1233    }
1234
1235    if (removed) {
1236      buildHoldingCleanupTracker();
1237    }
1238  }
1239
1240  private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
1241    try {
1242      LOG.trace("Removing log={}", log);
1243      log.removeFile(walArchiveDir);
1244      logs.remove(log);
1245      LOG.debug("Removed log={}, activeLogs={}", log, logs);
1246      assert logs.size() > 0 : "expected at least one log";
1247    } catch (IOException e) {
1248      LOG.error("Unable to remove log: " + log, e);
1249      return false;
1250    }
1251    return true;
1252  }
1253
1254  // ==========================================================================
1255  //  FileSystem Log Files helpers
1256  // ==========================================================================
1257  public Path getWALDir() {
1258    return this.walDir;
1259  }
1260
1261  @VisibleForTesting
1262  Path getWalArchiveDir() {
1263    return this.walArchiveDir;
1264  }
1265
1266  public FileSystem getFileSystem() {
1267    return this.fs;
1268  }
1269
1270  protected Path getLogFilePath(final long logId) throws IOException {
1271    return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId));
1272  }
1273
1274  private static long getLogIdFromName(final String name) {
1275    int end = name.lastIndexOf(".log");
1276    int start = name.lastIndexOf('-') + 1;
1277    return Long.parseLong(name.substring(start, end));
1278  }
1279
1280  private static final PathFilter WALS_PATH_FILTER = new PathFilter() {
1281    @Override
1282    public boolean accept(Path path) {
1283      String name = path.getName();
1284      return name.startsWith(LOG_PREFIX) && name.endsWith(".log");
1285    }
1286  };
1287
1288  private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR =
1289      new Comparator<FileStatus>() {
1290    @Override
1291    public int compare(FileStatus a, FileStatus b) {
1292      final long aId = getLogIdFromName(a.getPath().getName());
1293      final long bId = getLogIdFromName(b.getPath().getName());
1294      return Long.compare(aId, bId);
1295    }
1296  };
1297
1298  private FileStatus[] getLogFiles() throws IOException {
1299    try {
1300      FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER);
1301      Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
1302      return files;
1303    } catch (FileNotFoundException e) {
1304      LOG.warn("Log directory not found: " + e.getMessage());
1305      return null;
1306    }
1307  }
1308
1309  /**
1310   * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort
1311   * the file set by log id.
1312   * @return Max-LogID of the specified log file set
1313   */
1314  private static long getMaxLogId(FileStatus[] logFiles) {
1315    if (logFiles == null || logFiles.length == 0) {
1316      return 0L;
1317    }
1318    return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName());
1319  }
1320
1321  /**
1322   * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort
1323   * the file set by log id.
1324   * @return Max-LogID of the specified log file set
1325   */
1326  private long initOldLogs(FileStatus[] logFiles) throws IOException {
1327    if (logFiles == null || logFiles.length == 0) {
1328      return 0L;
1329    }
1330    long maxLogId = 0;
1331    for (int i = 0; i < logFiles.length; ++i) {
1332      final Path logPath = logFiles[i].getPath();
1333      leaseRecovery.recoverFileLease(fs, logPath);
1334      if (!isRunning()) {
1335        throw new IOException("wal aborting");
1336      }
1337
1338      maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
1339      ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
1340      if (log != null) {
1341        this.logs.add(log);
1342      }
1343    }
1344    initTrackerFromOldLogs();
1345    return maxLogId;
1346  }
1347
1348  /**
1349   * If last log's tracker is not null, use it as {@link #storeTracker}. Otherwise, set storeTracker
1350   * as partial, and let {@link ProcedureWALFormatReader} rebuild it using entries in the log.
1351   */
1352  private void initTrackerFromOldLogs() {
1353    if (logs.isEmpty() || !isRunning()) {
1354      return;
1355    }
1356    ProcedureWALFile log = logs.getLast();
1357    if (!log.getTracker().isPartial()) {
1358      storeTracker.resetTo(log.getTracker());
1359    } else {
1360      storeTracker.reset();
1361      storeTracker.setPartialFlag(true);
1362    }
1363  }
1364
1365  /**
1366   * Loads given log file and it's tracker.
1367   */
1368  private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir)
1369      throws IOException {
1370    final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
1371    if (logFile.getLen() == 0) {
1372      LOG.warn("Remove uninitialized log: {}", logFile);
1373      log.removeFile(walArchiveDir);
1374      return null;
1375    }
1376    LOG.debug("Opening Pv2 {}", logFile);
1377    try {
1378      log.open();
1379    } catch (ProcedureWALFormat.InvalidWALDataException e) {
1380      LOG.warn("Remove uninitialized log: {}", logFile, e);
1381      log.removeFile(walArchiveDir);
1382      return null;
1383    } catch (IOException e) {
1384      String msg = "Unable to read state log: " + logFile;
1385      LOG.error(msg, e);
1386      throw new IOException(msg, e);
1387    }
1388
1389    try {
1390      log.readTracker();
1391    } catch (IOException e) {
1392      log.getTracker().reset();
1393      log.getTracker().setPartialFlag(true);
1394      LOG.warn("Unable to read tracker for {}", log, e);
1395    }
1396
1397    log.close();
1398    return log;
1399  }
1400
1401  /**
1402   * Parses a directory of WALs building up ProcedureState.
1403   * For testing parse and profiling.
1404   * @param args Include pointer to directory of WAL files for a store instance to parse & load.
1405   */
1406  public static void main(String [] args) throws IOException {
1407    Configuration conf = HBaseConfiguration.create();
1408    if (args == null || args.length != 1) {
1409      System.out.println("ERROR: Empty arguments list; pass path to MASTERPROCWALS_DIR.");
1410      System.out.println("Usage: WALProcedureStore MASTERPROCWALS_DIR");
1411      System.exit(-1);
1412    }
1413    WALProcedureStore store = new WALProcedureStore(conf, new Path(args[0]), null,
1414      new WALProcedureStore.LeaseRecovery() {
1415        @Override
1416        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
1417          // no-op
1418        }
1419      });
1420    try {
1421      store.start(16);
1422      ProcedureExecutor<?> pe = new ProcedureExecutor<>(conf, new Object()/*Pass anything*/, store);
1423      pe.init(1, true);
1424    } finally {
1425      store.stop(true);
1426    }
1427  }
1428}