001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Comparator;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.LinkedList;
028import java.util.Set;
029import java.util.concurrent.LinkedTransferQueue;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicLong;
033import java.util.concurrent.atomic.AtomicReference;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FSDataOutputStream;
038import org.apache.hadoop.fs.FSError;
039import org.apache.hadoop.fs.FileAlreadyExistsException;
040import org.apache.hadoop.fs.FileStatus;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.fs.PathFilter;
044import org.apache.hadoop.fs.StreamCapabilities;
045import org.apache.hadoop.hbase.HBaseConfiguration;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.log.HBaseMarkers;
048import org.apache.hadoop.hbase.procedure2.Procedure;
049import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
050import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
051import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
052import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
053import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
054import org.apache.hadoop.hbase.procedure2.util.StringUtils;
055import org.apache.hadoop.hbase.util.CommonFSUtils;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.apache.hadoop.hbase.util.Threads;
058import org.apache.hadoop.ipc.RemoteException;
059import org.apache.yetus.audience.InterfaceAudience;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;
064
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
066
067/**
068 * WAL implementation of the ProcedureStore.
069 * <p/>
070 * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()},
071 * then {@link #load(ProcedureLoader)}.
072 * <p/>
 * In {@link #recoverLease()}, we get the lease by closing all the existing wal files (by calling
 * recoverFileLease) and creating a new wal writer. We also collect the list of all the old wal
 * files.
076 * <p/>
 * FIXME: the current lease-recovery implementation is problematic: it cannot handle the race where
 * two masters both try to acquire the lease at the same time.
079 * <p/>
080 * In {@link #load(ProcedureLoader)} method, we will load all the active procedures. See the
081 * comments of this method for more details.
082 * <p/>
 * The logging mechanism is similar to our FileSystem-based WAL implementation on the RS side. The
 * {@link #slots} array plays a role similar to a ring buffer: the insert, update and delete methods
 * put entries into {@link #slots} and wait. A background sync thread (see the {@link #syncLoop()}
 * method) takes the data from {@link #slots}, writes it to the FileSystem, and notifies the callers
 * once it has finished.
088 * <p/>
089 * TODO: try using disruptor to increase performance and simplify the logic?
090 * <p/>
 * The {@link #storeTracker} keeps track of the procedures modified in the newest wal file, which is
 * also the one currently being written. Its deleted bits, however, cover all procedures, not only
 * the ones in the newest wal file. When rolling a log, we first store the tracker in the trailer of
 * the current wal file and then reset its modified bits, so that it can start tracking the modified
 * procedures for the new wal file.
096 * <p/>
 * The {@link #holdingCleanupTracker} is used to test whether it is safe to delete the oldest wal
 * file. It is used when a log is rolled and there is more than one wal file. It is first
 * initialized to the oldest file's tracker (which is stored in the trailer) using
 * {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merged with the
 * tracker of every newer wal file using
 * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If all the
 * procedures modified in the oldest wal file have been modified again or deleted in newer wal
 * files, the oldest file can be deleted. This is safe because every call to
 * {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)} persists
 * the full state of a Procedure, so all the earlier wal records for that procedure are obsolete.
108 * @see ProcedureWALPrettyPrinter for printing content of a single WAL.
109 * @see #main(String[]) to parse a directory of MasterWALProcs.
110 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
111 *             use the new region based procedure store.
112 */
113@Deprecated
114@InterfaceAudience.Private
115public class WALProcedureStore extends ProcedureStoreBase {
  private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class);
  // NOTE(review): presumably the file-name prefix of procedure WAL files; its use is not in view.
  public static final String LOG_PREFIX = "pv2-";
  /** Used to construct the name of the log directory for master procedures */
  public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";

  // Tuning keys and their defaults. Every value below is read from the Configuration in start().

  public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY =
    "hbase.procedure.store.wal.warn.threshold";
  private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10;

  // When false, load() skips the post-load periodicRoll() that tries to clean up old WALs.
  public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY =
    "hbase.procedure.store.wal.exec.cleanup.on.load";
  private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = true;

  public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.max.retries.before.roll";
  private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;

  // Milliseconds; recoverLease() sleeps this long between lease-acquisition attempts.
  public static final String WAIT_BEFORE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.wait.before.roll";
  private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;

  public static final String ROLL_RETRIES_CONF_KEY = "hbase.procedure.store.wal.max.roll.retries";
  private static final int DEFAULT_ROLL_RETRIES = 3;

  public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.sync.failure.roll.max";
  private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;

  public static final String PERIODIC_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.periodic.roll.msec";
  private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h

  // Milliseconds the sync thread waits for the slots to fill up before flushing a partial batch.
  public static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
  private static final int DEFAULT_SYNC_WAIT_MSEC = 100;

  // NOTE(review): presumably selects hsync over hflush when syncing; the use site is not in view.
  public static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
  private static final boolean DEFAULT_USE_HSYNC = true;

  // NOTE(review): presumably the log size (bytes) that triggers a roll; the use site is not in view.
  public static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
  private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M

  // Capacity of the circular queue of SyncMetrics kept for the web UI.
  public static final String STORE_WAL_SYNC_STATS_COUNT =
    "hbase.procedure.store.wal.sync.stats.count";
  private static final int DEFAULT_SYNC_STATS_COUNT = 10;
