001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Comparator;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.LinkedList;
028import java.util.Set;
029import java.util.concurrent.LinkedTransferQueue;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicLong;
033import java.util.concurrent.atomic.AtomicReference;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FSDataOutputStream;
038import org.apache.hadoop.fs.FileAlreadyExistsException;
039import org.apache.hadoop.fs.FileStatus;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.fs.PathFilter;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.log.HBaseMarkers;
045import org.apache.hadoop.hbase.procedure2.Procedure;
046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
047import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
048import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
049import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
050import org.apache.hadoop.hbase.procedure2.util.StringUtils;
051import org.apache.hadoop.hbase.util.CommonFSUtils;
052import org.apache.hadoop.hbase.util.Threads;
053import org.apache.hadoop.ipc.RemoteException;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
059import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;
060
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
062
063/**
064 * WAL implementation of the ProcedureStore.
065 * <p/>
066 * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()},
067 * then {@link #load(ProcedureLoader)}.
068 * <p/>
 * In {@link #recoverLease()}, we get the lease by closing all the existing wal files (by calling
 * recoverFileLease) and creating a new wal writer. We also collect the list of all the old wal
 * files.
 * <p/>
 * FIXME: notice that the current recover lease implementation is problematic, it cannot deal with
 * the race where two masters both want to acquire the lease...
075 * <p/>
076 * In {@link #load(ProcedureLoader)} method, we will load all the active procedures. See the
077 * comments of this method for more details.
078 * <p/>
 * The actual logging works a bit like our FileSystem based WAL implementation on the RS side.
 * There is a {@link #slots} array, which acts like a ring buffer: the insert, update and delete
 * methods put their data into {@link #slots} and wait. A background sync thread (see the
 * {@link #syncLoop()} method) takes the data from {@link #slots}, writes it to the FileSystem, and
 * notifies the callers that the write has finished.
084 * <p/>
085 * TODO: try using disruptor to increase performance and simplify the logic?
086 * <p/>
 * The {@link #storeTracker} keeps track of the procedures modified in the newest wal file, which is
 * also the one currently being written. The deleted bits in it cover all the procedures, not only
 * the ones in the newest wal file. When rolling a log, we first store the tracker in the trailer of
 * the current wal file, and then reset its modified bits, so that it can start to track the
 * modified procedures for the new wal file.
092 * <p/>
 * The {@link #holdingCleanupTracker} is used to test whether it is safe to delete the oldest wal
 * file. It is maintained when logs are rolled and there is more than one wal file. It is first
 * initialized to the oldest file's tracker (which is stored in the trailer), using the method
 * {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merged with the
 * tracker of every newer wal file, using
 * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If all the
 * procedures modified in the oldest wal file have been modified or deleted in newer wal files, the
 * oldest file can be deleted. This works because every call to
 * {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)} persists
 * the full state of a Procedure, so the earlier wal records for that procedure can all be deleted.
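 * <p/>
 * A minimal usage sketch; the no-op {@code LeaseRecovery} lambda and the {@code loader} below are
 * only illustrative, they are not part of this class:
 * <pre>
 *   WALProcedureStore store = new WALProcedureStore(conf, (fs, path) -> {
 *     // illustrative no-op lease recovery, e.g. for a single-node test setup
 *   });
 *   store.start(16);      // number of slots, usually the max number of running procedures
 *   store.recoverLease(); // take ownership of the wal directory and roll a new wal
 *   store.load(loader);   // replay the old wals; 'loader' is the caller's ProcedureLoader
 * </pre>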
105 * @see ProcedureWALPrettyPrinter for printing content of a single WAL.
107 */
108@InterfaceAudience.Private
109public class WALProcedureStore extends ProcedureStoreBase {
110  private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class);
111  public static final String LOG_PREFIX = "pv2-";
112  /** Used to construct the name of the log directory for master procedures */
113  public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
114
115
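  /**
   * Hook used by {@link #recoverLease()} to force close (i.e. recover the lease of) each existing
   * wal file before it is read, so that no other writer can still be appending to it.
   */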
116  public interface LeaseRecovery {
117    void recoverFileLease(FileSystem fs, Path path) throws IOException;
118  }
119
120  public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY =
121    "hbase.procedure.store.wal.warn.threshold";
122  private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10;
123
124  public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY =
125    "hbase.procedure.store.wal.exec.cleanup.on.load";
126  private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = true;
127
128  public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
129    "hbase.procedure.store.wal.max.retries.before.roll";
130  private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;
131
132  public static final String WAIT_BEFORE_ROLL_CONF_KEY =
133    "hbase.procedure.store.wal.wait.before.roll";
134  private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;
135
136  public static final String ROLL_RETRIES_CONF_KEY =
137    "hbase.procedure.store.wal.max.roll.retries";
138  private static final int DEFAULT_ROLL_RETRIES = 3;
139
140  public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
141    "hbase.procedure.store.wal.sync.failure.roll.max";
142  private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;
143
144  public static final String PERIODIC_ROLL_CONF_KEY =
145    "hbase.procedure.store.wal.periodic.roll.msec";
146  private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h
147
148  public static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
149  private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
150
151  public static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
152  private static final boolean DEFAULT_USE_HSYNC = true;
153
154  public static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
155  private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M
156
157  public static final String STORE_WAL_SYNC_STATS_COUNT =
158      "hbase.procedure.store.wal.sync.stats.count";
159  private static final int DEFAULT_SYNC_STATS_COUNT = 10;
160
161  private final LinkedList<ProcedureWALFile> logs = new LinkedList<>();
162  private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker();
163  private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
164  private final ReentrantLock lock = new ReentrantLock();
165  private final Condition waitCond = lock.newCondition();
166  private final Condition slotCond = lock.newCondition();
167  private final Condition syncCond = lock.newCondition();
168
169  private final LeaseRecovery leaseRecovery;
170  private final Configuration conf;
171  private final FileSystem fs;
172  private final Path walDir;
173  private final Path walArchiveDir;
174  private final boolean enforceStreamCapability;
175
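  // State shared with the background sync thread: the first sync failure (if any), whether we are
  // still loading, whether a sync is currently in progress, the bytes synced to the current wal,
  // the timestamp of the last roll, and a counter bumped after every completed sync so waiters in
  // pushData can tell that their data has been persisted.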
176  private final AtomicReference<Throwable> syncException = new AtomicReference<>();
177  private final AtomicBoolean loading = new AtomicBoolean(true);
178  private final AtomicBoolean inSync = new AtomicBoolean(false);
179  private final AtomicLong totalSynced = new AtomicLong(0);
180  private final AtomicLong lastRollTs = new AtomicLong(0);
181  private final AtomicLong syncId = new AtomicLong(0);
182
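  // The slot buffer: producers (insert/update/delete) append their serialized entries to
  // slots[0..slotIndex) while holding 'lock', and the sync thread flushes the filled slots to
  // 'stream' in a single batch. 'slotsCache' recycles ByteSlot instances between writes.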
183  private LinkedTransferQueue<ByteSlot> slotsCache = null;
184  private Set<ProcedureWALFile> corruptedLogs = null;
185  private FSDataOutputStream stream = null;
186  private int runningProcCount = 1;
187  private long flushLogId = 0;
188  private int syncMaxSlot = 1;
189  private int slotIndex = 0;
190  private Thread syncThread;
191  private ByteSlot[] slots;
192
193  private int walCountWarnThreshold;
194  private int maxRetriesBeforeRoll;
195  private int maxSyncFailureRoll;
196  private int waitBeforeRoll;
197  private int rollRetries;
198  private int periodicRollMsec;
199  private long rollThreshold;
200  private boolean useHsync;
201  private int syncWaitMsec;
202
203  // Variables used for UI display
204  private CircularFifoQueue<SyncMetrics> syncMetricsQueue;
205
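  /**
   * A snapshot of sync performance, recorded once per sync by the sync loop and exposed to the
   * web UI via {@link #getSyncMetrics()}.
   */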
206  public static class SyncMetrics {
207    private long timestamp;
208    private long syncWaitMs;
209    private long totalSyncedBytes;
210    private int syncedEntries;
211    private float syncedPerSec;
212
213    public long getTimestamp() {
214      return timestamp;
215    }
216
217    public long getSyncWaitMs() {
218      return syncWaitMs;
219    }
220
221    public long getTotalSyncedBytes() {
222      return totalSyncedBytes;
223    }
224
225    public long getSyncedEntries() {
226      return syncedEntries;
227    }
228
229    public float getSyncedPerSec() {
230      return syncedPerSec;
231    }
232  }
233
234  public WALProcedureStore(final Configuration conf, final LeaseRecovery leaseRecovery)
235      throws IOException {
236    this(conf,
237        new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
238        new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME),
239        leaseRecovery);
240  }
241
242  @VisibleForTesting
243  public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir,
244      final LeaseRecovery leaseRecovery) throws IOException {
245    this.conf = conf;
246    this.leaseRecovery = leaseRecovery;
247    this.walDir = walDir;
248    this.walArchiveDir = walArchiveDir;
249    this.fs = walDir.getFileSystem(conf);
    this.enforceStreamCapability =
      conf.getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
251
252    // Create the log directory for the procedure store
253    if (!fs.exists(walDir)) {
254      if (!fs.mkdirs(walDir)) {
255        throw new IOException("Unable to mkdir " + walDir);
256      }
257    }
258    // Now that it exists, set the log policy
259    String storagePolicy =
260        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
261    CommonFSUtils.setStoragePolicy(fs, walDir, storagePolicy);
262
263    // Create archive dir up front. Rename won't work w/o it up on HDFS.
264    if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
265      if (this.fs.mkdirs(this.walArchiveDir)) {
266        LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir);
267      } else {
268        LOG.warn("Failed create of {}", this.walArchiveDir);
269      }
270    }
271  }
272
273  @Override
274  public void start(int numSlots) throws IOException {
275    if (!setRunning(true)) {
276      return;
277    }
278
279    // Init buffer slots
280    loading.set(true);
281    runningProcCount = numSlots;
282    syncMaxSlot = numSlots;
283    slots = new ByteSlot[numSlots];
284    slotsCache = new LinkedTransferQueue<>();
285    while (slotsCache.size() < numSlots) {
286      slotsCache.offer(new ByteSlot());
287    }
288
289    // Tunings
290    walCountWarnThreshold =
291      conf.getInt(WAL_COUNT_WARN_THRESHOLD_CONF_KEY, DEFAULT_WAL_COUNT_WARN_THRESHOLD);
292    maxRetriesBeforeRoll =
293      conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
294    maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
295    waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
296    rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
297    rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
298    periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
299    syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
300    useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
301
302    // WebUI
303    syncMetricsQueue = new CircularFifoQueue<>(
304      conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT));
305
306    // Init sync thread
307    syncThread = new Thread("WALProcedureStoreSyncThread") {
308      @Override
309      public void run() {
310        try {
311          syncLoop();
312        } catch (Throwable e) {
313          LOG.error("Got an exception from the sync-loop", e);
314          if (!isSyncAborted()) {
315            sendAbortProcessSignal();
316          }
317        }
318      }
319    };
320    syncThread.start();
321  }
322
323  @Override
324  public void stop(final boolean abort) {
325    if (!setRunning(false)) {
326      return;
327    }
328
329    LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort +
330      (isSyncAborted() ? " (self aborting)" : ""));
331    sendStopSignal();
332    if (!isSyncAborted()) {
333      try {
334        while (syncThread.isAlive()) {
335          sendStopSignal();
336          syncThread.join(250);
337        }
338      } catch (InterruptedException e) {
339        LOG.warn("join interrupted", e);
340        Thread.currentThread().interrupt();
341      }
342    }
343
344    // Close the writer
345    closeCurrentLogStream(abort);
346
347    // Close the old logs
348    // they should be already closed, this is just in case the load fails
349    // and we call start() and then stop()
350    for (ProcedureWALFile log: logs) {
351      log.close();
352    }
353    logs.clear();
354    loading.set(true);
355  }
356
357  private void sendStopSignal() {
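    // Use tryLock so we never block here; the caller retries (see stop()) and the sync thread
    // re-checks isRunning() whenever it wakes up, so a missed signal is not fatal.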
358    if (lock.tryLock()) {
359      try {
360        waitCond.signalAll();
361        syncCond.signalAll();
362      } finally {
363        lock.unlock();
364      }
365    }
366  }
367
368  @Override
369  public int getNumThreads() {
370    return slots == null ? 0 : slots.length;
371  }
372
373  @Override
374  public int setRunningProcedureCount(final int count) {
375    this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length;
376    return this.runningProcCount;
377  }
378
379  public ProcedureStoreTracker getStoreTracker() {
380    return storeTracker;
381  }
382
383  public ArrayList<ProcedureWALFile> getActiveLogs() {
384    lock.lock();
385    try {
386      return new ArrayList<>(logs);
387    } finally {
388      lock.unlock();
389    }
390  }
391
392  public Set<ProcedureWALFile> getCorruptedLogs() {
393    return corruptedLogs;
394  }
395
396  @Override
397  public void recoverLease() throws IOException {
398    lock.lock();
399    try {
400      LOG.debug("Starting WAL Procedure Store lease recovery");
401      boolean afterFirstAttempt = false;
402      while (isRunning()) {
403        // Don't sleep before first attempt
404        if (afterFirstAttempt) {
          LOG.trace("Sleeping {} ms before the next lease recovery attempt.", waitBeforeRoll);
407          Threads.sleepWithoutInterrupt(waitBeforeRoll);
408        } else {
409          afterFirstAttempt = true;
410        }
411        FileStatus[] oldLogs = getLogFiles();
412        // Get Log-MaxID and recover lease on old logs
413        try {
414          flushLogId = initOldLogs(oldLogs);
415        } catch (FileNotFoundException e) {
          LOG.warn("Someone else is active and deleted logs. Retrying.", e);
417          continue;
418        }
419
420        // Create new state-log
421        if (!rollWriter(flushLogId + 1)) {
422          // someone else has already created this log
          LOG.debug("Someone else has already created log {}. Retrying.", flushLogId + 1);
424          continue;
425        }
426
427        // We have the lease on the log
428        oldLogs = getLogFiles();
429        if (getMaxLogId(oldLogs) > flushLogId) {
430          LOG.debug("Someone else created new logs. Expected maxLogId < {}", flushLogId);
431          logs.getLast().removeFile(this.walArchiveDir);
432          continue;
433        }
434
435        LOG.debug("Lease acquired for flushLogId={}", flushLogId);
436        break;
437      }
438    } finally {
439      lock.unlock();
440    }
441  }
442
443  @Override
444  public void load(ProcedureLoader loader) throws IOException {
445    lock.lock();
446    try {
447      if (logs.isEmpty()) {
448        throw new IllegalStateException("recoverLease() must be called before loading data");
449      }
450
      // Nothing to do if we have only the current log.
452      if (logs.size() == 1) {
453        LOG.debug("No state logs to replay.");
454        loader.setMaxProcId(0);
455        loading.set(false);
456        return;
457      }
458
459      // Load the old logs
460      Iterator<ProcedureWALFile> it = logs.descendingIterator();
461      it.next(); // Skip the current log
462
463      ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
464        @Override
465        public void setMaxProcId(long maxProcId) {
466          loader.setMaxProcId(maxProcId);
467        }
468
469        @Override
470        public void load(ProcedureIterator procIter) throws IOException {
471          loader.load(procIter);
472        }
473
474        @Override
475        public void handleCorrupted(ProcedureIterator procIter) throws IOException {
476          loader.handleCorrupted(procIter);
477        }
478
479        @Override
480        public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
481          if (corruptedLogs == null) {
482            corruptedLogs = new HashSet<>();
483          }
484          corruptedLogs.add(log);
485          // TODO: sideline corrupted log
486        }
487      });
      // If we fail while loading, we should prevent persisting the storeTracker later in the stop
      // method. It may happen that we have finished constructing the modified and deleted bits,
      // but fail before calling resetModified; if we then persisted the storeTracker, on restart
      // we would consider that all procedures are included in this file and delete all the
      // previous files, which is obviously not correct. So we only set loading to false after all
      // the procedures have been loaded successfully, and when closing we skip persisting the
      // store tracker. This also prevents the sync thread from doing periodicRoll, where we may
      // clean up old logs as well.
496      loading.set(false);
497      // try to cleanup inactive wals and complete the operation
498      buildHoldingCleanupTracker();
499      tryCleanupLogsOnLoad();
500    } finally {
501      lock.unlock();
502    }
503  }
504
505  private void tryCleanupLogsOnLoad() {
506    // nothing to cleanup.
507    if (logs.size() <= 1) {
508      return;
509    }
510
511    // the config says to not cleanup wals on load.
512    if (!conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY,
513      DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY)) {
514      LOG.debug("WALs cleanup on load is not enabled: " + getActiveLogs());
515      return;
516    }
517
518    try {
519      periodicRoll();
520    } catch (IOException e) {
521      LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e);
522    }
523  }
524
525  @Override
526  public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
527    if (LOG.isTraceEnabled()) {
528      LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
529    }
530
531    ByteSlot slot = acquireSlot();
532    try {
533      // Serialize the insert
534      long[] subProcIds = null;
535      if (subprocs != null) {
536        ProcedureWALFormat.writeInsert(slot, proc, subprocs);
537        subProcIds = new long[subprocs.length];
538        for (int i = 0; i < subprocs.length; ++i) {
539          subProcIds[i] = subprocs[i].getProcId();
540        }
541      } else {
542        assert !proc.hasParent();
543        ProcedureWALFormat.writeInsert(slot, proc);
544      }
545
546      // Push the transaction data and wait until it is persisted
547      pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
548    } catch (IOException e) {
549      // We are not able to serialize the procedure.
550      // this is a code error, and we are not able to go on.
      LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedures: proc=" +
552          proc + ", subprocs=" + Arrays.toString(subprocs), e);
553      throw new RuntimeException(e);
554    } finally {
555      releaseSlot(slot);
556    }
557  }
558
559  @Override
560  public void insert(Procedure<?>[] procs) {
561    if (LOG.isTraceEnabled()) {
562      LOG.trace("Insert " + Arrays.toString(procs));
563    }
564
565    ByteSlot slot = acquireSlot();
566    try {
567      // Serialize the insert
568      long[] procIds = new long[procs.length];
569      for (int i = 0; i < procs.length; ++i) {
570        assert !procs[i].hasParent();
571        procIds[i] = procs[i].getProcId();
572        ProcedureWALFormat.writeInsert(slot, procs[i]);
573      }
574
575      // Push the transaction data and wait until it is persisted
576      pushData(PushType.INSERT, slot, Procedure.NO_PROC_ID, procIds);
577    } catch (IOException e) {
578      // We are not able to serialize the procedure.
579      // this is a code error, and we are not able to go on.
      LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedures: " +
581          Arrays.toString(procs), e);
582      throw new RuntimeException(e);
583    } finally {
584      releaseSlot(slot);
585    }
586  }
587
588  @Override
589  public void update(Procedure<?> proc) {
590    if (LOG.isTraceEnabled()) {
591      LOG.trace("Update " + proc);
592    }
593
594    ByteSlot slot = acquireSlot();
595    try {
596      // Serialize the update
597      ProcedureWALFormat.writeUpdate(slot, proc);
598
599      // Push the transaction data and wait until it is persisted
600      pushData(PushType.UPDATE, slot, proc.getProcId(), null);
601    } catch (IOException e) {
602      // We are not able to serialize the procedure.
603      // this is a code error, and we are not able to go on.
604      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e);
605      throw new RuntimeException(e);
606    } finally {
607      releaseSlot(slot);
608    }
609  }
610
611  @Override
612  public void delete(long procId) {
613    LOG.trace("Delete {}", procId);
614    ByteSlot slot = acquireSlot();
615    try {
616      // Serialize the delete
617      ProcedureWALFormat.writeDelete(slot, procId);
618
619      // Push the transaction data and wait until it is persisted
620      pushData(PushType.DELETE, slot, procId, null);
621    } catch (IOException e) {
622      // We are not able to serialize the procedure.
623      // this is a code error, and we are not able to go on.
624      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + procId, e);
625      throw new RuntimeException(e);
626    } finally {
627      releaseSlot(slot);
628    }
629  }
630
631  @Override
632  public void delete(Procedure<?> proc, long[] subProcIds) {
633    assert proc != null : "expected a non-null procedure";
634    assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds";
635    if (LOG.isTraceEnabled()) {
636      LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds));
637    }
638
639    ByteSlot slot = acquireSlot();
640    try {
641      // Serialize the delete
642      ProcedureWALFormat.writeDelete(slot, proc, subProcIds);
643
644      // Push the transaction data and wait until it is persisted
645      pushData(PushType.DELETE, slot, proc.getProcId(), subProcIds);
646    } catch (IOException e) {
647      // We are not able to serialize the procedure.
648      // this is a code error, and we are not able to go on.
649      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e);
650      throw new RuntimeException(e);
651    } finally {
652      releaseSlot(slot);
653    }
654  }
655
656  @Override
657  public void delete(final long[] procIds, final int offset, final int count) {
    if (count == 0) {
      return;
    }
659    if (offset == 0 && count == procIds.length) {
660      delete(procIds);
661    } else if (count == 1) {
662      delete(procIds[offset]);
663    } else {
664      delete(Arrays.copyOfRange(procIds, offset, offset + count));
665    }
666  }
667
668  private void delete(long[] procIds) {
669    if (LOG.isTraceEnabled()) {
670      LOG.trace("Delete " + Arrays.toString(procIds));
671    }
672
673    final ByteSlot slot = acquireSlot();
674    try {
675      // Serialize the delete
676      for (int i = 0; i < procIds.length; ++i) {
677        ProcedureWALFormat.writeDelete(slot, procIds[i]);
678      }
679
680      // Push the transaction data and wait until it is persisted
681      pushData(PushType.DELETE, slot, Procedure.NO_PROC_ID, procIds);
682    } catch (IOException e) {
683      // We are not able to serialize the procedure.
684      // this is a code error, and we are not able to go on.
685      LOG.error("Unable to serialize the procedures: " + Arrays.toString(procIds), e);
686      throw new RuntimeException(e);
687    } finally {
688      releaseSlot(slot);
689    }
690  }
691
692  private ByteSlot acquireSlot() {
693    ByteSlot slot = slotsCache.poll();
694    return slot != null ? slot : new ByteSlot();
695  }
696
697  private void releaseSlot(final ByteSlot slot) {
698    slot.reset();
699    slotsCache.offer(slot);
700  }
701
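  // Type of mutation being appended to the wal; it determines how updateStoreTracker updates the
  // store and holding-cleanup trackers.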
702  private enum PushType { INSERT, UPDATE, DELETE };
703
704  private long pushData(final PushType type, final ByteSlot slot,
705      final long procId, final long[] subProcIds) {
706    if (!isRunning()) {
707      throw new RuntimeException("the store must be running before inserting data");
708    }
709    if (logs.isEmpty()) {
710      throw new RuntimeException("recoverLease() must be called before inserting data");
711    }
712
713    long logId = -1;
714    lock.lock();
715    try {
716      // Wait for the sync to be completed
717      while (true) {
718        if (!isRunning()) {
719          throw new RuntimeException("store no longer running");
720        } else if (isSyncAborted()) {
721          throw new RuntimeException("sync aborted", syncException.get());
722        } else if (inSync.get()) {
723          syncCond.await();
724        } else if (slotIndex >= syncMaxSlot) {
725          slotCond.signal();
726          syncCond.await();
727        } else {
728          break;
729        }
730      }
731
732      final long pushSyncId = syncId.get();
733      updateStoreTracker(type, procId, subProcIds);
734      slots[slotIndex++] = slot;
735      logId = flushLogId;
736
737      // Notify that there is new data
738      if (slotIndex == 1) {
739        waitCond.signal();
740      }
741
742      // Notify that the slots are full
743      if (slotIndex == syncMaxSlot) {
744        waitCond.signal();
745        slotCond.signal();
746      }
747
748      while (pushSyncId == syncId.get() && isRunning()) {
749        syncCond.await();
750      }
751    } catch (InterruptedException e) {
752      Thread.currentThread().interrupt();
753      sendAbortProcessSignal();
754      throw new RuntimeException(e);
755    } finally {
756      lock.unlock();
757      if (isSyncAborted()) {
758        throw new RuntimeException("sync aborted", syncException.get());
759      }
760    }
761    return logId;
762  }
763
764  private void updateStoreTracker(final PushType type,
765      final long procId, final long[] subProcIds) {
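    // Besides recording the mutation in the tracker of the current wal, updates, deletes and
    // sub-procedure inserts also clear the matching bits in holdingCleanupTracker: once a
    // procedure has been re-persisted (or deleted) in the current wal, the oldest wal no longer
    // needs to be kept around for its sake.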
766    switch (type) {
767      case INSERT:
768        if (subProcIds == null) {
769          storeTracker.insert(procId);
770        } else if (procId == Procedure.NO_PROC_ID) {
771          storeTracker.insert(subProcIds);
772        } else {
773          storeTracker.insert(procId, subProcIds);
774          holdingCleanupTracker.setDeletedIfModified(procId);
775        }
776        break;
777      case UPDATE:
778        storeTracker.update(procId);
779        holdingCleanupTracker.setDeletedIfModified(procId);
780        break;
781      case DELETE:
782        if (subProcIds != null && subProcIds.length > 0) {
783          storeTracker.delete(subProcIds);
784          holdingCleanupTracker.setDeletedIfModified(subProcIds);
785        } else {
786          storeTracker.delete(procId);
787          holdingCleanupTracker.setDeletedIfModified(procId);
788        }
789        break;
790      default:
791        throw new RuntimeException("invalid push type " + type);
792    }
793  }
794
795  private boolean isSyncAborted() {
796    return syncException.get() != null;
797  }
798
799  private void syncLoop() throws Throwable {
800    long totalSyncedToStore = 0;
801    inSync.set(false);
802    lock.lock();
803    try {
804      while (isRunning()) {
805        try {
806          // Wait until new data is available
807          if (slotIndex == 0) {
808            if (!loading.get()) {
809              periodicRoll();
810            }
811
812            if (LOG.isTraceEnabled()) {
813              float rollTsSec = getMillisFromLastRoll() / 1000.0f;
814              LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
815                        StringUtils.humanSize(totalSynced.get()),
816                        StringUtils.humanSize(totalSynced.get() / rollTsSec)));
817            }
818
819            waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
820            if (slotIndex == 0) {
821              // no data.. probably a stop() or a periodic roll
822              continue;
823            }
824          }
825          // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
826          syncMaxSlot = runningProcCount;
827          assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot;
828          final long syncWaitSt = System.currentTimeMillis();
829          if (slotIndex != syncMaxSlot) {
830            slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
831          }
832
833          final long currentTs = System.currentTimeMillis();
834          final long syncWaitMs = currentTs - syncWaitSt;
835          final float rollSec = getMillisFromLastRoll() / 1000.0f;
836          final float syncedPerSec = totalSyncedToStore / rollSec;
837          if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) {
            LOG.trace(String.format("Sync wait %s, slotIndex=%s, totalSynced=%s (%s/sec)",
839                      StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
840                      StringUtils.humanSize(totalSyncedToStore),
841                      StringUtils.humanSize(syncedPerSec)));
842          }
843
844          // update webui circular buffers (TODO: get rid of allocations)
845          final SyncMetrics syncMetrics = new SyncMetrics();
846          syncMetrics.timestamp = currentTs;
847          syncMetrics.syncWaitMs = syncWaitMs;
848          syncMetrics.syncedEntries = slotIndex;
849          syncMetrics.totalSyncedBytes = totalSyncedToStore;
850          syncMetrics.syncedPerSec = syncedPerSec;
851          syncMetricsQueue.add(syncMetrics);
852
853          // sync
854          inSync.set(true);
855          long slotSize = syncSlots();
856          logs.getLast().addToSize(slotSize);
857          totalSyncedToStore = totalSynced.addAndGet(slotSize);
858          slotIndex = 0;
859          inSync.set(false);
860          syncId.incrementAndGet();
861        } catch (InterruptedException e) {
862          Thread.currentThread().interrupt();
863          syncException.compareAndSet(null, e);
864          sendAbortProcessSignal();
865          throw e;
866        } catch (Throwable t) {
867          syncException.compareAndSet(null, t);
868          sendAbortProcessSignal();
869          throw t;
870        } finally {
871          syncCond.signalAll();
872        }
873      }
874    } finally {
875      lock.unlock();
876    }
877  }
878
879  public ArrayList<SyncMetrics> getSyncMetrics() {
880    lock.lock();
881    try {
882      return new ArrayList<>(syncMetricsQueue);
883    } finally {
884      lock.unlock();
885    }
886  }
887
888  private long syncSlots() throws Throwable {
889    int retry = 0;
890    int logRolled = 0;
891    long totalSynced = 0;
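    // Retry the sync a few times; after maxRetriesBeforeRoll consecutive failures roll to a fresh
    // wal file and start over. If even freshly rolled files keep failing (maxSyncFailureRoll
    // rolls), give up and rethrow, which aborts the store.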
892    do {
893      try {
894        totalSynced = syncSlots(stream, slots, 0, slotIndex);
895        break;
896      } catch (Throwable e) {
897        LOG.warn("unable to sync slots, retry=" + retry);
898        if (++retry >= maxRetriesBeforeRoll) {
899          if (logRolled >= maxSyncFailureRoll && isRunning()) {
900            LOG.error("Sync slots after log roll failed, abort.", e);
901            throw e;
902          }
903
904          if (!rollWriterWithRetries()) {
905            throw e;
906          }
907
908          logRolled++;
909          retry = 0;
910        }
911      }
912    } while (isRunning());
913    return totalSynced;
914  }
915
916  protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots,
917      final int offset, final int count) throws IOException {
918    long totalSynced = 0;
919    for (int i = 0; i < count; ++i) {
920      final ByteSlot data = slots[offset + i];
921      data.writeTo(stream);
922      totalSynced += data.size();
923    }
924
925    syncStream(stream);
926    sendPostSyncSignal();
927
928    if (LOG.isTraceEnabled()) {
929      LOG.trace("Sync slots=" + count + '/' + syncMaxSlot +
930                ", flushed=" + StringUtils.humanSize(totalSynced));
931    }
932    return totalSynced;
933  }
934
935  protected void syncStream(final FSDataOutputStream stream) throws IOException {
936    if (useHsync) {
937      stream.hsync();
938    } else {
939      stream.hflush();
940    }
941  }
942
943  private boolean rollWriterWithRetries() {
944    for (int i = 0; i < rollRetries && isRunning(); ++i) {
      if (i > 0) {
        Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
      }
946
947      try {
948        if (rollWriter()) {
949          return true;
950        }
951      } catch (IOException e) {
952        LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
953      }
954    }
955    LOG.error(HBaseMarkers.FATAL, "Unable to roll the log");
956    return false;
957  }
958
959  private boolean tryRollWriter() {
960    try {
961      return rollWriter();
962    } catch (IOException e) {
963      LOG.warn("Unable to roll the log", e);
964      return false;
965    }
966  }
967
968  public long getMillisToNextPeriodicRoll() {
969    if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
970      return periodicRollMsec - getMillisFromLastRoll();
971    }
972    return Long.MAX_VALUE;
973  }
974
975  public long getMillisFromLastRoll() {
976    return (System.currentTimeMillis() - lastRollTs.get());
977  }
978
979  @VisibleForTesting
980  void periodicRollForTesting() throws IOException {
981    lock.lock();
982    try {
983      periodicRoll();
984    } finally {
985      lock.unlock();
986    }
987  }
988
989  @VisibleForTesting
990  public boolean rollWriterForTesting() throws IOException {
991    lock.lock();
992    try {
993      return rollWriter();
994    } finally {
995      lock.unlock();
996    }
997  }
998
999  @VisibleForTesting
1000  void removeInactiveLogsForTesting() throws Exception {
1001    lock.lock();
1002    try {
1003      removeInactiveLogs();
    } finally {
1005      lock.unlock();
1006    }
1007  }
1008
1009  private void periodicRoll() throws IOException {
1010    if (storeTracker.isEmpty()) {
1011      LOG.trace("no active procedures");
1012      tryRollWriter();
1013      removeAllLogs(flushLogId - 1, "no active procedures");
1014    } else {
1015      if (storeTracker.isAllModified()) {
1016        LOG.trace("all the active procedures are in the latest log");
1017        removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log");
1018      }
1019
1020      // if the log size has exceeded the roll threshold
1021      // or the periodic roll timeout is expired, try to roll the wal.
1022      if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
1023        tryRollWriter();
1024      }
1025
1026      removeInactiveLogs();
1027    }
1028  }
1029
1030  private boolean rollWriter() throws IOException {
1031    if (!isRunning()) {
1032      return false;
1033    }
1034
1035    // Create new state-log
1036    if (!rollWriter(flushLogId + 1)) {
      LOG.warn("Someone else has already created log {}", flushLogId + 1);
1038      return false;
1039    }
1040
1041    // We have the lease on the log,
1042    // but we should check if someone else has created new files
1043    if (getMaxLogId(getLogFiles()) > flushLogId) {
1044      LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId);
1045      logs.getLast().removeFile(this.walArchiveDir);
1046      return false;
1047    }
1048
1049    // We have the lease on the log
1050    return true;
1051  }
1052
1053  @VisibleForTesting
1054  boolean rollWriter(long logId) throws IOException {
1055    assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
1056    assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
1057
1058    ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
1059      .setVersion(ProcedureWALFormat.HEADER_VERSION)
1060      .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
1061      .setMinProcId(storeTracker.getActiveMinProcId())
1062      .setLogId(logId)
1063      .build();
1064
    FSDataOutputStream newStream = null;
    long startPos = -1;
    Path newLogFile = getLogFilePath(logId);
1069    try {
1070      newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
1071    } catch (FileAlreadyExistsException e) {
1072      LOG.error("Log file with id={} already exists", logId, e);
1073      return false;
1074    } catch (RemoteException re) {
1075      LOG.warn("failed to create log file with id={}", logId, re);
1076      return false;
1077    }
1078    // After we create the stream but before we attempt to use it at all
1079    // ensure that we can provide the level of data safety we're configured
1080    // to provide.
1081    final String durability = useHsync ? "hsync" : "hflush";
    if (enforceStreamCapability && !CommonFSUtils.hasCapability(newStream, durability)) {
      throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
        " for proper operation during component failures, but the underlying filesystem does " +
        "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY +
        "' to set the desired level of robustness and ensure the config value of '" +
        CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it.");
    }
1089    try {
1090      ProcedureWALFormat.writeHeader(newStream, header);
1091      startPos = newStream.getPos();
1092    } catch (IOException ioe) {
1093      LOG.warn("Encountered exception writing header", ioe);
1094      newStream.close();
1095      return false;
1096    }
1097
1098    closeCurrentLogStream(false);
1099
1100    storeTracker.resetModified();
1101    stream = newStream;
1102    flushLogId = logId;
1103    totalSynced.set(0);
1104    long rollTs = System.currentTimeMillis();
1105    lastRollTs.set(rollTs);
1106    logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs));
1107
    // if this is the second wal, i.e. the first time we are holding an older wal in addition to
    // the current one, build the holding cleanup tracker
1109    if (logs.size() == 2) {
1110      buildHoldingCleanupTracker();
1111    } else if (logs.size() > walCountWarnThreshold) {
      LOG.warn("procedure WALs count={} above the warning threshold {}. Check running procedures" +
        " to see if something is stuck.", logs.size(), walCountWarnThreshold);
      // This is just like what we do on the RS side when there are too many wal files. For RS,
1115      // if there are too many wal files, we will find out the wal entries in the oldest file, and
1116      // tell the upper layer to flush these regions so the wal entries will be useless and then we
1117      // can delete the wal file. For WALProcedureStore, the assumption is that, if all the
1118      // procedures recorded in a proc wal file are modified or deleted in a new proc wal file, then
1119      // we are safe to delete it. So here if there are too many proc wal files, we will find out
1120      // the procedure ids in the oldest file, which are neither modified nor deleted in newer proc
1121      // wal files, and tell upper layer to update the state of these procedures to the newest proc
1122      // wal file(by calling ProcedureStore.update), then we are safe to delete the oldest proc wal
1123      // file.
1124      sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds());
1125    }
1126
1127    LOG.info("Rolled new Procedure Store WAL, id={}", logId);
1128    return true;
1129  }
1130
1131  private void closeCurrentLogStream(boolean abort) {
1132    if (stream == null || logs.isEmpty()) {
1133      return;
1134    }
1135
1136    try {
1137      ProcedureWALFile log = logs.getLast();
1138      // If the loading flag is true, it usually means that we fail when loading procedures, so we
1139      // should not persist the store tracker, as its state may not be correct.
1140      if (!loading.get()) {
1141        log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
1142        log.updateLocalTracker(storeTracker);
1143        if (!abort) {
1144          long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
1145          log.addToSize(trailerSize);
1146        }
1147      }
1148    } catch (IOException e) {
1149      LOG.warn("Unable to write the trailer", e);
1150    }
1151    try {
1152      stream.close();
1153    } catch (IOException e) {
1154      LOG.error("Unable to close the stream", e);
1155    }
1156    stream = null;
1157  }
1158
1159  // ==========================================================================
1160  //  Log Files cleaner helpers
1161  // ==========================================================================
1162  private void removeInactiveLogs() throws IOException {
    // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
    // Once there is nothing holding the oldest WAL we can remove it.
1165    while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
1166      LOG.info("Remove the oldest log {}", logs.getFirst());
1167      removeLogFile(logs.getFirst(), walArchiveDir);
1168      buildHoldingCleanupTracker();
1169    }
1170
1171    // TODO: In case we are holding up a lot of logs for long time we should
1172    // rewrite old procedures (in theory parent procs) to the new WAL.
1173  }
1174
1175  private void buildHoldingCleanupTracker() {
1176    if (logs.size() <= 1) {
1177      // we only have one wal, so nothing to do
1178      holdingCleanupTracker.reset();
1179      return;
1180    }
1181
1182    // compute the holding tracker.
1183    // - the first WAL is used for the 'updates'
1184    // - the global tracker will be used to determine whether a procedure has been deleted
    // - other trackers will be used to determine whether a procedure has been updated; since a
    // deleted procedure can always be detected by checking the global tracker, we can skip the
    // deleted checks when applying the other trackers
1188    holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
1189    holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker);
    // 'logs' is a linked list, so avoid calling get(index) on it.
    Iterator<ProcedureWALFile> iter = logs.iterator();
    // skip the first file, whose tracker we have already applied above.
    iter.next();
1194    ProcedureStoreTracker tracker = iter.next().getTracker();
    // test iter.hasNext after calling iter.next so we skip applying the tracker of the last file,
    // which is effectively the storeTracker used above.
1197    while (iter.hasNext()) {
1198      holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker);
1199      if (holdingCleanupTracker.isEmpty()) {
1200        break;
1201      }
1202      tracker = iter.next().getTracker();
1203    }
1204  }
1205
1206  /**
   * Remove all logs with {@code logId <= lastLogId}.
1208   */
1209  private void removeAllLogs(long lastLogId, String why) {
1210    if (logs.size() <= 1) {
1211      return;
1212    }
1213
1214    LOG.info("Remove all state logs with ID less than {}, since {}", lastLogId, why);
1215
1216    boolean removed = false;
1217    while (logs.size() > 1) {
1218      ProcedureWALFile log = logs.getFirst();
1219      if (lastLogId < log.getLogId()) {
1220        break;
1221      }
1222      removeLogFile(log, walArchiveDir);
1223      removed = true;
1224    }
1225
1226    if (removed) {
1227      buildHoldingCleanupTracker();
1228    }
1229  }
1230
1231  private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
1232    try {
1233      LOG.trace("Removing log={}", log);
1234      log.removeFile(walArchiveDir);
1235      logs.remove(log);
1236      LOG.debug("Removed log={}, activeLogs={}", log, logs);
1237      assert logs.size() > 0 : "expected at least one log";
1238    } catch (IOException e) {
1239      LOG.error("Unable to remove log: " + log, e);
1240      return false;
1241    }
1242    return true;
1243  }
1244
1245  // ==========================================================================
1246  //  FileSystem Log Files helpers
1247  // ==========================================================================
1248  public Path getWALDir() {
1249    return this.walDir;
1250  }
1251
1252  @VisibleForTesting
1253  Path getWalArchiveDir() {
1254    return this.walArchiveDir;
1255  }
1256
1257  public FileSystem getFileSystem() {
1258    return this.fs;
1259  }
1260
1261  protected Path getLogFilePath(final long logId) throws IOException {
1262    return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId));
1263  }
1264
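  // Extracts the log id from a file name of the form "pv2-00000000000000000001.log".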
1265  private static long getLogIdFromName(final String name) {
1266    int end = name.lastIndexOf(".log");
1267    int start = name.lastIndexOf('-') + 1;
1268    return Long.parseLong(name.substring(start, end));
1269  }
1270
1271  private static final PathFilter WALS_PATH_FILTER = new PathFilter() {
1272    @Override
1273    public boolean accept(Path path) {
1274      String name = path.getName();
1275      return name.startsWith(LOG_PREFIX) && name.endsWith(".log");
1276    }
1277  };
1278
1279  private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR =
1280      new Comparator<FileStatus>() {
1281    @Override
1282    public int compare(FileStatus a, FileStatus b) {
1283      final long aId = getLogIdFromName(a.getPath().getName());
1284      final long bId = getLogIdFromName(b.getPath().getName());
1285      return Long.compare(aId, bId);
1286    }
1287  };
1288
1289  private FileStatus[] getLogFiles() throws IOException {
1290    try {
1291      FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER);
1292      Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
1293      return files;
1294    } catch (FileNotFoundException e) {
1295      LOG.warn("Log directory not found: " + e.getMessage());
1296      return null;
1297    }
1298  }
1299
1300  /**
   * Make sure the file set is obtained by calling {@link #getLogFiles()}, which sorts the files by
   * log id.
1303   * @return Max-LogID of the specified log file set
1304   */
1305  private static long getMaxLogId(FileStatus[] logFiles) {
1306    if (logFiles == null || logFiles.length == 0) {
1307      return 0L;
1308    }
1309    return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName());
1310  }
1311
1312  /**
   * Make sure the file set is obtained by calling {@link #getLogFiles()}, which sorts the files by
   * log id.
1315   * @return Max-LogID of the specified log file set
1316   */
1317  private long initOldLogs(FileStatus[] logFiles) throws IOException {
1318    if (logFiles == null || logFiles.length == 0) {
1319      return 0L;
1320    }
1321    long maxLogId = 0;
1322    for (int i = 0; i < logFiles.length; ++i) {
1323      final Path logPath = logFiles[i].getPath();
1324      leaseRecovery.recoverFileLease(fs, logPath);
1325      if (!isRunning()) {
1326        throw new IOException("wal aborting");
1327      }
1328
1329      maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
1330      ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
1331      if (log != null) {
1332        this.logs.add(log);
1333      }
1334    }
1335    initTrackerFromOldLogs();
1336    return maxLogId;
1337  }
1338
1339  /**
   * If the last log's tracker is not partial, use it as {@link #storeTracker}. Otherwise, mark
   * storeTracker as partial, and let {@link ProcedureWALFormatReader} rebuild it using the entries
   * in the log.
1342   */
1343  private void initTrackerFromOldLogs() {
1344    if (logs.isEmpty() || !isRunning()) {
1345      return;
1346    }
1347    ProcedureWALFile log = logs.getLast();
1348    if (!log.getTracker().isPartial()) {
1349      storeTracker.resetTo(log.getTracker());
1350    } else {
1351      storeTracker.reset();
1352      storeTracker.setPartialFlag(true);
1353    }
1354  }
1355
1356  /**
   * Loads the given log file and its tracker.
1358   */
1359  private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir)
1360      throws IOException {
1361    final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
1362    if (logFile.getLen() == 0) {
1363      LOG.warn("Remove uninitialized log: {}", logFile);
1364      log.removeFile(walArchiveDir);
1365      return null;
1366    }
1367    LOG.debug("Opening Pv2 {}", logFile);
1368    try {
1369      log.open();
1370    } catch (ProcedureWALFormat.InvalidWALDataException e) {
1371      LOG.warn("Remove uninitialized log: {}", logFile, e);
1372      log.removeFile(walArchiveDir);
1373      return null;
1374    } catch (IOException e) {
1375      String msg = "Unable to read state log: " + logFile;
1376      LOG.error(msg, e);
1377      throw new IOException(msg, e);
1378    }
1379
1380    try {
1381      log.readTracker();
1382    } catch (IOException e) {
1383      log.getTracker().reset();
1384      log.getTracker().setPartialFlag(true);
1385      LOG.warn("Unable to read tracker for {}", log, e);
1386    }
1387
1388    log.close();
1389    return log;
1390  }
1391}