001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Comparator;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.LinkedList;
028import java.util.Set;
029import java.util.concurrent.LinkedTransferQueue;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicLong;
033import java.util.concurrent.atomic.AtomicReference;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FSDataOutputStream;
038import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
039import org.apache.hadoop.fs.FSError;
040import org.apache.hadoop.fs.FileAlreadyExistsException;
041import org.apache.hadoop.fs.FileStatus;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.fs.PathFilter;
045import org.apache.hadoop.fs.StreamCapabilities;
046import org.apache.hadoop.hbase.HBaseConfiguration;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.log.HBaseMarkers;
049import org.apache.hadoop.hbase.procedure2.Procedure;
050import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
051import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
052import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
053import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
054import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
055import org.apache.hadoop.hbase.procedure2.util.StringUtils;
056import org.apache.hadoop.hbase.util.CommonFSUtils;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.util.Threads;
059import org.apache.hadoop.hdfs.DistributedFileSystem;
060import org.apache.hadoop.ipc.RemoteException;
061import org.apache.yetus.audience.InterfaceAudience;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;
066
067import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
068
069/**
070 * WAL implementation of the ProcedureStore.
071 * <p/>
072 * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()},
073 * then {@link #load(ProcedureLoader)}.
074 * <p/>
 * In {@link #recoverLease()}, we get the lease by closing all the existing wal files (by calling
 * recoverFileLease) and creating a new wal writer. We also get the list of all the old wal
 * files.
078 * <p/>
 * FIXME: notice that the current recover lease implementation is problematic; it cannot deal with
 * the race when two masters both want to acquire the lease...
081 * <p/>
082 * In {@link #load(ProcedureLoader)} method, we will load all the active procedures. See the
083 * comments of this method for more details.
084 * <p/>
 * The actual logging approach is a bit like our FileSystem based WAL implementation on the RS side.
 * There is a {@link #slots} array, which acts like a ring buffer; the insert, update and delete
 * methods put things into the {@link #slots} and wait. A background sync thread (see the
 * {@link #syncLoop()} method) takes the data from the {@link #slots}, writes it to the FileSystem,
 * and notifies the callers that the write has finished.
090 * <p/>
091 * TODO: try using disruptor to increase performance and simplify the logic?
092 * <p/>
093 * The {@link #storeTracker} keeps track of the modified procedures in the newest wal file, which is
094 * also the one being written currently. And the deleted bits in it are for all the procedures, not
095 * only the ones in the newest wal file. And when rolling a log, we will first store it in the
096 * trailer of the current wal file, and then reset its modified bits, so that it can start to track
097 * the modified procedures for the new wal file.
098 * <p/>
 * The {@link #holdingCleanupTracker} is used to test whether it is safe to delete the oldest wal
 * file. When there is log rolling and there is more than 1 wal file, we make use of it. It is
 * first initialized to the oldest file's tracker (which is stored in the trailer), using the
 * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merged
 * with the tracker of every newer wal file, using
 * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If we find out
 * that all the modified procedures for the oldest wal file are modified or deleted in newer wal
 * files, then we can delete it. This is because every time we call
 * {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)}, we
 * persist the full state of a Procedure, so the earlier wal records for this procedure can all be
 * deleted.
110 * @see ProcedureWALPrettyPrinter for printing content of a single WAL.
111 * @see #main(String[]) to parse a directory of MasterWALProcs.
112 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
113 *             use the new region based procedure store.
114 */
115@Deprecated
116@InterfaceAudience.Private
117public class WALProcedureStore extends ProcedureStoreBase {
118  private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class);
119  public static final String LOG_PREFIX = "pv2-";
120  /** Used to construct the name of the log directory for master procedures */
121  public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
122
123  public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY =
124    "hbase.procedure.store.wal.warn.threshold";
125  private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10;
126
127  public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY =
128    "hbase.procedure.store.wal.exec.cleanup.on.load";
129  private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = true;
130
131  public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
132    "hbase.procedure.store.wal.max.retries.before.roll";
133  private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;
134
135  public static final String WAIT_BEFORE_ROLL_CONF_KEY =
136    "hbase.procedure.store.wal.wait.before.roll";
137  private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;
138
139  public static final String ROLL_RETRIES_CONF_KEY = "hbase.procedure.store.wal.max.roll.retries";
140  private static final int DEFAULT_ROLL_RETRIES = 3;
141
142  public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
143    "hbase.procedure.store.wal.sync.failure.roll.max";
144  private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;
145
146  public static final String PERIODIC_ROLL_CONF_KEY =
147    "hbase.procedure.store.wal.periodic.roll.msec";
148  private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h
149
150  public static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
151  private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
152
153  public static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
154  private static final boolean DEFAULT_USE_HSYNC = true;
155
156  public static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
157  private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M
158
159  public static final String STORE_WAL_SYNC_STATS_COUNT =
160    "hbase.procedure.store.wal.sync.stats.count";
161  private static final int DEFAULT_SYNC_STATS_COUNT = 10;
162
163  private final LinkedList<ProcedureWALFile> logs = new LinkedList<>();
164  private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker();
165  private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
166  private final ReentrantLock lock = new ReentrantLock();
167  private final Condition waitCond = lock.newCondition();
168  private final Condition slotCond = lock.newCondition();
169  private final Condition syncCond = lock.newCondition();
170
171  private final LeaseRecovery leaseRecovery;
172  private final Configuration conf;
173  private final FileSystem fs;
174  private final Path walDir;
175  private final Path walArchiveDir;
176  private final boolean enforceStreamCapability;
177
178  private final AtomicReference<Throwable> syncException = new AtomicReference<>();
179  private final AtomicBoolean loading = new AtomicBoolean(true);
180  private final AtomicBoolean inSync = new AtomicBoolean(false);
181  private final AtomicLong totalSynced = new AtomicLong(0);
182  private final AtomicLong lastRollTs = new AtomicLong(0);
183  private final AtomicLong syncId = new AtomicLong(0);
184
185  private LinkedTransferQueue<ByteSlot> slotsCache = null;
186  private Set<ProcedureWALFile> corruptedLogs = null;
187  private FSDataOutputStream stream = null;
188  private int runningProcCount = 1;
189  private long flushLogId = 0;
190  private int syncMaxSlot = 1;
191  private int slotIndex = 0;
192  private Thread syncThread;
193  private ByteSlot[] slots;
194
195  private int walCountWarnThreshold;
196  private int maxRetriesBeforeRoll;
197  private int maxSyncFailureRoll;
198  private int waitBeforeRoll;
199  private int rollRetries;
200  private int periodicRollMsec;
201  private long rollThreshold;
202  private boolean useHsync;
203  private int syncWaitMsec;
204
205  // Variables used for UI display
206  private CircularFifoQueue<SyncMetrics> syncMetricsQueue;
207
208  public static class SyncMetrics {
209    private long timestamp;
210    private long syncWaitMs;
211    private long totalSyncedBytes;
212    private int syncedEntries;
213    private float syncedPerSec;
214
215    public long getTimestamp() {
216      return timestamp;
217    }
218
219    public long getSyncWaitMs() {
220      return syncWaitMs;
221    }
222
223    public long getTotalSyncedBytes() {
224      return totalSyncedBytes;
225    }
226
227    public long getSyncedEntries() {
228      return syncedEntries;
229    }
230
231    public float getSyncedPerSec() {
232      return syncedPerSec;
233    }
234  }
235
236  public WALProcedureStore(Configuration conf, LeaseRecovery leaseRecovery) throws IOException {
237    this(conf, new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
238      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME),
239      leaseRecovery);
240  }
241
242  public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir,
243    final LeaseRecovery leaseRecovery) throws IOException {
244    this.conf = conf;
245    this.leaseRecovery = leaseRecovery;
246    this.walDir = walDir;
247    this.walArchiveDir = walArchiveDir;
248    this.fs = CommonFSUtils.getWALFileSystem(conf);
249    this.enforceStreamCapability =
250      conf.getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
251
252    // Create the log directory for the procedure store
253    if (!fs.exists(walDir)) {
254      if (!fs.mkdirs(walDir)) {
255        throw new IOException("Unable to mkdir " + walDir);
256      }
257    }
258    // Now that it exists, set the log policy
259    String storagePolicy =
260      conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
261    CommonFSUtils.setStoragePolicy(fs, walDir, storagePolicy);
262
263    // Create archive dir up front. Rename won't work w/o it up on HDFS.
264    if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
265      if (this.fs.mkdirs(this.walArchiveDir)) {
266        LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir);
267      } else {
268        LOG.warn("Failed create of {}", this.walArchiveDir);
269      }
270    }
271  }
272
273  @Override
274  public void start(int numSlots) throws IOException {
275    if (!setRunning(true)) {
276      return;
277    }
278
279    // Init buffer slots
280    loading.set(true);
281    runningProcCount = numSlots;
282    syncMaxSlot = numSlots;
283    slots = new ByteSlot[numSlots];
284    slotsCache = new LinkedTransferQueue<>();
285    while (slotsCache.size() < numSlots) {
286      slotsCache.offer(new ByteSlot());
287    }
288
289    // Tunings
290    walCountWarnThreshold =
291      conf.getInt(WAL_COUNT_WARN_THRESHOLD_CONF_KEY, DEFAULT_WAL_COUNT_WARN_THRESHOLD);
292    maxRetriesBeforeRoll =
293      conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
294    maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
295    waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
296    rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
297    rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
298    periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
299    syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
300    useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
301
302    // WebUI
303    syncMetricsQueue =
304      new CircularFifoQueue<>(conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT));
305
306    // Init sync thread
307    syncThread = new Thread("WALProcedureStoreSyncThread") {
308      @Override
309      public void run() {
310        try {
311          syncLoop();
312        } catch (Throwable e) {
313          LOG.error("Got an exception from the sync-loop", e);
314          if (!isSyncAborted()) {
315            sendAbortProcessSignal();
316          }
317        }
318      }
319    };
320    syncThread.start();
321  }
322
323  @Override
324  public void stop(final boolean abort) {
325    if (!setRunning(false)) {
326      return;
327    }
328
329    LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort
330      + (isSyncAborted() ? " (self aborting)" : ""));
331    sendStopSignal();
332    if (!isSyncAborted()) {
333      try {
334        while (syncThread.isAlive()) {
335          sendStopSignal();
336          syncThread.join(250);
337        }
338      } catch (InterruptedException e) {
339        LOG.warn("join interrupted", e);
340        Thread.currentThread().interrupt();
341      }
342    }
343
344    // Close the writer
345    closeCurrentLogStream(abort);
346
347    // Close the old logs
348    // they should be already closed, this is just in case the load fails
349    // and we call start() and then stop()
350    for (ProcedureWALFile log : logs) {
351      log.close();
352    }
353    logs.clear();
354    loading.set(true);
355  }
356
357  private void sendStopSignal() {
358    if (lock.tryLock()) {
359      try {
360        waitCond.signalAll();
361        syncCond.signalAll();
362      } finally {
363        lock.unlock();
364      }
365    }
366  }
367
368  @Override
369  public int getNumThreads() {
370    return slots == null ? 0 : slots.length;
371  }
372
373  @Override
374  public int setRunningProcedureCount(final int count) {
375    this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length;
376    return this.runningProcCount;
377  }
378
379  public ProcedureStoreTracker getStoreTracker() {
380    return storeTracker;
381  }
382
383  public ArrayList<ProcedureWALFile> getActiveLogs() {
384    lock.lock();
385    try {
386      return new ArrayList<>(logs);
387    } finally {
388      lock.unlock();
389    }
390  }
391
392  public Set<ProcedureWALFile> getCorruptedLogs() {
393    return corruptedLogs;
394  }
395
  /**
   * Acquires the lease on the procedure WAL directory.
   * <p/>
   * While the store is running, each attempt does the following:
   * <ul>
   * <li>lists the existing WAL files;</li>
   * <li>calls initOldLogs on them to recover their leases and compute the max log id
   * (flushLogId);</li>
   * <li>tries to create the next log with rollWriter(flushLogId + 1).</li>
   * </ul>
   * The attempt is retried if old logs vanish (FileNotFoundException), if rollWriter reports that
   * the log already exists, or if logs newer than ours appear after the roll. Every attempt after
   * the first is preceded by a sleep of waitBeforeRoll ms. The whole method runs under
   * {@link #lock}.
   */
  @Override
  public void recoverLease() throws IOException {
    lock.lock();
    try {
      LOG.debug("Starting WAL Procedure Store lease recovery");
      boolean afterFirstAttempt = false;
      while (isRunning()) {
        // Don't sleep before first attempt
        if (afterFirstAttempt) {
          LOG.trace("Sleep {} ms after first lease recovery attempt.", waitBeforeRoll);
          Threads.sleepWithoutInterrupt(waitBeforeRoll);
        } else {
          afterFirstAttempt = true;
        }
        FileStatus[] oldLogs = getLogFiles();
        // Get Log-MaxID and recover lease on old logs
        try {
          flushLogId = initOldLogs(oldLogs);
        } catch (FileNotFoundException e) {
          // A file listed above disappeared before its lease could be recovered.
          LOG.warn("Someone else is active and deleted logs. retrying.", e);
          continue;
        }

        // Create new state-log
        if (!rollWriter(flushLogId + 1)) {
          // someone else has already created this log
          // NOTE(review): the log that already exists is flushLogId + 1, but the message below
          // prints flushLogId — confirm whether the message should use flushLogId + 1.
          LOG.debug("Someone else has already created log {}. Retrying.", flushLogId);
          continue;
        }

        // We have the lease on the log
        // Re-list to detect logs created by a competing writer after our roll.
        // NOTE(review): this comparison assumes rollWriter() advanced flushLogId to the id of the
        // log it just created — confirm against rollWriter.
        oldLogs = getLogFiles();
        if (getMaxLogId(oldLogs) > flushLogId) {
          LOG.debug("Someone else created new logs. Expected maxLogId < {}", flushLogId);
          // Remove the file we just created and retry. Note that the entry is not removed from
          // `logs` here.
          logs.getLast().removeFile(this.walArchiveDir);
          continue;
        }

        LOG.debug("Lease acquired for flushLogId={}", flushLogId);
        break;
      }
    } finally {
      lock.unlock();
    }
  }