160
  // WAL files known to the store. The last entry is the log currently being written.
  private final LinkedList<ProcedureWALFile> logs = new LinkedList<>();
  // Trackers described in the class javadoc.
  private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker();
  private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
  // Guards the slot buffer and the state shared between writers and the sync thread.
  private final ReentrantLock lock = new ReentrantLock();
  // The sync thread waits here for new data; signaled when a batch receives its first slot,
  // when the slots are full, and on stop.
  private final Condition waitCond = lock.newCondition();
  // The sync thread waits here (up to syncWaitMsec) for a batch to fill; signaled when full.
  private final Condition slotCond = lock.newCondition();
  // Writers wait here for their batch to be synced; signaled after every sync and on stop.
  private final Condition syncCond = lock.newCondition();

  private final LeaseRecovery leaseRecovery;
  private final Configuration conf;
  private final FileSystem fs;
  private final Path walDir;
  private final Path walArchiveDir;
  private final boolean enforceStreamCapability;

  // First failure recorded by the sync loop; non-null means the store is "sync aborted".
  private final AtomicReference<Throwable> syncException = new AtomicReference<>();
  // True until load() completes successfully; while true the sync loop skips periodicRoll().
  private final AtomicBoolean loading = new AtomicBoolean(true);
  // True while the sync thread is writing the current batch out.
  private final AtomicBoolean inSync = new AtomicBoolean(false);
  // Running total of synced bytes; NOTE(review): presumably reset on roll (not in view).
  private final AtomicLong totalSynced = new AtomicLong(0);
  private final AtomicLong lastRollTs = new AtomicLong(0);
  // Incremented after every completed sync; writers block until it moves past the value they saw.
  private final AtomicLong syncId = new AtomicLong(0);

  // Pool of reusable ByteSlot buffers (see acquireSlot()/releaseSlot()).
  private LinkedTransferQueue<ByteSlot> slotsCache = null;
  // Lazily created during load(); stays null when no corrupted WAL was found.
  private Set<ProcedureWALFile> corruptedLogs = null;
  private FSDataOutputStream stream = null;
  // Desired batch size; the sync loop copies it into syncMaxSlot before each flush.
  // NOTE(review): written by setRunningProcedureCount() without the lock and not volatile, while
  // read by the sync thread -- confirm the visibility delay is acceptable.
  private int runningProcCount = 1;
  // Id of the current log; seeded with the max id of the old logs in recoverLease().
  private long flushLogId = 0;
  private int syncMaxSlot = 1;
  // Number of filled entries in slots for the batch being assembled.
  private int slotIndex = 0;
  private Thread syncThread;
  private ByteSlot[] slots;

  // Tunings, read from conf in start().
  private int walCountWarnThreshold;
  private int maxRetriesBeforeRoll;
  private int maxSyncFailureRoll;
  private int waitBeforeRoll;
  private int rollRetries;
  private int periodicRollMsec;
  private long rollThreshold;
  private boolean useHsync;
  private int syncWaitMsec;

  // Variables used for UI display
  private CircularFifoQueue<SyncMetrics> syncMetricsQueue;
