/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.regionserver.wal;
19
20 import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
21
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.io.OutputStream;
26 import java.lang.management.MemoryUsage;
27 import java.lang.reflect.InvocationTargetException;
28 import java.net.URLEncoder;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Comparator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.NavigableMap;
35 import java.util.Set;
36 import java.util.concurrent.BlockingQueue;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ConcurrentSkipListMap;
39 import java.util.concurrent.CopyOnWriteArrayList;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.LinkedBlockingQueue;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicInteger;
48 import java.util.concurrent.atomic.AtomicLong;
49 import java.util.concurrent.locks.ReentrantLock;
50
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53 import org.apache.hadoop.conf.Configuration;
54 import org.apache.hadoop.fs.FSDataOutputStream;
55 import org.apache.hadoop.fs.FileStatus;
56 import org.apache.hadoop.fs.FileSystem;
57 import org.apache.hadoop.fs.Path;
58 import org.apache.hadoop.fs.PathFilter;
59 import org.apache.hadoop.hbase.Cell;
60 import org.apache.hadoop.hbase.CellUtil;
61 import org.apache.hadoop.hbase.HBaseConfiguration;
62 import org.apache.hadoop.hbase.HConstants;
63 import org.apache.hadoop.hbase.HRegionInfo;
64 import org.apache.hadoop.hbase.HTableDescriptor;
65 import org.apache.hadoop.hbase.classification.InterfaceAudience;
66 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
67 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
68 import org.apache.hadoop.hbase.util.Bytes;
69 import org.apache.hadoop.hbase.util.ClassSize;
70 import org.apache.hadoop.hbase.util.DrainBarrier;
71 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
72 import org.apache.hadoop.hbase.util.FSUtils;
73 import org.apache.hadoop.hbase.util.HasThread;
74 import org.apache.hadoop.hbase.util.Threads;
75 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
76 import org.apache.hadoop.hbase.wal.WAL;
77 import org.apache.hadoop.hbase.wal.WALFactory;
78 import org.apache.hadoop.hbase.wal.WALKey;
79 import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
80 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
81 import org.apache.hadoop.hbase.wal.WALSplitter;
82 import org.apache.hadoop.hdfs.DFSOutputStream;
83 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
84 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
85 import org.apache.hadoop.util.StringUtils;
86 import org.apache.htrace.NullScope;
87 import org.apache.htrace.Span;
88 import org.apache.htrace.Trace;
89 import org.apache.htrace.TraceScope;
90
91 import com.google.common.annotations.VisibleForTesting;
92 import com.lmax.disruptor.BlockingWaitStrategy;
93 import com.lmax.disruptor.EventHandler;
94 import com.lmax.disruptor.ExceptionHandler;
95 import com.lmax.disruptor.LifecycleAware;
96 import com.lmax.disruptor.TimeoutException;
97 import com.lmax.disruptor.dsl.Disruptor;
98 import com.lmax.disruptor.dsl.ProducerType;

/**
 * The default WAL implementation: keeps the write-ahead log in files on a {@link FileSystem},
 * typically HDFS. Only one file is written to at a time; when it reaches the configured roll
 * size (or on request) the log is "rolled": the current file is closed and a new one started.
 * Rolled files whose edits have all been flushed from the memstores are moved to the archive
 * directory; if too many un-archivable files pile up, the regions holding the oldest unflushed
 * edits are asked to flush.
 *
 * <p>All appends and syncs are funnelled through an LMAX Disruptor ring buffer. A single
 * consumer thread ({@link RingBufferEventHandler}) applies appends to the current writer and
 * batches sync requests out to {@link SyncRunner} threads, which invoke the writer's sync and
 * complete the outstanding {@link SyncFuture}s.
 */
127 @InterfaceAudience.Private
128 public class FSHLog implements WAL {
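
  /*
   * Minimal usage sketch (illustrative only; in a region server the WAL is normally obtained
   * through WALFactory/DefaultWALProvider rather than constructed directly):
   *
   *   FSHLog wal = new FSHLog(fs, rootDir, "WALs", conf);
   *   long txid = wal.append(htd, regionInfo, walKey, edits, true); // queue an edit
   *   wal.sync(txid);      // block until that edit is durable
   *   wal.rollWriter();    // start writing a new file
   *   wal.shutdown();      // flush and stop
   */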

  // Implementation notes: appends and syncs both travel through the disruptor ring buffer, which
  // provides the global ordering for the log. The single consumer thread serializes all access
  // to the current writer; sync requests are batched and handed to SyncRunner threads so that a
  // slow filesystem sync does not stall the append path, and so one filesystem sync can complete
  // many handlers' sync futures at once.

  private static final Log LOG = LogFactory.getLog(FSHLog.class);

  /** A sync that takes longer than this many milliseconds is logged as a slow sync. */
  private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100;

  /** Default time, in milliseconds, to wait for a WAL sync to complete before timing out. */
  private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000;

  /**
   * The ring buffer (LMAX Disruptor) through which every append and sync request travels; it
   * provides the total ordering of WAL operations and is where handler sync calls get batched.
   */
  private final Disruptor<RingBufferTruck> disruptor;

  /**
   * Single-threaded executor that runs the disruptor consumer, {@link RingBufferEventHandler}.
   */
  private final ExecutorService appendExecutor;

  /**
   * The disruptor consumer: appends edits with the current writer and dispatches batched sync
   * requests to the {@link SyncRunner} threads. Also where the "safe point" is attained while
   * the writer is being replaced during a log roll.
   */
  private final RingBufferEventHandler ringBufferEventHandler;

  /**
   * Per-handler cache of {@link SyncFuture} instances so we do not allocate a new future for
   * every sync invocation.
   */
  private final ThreadLocal<SyncFuture> cachedSyncFutures;

  /**
   * Ring buffer sequence of the highest edit appended to the current writer; it may not have
   * been synced yet. Written only by the single consumer thread, hence volatile rather than
   * atomic.
   */
  private volatile long highestUnsyncedSequence = -1;

  /**
   * Highest ring buffer sequence known to have been successfully synced to the filesystem.
   * Advanced (only ever forward) by the {@link SyncRunner} threads.
   */
  private final AtomicLong highestSyncedSequence = new AtomicLong(0);
211
212
213
214
215 protected final FileSystem fs;
216
217
218
219
220 private final Path fullPathLogDir;
221
222
223
224
225 private final Path fullPathArchiveDir;
226
227
228
229
230 private final PathFilter ourFiles;
231
232
233
234
235 private final String logFilePrefix;
236
237
238
239
240 private final String logFileSuffix;
241
242
243
244
245 private final String prefixPathStr;
246
247 private final WALCoprocessorHost coprocessorHost;
248
249
250
251
252 protected final Configuration conf;
253
254
255 private final List<WALActionsListener> listeners =
256 new CopyOnWriteArrayList<WALActionsListener>();
257
258 @Override
259 public void registerWALActionsListener(final WALActionsListener listener) {
260 this.listeners.add(listener);
261 }
262
263 @Override
264 public boolean unregisterWALActionsListener(final WALActionsListener listener) {
265 return this.listeners.remove(listener);
266 }
267
268 @Override
269 public WALCoprocessorHost getCoprocessorHost() {
270 return coprocessorHost;
271 }
272
273
274
275
276 private FSDataOutputStream hdfs_out;
277
278
279
280
281 private final int minTolerableReplication;
282
283 private final int slowSyncNs;
284
285 private final long walSyncTimeout;
286
287
288
289
290
291 private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0);
292
293 private final int lowReplicationRollLimit;
294
295
296
297
298 private volatile boolean lowReplicationRollEnabled = true;
299
300
301
302
303
304
305 private SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
306
307
308
309
310 volatile Writer writer;
311
312
313 private final DrainBarrier closeBarrier = new DrainBarrier();
314
315
316
317
318
319
320
321
322 private final ReentrantLock rollWriterLock = new ReentrantLock(true);
323
324 private volatile boolean closed = false;
325 private final AtomicBoolean shutdown = new AtomicBoolean(false);
326
327
328 private final AtomicLong filenum = new AtomicLong(-1);
329
330
331 private final AtomicInteger numEntries = new AtomicInteger(0);
332
333
334 private final long logrollsize;
335
336
337
338
339 private AtomicLong totalLogSize = new AtomicLong(0);
340
341
342
343
344
345
346 private final int maxLogs;
347
348
349 private final int closeErrorsTolerated;
350
351 private final AtomicInteger closeErrorCount = new AtomicInteger();
352
353
354 private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
355
356
357
358
359
360
361 final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
362 @Override
363 public int compare(Path o1, Path o2) {
364 long t1 = getFileNumFromFileName(o1);
365 long t2 = getFileNumFromFileName(o2);
366 if (t1 == t2) return 0;
367 return (t1 > t2) ? 1 : -1;
368 }
369 };

  /**
   * Rolled WAL files, ordered by creation time, mapped to the highest region sequence ids that
   * were written while each file was current. Used to decide when a rolled file can be archived
   * (all its edits flushed) and which regions must be flushed when there are too many WALs.
   */
  private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds =
    new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR);

  /**
   * Exception handler installed on the disruptor: logs the throwable with our logger and then
   * rethrows it wrapped in a RuntimeException.
   */
382 static class RingBufferExceptionHandler implements ExceptionHandler {
383 @Override
384 public void handleEventException(Throwable ex, long sequence, Object event) {
385 LOG.error("Sequence=" + sequence + ", event=" + event, ex);
386 throw new RuntimeException(ex);
387 }
388
389 @Override
390 public void handleOnStartException(Throwable ex) {
391 LOG.error(ex);
392 throw new RuntimeException(ex);
393 }
394
395 @Override
396 public void handleOnShutdownException(Throwable ex) {
397 LOG.error(ex);
398 throw new RuntimeException(ex);
399 }
400 }

  /**
   * Constructor that uses the default archive directory,
   * {@link HConstants#HREGION_OLDLOGDIR_NAME}, with no listeners, prefix or suffix.
   *
   * @param fs filesystem handle
   * @param root path under which wals are stored and archived
   * @param logDir directory, under <code>root</code>, where wals are written
   * @param conf configuration to use
   */
411 public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
412 throws IOException {
413 this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
414 }

  /**
   * Create an edit log at the given <code>logDir</code> location.
   *
   * @param fs filesystem handle
   * @param rootDir path under which <code>logDir</code> and <code>archiveDir</code> live
   * @param logDir directory where wals are written
   * @param archiveDir directory where rolled wals are moved once no longer needed
   * @param conf configuration to use
   * @param listeners listeners on WAL events; registered before anything else is done
   * @param failIfWALExists if true, an IOException is thrown if files matching this wal's name
   *          already exist in <code>logDir</code>
   * @param prefix for the wal file names; URL-encoded before use; null or empty means "wal"
   * @param suffix for the wal file names; URL-encoded before use; null is treated as empty; if
   *          non-empty it must start with {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER}
   */
440 public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
441 final String archiveDir, final Configuration conf,
442 final List<WALActionsListener> listeners,
443 final boolean failIfWALExists, final String prefix, final String suffix)
444 throws IOException {
445 this.fs = fs;
446 this.fullPathLogDir = new Path(rootDir, logDir);
447 this.fullPathArchiveDir = new Path(rootDir, archiveDir);
448 this.conf = conf;
449
450 if (!fs.exists(fullPathLogDir) && !fs.mkdirs(fullPathLogDir)) {
451 throw new IOException("Unable to mkdir " + fullPathLogDir);
452 }
453
454 if (!fs.exists(this.fullPathArchiveDir)) {
455 if (!fs.mkdirs(this.fullPathArchiveDir)) {
456 throw new IOException("Unable to mkdir " + this.fullPathArchiveDir);
457 }
458 }
459
460
461 this.logFilePrefix =
462 prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
463
464 if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
465 throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
466 "' but instead was '" + suffix + "'");
467 }
468
469
470 FSUtils.setStoragePolicy(fs, conf, this.fullPathLogDir, HConstants.WAL_STORAGE_POLICY,
471 HConstants.DEFAULT_WAL_STORAGE_POLICY);
472 this.logFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
473 this.prefixPathStr = new Path(fullPathLogDir,
474 logFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
475
476 this.ourFiles = new PathFilter() {
477 @Override
478 public boolean accept(final Path fileName) {
479
480 final String fileNameString = fileName.toString();
481 if (!fileNameString.startsWith(prefixPathStr)) {
482 return false;
483 }
484 if (logFileSuffix.isEmpty()) {
485
486 return org.apache.commons.lang.StringUtils.isNumeric(
487 fileNameString.substring(prefixPathStr.length()));
488 } else if (!fileNameString.endsWith(logFileSuffix)) {
489 return false;
490 }
491 return true;
492 }
493 };
494
495 if (failIfWALExists) {
496 final FileStatus[] walFiles = FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
497 if (null != walFiles && 0 != walFiles.length) {
498 throw new IOException("Target WAL already exists within directory " + fullPathLogDir);
499 }
500 }
501
502
503 if (listeners != null) {
504 for (WALActionsListener i: listeners) {
505 registerWALActionsListener(i);
506 }
507 }
508 this.coprocessorHost = new WALCoprocessorHost(this, conf);
509
510
511
512 final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
513 FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir));
514 this.logrollsize =
515 (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
516
517 float memstoreRatio = conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY,
518 conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY,
519 HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE));
    boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null;
    if (maxLogsDefined) {
      LOG.warn("'hbase.regionserver.maxlogs' is deprecated.");
    }
524 this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
525 Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize)));
526 this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication",
527 FSUtils.getDefaultReplication(fs, this.fullPathLogDir));
528 this.lowReplicationRollLimit =
529 conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5);
530 this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 0);
531 int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
532
533 LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) +
534 ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
535 ", prefix=" + this.logFilePrefix + ", suffix=" + logFileSuffix + ", logDir=" +
536 this.fullPathLogDir + ", archiveDir=" + this.fullPathArchiveDir);
537
538
539 rollWriter();
540
541 this.slowSyncNs =
542 1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
543 DEFAULT_SLOW_SYNC_TIME_MS);
544 this.walSyncTimeout = conf.getLong("hbase.regionserver.hlog.sync.timeout",
545 DEFAULT_WAL_SYNC_TIMEOUT_MS);
546
547
548
549 String hostingThreadName = Thread.currentThread().getName();
550 this.appendExecutor = Executors.
551 newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append"));
556 final int preallocatedEventCount =
557 this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
558
559
560 this.disruptor =
561 new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount,
562 this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy());
563
564
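    // Advance the ring buffer by one slot up front so that the sequences handed out to callers
    // start at 1 rather than 0; a sequence of 0 would look like an un-set SyncFuture.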
565 this.disruptor.getRingBuffer().next();
566 this.ringBufferEventHandler =
567 new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5),
568 maxHandlersCount);
569 this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
570 this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
571 this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
572 @Override
573 protected SyncFuture initialValue() {
574 return new SyncFuture();
575 }
576 };
577
578 this.disruptor.start();
579 }
580
581 private int calculateMaxLogFiles(float memstoreSizeRatio, long logRollSize) {
582 long max = -1L;
583 final MemoryUsage usage = HeapMemorySizeUtil.safeGetHeapMemoryUsage();
584 if (usage != null) {
585 max = usage.getMax();
586 }
587 int maxLogs = Math.round(max * memstoreSizeRatio * 2 / logRollSize);
588 return maxLogs;
589 }

  /**
   * Get the backing files associated with this WAL.
   * @return this WAL's files in the log directory, or null if there are none
   */
595 protected FileStatus[] getFiles() throws IOException {
596 return FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
597 }

  /**
   * Expose the FileSystem-level output stream that backs the current writer; used by tests to
   * poke at the underlying HDFS stream.
   * @return the wrapped OutputStream, or null if no writer is currently open
   */
607 @VisibleForTesting
608 OutputStream getOutputStream() {
609 FSDataOutputStream fsdos = this.hdfs_out;
610 if (fsdos == null) return null;
611 return fsdos.getWrappedStream();
612 }
613
614 @Override
615 public byte [][] rollWriter() throws FailedLogCloseException, IOException {
616 return rollWriter(false);
617 }

  /**
   * Compute the path of the next WAL file: uses the current time as the file number and bumps it
   * until an unused path is found.
   * @return the path to use for the next WAL file
   */
623 private Path getNewPath() throws IOException {
624 this.filenum.set(System.currentTimeMillis());
625 Path newPath = getCurrentFileName();
626 while (fs.exists(newPath)) {
627 this.filenum.incrementAndGet();
628 newPath = getCurrentFileName();
629 }
630 return newPath;
631 }
632
633 Path getOldPath() {
634 long currentFilenum = this.filenum.get();
635 Path oldPath = null;
636 if (currentFilenum > 0) {
637
638 oldPath = computeFilename(currentFilenum);
639 }
640 return oldPath;
641 }
642
643
644
645
646
647 private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
648 throws IOException {
649 if (!this.listeners.isEmpty()) {
650 for (WALActionsListener i : this.listeners) {
651 i.preLogRoll(oldPath, newPath);
652 }
653 }
654 }
655
656
657
658
659
660 private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
661 throws IOException {
662 if (!this.listeners.isEmpty()) {
663 for (WALActionsListener i : this.listeners) {
664 i.postLogRoll(oldPath, newPath);
665 }
666 }
667 }

  /**
   * Run a sync on the freshly created writer before it is swapped in, so that the new writer's
   * output pipeline is already set up when it takes over. Failures are only logged: this is
   * purely an optimization.
   */
  private void preemptiveSync(final ProtobufLogWriter nextWriter) {
    long startTimeNanos = System.nanoTime();
    try {
      nextWriter.sync();
      postSync(System.nanoTime() - startTimeNanos, 0);
    } catch (IOException e) {
      // Not fatal; the writer will be synced again on first real use.
      LOG.warn("Pre-sync of new WAL writer failed; continuing since this is only an optimization",
        e);
    }
  }
684
685 @Override
686 public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
687 rollWriterLock.lock();
688 try {
689
690 if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null;
691 byte [][] regionsToFlush = null;
692 if (this.closed) {
693 LOG.debug("WAL closed. Skipping rolling of writer");
694 return regionsToFlush;
695 }
696 if (!closeBarrier.beginOp()) {
697 LOG.debug("WAL closing. Skipping rolling of writer");
698 return regionsToFlush;
699 }
700 TraceScope scope = Trace.startSpan("FSHLog.rollWriter");
701 try {
702 Path oldPath = getOldPath();
703 Path newPath = getNewPath();
704
705 Writer nextWriter = this.createWriterInstance(newPath);
706 FSDataOutputStream nextHdfsOut = null;
707 if (nextWriter instanceof ProtobufLogWriter) {
708 nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
709
710
711 preemptiveSync((ProtobufLogWriter)nextWriter);
712 }
713 tellListenersAboutPreLogRoll(oldPath, newPath);
714
715 newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut);
716 tellListenersAboutPostLogRoll(oldPath, newPath);
717
718 if (getNumRolledLogFiles() > 0) {
719 cleanOldLogs();
720 regionsToFlush = findRegionsToForceFlush();
721 }
722 } finally {
723 closeBarrier.endOp();
724 assert scope == NullScope.INSTANCE || !scope.isDetached();
725 scope.close();
726 }
727 return regionsToFlush;
728 } finally {
729 rollWriterLock.unlock();
730 }
731 }

  /**
   * This method allows subclasses to inject a different writer without having to extend other
   * methods such as rollWriter().
   * @return a Writer instance for the passed-in <code>path</code>
   */
739 protected Writer createWriterInstance(final Path path) throws IOException {
740 return DefaultWALProvider.createWriter(conf, fs, path, false);
741 }

  /**
   * Archive rolled WAL files that are no longer needed: a file is eligible once every region
   * sequence id recorded for it is lower than the corresponding lowest unflushed sequence id in
   * {@link #sequenceIdAccounting}.
   */
747 private void cleanOldLogs() throws IOException {
748 List<Path> logsToArchive = null;
749
750
751 for (Map.Entry<Path, Map<byte[], Long>> e : this.byWalRegionSequenceIds.entrySet()) {
752 Path log = e.getKey();
753 Map<byte[], Long> sequenceNums = e.getValue();
754 if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
755 if (logsToArchive == null) logsToArchive = new ArrayList<Path>();
756 logsToArchive.add(log);
757 if (LOG.isTraceEnabled()) LOG.trace("WAL file ready for archiving " + log);
758 }
759 }
760 if (logsToArchive != null) {
761 for (Path p : logsToArchive) {
762 this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
763 archiveLogFile(p);
764 this.byWalRegionSequenceIds.remove(p);
765 }
766 }
767 }

  /**
   * If the number of rolled WAL files exceeds <code>maxLogs</code>, work out which regions have
   * unflushed edits in the oldest WAL file and therefore need to be flushed before that file can
   * be archived.
   * @return encoded names of regions to flush, or null if no flush is required
   */
776 byte[][] findRegionsToForceFlush() throws IOException {
777 byte [][] regions = null;
778 int logCount = getNumRolledLogFiles();
779 if (logCount > this.maxLogs && logCount > 0) {
780 Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
781 this.byWalRegionSequenceIds.firstEntry();
782 regions = this.sequenceIdAccounting.findLower(firstWALEntry.getValue());
783 }
784 if (regions != null) {
785 StringBuilder sb = new StringBuilder();
786 for (int i = 0; i < regions.length; i++) {
787 if (i > 0) sb.append(", ");
788 sb.append(Bytes.toStringBinary(regions[i]));
789 }
790 LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
791 "; forcing flush of " + regions.length + " regions(s): " + sb.toString());
792 }
793 return regions;
794 }

  /** Test hook: called just after the safe-point zigzag latch has been created during a roll. */
  @VisibleForTesting
  protected void afterCreatingZigZagLatch() {}

  /** Test hook: called just before the consumer thread parks waiting for the safe point. */
  @VisibleForTesting
  protected void beforeWaitOnSafePoint() {}

  /**
   * Close the current writer and swap in <code>nextWriter</code>, coordinating with the ring
   * buffer consumer through a {@link SafePointZigZagLatch}: wait for the consumer to reach a
   * "safe point" (all outstanding appends synced, no new work consumed), swap the writer while
   * it is parked, then release it to continue against the new writer.
   * @return the passed-in <code>newPath</code>
   * @throws IOException if there is a problem flushing or closing the underlying filesystem
   */
827 Path replaceWriter(final Path oldPath, final Path newPath, Writer nextWriter,
828 final FSDataOutputStream nextHdfsOut)
829 throws IOException {
835 SyncFuture syncFuture = null;
836 SafePointZigZagLatch zigzagLatch = null;
837 long sequence = -1L;
838 if (this.ringBufferEventHandler != null) {
839
840
841
842
843
844
845
846
847 sequence = getSequenceOnRingBuffer();
848 zigzagLatch = this.ringBufferEventHandler.attainSafePoint();
849 }
850 afterCreatingZigZagLatch();
    TraceScope scope = Trace.startSpan("FSHLog.replaceWriter");
852 try {
853
854
855
856
857
858 try {
859 if (zigzagLatch != null) {
860
861
862 assert sequence > 0L : "Failed to get sequence from ring buffer";
863 Trace.addTimelineAnnotation("awaiting safepoint");
864 syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer(sequence));
865 }
866 } catch (FailedSyncBeforeLogCloseException e) {
867
868 if (isUnflushedEntries()) throw e;
869 LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " +
870 e.getMessage());
871 }
872
873
874
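      // At this point the consumer thread is parked at the safe point (or there were no
      // unflushed edits), so it is safe to close the old writer and swap in the new one.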
875 try {
876 if (this.writer != null) {
877 Trace.addTimelineAnnotation("closing writer");
878 this.writer.close();
879 Trace.addTimelineAnnotation("writer closed");
880 }
881 this.closeErrorCount.set(0);
882 } catch (IOException ioe) {
883 int errors = closeErrorCount.incrementAndGet();
884 if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) {
885 LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" +
886 ioe.getMessage() + "\", errors=" + errors +
887 "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
888 } else {
889 throw ioe;
890 }
891 }
892 this.writer = nextWriter;
893 this.hdfs_out = nextHdfsOut;
894 int oldNumEntries = this.numEntries.get();
895 this.numEntries.set(0);
896 final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
897 if (oldPath != null) {
898 this.byWalRegionSequenceIds.put(oldPath, this.sequenceIdAccounting.resetHighest());
899 long oldFileLen = this.fs.getFileStatus(oldPath).getLen();
900 this.totalLogSize.addAndGet(oldFileLen);
901 LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
902 ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " +
903 newPathString);
904 } else {
905 LOG.info("New WAL " + newPathString);
906 }
907 } catch (InterruptedException ie) {
908
909 Thread.currentThread().interrupt();
910 } catch (IOException e) {
911 long count = getUnflushedEntriesCount();
912 LOG.error("Failed close of WAL writer " + oldPath + ", unflushedEntries=" + count, e);
913 throw new FailedLogCloseException(oldPath + ", unflushedEntries=" + count, e);
914 } finally {
915 try {
916
917 if (zigzagLatch != null) {
918 zigzagLatch.releaseSafePoint();
919
920
921
922
923
924
925 if (syncFuture != null) {
926 try {
927 blockOnSync(syncFuture);
928 } catch (IOException ioe) {
929 if (LOG.isTraceEnabled()) LOG.trace("Stale sync exception", ioe);
930 }
931 }
932 }
933 } finally {
934 scope.close();
935 }
936 }
937 return newPath;
938 }
939
940 long getUnflushedEntriesCount() {
941 long highestSynced = this.highestSyncedSequence.get();
    return highestSynced > this.highestUnsyncedSequence ?
      0 : this.highestUnsyncedSequence - highestSynced;
944 }
945
946 boolean isUnflushedEntries() {
947 return getUnflushedEntriesCount() > 0;
948 }
949
950
951
952
953
954 public static Path getWALArchivePath(Path archiveDir, Path p) {
955 return new Path(archiveDir, p.getName());
956 }
957
958 private void archiveLogFile(final Path p) throws IOException {
959 Path newPath = getWALArchivePath(this.fullPathArchiveDir, p);
960
961 if (!this.listeners.isEmpty()) {
962 for (WALActionsListener i : this.listeners) {
963 i.preLogArchive(p, newPath);
964 }
965 }
966 LOG.info("Archiving " + p + " to " + newPath);
967 if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
968 throw new IOException("Unable to rename " + p + " to " + newPath);
969 }
970
971 if (!this.listeners.isEmpty()) {
972 for (WALActionsListener i : this.listeners) {
973 i.postLogArchive(p, newPath);
974 }
975 }
976 }

  /**
   * This is a convenience method that computes a new filename with a given file-number.
   * @param filenum to use
   * @return Path
   */
984 protected Path computeFilename(final long filenum) {
985 if (filenum < 0) {
986 throw new RuntimeException("WAL file number can't be < 0");
987 }
988 String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix;
989 return new Path(fullPathLogDir, child);
990 }
991
992
993
994
995
996
997 public Path getCurrentFileName() {
998 return computeFilename(this.filenum.get());
999 }
1000
1001 @Override
1002 public String toString() {
1003 return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")";
1004 }

  /**
   * A WAL file has a creation timestamp (in ms) in its file name ({@link #filenum}). This helper
   * method returns the creation timestamp from a given WAL file, assuming the file name was
   * produced by {@link #computeFilename(long)}.
   * @return timestamp, as in the WAL file name
   */
1014 protected long getFileNumFromFileName(Path fileName) {
1015 if (fileName == null) throw new IllegalArgumentException("file name can't be null");
1016 if (!ourFiles.accept(fileName)) {
1017 throw new IllegalArgumentException("The log file " + fileName +
1018 " doesn't belong to this WAL. (" + toString() + ")");
1019 }
1020 final String fileNameString = fileName.toString();
1021 String chompedPath = fileNameString.substring(prefixPathStr.length(),
1022 (fileNameString.length() - logFileSuffix.length()));
1023 return Long.parseLong(chompedPath);
1024 }
1025
1026 @Override
1027 public void close() throws IOException {
1028 shutdown();
1029 final FileStatus[] files = getFiles();
1030 if (null != files && 0 != files.length) {
1031 for (FileStatus file : files) {
1032 Path p = getWALArchivePath(this.fullPathArchiveDir, file.getPath());
1033
1034 if (!this.listeners.isEmpty()) {
1035 for (WALActionsListener i : this.listeners) {
1036 i.preLogArchive(file.getPath(), p);
1037 }
1038 }
1039
1040 if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
1041 throw new IOException("Unable to rename " + file.getPath() + " to " + p);
1042 }
1043
1044 if (!this.listeners.isEmpty()) {
1045 for (WALActionsListener i : this.listeners) {
1046 i.postLogArchive(file.getPath(), p);
1047 }
1048 }
1049 }
1050 LOG.debug("Moved " + files.length + " WAL file(s) to " +
1051 FSUtils.getPath(this.fullPathArchiveDir));
1052 }
1053 LOG.info("Closed WAL: " + toString());
1054 }
1055
1056 @Override
1057 public void shutdown() throws IOException {
1058 if (shutdown.compareAndSet(false, true)) {
1059 try {
1060
1061 closeBarrier.stopAndDrainOps();
1062 } catch (InterruptedException e) {
1063 LOG.error("Exception while waiting for cache flushes and log rolls", e);
1064 Thread.currentThread().interrupt();
1065 }
1066
1067
1068
1069
1070 if (this.disruptor != null) {
1071 long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
1072 try {
1073 this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
1074 } catch (TimeoutException e) {
1075 LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " +
1076 "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
1077 this.disruptor.halt();
1078 this.disruptor.shutdown();
1079 }
1080 }
1081
1082 if (this.appendExecutor != null) this.appendExecutor.shutdown();
1083
1084
1085 if (!this.listeners.isEmpty()) {
1086 for (WALActionsListener i : this.listeners) {
1087 i.logCloseRequested();
1088 }
1089 }
1090 this.closed = true;
1091 if (LOG.isDebugEnabled()) {
1092 LOG.debug("Closing WAL writer in " + FSUtils.getPath(fullPathLogDir));
1093 }
1094 if (this.writer != null) {
1095 this.writer.close();
1096 this.writer = null;
1097 }
1098 }
1099 }
1100
1101 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
1102 justification="Will never be null")
1103 @Override
1104 public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
1105 final WALEdit edits, final boolean inMemstore) throws IOException {
1106 if (this.closed) throw new IOException("Cannot append; log is closed");
1107
1108
1109 TraceScope scope = Trace.startSpan("FSHLog.append");
1110
1111
1112
1113
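    // Publication path: claim a ring buffer slot, wrap the edit in an FSWALEntry and load it on
    // the truck; the single consumer thread (RingBufferEventHandler) does the actual write. The
    // returned sequence doubles as the transaction id callers later pass to sync(long).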
1114 FSWALEntry entry = null;
1115 long sequence = this.disruptor.getRingBuffer().next();
1116 try {
1117 RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
1118
1119 entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
1120 truck.loadPayload(entry, scope.detach());
1121 } finally {
1122 this.disruptor.getRingBuffer().publish(sequence);
1123 }
1124 return sequence;
1125 }

  /**
   * Thread that performs the actual writer sync calls. Sync requests ({@link SyncFuture}s) are
   * queued here in batches by the ring buffer consumer; each loop iteration takes one future,
   * syncs the writer once, then completes that future and every other queued future whose
   * sequence is covered by that sync, so a single filesystem sync releases many handler threads.
   */
1142 private class SyncRunner extends HasThread {
1143 private volatile long sequence;
1144
1145 private final BlockingQueue<SyncFuture> syncFutures;
1146 private volatile SyncFuture takeSyncFuture = null;
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158 SyncRunner(final String name, final int maxHandlersCount) {
1159 super(name);
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173 this.syncFutures = new LinkedBlockingQueue<SyncFuture>(maxHandlersCount * 3);
1174 }
1175
1176 void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount) {
1177
1178 this.sequence = sequence;
1179 for (int i = 0; i < syncFutureCount; ++i) {
1180 this.syncFutures.add(syncFutures[i]);
1181 }
1182 }
1183
1184
1185
1186
1187
1188
1189
1190
1191 private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence,
1192 final Throwable t) {
1193 if (!syncFuture.done(currentSequence, t)) throw new IllegalStateException();
1194
1195 return 1;
1196 }

    /**
     * Complete all queued sync futures whose ring buffer sequence is at or below
     * <code>currentSequence</code>.
     * @return the number of futures released
     */
1204 private int releaseSyncFutures(final long currentSequence, final Throwable t) {
1205 int syncCount = 0;
1206 for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) {
1207 if (syncFuture.getRingBufferSequence() > currentSequence) break;
1208 releaseSyncFuture(syncFuture, currentSequence, t);
1209 if (!this.syncFutures.remove(syncFuture)) {
1210 throw new IllegalStateException(syncFuture.toString());
1211 }
1212 syncCount++;
1213 }
1214 return syncCount;
1215 }
1216
1217
1218
1219
1220
1221 private long updateHighestSyncedSequence(long sequence) {
1222 long currentHighestSyncedSequence;
1223
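      // CAS loop: only ever move highestSyncedSequence forward. If another SyncRunner already
      // recorded a higher value, adopt that value instead of regressing it.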
1224 do {
1225 currentHighestSyncedSequence = highestSyncedSequence.get();
1226 if (currentHighestSyncedSequence >= sequence) {
1227
1228
1229 sequence = currentHighestSyncedSequence;
1230 break;
1231 }
1232 } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence));
1233 return sequence;
1234 }
1235
1236 boolean areSyncFuturesReleased() {
1237
1238
1239 return syncFutures.size() <= 0
1240 && takeSyncFuture == null;
1241 }
1242
1243 public void run() {
1244 long currentSequence;
1245 while (!isInterrupted()) {
1246 int syncCount = 0;
1247
1248 try {
1249 while (true) {
1250 takeSyncFuture = null;
1251
1252 takeSyncFuture = this.syncFutures.take();
1253 currentSequence = this.sequence;
1254 long syncFutureSequence = takeSyncFuture.getRingBufferSequence();
1255 if (syncFutureSequence > currentSequence) {
              throw new IllegalStateException("currentSequence=" + currentSequence +
                  ", syncFutureSequence=" + syncFutureSequence);
1258 }
1259
1260 long currentHighestSyncedSequence = highestSyncedSequence.get();
1261 if (currentSequence < currentHighestSyncedSequence) {
1262 syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
1263
1264 continue;
1265 }
1266 break;
1267 }
1268
1269
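          // We have a sync to do: sync the writer once and then use the result to release this
          // future and every other queued future covered by the same sync (batching).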
1270 TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
1271 long start = System.nanoTime();
1272 Throwable lastException = null;
1273 try {
1274 Trace.addTimelineAnnotation("syncing writer");
1275 writer.sync();
1276 Trace.addTimelineAnnotation("writer synced");
1277 currentSequence = updateHighestSyncedSequence(currentSequence);
1278 } catch (IOException e) {
1279 LOG.error("Error syncing, request close of WAL", e);
1280 lastException = e;
1281 } catch (Exception e) {
1282 LOG.warn("UNEXPECTED", e);
1283 lastException = e;
1284 } finally {
1285
1286 takeSyncFuture.setSpan(scope.detach());
1287
1288 syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
1289
1290 syncCount += releaseSyncFutures(currentSequence, lastException);
1291 if (lastException != null) requestLogRoll();
1292 else checkLogRoll();
1293 }
1294 postSync(System.nanoTime() - start, syncCount);
1295 } catch (InterruptedException e) {
1296
1297 Thread.currentThread().interrupt();
1298 } catch (Throwable t) {
1299 LOG.warn("UNEXPECTED, continuing", t);
1300 }
1301 }
1302 }
1303 }

  /**
   * Schedule a log roll if needed: either the replication on the current WAL block has dropped
   * below the tolerable level, or the writer has grown past the roll size. No-op if another
   * thread already holds the roll lock.
   */
1308 public void checkLogRoll() {
1309
1310 if (!rollWriterLock.tryLock()) return;
1311 boolean lowReplication;
1312 try {
1313 lowReplication = checkLowReplication();
1314 } finally {
1315 rollWriterLock.unlock();
1316 }
1317 try {
1318 if (lowReplication || writer != null && writer.getLength() > logrollsize) {
1319 requestLogRoll(lowReplication);
1320 }
1321 } catch (IOException e) {
1322 LOG.warn("Writer.getLength() failed; continuing", e);
1323 }
1324 }
1325
1326
1327
1328
1329 private boolean checkLowReplication() {
1330 boolean logRollNeeded = false;
1331 this.lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
1332
1333
1334 try {
1335 int numCurrentReplicas = getLogReplication();
1336 if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) {
1337 if (this.lowReplicationRollEnabled) {
1338 if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
1339 LOG.warn("HDFS pipeline error detected. " + "Found "
1340 + numCurrentReplicas + " replicas but expecting no less than "
1341 + this.minTolerableReplication + " replicas. "
1342 + " Requesting close of WAL. current pipeline: "
1343 + Arrays.toString(getPipeLine()));
1344 logRollNeeded = true;
1345
1346
1347
1348 this.consecutiveLogRolls.getAndIncrement();
1349 } else {
1350 LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
1351 + "the total number of live datanodes is lower than the tolerable replicas.");
1352 this.consecutiveLogRolls.set(0);
1353 this.lowReplicationRollEnabled = false;
1354 }
1355 }
1356 } else if (numCurrentReplicas >= this.minTolerableReplication) {
1357 if (!this.lowReplicationRollEnabled) {
1358
1359
1360
1361 if (this.numEntries.get() <= 1) {
1362 return logRollNeeded;
1363 }
1364
1365
1366 this.lowReplicationRollEnabled = true;
1367 LOG.info("LowReplication-Roller was enabled.");
1368 }
1369 }
1370 } catch (Exception e) {
1371 LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e +
1372 ", continuing...");
1373 }
1374 return logRollNeeded;
1375 }
1376
1377 private SyncFuture publishSyncOnRingBuffer(long sequence) {
1378 return publishSyncOnRingBuffer(sequence, null);
1379 }
1380
1381 private long getSequenceOnRingBuffer() {
1382 return this.disruptor.getRingBuffer().next();
1383 }
1384
1385 private SyncFuture publishSyncOnRingBuffer(Span span) {
1386 long sequence = this.disruptor.getRingBuffer().next();
1387 return publishSyncOnRingBuffer(sequence, span);
1388 }
1389
1390 private SyncFuture publishSyncOnRingBuffer(long sequence, Span span) {
1391 SyncFuture syncFuture = getSyncFuture(sequence, span);
1392 try {
1393 RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
1394 truck.loadPayload(syncFuture);
1395 } finally {
1396 this.disruptor.getRingBuffer().publish(sequence);
1397 }
1398 return syncFuture;
1399 }
1400
1401
1402 private Span publishSyncThenBlockOnCompletion(Span span) throws IOException {
1403 return blockOnSync(publishSyncOnRingBuffer(span));
1404 }
1405
1406 private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
1407
1408 try {
1409 syncFuture.get(walSyncTimeout);
1410 return syncFuture.getSpan();
1411 } catch (TimeoutIOException tioe) {
1412
1413
1414
1415 this.cachedSyncFutures.remove();
1416 throw tioe;
1417 } catch (InterruptedException ie) {
1418 LOG.warn("Interrupted", ie);
1419 throw convertInterruptedExceptionToIOException(ie);
1420 } catch (ExecutionException e) {
1421 throw ensureIOException(e.getCause());
1422 }
1423 }
1424
1425 private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
1426 Thread.currentThread().interrupt();
1427 IOException ioe = new InterruptedIOException();
1428 ioe.initCause(ie);
1429 return ioe;
1430 }
1431
1432 private SyncFuture getSyncFuture(final long sequence, Span span) {
1433 return cachedSyncFutures.get().reset(sequence);
1434 }
1435
1436 private void postSync(final long timeInNanos, final int handlerSyncs) {
1437 if (timeInNanos > this.slowSyncNs) {
1438 String msg =
1439 new StringBuilder().append("Slow sync cost: ")
1440 .append(timeInNanos / 1000000).append(" ms, current pipeline: ")
1441 .append(Arrays.toString(getPipeLine())).toString();
1442 Trace.addTimelineAnnotation(msg);
1443 LOG.info(msg);
1444 }
1445 if (!listeners.isEmpty()) {
1446 for (WALActionsListener listener : listeners) {
1447 listener.postSync(timeInNanos, handlerSyncs);
1448 }
1449 }
1450 }
1451
1452 private long postAppend(final Entry e, final long elapsedTime) {
1453 long len = 0;
1454 if (!listeners.isEmpty()) {
1455 for (Cell cell : e.getEdit().getCells()) {
1456 len += CellUtil.estimatedSerializedSizeOf(cell);
1457 }
1458 for (WALActionsListener listener : listeners) {
1459 listener.postAppend(len, elapsedTime);
1460 }
1461 }
1462 return len;
1463 }

  /**
   * @return the number of replicas currently reported for the block being written by the WAL
   *   output stream, or 0 if the stream is not an {@link HdfsDataOutputStream} or the count
   *   cannot be determined
   */
1478 @VisibleForTesting
1479 int getLogReplication() {
1480 try {
1481
1482 if (this.hdfs_out instanceof HdfsDataOutputStream) {
1483 return ((HdfsDataOutputStream) this.hdfs_out).getCurrentBlockReplication();
1484 }
1485 } catch (IOException e) {
      LOG.info("Couldn't get current block replication for the WAL output stream", e);
1487 }
1488 return 0;
1489 }
1490
1491 @Override
1492 public void sync() throws IOException {
1493 TraceScope scope = Trace.startSpan("FSHLog.sync");
1494 try {
1495 scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
1496 } finally {
1497 assert scope == NullScope.INSTANCE || !scope.isDetached();
1498 scope.close();
1499 }
1500 }
1501
1502 @Override
1503 public void sync(long txid) throws IOException {
    if (this.highestSyncedSequence.get() >= txid) {
1505
1506 return;
1507 }
1508 TraceScope scope = Trace.startSpan("FSHLog.sync");
1509 try {
1510 scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
1511 } finally {
1512 assert scope == NullScope.INSTANCE || !scope.isDetached();
1513 scope.close();
1514 }
1515 }
1516
1517
1518 public void requestLogRoll() {
1519 requestLogRoll(false);
1520 }
1521
1522 private void requestLogRoll(boolean tooFewReplicas) {
1523 if (!this.listeners.isEmpty()) {
1524 for (WALActionsListener i: this.listeners) {
1525 i.logRollRequested(tooFewReplicas);
1526 }
1527 }
1528 }

  /** @return the number of rolled WAL files, excluding the one currently being written to. */
  public int getNumRolledLogFiles() {
    return byWalRegionSequenceIds.size();
  }

  /** @return the number of WAL files, including the one currently being written to. */
  public int getNumLogFiles() {
    // +1 for the current in-flight WAL file
    return getNumRolledLogFiles() + 1;
  }

  /** @return the total size, in bytes, of rolled (not yet archived) WAL files. */
  public long getLogFileSize() {
    return this.totalLogSize.get();
  }
1548
1549 @Override
1550 public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families) {
1551 if (!closeBarrier.beginOp()) {
1552 LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing.");
1553 return null;
1554 }
1555 return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
1556 }
1557
1558 @Override
1559 public void completeCacheFlush(final byte [] encodedRegionName) {
1560 this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
1561 closeBarrier.endOp();
1562 }
1563
1564 @Override
1565 public void abortCacheFlush(byte[] encodedRegionName) {
1566 this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
1567 closeBarrier.endOp();
1568 }
1569
1570 @VisibleForTesting
1571 boolean isLowReplicationRollEnabled() {
1572 return lowReplicationRollEnabled;
1573 }
1574
1575 public static final long FIXED_OVERHEAD = ClassSize.align(
1576 ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
1577 ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1578
1579 private static void split(final Configuration conf, final Path p)
1580 throws IOException {
1581 FileSystem fs = FileSystem.get(conf);
1582 if (!fs.exists(p)) {
1583 throw new FileNotFoundException(p.toString());
1584 }
1585 if (!fs.getFileStatus(p).isDirectory()) {
1586 throw new IOException(p + " is not a directory");
1587 }
1588
1589 final Path baseDir = FSUtils.getRootDir(conf);
1590 final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1591 WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
1592 }
1593
1594
1595 @Override
1596 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
1597
1598 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
1599 }
1600
1601 @Override
1602 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
1603
1604
1605
1606
1607
1608
1609
1610
1611 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
1612 }
1613

  /**
   * Coordination latch used while the writer is being replaced. Two parties "zigzag" toward a
   * safe point: the thread doing the roll (see
   * {@link FSHLog#replaceWriter(Path, Path, Writer, FSDataOutputStream)}) and the ring buffer
   * consumer. The roller arms the latch and waits; once the consumer has no outstanding syncs it
   * calls {@link #safePointAttained()} and parks; the roller then swaps the writer and calls
   * {@link #releaseSafePoint()} so the consumer can continue against the new writer.
   */
1640 static class SafePointZigZagLatch {
1641
1642
1643
1644 private volatile CountDownLatch safePointAttainedLatch = new CountDownLatch(1);
1645
1646
1647
1648 private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
1649
1650 private void checkIfSyncFailed(SyncFuture syncFuture) throws FailedSyncBeforeLogCloseException {
1651 if (syncFuture.isThrowable()) {
1652 throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
1653 }
1654 }
1655

    /**
     * For the roll thread: wait until the consumer attains the safe point. While waiting, keep
     * checking the passed <code>syncFuture</code>; if its sync failed, surface a
     * {@link FailedSyncBeforeLogCloseException} rather than waiting forever.
     * @return the passed-in <code>syncFuture</code>
     */
1666 SyncFuture waitSafePoint(SyncFuture syncFuture) throws InterruptedException,
1667 FailedSyncBeforeLogCloseException {
1668 while (!this.safePointAttainedLatch.await(1, TimeUnit.MILLISECONDS)) {
1669 checkIfSyncFailed(syncFuture);
1670 }
1671 checkIfSyncFailed(syncFuture);
1672 return syncFuture;
1673 }
1674

    /**
     * For the consumer: called when it has reached the safe point. Blocks until the roll thread
     * calls {@link #releaseSafePoint()}.
     */
    void safePointAttained() throws InterruptedException {
      this.safePointAttainedLatch.countDown();
      this.safePointReleasedLatch.await();
    }

    /**
     * For the roll thread: called after the writer has been swapped to let the consumer proceed.
     */
    void releaseSafePoint() {
      this.safePointReleasedLatch.countDown();
    }
1693
1694
1695
1696
1697 boolean isCocked() {
1698 return this.safePointAttainedLatch.getCount() > 0 &&
1699 this.safePointReleasedLatch.getCount() > 0;
1700 }
1701 }
1702

  /**
   * The single disruptor consumer. A {@link RingBufferTruck} carries either an append (an
   * {@link FSWALEntry}) or a sync request (a {@link SyncFuture}); appends are written with the
   * current writer, sync requests are accumulated and handed off in batches to a
   * {@link SyncRunner}. Because there is exactly one of these threads, writer access needs no
   * further synchronization. This is also the thread that parks at the "safe point" while the
   * writer is replaced during a log roll.
   */
1726 class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware {
1727 private final SyncRunner [] syncRunners;
1728 private final SyncFuture [] syncFutures;
1729
1730
1731 private volatile int syncFuturesCount = 0;
1732 private volatile SafePointZigZagLatch zigzagLatch;
1733
1734
1735
1736
1737 private Exception exception = null;
1738
1739
1740
1741 private final Object safePointWaiter = new Object();
1742 private volatile boolean shutdown = false;
1743
1744
1745
1746
1747 private int syncRunnerIndex;
1748
1749 RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount) {
1750 this.syncFutures = new SyncFuture[maxHandlersCount];
1751 this.syncRunners = new SyncRunner[syncRunnerCount];
1752 for (int i = 0; i < syncRunnerCount; i++) {
1753 this.syncRunners[i] = new SyncRunner("sync." + i, maxHandlersCount);
1754 }
1755 }
1756
1757 private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) {
1758
1759 for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e);
1760 this.syncFuturesCount = 0;
1761 }
1762
1763
1764
1765
1766 private boolean isOutstandingSyncs() {
1767
1768 for (int i = 0; i < this.syncFuturesCount; i++) {
1769 if (!this.syncFutures[i].isDone()) return true;
1770 }
1771
1772 return false;
1773 }
1774
1775 private boolean isOutstandingSyncsFromRunners() {
1776
1777 for (SyncRunner syncRunner: syncRunners) {
1778 if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
1779 return true;
1780 }
1781 }
1782 return false;
1783 }
1784
1785 @Override
1786
1787 public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)
1788 throws Exception {
1797 try {
1798 if (truck.hasSyncFuturePayload()) {
1799 this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();
1800
1801 if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true;
1802 } else if (truck.hasFSWALEntryPayload()) {
1803 TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
1804 try {
1805 FSWALEntry entry = truck.unloadFSWALEntryPayload();
1806 if (this.exception != null) {
1807
1808
1809
1810
1811
1812 entry.stampRegionSequenceId();
1813
1814 return;
1815 }
1816 append(entry);
1817 } catch (Exception e) {
1818
1819 this.exception = e;
1820
1821
1822
1823 cleanupOutstandingSyncsOnException(sequence,
1824 this.exception instanceof DamagedWALException ? this.exception
1825 : new DamagedWALException("On sync", this.exception));
1826
1827 return;
1828 } finally {
1829 assert scope == NullScope.INSTANCE || !scope.isDetached();
1830 scope.close();
1831 }
1832 } else {
1833
1834 cleanupOutstandingSyncsOnException(sequence,
1835 new IllegalStateException("Neither append nor sync"));
1836
1837 return;
1838 }
1839
1840
1841
1842 if (this.exception == null) {
1843
1844
1845 if (!endOfBatch || this.syncFuturesCount <= 0) return;
1846
1847
1848
1849
1850
1851
1852
1853
1854
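          // Round-robin the batched sync futures across the SyncRunner threads so the sync load
          // is spread; each batch goes, in order, to exactly one runner.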
1855 this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length;
1856 try {
1857
1858
1859 this.syncRunners[this.syncRunnerIndex].offer(sequence, this.syncFutures,
1860 this.syncFuturesCount);
1861 } catch (Exception e) {
1862
1863 requestLogRoll();
1864 this.exception = new DamagedWALException("Failed offering sync", e);
1865 }
1866 }
1867
1868 if (this.exception != null) {
1869 cleanupOutstandingSyncsOnException(sequence,
1870 this.exception instanceof DamagedWALException?
1871 this.exception:
1872 new DamagedWALException("On sync", this.exception));
1873 }
1874 attainSafePoint(sequence);
1875 this.syncFuturesCount = 0;
1876 } catch (Throwable t) {
1877 LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
1878 }
1879 }
1880
1881 SafePointZigZagLatch attainSafePoint() {
1882 this.zigzagLatch = new SafePointZigZagLatch();
1883 return this.zigzagLatch;
1884 }
1885
1886
1887
1888
1889
1890 private void attainSafePoint(final long currentSequence) {
1891 if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return;
1892
1893 beforeWaitOnSafePoint();
1894 try {
1895
1896
1897
1898 while ((!this.shutdown && this.zigzagLatch.isCocked()
1899 && highestSyncedSequence.get() < currentSequence &&
1900
1901
1902 isOutstandingSyncs())
1903
1904 || isOutstandingSyncsFromRunners()) {
1905 synchronized (this.safePointWaiter) {
1906 this.safePointWaiter.wait(0, 1);
1907 }
1908 }
1909
1910
1911
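        // Nothing is outstanding: clear any recorded exception, tell the roll thread the safe
        // point is attained, and then block in safePointAttained() until it releases us.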
1912 this.exception = null;
1913 this.zigzagLatch.safePointAttained();
1914 } catch (InterruptedException e) {
1915 LOG.warn("Interrupted ", e);
1916 Thread.currentThread().interrupt();
1917 }
1918 }
1919

    /**
     * Append to the WAL: stamps the region sequence id on the edit, runs coprocessor and
     * listener hooks, writes the entry with the current writer and updates sequence id
     * accounting. A failure here is treated as fatal for this WAL: a roll is requested and a
     * {@link DamagedWALException} is thrown.
     */
1925 void append(final FSWALEntry entry) throws Exception {
1926
1927 atHeadOfRingBufferEventHandlerAppend();
1928
1929 long start = EnvironmentEdgeManager.currentTime();
1930 byte [] encodedRegionName = entry.getKey().getEncodedRegionName();
1931 long regionSequenceId = WALKey.NO_SEQUENCE_ID;
1932 try {
1933
1934
1935
1936 regionSequenceId = entry.stampRegionSequenceId();
1937
1938
1939
1940 if (entry.getEdit().isEmpty()) {
1941 return;
1942 }
1943
1944
1945 if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
1946 entry.getEdit())) {
1947 if (entry.getEdit().isReplay()) {
1948
1949 entry.getKey().setScopes(null);
1950 }
1951 }
1952 if (!listeners.isEmpty()) {
1953 for (WALActionsListener i: listeners) {
1954
1955 i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(),
1956 entry.getEdit());
1957 }
1958 }
1959
1960 writer.append(entry);
1961 assert highestUnsyncedSequence < entry.getSequence();
1962 highestUnsyncedSequence = entry.getSequence();
1963 sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
1964 entry.isInMemstore());
1965 coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
1966
1967 postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
1968 } catch (Exception e) {
1969 String msg = "Append sequenceId=" + regionSequenceId + ", requesting roll of WAL";
1970 LOG.warn(msg, e);
1971 requestLogRoll();
1972 throw new DamagedWALException(msg, e);
1973 }
1974 numEntries.incrementAndGet();
1975 }
1976
1977 @Override
1978 public void onStart() {
1979 for (SyncRunner syncRunner: this.syncRunners) syncRunner.start();
1980 }
1981
1982 @Override
1983 public void onShutdown() {
1984 for (SyncRunner syncRunner: this.syncRunners) syncRunner.interrupt();
1985 }
1986 }
1987
1988
1989
1990
1991 @VisibleForTesting
1992 void atHeadOfRingBufferEventHandlerAppend() {
1993
1994 }
1995
1996 private static IOException ensureIOException(final Throwable t) {
1997 return (t instanceof IOException)? (IOException)t: new IOException(t);
1998 }
1999
2000 private static void usage() {
2001 System.err.println("Usage: FSHLog <ARGS>");
2002 System.err.println("Arguments:");
    System.err.println(" --dump  Dump textual representation of one or more passed files");
2004 System.err.println(" For example: " +
2005 "FSHLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
2006 System.err.println(" --split Split the passed directory of WAL logs");
2007 System.err.println(" For example: " +
2008 "FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR");
2009 }

  /**
   * Command-line entry point: <code>--dump</code> prints a textual representation of the passed
   * WAL files, <code>--split</code> splits the passed WAL directories.
   */
2018 public static void main(String[] args) throws IOException {
2019 if (args.length < 2) {
2020 usage();
2021 System.exit(-1);
2022 }
2023
2024 if (args[0].compareTo("--dump") == 0) {
2025 WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
2026 } else if (args[0].compareTo("--perf") == 0) {
2027 LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
2028 LOG.fatal("\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " +
2029 args[1]);
2030 System.exit(-1);
2031 } else if (args[0].compareTo("--split") == 0) {
2032 Configuration conf = HBaseConfiguration.create();
2033 for (int i = 1; i < args.length; i++) {
2034 try {
2035 Path logPath = new Path(args[i]);
2036 FSUtils.setFsDefault(conf, logPath);
2037 split(conf, logPath);
2038 } catch (IOException t) {
2039 t.printStackTrace(System.err);
2040 System.exit(-1);
2041 }
2042 }
2043 } else {
2044 usage();
2045 System.exit(-1);
2046 }
2047 }
2048
2049
2050
2051
2052 @VisibleForTesting
2053 DatanodeInfo[] getPipeLine() {
2054 if (this.hdfs_out != null) {
2055 if (this.hdfs_out.getWrappedStream() instanceof DFSOutputStream) {
2056 return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline();
2057 }
2058 }
2059 return new DatanodeInfo[0];
2060 }
2061
2062
2063
2064
2065
2066 public long getLastTimeCheckLowReplication() {
2067 return this.lastTimeCheckLowReplication;
2068 }
2069 }