View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2.store.wal;
20  
21  import java.io.IOException;
22  import java.io.FileNotFoundException;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  import java.util.concurrent.atomic.AtomicLong;
25  import java.util.concurrent.atomic.AtomicReference;
26  import java.util.concurrent.locks.Condition;
27  import java.util.concurrent.locks.ReentrantLock;
28  import java.util.concurrent.LinkedTransferQueue;
29  import java.util.concurrent.TimeUnit;
30  import java.util.Arrays;
31  import java.util.ArrayList;
32  import java.util.Collections;
33  import java.util.Comparator;
34  import java.util.HashSet;
35  import java.util.Iterator;
36  import java.util.LinkedList;
37  import java.util.Set;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FSDataOutputStream;
43  import org.apache.hadoop.fs.FileAlreadyExistsException;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.PathFilter;
48  import org.apache.hadoop.hbase.classification.InterfaceAudience;
49  import org.apache.hadoop.hbase.classification.InterfaceStability;
50  import org.apache.hadoop.hbase.procedure2.Procedure;
51  import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
52  import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
53  import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
54  import org.apache.hadoop.hbase.procedure2.util.StringUtils;
55  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
56  import org.apache.hadoop.hbase.util.Threads;
57  import org.apache.hadoop.ipc.RemoteException;
58  
59  import com.google.common.annotations.VisibleForTesting;
60  
61  /**
62   * WAL implementation of the ProcedureStore.
63   */
64  @InterfaceAudience.Private
65  @InterfaceStability.Evolving
66  public class WALProcedureStore extends ProcedureStoreBase {
67    private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
68  
69    public interface LeaseRecovery {
70      void recoverFileLease(FileSystem fs, Path path) throws IOException;
71    }
72  
73    public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
74      "hbase.procedure.store.wal.max.retries.before.roll";
75    private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;
76  
77    public static final String WAIT_BEFORE_ROLL_CONF_KEY =
78      "hbase.procedure.store.wal.wait.before.roll";
79    private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;
80  
81    public static final String ROLL_RETRIES_CONF_KEY =
82      "hbase.procedure.store.wal.max.roll.retries";
83    private static final int DEFAULT_ROLL_RETRIES = 3;
84  
85    public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
86      "hbase.procedure.store.wal.sync.failure.roll.max";
87    private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;
88  
89    public static final String PERIODIC_ROLL_CONF_KEY =
90      "hbase.procedure.store.wal.periodic.roll.msec";
91    private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h
92  
93    public static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
94    private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
95  
96    public static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
97    private static final boolean DEFAULT_USE_HSYNC = true;
98  
99    public static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
100   private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M
101 
102   private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
103   private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
104   private final ReentrantLock lock = new ReentrantLock();
105   private final Condition waitCond = lock.newCondition();
106   private final Condition slotCond = lock.newCondition();
107   private final Condition syncCond = lock.newCondition();
108 
109   private final LeaseRecovery leaseRecovery;
110   private final Configuration conf;
111   private final FileSystem fs;
112   private final Path logDir;
113 
114   private final AtomicReference<Throwable> syncException = new AtomicReference<Throwable>();
115   private final AtomicBoolean loading = new AtomicBoolean(true);
116   private final AtomicBoolean inSync = new AtomicBoolean(false);
117   private final AtomicLong totalSynced = new AtomicLong(0);
118   private final AtomicLong lastRollTs = new AtomicLong(0);
119 
120   private LinkedTransferQueue<ByteSlot> slotsCache = null;
121   private Set<ProcedureWALFile> corruptedLogs = null;
122   private FSDataOutputStream stream = null;
123   private long flushLogId = 0;
124   private int slotIndex = 0;
125   private Thread syncThread;
126   private ByteSlot[] slots;
127 
128   private int maxRetriesBeforeRoll;
129   private int maxSyncFailureRoll;
130   private int waitBeforeRoll;
131   private int rollRetries;
132   private int periodicRollMsec;
133   private long rollThreshold;
134   private boolean useHsync;
135   private int syncWaitMsec;
136 
137   public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
138       final LeaseRecovery leaseRecovery) {
139     this.fs = fs;
140     this.conf = conf;
141     this.logDir = logDir;
142     this.leaseRecovery = leaseRecovery;
143   }
144 
145   @Override
146   public void start(int numSlots) throws IOException {
147     if (!setRunning(true)) {
148       return;
149     }
150 
151     // Init buffer slots
152     loading.set(true);
153     slots = new ByteSlot[numSlots];
154     slotsCache = new LinkedTransferQueue();
155     while (slotsCache.size() < numSlots) {
156       slotsCache.offer(new ByteSlot());
157     }
158 
159     // Tunings
160     maxRetriesBeforeRoll =
161       conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
162     maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
163     waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
164     rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
165     rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
166     periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
167     syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
168     useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
169 
170     // Init sync thread
171     syncThread = new Thread("WALProcedureStoreSyncThread") {
172       @Override
173       public void run() {
174         try {
175           syncLoop();
176         } catch (Throwable e) {
177           LOG.error("Got an exception from the sync-loop", e);
178           if (!isSyncAborted()) {
179             sendAbortProcessSignal();
180           }
181         }
182       }
183     };
184     syncThread.start();
185   }
186 
187   @Override
188   public void stop(boolean abort) {
189     if (!setRunning(false)) {
190       return;
191     }
192 
193     LOG.info("Stopping the WAL Procedure Store");
194     sendStopSignal();
195 
196     if (!abort) {
197       try {
198         while (syncThread.isAlive()) {
199           sendStopSignal();
200           syncThread.join(250);
201         }
202       } catch (InterruptedException e) {
203         LOG.warn("join interrupted", e);
204         Thread.currentThread().interrupt();
205       }
206     }
207 
208     // Close the writer
209     closeStream();
210 
211     // Close the old logs
212     // they should be already closed, this is just in case the load fails
213     // and we call start() and then stop()
214     for (ProcedureWALFile log: logs) {
215       log.close();
216     }
217     logs.clear();
218   }
219 
220   private void sendStopSignal() {
221     if (lock.tryLock()) {
222       try {
223         waitCond.signalAll();
224         syncCond.signalAll();
225       } finally {
226         lock.unlock();
227       }
228     }
229   }
230 
231   @Override
232   public int getNumThreads() {
233     return slots == null ? 0 : slots.length;
234   }
235 
236   public ProcedureStoreTracker getStoreTracker() {
237     return storeTracker;
238   }
239 
240   public ArrayList<ProcedureWALFile> getActiveLogs() {
241     lock.lock();
242     try {
243       return new ArrayList<ProcedureWALFile>(logs);
244     } finally {
245       lock.unlock();
246     }
247   }
248 
249   public Set<ProcedureWALFile> getCorruptedLogs() {
250     return corruptedLogs;
251   }
252 
253   @Override
254   public void recoverLease() throws IOException {
255     lock.lock();
256     try {
257       LOG.info("Starting WAL Procedure Store lease recovery");
258       FileStatus[] oldLogs = getLogFiles();
259       while (isRunning()) {
260         // Get Log-MaxID and recover lease on old logs
261         try {
262           flushLogId = initOldLogs(oldLogs);
263         } catch (FileNotFoundException e) {
264           LOG.warn("someone else is active and deleted logs. retrying.", e);
265           oldLogs = getLogFiles();
266           continue;
267         }
268 
269         // Create new state-log
270         if (!rollWriter(flushLogId + 1)) {
271           // someone else has already created this log
272           LOG.debug("someone else has already created log " + flushLogId);
273           continue;
274         }
275 
276         // We have the lease on the log
277         oldLogs = getLogFiles();
278         if (getMaxLogId(oldLogs) > flushLogId) {
279           if (LOG.isDebugEnabled()) {
280             LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
281           }
282           logs.getLast().removeFile();
283           continue;
284         }
285 
286         LOG.info("Lease acquired for flushLogId: " + flushLogId);
287         break;
288       }
289     } finally {
290       lock.unlock();
291     }
292   }
293 
294   @Override
295   public void load(final ProcedureLoader loader) throws IOException {
296     if (logs.isEmpty()) {
297       throw new RuntimeException("recoverLease() must be called before loading data");
298     }
299 
300     // Nothing to do, If we have only the current log.
301     if (logs.size() == 1) {
302       if (LOG.isDebugEnabled()) {
303         LOG.debug("No state logs to replay.");
304       }
305       loader.setMaxProcId(0);
306       loading.set(false);
307       return;
308     }
309 
310     // Load the old logs
311     Iterator<ProcedureWALFile> it = logs.descendingIterator();
312     it.next(); // Skip the current log
313     try {
314       ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
315         @Override
316         public void setMaxProcId(long maxProcId) {
317           loader.setMaxProcId(maxProcId);
318         }
319 
320         @Override
321         public void load(ProcedureIterator procIter) throws IOException {
322           loader.load(procIter);
323         }
324 
325         @Override
326         public void handleCorrupted(ProcedureIterator procIter) throws IOException {
327           loader.handleCorrupted(procIter);
328         }
329 
330         @Override
331         public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
332           if (corruptedLogs == null) {
333             corruptedLogs = new HashSet<ProcedureWALFile>();
334           }
335           corruptedLogs.add(log);
336           // TODO: sideline corrupted log
337         }
338       });
339     } finally {
340       loading.set(false);
341     }
342   }
343 
344   @Override
345   public void insert(final Procedure proc, final Procedure[] subprocs) {
346     if (LOG.isTraceEnabled()) {
347       LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
348     }
349 
350     ByteSlot slot = acquireSlot();
351     try {
352       // Serialize the insert
353       long[] subProcIds = null;
354       if (subprocs != null) {
355         ProcedureWALFormat.writeInsert(slot, proc, subprocs);
356         subProcIds = new long[subprocs.length];
357         for (int i = 0; i < subprocs.length; ++i) {
358           subProcIds[i] = subprocs[i].getProcId();
359         }
360       } else {
361         assert !proc.hasParent();
362         ProcedureWALFormat.writeInsert(slot, proc);
363       }
364 
365       // Push the transaction data and wait until it is persisted
366       pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
367     } catch (IOException e) {
368       // We are not able to serialize the procedure.
369       // this is a code error, and we are not able to go on.
370       LOG.fatal("Unable to serialize one of the procedure: proc=" + proc +
371                 ", subprocs=" + Arrays.toString(subprocs), e);
372       throw new RuntimeException(e);
373     } finally {
374       releaseSlot(slot);
375     }
376   }
377 
378   @Override
379   public void update(final Procedure proc) {
380     if (LOG.isTraceEnabled()) {
381       LOG.trace("Update " + proc);
382     }
383 
384     ByteSlot slot = acquireSlot();
385     try {
386       // Serialize the update
387       ProcedureWALFormat.writeUpdate(slot, proc);
388 
389       // Push the transaction data and wait until it is persisted
390       pushData(PushType.UPDATE, slot, proc.getProcId(), null);
391     } catch (IOException e) {
392       // We are not able to serialize the procedure.
393       // this is a code error, and we are not able to go on.
394       LOG.fatal("Unable to serialize the procedure: " + proc, e);
395       throw new RuntimeException(e);
396     } finally {
397       releaseSlot(slot);
398     }
399   }
400 
401   @Override
402   public void delete(final long procId) {
403     if (LOG.isTraceEnabled()) {
404       LOG.trace("Delete " + procId);
405     }
406 
407     ByteSlot slot = acquireSlot();
408     try {
409       // Serialize the delete
410       ProcedureWALFormat.writeDelete(slot, procId);
411 
412       // Push the transaction data and wait until it is persisted
413       pushData(PushType.DELETE, slot, procId, null);
414     } catch (IOException e) {
415       // We are not able to serialize the procedure.
416       // this is a code error, and we are not able to go on.
417       LOG.fatal("Unable to serialize the procedure: " + procId, e);
418       throw new RuntimeException(e);
419     } finally {
420       releaseSlot(slot);
421     }
422   }
423 
424   private ByteSlot acquireSlot() {
425     ByteSlot slot = slotsCache.poll();
426     return slot != null ? slot : new ByteSlot();
427   }
428 
429   private void releaseSlot(final ByteSlot slot) {
430     slot.reset();
431     slotsCache.offer(slot);
432   }
433 
434   private enum PushType { INSERT, UPDATE, DELETE };
435 
436   private long pushData(final PushType type, final ByteSlot slot,
437       final long procId, final long[] subProcIds) {
438     if (!isRunning()) {
439       throw new RuntimeException("the store must be running before inserting data");
440     }
441     if (logs.isEmpty()) {
442       throw new RuntimeException("recoverLease() must be called before inserting data");
443     }
444 
445     long logId = -1;
446     lock.lock();
447     try {
448       // Wait for the sync to be completed
449       while (true) {
450         if (!isRunning()) {
451           throw new RuntimeException("store no longer running");
452         } else if (isSyncAborted()) {
453           throw new RuntimeException("sync aborted", syncException.get());
454         } else if (inSync.get()) {
455           syncCond.await();
456         } else if (slotIndex == slots.length) {
457           slotCond.signal();
458           syncCond.await();
459         } else {
460           break;
461         }
462       }
463 
464       updateStoreTracker(type, procId, subProcIds);
465       slots[slotIndex++] = slot;
466       logId = flushLogId;
467 
468       // Notify that there is new data
469       if (slotIndex == 1) {
470         waitCond.signal();
471       }
472 
473       // Notify that the slots are full
474       if (slotIndex == slots.length) {
475         waitCond.signal();
476         slotCond.signal();
477       }
478 
479       syncCond.await();
480     } catch (InterruptedException e) {
481       Thread.currentThread().interrupt();
482       sendAbortProcessSignal();
483       throw new RuntimeException(e);
484     } finally {
485       lock.unlock();
486       if (isSyncAborted()) {
487         throw new RuntimeException("sync aborted", syncException.get());
488       }
489     }
490     return logId;
491   }
492 
493   private void updateStoreTracker(final PushType type,
494       final long procId, final long[] subProcIds) {
495     switch (type) {
496       case INSERT:
497         if (subProcIds == null) {
498           storeTracker.insert(procId);
499         } else {
500           storeTracker.insert(procId, subProcIds);
501         }
502         break;
503       case UPDATE:
504         storeTracker.update(procId);
505         break;
506       case DELETE:
507         storeTracker.delete(procId);
508         break;
509       default:
510         throw new RuntimeException("invalid push type " + type);
511     }
512   }
513 
514   private boolean isSyncAborted() {
515     return syncException.get() != null;
516   }
517 
518   private void syncLoop() throws Throwable {
519     inSync.set(false);
520     lock.lock();
521     try {
522       while (isRunning()) {
523         try {
524           // Wait until new data is available
525           if (slotIndex == 0) {
526             if (!loading.get()) {
527               periodicRoll();
528             }
529 
530             if (LOG.isTraceEnabled()) {
531               float rollTsSec = getMillisFromLastRoll() / 1000.0f;
532               LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
533                         StringUtils.humanSize(totalSynced.get()),
534                         StringUtils.humanSize(totalSynced.get() / rollTsSec)));
535             }
536 
537             waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
538             if (slotIndex == 0) {
539               // no data.. probably a stop() or a periodic roll
540               continue;
541             }
542           }
543 
544           // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
545           long syncWaitSt = System.currentTimeMillis();
546           if (slotIndex != slots.length) {
547             slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
548           }
549           long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
550           if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
551             float rollSec = getMillisFromLastRoll() / 1000.0f;
552             LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
553                       StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
554                       StringUtils.humanSize(totalSynced.get()),
555                       StringUtils.humanSize(totalSynced.get() / rollSec)));
556           }
557 
558           inSync.set(true);
559           totalSynced.addAndGet(syncSlots());
560           slotIndex = 0;
561           inSync.set(false);
562         } catch (InterruptedException e) {
563           Thread.currentThread().interrupt();
564           sendAbortProcessSignal();
565           syncException.compareAndSet(null, e);
566           throw e;
567         } catch (Throwable t) {
568           syncException.compareAndSet(null, t);
569           throw t;
570         } finally {
571           syncCond.signalAll();
572         }
573       }
574     } finally {
575       lock.unlock();
576     }
577   }
578 
579   private long syncSlots() throws Throwable {
580     int retry = 0;
581     int logRolled = 0;
582     long totalSynced = 0;
583     do {
584       try {
585         totalSynced = syncSlots(stream, slots, 0, slotIndex);
586         break;
587       } catch (Throwable e) {
588         LOG.warn("unable to sync slots, retry=" + retry);
589         if (++retry >= maxRetriesBeforeRoll) {
590           if (logRolled >= maxSyncFailureRoll) {
591             LOG.error("Sync slots after log roll failed, abort.", e);
592             sendAbortProcessSignal();
593             throw e;
594           }
595 
596           if (!rollWriterOrDie()) {
597             throw e;
598           }
599 
600           logRolled++;
601           retry = 0;
602         }
603       }
604     } while (isRunning());
605     return totalSynced;
606   }
607 
608   protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
609       throws IOException {
610     long totalSynced = 0;
611     for (int i = 0; i < count; ++i) {
612       ByteSlot data = slots[offset + i];
613       data.writeTo(stream);
614       totalSynced += data.size();
615     }
616 
617     if (useHsync) {
618       stream.hsync();
619     } else {
620       stream.hflush();
621     }
622     sendPostSyncSignal();
623 
624     if (LOG.isTraceEnabled()) {
625       LOG.trace("Sync slots=" + count + '/' + slots.length +
626                 ", flushed=" + StringUtils.humanSize(totalSynced));
627     }
628     return totalSynced;
629   }
630 
631   private boolean rollWriterOrDie() {
632     for (int i = 0; i < rollRetries; ++i) {
633       if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
634 
635       try {
636         if (rollWriter()) {
637           return true;
638         }
639       } catch (IOException e) {
640         LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
641       }
642     }
643     LOG.fatal("Unable to roll the log");
644     sendAbortProcessSignal();
645     throw new RuntimeException("unable to roll the log");
646   }
647 
648   private boolean tryRollWriter() {
649     try {
650       return rollWriter();
651     } catch (IOException e) {
652       LOG.warn("Unable to roll the log", e);
653       return false;
654     }
655   }
656 
657   private long getMillisToNextPeriodicRoll() {
658     if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
659       return periodicRollMsec - getMillisFromLastRoll();
660     }
661     return Long.MAX_VALUE;
662   }
663 
664   private long getMillisFromLastRoll() {
665     return (System.currentTimeMillis() - lastRollTs.get());
666   }
667 
668   @VisibleForTesting
669   protected void periodicRollForTesting() throws IOException {
670     lock.lock();
671     try {
672       periodicRoll();
673     } finally {
674       lock.unlock();
675     }
676   }
677 
678   @VisibleForTesting
679   protected boolean rollWriterForTesting() throws IOException {
680     lock.lock();
681     try {
682       return rollWriter();
683     } finally {
684       lock.unlock();
685     }
686   }
687 
688   private void periodicRoll() throws IOException {
689     if (storeTracker.isEmpty()) {
690       if (LOG.isTraceEnabled()) {
691         LOG.trace("no active procedures");
692       }
693       tryRollWriter();
694       removeAllLogs(flushLogId - 1);
695     } else {
696       if (storeTracker.isUpdated()) {
697         if (LOG.isTraceEnabled()) {
698           LOG.trace("all the active procedures are in the latest log");
699         }
700         removeAllLogs(flushLogId - 1);
701       }
702 
703       // if the log size has exceeded the roll threshold
704       // or the periodic roll timeout is expired, try to roll the wal.
705       if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
706         tryRollWriter();
707       }
708 
709       removeInactiveLogs();
710     }
711   }
712 
713   private boolean rollWriter() throws IOException {
714     // Create new state-log
715     if (!rollWriter(flushLogId + 1)) {
716       LOG.warn("someone else has already created log " + flushLogId);
717       return false;
718     }
719 
720     // We have the lease on the log,
721     // but we should check if someone else has created new files
722     if (getMaxLogId(getLogFiles()) > flushLogId) {
723       LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
724       logs.getLast().removeFile();
725       return false;
726     }
727 
728     // We have the lease on the log
729     return true;
730   }
731 
732   private boolean rollWriter(final long logId) throws IOException {
733     assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
734     assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
735 
736     ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
737       .setVersion(ProcedureWALFormat.HEADER_VERSION)
738       .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
739       .setMinProcId(storeTracker.getMinProcId())
740       .setLogId(logId)
741       .build();
742 
743     FSDataOutputStream newStream = null;
744     Path newLogFile = null;
745     long startPos = -1;
746     newLogFile = getLogFilePath(logId);
747     try {
748       newStream = fs.create(newLogFile, false);
749     } catch (FileAlreadyExistsException e) {
750       LOG.error("Log file with id=" + logId + " already exists", e);
751       return false;
752     } catch (RemoteException re) {
753       LOG.warn("failed to create log file with id=" + logId, re);
754       return false;
755     }
756     try {
757       ProcedureWALFormat.writeHeader(newStream, header);
758       startPos = newStream.getPos();
759     } catch (IOException ioe) {
760       LOG.warn("Encountered exception writing header", ioe);
761       newStream.close();
762       return false;
763     }
764 
765     closeStream();
766 
767     storeTracker.resetUpdates();
768     stream = newStream;
769     flushLogId = logId;
770     totalSynced.set(0);
771     lastRollTs.set(System.currentTimeMillis());
772     logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
773 
774     if (LOG.isDebugEnabled()) {
775       LOG.debug("Roll new state log: " + logId);
776     }
777     return true;
778   }
779 
780   private void closeStream() {
781     try {
782       if (stream != null) {
783         try {
784           ProcedureWALFile log = logs.getLast();
785           log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
786           ProcedureWALFormat.writeTrailer(stream, storeTracker);
787         } catch (IOException e) {
788           LOG.warn("Unable to write the trailer: " + e.getMessage());
789         }
790         stream.close();
791       }
792     } catch (IOException e) {
793       LOG.error("Unable to close the stream", e);
794     } finally {
795       stream = null;
796     }
797   }
798 
799   // ==========================================================================
800   //  Log Files cleaner helpers
801   // ==========================================================================
802   private void removeInactiveLogs() {
803     // Verify if the ProcId of the first oldest is still active. if not remove the file.
804     while (logs.size() > 1) {
805       ProcedureWALFile log = logs.getFirst();
806       if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
807         break;
808       }
809       removeLogFile(log);
810     }
811   }
812 
813   private void removeAllLogs(long lastLogId) {
814     if (logs.size() <= 1) return;
815 
816     if (LOG.isDebugEnabled()) {
817       LOG.debug("Remove all state logs with ID less than " + lastLogId);
818     }
819     while (logs.size() > 1) {
820       ProcedureWALFile log = logs.getFirst();
821       if (lastLogId < log.getLogId()) {
822         break;
823       }
824       removeLogFile(log);
825     }
826   }
827 
828   private boolean removeLogFile(final ProcedureWALFile log) {
829     try {
830       if (LOG.isDebugEnabled()) {
831         LOG.debug("Remove log: " + log);
832       }
833       log.removeFile();
834       logs.remove(log);
835       LOG.info("Remove log: " + log);
836       LOG.info("Removed logs: " + logs);
837       if (logs.size() == 0) { LOG.error("Expected at least one log"); }
838       assert logs.size() > 0 : "expected at least one log";
839     } catch (IOException e) {
840       LOG.error("Unable to remove log: " + log, e);
841       return false;
842     }
843     return true;
844   }
845 
846   // ==========================================================================
847   //  FileSystem Log Files helpers
848   // ==========================================================================
849   public Path getLogDir() {
850     return this.logDir;
851   }
852 
853   public FileSystem getFileSystem() {
854     return this.fs;
855   }
856 
857   protected Path getLogFilePath(final long logId) throws IOException {
858     return new Path(logDir, String.format("state-%020d.log", logId));
859   }
860 
861   private static long getLogIdFromName(final String name) {
862     int end = name.lastIndexOf(".log");
863     int start = name.lastIndexOf('-') + 1;
864     while (start < end) {
865       if (name.charAt(start) != '0')
866         break;
867       start++;
868     }
869     return Long.parseLong(name.substring(start, end));
870   }
871 
872   private static final PathFilter WALS_PATH_FILTER = new PathFilter() {
873     @Override
874     public boolean accept(Path path) {
875       String name = path.getName();
876       return name.startsWith("state-") && name.endsWith(".log");
877     }
878   };
879 
880   private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR =
881       new Comparator<FileStatus>() {
882     @Override
883     public int compare(FileStatus a, FileStatus b) {
884       final long aId = getLogIdFromName(a.getPath().getName());
885       final long bId = getLogIdFromName(b.getPath().getName());
886       return Long.compare(aId, bId);
887     }
888   };
889 
890   private FileStatus[] getLogFiles() throws IOException {
891     try {
892       FileStatus[] files = fs.listStatus(logDir, WALS_PATH_FILTER);
893       Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
894       return files;
895     } catch (FileNotFoundException e) {
896       LOG.warn("Log directory not found: " + e.getMessage());
897       return null;
898     }
899   }
900 
901   private static long getMaxLogId(final FileStatus[] logFiles) {
902     long maxLogId = 0;
903     if (logFiles != null && logFiles.length > 0) {
904       for (int i = 0; i < logFiles.length; ++i) {
905         maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName()));
906       }
907     }
908     return maxLogId;
909   }
910 
911   /**
912    * @return Max-LogID of the specified log file set
913    */
914   private long initOldLogs(final FileStatus[] logFiles) throws IOException {
915     this.logs.clear();
916 
917     long maxLogId = 0;
918     if (logFiles != null && logFiles.length > 0) {
919       for (int i = 0; i < logFiles.length; ++i) {
920         final Path logPath = logFiles[i].getPath();
921         leaseRecovery.recoverFileLease(fs, logPath);
922         maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
923 
924         ProcedureWALFile log = initOldLog(logFiles[i]);
925         if (log != null) {
926           this.logs.add(log);
927         }
928       }
929       Collections.sort(this.logs);
930       initTrackerFromOldLogs();
931     }
932     return maxLogId;
933   }
934 
935   private void initTrackerFromOldLogs() {
936     // TODO: Load the most recent tracker available
937     if (!logs.isEmpty()) {
938       ProcedureWALFile log = logs.getLast();
939       try {
940         log.readTracker(storeTracker);
941       } catch (IOException e) {
942         LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
943         // try the next one...
944         storeTracker.reset();
945         storeTracker.setPartialFlag(true);
946       }
947     }
948   }
949 
950   private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
951     ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
952     if (logFile.getLen() == 0) {
953       LOG.warn("Remove uninitialized log: " + logFile);
954       log.removeFile();
955       return null;
956     }
957     if (LOG.isDebugEnabled()) {
958       LOG.debug("Opening state-log: " + logFile);
959     }
960     try {
961       log.open();
962     } catch (ProcedureWALFormat.InvalidWALDataException e) {
963       LOG.warn("Remove uninitialized log: " + logFile, e);
964       log.removeFile();
965       return null;
966     } catch (IOException e) {
967       String msg = "Unable to read state log: " + logFile;
968       LOG.error(msg, e);
969       throw new IOException(msg, e);
970     }
971 
972     if (log.isCompacted()) {
973       try {
974         log.readTrailer();
975       } catch (IOException e) {
976         LOG.warn("Unfinished compacted log: " + logFile, e);
977         log.removeFile();
978         return null;
979       }
980     }
981     return log;
982   }
983 }