205
206  public static class SyncMetrics {
207    private long timestamp;
208    private long syncWaitMs;
209    private long totalSyncedBytes;
210    private int syncedEntries;
211    private float syncedPerSec;
212
213    public long getTimestamp() {
214      return timestamp;
215    }
216
217    public long getSyncWaitMs() {
218      return syncWaitMs;
219    }
220
221    public long getTotalSyncedBytes() {
222      return totalSyncedBytes;
223    }
224
225    public long getSyncedEntries() {
226      return syncedEntries;
227    }
228
229    public float getSyncedPerSec() {
230      return syncedPerSec;
231    }
232  }
233
234  public WALProcedureStore(Configuration conf, LeaseRecovery leaseRecovery) throws IOException {
235    this(conf, new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
236      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME),
237      leaseRecovery);
238  }
239
240  public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir,
241    final LeaseRecovery leaseRecovery) throws IOException {
242    this.conf = conf;
243    this.leaseRecovery = leaseRecovery;
244    this.walDir = walDir;
245    this.walArchiveDir = walArchiveDir;
246    this.fs = CommonFSUtils.getWALFileSystem(conf);
247    this.enforceStreamCapability =
248      conf.getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
249
250    // Create the log directory for the procedure store
251    if (!fs.exists(walDir)) {
252      if (!fs.mkdirs(walDir)) {
253        throw new IOException("Unable to mkdir " + walDir);
254      }
255    }
256    // Now that it exists, set the log policy
257    String storagePolicy =
258      conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
259    CommonFSUtils.setStoragePolicy(fs, walDir, storagePolicy);
260
261    // Create archive dir up front. Rename won't work w/o it up on HDFS.
262    if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
263      if (this.fs.mkdirs(this.walArchiveDir)) {
264        LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir);
265      } else {
266        LOG.warn("Failed create of {}", this.walArchiveDir);
267      }
268    }
269  }
270
271  @Override
272  public void start(int numSlots) throws IOException {
273    if (!setRunning(true)) {
274      return;
275    }
276
277    // Init buffer slots
278    loading.set(true);
279    runningProcCount = numSlots;
280    syncMaxSlot = numSlots;
281    slots = new ByteSlot[numSlots];
282    slotsCache = new LinkedTransferQueue<>();
283    while (slotsCache.size() < numSlots) {
284      slotsCache.offer(new ByteSlot());
285    }
286
287    // Tunings
288    walCountWarnThreshold =
289      conf.getInt(WAL_COUNT_WARN_THRESHOLD_CONF_KEY, DEFAULT_WAL_COUNT_WARN_THRESHOLD);
290    maxRetriesBeforeRoll =
291      conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
292    maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
293    waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
294    rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
295    rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
296    periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
297    syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
298    useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
299
300    // WebUI
301    syncMetricsQueue =
302      new CircularFifoQueue<>(conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT));
303
304    // Init sync thread
305    syncThread = new Thread("WALProcedureStoreSyncThread") {
306      @Override
307      public void run() {
308        try {
309          syncLoop();
310        } catch (Throwable e) {
311          LOG.error("Got an exception from the sync-loop", e);
312          if (!isSyncAborted()) {
313            sendAbortProcessSignal();
314          }
315        }
316      }
317    };
318    syncThread.start();
319  }
320
321  @Override
322  public void stop(final boolean abort) {
323    if (!setRunning(false)) {
324      return;
325    }
326
327    LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort
328      + (isSyncAborted() ? " (self aborting)" : ""));
329    sendStopSignal();
330    if (!isSyncAborted()) {
331      try {
332        while (syncThread.isAlive()) {
333          sendStopSignal();
334          syncThread.join(250);
335        }
336      } catch (InterruptedException e) {
337        LOG.warn("join interrupted", e);
338        Thread.currentThread().interrupt();
339      }
340    }
341
342    // Close the writer
343    closeCurrentLogStream(abort);
344
345    // Close the old logs
346    // they should be already closed, this is just in case the load fails
347    // and we call start() and then stop()
348    for (ProcedureWALFile log : logs) {
349      log.close();
350    }
351    logs.clear();
352    loading.set(true);
353  }
354
355  private void sendStopSignal() {
356    if (lock.tryLock()) {
357      try {
358        waitCond.signalAll();
359        syncCond.signalAll();
360      } finally {
361        lock.unlock();
362      }
363    }
364  }
365
366  @Override
367  public int getNumThreads() {
368    return slots == null ? 0 : slots.length;
369  }
370
371  @Override
372  public int setRunningProcedureCount(final int count) {
373    this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length;
374    return this.runningProcCount;
375  }
376
377  public ProcedureStoreTracker getStoreTracker() {
378    return storeTracker;
379  }
380
381  public ArrayList<ProcedureWALFile> getActiveLogs() {
382    lock.lock();
383    try {
384      return new ArrayList<>(logs);
385    } finally {
386      lock.unlock();
387    }
388  }
389
390  public Set<ProcedureWALFile> getCorruptedLogs() {
391    return corruptedLogs;
392  }
393
394  @Override
395  public void recoverLease() throws IOException {
396    lock.lock();
397    try {
398      LOG.debug("Starting WAL Procedure Store lease recovery");
399      boolean afterFirstAttempt = false;
400      while (isRunning()) {
401        // Don't sleep before first attempt
402        if (afterFirstAttempt) {
403          LOG.trace("Sleep {} ms after first lease recovery attempt.", waitBeforeRoll);
404          Threads.sleepWithoutInterrupt(waitBeforeRoll);
405        } else {
406          afterFirstAttempt = true;
407        }
408        FileStatus[] oldLogs = getLogFiles();
409        // Get Log-MaxID and recover lease on old logs
410        try {
411          flushLogId = initOldLogs(oldLogs);
412        } catch (FileNotFoundException e) {
413          LOG.warn("Someone else is active and deleted logs. retrying.", e);
414          continue;
415        }
416
417        // Create new state-log
418        if (!rollWriter(flushLogId + 1)) {
419          // someone else has already created this log
420          LOG.debug("Someone else has already created log {}. Retrying.", flushLogId);
421          continue;
422        }
423
424        // We have the lease on the log
425        oldLogs = getLogFiles();
426        if (getMaxLogId(oldLogs) > flushLogId) {
427          LOG.debug("Someone else created new logs. Expected maxLogId < {}", flushLogId);
428          logs.getLast().removeFile(this.walArchiveDir);
429          continue;
430        }
431
432        LOG.debug("Lease acquired for flushLogId={}", flushLogId);
433        break;
434      }
435    } finally {
436      lock.unlock();
437    }
438  }
439
440  @Override
441  public void load(ProcedureLoader loader) throws IOException {
442    lock.lock();
443    try {
444      if (logs.isEmpty()) {
445        throw new IllegalStateException("recoverLease() must be called before loading data");
446      }
447
448      // Nothing to do, If we have only the current log.
449      if (logs.size() == 1) {
450        LOG.debug("No state logs to replay.");
451        loader.setMaxProcId(0);
452        loading.set(false);
453        return;
454      }
455
456      // Load the old logs
457      Iterator<ProcedureWALFile> it = logs.descendingIterator();
458      it.next(); // Skip the current log
459
460      ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
461
462        @Override
463        public void setMaxProcId(long maxProcId) {
464          loader.setMaxProcId(maxProcId);
465        }
466
467        @Override
468        public void load(ProcedureIterator procIter) throws IOException {
469          loader.load(procIter);
470        }
471
472        @Override
473        public void handleCorrupted(ProcedureIterator procIter) throws IOException {
474          loader.handleCorrupted(procIter);
475        }
476
477        @Override
478        public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
479          if (corruptedLogs == null) {
480            corruptedLogs = new HashSet<>();
481          }
482          corruptedLogs.add(log);
483          // TODO: sideline corrupted log
484        }
485      });
486      // if we fail when loading, we should prevent persisting the storeTracker later in the stop
487      // method. As it may happen that, we have finished constructing the modified and deleted bits,
488      // but before we call resetModified, we fail, then if we persist the storeTracker then when
489      // restarting, we will consider that all procedures have been included in this file and delete
490      // all the previous files. Obviously this not correct. So here we will only set loading to
491      // false when we successfully loaded all the procedures, and when closing we will skip
492      // persisting the store tracker. And also, this will prevent the sync thread to do
493      // periodicRoll, where we may also clean old logs.
494      loading.set(false);
495      // try to cleanup inactive wals and complete the operation
496      buildHoldingCleanupTracker();
497      tryCleanupLogsOnLoad();
498    } finally {
499      lock.unlock();
500    }
501  }
502
503  private void tryCleanupLogsOnLoad() {
504    // nothing to cleanup.
505    if (logs.size() <= 1) {
506      return;
507    }
508
509    // the config says to not cleanup wals on load.
510    if (
511      !conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY)
512    ) {
513      LOG.debug("WALs cleanup on load is not enabled: " + getActiveLogs());
514      return;
515    }
516
517    try {
518      periodicRoll();
519    } catch (IOException e) {
520      LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e);
521    }
522  }
523
524  @Override
525  public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
526    if (LOG.isTraceEnabled()) {
527      LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
528    }
529
530    ByteSlot slot = acquireSlot();
531    try {
532      // Serialize the insert
533      long[] subProcIds = null;
534      if (subprocs != null) {
535        ProcedureWALFormat.writeInsert(slot, proc, subprocs);
536        subProcIds = new long[subprocs.length];
537        for (int i = 0; i < subprocs.length; ++i) {
538          subProcIds[i] = subprocs[i].getProcId();
539        }
540      } else {
541        assert !proc.hasParent();
542        ProcedureWALFormat.writeInsert(slot, proc);
543      }
544
545      // Push the transaction data and wait until it is persisted
546      pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
547    } catch (IOException e) {
548      // We are not able to serialize the procedure.
549      // this is a code error, and we are not able to go on.
550      LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedure: proc=" + proc
551        + ", subprocs=" + Arrays.toString(subprocs), e);
552      throw new RuntimeException(e);
553    } finally {
554      releaseSlot(slot);
555    }
556  }
557
558  @Override
559  public void insert(Procedure<?>[] procs) {
560    if (LOG.isTraceEnabled()) {
561      LOG.trace("Insert " + Arrays.toString(procs));
562    }
563
564    ByteSlot slot = acquireSlot();
565    try {
566      // Serialize the insert
567      long[] procIds = new long[procs.length];
568      for (int i = 0; i < procs.length; ++i) {
569        assert !procs[i].hasParent();
570        procIds[i] = procs[i].getProcId();
571        ProcedureWALFormat.writeInsert(slot, procs[i]);
572      }
573
574      // Push the transaction data and wait until it is persisted
575      pushData(PushType.INSERT, slot, Procedure.NO_PROC_ID, procIds);
576    } catch (IOException e) {
577      // We are not able to serialize the procedure.
578      // this is a code error, and we are not able to go on.
579      LOG.error(HBaseMarkers.FATAL,
580        "Unable to serialize one of the procedure: " + Arrays.toString(procs), e);
581      throw new RuntimeException(e);
582    } finally {
583      releaseSlot(slot);
584    }
585  }
586
587  @Override
588  public void update(Procedure<?> proc) {
589    if (LOG.isTraceEnabled()) {
590      LOG.trace("Update " + proc);
591    }
592
593    ByteSlot slot = acquireSlot();
594    try {
595      // Serialize the update
596      ProcedureWALFormat.writeUpdate(slot, proc);
597
598      // Push the transaction data and wait until it is persisted
599      pushData(PushType.UPDATE, slot, proc.getProcId(), null);
600    } catch (IOException e) {
601      // We are not able to serialize the procedure.
602      // this is a code error, and we are not able to go on.
603      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e);
604      throw new RuntimeException(e);
605    } finally {
606      releaseSlot(slot);
607    }
608  }
609
610  @Override
611  public void delete(long procId) {
612    LOG.trace("Delete {}", procId);
613    ByteSlot slot = acquireSlot();
614    try {
615      // Serialize the delete
616      ProcedureWALFormat.writeDelete(slot, procId);
617
618      // Push the transaction data and wait until it is persisted
619      pushData(PushType.DELETE, slot, procId, null);
620    } catch (IOException e) {
621      // We are not able to serialize the procedure.
622      // this is a code error, and we are not able to go on.
623      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + procId, e);
624      throw new RuntimeException(e);
625    } finally {
626      releaseSlot(slot);
627    }
628  }
629
630  @Override
631  public void delete(Procedure<?> proc, long[] subProcIds) {
632    assert proc != null : "expected a non-null procedure";
633    assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds";
634    if (LOG.isTraceEnabled()) {
635      LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds));
636    }
637
638    ByteSlot slot = acquireSlot();
639    try {
640      // Serialize the delete
641      ProcedureWALFormat.writeDelete(slot, proc, subProcIds);
642
643      // Push the transaction data and wait until it is persisted
644      pushData(PushType.DELETE, slot, proc.getProcId(), subProcIds);
645    } catch (IOException e) {
646      // We are not able to serialize the procedure.
647      // this is a code error, and we are not able to go on.
648      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e);
649      throw new RuntimeException(e);
650    } finally {
651      releaseSlot(slot);
652    }
653  }
654
655  @Override
656  public void delete(final long[] procIds, final int offset, final int count) {
657    if (count == 0) {
658      return;
659    }
660
661    if (offset == 0 && count == procIds.length) {
662      delete(procIds);
663    } else if (count == 1) {
664      delete(procIds[offset]);
665    } else {
666      delete(Arrays.copyOfRange(procIds, offset, offset + count));
667    }
668  }
669
670  private void delete(long[] procIds) {
671    if (LOG.isTraceEnabled()) {
672      LOG.trace("Delete " + Arrays.toString(procIds));
673    }
674
675    final ByteSlot slot = acquireSlot();
676    try {
677      // Serialize the delete
678      for (int i = 0; i < procIds.length; ++i) {
679        ProcedureWALFormat.writeDelete(slot, procIds[i]);
680      }
681
682      // Push the transaction data and wait until it is persisted
683      pushData(PushType.DELETE, slot, Procedure.NO_PROC_ID, procIds);
684    } catch (IOException e) {
685      // We are not able to serialize the procedure.
686      // this is a code error, and we are not able to go on.
687      LOG.error("Unable to serialize the procedures: " + Arrays.toString(procIds), e);
688      throw new RuntimeException(e);
689    } finally {
690      releaseSlot(slot);
691    }
692  }
693
694  private ByteSlot acquireSlot() {
695    ByteSlot slot = slotsCache.poll();
696    return slot != null ? slot : new ByteSlot();
697  }
698
699  private void releaseSlot(final ByteSlot slot) {
700    slot.reset();
701    slotsCache.offer(slot);
702  }
703
704  private enum PushType {
705    INSERT,
706    UPDATE,
707    DELETE
708  };
709
710  private long pushData(final PushType type, final ByteSlot slot, final long procId,
711    final long[] subProcIds) {
712    if (!isRunning()) {
713      throw new RuntimeException("the store must be running before inserting data");
714    }
715    if (logs.isEmpty()) {
716      throw new RuntimeException("recoverLease() must be called before inserting data");
717    }
718
719    long logId = -1;
720    lock.lock();
721    try {
722      // Wait for the sync to be completed
723      while (true) {
724        if (!isRunning()) {
725          throw new RuntimeException("store no longer running");
726        } else if (isSyncAborted()) {
727          throw new RuntimeException("sync aborted", syncException.get());
728        } else if (inSync.get()) {
729          syncCond.await();
730        } else if (slotIndex >= syncMaxSlot) {
731          slotCond.signal();
732          syncCond.await();
733        } else {
734          break;
735        }
736      }
737
738      final long pushSyncId = syncId.get();
739      updateStoreTracker(type, procId, subProcIds);
740      slots[slotIndex++] = slot;
741      logId = flushLogId;
742
743      // Notify that there is new data
744      if (slotIndex == 1) {
745        waitCond.signal();
746      }
747
748      // Notify that the slots are full
749      if (slotIndex == syncMaxSlot) {
750        waitCond.signal();
751        slotCond.signal();
752      }
753
754      while (pushSyncId == syncId.get() && isRunning()) {
755        syncCond.await();
756      }
757    } catch (InterruptedException e) {
758      Thread.currentThread().interrupt();
759      sendAbortProcessSignal();
760      throw new RuntimeException(e);
761    } finally {
762      lock.unlock();
763      if (isSyncAborted()) {
764        throw new RuntimeException("sync aborted", syncException.get());
765      }
766    }
767    return logId;
768  }
769
770  private void updateStoreTracker(final PushType type, final long procId, final long[] subProcIds) {
771    switch (type) {
772      case INSERT:
773        if (subProcIds == null) {
774          storeTracker.insert(procId);
775        } else if (procId == Procedure.NO_PROC_ID) {
776          storeTracker.insert(subProcIds);
777        } else {
778          storeTracker.insert(procId, subProcIds);
779          holdingCleanupTracker.setDeletedIfModified(procId);
780        }
781        break;
782      case UPDATE:
783        storeTracker.update(procId);
784        holdingCleanupTracker.setDeletedIfModified(procId);
785        break;
786      case DELETE:
787        if (subProcIds != null && subProcIds.length > 0) {
788          storeTracker.delete(subProcIds);
789          holdingCleanupTracker.setDeletedIfModified(subProcIds);
790        } else {
791          storeTracker.delete(procId);
792          holdingCleanupTracker.setDeletedIfModified(procId);
793        }
794        break;
795      default:
796        throw new RuntimeException("invalid push type " + type);
797    }
798  }
799
800  private boolean isSyncAborted() {
801    return syncException.get() != null;
802  }
803
  /**
   * Body of the sync thread: batches pending slots and writes them to the current WAL.
   * <p>
   * The loop takes {@code lock} and runs while {@link #isRunning()} is true. When no slot is
   * pending ({@code slotIndex == 0}) it performs the periodic roll/cleanup (skipped while
   * {@code loading} is set) and waits on {@code waitCond} for up to
   * {@link #getMillisToNextPeriodicRoll()} ms. Once data is present it waits on {@code slotCond}
   * for up to {@code syncWaitMsec} ms unless {@code slotIndex} already equals
   * {@code runningProcCount}. It then records a {@code SyncMetrics} entry, writes and syncs the
   * slots via {@link #syncSlots()}, resets {@code slotIndex} and increments {@code syncId}.
   * {@code syncCond} is signalled at the end of every iteration, whether it succeeded, continued or
   * failed.
   * <p>
   * Any failure is stored in {@code syncException} (only if none was stored before), the abort
   * signal is sent, and the failure is rethrown, which ends the loop. On interruption the thread's
   * interrupt flag is restored first.
   * NOTE(review): assumes waitCond/slotCond/syncCond are conditions of {@code lock}, so awaiting
   * on them releases the lock while waiting; confirm against where they are created.
   * @throws Throwable the failure that stopped the loop
   */
  private void syncLoop() throws Throwable {
    long totalSyncedToStore = 0;
    inSync.set(false);
    lock.lock();
    try {
      while (isRunning()) {
        try {
          // Wait until new data is available
          if (slotIndex == 0) {
            // Use idle time for roll/cleanup, but not while procedures are still being loaded.
            if (!loading.get()) {
              periodicRoll();
            }

            if (LOG.isTraceEnabled()) {
              float rollTsSec = getMillisFromLastRoll() / 1000.0f;
              LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
                StringUtils.humanSize(totalSynced.get()),
                StringUtils.humanSize(totalSynced.get() / rollTsSec)));
            }

            // Sleep at most until the next periodic roll is due.
            waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
            if (slotIndex == 0) {
              // no data.. probably a stop() or a periodic roll
              continue;
            }
          }
          // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
          syncMaxSlot = runningProcCount;
          assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot;
          final long syncWaitSt = EnvironmentEdgeManager.currentTime();
          if (slotIndex != syncMaxSlot) {
            slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
          }

          final long currentTs = EnvironmentEdgeManager.currentTime();
          final long syncWaitMs = currentTs - syncWaitSt;
          // NOTE(review): rollSec can be 0 right after a roll, making syncedPerSec Infinity/NaN;
          // the value only feeds trace logging and the metrics entry below.
          final float rollSec = getMillisFromLastRoll() / 1000.0f;
          final float syncedPerSec = totalSyncedToStore / rollSec;
          if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) {
            LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
              StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
              StringUtils.humanSize(totalSyncedToStore), StringUtils.humanSize(syncedPerSec)));
          }

          // update webui circular buffers (TODO: get rid of allocations)
          final SyncMetrics syncMetrics = new SyncMetrics();
          syncMetrics.timestamp = currentTs;
          syncMetrics.syncWaitMs = syncWaitMs;
          syncMetrics.syncedEntries = slotIndex;
          syncMetrics.totalSyncedBytes = totalSyncedToStore;
          syncMetrics.syncedPerSec = syncedPerSec;
          syncMetricsQueue.add(syncMetrics);

          // sync
          inSync.set(true);
          long slotSize = syncSlots();
          // Account the written bytes both to the current log file and the running total.
          logs.getLast().addToSize(slotSize);
          totalSyncedToStore = totalSynced.addAndGet(slotSize);
          slotIndex = 0;
          inSync.set(false);
          syncId.incrementAndGet();
        } catch (InterruptedException e) {
          // Restore the interrupt flag before recording the failure and aborting.
          Thread.currentThread().interrupt();
          syncException.compareAndSet(null, e);
          sendAbortProcessSignal();
          throw e;
        } catch (Throwable t) {
          // Keep only the first failure; later ones do not overwrite it.
          syncException.compareAndSet(null, t);
          sendAbortProcessSignal();
          throw t;
        } finally {
          // Wake up everyone waiting for this sync round, whatever its outcome.
          syncCond.signalAll();
        }
      }
    } finally {
      lock.unlock();
    }
  }
