001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Comparator;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.LinkedList;
028import java.util.Set;
029import java.util.concurrent.LinkedTransferQueue;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicLong;
033import java.util.concurrent.atomic.AtomicReference;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FSDataOutputStream;
038import org.apache.hadoop.fs.FSError;
039import org.apache.hadoop.fs.FileAlreadyExistsException;
040import org.apache.hadoop.fs.FileStatus;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.fs.PathFilter;
044import org.apache.hadoop.fs.StreamCapabilities;
045import org.apache.hadoop.hbase.HBaseConfiguration;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.log.HBaseMarkers;
048import org.apache.hadoop.hbase.procedure2.Procedure;
049import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
050import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
051import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
052import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
053import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
054import org.apache.hadoop.hbase.procedure2.util.StringUtils;
055import org.apache.hadoop.hbase.util.CommonFSUtils;
056import org.apache.hadoop.hbase.util.Threads;
057import org.apache.hadoop.ipc.RemoteException;
058import org.apache.yetus.audience.InterfaceAudience;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
063import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;
064
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
066
067/**
068 * WAL implementation of the ProcedureStore.
069 * <p/>
070 * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()},
071 * then {@link #load(ProcedureLoader)}.
072 * <p/>
 * In {@link #recoverLease()}, we acquire the lease by closing all the existing wal files (by
 * calling recoverFileLease) and creating a new wal writer. We also collect the list of all the
 * old wal files.
076 * <p/>
 * FIXME: the current lease recovery implementation is problematic: it cannot deal with the race
 * when two masters both want to acquire the lease...
079 * <p/>
080 * In {@link #load(ProcedureLoader)} method, we will load all the active procedures. See the
081 * comments of this method for more details.
082 * <p/>
 * The actual logging mechanism is similar to our FileSystem based WAL implementation on the RS
 * side. The {@link #slots} array acts much like a ring buffer: the insert, update and delete
 * methods put their data into {@link #slots} and wait. A background sync thread (see the
 * {@link #syncLoop()} method) takes the data from {@link #slots}, writes it to the FileSystem, and
 * notifies the callers once it has finished.
088 * <p/>
089 * TODO: try using disruptor to increase performance and simplify the logic?
090 * <p/>
 * The {@link #storeTracker} keeps track of the modified procedures in the newest wal file, which is
 * also the one currently being written. Its deleted bits, however, cover all procedures, not only
 * the ones in the newest wal file. When rolling a log, we first store the tracker in the trailer
 * of the current wal file and then reset its modified bits, so that it can start tracking the
 * modified procedures for the new wal file.
096 * <p/>
 * The {@link #holdingCleanupTracker} is used to test whether it is safe to delete the oldest wal
 * file. It is used when a log roll happens and there is more than one wal file. It is first
 * initialized to the oldest file's tracker (which is stored in the trailer), using the method
 * {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merged with the
 * tracker of every newer wal file, using
 * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If we find out
 * that all the procedures modified in the oldest wal file have been modified or deleted in newer
 * wal files, then we can delete it. This is because every time we call
 * {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)}, we
 * persist the full state of a Procedure, so the earlier wal records for this procedure can all be
 * deleted.
109 * @see ProcedureWALPrettyPrinter for printing content of a single WAL.
110 * @see #main(String[]) to parse a directory of MasterWALProcs.
111 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
112 *             use the new region based procedure store.
113 */
114@Deprecated
115@InterfaceAudience.Private
116public class WALProcedureStore extends ProcedureStoreBase {
  private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class);
  // NOTE(review): presumably the file-name prefix of procedure WAL files; confirm against the
  // log-file naming code (not visible in this chunk).
  public static final String LOG_PREFIX = "pv2-";
  /** Used to construct the name of the log directory for master procedures */
  public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";


  // The tunables below are read from the Configuration in start(), except
  // EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, which is read in tryCleanupLogsOnLoad().

  // NOTE(review): judging by the name, a WAL-file count above which a warning is emitted;
  // its use is outside this chunk - confirm.
  public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY =
    "hbase.procedure.store.wal.warn.threshold";
  private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10;

  // When false, load() skips the attempt to clean up (roll away) inactive WALs.
  public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY =
    "hbase.procedure.store.wal.exec.cleanup.on.load";
  private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = true;

  // NOTE(review): used by roll/retry logic outside this chunk - confirm exact semantics.
  public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.max.retries.before.roll";
  private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;

  // Milliseconds; recoverLease() sleeps this long between lease-acquisition attempts.
  public static final String WAIT_BEFORE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.wait.before.roll";
  private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;

  // NOTE(review): used by roll/retry logic outside this chunk - confirm exact semantics.
  public static final String ROLL_RETRIES_CONF_KEY =
    "hbase.procedure.store.wal.max.roll.retries";
  private static final int DEFAULT_ROLL_RETRIES = 3;

  // NOTE(review): used by sync-failure handling outside this chunk - confirm exact semantics.
  public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.sync.failure.roll.max";
  private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;

  // Milliseconds; presumably the interval between periodic log rolls (see periodicRoll()).
  public static final String PERIODIC_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.periodic.roll.msec";
  private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h

  // Milliseconds the sync loop waits for the slots to fill before syncing what it has.
  public static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
  private static final int DEFAULT_SYNC_WAIT_MSEC = 100;

  // NOTE(review): presumably selects hsync over hflush when syncing the stream - confirm.
  public static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
  private static final boolean DEFAULT_USE_HSYNC = true;

  // Bytes; presumably the WAL size that triggers a roll - confirm against the roll logic.
  public static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
  private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M

  // Capacity of the circular queue of SyncMetrics kept for the web UI.
  public static final String STORE_WAL_SYNC_STATS_COUNT =
      "hbase.procedure.store.wal.sync.stats.count";
  private static final int DEFAULT_SYNC_STATS_COUNT = 10;
