View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2.store.wal;
20  
21  import java.io.IOException;
22  import java.io.FileNotFoundException;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  import java.util.concurrent.atomic.AtomicLong;
25  import java.util.concurrent.atomic.AtomicReference;
26  import java.util.concurrent.locks.Condition;
27  import java.util.concurrent.locks.ReentrantLock;
28  import java.util.concurrent.LinkedTransferQueue;
29  import java.util.concurrent.CopyOnWriteArrayList;
30  import java.util.concurrent.TimeUnit;
31  import java.util.Arrays;
32  import java.util.ArrayList;
33  import java.util.Collections;
34  import java.util.Comparator;
35  import java.util.HashSet;
36  import java.util.Iterator;
37  import java.util.LinkedList;
38  import java.util.Set;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FSDataOutputStream;
44  import org.apache.hadoop.fs.FileAlreadyExistsException;
45  import org.apache.hadoop.fs.FileStatus;
46  import org.apache.hadoop.fs.FileSystem;
47  import org.apache.hadoop.fs.Path;
48  import org.apache.hadoop.fs.PathFilter;
49  import org.apache.hadoop.hbase.classification.InterfaceAudience;
50  import org.apache.hadoop.hbase.classification.InterfaceStability;
51  import org.apache.hadoop.hbase.procedure2.Procedure;
52  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
53  import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
54  import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
55  import org.apache.hadoop.hbase.procedure2.util.StringUtils;
56  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
57  import org.apache.hadoop.hbase.util.Threads;
58  import org.apache.hadoop.ipc.RemoteException;
59  
60  import com.google.common.annotations.VisibleForTesting;
61  
62  /**
63   * WAL implementation of the ProcedureStore.
64   */
65  @InterfaceAudience.Private
66  @InterfaceStability.Evolving
67  public class WALProcedureStore implements ProcedureStore {
68    private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
69  
70    public interface LeaseRecovery {
71      void recoverFileLease(FileSystem fs, Path path) throws IOException;
72    }
73  
74    private static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
75      "hbase.procedure.store.wal.max.retries.before.roll";
76    private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;
77  
78    private static final String WAIT_BEFORE_ROLL_CONF_KEY =
79      "hbase.procedure.store.wal.wait.before.roll";
80    private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;
81  
82    private static final String ROLL_RETRIES_CONF_KEY =
83      "hbase.procedure.store.wal.max.roll.retries";
84    private static final int DEFAULT_ROLL_RETRIES = 3;
85  
86    private static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
87      "hbase.procedure.store.wal.sync.failure.roll.max";
88    private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;
89  
90    private static final String PERIODIC_ROLL_CONF_KEY =
91      "hbase.procedure.store.wal.periodic.roll.msec";
92    private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h
93  
94    private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
95    private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
96  
97    private static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
98    private static final boolean DEFAULT_USE_HSYNC = true;
99  
100   private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
101   private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M
102 
103   private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
104     new CopyOnWriteArrayList<ProcedureStoreListener>();
105 
106   private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
107   private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
108   private final AtomicBoolean running = new AtomicBoolean(false);
109 
110   private final ReentrantLock lock = new ReentrantLock();
111   private final Condition waitCond = lock.newCondition();
112   private final Condition slotCond = lock.newCondition();
113   private final Condition syncCond = lock.newCondition();
114 
115   private final LeaseRecovery leaseRecovery;
116   private final Configuration conf;
117   private final FileSystem fs;
118   private final Path logDir;
119 
120   private final AtomicReference<Throwable> syncException = new AtomicReference<Throwable>();
121   private final AtomicBoolean loading = new AtomicBoolean(true);
122   private final AtomicBoolean inSync = new AtomicBoolean(false);
123   private final AtomicLong totalSynced = new AtomicLong(0);
124   private final AtomicLong lastRollTs = new AtomicLong(0);
125 
126   private LinkedTransferQueue<ByteSlot> slotsCache = null;
127   private Set<ProcedureWALFile> corruptedLogs = null;
128   private FSDataOutputStream stream = null;
129   private long flushLogId = 0;
130   private int slotIndex = 0;
131   private Thread syncThread;
132   private ByteSlot[] slots;
133 
134   private int maxRetriesBeforeRoll;
135   private int maxSyncFailureRoll;
136   private int waitBeforeRoll;
137   private int rollRetries;
138   private int periodicRollMsec;
139   private long rollThreshold;
140   private boolean useHsync;
141   private int syncWaitMsec;
142 
143   public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
144       final LeaseRecovery leaseRecovery) {
145     this.fs = fs;
146     this.conf = conf;
147     this.logDir = logDir;
148     this.leaseRecovery = leaseRecovery;
149   }
150 
151   @Override
152   public void start(int numSlots) throws IOException {
153     if (running.getAndSet(true)) {
154       return;
155     }
156 
157     // Init buffer slots
158     loading.set(true);
159     slots = new ByteSlot[numSlots];
160     slotsCache = new LinkedTransferQueue();
161     while (slotsCache.size() < numSlots) {
162       slotsCache.offer(new ByteSlot());
163     }
164 
165     // Tunings
166     maxRetriesBeforeRoll =
167       conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
168     maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
169     waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
170     rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
171     rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
172     periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
173     syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
174     useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
175 
176     // Init sync thread
177     syncThread = new Thread("WALProcedureStoreSyncThread") {
178       @Override
179       public void run() {
180         try {
181           syncLoop();
182         } catch (Throwable e) {
183           LOG.error("Got an exception from the sync-loop", e);
184           if (!isSyncAborted()) {
185             sendAbortProcessSignal();
186           }
187         }
188       }
189     };
190     syncThread.start();
191   }
192 
193   @Override
194   public void stop(boolean abort) {
195     if (!running.getAndSet(false)) {
196       return;
197     }
198 
199     LOG.info("Stopping the WAL Procedure Store");
200     sendStopSignal();
201 
202     if (!abort) {
203       try {
204         while (syncThread.isAlive()) {
205           sendStopSignal();
206           syncThread.join(250);
207         }
208       } catch (InterruptedException e) {
209         LOG.warn("join interrupted", e);
210         Thread.currentThread().interrupt();
211       }
212     }
213 
214     // Close the writer
215     closeStream();
216 
217     // Close the old logs
218     // they should be already closed, this is just in case the load fails
219     // and we call start() and then stop()
220     for (ProcedureWALFile log: logs) {
221       log.close();
222     }
223     logs.clear();
224   }
225 
226   private void sendStopSignal() {
227     if (lock.tryLock()) {
228       try {
229         waitCond.signalAll();
230         syncCond.signalAll();
231       } finally {
232         lock.unlock();
233       }
234     }
235   }
236 
237   @Override
238   public boolean isRunning() {
239     return running.get();
240   }
241 
242   @Override
243   public int getNumThreads() {
244     return slots == null ? 0 : slots.length;
245   }
246 
247   public ProcedureStoreTracker getStoreTracker() {
248     return storeTracker;
249   }
250 
251   public ArrayList<ProcedureWALFile> getActiveLogs() {
252     lock.lock();
253     try {
254       return new ArrayList<ProcedureWALFile>(logs);
255     } finally {
256       lock.unlock();
257     }
258   }
259 
260   public Set<ProcedureWALFile> getCorruptedLogs() {
261     return corruptedLogs;
262   }
263 
264   @Override
265   public void registerListener(ProcedureStoreListener listener) {
266     this.listeners.add(listener);
267   }
268 
269   @Override
270   public boolean unregisterListener(ProcedureStoreListener listener) {
271     return this.listeners.remove(listener);
272   }
273 
274   @Override
275   public void recoverLease() throws IOException {
276     lock.lock();
277     try {
278       LOG.info("Starting WAL Procedure Store lease recovery");
279       FileStatus[] oldLogs = getLogFiles();
280       while (isRunning()) {
281         // Get Log-MaxID and recover lease on old logs
282         try {
283           flushLogId = initOldLogs(oldLogs);
284         } catch (FileNotFoundException e) {
285           LOG.warn("someone else is active and deleted logs. retrying.", e);
286           oldLogs = getLogFiles();
287           continue;
288         }
289 
290         // Create new state-log
291         if (!rollWriter(flushLogId + 1)) {
292           // someone else has already created this log
293           LOG.debug("someone else has already created log " + flushLogId);
294           continue;
295         }
296 
297         // We have the lease on the log
298         oldLogs = getLogFiles();
299         if (getMaxLogId(oldLogs) > flushLogId) {
300           if (LOG.isDebugEnabled()) {
301             LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
302           }
303           logs.getLast().removeFile();
304           continue;
305         }
306 
307         LOG.info("Lease acquired for flushLogId: " + flushLogId);
308         break;
309       }
310     } finally {
311       lock.unlock();
312     }
313   }
314 
315   @Override
316   public Iterator<Procedure> load() throws IOException {
317     if (logs.isEmpty()) {
318       throw new RuntimeException("recoverLease() must be called before loading data");
319     }
320 
321     // Nothing to do, If we have only the current log.
322     if (logs.size() == 1) {
323       if (LOG.isDebugEnabled()) {
324         LOG.debug("No state logs to replay.");
325       }
326       loading.set(false);
327       return null;
328     }
329 
330     // Load the old logs
331     Iterator<ProcedureWALFile> it = logs.descendingIterator();
332     it.next(); // Skip the current log
333     try {
334       return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
335         @Override
336         public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
337           if (corruptedLogs == null) {
338             corruptedLogs = new HashSet<ProcedureWALFile>();
339           }
340           corruptedLogs.add(log);
341           // TODO: sideline corrupted log
342         }
343       });
344     } finally {
345       loading.set(false);
346     }
347   }
348 
349   @Override
350   public void insert(final Procedure proc, final Procedure[] subprocs) {
351     if (LOG.isTraceEnabled()) {
352       LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
353     }
354 
355     ByteSlot slot = acquireSlot();
356     try {
357       // Serialize the insert
358       long[] subProcIds = null;
359       if (subprocs != null) {
360         ProcedureWALFormat.writeInsert(slot, proc, subprocs);
361         subProcIds = new long[subprocs.length];
362         for (int i = 0; i < subprocs.length; ++i) {
363           subProcIds[i] = subprocs[i].getProcId();
364         }
365       } else {
366         assert !proc.hasParent();
367         ProcedureWALFormat.writeInsert(slot, proc);
368       }
369 
370       // Push the transaction data and wait until it is persisted
371       pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
372     } catch (IOException e) {
373       // We are not able to serialize the procedure.
374       // this is a code error, and we are not able to go on.
375       LOG.fatal("Unable to serialize one of the procedure: proc=" + proc +
376                 ", subprocs=" + Arrays.toString(subprocs), e);
377       throw new RuntimeException(e);
378     } finally {
379       releaseSlot(slot);
380     }
381   }
382 
383   @Override
384   public void update(final Procedure proc) {
385     if (LOG.isTraceEnabled()) {
386       LOG.trace("Update " + proc);
387     }
388 
389     ByteSlot slot = acquireSlot();
390     try {
391       // Serialize the update
392       ProcedureWALFormat.writeUpdate(slot, proc);
393 
394       // Push the transaction data and wait until it is persisted
395       pushData(PushType.UPDATE, slot, proc.getProcId(), null);
396     } catch (IOException e) {
397       // We are not able to serialize the procedure.
398       // this is a code error, and we are not able to go on.
399       LOG.fatal("Unable to serialize the procedure: " + proc, e);
400       throw new RuntimeException(e);
401     } finally {
402       releaseSlot(slot);
403     }
404   }
405 
406   @Override
407   public void delete(final long procId) {
408     if (LOG.isTraceEnabled()) {
409       LOG.trace("Delete " + procId);
410     }
411 
412     ByteSlot slot = acquireSlot();
413     try {
414       // Serialize the delete
415       ProcedureWALFormat.writeDelete(slot, procId);
416 
417       // Push the transaction data and wait until it is persisted
418       pushData(PushType.DELETE, slot, procId, null);
419     } catch (IOException e) {
420       // We are not able to serialize the procedure.
421       // this is a code error, and we are not able to go on.
422       LOG.fatal("Unable to serialize the procedure: " + procId, e);
423       throw new RuntimeException(e);
424     } finally {
425       releaseSlot(slot);
426     }
427   }
428 
429   private ByteSlot acquireSlot() {
430     ByteSlot slot = slotsCache.poll();
431     return slot != null ? slot : new ByteSlot();
432   }
433 
434   private void releaseSlot(final ByteSlot slot) {
435     slot.reset();
436     slotsCache.offer(slot);
437   }
438 
439   private enum PushType { INSERT, UPDATE, DELETE };
440 
441   private long pushData(final PushType type, final ByteSlot slot,
442       final long procId, final long[] subProcIds) {
443     if (!isRunning()) {
444       throw new RuntimeException("the store must be running before inserting data");
445     }
446     if (logs.isEmpty()) {
447       throw new RuntimeException("recoverLease() must be called before inserting data");
448     }
449 
450     long logId = -1;
451     lock.lock();
452     try {
453       // Wait for the sync to be completed
454       while (true) {
455         if (!isRunning()) {
456           throw new RuntimeException("store no longer running");
457         } else if (isSyncAborted()) {
458           throw new RuntimeException("sync aborted", syncException.get());
459         } else if (inSync.get()) {
460           syncCond.await();
461         } else if (slotIndex == slots.length) {
462           slotCond.signal();
463           syncCond.await();
464         } else {
465           break;
466         }
467       }
468 
469       updateStoreTracker(type, procId, subProcIds);
470       slots[slotIndex++] = slot;
471       logId = flushLogId;
472 
473       // Notify that there is new data
474       if (slotIndex == 1) {
475         waitCond.signal();
476       }
477 
478       // Notify that the slots are full
479       if (slotIndex == slots.length) {
480         waitCond.signal();
481         slotCond.signal();
482       }
483 
484       syncCond.await();
485     } catch (InterruptedException e) {
486       Thread.currentThread().interrupt();
487       sendAbortProcessSignal();
488       throw new RuntimeException(e);
489     } finally {
490       lock.unlock();
491       if (isSyncAborted()) {
492         throw new RuntimeException("sync aborted", syncException.get());
493       }
494     }
495     return logId;
496   }
497 
498   private void updateStoreTracker(final PushType type,
499       final long procId, final long[] subProcIds) {
500     switch (type) {
501       case INSERT:
502         if (subProcIds == null) {
503           storeTracker.insert(procId);
504         } else {
505           storeTracker.insert(procId, subProcIds);
506         }
507         break;
508       case UPDATE:
509         storeTracker.update(procId);
510         break;
511       case DELETE:
512         storeTracker.delete(procId);
513         break;
514       default:
515         throw new RuntimeException("invalid push type " + type);
516     }
517   }
518 
519   private boolean isSyncAborted() {
520     return syncException.get() != null;
521   }
522 
523   private void syncLoop() throws Throwable {
524     inSync.set(false);
525     lock.lock();
526     try {
527       while (isRunning()) {
528         try {
529           // Wait until new data is available
530           if (slotIndex == 0) {
531             if (!loading.get()) {
532               periodicRoll();
533             }
534 
535             if (LOG.isTraceEnabled()) {
536               float rollTsSec = getMillisFromLastRoll() / 1000.0f;
537               LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
538                         StringUtils.humanSize(totalSynced.get()),
539                         StringUtils.humanSize(totalSynced.get() / rollTsSec)));
540             }
541 
542             waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
543             if (slotIndex == 0) {
544               // no data.. probably a stop() or a periodic roll
545               continue;
546             }
547           }
548 
549           // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
550           long syncWaitSt = System.currentTimeMillis();
551           if (slotIndex != slots.length) {
552             slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
553           }
554           long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
555           if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
556             float rollSec = getMillisFromLastRoll() / 1000.0f;
557             LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
558                       StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
559                       StringUtils.humanSize(totalSynced.get()),
560                       StringUtils.humanSize(totalSynced.get() / rollSec)));
561           }
562 
563           inSync.set(true);
564           totalSynced.addAndGet(syncSlots());
565           slotIndex = 0;
566           inSync.set(false);
567         } catch (InterruptedException e) {
568           Thread.currentThread().interrupt();
569           sendAbortProcessSignal();
570           syncException.compareAndSet(null, e);
571           throw e;
572         } catch (Throwable t) {
573           syncException.compareAndSet(null, t);
574           throw t;
575         } finally {
576           syncCond.signalAll();
577         }
578       }
579     } finally {
580       lock.unlock();
581     }
582   }
583 
584   private long syncSlots() throws Throwable {
585     int retry = 0;
586     int logRolled = 0;
587     long totalSynced = 0;
588     do {
589       try {
590         totalSynced = syncSlots(stream, slots, 0, slotIndex);
591         break;
592       } catch (Throwable e) {
593         LOG.warn("unable to sync slots, retry=" + retry);
594         if (++retry >= maxRetriesBeforeRoll) {
595           if (logRolled >= maxSyncFailureRoll) {
596             LOG.error("Sync slots after log roll failed, abort.", e);
597             sendAbortProcessSignal();
598             throw e;
599           }
600 
601           if (!rollWriterOrDie()) {
602             throw e;
603           }
604 
605           logRolled++;
606           retry = 0;
607         }
608       }
609     } while (running.get());
610     return totalSynced;
611   }
612 
613   protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
614       throws IOException {
615     long totalSynced = 0;
616     for (int i = 0; i < count; ++i) {
617       ByteSlot data = slots[offset + i];
618       data.writeTo(stream);
619       totalSynced += data.size();
620     }
621 
622     if (useHsync) {
623       stream.hsync();
624     } else {
625       stream.hflush();
626     }
627     sendPostSyncSignal();
628 
629     if (LOG.isTraceEnabled()) {
630       LOG.trace("Sync slots=" + count + '/' + slots.length +
631                 ", flushed=" + StringUtils.humanSize(totalSynced));
632     }
633     return totalSynced;
634   }
635 
636   protected void sendPostSyncSignal() {
637     if (!this.listeners.isEmpty()) {
638       for (ProcedureStoreListener listener : this.listeners) {
639         listener.postSync();
640       }
641     }
642   }
643 
644   private void sendAbortProcessSignal() {
645     if (!this.listeners.isEmpty()) {
646       for (ProcedureStoreListener listener : this.listeners) {
647         listener.abortProcess();
648       }
649     }
650   }
651 
652   private boolean rollWriterOrDie() {
653     for (int i = 0; i < rollRetries; ++i) {
654       if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
655 
656       try {
657         if (rollWriter()) {
658           return true;
659         }
660       } catch (IOException e) {
661         LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
662       }
663     }
664     LOG.fatal("Unable to roll the log");
665     sendAbortProcessSignal();
666     throw new RuntimeException("unable to roll the log");
667   }
668 
669   private boolean tryRollWriter() {
670     try {
671       return rollWriter();
672     } catch (IOException e) {
673       LOG.warn("Unable to roll the log", e);
674       return false;
675     }
676   }
677 
678   private long getMillisToNextPeriodicRoll() {
679     if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
680       return periodicRollMsec - getMillisFromLastRoll();
681     }
682     return Long.MAX_VALUE;
683   }
684 
685   private long getMillisFromLastRoll() {
686     return (System.currentTimeMillis() - lastRollTs.get());
687   }
688 
689   @VisibleForTesting
690   protected void periodicRollForTesting() throws IOException {
691     lock.lock();
692     try {
693       periodicRoll();
694     } finally {
695       lock.unlock();
696     }
697   }
698 
699   @VisibleForTesting
700   protected boolean rollWriterForTesting() throws IOException {
701     lock.lock();
702     try {
703       return rollWriter();
704     } finally {
705       lock.unlock();
706     }
707   }
708 
709   private void periodicRoll() throws IOException {
710     if (storeTracker.isEmpty()) {
711       if (LOG.isTraceEnabled()) {
712         LOG.trace("no active procedures");
713       }
714       tryRollWriter();
715       removeAllLogs(flushLogId - 1);
716     } else {
717       if (storeTracker.isUpdated()) {
718         if (LOG.isTraceEnabled()) {
719           LOG.trace("all the active procedures are in the latest log");
720         }
721         removeAllLogs(flushLogId - 1);
722       }
723 
724       // if the log size has exceeded the roll threshold
725       // or the periodic roll timeout is expired, try to roll the wal.
726       if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
727         tryRollWriter();
728       }
729 
730       removeInactiveLogs();
731     }
732   }
733 
734   private boolean rollWriter() throws IOException {
735     // Create new state-log
736     if (!rollWriter(flushLogId + 1)) {
737       LOG.warn("someone else has already created log " + flushLogId);
738       return false;
739     }
740 
741     // We have the lease on the log,
742     // but we should check if someone else has created new files
743     if (getMaxLogId(getLogFiles()) > flushLogId) {
744       LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
745       logs.getLast().removeFile();
746       return false;
747     }
748 
749     // We have the lease on the log
750     return true;
751   }
752 
753   private boolean rollWriter(final long logId) throws IOException {
754     assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
755     assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
756 
757     ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
758       .setVersion(ProcedureWALFormat.HEADER_VERSION)
759       .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
760       .setMinProcId(storeTracker.getMinProcId())
761       .setLogId(logId)
762       .build();
763 
764     FSDataOutputStream newStream = null;
765     Path newLogFile = null;
766     long startPos = -1;
767     newLogFile = getLogFilePath(logId);
768     try {
769       newStream = fs.create(newLogFile, false);
770     } catch (FileAlreadyExistsException e) {
771       LOG.error("Log file with id=" + logId + " already exists", e);
772       return false;
773     } catch (RemoteException re) {
774       LOG.warn("failed to create log file with id=" + logId, re);
775       return false;
776     }
777     try {
778       ProcedureWALFormat.writeHeader(newStream, header);
779       startPos = newStream.getPos();
780     } catch (IOException ioe) {
781       LOG.warn("Encountered exception writing header", ioe);
782       newStream.close();
783       return false;
784     }
785 
786     closeStream();
787 
788     storeTracker.resetUpdates();
789     stream = newStream;
790     flushLogId = logId;
791     totalSynced.set(0);
792     lastRollTs.set(System.currentTimeMillis());
793     logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
794 
795     if (LOG.isDebugEnabled()) {
796       LOG.debug("Roll new state log: " + logId);
797     }
798     return true;
799   }
800 
801   private void closeStream() {
802     try {
803       if (stream != null) {
804         try {
805           ProcedureWALFile log = logs.getLast();
806           log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
807           ProcedureWALFormat.writeTrailer(stream, storeTracker);
808         } catch (IOException e) {
809           LOG.warn("Unable to write the trailer: " + e.getMessage());
810         }
811         stream.close();
812       }
813     } catch (IOException e) {
814       LOG.error("Unable to close the stream", e);
815     } finally {
816       stream = null;
817     }
818   }
819 
820   // ==========================================================================
821   //  Log Files cleaner helpers
822   // ==========================================================================
823   private void removeInactiveLogs() {
824     // Verify if the ProcId of the first oldest is still active. if not remove the file.
825     while (logs.size() > 1) {
826       ProcedureWALFile log = logs.getFirst();
827       if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
828         break;
829       }
830       removeLogFile(log);
831     }
832   }
833 
834   private void removeAllLogs(long lastLogId) {
835     if (logs.size() <= 1) return;
836 
837     if (LOG.isDebugEnabled()) {
838       LOG.debug("Remove all state logs with ID less than " + lastLogId);
839     }
840     while (logs.size() > 1) {
841       ProcedureWALFile log = logs.getFirst();
842       if (lastLogId < log.getLogId()) {
843         break;
844       }
845       removeLogFile(log);
846     }
847   }
848 
849   private boolean removeLogFile(final ProcedureWALFile log) {
850     try {
851       if (LOG.isDebugEnabled()) {
852         LOG.debug("Remove log: " + log);
853       }
854       log.removeFile();
855       logs.remove(log);
856       LOG.info("Remove log: " + log);
857       LOG.info("Removed logs: " + logs);
858       if (logs.size() == 0) { LOG.error("Expected at least one log"); }
859       assert logs.size() > 0 : "expected at least one log";
860     } catch (IOException e) {
861       LOG.error("Unable to remove log: " + log, e);
862       return false;
863     }
864     return true;
865   }
866 
867   // ==========================================================================
868   //  FileSystem Log Files helpers
869   // ==========================================================================
870   public Path getLogDir() {
871     return this.logDir;
872   }
873 
874   public FileSystem getFileSystem() {
875     return this.fs;
876   }
877 
878   protected Path getLogFilePath(final long logId) throws IOException {
879     return new Path(logDir, String.format("state-%020d.log", logId));
880   }
881 
882   private static long getLogIdFromName(final String name) {
883     int end = name.lastIndexOf(".log");
884     int start = name.lastIndexOf('-') + 1;
885     while (start < end) {
886       if (name.charAt(start) != '0')
887         break;
888       start++;
889     }
890     return Long.parseLong(name.substring(start, end));
891   }
892 
893   private static final PathFilter WALS_PATH_FILTER = new PathFilter() {
894     @Override
895     public boolean accept(Path path) {
896       String name = path.getName();
897       return name.startsWith("state-") && name.endsWith(".log");
898     }
899   };
900 
901   private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR =
902       new Comparator<FileStatus>() {
903     @Override
904     public int compare(FileStatus a, FileStatus b) {
905       final long aId = getLogIdFromName(a.getPath().getName());
906       final long bId = getLogIdFromName(b.getPath().getName());
907       return Long.compare(aId, bId);
908     }
909   };
910 
911   private FileStatus[] getLogFiles() throws IOException {
912     try {
913       FileStatus[] files = fs.listStatus(logDir, WALS_PATH_FILTER);
914       Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
915       return files;
916     } catch (FileNotFoundException e) {
917       LOG.warn("Log directory not found: " + e.getMessage());
918       return null;
919     }
920   }
921 
922   private static long getMaxLogId(final FileStatus[] logFiles) {
923     long maxLogId = 0;
924     if (logFiles != null && logFiles.length > 0) {
925       for (int i = 0; i < logFiles.length; ++i) {
926         maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName()));
927       }
928     }
929     return maxLogId;
930   }
931 
932   /**
933    * @return Max-LogID of the specified log file set
934    */
935   private long initOldLogs(final FileStatus[] logFiles) throws IOException {
936     this.logs.clear();
937 
938     long maxLogId = 0;
939     if (logFiles != null && logFiles.length > 0) {
940       for (int i = 0; i < logFiles.length; ++i) {
941         final Path logPath = logFiles[i].getPath();
942         leaseRecovery.recoverFileLease(fs, logPath);
943         maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
944 
945         ProcedureWALFile log = initOldLog(logFiles[i]);
946         if (log != null) {
947           this.logs.add(log);
948         }
949       }
950       Collections.sort(this.logs);
951       initTrackerFromOldLogs();
952     }
953     return maxLogId;
954   }
955 
956   private void initTrackerFromOldLogs() {
957     // TODO: Load the most recent tracker available
958     if (!logs.isEmpty()) {
959       ProcedureWALFile log = logs.getLast();
960       try {
961         log.readTracker(storeTracker);
962       } catch (IOException e) {
963         LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
964         // try the next one...
965         storeTracker.reset();
966         storeTracker.setPartialFlag(true);
967       }
968     }
969   }
970 
971   private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
972     ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
973     if (logFile.getLen() == 0) {
974       LOG.warn("Remove uninitialized log: " + logFile);
975       log.removeFile();
976       return null;
977     }
978     if (LOG.isDebugEnabled()) {
979       LOG.debug("Opening state-log: " + logFile);
980     }
981     try {
982       log.open();
983     } catch (ProcedureWALFormat.InvalidWALDataException e) {
984       LOG.warn("Remove uninitialized log: " + logFile, e);
985       log.removeFile();
986       return null;
987     } catch (IOException e) {
988       String msg = "Unable to read state log: " + logFile;
989       LOG.error(msg, e);
990       throw new IOException(msg, e);
991     }
992 
993     if (log.isCompacted()) {
994       try {
995         log.readTrailer();
996       } catch (IOException e) {
997         LOG.warn("Unfinished compacted log: " + logFile, e);
998         log.removeFile();
999         return null;
1000       }
1001     }
1002     return log;
1003   }
1004 }