882
883  public ArrayList<SyncMetrics> getSyncMetrics() {
884    lock.lock();
885    try {
886      return new ArrayList<>(syncMetricsQueue);
887    } finally {
888      lock.unlock();
889    }
890  }
891
892  private long syncSlots() throws Throwable {
893    int retry = 0;
894    int logRolled = 0;
895    long totalSynced = 0;
896    do {
897      try {
898        totalSynced = syncSlots(stream, slots, 0, slotIndex);
899        break;
900      } catch (Throwable e) {
901        LOG.warn("unable to sync slots, retry=" + retry);
902        if (++retry >= maxRetriesBeforeRoll) {
903          if (logRolled >= maxSyncFailureRoll && isRunning()) {
904            LOG.error("Sync slots after log roll failed, abort.", e);
905            throw e;
906          }
907
908          if (!rollWriterWithRetries()) {
909            throw e;
910          }
911
912          logRolled++;
913          retry = 0;
914        }
915      }
916    } while (isRunning());
917    return totalSynced;
918  }
919
920  protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots,
921    final int offset, final int count) throws IOException {
922    long totalSynced = 0;
923    for (int i = 0; i < count; ++i) {
924      final ByteSlot data = slots[offset + i];
925      data.writeTo(stream);
926      totalSynced += data.size();
927    }
928
929    syncStream(stream);
930    sendPostSyncSignal();
931
932    if (LOG.isTraceEnabled()) {
933      LOG.trace("Sync slots=" + count + '/' + syncMaxSlot + ", flushed="
934        + StringUtils.humanSize(totalSynced));
935    }
936    return totalSynced;
937  }
938
939  protected void syncStream(final FSDataOutputStream stream) throws IOException {
940    if (useHsync) {
941      stream.hsync();
942    } else {
943      stream.hflush();
944    }
945  }
946
947  private boolean rollWriterWithRetries() {
948    for (int i = 0; i < rollRetries && isRunning(); ++i) {
949      if (i > 0) {
950        Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
951      }
952
953      try {
954        if (rollWriter()) {
955          return true;
956        }
957      } catch (IOException e) {
958        LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
959      }
960    }
961    LOG.error(HBaseMarkers.FATAL, "Unable to roll the log");
962    return false;
963  }
964
965  private boolean tryRollWriter() {
966    try {
967      return rollWriter();
968    } catch (IOException e) {
969      LOG.warn("Unable to roll the log", e);
970      return false;
971    }
972  }
973
974  public long getMillisToNextPeriodicRoll() {
975    if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
976      return periodicRollMsec - getMillisFromLastRoll();
977    }
978    return Long.MAX_VALUE;
979  }
980
981  public long getMillisFromLastRoll() {
982    return (EnvironmentEdgeManager.currentTime() - lastRollTs.get());
983  }
984
985  void periodicRollForTesting() throws IOException {
986    lock.lock();
987    try {
988      periodicRoll();
989    } finally {
990      lock.unlock();
991    }
992  }
993
994  public boolean rollWriterForTesting() throws IOException {
995    lock.lock();
996    try {
997      return rollWriter();
998    } finally {
999      lock.unlock();
1000    }
1001  }
1002
1003  void removeInactiveLogsForTesting() throws Exception {
1004    lock.lock();
1005    try {
1006      removeInactiveLogs();
1007    } finally {
1008      lock.unlock();
1009    }
1010  }
1011
1012  private void periodicRoll() throws IOException {
1013    if (storeTracker.isEmpty()) {
1014      LOG.trace("no active procedures");
1015      tryRollWriter();
1016      removeAllLogs(flushLogId - 1, "no active procedures");
1017    } else {
1018      if (storeTracker.isAllModified()) {
1019        LOG.trace("all the active procedures are in the latest log");
1020        removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log");
1021      }
1022
1023      // if the log size has exceeded the roll threshold
1024      // or the periodic roll timeout is expired, try to roll the wal.
1025      if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
1026        tryRollWriter();
1027      }
1028
1029      removeInactiveLogs();
1030    }
1031  }
1032
1033  private boolean rollWriter() throws IOException {
1034    if (!isRunning()) {
1035      return false;
1036    }
1037
1038    // Create new state-log
1039    if (!rollWriter(flushLogId + 1)) {
1040      LOG.warn("someone else has already created log {}", flushLogId);
1041      return false;
1042    }
1043
1044    // We have the lease on the log,
1045    // but we should check if someone else has created new files
1046    if (getMaxLogId(getLogFiles()) > flushLogId) {
1047      LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId);
1048      logs.getLast().removeFile(this.walArchiveDir);
1049      return false;
1050    }
1051
1052    // We have the lease on the log
1053    return true;
1054  }
1055
1056  boolean rollWriter(long logId) throws IOException {
1057    assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
1058    assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
1059
1060    ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
1061      .setVersion(ProcedureWALFormat.HEADER_VERSION).setType(ProcedureWALFormat.LOG_TYPE_STREAM)
1062      .setMinProcId(storeTracker.getActiveMinProcId()).setLogId(logId).build();
1063
1064    FSDataOutputStream newStream = null;
1065    Path newLogFile = null;
1066    long startPos = -1;
1067    newLogFile = getLogFilePath(logId);
1068    try {
1069      newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
1070    } catch (FileAlreadyExistsException e) {
1071      LOG.error("Log file with id={} already exists", logId, e);
1072      return false;
1073    } catch (RemoteException re) {
1074      LOG.warn("failed to create log file with id={}", logId, re);
1075      return false;
1076    }
1077    // After we create the stream but before we attempt to use it at all
1078    // ensure that we can provide the level of data safety we're configured
1079    // to provide.
1080    final String durability = useHsync ? StreamCapabilities.HSYNC : StreamCapabilities.HFLUSH;
1081    if (enforceStreamCapability && !newStream.hasCapability(durability)) {
1082      throw new IllegalStateException("The procedure WAL relies on the ability to " + durability
1083        + " for proper operation during component failures, but the underlying filesystem does "
1084        + "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY
1085        + "' to set the desired level of robustness and ensure the config value of '"
1086        + CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it.");
1087    }
1088    try {
1089      ProcedureWALFormat.writeHeader(newStream, header);
1090      startPos = newStream.getPos();
1091    } catch (IOException ioe) {
1092      LOG.warn("Encountered exception writing header", ioe);
1093      newStream.close();
1094      return false;
1095    }
1096
1097    closeCurrentLogStream(false);
1098
1099    storeTracker.resetModified();
1100    stream = newStream;
1101    flushLogId = logId;
1102    totalSynced.set(0);
1103    long rollTs = EnvironmentEdgeManager.currentTime();
1104    lastRollTs.set(rollTs);
1105    logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs));
1106
1107    // if it's the first next WAL being added, build the holding cleanup tracker
1108    if (logs.size() == 2) {
1109      buildHoldingCleanupTracker();
1110    } else if (logs.size() > walCountWarnThreshold) {
1111      LOG.warn("procedure WALs count={} above the warning threshold {}. check running procedures"
1112        + " to see if something is stuck.", logs.size(), walCountWarnThreshold);
1113      // This is just like what we have done at RS side when there are too many wal files. For RS,
1114      // if there are too many wal files, we will find out the wal entries in the oldest file, and
1115      // tell the upper layer to flush these regions so the wal entries will be useless and then we
1116      // can delete the wal file. For WALProcedureStore, the assumption is that, if all the
1117      // procedures recorded in a proc wal file are modified or deleted in a new proc wal file, then
1118      // we are safe to delete it. So here if there are too many proc wal files, we will find out
1119      // the procedure ids in the oldest file, which are neither modified nor deleted in newer proc
1120      // wal files, and tell upper layer to update the state of these procedures to the newest proc
1121      // wal file(by calling ProcedureStore.update), then we are safe to delete the oldest proc wal
1122      // file.
1123      sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds());
1124    }
1125
1126    LOG.info("Rolled new Procedure Store WAL, id={}", logId);
1127    return true;
1128  }
1129
1130  private void closeCurrentLogStream(boolean abort) {
1131    if (stream == null || logs.isEmpty()) {
1132      return;
1133    }
1134
1135    try {
1136      ProcedureWALFile log = logs.getLast();
1137      // If the loading flag is true, it usually means that we fail when loading procedures, so we
1138      // should not persist the store tracker, as its state may not be correct.
1139      if (!loading.get()) {
1140        log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
1141        log.updateLocalTracker(storeTracker);
1142        if (!abort) {
1143          long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
1144          log.addToSize(trailerSize);
1145        }
1146      }
1147    } catch (IOException | FSError e) {
1148      LOG.warn("Unable to write the trailer", e);
1149    }
1150    try {
1151      stream.close();
1152    } catch (IOException | FSError e) {
1153      LOG.error("Unable to close the stream", e);
1154    }
1155    stream = null;
1156  }
1157
1158  // ==========================================================================
1159  // Log Files cleaner helpers
1160  // ==========================================================================
1161  private void removeInactiveLogs() throws IOException {
1162    // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
1163    // once there is nothing olding the oldest WAL we can remove it.
1164    while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
1165      LOG.info("Remove the oldest log {}", logs.getFirst());
1166      removeLogFile(logs.getFirst(), walArchiveDir);
1167      buildHoldingCleanupTracker();
1168    }
1169
1170    // TODO: In case we are holding up a lot of logs for long time we should
1171    // rewrite old procedures (in theory parent procs) to the new WAL.
1172  }
1173
  /**
   * Recomputes {@code holdingCleanupTracker}: the procedures recorded in the oldest log that are
   * neither deleted (per {@code storeTracker}) nor modified in both the oldest log and one of the
   * intermediate logs, i.e. the procedures that still prevent the oldest log from being removed.
   * <p>
   * With at most one log the tracker is simply reset. The last log is not applied separately
   * because its state is what {@code storeTracker} already reflects. Application stops early once
   * the tracker becomes empty.
   * NOTE(review): the exact semantics of setDeletedIfModifiedInBoth/setDeletedIfDeletedByThem live
   * in ProcedureStoreTracker; the summary above follows the method names and comments here.
   */
  private void buildHoldingCleanupTracker() {
    if (logs.size() <= 1) {
      // we only have one wal, so nothing to do
      holdingCleanupTracker.reset();
      return;
    }

    // compute the holding tracker.
    // - the first WAL is used for the 'updates'
    // - the global tracker will be used to determine whether a procedure has been deleted
    // - other trackers will be used to determine whether a procedure has been updated, as a deleted
    // procedure can always be detected by checking the global tracker, we can save the deleted
    // checks when applying other trackers
    holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
    holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker);
    // the logs is a linked list, so avoid calling get(index) on it.
    Iterator<ProcedureWALFile> iter = logs.iterator();
    // skip the tracker for the first file when creating the iterator.
    iter.next();
    // safe: logs.size() >= 2 was checked above, so a second element exists.
    ProcedureStoreTracker tracker = iter.next().getTracker();
    // testing iter.hasNext after calling iter.next to skip applying the tracker for last file,
    // which is just the storeTracker above.
    while (iter.hasNext()) {
      holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker);
      if (holdingCleanupTracker.isEmpty()) {
        break;
      }
      tracker = iter.next().getTracker();
    }
  }
