1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2.store.wal;
20
21 import java.io.IOException;
22 import java.io.FileNotFoundException;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicLong;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.ReentrantLock;
28 import java.util.concurrent.LinkedTransferQueue;
29 import java.util.concurrent.TimeUnit;
30 import java.util.Arrays;
31 import java.util.ArrayList;
32 import java.util.Collections;
33 import java.util.Comparator;
34 import java.util.HashSet;
35 import java.util.Iterator;
36 import java.util.LinkedList;
37 import java.util.Set;
38
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.fs.FSDataOutputStream;
43 import org.apache.hadoop.fs.FileAlreadyExistsException;
44 import org.apache.hadoop.fs.FileStatus;
45 import org.apache.hadoop.fs.FileSystem;
46 import org.apache.hadoop.fs.Path;
47 import org.apache.hadoop.fs.PathFilter;
48 import org.apache.hadoop.hbase.classification.InterfaceAudience;
49 import org.apache.hadoop.hbase.classification.InterfaceStability;
50 import org.apache.hadoop.hbase.procedure2.Procedure;
51 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
52 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
53 import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
54 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
55 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
56 import org.apache.hadoop.hbase.util.Threads;
57 import org.apache.hadoop.ipc.RemoteException;
58
59 import com.google.common.annotations.VisibleForTesting;
60
61
62
63
64 @InterfaceAudience.Private
65 @InterfaceStability.Evolving
66 public class WALProcedureStore extends ProcedureStoreBase {
67 private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
68
69 public interface LeaseRecovery {
70 void recoverFileLease(FileSystem fs, Path path) throws IOException;
71 }
72
73 public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
74 "hbase.procedure.store.wal.max.retries.before.roll";
75 private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;
76
77 public static final String WAIT_BEFORE_ROLL_CONF_KEY =
78 "hbase.procedure.store.wal.wait.before.roll";
79 private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;
80
81 public static final String ROLL_RETRIES_CONF_KEY =
82 "hbase.procedure.store.wal.max.roll.retries";
83 private static final int DEFAULT_ROLL_RETRIES = 3;
84
85 public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
86 "hbase.procedure.store.wal.sync.failure.roll.max";
87 private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;
88
89 public static final String PERIODIC_ROLL_CONF_KEY =
90 "hbase.procedure.store.wal.periodic.roll.msec";
91 private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000;
92
93 public static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
94 private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
95
96 public static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
97 private static final boolean DEFAULT_USE_HSYNC = true;
98
99 public static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
100 private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024;
101
102 private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
103 private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
104 private final ReentrantLock lock = new ReentrantLock();
105 private final Condition waitCond = lock.newCondition();
106 private final Condition slotCond = lock.newCondition();
107 private final Condition syncCond = lock.newCondition();
108
109 private final LeaseRecovery leaseRecovery;
110 private final Configuration conf;
111 private final FileSystem fs;
112 private final Path logDir;
113
114 private final AtomicReference<Throwable> syncException = new AtomicReference<Throwable>();
115 private final AtomicBoolean loading = new AtomicBoolean(true);
116 private final AtomicBoolean inSync = new AtomicBoolean(false);
117 private final AtomicLong totalSynced = new AtomicLong(0);
118 private final AtomicLong lastRollTs = new AtomicLong(0);
119
120 private LinkedTransferQueue<ByteSlot> slotsCache = null;
121 private Set<ProcedureWALFile> corruptedLogs = null;
122 private FSDataOutputStream stream = null;
123 private long flushLogId = 0;
124 private int slotIndex = 0;
125 private Thread syncThread;
126 private ByteSlot[] slots;
127
128 private int maxRetriesBeforeRoll;
129 private int maxSyncFailureRoll;
130 private int waitBeforeRoll;
131 private int rollRetries;
132 private int periodicRollMsec;
133 private long rollThreshold;
134 private boolean useHsync;
135 private int syncWaitMsec;
136
/**
 * Creates a store; no file system access happens until the store is started.
 *
 * @param conf configuration read by {@code start(int)}
 * @param fs file system holding the logs
 * @param logDir directory containing the state logs
 * @param leaseRecovery hook invoked on each old log before it is opened
 */
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
    final LeaseRecovery leaseRecovery) {
  this.conf = conf;
  this.fs = fs;
  this.logDir = logDir;
  this.leaseRecovery = leaseRecovery;
}
144
145 @Override
146 public void start(int numSlots) throws IOException {
147 if (!setRunning(true)) {
148 return;
149 }
150
151
152 loading.set(true);
153 slots = new ByteSlot[numSlots];
154 slotsCache = new LinkedTransferQueue();
155 while (slotsCache.size() < numSlots) {
156 slotsCache.offer(new ByteSlot());
157 }
158
159
160 maxRetriesBeforeRoll =
161 conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
162 maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
163 waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
164 rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
165 rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
166 periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
167 syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
168 useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
169
170
171 syncThread = new Thread("WALProcedureStoreSyncThread") {
172 @Override
173 public void run() {
174 try {
175 syncLoop();
176 } catch (Throwable e) {
177 LOG.error("Got an exception from the sync-loop", e);
178 if (!isSyncAborted()) {
179 sendAbortProcessSignal();
180 }
181 }
182 }
183 };
184 syncThread.start();
185 }
186
187 @Override
188 public void stop(boolean abort) {
189 if (!setRunning(false)) {
190 return;
191 }
192
193 LOG.info("Stopping the WAL Procedure Store");
194 sendStopSignal();
195
196 if (!abort) {
197 try {
198 while (syncThread.isAlive()) {
199 sendStopSignal();
200 syncThread.join(250);
201 }
202 } catch (InterruptedException e) {
203 LOG.warn("join interrupted", e);
204 Thread.currentThread().interrupt();
205 }
206 }
207
208
209 closeStream();
210
211
212
213
214 for (ProcedureWALFile log: logs) {
215 log.close();
216 }
217 logs.clear();
218 }
219
220 private void sendStopSignal() {
221 if (lock.tryLock()) {
222 try {
223 waitCond.signalAll();
224 syncCond.signalAll();
225 } finally {
226 lock.unlock();
227 }
228 }
229 }
230
231 @Override
232 public int getNumThreads() {
233 return slots == null ? 0 : slots.length;
234 }
235
236 public ProcedureStoreTracker getStoreTracker() {
237 return storeTracker;
238 }
239
240 public ArrayList<ProcedureWALFile> getActiveLogs() {
241 lock.lock();
242 try {
243 return new ArrayList<ProcedureWALFile>(logs);
244 } finally {
245 lock.unlock();
246 }
247 }
248
249 public Set<ProcedureWALFile> getCorruptedLogs() {
250 return corruptedLogs;
251 }
252
253 @Override
254 public void recoverLease() throws IOException {
255 lock.lock();
256 try {
257 LOG.info("Starting WAL Procedure Store lease recovery");
258 FileStatus[] oldLogs = getLogFiles();
259 while (isRunning()) {
260
261 try {
262 flushLogId = initOldLogs(oldLogs);
263 } catch (FileNotFoundException e) {
264 LOG.warn("someone else is active and deleted logs. retrying.", e);
265 oldLogs = getLogFiles();
266 continue;
267 }
268
269
270 if (!rollWriter(flushLogId + 1)) {
271
272 LOG.debug("someone else has already created log " + flushLogId);
273 continue;
274 }
275
276
277 oldLogs = getLogFiles();
278 if (getMaxLogId(oldLogs) > flushLogId) {
279 if (LOG.isDebugEnabled()) {
280 LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
281 }
282 logs.getLast().removeFile();
283 continue;
284 }
285
286 LOG.info("Lease acquired for flushLogId: " + flushLogId);
287 break;
288 }
289 } finally {
290 lock.unlock();
291 }
292 }
293
294 @Override
295 public void load(final ProcedureLoader loader) throws IOException {
296 if (logs.isEmpty()) {
297 throw new RuntimeException("recoverLease() must be called before loading data");
298 }
299
300
301 if (logs.size() == 1) {
302 if (LOG.isDebugEnabled()) {
303 LOG.debug("No state logs to replay.");
304 }
305 loader.setMaxProcId(0);
306 loading.set(false);
307 return;
308 }
309
310
311 Iterator<ProcedureWALFile> it = logs.descendingIterator();
312 it.next();
313 try {
314 ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
315 @Override
316 public void setMaxProcId(long maxProcId) {
317 loader.setMaxProcId(maxProcId);
318 }
319
320 @Override
321 public void load(ProcedureIterator procIter) throws IOException {
322 loader.load(procIter);
323 }
324
325 @Override
326 public void handleCorrupted(ProcedureIterator procIter) throws IOException {
327 loader.handleCorrupted(procIter);
328 }
329
330 @Override
331 public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
332 if (corruptedLogs == null) {
333 corruptedLogs = new HashSet<ProcedureWALFile>();
334 }
335 corruptedLogs.add(log);
336
337 }
338 });
339 } finally {
340 loading.set(false);
341 }
342 }
343
344 @Override
345 public void insert(final Procedure proc, final Procedure[] subprocs) {
346 if (LOG.isTraceEnabled()) {
347 LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
348 }
349
350 ByteSlot slot = acquireSlot();
351 try {
352
353 long[] subProcIds = null;
354 if (subprocs != null) {
355 ProcedureWALFormat.writeInsert(slot, proc, subprocs);
356 subProcIds = new long[subprocs.length];
357 for (int i = 0; i < subprocs.length; ++i) {
358 subProcIds[i] = subprocs[i].getProcId();
359 }
360 } else {
361 assert !proc.hasParent();
362 ProcedureWALFormat.writeInsert(slot, proc);
363 }
364
365
366 pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
367 } catch (IOException e) {
368
369
370 LOG.fatal("Unable to serialize one of the procedure: proc=" + proc +
371 ", subprocs=" + Arrays.toString(subprocs), e);
372 throw new RuntimeException(e);
373 } finally {
374 releaseSlot(slot);
375 }
376 }
377
378 @Override
379 public void update(final Procedure proc) {
380 if (LOG.isTraceEnabled()) {
381 LOG.trace("Update " + proc);
382 }
383
384 ByteSlot slot = acquireSlot();
385 try {
386
387 ProcedureWALFormat.writeUpdate(slot, proc);
388
389
390 pushData(PushType.UPDATE, slot, proc.getProcId(), null);
391 } catch (IOException e) {
392
393
394 LOG.fatal("Unable to serialize the procedure: " + proc, e);
395 throw new RuntimeException(e);
396 } finally {
397 releaseSlot(slot);
398 }
399 }
400
401 @Override
402 public void delete(final long procId) {
403 if (LOG.isTraceEnabled()) {
404 LOG.trace("Delete " + procId);
405 }
406
407 ByteSlot slot = acquireSlot();
408 try {
409
410 ProcedureWALFormat.writeDelete(slot, procId);
411
412
413 pushData(PushType.DELETE, slot, procId, null);
414 } catch (IOException e) {
415
416
417 LOG.fatal("Unable to serialize the procedure: " + procId, e);
418 throw new RuntimeException(e);
419 } finally {
420 releaseSlot(slot);
421 }
422 }
423
424 private ByteSlot acquireSlot() {
425 ByteSlot slot = slotsCache.poll();
426 return slot != null ? slot : new ByteSlot();
427 }
428
429 private void releaseSlot(final ByteSlot slot) {
430 slot.reset();
431 slotsCache.offer(slot);
432 }
433
434 private enum PushType { INSERT, UPDATE, DELETE };
435
436 private long pushData(final PushType type, final ByteSlot slot,
437 final long procId, final long[] subProcIds) {
438 if (!isRunning()) {
439 throw new RuntimeException("the store must be running before inserting data");
440 }
441 if (logs.isEmpty()) {
442 throw new RuntimeException("recoverLease() must be called before inserting data");
443 }
444
445 long logId = -1;
446 lock.lock();
447 try {
448
449 while (true) {
450 if (!isRunning()) {
451 throw new RuntimeException("store no longer running");
452 } else if (isSyncAborted()) {
453 throw new RuntimeException("sync aborted", syncException.get());
454 } else if (inSync.get()) {
455 syncCond.await();
456 } else if (slotIndex == slots.length) {
457 slotCond.signal();
458 syncCond.await();
459 } else {
460 break;
461 }
462 }
463
464 updateStoreTracker(type, procId, subProcIds);
465 slots[slotIndex++] = slot;
466 logId = flushLogId;
467
468
469 if (slotIndex == 1) {
470 waitCond.signal();
471 }
472
473
474 if (slotIndex == slots.length) {
475 waitCond.signal();
476 slotCond.signal();
477 }
478
479 syncCond.await();
480 } catch (InterruptedException e) {
481 Thread.currentThread().interrupt();
482 sendAbortProcessSignal();
483 throw new RuntimeException(e);
484 } finally {
485 lock.unlock();
486 if (isSyncAborted()) {
487 throw new RuntimeException("sync aborted", syncException.get());
488 }
489 }
490 return logId;
491 }
492
493 private void updateStoreTracker(final PushType type,
494 final long procId, final long[] subProcIds) {
495 switch (type) {
496 case INSERT:
497 if (subProcIds == null) {
498 storeTracker.insert(procId);
499 } else {
500 storeTracker.insert(procId, subProcIds);
501 }
502 break;
503 case UPDATE:
504 storeTracker.update(procId);
505 break;
506 case DELETE:
507 storeTracker.delete(procId);
508 break;
509 default:
510 throw new RuntimeException("invalid push type " + type);
511 }
512 }
513
514 private boolean isSyncAborted() {
515 return syncException.get() != null;
516 }
517
518 private void syncLoop() throws Throwable {
519 inSync.set(false);
520 lock.lock();
521 try {
522 while (isRunning()) {
523 try {
524
525 if (slotIndex == 0) {
526 if (!loading.get()) {
527 periodicRoll();
528 }
529
530 if (LOG.isTraceEnabled()) {
531 float rollTsSec = getMillisFromLastRoll() / 1000.0f;
532 LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
533 StringUtils.humanSize(totalSynced.get()),
534 StringUtils.humanSize(totalSynced.get() / rollTsSec)));
535 }
536
537 waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
538 if (slotIndex == 0) {
539
540 continue;
541 }
542 }
543
544
545 long syncWaitSt = System.currentTimeMillis();
546 if (slotIndex != slots.length) {
547 slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
548 }
549 long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
550 if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
551 float rollSec = getMillisFromLastRoll() / 1000.0f;
552 LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
553 StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
554 StringUtils.humanSize(totalSynced.get()),
555 StringUtils.humanSize(totalSynced.get() / rollSec)));
556 }
557
558 inSync.set(true);
559 totalSynced.addAndGet(syncSlots());
560 slotIndex = 0;
561 inSync.set(false);
562 } catch (InterruptedException e) {
563 Thread.currentThread().interrupt();
564 sendAbortProcessSignal();
565 syncException.compareAndSet(null, e);
566 throw e;
567 } catch (Throwable t) {
568 syncException.compareAndSet(null, t);
569 throw t;
570 } finally {
571 syncCond.signalAll();
572 }
573 }
574 } finally {
575 lock.unlock();
576 }
577 }
578
579 private long syncSlots() throws Throwable {
580 int retry = 0;
581 int logRolled = 0;
582 long totalSynced = 0;
583 do {
584 try {
585 totalSynced = syncSlots(stream, slots, 0, slotIndex);
586 break;
587 } catch (Throwable e) {
588 LOG.warn("unable to sync slots, retry=" + retry);
589 if (++retry >= maxRetriesBeforeRoll) {
590 if (logRolled >= maxSyncFailureRoll) {
591 LOG.error("Sync slots after log roll failed, abort.", e);
592 sendAbortProcessSignal();
593 throw e;
594 }
595
596 if (!rollWriterOrDie()) {
597 throw e;
598 }
599
600 logRolled++;
601 retry = 0;
602 }
603 }
604 } while (isRunning());
605 return totalSynced;
606 }
607
608 protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
609 throws IOException {
610 long totalSynced = 0;
611 for (int i = 0; i < count; ++i) {
612 ByteSlot data = slots[offset + i];
613 data.writeTo(stream);
614 totalSynced += data.size();
615 }
616
617 if (useHsync) {
618 stream.hsync();
619 } else {
620 stream.hflush();
621 }
622 sendPostSyncSignal();
623
624 if (LOG.isTraceEnabled()) {
625 LOG.trace("Sync slots=" + count + '/' + slots.length +
626 ", flushed=" + StringUtils.humanSize(totalSynced));
627 }
628 return totalSynced;
629 }
630
631 private boolean rollWriterOrDie() {
632 for (int i = 0; i < rollRetries; ++i) {
633 if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
634
635 try {
636 if (rollWriter()) {
637 return true;
638 }
639 } catch (IOException e) {
640 LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
641 }
642 }
643 LOG.fatal("Unable to roll the log");
644 sendAbortProcessSignal();
645 throw new RuntimeException("unable to roll the log");
646 }
647
648 private boolean tryRollWriter() {
649 try {
650 return rollWriter();
651 } catch (IOException e) {
652 LOG.warn("Unable to roll the log", e);
653 return false;
654 }
655 }
656
657 private long getMillisToNextPeriodicRoll() {
658 if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
659 return periodicRollMsec - getMillisFromLastRoll();
660 }
661 return Long.MAX_VALUE;
662 }
663
664 private long getMillisFromLastRoll() {
665 return (System.currentTimeMillis() - lastRollTs.get());
666 }
667
668 @VisibleForTesting
669 protected void periodicRollForTesting() throws IOException {
670 lock.lock();
671 try {
672 periodicRoll();
673 } finally {
674 lock.unlock();
675 }
676 }
677
678 @VisibleForTesting
679 protected boolean rollWriterForTesting() throws IOException {
680 lock.lock();
681 try {
682 return rollWriter();
683 } finally {
684 lock.unlock();
685 }
686 }
687
688 private void periodicRoll() throws IOException {
689 if (storeTracker.isEmpty()) {
690 if (LOG.isTraceEnabled()) {
691 LOG.trace("no active procedures");
692 }
693 tryRollWriter();
694 removeAllLogs(flushLogId - 1);
695 } else {
696 if (storeTracker.isUpdated()) {
697 if (LOG.isTraceEnabled()) {
698 LOG.trace("all the active procedures are in the latest log");
699 }
700 removeAllLogs(flushLogId - 1);
701 }
702
703
704
705 if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
706 tryRollWriter();
707 }
708
709 removeInactiveLogs();
710 }
711 }
712
713 private boolean rollWriter() throws IOException {
714
715 if (!rollWriter(flushLogId + 1)) {
716 LOG.warn("someone else has already created log " + flushLogId);
717 return false;
718 }
719
720
721
722 if (getMaxLogId(getLogFiles()) > flushLogId) {
723 LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
724 logs.getLast().removeFile();
725 return false;
726 }
727
728
729 return true;
730 }
731
732 private boolean rollWriter(final long logId) throws IOException {
733 assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
734 assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
735
736 ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
737 .setVersion(ProcedureWALFormat.HEADER_VERSION)
738 .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
739 .setMinProcId(storeTracker.getMinProcId())
740 .setLogId(logId)
741 .build();
742
743 FSDataOutputStream newStream = null;
744 Path newLogFile = null;
745 long startPos = -1;
746 newLogFile = getLogFilePath(logId);
747 try {
748 newStream = fs.create(newLogFile, false);
749 } catch (FileAlreadyExistsException e) {
750 LOG.error("Log file with id=" + logId + " already exists", e);
751 return false;
752 } catch (RemoteException re) {
753 LOG.warn("failed to create log file with id=" + logId, re);
754 return false;
755 }
756 try {
757 ProcedureWALFormat.writeHeader(newStream, header);
758 startPos = newStream.getPos();
759 } catch (IOException ioe) {
760 LOG.warn("Encountered exception writing header", ioe);
761 newStream.close();
762 return false;
763 }
764
765 closeStream();
766
767 storeTracker.resetUpdates();
768 stream = newStream;
769 flushLogId = logId;
770 totalSynced.set(0);
771 lastRollTs.set(System.currentTimeMillis());
772 logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
773
774 if (LOG.isDebugEnabled()) {
775 LOG.debug("Roll new state log: " + logId);
776 }
777 return true;
778 }
779
780 private void closeStream() {
781 try {
782 if (stream != null) {
783 try {
784 ProcedureWALFile log = logs.getLast();
785 log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
786 ProcedureWALFormat.writeTrailer(stream, storeTracker);
787 } catch (IOException e) {
788 LOG.warn("Unable to write the trailer: " + e.getMessage());
789 }
790 stream.close();
791 }
792 } catch (IOException e) {
793 LOG.error("Unable to close the stream", e);
794 } finally {
795 stream = null;
796 }
797 }
798
799
800
801
802 private void removeInactiveLogs() {
803
804 while (logs.size() > 1) {
805 ProcedureWALFile log = logs.getFirst();
806 if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
807 break;
808 }
809 removeLogFile(log);
810 }
811 }
812
813 private void removeAllLogs(long lastLogId) {
814 if (logs.size() <= 1) return;
815
816 if (LOG.isDebugEnabled()) {
817 LOG.debug("Remove all state logs with ID less than " + lastLogId);
818 }
819 while (logs.size() > 1) {
820 ProcedureWALFile log = logs.getFirst();
821 if (lastLogId < log.getLogId()) {
822 break;
823 }
824 removeLogFile(log);
825 }
826 }
827
828 private boolean removeLogFile(final ProcedureWALFile log) {
829 try {
830 if (LOG.isDebugEnabled()) {
831 LOG.debug("Remove log: " + log);
832 }
833 log.removeFile();
834 logs.remove(log);
835 LOG.info("Remove log: " + log);
836 LOG.info("Removed logs: " + logs);
837 if (logs.size() == 0) { LOG.error("Expected at least one log"); }
838 assert logs.size() > 0 : "expected at least one log";
839 } catch (IOException e) {
840 LOG.error("Unable to remove log: " + log, e);
841 return false;
842 }
843 return true;
844 }
845
846
847
848
849 public Path getLogDir() {
850 return this.logDir;
851 }
852
853 public FileSystem getFileSystem() {
854 return this.fs;
855 }
856
857 protected Path getLogFilePath(final long logId) throws IOException {
858 return new Path(logDir, String.format("state-%020d.log", logId));
859 }
860
861 private static long getLogIdFromName(final String name) {
862 int end = name.lastIndexOf(".log");
863 int start = name.lastIndexOf('-') + 1;
864 while (start < end) {
865 if (name.charAt(start) != '0')
866 break;
867 start++;
868 }
869 return Long.parseLong(name.substring(start, end));
870 }
871
872 private static final PathFilter WALS_PATH_FILTER = new PathFilter() {
873 @Override
874 public boolean accept(Path path) {
875 String name = path.getName();
876 return name.startsWith("state-") && name.endsWith(".log");
877 }
878 };
879
880 private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR =
881 new Comparator<FileStatus>() {
882 @Override
883 public int compare(FileStatus a, FileStatus b) {
884 final long aId = getLogIdFromName(a.getPath().getName());
885 final long bId = getLogIdFromName(b.getPath().getName());
886 return Long.compare(aId, bId);
887 }
888 };
889
890 private FileStatus[] getLogFiles() throws IOException {
891 try {
892 FileStatus[] files = fs.listStatus(logDir, WALS_PATH_FILTER);
893 Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
894 return files;
895 } catch (FileNotFoundException e) {
896 LOG.warn("Log directory not found: " + e.getMessage());
897 return null;
898 }
899 }
900
901 private static long getMaxLogId(final FileStatus[] logFiles) {
902 long maxLogId = 0;
903 if (logFiles != null && logFiles.length > 0) {
904 for (int i = 0; i < logFiles.length; ++i) {
905 maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName()));
906 }
907 }
908 return maxLogId;
909 }
910
911
912
913
914 private long initOldLogs(final FileStatus[] logFiles) throws IOException {
915 this.logs.clear();
916
917 long maxLogId = 0;
918 if (logFiles != null && logFiles.length > 0) {
919 for (int i = 0; i < logFiles.length; ++i) {
920 final Path logPath = logFiles[i].getPath();
921 leaseRecovery.recoverFileLease(fs, logPath);
922 maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
923
924 ProcedureWALFile log = initOldLog(logFiles[i]);
925 if (log != null) {
926 this.logs.add(log);
927 }
928 }
929 Collections.sort(this.logs);
930 initTrackerFromOldLogs();
931 }
932 return maxLogId;
933 }
934
935 private void initTrackerFromOldLogs() {
936
937 if (!logs.isEmpty()) {
938 ProcedureWALFile log = logs.getLast();
939 try {
940 log.readTracker(storeTracker);
941 } catch (IOException e) {
942 LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
943
944 storeTracker.reset();
945 storeTracker.setPartialFlag(true);
946 }
947 }
948 }
949
950 private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
951 ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
952 if (logFile.getLen() == 0) {
953 LOG.warn("Remove uninitialized log: " + logFile);
954 log.removeFile();
955 return null;
956 }
957 if (LOG.isDebugEnabled()) {
958 LOG.debug("Opening state-log: " + logFile);
959 }
960 try {
961 log.open();
962 } catch (ProcedureWALFormat.InvalidWALDataException e) {
963 LOG.warn("Remove uninitialized log: " + logFile, e);
964 log.removeFile();
965 return null;
966 } catch (IOException e) {
967 String msg = "Unable to read state log: " + logFile;
968 LOG.error(msg, e);
969 throw new IOException(msg, e);
970 }
971
972 if (log.isCompacted()) {
973 try {
974 log.readTrailer();
975 } catch (IOException e) {
976 LOG.warn("Unfinished compacted log: " + logFile, e);
977 log.removeFile();
978 return null;
979 }
980 }
981 return log;
982 }
983 }