163
  // All WAL files known to the store; the last element is the one currently being written.
  private final LinkedList<ProcedureWALFile> logs = new LinkedList<>();
  // Used to decide whether the oldest WAL file can be deleted (see the class javadoc).
  private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker();
  // Tracks procedures modified in the current WAL file (see the class javadoc).
  private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
  // Guards the slot state below and the hand-off between writers and the sync thread.
  private final ReentrantLock lock = new ReentrantLock();
  // Signalled by pushData() when the first slot is filled or the slots become full; the sync
  // loop waits on it while idle.
  private final Condition waitCond = lock.newCondition();
  // Signalled by pushData() when the slots are full; the sync loop waits on it (bounded by
  // syncWaitMsec) before syncing a partially filled batch.
  private final Condition slotCond = lock.newCondition();
  // Writers wait on this for their batch to be synced; stop() also signals it.
  private final Condition syncCond = lock.newCondition();

  private final LeaseRecovery leaseRecovery;
  private final Configuration conf;
  private final FileSystem fs;
  private final Path walDir;
  private final Path walArchiveDir;
  private final boolean enforceStreamCapability;

  // First failure recorded by the sync thread; non-null means the store is "sync aborted".
  private final AtomicReference<Throwable> syncException = new AtomicReference<>();
  // True until load() completes; while true the sync loop skips periodicRoll().
  private final AtomicBoolean loading = new AtomicBoolean(true);
  // True while the sync thread is writing the current batch of slots.
  private final AtomicBoolean inSync = new AtomicBoolean(false);
  // Running total of bytes written by the sync loop.
  private final AtomicLong totalSynced = new AtomicLong(0);
  private final AtomicLong lastRollTs = new AtomicLong(0);
  // Incremented after every completed sync; pushData() waits for it to change.
  private final AtomicLong syncId = new AtomicLong(0);

  // Pool of reusable ByteSlots (see acquireSlot()/releaseSlot()).
  private LinkedTransferQueue<ByteSlot> slotsCache = null;
  // Null until load() reports the first corrupted WAL.
  private Set<ProcedureWALFile> corruptedLogs = null;
  private FSDataOutputStream stream = null;
  // Batch size the sync loop aims for; capped at slots.length by setRunningProcedureCount().
  private int runningProcCount = 1;
  // Id of the WAL file data is being flushed to (set during recoverLease()).
  private long flushLogId = 0;
  // Number of filled slots that makes pushData() signal "slots full".
  private int syncMaxSlot = 1;
  // Number of slots filled in the current batch (also the next free index).
  private int slotIndex = 0;
  private Thread syncThread;
  // Serialized records waiting for the sync thread to write them.
  private ByteSlot[] slots;

  private int walCountWarnThreshold;
  private int maxRetriesBeforeRoll;
  private int maxSyncFailureRoll;
  private int waitBeforeRoll;
  private int rollRetries;
  private int periodicRollMsec;
  private long rollThreshold;
  private boolean useHsync;
  private int syncWaitMsec;

  // Variables used for UI display
  private CircularFifoQueue<SyncMetrics> syncMetricsQueue;
208
209  public static class SyncMetrics {
210    private long timestamp;
211    private long syncWaitMs;
212    private long totalSyncedBytes;
213    private int syncedEntries;
214    private float syncedPerSec;
215
216    public long getTimestamp() {
217      return timestamp;
218    }
219
220    public long getSyncWaitMs() {
221      return syncWaitMs;
222    }
223
224    public long getTotalSyncedBytes() {
225      return totalSyncedBytes;
226    }
227
228    public long getSyncedEntries() {
229      return syncedEntries;
230    }
231
232    public float getSyncedPerSec() {
233      return syncedPerSec;
234    }
235  }
236
237  public WALProcedureStore(Configuration conf, LeaseRecovery leaseRecovery) throws IOException {
238    this(conf, new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
239      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME),
240      leaseRecovery);
241  }
242
243  @VisibleForTesting
244  public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir,
245      final LeaseRecovery leaseRecovery) throws IOException {
246    this.conf = conf;
247    this.leaseRecovery = leaseRecovery;
248    this.walDir = walDir;
249    this.walArchiveDir = walArchiveDir;
250    this.fs = CommonFSUtils.getWALFileSystem(conf);
251    this.enforceStreamCapability = conf.getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE,
252        true);
253
254    // Create the log directory for the procedure store
255    if (!fs.exists(walDir)) {
256      if (!fs.mkdirs(walDir)) {
257        throw new IOException("Unable to mkdir " + walDir);
258      }
259    }
260    // Now that it exists, set the log policy
261    String storagePolicy =
262        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
263    CommonFSUtils.setStoragePolicy(fs, walDir, storagePolicy);
264
265    // Create archive dir up front. Rename won't work w/o it up on HDFS.
266    if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
267      if (this.fs.mkdirs(this.walArchiveDir)) {
268        LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir);
269      } else {
270        LOG.warn("Failed create of {}", this.walArchiveDir);
271      }
272    }
273  }
274
275  @Override
276  public void start(int numSlots) throws IOException {
277    if (!setRunning(true)) {
278      return;
279    }
280
281    // Init buffer slots
282    loading.set(true);
283    runningProcCount = numSlots;
284    syncMaxSlot = numSlots;
285    slots = new ByteSlot[numSlots];
286    slotsCache = new LinkedTransferQueue<>();
287    while (slotsCache.size() < numSlots) {
288      slotsCache.offer(new ByteSlot());
289    }
290
291    // Tunings
292    walCountWarnThreshold =
293      conf.getInt(WAL_COUNT_WARN_THRESHOLD_CONF_KEY, DEFAULT_WAL_COUNT_WARN_THRESHOLD);
294    maxRetriesBeforeRoll =
295      conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
296    maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
297    waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
298    rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
299    rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
300    periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
301    syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
302    useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
303
304    // WebUI
305    syncMetricsQueue = new CircularFifoQueue<>(
306      conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT));
307
308    // Init sync thread
309    syncThread = new Thread("WALProcedureStoreSyncThread") {
310      @Override
311      public void run() {
312        try {
313          syncLoop();
314        } catch (Throwable e) {
315          LOG.error("Got an exception from the sync-loop", e);
316          if (!isSyncAborted()) {
317            sendAbortProcessSignal();
318          }
319        }
320      }
321    };
322    syncThread.start();
323  }
324
325  @Override
326  public void stop(final boolean abort) {
327    if (!setRunning(false)) {
328      return;
329    }
330
331    LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort +
332      (isSyncAborted() ? " (self aborting)" : ""));
333    sendStopSignal();
334    if (!isSyncAborted()) {
335      try {
336        while (syncThread.isAlive()) {
337          sendStopSignal();
338          syncThread.join(250);
339        }
340      } catch (InterruptedException e) {
341        LOG.warn("join interrupted", e);
342        Thread.currentThread().interrupt();
343      }
344    }
345
346    // Close the writer
347    closeCurrentLogStream(abort);
348
349    // Close the old logs
350    // they should be already closed, this is just in case the load fails
351    // and we call start() and then stop()
352    for (ProcedureWALFile log: logs) {
353      log.close();
354    }
355    logs.clear();
356    loading.set(true);
357  }
358
359  private void sendStopSignal() {
360    if (lock.tryLock()) {
361      try {
362        waitCond.signalAll();
363        syncCond.signalAll();
364      } finally {
365        lock.unlock();
366      }
367    }
368  }
369
370  @Override
371  public int getNumThreads() {
372    return slots == null ? 0 : slots.length;
373  }
374
375  @Override
376  public int setRunningProcedureCount(final int count) {
377    this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length;
378    return this.runningProcCount;
379  }
380
381  public ProcedureStoreTracker getStoreTracker() {
382    return storeTracker;
383  }
384
385  public ArrayList<ProcedureWALFile> getActiveLogs() {
386    lock.lock();
387    try {
388      return new ArrayList<>(logs);
389    } finally {
390      lock.unlock();
391    }
392  }
393
394  public Set<ProcedureWALFile> getCorruptedLogs() {
395    return corruptedLogs;
396  }
397
  /**
   * Acquires the lease on the procedure WAL directory.
   * <p/>
   * Repeats, while the store is running, until one attempt succeeds. An attempt does three things:
   * recover the lease on the existing log files via initOldLogs(), roll to a new writer whose id
   * is one past the highest old id, and check that no newer log appeared meanwhile. Every attempt
   * after the first is preceded by a sleep of {@code waitBeforeRoll} ms. The sleep happens while
   * {@link #lock} is held.
   */
  @Override
  public void recoverLease() throws IOException {
    lock.lock();
    try {
      LOG.debug("Starting WAL Procedure Store lease recovery");
      boolean afterFirstAttempt = false;
      while (isRunning()) {
        // Don't sleep before first attempt
        if (afterFirstAttempt) {
          LOG.trace("Sleep {} ms after first lease recovery attempt.",
              waitBeforeRoll);
          Threads.sleepWithoutInterrupt(waitBeforeRoll);
        } else {
          afterFirstAttempt = true;
        }
        FileStatus[] oldLogs = getLogFiles();
        // Get Log-MaxID and recover lease on old logs
        try {
          flushLogId = initOldLogs(oldLogs);
        } catch (FileNotFoundException e) {
          // A log vanished while we were recovering it: assume a competing master and retry.
          LOG.warn("Someone else is active and deleted logs. retrying.", e);
          continue;
        }

        // Create new state-log
        if (!rollWriter(flushLogId + 1)) {
          // someone else has already created this log
          // NOTE(review): the id that was attempted is flushLogId + 1, but flushLogId is logged;
          // confirm rollWriter() leaves flushLogId unchanged on failure and fix the message.
          LOG.debug("Someone else has already created log {}. Retrying.", flushLogId);
          continue;
        }

        // We have the lease on the log
        oldLogs = getLogFiles();
        if (getMaxLogId(oldLogs) > flushLogId) {
          // A newer log than ours exists, so another writer got ahead: drop our new log and retry.
          // NOTE(review): the entry itself does not appear to be removed from `logs` here; confirm
          // that the next iteration's initOldLogs()/rollWriter() reconcile the list.
          LOG.debug("Someone else created new logs. Expected maxLogId < {}", flushLogId);
          logs.getLast().removeFile(this.walArchiveDir);
          continue;
        }

        LOG.debug("Lease acquired for flushLogId={}", flushLogId);
        break;
      }
    } finally {
      lock.unlock();
    }
  }