441
442  @Override
443  public void load(ProcedureLoader loader) throws IOException {
444    lock.lock();
445    try {
446      if (logs.isEmpty()) {
447        throw new IllegalStateException("recoverLease() must be called before loading data");
448      }
449
450      // Nothing to do, If we have only the current log.
451      if (logs.size() == 1) {
452        LOG.debug("No state logs to replay.");
453        loader.setMaxProcId(0);
454        loading.set(false);
455        return;
456      }
457
458      // Load the old logs
459      Iterator<ProcedureWALFile> it = logs.descendingIterator();
460      it.next(); // Skip the current log
461
462      ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
463
464        @Override
465        public void setMaxProcId(long maxProcId) {
466          loader.setMaxProcId(maxProcId);
467        }
468
469        @Override
470        public void load(ProcedureIterator procIter) throws IOException {
471          loader.load(procIter);
472        }
473
474        @Override
475        public void handleCorrupted(ProcedureIterator procIter) throws IOException {
476          loader.handleCorrupted(procIter);
477        }
478
479        @Override
480        public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
481          if (corruptedLogs == null) {
482            corruptedLogs = new HashSet<>();
483          }
484          corruptedLogs.add(log);
485          // TODO: sideline corrupted log
486        }
487      });
488      // if we fail when loading, we should prevent persisting the storeTracker later in the stop
489      // method. As it may happen that, we have finished constructing the modified and deleted bits,
490      // but before we call resetModified, we fail, then if we persist the storeTracker then when
491      // restarting, we will consider that all procedures have been included in this file and delete
492      // all the previous files. Obviously this not correct. So here we will only set loading to
493      // false when we successfully loaded all the procedures, and when closing we will skip
494      // persisting the store tracker. And also, this will prevent the sync thread to do
495      // periodicRoll, where we may also clean old logs.
496      loading.set(false);
497      // try to cleanup inactive wals and complete the operation
498      buildHoldingCleanupTracker();
499      tryCleanupLogsOnLoad();
500    } finally {
501      lock.unlock();
502    }
503  }
504
505  private void tryCleanupLogsOnLoad() {
506    // nothing to cleanup.
507    if (logs.size() <= 1) {
508      return;
509    }
510
511    // the config says to not cleanup wals on load.
512    if (
513      !conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY)
514    ) {
515      LOG.debug("WALs cleanup on load is not enabled: " + getActiveLogs());
516      return;
517    }
518
519    try {
520      periodicRoll();
521    } catch (IOException e) {
522      LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e);
523    }
524  }
525
526  @Override
527  public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
528    if (LOG.isTraceEnabled()) {
529      LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
530    }
531
532    ByteSlot slot = acquireSlot();
533    try {
534      // Serialize the insert
535      long[] subProcIds = null;
536      if (subprocs != null) {
537        ProcedureWALFormat.writeInsert(slot, proc, subprocs);
538        subProcIds = new long[subprocs.length];
539        for (int i = 0; i < subprocs.length; ++i) {
540          subProcIds[i] = subprocs[i].getProcId();
541        }
542      } else {
543        assert !proc.hasParent();
544        ProcedureWALFormat.writeInsert(slot, proc);
545      }
546
547      // Push the transaction data and wait until it is persisted
548      pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
549    } catch (IOException e) {
550      // We are not able to serialize the procedure.
551      // this is a code error, and we are not able to go on.
552      LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedure: proc=" + proc
553        + ", subprocs=" + Arrays.toString(subprocs), e);
554      throw new RuntimeException(e);
555    } finally {
556      releaseSlot(slot);
557    }
558  }
559
560  @Override
561  public void insert(Procedure<?>[] procs) {
562    if (LOG.isTraceEnabled()) {
563      LOG.trace("Insert " + Arrays.toString(procs));
564    }
565
566    ByteSlot slot = acquireSlot();
567    try {
568      // Serialize the insert
569      long[] procIds = new long[procs.length];
570      for (int i = 0; i < procs.length; ++i) {
571        assert !procs[i].hasParent();
572        procIds[i] = procs[i].getProcId();
573        ProcedureWALFormat.writeInsert(slot, procs[i]);
574      }
575
576      // Push the transaction data and wait until it is persisted
577      pushData(PushType.INSERT, slot, Procedure.NO_PROC_ID, procIds);
578    } catch (IOException e) {
579      // We are not able to serialize the procedure.
580      // this is a code error, and we are not able to go on.
581      LOG.error(HBaseMarkers.FATAL,
582        "Unable to serialize one of the procedure: " + Arrays.toString(procs), e);
583      throw new RuntimeException(e);
584    } finally {
585      releaseSlot(slot);
586    }
587  }
588
589  @Override
590  public void update(Procedure<?> proc) {
591    if (LOG.isTraceEnabled()) {
592      LOG.trace("Update " + proc);
593    }
594
595    ByteSlot slot = acquireSlot();
596    try {
597      // Serialize the update
598      ProcedureWALFormat.writeUpdate(slot, proc);
599
600      // Push the transaction data and wait until it is persisted
601      pushData(PushType.UPDATE, slot, proc.getProcId(), null);
602    } catch (IOException e) {
603      // We are not able to serialize the procedure.
604      // this is a code error, and we are not able to go on.
605      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e);
606      throw new RuntimeException(e);
607    } finally {
608      releaseSlot(slot);
609    }
610  }
611
612  @Override
613  public void delete(long procId) {
614    LOG.trace("Delete {}", procId);
615    ByteSlot slot = acquireSlot();
616    try {
617      // Serialize the delete
618      ProcedureWALFormat.writeDelete(slot, procId);
619
620      // Push the transaction data and wait until it is persisted
621      pushData(PushType.DELETE, slot, procId, null);
622    } catch (IOException e) {
623      // We are not able to serialize the procedure.
624      // this is a code error, and we are not able to go on.
625      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + procId, e);
626      throw new RuntimeException(e);
627    } finally {
628      releaseSlot(slot);
629    }
630  }
631
632  @Override
633  public void delete(Procedure<?> proc, long[] subProcIds) {
634    assert proc != null : "expected a non-null procedure";
635    assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds";
636    if (LOG.isTraceEnabled()) {
637      LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds));
638    }
639
640    ByteSlot slot = acquireSlot();
641    try {
642      // Serialize the delete
643      ProcedureWALFormat.writeDelete(slot, proc, subProcIds);
644
645      // Push the transaction data and wait until it is persisted
646      pushData(PushType.DELETE, slot, proc.getProcId(), subProcIds);
647    } catch (IOException e) {
648      // We are not able to serialize the procedure.
649      // this is a code error, and we are not able to go on.
650      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e);
651      throw new RuntimeException(e);
652    } finally {
653      releaseSlot(slot);
654    }
655  }
656
657  @Override
658  public void delete(final long[] procIds, final int offset, final int count) {
659    if (count == 0) {
660      return;
661    }
662
663    if (offset == 0 && count == procIds.length) {
664      delete(procIds);
665    } else if (count == 1) {
666      delete(procIds[offset]);
667    } else {
668      delete(Arrays.copyOfRange(procIds, offset, offset + count));
669    }
670  }
671
672  private void delete(long[] procIds) {
673    if (LOG.isTraceEnabled()) {
674      LOG.trace("Delete " + Arrays.toString(procIds));
675    }
676
677    final ByteSlot slot = acquireSlot();
678    try {
679      // Serialize the delete
680      for (int i = 0; i < procIds.length; ++i) {
681        ProcedureWALFormat.writeDelete(slot, procIds[i]);
682      }
683
684      // Push the transaction data and wait until it is persisted
685      pushData(PushType.DELETE, slot, Procedure.NO_PROC_ID, procIds);
686    } catch (IOException e) {
687      // We are not able to serialize the procedure.
688      // this is a code error, and we are not able to go on.
689      LOG.error("Unable to serialize the procedures: " + Arrays.toString(procIds), e);
690      throw new RuntimeException(e);
691    } finally {
692      releaseSlot(slot);
693    }
694  }
695
696  private ByteSlot acquireSlot() {
697    ByteSlot slot = slotsCache.poll();
698    return slot != null ? slot : new ByteSlot();
699  }
700
701  private void releaseSlot(final ByteSlot slot) {
702    slot.reset();
703    slotsCache.offer(slot);
704  }
705
706  private enum PushType {
707    INSERT,
708    UPDATE,
709    DELETE
710  }
711
  /**
   * Adds a serialized mutation to the current batch of slots and blocks the caller until the sync
   * thread has completed a sync pass after the push, or until the store stops running.
   * <p/>
   * Under {@link #lock}, the method works in four steps:
   * <ol>
   * <li>waits while a sync is in progress or the batch is full (signalling slotCond to hurry the
   * sync thread);</li>
   * <li>updates the trackers and stores the slot;</li>
   * <li>wakes the sync thread on the first slot or when the batch becomes full;</li>
   * <li>waits on syncCond until syncId changes.</li>
   * </ol>
   * @return the value of flushLogId when the slot was enqueued
   * @throws RuntimeException if the store is not running, recoverLease() has not been called, the
   *           sync has aborted (with the sync failure as cause), or the thread was interrupted
   */
  private long pushData(final PushType type, final ByteSlot slot, final long procId,
    final long[] subProcIds) {
    if (!isRunning()) {
      throw new RuntimeException("the store must be running before inserting data");
    }
    // NOTE(review): `logs` is read here without holding `lock`.
    if (logs.isEmpty()) {
      throw new RuntimeException("recoverLease() must be called before inserting data");
    }

    long logId = -1;
    lock.lock();
    try {
      // Wait for the sync to be completed
      while (true) {
        if (!isRunning()) {
          throw new RuntimeException("store no longer running");
        } else if (isSyncAborted()) {
          throw new RuntimeException("sync aborted", syncException.get());
        } else if (inSync.get()) {
          syncCond.await();
        } else if (slotIndex >= syncMaxSlot) {
          // Batch is full: nudge the sync thread to flush it, then wait for room.
          slotCond.signal();
          syncCond.await();
        } else {
          break;
        }
      }

      // Remember the sync generation so we can tell when a sync pass has completed.
      final long pushSyncId = syncId.get();
      updateStoreTracker(type, procId, subProcIds);
      slots[slotIndex++] = slot;
      logId = flushLogId;

      // Notify that there is new data
      if (slotIndex == 1) {
        waitCond.signal();
      }

      // Notify that the slots are full
      if (slotIndex == syncMaxSlot) {
        waitCond.signal();
        slotCond.signal();
      }

      // Block until the sync thread bumps syncId (a pass finished) or the store stops.
      while (pushSyncId == syncId.get() && isRunning()) {
        syncCond.await();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      sendAbortProcessSignal();
      throw new RuntimeException(e);
    } finally {
      lock.unlock();
      // Throwing here replaces any exception or return value from the try block once the sync
      // has aborted.
      if (isSyncAborted()) {
        throw new RuntimeException("sync aborted", syncException.get());
      }
    }
    return logId;
  }
771
772  private void updateStoreTracker(final PushType type, final long procId, final long[] subProcIds) {
773    switch (type) {
774      case INSERT:
775        if (subProcIds == null) {
776          storeTracker.insert(procId);
777        } else if (procId == Procedure.NO_PROC_ID) {
778          storeTracker.insert(subProcIds);
779        } else {
780          storeTracker.insert(procId, subProcIds);
781          holdingCleanupTracker.setDeletedIfModified(procId);
782        }
783        break;
784      case UPDATE:
785        storeTracker.update(procId);
786        holdingCleanupTracker.setDeletedIfModified(procId);
787        break;
788      case DELETE:
789        if (subProcIds != null && subProcIds.length > 0) {
790          storeTracker.delete(subProcIds);
791          holdingCleanupTracker.setDeletedIfModified(subProcIds);
792        } else {
793          storeTracker.delete(procId);
794          holdingCleanupTracker.setDeletedIfModified(procId);
795        }
796        break;
797      default:
798        throw new RuntimeException("invalid push type " + type);
799    }
800  }
801
802  private boolean isSyncAborted() {
803    return syncException.get() != null;
804  }
805
  /**
   * Body of the sync thread. Holds {@code lock} for its whole run and loops while
   * {@link #isRunning()}:
   * <ul>
   * <li>When no slot is pending ({@code slotIndex == 0}) it runs {@code periodicRoll()} (skipped
   * while {@code loading} is set). It then waits on {@code waitCond} until signalled or until the
   * next periodic roll is due. If still no slot is pending it starts over.</li>
   * <li>Otherwise it snapshots {@code runningProcCount} into {@code syncMaxSlot}. Unless the
   * slots are already full it waits on {@code slotCond} for up to {@code syncWaitMsec} ms. It then
   * records a {@code SyncMetrics} sample and writes/syncs the pending slots through
   * {@link #syncSlots()}. Finally it resets {@code slotIndex} and increments {@code syncId}.</li>
   * </ul>
   * Any exception is recorded in {@code syncException} (first one wins), the abort signal is
   * sent, and the exception is rethrown, which ends the loop. {@code syncCond} is signalled at the
   * end of every iteration, including the "no data" {@code continue} and the failure paths.
   * @throws Throwable the exception raised by a wait, the periodic roll, or the sync
   */
  private void syncLoop() throws Throwable {
    // Value of totalSynced after the previous sync; totalSynced is reset to 0 on every roll, so
    // this is the number of bytes synced to the current log. Used only for metrics and tracing.
    long totalSyncedToStore = 0;
    // inSync is true only while syncSlots() is running (set/cleared around it below).
    inSync.set(false);
    lock.lock();
    try {
      while (isRunning()) {
        try {
          // Wait until new data is available
          if (slotIndex == 0) {
            if (!loading.get()) {
              periodicRoll();
            }

            if (LOG.isTraceEnabled()) {
              float rollTsSec = getMillisFromLastRoll() / 1000.0f;
              LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
                StringUtils.humanSize(totalSynced.get()),
                StringUtils.humanSize(totalSynced.get() / rollTsSec)));
            }

            // Wake up at the latest when the next periodic roll is due.
            waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
            if (slotIndex == 0) {
              // no data.. probably a stop() or a periodic roll
              continue;
            }
          }
          // Wait syncWaitMsec or the signal of "slots full" before flushing. The number of
          // running procedures is used as the "full" threshold for this round.
          syncMaxSlot = runningProcCount;
          assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot;
          final long syncWaitSt = EnvironmentEdgeManager.currentTime();
          if (slotIndex != syncMaxSlot) {
            slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
          }

          final long currentTs = EnvironmentEdgeManager.currentTime();
          final long syncWaitMs = currentTs - syncWaitSt;
          final float rollSec = getMillisFromLastRoll() / 1000.0f;
          final float syncedPerSec = totalSyncedToStore / rollSec;
          if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) {
            LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
              StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
              StringUtils.humanSize(totalSyncedToStore), StringUtils.humanSize(syncedPerSec)));
          }

          // update webui circular buffers (TODO: get rid of allocations)
          final SyncMetrics syncMetrics = new SyncMetrics();
          syncMetrics.timestamp = currentTs;
          syncMetrics.syncWaitMs = syncWaitMs;
          syncMetrics.syncedEntries = slotIndex;
          syncMetrics.totalSyncedBytes = totalSyncedToStore;
          syncMetrics.syncedPerSec = syncedPerSec;
          syncMetricsQueue.add(syncMetrics);

          // sync: write the pending slots, account their size to the current log and the
          // running total, then release the slots and publish a new sync id.
          inSync.set(true);
          long slotSize = syncSlots();
          logs.getLast().addToSize(slotSize);
          totalSyncedToStore = totalSynced.addAndGet(slotSize);
          slotIndex = 0;
          inSync.set(false);
          syncId.incrementAndGet();
        } catch (InterruptedException e) {
          // Restore the interrupt flag before recording the failure and aborting.
          Thread.currentThread().interrupt();
          syncException.compareAndSet(null, e);
          sendAbortProcessSignal();
          throw e;
        } catch (Throwable t) {
          syncException.compareAndSet(null, t);
          sendAbortProcessSignal();
          throw t;
        } finally {
          // Runs on success, on "continue" and on failure: wake every thread waiting on syncCond.
          syncCond.signalAll();
        }
      }
    } finally {
      lock.unlock();
    }
  }
