1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2.store.wal;
20
21 import java.io.IOException;
22 import java.io.FileNotFoundException;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicLong;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.ReentrantLock;
28 import java.util.concurrent.LinkedTransferQueue;
29 import java.util.concurrent.CopyOnWriteArrayList;
30 import java.util.concurrent.TimeUnit;
31 import java.util.Arrays;
32 import java.util.ArrayList;
33 import java.util.Collections;
34 import java.util.Comparator;
35 import java.util.HashSet;
36 import java.util.Iterator;
37 import java.util.LinkedList;
38 import java.util.Set;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.fs.FSDataOutputStream;
44 import org.apache.hadoop.fs.FileAlreadyExistsException;
45 import org.apache.hadoop.fs.FileStatus;
46 import org.apache.hadoop.fs.FileSystem;
47 import org.apache.hadoop.fs.Path;
48 import org.apache.hadoop.fs.PathFilter;
49 import org.apache.hadoop.hbase.classification.InterfaceAudience;
50 import org.apache.hadoop.hbase.classification.InterfaceStability;
51 import org.apache.hadoop.hbase.procedure2.Procedure;
52 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
53 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
54 import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
55 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
56 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
57 import org.apache.hadoop.hbase.util.Threads;
58 import org.apache.hadoop.ipc.RemoteException;
59
60 import com.google.common.annotations.VisibleForTesting;
61
62
63
64
65 @InterfaceAudience.Private
66 @InterfaceStability.Evolving
67 public class WALProcedureStore implements ProcedureStore {
68 private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
69
70 public interface LeaseRecovery {
71 void recoverFileLease(FileSystem fs, Path path) throws IOException;
72 }
73
74 private static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
75 "hbase.procedure.store.wal.max.retries.before.roll";
76 private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;
77
78 private static final String WAIT_BEFORE_ROLL_CONF_KEY =
79 "hbase.procedure.store.wal.wait.before.roll";
80 private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;
81
82 private static final String ROLL_RETRIES_CONF_KEY =
83 "hbase.procedure.store.wal.max.roll.retries";
84 private static final int DEFAULT_ROLL_RETRIES = 3;
85
86 private static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
87 "hbase.procedure.store.wal.sync.failure.roll.max";
88 private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;
89
90 private static final String PERIODIC_ROLL_CONF_KEY =
91 "hbase.procedure.store.wal.periodic.roll.msec";
92 private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000;
93
94 private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
95 private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
96
97 private static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
98 private static final boolean DEFAULT_USE_HSYNC = true;
99
100 private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
101 private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024;
102
103 private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
104 new CopyOnWriteArrayList<ProcedureStoreListener>();
105
106 private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
107 private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
108 private final AtomicBoolean running = new AtomicBoolean(false);
109
110 private final ReentrantLock lock = new ReentrantLock();
111 private final Condition waitCond = lock.newCondition();
112 private final Condition slotCond = lock.newCondition();
113 private final Condition syncCond = lock.newCondition();
114
115 private final LeaseRecovery leaseRecovery;
116 private final Configuration conf;
117 private final FileSystem fs;
118 private final Path logDir;
119
120 private final AtomicReference<Throwable> syncException = new AtomicReference<Throwable>();
121 private final AtomicBoolean loading = new AtomicBoolean(true);
122 private final AtomicBoolean inSync = new AtomicBoolean(false);
123 private final AtomicLong totalSynced = new AtomicLong(0);
124 private final AtomicLong lastRollTs = new AtomicLong(0);
125
126 private LinkedTransferQueue<ByteSlot> slotsCache = null;
127 private Set<ProcedureWALFile> corruptedLogs = null;
128 private FSDataOutputStream stream = null;
129 private long flushLogId = 0;
130 private int slotIndex = 0;
131 private Thread syncThread;
132 private ByteSlot[] slots;
133
134 private int maxRetriesBeforeRoll;
135 private int maxSyncFailureRoll;
136 private int waitBeforeRoll;
137 private int rollRetries;
138 private int periodicRollMsec;
139 private long rollThreshold;
140 private boolean useHsync;
141 private int syncWaitMsec;
142
/**
 * Creates a store that keeps its WAL files under {@code logDir}.
 * The constructor only records its arguments; no file is touched and no
 * thread is started until {@link #start(int)} is called.
 *
 * @param conf configuration the tunables are read from
 * @param fs file system hosting the WAL files
 * @param logDir directory containing the WAL files
 * @param leaseRecovery callback used to recover the lease of existing WAL files
 */
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
    final LeaseRecovery leaseRecovery) {
  this.conf = conf;
  this.fs = fs;
  this.logDir = logDir;
  this.leaseRecovery = leaseRecovery;
}
150
151 @Override
152 public void start(int numSlots) throws IOException {
153 if (running.getAndSet(true)) {
154 return;
155 }
156
157
158 loading.set(true);
159 slots = new ByteSlot[numSlots];
160 slotsCache = new LinkedTransferQueue();
161 while (slotsCache.size() < numSlots) {
162 slotsCache.offer(new ByteSlot());
163 }
164
165
166 maxRetriesBeforeRoll =
167 conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
168 maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
169 waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
170 rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
171 rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
172 periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
173 syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
174 useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
175
176
177 syncThread = new Thread("WALProcedureStoreSyncThread") {
178 @Override
179 public void run() {
180 try {
181 syncLoop();
182 } catch (Throwable e) {
183 LOG.error("Got an exception from the sync-loop", e);
184 if (!isSyncAborted()) {
185 sendAbortProcessSignal();
186 }
187 }
188 }
189 };
190 syncThread.start();
191 }
192
193 @Override
194 public void stop(boolean abort) {
195 if (!running.getAndSet(false)) {
196 return;
197 }
198
199 LOG.info("Stopping the WAL Procedure Store");
200 sendStopSignal();
201
202 if (!abort) {
203 try {
204 while (syncThread.isAlive()) {
205 sendStopSignal();
206 syncThread.join(250);
207 }
208 } catch (InterruptedException e) {
209 LOG.warn("join interrupted", e);
210 Thread.currentThread().interrupt();
211 }
212 }
213
214
215 closeStream();
216
217
218
219
220 for (ProcedureWALFile log: logs) {
221 log.close();
222 }
223 logs.clear();
224 }
225
226 private void sendStopSignal() {
227 if (lock.tryLock()) {
228 try {
229 waitCond.signalAll();
230 syncCond.signalAll();
231 } finally {
232 lock.unlock();
233 }
234 }
235 }
236
237 @Override
238 public boolean isRunning() {
239 return running.get();
240 }
241
242 @Override
243 public int getNumThreads() {
244 return slots == null ? 0 : slots.length;
245 }
246
247 public ProcedureStoreTracker getStoreTracker() {
248 return storeTracker;
249 }
250
251 public ArrayList<ProcedureWALFile> getActiveLogs() {
252 lock.lock();
253 try {
254 return new ArrayList<ProcedureWALFile>(logs);
255 } finally {
256 lock.unlock();
257 }
258 }
259
260 public Set<ProcedureWALFile> getCorruptedLogs() {
261 return corruptedLogs;
262 }
263
264 @Override
265 public void registerListener(ProcedureStoreListener listener) {
266 this.listeners.add(listener);
267 }
268
269 @Override
270 public boolean unregisterListener(ProcedureStoreListener listener) {
271 return this.listeners.remove(listener);
272 }
273
274 @Override
275 public void recoverLease() throws IOException {
276 lock.lock();
277 try {
278 LOG.info("Starting WAL Procedure Store lease recovery");
279 FileStatus[] oldLogs = getLogFiles();
280 while (isRunning()) {
281
282 try {
283 flushLogId = initOldLogs(oldLogs);
284 } catch (FileNotFoundException e) {
285 LOG.warn("someone else is active and deleted logs. retrying.", e);
286 oldLogs = getLogFiles();
287 continue;
288 }
289
290
291 if (!rollWriter(flushLogId + 1)) {
292
293 LOG.debug("someone else has already created log " + flushLogId);
294 continue;
295 }
296
297
298 oldLogs = getLogFiles();
299 if (getMaxLogId(oldLogs) > flushLogId) {
300 if (LOG.isDebugEnabled()) {
301 LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
302 }
303 logs.getLast().removeFile();
304 continue;
305 }
306
307 LOG.info("Lease acquired for flushLogId: " + flushLogId);
308 break;
309 }
310 } finally {
311 lock.unlock();
312 }
313 }
314
315 @Override
316 public Iterator<Procedure> load() throws IOException {
317 if (logs.isEmpty()) {
318 throw new RuntimeException("recoverLease() must be called before loading data");
319 }
320
321
322 if (logs.size() == 1) {
323 if (LOG.isDebugEnabled()) {
324 LOG.debug("No state logs to replay.");
325 }
326 loading.set(false);
327 return null;
328 }
329
330
331 Iterator<ProcedureWALFile> it = logs.descendingIterator();
332 it.next();
333 try {
334 return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
335 @Override
336 public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
337 if (corruptedLogs == null) {
338 corruptedLogs = new HashSet<ProcedureWALFile>();
339 }
340 corruptedLogs.add(log);
341
342 }
343 });
344 } finally {
345 loading.set(false);
346 }
347 }
348
349 @Override
350 public void insert(final Procedure proc, final Procedure[] subprocs) {
351 if (LOG.isTraceEnabled()) {
352 LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
353 }
354
355 ByteSlot slot = acquireSlot();
356 try {
357
358 long[] subProcIds = null;
359 if (subprocs != null) {
360 ProcedureWALFormat.writeInsert(slot, proc, subprocs);
361 subProcIds = new long[subprocs.length];
362 for (int i = 0; i < subprocs.length; ++i) {
363 subProcIds[i] = subprocs[i].getProcId();
364 }
365 } else {
366 assert !proc.hasParent();
367 ProcedureWALFormat.writeInsert(slot, proc);
368 }
369
370
371 pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
372 } catch (IOException e) {
373
374
375 LOG.fatal("Unable to serialize one of the procedure: proc=" + proc +
376 ", subprocs=" + Arrays.toString(subprocs), e);
377 throw new RuntimeException(e);
378 } finally {
379 releaseSlot(slot);
380 }
381 }
382
383 @Override
384 public void update(final Procedure proc) {
385 if (LOG.isTraceEnabled()) {
386 LOG.trace("Update " + proc);
387 }
388
389 ByteSlot slot = acquireSlot();
390 try {
391
392 ProcedureWALFormat.writeUpdate(slot, proc);
393
394
395 pushData(PushType.UPDATE, slot, proc.getProcId(), null);
396 } catch (IOException e) {
397
398
399 LOG.fatal("Unable to serialize the procedure: " + proc, e);
400 throw new RuntimeException(e);
401 } finally {
402 releaseSlot(slot);
403 }
404 }
405
406 @Override
407 public void delete(final long procId) {
408 if (LOG.isTraceEnabled()) {
409 LOG.trace("Delete " + procId);
410 }
411
412 ByteSlot slot = acquireSlot();
413 try {
414
415 ProcedureWALFormat.writeDelete(slot, procId);
416
417
418 pushData(PushType.DELETE, slot, procId, null);
419 } catch (IOException e) {
420
421
422 LOG.fatal("Unable to serialize the procedure: " + procId, e);
423 throw new RuntimeException(e);
424 } finally {
425 releaseSlot(slot);
426 }
427 }
428
429 private ByteSlot acquireSlot() {
430 ByteSlot slot = slotsCache.poll();
431 return slot != null ? slot : new ByteSlot();
432 }
433
434 private void releaseSlot(final ByteSlot slot) {
435 slot.reset();
436 slotsCache.offer(slot);
437 }
438
439 private enum PushType { INSERT, UPDATE, DELETE };
440
441 private long pushData(final PushType type, final ByteSlot slot,
442 final long procId, final long[] subProcIds) {
443 if (!isRunning()) {
444 throw new RuntimeException("the store must be running before inserting data");
445 }
446 if (logs.isEmpty()) {
447 throw new RuntimeException("recoverLease() must be called before inserting data");
448 }
449
450 long logId = -1;
451 lock.lock();
452 try {
453
454 while (true) {
455 if (!isRunning()) {
456 throw new RuntimeException("store no longer running");
457 } else if (isSyncAborted()) {
458 throw new RuntimeException("sync aborted", syncException.get());
459 } else if (inSync.get()) {
460 syncCond.await();
461 } else if (slotIndex == slots.length) {
462 slotCond.signal();
463 syncCond.await();
464 } else {
465 break;
466 }
467 }
468
469 updateStoreTracker(type, procId, subProcIds);
470 slots[slotIndex++] = slot;
471 logId = flushLogId;
472
473
474 if (slotIndex == 1) {
475 waitCond.signal();
476 }
477
478
479 if (slotIndex == slots.length) {
480 waitCond.signal();
481 slotCond.signal();
482 }
483
484 syncCond.await();
485 } catch (InterruptedException e) {
486 Thread.currentThread().interrupt();
487 sendAbortProcessSignal();
488 throw new RuntimeException(e);
489 } finally {
490 lock.unlock();
491 if (isSyncAborted()) {
492 throw new RuntimeException("sync aborted", syncException.get());
493 }
494 }
495 return logId;
496 }
497
498 private void updateStoreTracker(final PushType type,
499 final long procId, final long[] subProcIds) {
500 switch (type) {
501 case INSERT:
502 if (subProcIds == null) {
503 storeTracker.insert(procId);
504 } else {
505 storeTracker.insert(procId, subProcIds);
506 }
507 break;
508 case UPDATE:
509 storeTracker.update(procId);
510 break;
511 case DELETE:
512 storeTracker.delete(procId);
513 break;
514 default:
515 throw new RuntimeException("invalid push type " + type);
516 }
517 }
518
519 private boolean isSyncAborted() {
520 return syncException.get() != null;
521 }
522
523 private void syncLoop() throws Throwable {
524 inSync.set(false);
525 lock.lock();
526 try {
527 while (isRunning()) {
528 try {
529
530 if (slotIndex == 0) {
531 if (!loading.get()) {
532 periodicRoll();
533 }
534
535 if (LOG.isTraceEnabled()) {
536 float rollTsSec = getMillisFromLastRoll() / 1000.0f;
537 LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
538 StringUtils.humanSize(totalSynced.get()),
539 StringUtils.humanSize(totalSynced.get() / rollTsSec)));
540 }
541
542 waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
543 if (slotIndex == 0) {
544
545 continue;
546 }
547 }
548
549
550 long syncWaitSt = System.currentTimeMillis();
551 if (slotIndex != slots.length) {
552 slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
553 }
554 long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
555 if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
556 float rollSec = getMillisFromLastRoll() / 1000.0f;
557 LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
558 StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
559 StringUtils.humanSize(totalSynced.get()),
560 StringUtils.humanSize(totalSynced.get() / rollSec)));
561 }
562
563 inSync.set(true);
564 totalSynced.addAndGet(syncSlots());
565 slotIndex = 0;
566 inSync.set(false);
567 } catch (InterruptedException e) {
568 Thread.currentThread().interrupt();
569 sendAbortProcessSignal();
570 syncException.compareAndSet(null, e);
571 throw e;
572 } catch (Throwable t) {
573 syncException.compareAndSet(null, t);
574 throw t;
575 } finally {
576 syncCond.signalAll();
577 }
578 }
579 } finally {
580 lock.unlock();
581 }
582 }
583
584 private long syncSlots() throws Throwable {
585 int retry = 0;
586 int logRolled = 0;
587 long totalSynced = 0;
588 do {
589 try {
590 totalSynced = syncSlots(stream, slots, 0, slotIndex);
591 break;
592 } catch (Throwable e) {
593 LOG.warn("unable to sync slots, retry=" + retry);
594 if (++retry >= maxRetriesBeforeRoll) {
595 if (logRolled >= maxSyncFailureRoll) {
596 LOG.error("Sync slots after log roll failed, abort.", e);
597 sendAbortProcessSignal();
598 throw e;
599 }
600
601 if (!rollWriterOrDie()) {
602 throw e;
603 }
604
605 logRolled++;
606 retry = 0;
607 }
608 }
609 } while (running.get());
610 return totalSynced;
611 }
612
613 protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
614 throws IOException {
615 long totalSynced = 0;
616 for (int i = 0; i < count; ++i) {
617 ByteSlot data = slots[offset + i];
618 data.writeTo(stream);
619 totalSynced += data.size();
620 }
621
622 if (useHsync) {
623 stream.hsync();
624 } else {
625 stream.hflush();
626 }
627 sendPostSyncSignal();
628
629 if (LOG.isTraceEnabled()) {
630 LOG.trace("Sync slots=" + count + '/' + slots.length +
631 ", flushed=" + StringUtils.humanSize(totalSynced));
632 }
633 return totalSynced;
634 }
635
636 protected void sendPostSyncSignal() {
637 if (!this.listeners.isEmpty()) {
638 for (ProcedureStoreListener listener : this.listeners) {
639 listener.postSync();
640 }
641 }
642 }
643
644 private void sendAbortProcessSignal() {
645 if (!this.listeners.isEmpty()) {
646 for (ProcedureStoreListener listener : this.listeners) {
647 listener.abortProcess();
648 }
649 }
650 }
651
652 private boolean rollWriterOrDie() {
653 for (int i = 0; i < rollRetries; ++i) {
654 if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
655
656 try {
657 if (rollWriter()) {
658 return true;
659 }
660 } catch (IOException e) {
661 LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
662 }
663 }
664 LOG.fatal("Unable to roll the log");
665 sendAbortProcessSignal();
666 throw new RuntimeException("unable to roll the log");
667 }
668
669 private boolean tryRollWriter() {
670 try {
671 return rollWriter();
672 } catch (IOException e) {
673 LOG.warn("Unable to roll the log", e);
674 return false;
675 }
676 }
677
678 private long getMillisToNextPeriodicRoll() {
679 if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
680 return periodicRollMsec - getMillisFromLastRoll();
681 }
682 return Long.MAX_VALUE;
683 }
684
685 private long getMillisFromLastRoll() {
686 return (System.currentTimeMillis() - lastRollTs.get());
687 }
688
689 @VisibleForTesting
690 protected void periodicRollForTesting() throws IOException {
691 lock.lock();
692 try {
693 periodicRoll();
694 } finally {
695 lock.unlock();
696 }
697 }
698
699 @VisibleForTesting
700 protected boolean rollWriterForTesting() throws IOException {
701 lock.lock();
702 try {
703 return rollWriter();
704 } finally {
705 lock.unlock();
706 }
707 }
708
709 private void periodicRoll() throws IOException {
710 if (storeTracker.isEmpty()) {
711 if (LOG.isTraceEnabled()) {
712 LOG.trace("no active procedures");
713 }
714 tryRollWriter();
715 removeAllLogs(flushLogId - 1);
716 } else {
717 if (storeTracker.isUpdated()) {
718 if (LOG.isTraceEnabled()) {
719 LOG.trace("all the active procedures are in the latest log");
720 }
721 removeAllLogs(flushLogId - 1);
722 }
723
724
725
726 if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
727 tryRollWriter();
728 }
729
730 removeInactiveLogs();
731 }
732 }
733
734 private boolean rollWriter() throws IOException {
735
736 if (!rollWriter(flushLogId + 1)) {
737 LOG.warn("someone else has already created log " + flushLogId);
738 return false;
739 }
740
741
742
743 if (getMaxLogId(getLogFiles()) > flushLogId) {
744 LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
745 logs.getLast().removeFile();
746 return false;
747 }
748
749
750 return true;
751 }
752
753 private boolean rollWriter(final long logId) throws IOException {
754 assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
755 assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
756
757 ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
758 .setVersion(ProcedureWALFormat.HEADER_VERSION)
759 .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
760 .setMinProcId(storeTracker.getMinProcId())
761 .setLogId(logId)
762 .build();
763
764 FSDataOutputStream newStream = null;
765 Path newLogFile = null;
766 long startPos = -1;
767 newLogFile = getLogFilePath(logId);
768 try {
769 newStream = fs.create(newLogFile, false);
770 } catch (FileAlreadyExistsException e) {
771 LOG.error("Log file with id=" + logId + " already exists", e);
772 return false;
773 } catch (RemoteException re) {
774 LOG.warn("failed to create log file with id=" + logId, re);
775 return false;
776 }
777 try {
778 ProcedureWALFormat.writeHeader(newStream, header);
779 startPos = newStream.getPos();
780 } catch (IOException ioe) {
781 LOG.warn("Encountered exception writing header", ioe);
782 newStream.close();
783 return false;
784 }
785
786 closeStream();
787
788 storeTracker.resetUpdates();
789 stream = newStream;
790 flushLogId = logId;
791 totalSynced.set(0);
792 lastRollTs.set(System.currentTimeMillis());
793 logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
794
795 if (LOG.isDebugEnabled()) {
796 LOG.debug("Roll new state log: " + logId);
797 }
798 return true;
799 }
800
801 private void closeStream() {
802 try {
803 if (stream != null) {
804 try {
805 ProcedureWALFile log = logs.getLast();
806 log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
807 ProcedureWALFormat.writeTrailer(stream, storeTracker);
808 } catch (IOException e) {
809 LOG.warn("Unable to write the trailer: " + e.getMessage());
810 }
811 stream.close();
812 }
813 } catch (IOException e) {
814 LOG.error("Unable to close the stream", e);
815 } finally {
816 stream = null;
817 }
818 }
819
820
821
822
823 private void removeInactiveLogs() {
824
825 while (logs.size() > 1) {
826 ProcedureWALFile log = logs.getFirst();
827 if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
828 break;
829 }
830 removeLogFile(log);
831 }
832 }
833
834 private void removeAllLogs(long lastLogId) {
835 if (logs.size() <= 1) return;
836
837 if (LOG.isDebugEnabled()) {
838 LOG.debug("Remove all state logs with ID less than " + lastLogId);
839 }
840 while (logs.size() > 1) {
841 ProcedureWALFile log = logs.getFirst();
842 if (lastLogId < log.getLogId()) {
843 break;
844 }
845 removeLogFile(log);
846 }
847 }
848
849 private boolean removeLogFile(final ProcedureWALFile log) {
850 try {
851 if (LOG.isDebugEnabled()) {
852 LOG.debug("Remove log: " + log);
853 }
854 log.removeFile();
855 logs.remove(log);
856 LOG.info("Remove log: " + log);
857 LOG.info("Removed logs: " + logs);
858 if (logs.size() == 0) { LOG.error("Expected at least one log"); }
859 assert logs.size() > 0 : "expected at least one log";
860 } catch (IOException e) {
861 LOG.error("Unable to remove log: " + log, e);
862 return false;
863 }
864 return true;
865 }
866
867
868
869
870 public Path getLogDir() {
871 return this.logDir;
872 }
873
874 public FileSystem getFileSystem() {
875 return this.fs;
876 }
877
878 protected Path getLogFilePath(final long logId) throws IOException {
879 return new Path(logDir, String.format("state-%020d.log", logId));
880 }
881
882 private static long getLogIdFromName(final String name) {
883 int end = name.lastIndexOf(".log");
884 int start = name.lastIndexOf('-') + 1;
885 while (start < end) {
886 if (name.charAt(start) != '0')
887 break;
888 start++;
889 }
890 return Long.parseLong(name.substring(start, end));
891 }
892
893 private static final PathFilter WALS_PATH_FILTER = new PathFilter() {
894 @Override
895 public boolean accept(Path path) {
896 String name = path.getName();
897 return name.startsWith("state-") && name.endsWith(".log");
898 }
899 };
900
901 private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR =
902 new Comparator<FileStatus>() {
903 @Override
904 public int compare(FileStatus a, FileStatus b) {
905 final long aId = getLogIdFromName(a.getPath().getName());
906 final long bId = getLogIdFromName(b.getPath().getName());
907 return Long.compare(aId, bId);
908 }
909 };
910
911 private FileStatus[] getLogFiles() throws IOException {
912 try {
913 FileStatus[] files = fs.listStatus(logDir, WALS_PATH_FILTER);
914 Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
915 return files;
916 } catch (FileNotFoundException e) {
917 LOG.warn("Log directory not found: " + e.getMessage());
918 return null;
919 }
920 }
921
922 private static long getMaxLogId(final FileStatus[] logFiles) {
923 long maxLogId = 0;
924 if (logFiles != null && logFiles.length > 0) {
925 for (int i = 0; i < logFiles.length; ++i) {
926 maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName()));
927 }
928 }
929 return maxLogId;
930 }
931
932
933
934
935 private long initOldLogs(final FileStatus[] logFiles) throws IOException {
936 this.logs.clear();
937
938 long maxLogId = 0;
939 if (logFiles != null && logFiles.length > 0) {
940 for (int i = 0; i < logFiles.length; ++i) {
941 final Path logPath = logFiles[i].getPath();
942 leaseRecovery.recoverFileLease(fs, logPath);
943 maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
944
945 ProcedureWALFile log = initOldLog(logFiles[i]);
946 if (log != null) {
947 this.logs.add(log);
948 }
949 }
950 Collections.sort(this.logs);
951 initTrackerFromOldLogs();
952 }
953 return maxLogId;
954 }
955
956 private void initTrackerFromOldLogs() {
957
958 if (!logs.isEmpty()) {
959 ProcedureWALFile log = logs.getLast();
960 try {
961 log.readTracker(storeTracker);
962 } catch (IOException e) {
963 LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
964
965 storeTracker.reset();
966 storeTracker.setPartialFlag(true);
967 }
968 }
969 }
970
971 private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
972 ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
973 if (logFile.getLen() == 0) {
974 LOG.warn("Remove uninitialized log: " + logFile);
975 log.removeFile();
976 return null;
977 }
978 if (LOG.isDebugEnabled()) {
979 LOG.debug("Opening state-log: " + logFile);
980 }
981 try {
982 log.open();
983 } catch (ProcedureWALFormat.InvalidWALDataException e) {
984 LOG.warn("Remove uninitialized log: " + logFile, e);
985 log.removeFile();
986 return null;
987 } catch (IOException e) {
988 String msg = "Unable to read state log: " + logFile;
989 LOG.error(msg, e);
990 throw new IOException(msg, e);
991 }
992
993 if (log.isCompacted()) {
994 try {
995 log.readTrailer();
996 } catch (IOException e) {
997 LOG.warn("Unfinished compacted log: " + logFile, e);
998 log.removeFile();
999 return null;
1000 }
1001 }
1002 return log;
1003 }
1004 }