444
  /**
   * Replays all old WAL files into {@code loader}, then tries to clean up inactive WALs.
   * <p/>
   * Must be called after {@link #recoverLease()}, which creates the current log. If the current
   * log is the only one, the max proc id is set to 0 and nothing is replayed. Otherwise the older
   * logs are handed to ProcedureWALFormat.load() via a descending iterator over {@code logs}, with
   * the current (last) log skipped. Corrupted logs are collected into {@code corruptedLogs}; the
   * accompanying IOException is not propagated from here.
   *
   * @throws IllegalStateException if {@link #recoverLease()} has not been called
   */
  @Override
  public void load(ProcedureLoader loader) throws IOException {
    lock.lock();
    try {
      if (logs.isEmpty()) {
        throw new IllegalStateException("recoverLease() must be called before loading data");
      }

      // Nothing to do, If we have only the current log.
      if (logs.size() == 1) {
        LOG.debug("No state logs to replay.");
        loader.setMaxProcId(0);
        loading.set(false);
        return;
      }

      // Load the old logs
      // NOTE(review): descending order visits newest-to-oldest assuming `logs` is kept in
      // ascending log-id order, which matches getLast() being the current log.
      Iterator<ProcedureWALFile> it = logs.descendingIterator();
      it.next(); // Skip the current log

      // Adapter that forwards to the caller's loader and records corrupted WAL files.
      ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {

        @Override
        public void setMaxProcId(long maxProcId) {
          loader.setMaxProcId(maxProcId);
        }

        @Override
        public void load(ProcedureIterator procIter) throws IOException {
          loader.load(procIter);
        }

        @Override
        public void handleCorrupted(ProcedureIterator procIter) throws IOException {
          loader.handleCorrupted(procIter);
        }

        @Override
        public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
          // The exception is intentionally not rethrown; the log is only recorded.
          if (corruptedLogs == null) {
            corruptedLogs = new HashSet<>();
          }
          corruptedLogs.add(log);
          // TODO: sideline corrupted log
        }
      });
      // If loading fails, we must not persist the storeTracker later in stop(). Loading may fail
      // after the modified and deleted bits have been built but before resetModified is called.
      // Persisting the tracker at that point would make the next restart assume this file
      // contains every procedure and delete all the previous files, which is not correct. So
      // loading is only set to false once all procedures have been loaded successfully, and
      // closing skips persisting the store tracker while it is still true. This also prevents
      // the sync thread from running periodicRoll, which may clean old logs as well.
      loading.set(false);
      // try to cleanup inactive wals and complete the operation
      buildHoldingCleanupTracker();
      tryCleanupLogsOnLoad();
    } finally {
      lock.unlock();
    }
  }