884
885  public ArrayList<SyncMetrics> getSyncMetrics() {
886    lock.lock();
887    try {
888      return new ArrayList<>(syncMetricsQueue);
889    } finally {
890      lock.unlock();
891    }
892  }
893
894  private long syncSlots() throws Throwable {
895    int retry = 0;
896    int logRolled = 0;
897    long totalSynced = 0;
898    do {
899      try {
900        totalSynced = syncSlots(stream, slots, 0, slotIndex);
901        break;
902      } catch (Throwable e) {
903        LOG.warn("unable to sync slots, retry=" + retry);
904        if (++retry >= maxRetriesBeforeRoll) {
905          if (logRolled >= maxSyncFailureRoll && isRunning()) {
906            LOG.error("Sync slots after log roll failed, abort.", e);
907            throw e;
908          }
909
910          if (!rollWriterWithRetries()) {
911            throw e;
912          }
913
914          logRolled++;
915          retry = 0;
916        }
917      }
918    } while (isRunning());
919    return totalSynced;
920  }
921
922  protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots,
923    final int offset, final int count) throws IOException {
924    long totalSynced = 0;
925    for (int i = 0; i < count; ++i) {
926      final ByteSlot data = slots[offset + i];
927      data.writeTo(stream);
928      totalSynced += data.size();
929    }
930
931    syncStream(stream);
932    sendPostSyncSignal();
933
934    if (LOG.isTraceEnabled()) {
935      LOG.trace("Sync slots=" + count + '/' + syncMaxSlot + ", flushed="
936        + StringUtils.humanSize(totalSynced));
937    }
938    return totalSynced;
939  }
940
941  protected void syncStream(final FSDataOutputStream stream) throws IOException {
942    if (useHsync) {
943      stream.hsync();
944    } else {
945      stream.hflush();
946    }
947  }
948
949  private boolean rollWriterWithRetries() {
950    for (int i = 0; i < rollRetries && isRunning(); ++i) {
951      if (i > 0) {
952        Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
953      }
954
955      try {
956        if (rollWriter()) {
957          return true;
958        }
959      } catch (IOException e) {
960        LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
961      }
962    }
963    LOG.error(HBaseMarkers.FATAL, "Unable to roll the log");
964    return false;
965  }
966
967  private boolean tryRollWriter() {
968    try {
969      return rollWriter();
970    } catch (IOException e) {
971      LOG.warn("Unable to roll the log", e);
972      return false;
973    }
974  }
975
976  public long getMillisToNextPeriodicRoll() {
977    if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
978      return periodicRollMsec - getMillisFromLastRoll();
979    }
980    return Long.MAX_VALUE;
981  }
982
983  public long getMillisFromLastRoll() {
984    return (EnvironmentEdgeManager.currentTime() - lastRollTs.get());
985  }
986
987  void periodicRollForTesting() throws IOException {
988    lock.lock();
989    try {
990      periodicRoll();
991    } finally {
992      lock.unlock();
993    }
994  }
995
996  public boolean rollWriterForTesting() throws IOException {
997    lock.lock();
998    try {
999      return rollWriter();
1000    } finally {
1001      lock.unlock();
1002    }
1003  }
1004
1005  void removeInactiveLogsForTesting() throws Exception {
1006    lock.lock();
1007    try {
1008      removeInactiveLogs();
1009    } finally {
1010      lock.unlock();
1011    }
1012  }
1013
1014  private void periodicRoll() throws IOException {
1015    if (storeTracker.isEmpty()) {
1016      LOG.trace("no active procedures");
1017      tryRollWriter();
1018      removeAllLogs(flushLogId - 1, "no active procedures");
1019    } else {
1020      if (storeTracker.isAllModified()) {
1021        LOG.trace("all the active procedures are in the latest log");
1022        removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log");
1023      }
1024
1025      // if the log size has exceeded the roll threshold
1026      // or the periodic roll timeout is expired, try to roll the wal.
1027      if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
1028        tryRollWriter();
1029      }
1030
1031      removeInactiveLogs();
1032    }
1033  }
1034
1035  private boolean rollWriter() throws IOException {
1036    if (!isRunning()) {
1037      return false;
1038    }
1039
1040    // Create new state-log
1041    if (!rollWriter(flushLogId + 1)) {
1042      LOG.warn("someone else has already created log {}", flushLogId);
1043      return false;
1044    }
1045
1046    // We have the lease on the log,
1047    // but we should check if someone else has created new files
1048    if (getMaxLogId(getLogFiles()) > flushLogId) {
1049      LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId);
1050      logs.getLast().removeFile(this.walArchiveDir);
1051      return false;
1052    }
1053
1054    // We have the lease on the log
1055    return true;
1056  }
1057
1058  boolean rollWriter(long logId) throws IOException {
1059    assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
1060    assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
1061
1062    ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
1063      .setVersion(ProcedureWALFormat.HEADER_VERSION).setType(ProcedureWALFormat.LOG_TYPE_STREAM)
1064      .setMinProcId(storeTracker.getActiveMinProcId()).setLogId(logId).build();
1065
1066    FSDataOutputStream newStream = null;
1067    Path newLogFile = null;
1068    long startPos = -1;
1069    newLogFile = getLogFilePath(logId);
1070    try {
1071      FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(newLogFile).overwrite(false);
1072      if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
1073        newStream =
1074          ((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder).replicate().build();
1075      } else {
1076        newStream = builder.build();
1077      }
1078    } catch (FileAlreadyExistsException e) {
1079      LOG.error("Log file with id={} already exists", logId, e);
1080      return false;
1081    } catch (RemoteException re) {
1082      LOG.warn("failed to create log file with id={}", logId, re);
1083      return false;
1084    }
1085    // After we create the stream but before we attempt to use it at all
1086    // ensure that we can provide the level of data safety we're configured
1087    // to provide.
1088    final String durability = useHsync ? StreamCapabilities.HSYNC : StreamCapabilities.HFLUSH;
1089    if (enforceStreamCapability && !newStream.hasCapability(durability)) {
1090      throw new IllegalStateException("The procedure WAL relies on the ability to " + durability
1091        + " for proper operation during component failures, but the underlying filesystem does "
1092        + "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY
1093        + "' to set the desired level of robustness and ensure the config value of '"
1094        + CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it.");
1095    }
1096    try {
1097      ProcedureWALFormat.writeHeader(newStream, header);
1098      startPos = newStream.getPos();
1099    } catch (IOException ioe) {
1100      LOG.warn("Encountered exception writing header", ioe);
1101      newStream.close();
1102      return false;
1103    }
1104
1105    closeCurrentLogStream(false);
1106
1107    storeTracker.resetModified();
1108    stream = newStream;
1109    flushLogId = logId;
1110    totalSynced.set(0);
1111    long rollTs = EnvironmentEdgeManager.currentTime();
1112    lastRollTs.set(rollTs);
1113    logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs));
1114
1115    // if it's the first next WAL being added, build the holding cleanup tracker
1116    if (logs.size() == 2) {
1117      buildHoldingCleanupTracker();
1118    } else if (logs.size() > walCountWarnThreshold) {
1119      LOG.warn("procedure WALs count={} above the warning threshold {}. check running procedures"
1120        + " to see if something is stuck.", logs.size(), walCountWarnThreshold);
1121      // This is just like what we have done at RS side when there are too many wal files. For RS,
1122      // if there are too many wal files, we will find out the wal entries in the oldest file, and
1123      // tell the upper layer to flush these regions so the wal entries will be useless and then we
1124      // can delete the wal file. For WALProcedureStore, the assumption is that, if all the
1125      // procedures recorded in a proc wal file are modified or deleted in a new proc wal file, then
1126      // we are safe to delete it. So here if there are too many proc wal files, we will find out
1127      // the procedure ids in the oldest file, which are neither modified nor deleted in newer proc
1128      // wal files, and tell upper layer to update the state of these procedures to the newest proc
1129      // wal file(by calling ProcedureStore.update), then we are safe to delete the oldest proc wal
1130      // file.
1131      sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds());
1132    }
1133
1134    LOG.info("Rolled new Procedure Store WAL, id={}", logId);
1135    return true;
1136  }
1137
1138  private void closeCurrentLogStream(boolean abort) {
1139    if (stream == null || logs.isEmpty()) {
1140      return;
1141    }
1142
1143    try {
1144      ProcedureWALFile log = logs.getLast();
1145      // If the loading flag is true, it usually means that we fail when loading procedures, so we
1146      // should not persist the store tracker, as its state may not be correct.
1147      if (!loading.get()) {
1148        log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
1149        log.updateLocalTracker(storeTracker);
1150        if (!abort) {
1151          long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
1152          log.addToSize(trailerSize);
1153        }
1154      }
1155    } catch (IOException | FSError e) {
1156      LOG.warn("Unable to write the trailer", e);
1157    }
1158    try {
1159      stream.close();
1160    } catch (IOException | FSError e) {
1161      LOG.error("Unable to close the stream", e);
1162    }
1163    stream = null;
1164  }
1165
1166  // ==========================================================================
1167  // Log Files cleaner helpers
1168  // ==========================================================================
1169  private void removeInactiveLogs() throws IOException {
1170    // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
1171    // once there is nothing olding the oldest WAL we can remove it.
1172    while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
1173      LOG.info("Remove the oldest log {}", logs.getFirst());
1174      removeLogFile(logs.getFirst(), walArchiveDir);
1175      buildHoldingCleanupTracker();
1176    }
1177
1178    // TODO: In case we are holding up a lot of logs for long time we should
1179    // rewrite old procedures (in theory parent procs) to the new WAL.
1180  }
1181
  /**
   * Recomputes {@code holdingCleanupTracker}: the procedures that still prevent the oldest WAL
   * ({@code logs.getFirst()}) from being removed. An empty tracker means the oldest WAL can go.
   * <p>
   * With at most one WAL the tracker is simply reset. Otherwise it is built in three steps:
   * <ol>
   * <li>Start from the oldest WAL's tracker.</li>
   * <li>Apply {@code setDeletedIfDeletedByThem(storeTracker)} to account for procedures deleted
   * since.</li>
   * <li>Apply {@code setDeletedIfModifiedInBoth} with the tracker of every WAL strictly between
   * the oldest and the newest, stopping early once the tracker becomes empty.</li>
   * </ol>
   * The newest WAL's own tracker is not applied (see the comment in the loop below).
   */
  private void buildHoldingCleanupTracker() {
    if (logs.size() <= 1) {
      // we only have one wal, so nothing to do
      holdingCleanupTracker.reset();
      return;
    }

    // compute the holding tracker.
    // - the first WAL is used for the 'updates'
    // - the global tracker will be used to determine whether a procedure has been deleted
    // - other trackers will be used to determine whether a procedure has been updated, as a deleted
    // procedure can always be detected by checking the global tracker, we can save the deleted
    // checks when applying other trackers
    holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
    holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker);
    // the logs is a linked list, so avoid calling get(index) on it.
    Iterator<ProcedureWALFile> iter = logs.iterator();
    // skip the tracker for the first file when creating the iterator.
    iter.next();
    // logs.size() >= 2 here, so a second element always exists.
    ProcedureStoreTracker tracker = iter.next().getTracker();
    // testing iter.hasNext after calling iter.next to skip applying the tracker for last file,
    // which is just the storeTracker above.
    while (iter.hasNext()) {
      holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker);
      if (holdingCleanupTracker.isEmpty()) {
        // Nothing holds the oldest WAL any more; no need to look at newer trackers.
        break;
      }
      tracker = iter.next().getTracker();
    }
  }