1204
1205  /**
1206   * Remove all logs with logId <= {@code lastLogId}.
1207   */
1208  private void removeAllLogs(long lastLogId, String why) {
1209    if (logs.size() <= 1) {
1210      return;
1211    }
1212
1213    LOG.info("Remove all state logs with ID less than {}, since {}", lastLogId, why);
1214
1215    boolean removed = false;
1216    while (logs.size() > 1) {
1217      ProcedureWALFile log = logs.getFirst();
1218      if (lastLogId < log.getLogId()) {
1219        break;
1220      }
1221      removeLogFile(log, walArchiveDir);
1222      removed = true;
1223    }
1224
1225    if (removed) {
1226      buildHoldingCleanupTracker();
1227    }
1228  }
1229
1230  private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
1231    try {
1232      LOG.trace("Removing log={}", log);
1233      log.removeFile(walArchiveDir);
1234      logs.remove(log);
1235      LOG.debug("Removed log={}, activeLogs={}", log, logs);
1236      assert logs.size() > 0 : "expected at least one log";
1237    } catch (IOException e) {
1238      LOG.error("Unable to remove log: " + log, e);
1239      return false;
1240    }
1241    return true;
1242  }
1243
1244  // ==========================================================================
1245  // FileSystem Log Files helpers
1246  // ==========================================================================
1247  public Path getWALDir() {
1248    return this.walDir;
1249  }
1250
1251  Path getWalArchiveDir() {
1252    return this.walArchiveDir;
1253  }
1254
1255  public FileSystem getFileSystem() {
1256    return this.fs;
1257  }
1258
1259  protected Path getLogFilePath(final long logId) throws IOException {
1260    return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId));
1261  }
1262
1263  private static long getLogIdFromName(final String name) {
1264    int end = name.lastIndexOf(".log");
1265    int start = name.lastIndexOf('-') + 1;
1266    return Long.parseLong(name.substring(start, end));
1267  }
1268
1269  private static final PathFilter WALS_PATH_FILTER = new PathFilter() {
1270    @Override
1271    public boolean accept(Path path) {
1272      String name = path.getName();
1273      return name.startsWith(LOG_PREFIX) && name.endsWith(".log");
1274    }
1275  };
1276
1277  private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR =
1278    new Comparator<FileStatus>() {
1279      @Override
1280      public int compare(FileStatus a, FileStatus b) {
1281        final long aId = getLogIdFromName(a.getPath().getName());
1282        final long bId = getLogIdFromName(b.getPath().getName());
1283        return Long.compare(aId, bId);
1284      }
1285    };
1286
1287  private FileStatus[] getLogFiles() throws IOException {
1288    try {
1289      FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER);
1290      Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
1291      return files;
1292    } catch (FileNotFoundException e) {
1293      LOG.warn("Log directory not found: " + e.getMessage());
1294      return null;
1295    }
1296  }
1297
1298  /**
1299   * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort
1300   * the file set by log id.
1301   * @return Max-LogID of the specified log file set
1302   */
1303  private static long getMaxLogId(FileStatus[] logFiles) {
1304    if (logFiles == null || logFiles.length == 0) {
1305      return 0L;
1306    }
1307    return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName());
1308  }
1309
1310  /**
1311   * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort
1312   * the file set by log id.
1313   * @return Max-LogID of the specified log file set
1314   */
1315  private long initOldLogs(FileStatus[] logFiles) throws IOException {
1316    if (logFiles == null || logFiles.length == 0) {
1317      return 0L;
1318    }
1319    long maxLogId = 0;
1320    for (int i = 0; i < logFiles.length; ++i) {
1321      final Path logPath = logFiles[i].getPath();
1322      leaseRecovery.recoverFileLease(fs, logPath);
1323      if (!isRunning()) {
1324        throw new IOException("wal aborting");
1325      }
1326
1327      maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
1328      ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
1329      if (log != null) {
1330        this.logs.add(log);
1331      }
1332    }
1333    initTrackerFromOldLogs();
1334    return maxLogId;
1335  }
1336
1337  /**
1338   * If last log's tracker is not null, use it as {@link #storeTracker}. Otherwise, set storeTracker
1339   * as partial, and let {@link ProcedureWALFormatReader} rebuild it using entries in the log.
1340   */
1341  private void initTrackerFromOldLogs() {
1342    if (logs.isEmpty() || !isRunning()) {
1343      return;
1344    }
1345    ProcedureWALFile log = logs.getLast();
1346    if (!log.getTracker().isPartial()) {
1347      storeTracker.resetTo(log.getTracker());
1348    } else {
1349      storeTracker.reset();
1350      storeTracker.setPartialFlag(true);
1351    }
1352  }
1353
1354  /**
1355   * Loads given log file and it's tracker.
1356   */
1357  private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir)
1358    throws IOException {
1359    final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
1360    if (logFile.getLen() == 0) {
1361      LOG.warn("Remove uninitialized log: {}", logFile);
1362      log.removeFile(walArchiveDir);
1363      return null;
1364    }
1365    LOG.debug("Opening Pv2 {}", logFile);
1366    try {
1367      log.open();
1368    } catch (ProcedureWALFormat.InvalidWALDataException e) {
1369      LOG.warn("Remove uninitialized log: {}", logFile, e);
1370      log.removeFile(walArchiveDir);
1371      return null;
1372    } catch (IOException e) {
1373      String msg = "Unable to read state log: " + logFile;
1374      LOG.error(msg, e);
1375      throw new IOException(msg, e);
1376    }
1377
1378    try {
1379      log.readTracker();
1380    } catch (IOException e) {
1381      log.getTracker().reset();
1382      log.getTracker().setPartialFlag(true);
1383      LOG.warn("Unable to read tracker for {}", log, e);
1384    }
1385
1386    log.close();
1387    return log;
1388  }
1389
1390  /**
1391   * Parses a directory of WALs building up ProcedureState. For testing parse and profiling.
1392   * @param args Include pointer to directory of WAL files for a store instance to parse & load.
1393   */
1394  public static void main(String[] args) throws IOException {
1395    Configuration conf = HBaseConfiguration.create();
1396    if (args == null || args.length != 1) {
1397      System.out.println("ERROR: Empty arguments list; pass path to MASTERPROCWALS_DIR.");
1398      System.out.println("Usage: WALProcedureStore MASTERPROCWALS_DIR");
1399      System.exit(-1);
1400    }
1401    WALProcedureStore store =
1402      new WALProcedureStore(conf, new Path(args[0]), null, new LeaseRecovery() {
1403        @Override
1404        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
1405          // no-op
1406        }
1407      });
1408    try {
1409      store.start(16);
1410      ProcedureExecutor<?> pe =
1411        new ProcedureExecutor<>(conf, new Object()/* Pass anything */, store);
1412      pe.init(1, true);
1413    } finally {
1414      store.stop(true);
1415    }
1416  }
1417}