507
508  private void tryCleanupLogsOnLoad() {
509    // nothing to cleanup.
510    if (logs.size() <= 1) {
511      return;
512    }
513
514    // the config says to not cleanup wals on load.
515    if (!conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY,
516      DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY)) {
517      LOG.debug("WALs cleanup on load is not enabled: " + getActiveLogs());
518      return;
519    }
520
521    try {
522      periodicRoll();
523    } catch (IOException e) {
524      LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e);
525    }
526  }
527
528  @Override
529  public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
530    if (LOG.isTraceEnabled()) {
531      LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
532    }
533
534    ByteSlot slot = acquireSlot();
535    try {
536      // Serialize the insert
537      long[] subProcIds = null;
538      if (subprocs != null) {
539        ProcedureWALFormat.writeInsert(slot, proc, subprocs);
540        subProcIds = new long[subprocs.length];
541        for (int i = 0; i < subprocs.length; ++i) {
542          subProcIds[i] = subprocs[i].getProcId();
543        }
544      } else {
545        assert !proc.hasParent();
546        ProcedureWALFormat.writeInsert(slot, proc);
547      }
548
549      // Push the transaction data and wait until it is persisted
550      pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
551    } catch (IOException e) {
552      // We are not able to serialize the procedure.
553      // this is a code error, and we are not able to go on.
554      LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedure: proc=" +
555          proc + ", subprocs=" + Arrays.toString(subprocs), e);
556      throw new RuntimeException(e);
557    } finally {
558      releaseSlot(slot);
559    }
560  }
561
562  @Override
563  public void insert(Procedure<?>[] procs) {
564    if (LOG.isTraceEnabled()) {
565      LOG.trace("Insert " + Arrays.toString(procs));
566    }
567
568    ByteSlot slot = acquireSlot();
569    try {
570      // Serialize the insert
571      long[] procIds = new long[procs.length];
572      for (int i = 0; i < procs.length; ++i) {
573        assert !procs[i].hasParent();
574        procIds[i] = procs[i].getProcId();
575        ProcedureWALFormat.writeInsert(slot, procs[i]);
576      }
577
578      // Push the transaction data and wait until it is persisted
579      pushData(PushType.INSERT, slot, Procedure.NO_PROC_ID, procIds);
580    } catch (IOException e) {
581      // We are not able to serialize the procedure.
582      // this is a code error, and we are not able to go on.
583      LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedure: " +
584          Arrays.toString(procs), e);
585      throw new RuntimeException(e);
586    } finally {
587      releaseSlot(slot);
588    }
589  }
590
591  @Override
592  public void update(Procedure<?> proc) {
593    if (LOG.isTraceEnabled()) {
594      LOG.trace("Update " + proc);
595    }
596
597    ByteSlot slot = acquireSlot();
598    try {
599      // Serialize the update
600      ProcedureWALFormat.writeUpdate(slot, proc);
601
602      // Push the transaction data and wait until it is persisted
603      pushData(PushType.UPDATE, slot, proc.getProcId(), null);
604    } catch (IOException e) {
605      // We are not able to serialize the procedure.
606      // this is a code error, and we are not able to go on.
607      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e);
608      throw new RuntimeException(e);
609    } finally {
610      releaseSlot(slot);
611    }
612  }
613
614  @Override
615  public void delete(long procId) {
616    LOG.trace("Delete {}", procId);
617    ByteSlot slot = acquireSlot();
618    try {
619      // Serialize the delete
620      ProcedureWALFormat.writeDelete(slot, procId);
621
622      // Push the transaction data and wait until it is persisted
623      pushData(PushType.DELETE, slot, procId, null);
624    } catch (IOException e) {
625      // We are not able to serialize the procedure.
626      // this is a code error, and we are not able to go on.
627      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + procId, e);
628      throw new RuntimeException(e);
629    } finally {
630      releaseSlot(slot);
631    }
632  }
633
634  @Override
635  public void delete(Procedure<?> proc, long[] subProcIds) {
636    assert proc != null : "expected a non-null procedure";
637    assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds";
638    if (LOG.isTraceEnabled()) {
639      LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds));
640    }
641
642    ByteSlot slot = acquireSlot();
643    try {
644      // Serialize the delete
645      ProcedureWALFormat.writeDelete(slot, proc, subProcIds);
646
647      // Push the transaction data and wait until it is persisted
648      pushData(PushType.DELETE, slot, proc.getProcId(), subProcIds);
649    } catch (IOException e) {
650      // We are not able to serialize the procedure.
651      // this is a code error, and we are not able to go on.
652      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e);
653      throw new RuntimeException(e);
654    } finally {
655      releaseSlot(slot);
656    }
657  }
658
659  @Override
660  public void delete(final long[] procIds, final int offset, final int count) {
661    if (count == 0) {
662      return;
663    }
664
665    if (offset == 0 && count == procIds.length) {
666      delete(procIds);
667    } else if (count == 1) {
668      delete(procIds[offset]);
669    } else {
670      delete(Arrays.copyOfRange(procIds, offset, offset + count));
671    }
672  }
673
674  private void delete(long[] procIds) {
675    if (LOG.isTraceEnabled()) {
676      LOG.trace("Delete " + Arrays.toString(procIds));
677    }
678
679    final ByteSlot slot = acquireSlot();
680    try {
681      // Serialize the delete
682      for (int i = 0; i < procIds.length; ++i) {
683        ProcedureWALFormat.writeDelete(slot, procIds[i]);
684      }
685
686      // Push the transaction data and wait until it is persisted
687      pushData(PushType.DELETE, slot, Procedure.NO_PROC_ID, procIds);
688    } catch (IOException e) {
689      // We are not able to serialize the procedure.
690      // this is a code error, and we are not able to go on.
691      LOG.error("Unable to serialize the procedures: " + Arrays.toString(procIds), e);
692      throw new RuntimeException(e);
693    } finally {
694      releaseSlot(slot);
695    }
696  }
697
698  private ByteSlot acquireSlot() {
699    ByteSlot slot = slotsCache.poll();
700    return slot != null ? slot : new ByteSlot();
701  }
702
703  private void releaseSlot(final ByteSlot slot) {
704    slot.reset();
705    slotsCache.offer(slot);
706  }
707
708  private enum PushType { INSERT, UPDATE, DELETE };
709
  /**
   * Queues a serialized record for the sync thread and blocks until that sync round completes.
   * <p/>
   * Under {@link #lock}, it first waits until no sync is in progress and a slot is free. It then
   * updates the trackers, stores {@code slot} and wakes the sync thread (on the first slot, or
   * when the slots are full). Finally it waits until {@code syncId} changes, which means the batch
   * was synced, or until the store stops running.
   *
   * @return the value of {@code flushLogId} when the record was queued
   * @throws RuntimeException if the store is not running, recoverLease() has not been called, the
   *         sync thread has aborted, or the caller is interrupted (which also sends an abort
   *         signal)
   */
  private long pushData(final PushType type, final ByteSlot slot,
      final long procId, final long[] subProcIds) {
    if (!isRunning()) {
      throw new RuntimeException("the store must be running before inserting data");
    }
    // NOTE(review): `logs` is read here without holding `lock`.
    if (logs.isEmpty()) {
      throw new RuntimeException("recoverLease() must be called before inserting data");
    }

    long logId = -1;
    lock.lock();
    try {
      // Wait for the sync to be completed
      while (true) {
        if (!isRunning()) {
          throw new RuntimeException("store no longer running");
        } else if (isSyncAborted()) {
          throw new RuntimeException("sync aborted", syncException.get());
        } else if (inSync.get()) {
          syncCond.await();
        } else if (slotIndex >= syncMaxSlot) {
          // Batch is full: nudge the sync thread and wait for the round to finish.
          slotCond.signal();
          syncCond.await();
        } else {
          break;
        }
      }

      // Remember which sync round this record belongs to.
      final long pushSyncId = syncId.get();
      updateStoreTracker(type, procId, subProcIds);
      slots[slotIndex++] = slot;
      logId = flushLogId;

      // Notify that there is new data
      if (slotIndex == 1) {
        waitCond.signal();
      }

      // Notify that the slots are full
      if (slotIndex == syncMaxSlot) {
        waitCond.signal();
        slotCond.signal();
      }

      // Wait for the sync round to complete. If the store stops meanwhile, this returns without
      // an error, and the record may not have been synced.
      while (pushSyncId == syncId.get() && isRunning()) {
        syncCond.await();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      sendAbortProcessSignal();
      throw new RuntimeException(e);
    } finally {
      lock.unlock();
      // Note: throwing here replaces any exception already propagating from the try block.
      if (isSyncAborted()) {
        throw new RuntimeException("sync aborted", syncException.get());
      }
    }
    return logId;
  }
769
770  private void updateStoreTracker(final PushType type,
771      final long procId, final long[] subProcIds) {
772    switch (type) {
773      case INSERT:
774        if (subProcIds == null) {
775          storeTracker.insert(procId);
776        } else if (procId == Procedure.NO_PROC_ID) {
777          storeTracker.insert(subProcIds);
778        } else {
779          storeTracker.insert(procId, subProcIds);
780          holdingCleanupTracker.setDeletedIfModified(procId);
781        }
782        break;
783      case UPDATE:
784        storeTracker.update(procId);
785        holdingCleanupTracker.setDeletedIfModified(procId);
786        break;
787      case DELETE:
788        if (subProcIds != null && subProcIds.length > 0) {
789          storeTracker.delete(subProcIds);
790          holdingCleanupTracker.setDeletedIfModified(subProcIds);
791        } else {
792          storeTracker.delete(procId);
793          holdingCleanupTracker.setDeletedIfModified(procId);
794        }
795        break;
796      default:
797        throw new RuntimeException("invalid push type " + type);
798    }
799  }
800
801  private boolean isSyncAborted() {
802    return syncException.get() != null;
803  }
804
  /**
   * Main loop of the sync thread.
   * <p>
   * While the store is running, the loop does the following:
   * <ul>
   *   <li>If no slot has been filled ({@code slotIndex == 0}), it runs {@link #periodicRoll()}
   *   (unless procedures are still loading). It then waits on {@code waitCond} for at most the
   *   time left until the next periodic roll.</li>
   *   <li>Once data is available, it waits on {@code slotCond} for up to {@code syncWaitMsec},
   *   unless {@code slotIndex} already equals {@code runningProcCount}.</li>
   *   <li>It then records a {@link SyncMetrics} sample for the web UI and writes and syncs
   *   {@code slots[0..slotIndex)} through {@link #syncSlots()}. Finally it resets
   *   {@code slotIndex} and increments {@code syncId}.</li>
   * </ul>
   * <p>
   * The loop runs with {@code lock} held. NOTE(review): the awaits presumably release it because
   * {@code waitCond}/{@code slotCond} are conditions of {@code lock}; confirm at their declaration.
   * <p>
   * On failure, the error is stored in {@code syncException} (only if none is stored yet), the
   * abort signal is sent, and the error is rethrown, ending the loop. {@code syncCond} is
   * signalled at the end of every iteration, including failed ones and ones that found no data.
   * @throws Throwable the error that ended the loop
   */
  private void syncLoop() throws Throwable {
    // Bytes synced to the current log as of the last completed sync; used for trace and metrics.
    long totalSyncedToStore = 0;
    inSync.set(false);
    lock.lock();
    try {
      while (isRunning()) {
        try {
          // Wait until new data is available
          if (slotIndex == 0) {
            // Roll/cleanup is skipped while procedures are still being loaded.
            if (!loading.get()) {
              periodicRoll();
            }

            if (LOG.isTraceEnabled()) {
              float rollTsSec = getMillisFromLastRoll() / 1000.0f;
              LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
                        StringUtils.humanSize(totalSynced.get()),
                        StringUtils.humanSize(totalSynced.get() / rollTsSec)));
            }

            // Woken up by a signal, or times out when the next periodic roll is due.
            waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
            if (slotIndex == 0) {
              // no data.. probably a stop() or a periodic roll
              continue;
            }
          }
          // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing.
          // The batch counts as full once slotIndex reaches runningProcCount.
          syncMaxSlot = runningProcCount;
          assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot;
          final long syncWaitSt = System.currentTimeMillis();
          if (slotIndex != syncMaxSlot) {
            slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
          }

          final long currentTs = System.currentTimeMillis();
          final long syncWaitMs = currentTs - syncWaitSt;
          final float rollSec = getMillisFromLastRoll() / 1000.0f;
          final float syncedPerSec = totalSyncedToStore / rollSec;
          if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) {
            LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
                      StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
                      StringUtils.humanSize(totalSyncedToStore),
                      StringUtils.humanSize(syncedPerSec)));
          }

          // update webui circular buffers (TODO: get rid of allocations)
          final SyncMetrics syncMetrics = new SyncMetrics();
          syncMetrics.timestamp = currentTs;
          syncMetrics.syncWaitMs = syncWaitMs;
          syncMetrics.syncedEntries = slotIndex;
          syncMetrics.totalSyncedBytes = totalSyncedToStore;
          syncMetrics.syncedPerSec = syncedPerSec;
          syncMetricsQueue.add(syncMetrics);

          // sync: write slots[0..slotIndex), then account the bytes to the current log
          // and to totalSynced before releasing the slots.
          inSync.set(true);
          long slotSize = syncSlots();
          logs.getLast().addToSize(slotSize);
          totalSyncedToStore = totalSynced.addAndGet(slotSize);
          slotIndex = 0;
          inSync.set(false);
          syncId.incrementAndGet();
        } catch (InterruptedException e) {
          // Restore the interrupt flag, record the failure, signal abort and stop the loop.
          Thread.currentThread().interrupt();
          syncException.compareAndSet(null, e);
          sendAbortProcessSignal();
          throw e;
        } catch (Throwable t) {
          // Only the first failure is kept in syncException.
          syncException.compareAndSet(null, t);
          sendAbortProcessSignal();
          throw t;
        } finally {
          // Wake up syncCond waiters after every iteration, including failed or empty ones.
          syncCond.signalAll();
        }
      }
    } finally {
      lock.unlock();
    }
  }