1212
1213  /**
1214   * Remove all logs with logId <= {@code lastLogId}.
1215   */
1216  private void removeAllLogs(long lastLogId, String why) {
1217    if (logs.size() <= 1) {
1218      return;
1219    }
1220
1221    LOG.info("Remove all state logs with ID less than {}, since {}", lastLogId, why);
1222
1223    boolean removed = false;
1224    while (logs.size() > 1) {
1225      ProcedureWALFile log = logs.getFirst();
1226      if (lastLogId < log.getLogId()) {
1227        break;
1228      }
1229      removeLogFile(log, walArchiveDir);
1230      removed = true;
1231    }
1232
1233    if (removed) {
1234      buildHoldingCleanupTracker();
1235    }
1236  }
1237
1238  private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
1239    try {
1240      LOG.trace("Removing log={}", log);
1241      log.removeFile(walArchiveDir);
1242      logs.remove(log);
1243      LOG.debug("Removed log={}, activeLogs={}", log, logs);
1244      assert logs.size() > 0 : "expected at least one log";
1245    } catch (IOException e) {
1246      LOG.error("Unable to remove log: " + log, e);
1247      return false;
1248    }
1249    return true;
1250  }
1251
1252  // ==========================================================================
1253  // FileSystem Log Files helpers
1254  // ==========================================================================
1255  public Path getWALDir() {
1256    return this.walDir;
1257  }
1258
1259  Path getWalArchiveDir() {
1260    return this.walArchiveDir;
1261  }
1262
1263  public FileSystem getFileSystem() {
1264    return this.fs;
1265  }
1266
1267  protected Path getLogFilePath(final long logId) throws IOException {
1268    return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId));
1269  }
1270
1271  private static long getLogIdFromName(final String name) {
1272    int end = name.lastIndexOf(".log");
1273    int start = name.lastIndexOf('-') + 1;
1274    return Long.parseLong(name.substring(start, end));
1275  }
1276
1277  private static final PathFilter WALS_PATH_FILTER = new PathFilter() {
1278    @Override
1279    public boolean accept(Path path) {
1280      String name = path.getName();
1281      return name.startsWith(LOG_PREFIX) && name.endsWith(".log");
1282    }
1283  };
1284
1285  private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR =
1286    new Comparator<FileStatus>() {
1287      @Override
1288      public int compare(FileStatus a, FileStatus b) {
1289        final long aId = getLogIdFromName(a.getPath().getName());
1290        final long bId = getLogIdFromName(b.getPath().getName());
1291        return Long.compare(aId, bId);
1292      }
1293    };
1294
1295  private FileStatus[] getLogFiles() throws IOException {
1296    try {
1297      FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER);
1298      Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
1299      return files;
1300    } catch (FileNotFoundException e) {
1301      LOG.warn("Log directory not found: " + e.getMessage());
1302      return null;
1303    }
1304  }
1305
1306  /**
1307   * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort
1308   * the file set by log id.
1309   * @return Max-LogID of the specified log file set
1310   */
1311  private static long getMaxLogId(FileStatus[] logFiles) {
1312    if (logFiles == null || logFiles.length == 0) {
1313      return 0L;
1314    }
1315    return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName());
1316  }
1317
1318  /**
1319   * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort
1320   * the file set by log id.
1321   * @return Max-LogID of the specified log file set
1322   */
1323  private long initOldLogs(FileStatus[] logFiles) throws IOException {
1324    if (logFiles == null || logFiles.length == 0) {
1325      return 0L;
1326    }
1327    long maxLogId = 0;
1328    for (int i = 0; i < logFiles.length; ++i) {
1329      final Path logPath = logFiles[i].getPath();
1330      leaseRecovery.recoverFileLease(fs, logPath);
1331      if (!isRunning()) {
1332        throw new IOException("wal aborting");
1333      }
1334
1335      maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
1336      ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
1337      if (log != null) {
1338        this.logs.add(log);
1339      }
1340    }
1341    initTrackerFromOldLogs();
1342    return maxLogId;
1343  }
1344
1345  /**
1346   * If last log's tracker is not null, use it as {@link #storeTracker}. Otherwise, set storeTracker
1347   * as partial, and let {@link ProcedureWALFormatReader} rebuild it using entries in the log.
1348   */
1349  private void initTrackerFromOldLogs() {
1350    if (logs.isEmpty() || !isRunning()) {
1351      return;
1352    }
1353    ProcedureWALFile log = logs.getLast();
1354    if (!log.getTracker().isPartial()) {
1355      storeTracker.resetTo(log.getTracker());
1356    } else {
1357      storeTracker.reset();
1358      storeTracker.setPartialFlag(true);
1359    }
1360  }
1361
1362  /**
1363   * Loads given log file and it's tracker.
1364   */
1365  private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir)
1366    throws IOException {
1367    final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
1368    if (logFile.getLen() == 0) {
1369      LOG.warn("Remove uninitialized log: {}", logFile);
1370      log.removeFile(walArchiveDir);
1371      return null;
1372    }
1373    LOG.debug("Opening Pv2 {}", logFile);
1374    try {
1375      log.open();
1376    } catch (ProcedureWALFormat.InvalidWALDataException e) {
1377      LOG.warn("Remove uninitialized log: {}", logFile, e);
1378      log.removeFile(walArchiveDir);
1379      return null;
1380    } catch (IOException e) {
1381      String msg = "Unable to read state log: " + logFile;
1382      LOG.error(msg, e);
1383      throw new IOException(msg, e);
1384    }
1385
1386    try {
1387      log.readTracker();
1388    } catch (IOException e) {
1389      log.getTracker().reset();
1390      log.getTracker().setPartialFlag(true);
1391      LOG.warn("Unable to read tracker for {}", log, e);
1392    }
1393
1394    log.close();
1395    return log;
1396  }
1397
1398  /**
1399   * Parses a directory of WALs building up ProcedureState. For testing parse and profiling.
1400   * @param args Include pointer to directory of WAL files for a store instance to parse & load.
1401   */
1402  public static void main(String[] args) throws IOException {
1403    Configuration conf = HBaseConfiguration.create();
1404    if (args == null || args.length != 1) {
1405      System.out.println("ERROR: Empty arguments list; pass path to MASTERPROCWALS_DIR.");
1406      System.out.println("Usage: WALProcedureStore MASTERPROCWALS_DIR");
1407      System.exit(-1);
1408    }
1409    WALProcedureStore store =
1410      new WALProcedureStore(conf, new Path(args[0]), null, new LeaseRecovery() {
1411        @Override
1412        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
1413          // no-op
1414        }
1415      });
1416    try {
1417      store.start(16);
1418      ProcedureExecutor<?> pe =
1419        new ProcedureExecutor<>(conf, new Object()/* Pass anything */, store);
1420      pe.init(1, true);
1421    } finally {
1422      store.stop(true);
1423    }
1424  }
1425}