884
885  public ArrayList<SyncMetrics> getSyncMetrics() {
886    lock.lock();
887    try {
888      return new ArrayList<>(syncMetricsQueue);
889    } finally {
890      lock.unlock();
891    }
892  }
893
894  private long syncSlots() throws Throwable {
895    int retry = 0;
896    int logRolled = 0;
897    long totalSynced = 0;
898    do {
899      try {
900        totalSynced = syncSlots(stream, slots, 0, slotIndex);
901        break;
902      } catch (Throwable e) {
903        LOG.warn("unable to sync slots, retry=" + retry);
904        if (++retry >= maxRetriesBeforeRoll) {
905          if (logRolled >= maxSyncFailureRoll && isRunning()) {
906            LOG.error("Sync slots after log roll failed, abort.", e);
907            throw e;
908          }
909
910          if (!rollWriterWithRetries()) {
911            throw e;
912          }
913
914          logRolled++;
915          retry = 0;
916        }
917      }
918    } while (isRunning());
919    return totalSynced;
920  }
921
922  protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots,
923      final int offset, final int count) throws IOException {
924    long totalSynced = 0;
925    for (int i = 0; i < count; ++i) {
926      final ByteSlot data = slots[offset + i];
927      data.writeTo(stream);
928      totalSynced += data.size();
929    }
930
931    syncStream(stream);
932    sendPostSyncSignal();
933
934    if (LOG.isTraceEnabled()) {
935      LOG.trace("Sync slots=" + count + '/' + syncMaxSlot +
936                ", flushed=" + StringUtils.humanSize(totalSynced));
937    }
938    return totalSynced;
939  }
940
941  protected void syncStream(final FSDataOutputStream stream) throws IOException {
942    if (useHsync) {
943      stream.hsync();
944    } else {
945      stream.hflush();
946    }
947  }
948
949  private boolean rollWriterWithRetries() {
950    for (int i = 0; i < rollRetries && isRunning(); ++i) {
951      if (i > 0) {
952        Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
953      }
954
955      try {
956        if (rollWriter()) {
957          return true;
958        }
959      } catch (IOException e) {
960        LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
961      }
962    }
963    LOG.error(HBaseMarkers.FATAL, "Unable to roll the log");
964    return false;
965  }
966
967  private boolean tryRollWriter() {
968    try {
969      return rollWriter();
970    } catch (IOException e) {
971      LOG.warn("Unable to roll the log", e);
972      return false;
973    }
974  }
975
976  public long getMillisToNextPeriodicRoll() {
977    if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
978      return periodicRollMsec - getMillisFromLastRoll();
979    }
980    return Long.MAX_VALUE;
981  }
982
983  public long getMillisFromLastRoll() {
984    return (System.currentTimeMillis() - lastRollTs.get());
985  }
986
987  @VisibleForTesting
988  void periodicRollForTesting() throws IOException {
989    lock.lock();
990    try {
991      periodicRoll();
992    } finally {
993      lock.unlock();
994    }
995  }
996
997  @VisibleForTesting
998  public boolean rollWriterForTesting() throws IOException {
999    lock.lock();
1000    try {
1001      return rollWriter();
1002    } finally {
1003      lock.unlock();
1004    }
1005  }
1006
1007  @VisibleForTesting
1008  void removeInactiveLogsForTesting() throws Exception {
1009    lock.lock();
1010    try {
1011      removeInactiveLogs();
1012    } finally  {
1013      lock.unlock();
1014    }
1015  }
1016
1017  private void periodicRoll() throws IOException {
1018    if (storeTracker.isEmpty()) {
1019      LOG.trace("no active procedures");
1020      tryRollWriter();
1021      removeAllLogs(flushLogId - 1, "no active procedures");
1022    } else {
1023      if (storeTracker.isAllModified()) {
1024        LOG.trace("all the active procedures are in the latest log");
1025        removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log");
1026      }
1027
1028      // if the log size has exceeded the roll threshold
1029      // or the periodic roll timeout is expired, try to roll the wal.
1030      if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
1031        tryRollWriter();
1032      }
1033
1034      removeInactiveLogs();
1035    }
1036  }
1037
1038  private boolean rollWriter() throws IOException {
1039    if (!isRunning()) {
1040      return false;
1041    }
1042
1043    // Create new state-log
1044    if (!rollWriter(flushLogId + 1)) {
1045      LOG.warn("someone else has already created log {}", flushLogId);
1046      return false;
1047    }
1048
1049    // We have the lease on the log,
1050    // but we should check if someone else has created new files
1051    if (getMaxLogId(getLogFiles()) > flushLogId) {
1052      LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId);
1053      logs.getLast().removeFile(this.walArchiveDir);
1054      return false;
1055    }
1056
1057    // We have the lease on the log
1058    return true;
1059  }
1060
1061  @VisibleForTesting
1062  boolean rollWriter(long logId) throws IOException {
1063    assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
1064    assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
1065
1066    ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
1067      .setVersion(ProcedureWALFormat.HEADER_VERSION)
1068      .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
1069      .setMinProcId(storeTracker.getActiveMinProcId())
1070      .setLogId(logId)
1071      .build();
1072
1073    FSDataOutputStream newStream = null;
1074    Path newLogFile = null;
1075    long startPos = -1;
1076    newLogFile = getLogFilePath(logId);
1077    try {
1078      newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
1079    } catch (FileAlreadyExistsException e) {
1080      LOG.error("Log file with id={} already exists", logId, e);
1081      return false;
1082    } catch (RemoteException re) {
1083      LOG.warn("failed to create log file with id={}", logId, re);
1084      return false;
1085    }
1086    // After we create the stream but before we attempt to use it at all
1087    // ensure that we can provide the level of data safety we're configured
1088    // to provide.
1089    final String durability = useHsync ? StreamCapabilities.HSYNC : StreamCapabilities.HFLUSH;
1090    if (enforceStreamCapability && !newStream.hasCapability(durability)) {
1091      throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
1092          " for proper operation during component failures, but the underlying filesystem does " +
1093          "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY +
1094          "' to set the desired level of robustness and ensure the config value of '" +
1095          CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it.");
1096    }
1097    try {
1098      ProcedureWALFormat.writeHeader(newStream, header);
1099      startPos = newStream.getPos();
1100    } catch (IOException ioe) {
1101      LOG.warn("Encountered exception writing header", ioe);
1102      newStream.close();
1103      return false;
1104    }
1105
1106    closeCurrentLogStream(false);
1107
1108    storeTracker.resetModified();
1109    stream = newStream;
1110    flushLogId = logId;
1111    totalSynced.set(0);
1112    long rollTs = System.currentTimeMillis();
1113    lastRollTs.set(rollTs);
1114    logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs));
1115
1116    // if it's the first next WAL being added, build the holding cleanup tracker
1117    if (logs.size() == 2) {
1118      buildHoldingCleanupTracker();
1119    } else if (logs.size() > walCountWarnThreshold) {
1120      LOG.warn("procedure WALs count={} above the warning threshold {}. check running procedures" +
1121        " to see if something is stuck.", logs.size(), walCountWarnThreshold);
1122      // This is just like what we have done at RS side when there are too many wal files. For RS,
1123      // if there are too many wal files, we will find out the wal entries in the oldest file, and
1124      // tell the upper layer to flush these regions so the wal entries will be useless and then we
1125      // can delete the wal file. For WALProcedureStore, the assumption is that, if all the
1126      // procedures recorded in a proc wal file are modified or deleted in a new proc wal file, then
1127      // we are safe to delete it. So here if there are too many proc wal files, we will find out
1128      // the procedure ids in the oldest file, which are neither modified nor deleted in newer proc
1129      // wal files, and tell upper layer to update the state of these procedures to the newest proc
1130      // wal file(by calling ProcedureStore.update), then we are safe to delete the oldest proc wal
1131      // file.
1132      sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds());
1133    }
1134
1135    LOG.info("Rolled new Procedure Store WAL, id={}", logId);
1136    return true;
1137  }
1138
1139  private void closeCurrentLogStream(boolean abort) {
1140    if (stream == null || logs.isEmpty()) {
1141      return;
1142    }
1143
1144    try {
1145      ProcedureWALFile log = logs.getLast();
1146      // If the loading flag is true, it usually means that we fail when loading procedures, so we
1147      // should not persist the store tracker, as its state may not be correct.
1148      if (!loading.get()) {
1149        log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
1150        log.updateLocalTracker(storeTracker);
1151        if (!abort) {
1152          long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
1153          log.addToSize(trailerSize);
1154        }
1155      }
1156    } catch (IOException | FSError e) {
1157      LOG.warn("Unable to write the trailer", e);
1158    }
1159    try {
1160      stream.close();
1161    } catch (IOException | FSError e) {
1162      LOG.error("Unable to close the stream", e);
1163    }
1164    stream = null;
1165  }
1166
1167  // ==========================================================================
1168  //  Log Files cleaner helpers
1169  // ==========================================================================
1170  private void removeInactiveLogs() throws IOException {
1171    // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
1172    // once there is nothing olding the oldest WAL we can remove it.
1173    while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
1174      LOG.info("Remove the oldest log {}", logs.getFirst());
1175      removeLogFile(logs.getFirst(), walArchiveDir);
1176      buildHoldingCleanupTracker();
1177    }
1178
1179    // TODO: In case we are holding up a lot of logs for long time we should
1180    // rewrite old procedures (in theory parent procs) to the new WAL.
1181  }
1182
  /**
   * Rebuilds {@code holdingCleanupTracker} for the oldest log in {@code logs}.
   * <p>
   * With fewer than two logs, the tracker is simply reset. Otherwise it is built as follows:
   * <ul>
   *   <li>it starts from the oldest log's tracker;</li>
   *   <li>procedures that {@code storeTracker} reports as deleted are marked deleted;</li>
   *   <li>{@code setDeletedIfModifiedInBoth} is applied with the tracker of each newer log except
   *   the last one, stopping early once the holding tracker becomes empty.</li>
   * </ul>
   * What remains is used by the cleanup code as the set of procedures still holding the oldest
   * log.
   */
  private void buildHoldingCleanupTracker() {
    if (logs.size() <= 1) {
      // we only have one wal, so nothing to do
      holdingCleanupTracker.reset();
      return;
    }

    // compute the holding tracker.
    // - the first WAL is used for the 'updates'
    // - the global tracker will be used to determine whether a procedure has been deleted
    // - other trackers will be used to determine whether a procedure has been updated, as a deleted
    // procedure can always be detected by checking the global tracker, we can save the deleted
    // checks when applying other trackers
    holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
    holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker);
    // the logs is a linked list, so avoid calling get(index) on it.
    Iterator<ProcedureWALFile> iter = logs.iterator();
    // skip the tracker for the first file when creating the iterator.
    iter.next();
    ProcedureStoreTracker tracker = iter.next().getTracker();
    // testing iter.hasNext after calling iter.next to skip applying the tracker for last file,
    // which is just the storeTracker above.
    while (iter.hasNext()) {
      holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker);
      // nothing left holding the oldest log; the remaining trackers cannot change that
      if (holdingCleanupTracker.isEmpty()) {
        break;
      }
      tracker = iter.next().getTracker();
    }
  }
1213
1214  /**
1215   * Remove all logs with logId <= {@code lastLogId}.
1216   */
1217  private void removeAllLogs(long lastLogId, String why) {
1218    if (logs.size() <= 1) {
1219      return;
1220    }
1221
1222    LOG.info("Remove all state logs with ID less than {}, since {}", lastLogId, why);
1223
1224    boolean removed = false;
1225    while (logs.size() > 1) {
1226      ProcedureWALFile log = logs.getFirst();
1227      if (lastLogId < log.getLogId()) {
1228        break;
1229      }
1230      removeLogFile(log, walArchiveDir);
1231      removed = true;
1232    }
1233
1234    if (removed) {
1235      buildHoldingCleanupTracker();
1236    }
1237  }
1238
1239  private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
1240    try {
1241      LOG.trace("Removing log={}", log);
1242      log.removeFile(walArchiveDir);
1243      logs.remove(log);
1244      LOG.debug("Removed log={}, activeLogs={}", log, logs);
1245      assert logs.size() > 0 : "expected at least one log";
1246    } catch (IOException e) {
1247      LOG.error("Unable to remove log: " + log, e);
1248      return false;
1249    }
1250    return true;
1251  }
1252
1253  // ==========================================================================
1254  //  FileSystem Log Files helpers
1255  // ==========================================================================
1256  public Path getWALDir() {
1257    return this.walDir;
1258  }
1259
1260  @VisibleForTesting
1261  Path getWalArchiveDir() {
1262    return this.walArchiveDir;
1263  }
1264
1265  public FileSystem getFileSystem() {
1266    return this.fs;
1267  }
1268
1269  protected Path getLogFilePath(final long logId) throws IOException {
1270    return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId));
1271  }
1272
1273  private static long getLogIdFromName(final String name) {
1274    int end = name.lastIndexOf(".log");
1275    int start = name.lastIndexOf('-') + 1;
1276    return Long.parseLong(name.substring(start, end));
1277  }
1278
1279  private static final PathFilter WALS_PATH_FILTER = new PathFilter() {
1280    @Override
1281    public boolean accept(Path path) {
1282      String name = path.getName();
1283      return name.startsWith(LOG_PREFIX) && name.endsWith(".log");
1284    }
1285  };
1286
1287  private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR =
1288      new Comparator<FileStatus>() {
1289    @Override
1290    public int compare(FileStatus a, FileStatus b) {
1291      final long aId = getLogIdFromName(a.getPath().getName());
1292      final long bId = getLogIdFromName(b.getPath().getName());
1293      return Long.compare(aId, bId);
1294    }
1295  };
1296
1297  private FileStatus[] getLogFiles() throws IOException {
1298    try {
1299      FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER);
1300      Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
1301      return files;
1302    } catch (FileNotFoundException e) {
1303      LOG.warn("Log directory not found: " + e.getMessage());
1304      return null;
1305    }
1306  }
1307
1308  /**
1309   * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort
1310   * the file set by log id.
1311   * @return Max-LogID of the specified log file set
1312   */
1313  private static long getMaxLogId(FileStatus[] logFiles) {
1314    if (logFiles == null || logFiles.length == 0) {
1315      return 0L;
1316    }
1317    return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName());
1318  }
1319
1320  /**
1321   * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort
1322   * the file set by log id.
1323   * @return Max-LogID of the specified log file set
1324   */
1325  private long initOldLogs(FileStatus[] logFiles) throws IOException {
1326    if (logFiles == null || logFiles.length == 0) {
1327      return 0L;
1328    }
1329    long maxLogId = 0;
1330    for (int i = 0; i < logFiles.length; ++i) {
1331      final Path logPath = logFiles[i].getPath();
1332      leaseRecovery.recoverFileLease(fs, logPath);
1333      if (!isRunning()) {
1334        throw new IOException("wal aborting");
1335      }
1336
1337      maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
1338      ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
1339      if (log != null) {
1340        this.logs.add(log);
1341      }
1342    }
1343    initTrackerFromOldLogs();
1344    return maxLogId;
1345  }
1346
1347  /**
1348   * If last log's tracker is not null, use it as {@link #storeTracker}. Otherwise, set storeTracker
1349   * as partial, and let {@link ProcedureWALFormatReader} rebuild it using entries in the log.
1350   */
1351  private void initTrackerFromOldLogs() {
1352    if (logs.isEmpty() || !isRunning()) {
1353      return;
1354    }
1355    ProcedureWALFile log = logs.getLast();
1356    if (!log.getTracker().isPartial()) {
1357      storeTracker.resetTo(log.getTracker());
1358    } else {
1359      storeTracker.reset();
1360      storeTracker.setPartialFlag(true);
1361    }
1362  }
1363
1364  /**
1365   * Loads given log file and it's tracker.
1366   */
1367  private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir)
1368      throws IOException {
1369    final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
1370    if (logFile.getLen() == 0) {
1371      LOG.warn("Remove uninitialized log: {}", logFile);
1372      log.removeFile(walArchiveDir);
1373      return null;
1374    }
1375    LOG.debug("Opening Pv2 {}", logFile);
1376    try {
1377      log.open();
1378    } catch (ProcedureWALFormat.InvalidWALDataException e) {
1379      LOG.warn("Remove uninitialized log: {}", logFile, e);
1380      log.removeFile(walArchiveDir);
1381      return null;
1382    } catch (IOException e) {
1383      String msg = "Unable to read state log: " + logFile;
1384      LOG.error(msg, e);
1385      throw new IOException(msg, e);
1386    }
1387
1388    try {
1389      log.readTracker();
1390    } catch (IOException e) {
1391      log.getTracker().reset();
1392      log.getTracker().setPartialFlag(true);
1393      LOG.warn("Unable to read tracker for {}", log, e);
1394    }
1395
1396    log.close();
1397    return log;
1398  }
1399
1400  /**
1401   * Parses a directory of WALs building up ProcedureState.
1402   * For testing parse and profiling.
1403   * @param args Include pointer to directory of WAL files for a store instance to parse & load.
1404   */
1405  public static void main(String [] args) throws IOException {
1406    Configuration conf = HBaseConfiguration.create();
1407    if (args == null || args.length != 1) {
1408      System.out.println("ERROR: Empty arguments list; pass path to MASTERPROCWALS_DIR.");
1409      System.out.println("Usage: WALProcedureStore MASTERPROCWALS_DIR");
1410      System.exit(-1);
1411    }
1412    WALProcedureStore store = new WALProcedureStore(conf, new Path(args[0]), null,
1413      new LeaseRecovery() {
1414        @Override
1415        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
1416          // no-op
1417        }
1418      });
1419    try {
1420      store.start(16);
1421      ProcedureExecutor<?> pe = new ProcedureExecutor<>(conf, new Object()/*Pass anything*/, store);
1422      pe.init(1, true);
1423    } finally {
1424      store.stop(true);
1425    }
1426  }
1427}