1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.wal;
19
20 import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
21
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.io.OutputStream;
26 import java.lang.reflect.InvocationTargetException;
27 import java.lang.reflect.Method;
28 import java.net.URLEncoder;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.Comparator;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.NavigableMap;
37 import java.util.Set;
38 import java.util.TreeMap;
39 import java.util.UUID;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
43 import java.util.concurrent.ConcurrentSkipListMap;
44 import java.util.concurrent.CopyOnWriteArrayList;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.ExecutionException;
47 import java.util.concurrent.ExecutorService;
48 import java.util.concurrent.Executors;
49 import java.util.concurrent.LinkedBlockingQueue;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.atomic.AtomicBoolean;
52 import java.util.concurrent.atomic.AtomicInteger;
53 import java.util.concurrent.atomic.AtomicLong;
54 import java.util.concurrent.locks.ReentrantLock;
55
56 import org.apache.commons.logging.Log;
57 import org.apache.commons.logging.LogFactory;
58 import org.apache.hadoop.conf.Configuration;
59 import org.apache.hadoop.fs.FSDataOutputStream;
60 import org.apache.hadoop.fs.FileStatus;
61 import org.apache.hadoop.fs.FileSystem;
62 import org.apache.hadoop.fs.Path;
63 import org.apache.hadoop.fs.PathFilter;
64 import org.apache.hadoop.hbase.Cell;
65 import org.apache.hadoop.hbase.CellUtil;
66 import org.apache.hadoop.hbase.HBaseConfiguration;
67 import org.apache.hadoop.hbase.HConstants;
68 import org.apache.hadoop.hbase.HRegionInfo;
69 import org.apache.hadoop.hbase.HTableDescriptor;
70 import org.apache.hadoop.hbase.TableName;
71 import org.apache.hadoop.hbase.classification.InterfaceAudience;
72 import org.apache.hadoop.hbase.util.Bytes;
73 import org.apache.hadoop.hbase.util.ClassSize;
74 import org.apache.hadoop.hbase.util.DrainBarrier;
75 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
76 import org.apache.hadoop.hbase.util.FSUtils;
77 import org.apache.hadoop.hbase.util.HasThread;
78 import org.apache.hadoop.hbase.util.Threads;
79 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
80 import org.apache.hadoop.hbase.wal.WAL;
81 import org.apache.hadoop.hbase.wal.WALFactory;
82 import org.apache.hadoop.hbase.wal.WALKey;
83 import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
84 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
85 import org.apache.hadoop.hbase.wal.WALSplitter;
86 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
87 import org.apache.hadoop.util.StringUtils;
88 import org.apache.htrace.NullScope;
89 import org.apache.htrace.Span;
90 import org.apache.htrace.Trace;
91 import org.apache.htrace.TraceScope;
92
93 import com.google.common.annotations.VisibleForTesting;
94 import com.google.common.collect.Maps;
95 import com.lmax.disruptor.BlockingWaitStrategy;
96 import com.lmax.disruptor.EventHandler;
97 import com.lmax.disruptor.ExceptionHandler;
98 import com.lmax.disruptor.LifecycleAware;
99 import com.lmax.disruptor.TimeoutException;
100 import com.lmax.disruptor.dsl.Disruptor;
101 import com.lmax.disruptor.dsl.ProducerType;
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130 @InterfaceAudience.Private
131 public class FSHLog implements WAL {
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 static final Log LOG = LogFactory.getLog(FSHLog.class);
167
168 private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100;
169
170
171
172
173
174
175
176
177
178 private final Disruptor<RingBufferTruck> disruptor;
179
180
181
182
183 private final ExecutorService appendExecutor;
184
185
186
187
188
189
190
191 private final RingBufferEventHandler ringBufferEventHandler;
192
193
194
195
196
197
198
199 private final Map<Thread, SyncFuture> syncFuturesByHandler;
200
201
202
203
204
205 private volatile long highestUnsyncedSequence = -1;
206
207
208
209
210
211
212 private final AtomicLong highestSyncedSequence = new AtomicLong(0);
213
214
215
216
217 protected final FileSystem fs;
218
219
220
221
222 private final Path fullPathLogDir;
223
224
225
226 private final Path fullPathArchiveDir;
227
228
229
230
231 private final PathFilter ourFiles;
232
233
234
235
236 private final String logFilePrefix;
237
238
239
240
241 private final String logFileSuffix;
242
243
244
245
246 private final String prefixPathStr;
247
248 private final WALCoprocessorHost coprocessorHost;
249
250
251
252
253 protected final Configuration conf;
254
255 private final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
256
257 @Override
258 public void registerWALActionsListener(final WALActionsListener listener) {
259 this.listeners.add(listener);
260 }
261
262 @Override
263 public boolean unregisterWALActionsListener(final WALActionsListener listener) {
264 return this.listeners.remove(listener);
265 }
266
267 @Override
268 public WALCoprocessorHost getCoprocessorHost() {
269 return coprocessorHost;
270 }
271
272
273
274 private FSDataOutputStream hdfs_out;
275
276
277
278
279 private final int minTolerableReplication;
280
281
282 private final Method getNumCurrentReplicas;
283 private final Method getPipeLine;
284 private final int slowSyncNs;
285
286 private final static Object [] NO_ARGS = new Object []{};
287
288
289
290
291
292 private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0);
293
294 private final int lowReplicationRollLimit;
295
296
297
298
299 private volatile boolean lowReplicationRollEnabled = true;
300
301
302
303
304 volatile Writer writer;
305
306
307 private final DrainBarrier closeBarrier = new DrainBarrier();
308
309
310
311
312
313
314
315
316 private final ReentrantLock rollWriterLock = new ReentrantLock(true);
317
318 private volatile boolean closed = false;
319 private final AtomicBoolean shutdown = new AtomicBoolean(false);
320
321
322 private final AtomicLong filenum = new AtomicLong(-1);
323
324
325 private final AtomicInteger numEntries = new AtomicInteger(0);
326
327
328 private final long logrollsize;
329
330
331
332
333 private AtomicLong totalLogSize = new AtomicLong(0);
334
335
336
337
338
339
340 private final int maxLogs;
341
342
343 private final int closeErrorsTolerated;
344
345 private final AtomicInteger closeErrorCount = new AtomicInteger();
346
347
348
349
350
351
352
353
354
355
356
357 private final Object regionSequenceIdLock = new Object();
358
359
360
361
362
363
364
365
366 private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> oldestUnflushedStoreSequenceIds
367 = new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>(
368 Bytes.BYTES_COMPARATOR);
369
370
371
372
373
374
375
376
377
378 private final Map<byte[], Map<byte[], Long>> lowestFlushingStoreSequenceIds =
379 new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR);
380
381
382
383
384
385
386
387
388
389
390
391
392 private Map<byte[], Long> highestRegionSequenceIds = new HashMap<byte[], Long>();
393
394
395
396
397
398 final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
399 @Override
400 public int compare(Path o1, Path o2) {
401 long t1 = getFileNumFromFileName(o1);
402 long t2 = getFileNumFromFileName(o2);
403 if (t1 == t2) return 0;
404 return (t1 > t2) ? 1 : -1;
405 }
406 };
407
408
409
410
411
412 private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds =
413 new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR);
414
415
416
417
418
419 static class RingBufferExceptionHandler implements ExceptionHandler {
420 @Override
421 public void handleEventException(Throwable ex, long sequence, Object event) {
422 LOG.error("Sequence=" + sequence + ", event=" + event, ex);
423 throw new RuntimeException(ex);
424 }
425
426 @Override
427 public void handleOnStartException(Throwable ex) {
428 LOG.error(ex);
429 throw new RuntimeException(ex);
430 }
431
432 @Override
433 public void handleOnShutdownException(Throwable ex) {
434 LOG.error(ex);
435 throw new RuntimeException(ex);
436 }
437 }
438
439
440
441
442
443
444
445
446
447
/**
 * Convenience constructor. Delegates to the full constructor with the default archive
 * directory name ({@code HConstants.HREGION_OLDLOGDIR_NAME}), no listeners, no prefix or
 * suffix, and with the "fail if WAL files already exist" check switched on.
 *
 * @param fs filesystem to write to
 * @param root directory under which {@code logDir} and the archive directory live
 * @param logDir name of the WAL directory, relative to {@code root}
 * @param conf configuration to read WAL settings from
 * @throws IOException if the full constructor fails
 */
public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
    throws IOException {
  this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf,
      null, true, null, null);
}
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
/**
 * Creates the WAL: ensures its directories exist, works out the file naming scheme, reads
 * the configuration, rolls to a first WAL file and finally starts the disruptor.
 *
 * @param fs filesystem to write to
 * @param rootDir directory under which {@code logDir} and {@code archiveDir} live
 * @param logDir name of the WAL directory, relative to {@code rootDir}
 * @param archiveDir name of the archive directory, relative to {@code rootDir}
 * @param conf configuration to read WAL settings from
 * @param listeners listeners to register before the first roll; may be null
 * @param failIfWALExists when true, fail if the WAL directory already holds files that
 *          match this WAL's naming scheme
 * @param prefix file-name prefix, URL-encoded before use; null or empty means "wal"
 * @param suffix file-name suffix, URL-encoded before use; null means none. A non-empty
 *          suffix must start with {@code WAL_FILE_NAME_DELIMITER}
 * @throws IOException if a directory cannot be created, if WAL files already exist and
 *           {@code failIfWALExists} is set, or if the first roll fails
 * @throws IllegalArgumentException if a non-empty suffix lacks the leading delimiter
 */
public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
    final String archiveDir, final Configuration conf,
    final List<WALActionsListener> listeners,
    final boolean failIfWALExists, final String prefix, final String suffix)
    throws IOException {
  this.fs = fs;
  this.fullPathLogDir = new Path(rootDir, logDir);
  this.fullPathArchiveDir = new Path(rootDir, archiveDir);
  this.conf = conf;

  // Both directories must exist before anything else happens.
  if (!fs.exists(this.fullPathLogDir)) {
    if (!fs.mkdirs(this.fullPathLogDir)) {
      throw new IOException("Unable to mkdir " + fullPathLogDir);
    }
  }
  if (!fs.exists(this.fullPathArchiveDir) && !fs.mkdirs(this.fullPathArchiveDir)) {
    throw new IOException("Unable to mkdir " + this.fullPathArchiveDir);
  }

  // Naming scheme: <prefix><delimiter><filenum><suffix>.
  final boolean prefixMissing = prefix == null || prefix.isEmpty();
  this.logFilePrefix = prefixMissing ? "wal" : URLEncoder.encode(prefix, "UTF8");

  final boolean suffixGiven = suffix != null && !suffix.isEmpty();
  if (suffixGiven && !suffix.startsWith(WAL_FILE_NAME_DELIMITER)) {
    throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
        "' but instead was '" + suffix + "'");
  }

  FSUtils.setStoragePolicy(fs, conf, this.fullPathLogDir, HConstants.WAL_STORAGE_POLICY,
      HConstants.DEFAULT_WAL_STORAGE_POLICY);
  this.logFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
  this.prefixPathStr =
      new Path(fullPathLogDir, logFilePrefix + WAL_FILE_NAME_DELIMITER).toString();

  this.ourFiles = new PathFilter() {
    @Override
    public boolean accept(final Path fileName) {
      final String candidate = fileName.toString();
      if (!candidate.startsWith(prefixPathStr)) {
        return false;
      }
      if (!logFileSuffix.isEmpty()) {
        // With a suffix configured, prefix plus suffix is all we ask for.
        return candidate.endsWith(logFileSuffix);
      }
      // Without a suffix, everything after the prefix must be the numeric file number.
      return org.apache.commons.lang.StringUtils.isNumeric(
          candidate.substring(prefixPathStr.length()));
    }
  };

  if (failIfWALExists) {
    final FileStatus[] existing = FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
    if (existing != null && existing.length != 0) {
      throw new IOException("Target WAL already exists within directory " + fullPathLogDir);
    }
  }

  // Listeners go in before the first roll so that they see it.
  if (listeners != null) {
    for (WALActionsListener listener : listeners) {
      registerWALActionsListener(listener);
    }
  }
  this.coprocessorHost = new WALCoprocessorHost(this, conf);

  // Sizing and tolerance settings.
  final long walBlockSize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
      FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir));
  final float rollMultiplier = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
  this.logrollsize = (long) (walBlockSize * rollMultiplier);

  this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
  this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication",
      FSUtils.getDefaultReplication(fs, this.fullPathLogDir));
  this.lowReplicationRollLimit =
      conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5);
  this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 0);
  final int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);

  LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(walBlockSize) +
      ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
      ", prefix=" + this.logFilePrefix + ", suffix=" + logFileSuffix + ", logDir=" +
      this.fullPathLogDir + ", archiveDir=" + this.fullPathArchiveDir);

  // Open the first WAL file. The ring buffer handler does not exist yet, which
  // replaceWriter copes with.
  rollWriter();

  final int slowSyncMs =
      conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS);
  this.slowSyncNs = 1000000 * slowSyncMs;

  // Resolve the reflective handles against the stream the roll just produced.
  this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
  this.getPipeLine = getGetPipeline(this.hdfs_out);

  // The append thread is named after whichever thread is constructing us.
  final String creatorThreadName = Thread.currentThread().getName();
  this.appendExecutor = Executors.
      newSingleThreadExecutor(Threads.getNamedThreadFactory(creatorThreadName + ".append"));

  final int ringBufferSlots =
      this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
  this.disruptor =
      new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, ringBufferSlots,
          this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy());

  // Claim one sequence up front (it is never published here).
  this.disruptor.getRingBuffer().next();
  this.ringBufferEventHandler =
      new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5),
          handlerCount);
  this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
  this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});

  this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(handlerCount);

  this.disruptor.start();
}
607
608
609
610
611
612 protected FileStatus[] getFiles() throws IOException {
613 return FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
614 }
615
616
617
618
619
620
621
622
623
624 @VisibleForTesting
625 OutputStream getOutputStream() {
626 FSDataOutputStream fsdos = this.hdfs_out;
627 if (fsdos == null) return null;
628 return fsdos.getWrappedStream();
629 }
630
631 @Override
632 public byte [][] rollWriter() throws FailedLogCloseException, IOException {
633 return rollWriter(false);
634 }
635
636
637
638
639
640 private Path getNewPath() throws IOException {
641 this.filenum.set(System.currentTimeMillis());
642 Path newPath = getCurrentFileName();
643 while (fs.exists(newPath)) {
644 this.filenum.incrementAndGet();
645 newPath = getCurrentFileName();
646 }
647 return newPath;
648 }
649
650 Path getOldPath() {
651 long currentFilenum = this.filenum.get();
652 Path oldPath = null;
653 if (currentFilenum > 0) {
654
655 oldPath = computeFilename(currentFilenum);
656 }
657 return oldPath;
658 }
659
660
661
662
663
664 private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
665 throws IOException {
666 if (!this.listeners.isEmpty()) {
667 for (WALActionsListener i : this.listeners) {
668 i.preLogRoll(oldPath, newPath);
669 }
670 }
671 }
672
673
674
675
676
677 private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
678 throws IOException {
679 if (!this.listeners.isEmpty()) {
680 for (WALActionsListener i : this.listeners) {
681 i.postLogRoll(oldPath, newPath);
682 }
683 }
684 }
685
686
687
688
689
690
691 private void preemptiveSync(final ProtobufLogWriter nextWriter) {
692 long startTimeNanos = System.nanoTime();
693 try {
694 nextWriter.sync();
695 postSync(System.nanoTime() - startTimeNanos, 0);
696 } catch (IOException e) {
697
698 LOG.warn("pre-sync failed but an optimization so keep going", e);
699 }
700 }
701
702 @Override
703 public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
704 rollWriterLock.lock();
705 try {
706
707 if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null;
708 byte [][] regionsToFlush = null;
709 if (this.closed) {
710 LOG.debug("WAL closed. Skipping rolling of writer");
711 return regionsToFlush;
712 }
713 if (!closeBarrier.beginOp()) {
714 LOG.debug("WAL closing. Skipping rolling of writer");
715 return regionsToFlush;
716 }
717 TraceScope scope = Trace.startSpan("FSHLog.rollWriter");
718 try {
719 Path oldPath = getOldPath();
720 Path newPath = getNewPath();
721
722 Writer nextWriter = this.createWriterInstance(newPath);
723 FSDataOutputStream nextHdfsOut = null;
724 if (nextWriter instanceof ProtobufLogWriter) {
725 nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
726
727
728 preemptiveSync((ProtobufLogWriter)nextWriter);
729 }
730 tellListenersAboutPreLogRoll(oldPath, newPath);
731
732 newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut);
733 tellListenersAboutPostLogRoll(oldPath, newPath);
734
735 if (getNumRolledLogFiles() > 0) {
736 cleanOldLogs();
737 regionsToFlush = findRegionsToForceFlush();
738 }
739 } finally {
740 closeBarrier.endOp();
741 assert scope == NullScope.INSTANCE || !scope.isDetached();
742 scope.close();
743 }
744 return regionsToFlush;
745 } finally {
746 rollWriterLock.unlock();
747 }
748 }
749
750
751
752
753
754
755
756 protected Writer createWriterInstance(final Path path) throws IOException {
757 return DefaultWALProvider.createWriter(conf, fs, path, false);
758 }
759
760 private long getLowestSeqId(Map<byte[], Long> seqIdMap) {
761 long result = HConstants.NO_SEQNUM;
762 for (Long seqNum: seqIdMap.values()) {
763 if (result == HConstants.NO_SEQNUM || seqNum.longValue() < result) {
764 result = seqNum.longValue();
765 }
766 }
767 return result;
768 }
769
770 private <T extends Map<byte[], Long>> Map<byte[], Long> copyMapWithLowestSeqId(
771 Map<byte[], T> mapToCopy) {
772 Map<byte[], Long> copied = Maps.newHashMap();
773 for (Map.Entry<byte[], T> entry: mapToCopy.entrySet()) {
774 long lowestSeqId = getLowestSeqId(entry.getValue());
775 if (lowestSeqId != HConstants.NO_SEQNUM) {
776 copied.put(entry.getKey(), lowestSeqId);
777 }
778 }
779 return copied;
780 }
781
782
783
784
785
786
787
788
789
790
791
792
793 private void cleanOldLogs() throws IOException {
794 Map<byte[], Long> lowestFlushingRegionSequenceIdsLocal = null;
795 Map<byte[], Long> oldestUnflushedRegionSequenceIdsLocal = null;
796 List<Path> logsToArchive = new ArrayList<Path>();
797
798 synchronized (regionSequenceIdLock) {
799 lowestFlushingRegionSequenceIdsLocal =
800 copyMapWithLowestSeqId(this.lowestFlushingStoreSequenceIds);
801 oldestUnflushedRegionSequenceIdsLocal =
802 copyMapWithLowestSeqId(this.oldestUnflushedStoreSequenceIds);
803 }
804 for (Map.Entry<Path, Map<byte[], Long>> e : byWalRegionSequenceIds.entrySet()) {
805
806 Path log = e.getKey();
807 Map<byte[], Long> sequenceNums = e.getValue();
808
809 if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal,
810 oldestUnflushedRegionSequenceIdsLocal)) {
811 logsToArchive.add(log);
812 LOG.debug("WAL file ready for archiving " + log);
813 }
814 }
815 for (Path p : logsToArchive) {
816 this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
817 archiveLogFile(p);
818 this.byWalRegionSequenceIds.remove(p);
819 }
820 }
821
822
823
824
825
826
827
828
829
830
831
832
833 static boolean areAllRegionsFlushed(Map<byte[], Long> sequenceNums,
834 Map<byte[], Long> oldestFlushingMap, Map<byte[], Long> oldestUnflushedMap) {
835 for (Map.Entry<byte[], Long> regionSeqIdEntry : sequenceNums.entrySet()) {
836
837
838 long oldestFlushing = oldestFlushingMap.containsKey(regionSeqIdEntry.getKey()) ?
839 oldestFlushingMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
840 long oldestUnFlushed = oldestUnflushedMap.containsKey(regionSeqIdEntry.getKey()) ?
841 oldestUnflushedMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
842
843 long minSeqNum = Math.min(oldestFlushing, oldestUnFlushed);
844 if (minSeqNum <= regionSeqIdEntry.getValue()) return false;
845 }
846 return true;
847 }
848
849
850
851
852
853
854
855
856
857
858
859
860 private byte[][] findEligibleMemstoresToFlush(Map<byte[], Long> regionsSequenceNums) {
861 List<byte[]> regionsToFlush = null;
862
863 synchronized (regionSequenceIdLock) {
864 for (Map.Entry<byte[], Long> e: regionsSequenceNums.entrySet()) {
865 ConcurrentMap<byte[], Long> m =
866 this.oldestUnflushedStoreSequenceIds.get(e.getKey());
867 if (m == null) {
868 continue;
869 }
870 long unFlushedVal = Collections.min(m.values());
871 if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) {
872 if (regionsToFlush == null)
873 regionsToFlush = new ArrayList<byte[]>();
874 regionsToFlush.add(e.getKey());
875 }
876 }
877 }
878 return regionsToFlush == null ? null : regionsToFlush
879 .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
880 }
881
882
883
884
885
886
887
888
889 byte[][] findRegionsToForceFlush() throws IOException {
890 byte [][] regions = null;
891 int logCount = getNumRolledLogFiles();
892 if (logCount > this.maxLogs && logCount > 0) {
893 Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
894 this.byWalRegionSequenceIds.firstEntry();
895 regions = findEligibleMemstoresToFlush(firstWALEntry.getValue());
896 }
897 if (regions != null) {
898 StringBuilder sb = new StringBuilder();
899 for (int i = 0; i < regions.length; i++) {
900 if (i > 0) sb.append(", ");
901 sb.append(Bytes.toStringBinary(regions[i]));
902 }
903 LOG.info("Too many wals: logs=" + logCount + ", maxlogs=" +
904 this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
905 sb.toString());
906 }
907 return regions;
908 }
909
910
911
912
913
914 @VisibleForTesting
915 protected void afterCreatingZigZagLatch() {}
916
917
918
919
920 @VisibleForTesting
921 protected void beforeWaitOnSafePoint() {};
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941 Path replaceWriter(final Path oldPath, final Path newPath, Writer nextWriter,
942 final FSDataOutputStream nextHdfsOut)
943 throws IOException {
944
945
946
947
948
949 SyncFuture syncFuture = null;
950 SafePointZigZagLatch zigzagLatch = null;
951 long sequence = -1L;
952 if (this.ringBufferEventHandler != null) {
953
954
955
956
957
958
959
960
961 sequence = getSequenceOnRingBuffer();
962 zigzagLatch = this.ringBufferEventHandler.attainSafePoint();
963 }
964 afterCreatingZigZagLatch();
965 TraceScope scope = Trace.startSpan("FSHFile.replaceWriter");
966 try {
967
968
969
970
971
972 try {
973 if (zigzagLatch != null) {
974
975
976 assert sequence > 0L : "Failed to get sequence from ring buffer";
977 Trace.addTimelineAnnotation("awaiting safepoint");
978 syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer(sequence));
979 }
980 } catch (FailedSyncBeforeLogCloseException e) {
981
982 if (isUnflushedEntries()) throw e;
983 LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " +
984 e.getMessage());
985 }
986
987
988
989 try {
990 if (this.writer != null) {
991 Trace.addTimelineAnnotation("closing writer");
992 this.writer.close();
993 Trace.addTimelineAnnotation("writer closed");
994 }
995 this.closeErrorCount.set(0);
996 } catch (IOException ioe) {
997 int errors = closeErrorCount.incrementAndGet();
998 if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) {
999 LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" +
1000 ioe.getMessage() + "\", errors=" + errors +
1001 "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
1002 } else {
1003 throw ioe;
1004 }
1005 }
1006 this.writer = nextWriter;
1007 this.hdfs_out = nextHdfsOut;
1008 int oldNumEntries = this.numEntries.get();
1009 this.numEntries.set(0);
1010 final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
1011 if (oldPath != null) {
1012 this.byWalRegionSequenceIds.put(oldPath, this.highestRegionSequenceIds);
1013 this.highestRegionSequenceIds = new HashMap<byte[], Long>();
1014 long oldFileLen = this.fs.getFileStatus(oldPath).getLen();
1015 this.totalLogSize.addAndGet(oldFileLen);
1016 LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
1017 ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " +
1018 newPathString);
1019 } else {
1020 LOG.info("New WAL " + newPathString);
1021 }
1022 } catch (InterruptedException ie) {
1023
1024 Thread.currentThread().interrupt();
1025 } catch (IOException e) {
1026 long count = getUnflushedEntriesCount();
1027 LOG.error("Failed close of WAL writer " + oldPath + ", unflushedEntries=" + count, e);
1028 throw new FailedLogCloseException(oldPath + ", unflushedEntries=" + count, e);
1029 } finally {
1030 try {
1031
1032 if (zigzagLatch != null) {
1033 zigzagLatch.releaseSafePoint();
1034
1035
1036
1037
1038
1039
1040 if (syncFuture != null) {
1041 try {
1042 blockOnSync(syncFuture);
1043 } catch (IOException ioe) {
1044 if (LOG.isTraceEnabled()) LOG.trace("Stale sync exception", ioe);
1045 }
1046 }
1047 }
1048 } finally {
1049 scope.close();
1050 }
1051 }
1052 return newPath;
1053 }
1054
1055 long getUnflushedEntriesCount() {
1056 long highestSynced = this.highestSyncedSequence.get();
1057 return highestSynced > this.highestUnsyncedSequence?
1058 0: this.highestUnsyncedSequence - highestSynced;
1059 }
1060
1061 boolean isUnflushedEntries() {
1062 return getUnflushedEntriesCount() > 0;
1063 }
1064
1065
1066
1067
1068
1069 public static Path getWALArchivePath(Path archiveDir, Path p) {
1070 return new Path(archiveDir, p.getName());
1071 }
1072
1073 private void archiveLogFile(final Path p) throws IOException {
1074 Path newPath = getWALArchivePath(this.fullPathArchiveDir, p);
1075
1076 if (!this.listeners.isEmpty()) {
1077 for (WALActionsListener i : this.listeners) {
1078 i.preLogArchive(p, newPath);
1079 }
1080 }
1081 LOG.info("Archiving " + p + " to " + newPath);
1082 if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
1083 throw new IOException("Unable to rename " + p + " to " + newPath);
1084 }
1085
1086 if (!this.listeners.isEmpty()) {
1087 for (WALActionsListener i : this.listeners) {
1088 i.postLogArchive(p, newPath);
1089 }
1090 }
1091 }
1092
1093
1094
1095
1096
1097
1098
1099 protected Path computeFilename(final long filenum) {
1100 if (filenum < 0) {
1101 throw new RuntimeException("WAL file number can't be < 0");
1102 }
1103 String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix;
1104 return new Path(fullPathLogDir, child);
1105 }
1106
1107
1108
1109
1110
1111
1112 public Path getCurrentFileName() {
1113 return computeFilename(this.filenum.get());
1114 }
1115
1116 @Override
1117 public String toString() {
1118 return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")";
1119 }
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129 protected long getFileNumFromFileName(Path fileName) {
1130 if (fileName == null) throw new IllegalArgumentException("file name can't be null");
1131 if (!ourFiles.accept(fileName)) {
1132 throw new IllegalArgumentException("The log file " + fileName +
1133 " doesn't belong to this WAL. (" + toString() + ")");
1134 }
1135 final String fileNameString = fileName.toString();
1136 String chompedPath = fileNameString.substring(prefixPathStr.length(),
1137 (fileNameString.length() - logFileSuffix.length()));
1138 return Long.parseLong(chompedPath);
1139 }
1140
1141 @Override
1142 public void close() throws IOException {
1143 shutdown();
1144 final FileStatus[] files = getFiles();
1145 if (null != files && 0 != files.length) {
1146 for (FileStatus file : files) {
1147 Path p = getWALArchivePath(this.fullPathArchiveDir, file.getPath());
1148
1149 if (!this.listeners.isEmpty()) {
1150 for (WALActionsListener i : this.listeners) {
1151 i.preLogArchive(file.getPath(), p);
1152 }
1153 }
1154
1155 if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
1156 throw new IOException("Unable to rename " + file.getPath() + " to " + p);
1157 }
1158
1159 if (!this.listeners.isEmpty()) {
1160 for (WALActionsListener i : this.listeners) {
1161 i.postLogArchive(file.getPath(), p);
1162 }
1163 }
1164 }
1165 LOG.debug("Moved " + files.length + " WAL file(s) to " +
1166 FSUtils.getPath(this.fullPathArchiveDir));
1167 }
1168 LOG.info("Closed WAL: " + toString() );
1169 }
1170
1171 @Override
1172 public void shutdown() throws IOException {
1173 if (shutdown.compareAndSet(false, true)) {
1174 try {
1175
1176 closeBarrier.stopAndDrainOps();
1177 } catch (InterruptedException e) {
1178 LOG.error("Exception while waiting for cache flushes and log rolls", e);
1179 Thread.currentThread().interrupt();
1180 }
1181
1182
1183
1184
1185 if (this.disruptor != null) {
1186 long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
1187 try {
1188 this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
1189 } catch (TimeoutException e) {
1190 LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " +
1191 "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
1192 this.disruptor.halt();
1193 this.disruptor.shutdown();
1194 }
1195 }
1196
1197 if (this.appendExecutor != null) this.appendExecutor.shutdown();
1198
1199
1200 if (!this.listeners.isEmpty()) {
1201 for (WALActionsListener i : this.listeners) {
1202 i.logCloseRequested();
1203 }
1204 }
1205 this.closed = true;
1206 if (LOG.isDebugEnabled()) {
1207 LOG.debug("Closing WAL writer in " + FSUtils.getPath(fullPathLogDir));
1208 }
1209 if (this.writer != null) {
1210 this.writer.close();
1211 this.writer = null;
1212 }
1213 }
1214 }
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224 @SuppressWarnings("deprecation")
1225 protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
1226 long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
1227
1228 return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
1229 }
1230
1231 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
1232 justification="Will never be null")
1233 @Override
1234 public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
1235 final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore,
1236 final List<Cell> memstoreCells) throws IOException {
1237 if (this.closed) throw new IOException("Cannot append; log is closed");
1238
1239
1240 TraceScope scope = Trace.startSpan("FSHLog.append");
1241
1242
1243
1244
1245 FSWALEntry entry = null;
1246 long sequence = this.disruptor.getRingBuffer().next();
1247 try {
1248 RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
1249
1250
1251
1252 entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri,
1253
1254 (memstoreCells != null)? memstoreCells: edits == null? null: edits.getCells());
1255 truck.loadPayload(entry, scope.detach());
1256 } finally {
1257 this.disruptor.getRingBuffer().publish(sequence);
1258 }
1259 return sequence;
1260 }
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277 private class SyncRunner extends HasThread {
1278 private volatile long sequence;
1279
1280 private final BlockingQueue<SyncFuture> syncFutures;
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292 SyncRunner(final String name, final int maxHandlersCount) {
1293 super(name);
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307 this.syncFutures = new LinkedBlockingQueue<SyncFuture>(maxHandlersCount * 3);
1308 }
1309
1310 void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount) {
1311
1312 this.sequence = sequence;
1313 for (int i = 0; i < syncFutureCount; ++i) {
1314 this.syncFutures.add(syncFutures[i]);
1315 }
1316 }
1317
1318
1319
1320
1321
1322
1323
1324
1325 private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence,
1326 final Throwable t) {
1327 if (!syncFuture.done(currentSequence, t)) throw new IllegalStateException();
1328
1329 return 1;
1330 }
1331
1332
1333
1334
1335
1336
1337
1338 private int releaseSyncFutures(final long currentSequence, final Throwable t) {
1339 int syncCount = 0;
1340 for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) {
1341 if (syncFuture.getRingBufferSequence() > currentSequence) break;
1342 releaseSyncFuture(syncFuture, currentSequence, t);
1343 if (!this.syncFutures.remove(syncFuture)) {
1344 throw new IllegalStateException(syncFuture.toString());
1345 }
1346 syncCount++;
1347 }
1348 return syncCount;
1349 }
1350
1351
1352
1353
1354
1355 private long updateHighestSyncedSequence(long sequence) {
1356 long currentHighestSyncedSequence;
1357
1358 do {
1359 currentHighestSyncedSequence = highestSyncedSequence.get();
1360 if (currentHighestSyncedSequence >= sequence) {
1361
1362
1363 sequence = currentHighestSyncedSequence;
1364 break;
1365 }
1366 } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence));
1367 return sequence;
1368 }
1369
1370 public void run() {
1371 long currentSequence;
1372 while (!isInterrupted()) {
1373 int syncCount = 0;
1374 SyncFuture takeSyncFuture;
1375 try {
1376 while (true) {
1377
1378 takeSyncFuture = this.syncFutures.take();
1379 currentSequence = this.sequence;
1380 long syncFutureSequence = takeSyncFuture.getRingBufferSequence();
1381 if (syncFutureSequence > currentSequence) {
1382 throw new IllegalStateException("currentSequence=" + syncFutureSequence +
1383 ", syncFutureSequence=" + syncFutureSequence);
1384 }
1385
1386 long currentHighestSyncedSequence = highestSyncedSequence.get();
1387 if (currentSequence < currentHighestSyncedSequence) {
1388 syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
1389
1390 continue;
1391 }
1392 break;
1393 }
1394
1395
1396 TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
1397 long start = System.nanoTime();
1398 Throwable lastException = null;
1399 try {
1400 Trace.addTimelineAnnotation("syncing writer");
1401 writer.sync();
1402 Trace.addTimelineAnnotation("writer synced");
1403 currentSequence = updateHighestSyncedSequence(currentSequence);
1404 } catch (IOException e) {
1405 LOG.error("Error syncing, request close of WAL", e);
1406 lastException = e;
1407 } catch (Exception e) {
1408 LOG.warn("UNEXPECTED", e);
1409 lastException = e;
1410 } finally {
1411
1412 takeSyncFuture.setSpan(scope.detach());
1413
1414 syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
1415
1416 syncCount += releaseSyncFutures(currentSequence, lastException);
1417 if (lastException != null) requestLogRoll();
1418 else checkLogRoll();
1419 }
1420 postSync(System.nanoTime() - start, syncCount);
1421 } catch (InterruptedException e) {
1422
1423 Thread.currentThread().interrupt();
1424 } catch (Throwable t) {
1425 LOG.warn("UNEXPECTED, continuing", t);
1426 }
1427 }
1428 }
1429 }
1430
1431
1432
1433
1434 void checkLogRoll() {
1435
1436 if (!rollWriterLock.tryLock()) return;
1437 boolean lowReplication;
1438 try {
1439 lowReplication = checkLowReplication();
1440 } finally {
1441 rollWriterLock.unlock();
1442 }
1443 try {
1444 if (lowReplication || writer != null && writer.getLength() > logrollsize) {
1445 requestLogRoll(lowReplication);
1446 }
1447 } catch (IOException e) {
1448 LOG.warn("Writer.getLength() failed; continuing", e);
1449 }
1450 }
1451
1452
1453
1454
1455 private boolean checkLowReplication() {
1456 boolean logRollNeeded = false;
1457
1458
1459 try {
1460 int numCurrentReplicas = getLogReplication();
1461 if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) {
1462 if (this.lowReplicationRollEnabled) {
1463 if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
1464 LOG.warn("HDFS pipeline error detected. " + "Found "
1465 + numCurrentReplicas + " replicas but expecting no less than "
1466 + this.minTolerableReplication + " replicas. "
1467 + " Requesting close of WAL. current pipeline: "
1468 + Arrays.toString(getPipeLine()));
1469 logRollNeeded = true;
1470
1471
1472
1473 this.consecutiveLogRolls.getAndIncrement();
1474 } else {
1475 LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
1476 + "the total number of live datanodes is lower than the tolerable replicas.");
1477 this.consecutiveLogRolls.set(0);
1478 this.lowReplicationRollEnabled = false;
1479 }
1480 }
1481 } else if (numCurrentReplicas >= this.minTolerableReplication) {
1482 if (!this.lowReplicationRollEnabled) {
1483
1484
1485
1486 if (this.numEntries.get() <= 1) {
1487 return logRollNeeded;
1488 }
1489
1490
1491 this.lowReplicationRollEnabled = true;
1492 LOG.info("LowReplication-Roller was enabled.");
1493 }
1494 }
1495 } catch (Exception e) {
1496 LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e +
1497 ", continuing...");
1498 }
1499 return logRollNeeded;
1500 }
1501
1502 private SyncFuture publishSyncOnRingBuffer(long sequence) {
1503 return publishSyncOnRingBuffer(sequence, null);
1504 }
1505
1506 private long getSequenceOnRingBuffer() {
1507 return this.disruptor.getRingBuffer().next();
1508 }
1509
1510 private SyncFuture publishSyncOnRingBuffer(Span span) {
1511 long sequence = this.disruptor.getRingBuffer().next();
1512 return publishSyncOnRingBuffer(sequence, span);
1513 }
1514
1515 private SyncFuture publishSyncOnRingBuffer(long sequence, Span span) {
1516 SyncFuture syncFuture = getSyncFuture(sequence, span);
1517 try {
1518 RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
1519 truck.loadPayload(syncFuture);
1520 } finally {
1521 this.disruptor.getRingBuffer().publish(sequence);
1522 }
1523 return syncFuture;
1524 }
1525
1526
1527 private Span publishSyncThenBlockOnCompletion(Span span) throws IOException {
1528 return blockOnSync(publishSyncOnRingBuffer(span));
1529 }
1530
1531 private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
1532
1533 try {
1534 syncFuture.get();
1535 return syncFuture.getSpan();
1536 } catch (InterruptedException ie) {
1537 LOG.warn("Interrupted", ie);
1538 throw convertInterruptedExceptionToIOException(ie);
1539 } catch (ExecutionException e) {
1540 throw ensureIOException(e.getCause());
1541 }
1542 }
1543
1544 private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
1545 Thread.currentThread().interrupt();
1546 IOException ioe = new InterruptedIOException();
1547 ioe.initCause(ie);
1548 return ioe;
1549 }
1550
1551 private SyncFuture getSyncFuture(final long sequence, Span span) {
1552 SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
1553 if (syncFuture == null) {
1554 syncFuture = new SyncFuture();
1555 this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
1556 }
1557 return syncFuture.reset(sequence, span);
1558 }
1559
1560 private void postSync(final long timeInNanos, final int handlerSyncs) {
1561 if (timeInNanos > this.slowSyncNs) {
1562 String msg =
1563 new StringBuilder().append("Slow sync cost: ")
1564 .append(timeInNanos / 1000000).append(" ms, current pipeline: ")
1565 .append(Arrays.toString(getPipeLine())).toString();
1566 Trace.addTimelineAnnotation(msg);
1567 LOG.info(msg);
1568 }
1569 if (!listeners.isEmpty()) {
1570 for (WALActionsListener listener : listeners) {
1571 listener.postSync(timeInNanos, handlerSyncs);
1572 }
1573 }
1574 }
1575
1576 private long postAppend(final Entry e, final long elapsedTime) {
1577 long len = 0;
1578 if (!listeners.isEmpty()) {
1579 for (Cell cell : e.getEdit().getCells()) {
1580 len += CellUtil.estimatedSerializedSizeOf(cell);
1581 }
1582 for (WALActionsListener listener : listeners) {
1583 listener.postAppend(len, elapsedTime);
1584 }
1585 }
1586 return len;
1587 }
1588
1589
1590
1591
1592
1593
1594 private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
1595
1596
1597 Method m = null;
1598 if (os != null) {
1599 Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass();
1600 try {
1601 m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class<?>[] {});
1602 m.setAccessible(true);
1603 } catch (NoSuchMethodException e) {
1604 LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " +
1605 "HDFS-826 not available; fsOut=" + wrappedStreamClass.getName());
1606 } catch (SecurityException e) {
1607 LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " +
1608 "not available; fsOut=" + wrappedStreamClass.getName(), e);
1609 m = null;
1610 }
1611 }
1612 if (m != null) {
1613 if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas");
1614 }
1615 return m;
1616 }
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630 @VisibleForTesting
1631 int getLogReplication()
1632 throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
1633 final OutputStream stream = getOutputStream();
1634 if (this.getNumCurrentReplicas != null && stream != null) {
1635 Object repl = this.getNumCurrentReplicas.invoke(stream, NO_ARGS);
1636 if (repl instanceof Integer) {
1637 return ((Integer)repl).intValue();
1638 }
1639 }
1640 return 0;
1641 }
1642
1643 @Override
1644 public void sync() throws IOException {
1645 TraceScope scope = Trace.startSpan("FSHLog.sync");
1646 try {
1647 scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
1648 } finally {
1649 assert scope == NullScope.INSTANCE || !scope.isDetached();
1650 scope.close();
1651 }
1652 }
1653
1654 @Override
1655 public void sync(long txid) throws IOException {
1656 if (this.highestSyncedSequence.get() >= txid){
1657
1658 return;
1659 }
1660 TraceScope scope = Trace.startSpan("FSHLog.sync");
1661 try {
1662 scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
1663 } finally {
1664 assert scope == NullScope.INSTANCE || !scope.isDetached();
1665 scope.close();
1666 }
1667 }
1668
1669
1670 public void requestLogRoll() {
1671 requestLogRoll(false);
1672 }
1673
1674 private void requestLogRoll(boolean tooFewReplicas) {
1675 if (!this.listeners.isEmpty()) {
1676 for (WALActionsListener i: this.listeners) {
1677 i.logRollRequested(tooFewReplicas);
1678 }
1679 }
1680 }
1681
1682
1683
1684 public int getNumRolledLogFiles() {
1685 return byWalRegionSequenceIds.size();
1686 }
1687
1688
1689
1690 public int getNumLogFiles() {
1691
1692 return getNumRolledLogFiles() + 1;
1693 }
1694
1695
1696
1697 public long getLogFileSize() {
1698 return this.totalLogSize.get();
1699 }
1700
1701 @Override
1702 public Long startCacheFlush(final byte[] encodedRegionName,
1703 Set<byte[]> flushedFamilyNames) {
1704 Map<byte[], Long> oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
1705 if (!closeBarrier.beginOp()) {
1706 LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
1707 " - because the server is closing.");
1708 return null;
1709 }
1710 long oldestUnflushedSequenceId = HConstants.NO_SEQNUM;
1711 synchronized (regionSequenceIdLock) {
1712 ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
1713 oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1714 if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
1715 for (byte[] familyName: flushedFamilyNames) {
1716 Long seqId = oldestUnflushedStoreSequenceIdsOfRegion.remove(familyName);
1717 if (seqId != null) {
1718 oldStoreSeqNum.put(familyName, seqId);
1719 }
1720 }
1721 if (!oldStoreSeqNum.isEmpty()) {
1722 Map<byte[], Long> oldValue = this.lowestFlushingStoreSequenceIds.put(
1723 encodedRegionName, oldStoreSeqNum);
1724 assert oldValue == null: "Flushing map not cleaned up for "
1725 + Bytes.toString(encodedRegionName);
1726 }
1727 if (oldestUnflushedStoreSequenceIdsOfRegion.isEmpty()) {
1728
1729
1730
1731
1732 oldestUnflushedStoreSequenceIds.remove(encodedRegionName);
1733 } else {
1734 oldestUnflushedSequenceId =
1735 Collections.min(oldestUnflushedStoreSequenceIdsOfRegion.values());
1736 }
1737 }
1738 }
1739 if (oldStoreSeqNum.isEmpty()) {
1740
1741
1742
1743
1744
1745 LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
1746 + Bytes.toString(encodedRegionName) + "]");
1747 }
1748 return oldestUnflushedSequenceId;
1749 }
1750
1751 @Override
1752 public void completeCacheFlush(final byte [] encodedRegionName) {
1753 synchronized (regionSequenceIdLock) {
1754 this.lowestFlushingStoreSequenceIds.remove(encodedRegionName);
1755 }
1756 closeBarrier.endOp();
1757 }
1758
1759 private ConcurrentMap<byte[], Long> getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(
1760 byte[] encodedRegionName) {
1761 ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
1762 oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1763 if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
1764 return oldestUnflushedStoreSequenceIdsOfRegion;
1765 }
1766 oldestUnflushedStoreSequenceIdsOfRegion =
1767 new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1768 ConcurrentMap<byte[], Long> alreadyPut =
1769 oldestUnflushedStoreSequenceIds.putIfAbsent(encodedRegionName,
1770 oldestUnflushedStoreSequenceIdsOfRegion);
1771 return alreadyPut == null ? oldestUnflushedStoreSequenceIdsOfRegion : alreadyPut;
1772 }
1773
1774 @Override
1775 public void abortCacheFlush(byte[] encodedRegionName) {
1776 Map<byte[], Long> storeSeqNumsBeforeFlushStarts;
1777 Map<byte[], Long> currentStoreSeqNums = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1778 synchronized (regionSequenceIdLock) {
1779 storeSeqNumsBeforeFlushStarts = this.lowestFlushingStoreSequenceIds.remove(
1780 encodedRegionName);
1781 if (storeSeqNumsBeforeFlushStarts != null) {
1782 ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
1783 getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
1784 for (Map.Entry<byte[], Long> familyNameAndSeqId: storeSeqNumsBeforeFlushStarts
1785 .entrySet()) {
1786 currentStoreSeqNums.put(familyNameAndSeqId.getKey(),
1787 oldestUnflushedStoreSequenceIdsOfRegion.put(familyNameAndSeqId.getKey(),
1788 familyNameAndSeqId.getValue()));
1789 }
1790 }
1791 }
1792 closeBarrier.endOp();
1793 if (storeSeqNumsBeforeFlushStarts != null) {
1794 for (Map.Entry<byte[], Long> familyNameAndSeqId : storeSeqNumsBeforeFlushStarts.entrySet()) {
1795 Long currentSeqNum = currentStoreSeqNums.get(familyNameAndSeqId.getKey());
1796 if (currentSeqNum != null
1797 && currentSeqNum.longValue() <= familyNameAndSeqId.getValue().longValue()) {
1798 String errorStr =
1799 "Region " + Bytes.toString(encodedRegionName) + " family "
1800 + Bytes.toString(familyNameAndSeqId.getKey())
1801 + " acquired edits out of order current memstore seq=" + currentSeqNum
1802 + ", previous oldest unflushed id=" + familyNameAndSeqId.getValue();
1803 LOG.error(errorStr);
1804 Runtime.getRuntime().halt(1);
1805 }
1806 }
1807 }
1808 }
1809
1810 @VisibleForTesting
1811 boolean isLowReplicationRollEnabled() {
1812 return lowReplicationRollEnabled;
1813 }
1814
// Aligned sum of one object header, five references, one AtomicInteger, one int and three longs.
// NOTE(review): presumably meant to mirror this class's instance fields for heap-size
// accounting -- confirm the counts against the current field declarations.
public static final long FIXED_OVERHEAD = ClassSize.align(
    ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
    ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1818
1819 private static void split(final Configuration conf, final Path p)
1820 throws IOException {
1821 FileSystem fs = FileSystem.get(conf);
1822 if (!fs.exists(p)) {
1823 throw new FileNotFoundException(p.toString());
1824 }
1825 if (!fs.getFileStatus(p).isDirectory()) {
1826 throw new IOException(p + " is not a directory");
1827 }
1828
1829 final Path baseDir = FSUtils.getRootDir(conf);
1830 final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1831 WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
1832 }
1833
1834 @Override
1835 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
1836 ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
1837 this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1838 return oldestUnflushedStoreSequenceIdsOfRegion != null ?
1839 getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM;
1840 }
1841
1842 @Override
1843 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName,
1844 byte[] familyName) {
1845 synchronized (regionSequenceIdLock) {
1846 Map<byte[], Long> m = this.lowestFlushingStoreSequenceIds.get(encodedRegionName);
1847 if (m != null) {
1848 Long earlist = m.get(familyName);
1849 if (earlist != null) {
1850 return earlist;
1851 }
1852 }
1853 m = this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1854 if (m != null) {
1855 Long earlist = m.get(familyName);
1856 if (earlist != null) {
1857 return earlist;
1858 }
1859 }
1860 }
1861 return HConstants.NO_SEQNUM;
1862 }
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890 static class SafePointZigZagLatch {
1891
1892
1893
1894 private volatile CountDownLatch safePointAttainedLatch = new CountDownLatch(1);
1895
1896
1897
1898 private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910 SyncFuture waitSafePoint(final SyncFuture syncFuture)
1911 throws InterruptedException, FailedSyncBeforeLogCloseException {
1912 while (true) {
1913 if (this.safePointAttainedLatch.await(1, TimeUnit.NANOSECONDS)) break;
1914 if (syncFuture.isThrowable()) {
1915 throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
1916 }
1917 }
1918 return syncFuture;
1919 }
1920
1921
1922
1923
1924
1925
1926
1927 void safePointAttained() throws InterruptedException {
1928 this.safePointAttainedLatch.countDown();
1929 this.safePointReleasedLatch.await();
1930 }
1931
1932
1933
1934
1935
1936 void releaseSafePoint() {
1937 this.safePointReleasedLatch.countDown();
1938 }
1939
1940
1941
1942
1943 boolean isCocked() {
1944 return this.safePointAttainedLatch.getCount() > 0 &&
1945 this.safePointReleasedLatch.getCount() > 0;
1946 }
1947 }
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
/**
 * Handler called for every {@link RingBufferTruck} published on the ring buffer. A truck carries
 * either a {@link SyncFuture} or an {@link FSWALEntry}: entries are written through
 * {@link #append(FSWALEntry)}, sync futures are collected and handed to one of the
 * {@link SyncRunner}s when a batch ends.
 */
class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware {
  /** Runners created in the constructor, started in onStart() and interrupted in onShutdown(). */
  private final SyncRunner [] syncRunners;
  /** Sync futures collected since the last hand-off; sized by maxHandlersCount. */
  private final SyncFuture [] syncFutures;
  /** Number of valid entries at the front of {@link #syncFutures}. */
  private volatile int syncFuturesCount = 0;
  /** Latch installed by {@link #attainSafePoint()}; null until that is first called. */
  private volatile SafePointZigZagLatch zigzagLatch;
  /**
   * Failure remembered from an append or a sync hand-off. While non-null, later entries are not
   * appended; it is cleared in {@link #attainSafePoint(long)}.
   */
  private Exception exception = null;
  /** Monitor used only for the timed wait in {@link #attainSafePoint(long)}. */
  private final Object safePointWaiter = new Object();
  // NOTE(review): nothing in this class assigns this flag; confirm where it is set.
  private volatile boolean shutdown = false;
  /** Index of the runner that received the last hand-off; advanced round-robin. */
  private int syncRunnerIndex;

  /**
   * @param syncRunnerCount how many {@link SyncRunner}s to create
   * @param maxHandlersCount capacity of the sync future buffer; also passed to each runner
   */
  RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount) {
    this.syncFutures = new SyncFuture[maxHandlersCount];
    this.syncRunners = new SyncRunner[syncRunnerCount];
    for (int i = 0; i < syncRunnerCount; i++) {
      this.syncRunners[i] = new SyncRunner("sync." + i, maxHandlersCount);
    }
  }

  /** Completes every collected sync future with {@code e} and empties the buffer. */
  private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) {
    for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e);
    this.syncFuturesCount = 0;
  }

  /** @return true if at least one collected sync future is not done yet */
  private boolean isOutstandingSyncs() {
    for (int i = 0; i < this.syncFuturesCount; i++) {
      if (!this.syncFutures[i].isDone()) return true;
    }
    return false;
  }

  @Override
  // Any Throwable raised while handling the event is caught and logged below, not rethrown.
  public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)
  throws Exception {
    try {
      if (truck.hasSyncFuturePayload()) {
        this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();
        // Treat a full buffer as the end of the batch so the futures get handed off below.
        if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true;
      } else if (truck.hasFSWALEntryPayload()) {
        TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
        try {
          FSWALEntry entry = truck.unloadFSWALEntryPayload();
          if (this.exception != null) {
            // An earlier failure is still pending: the entry is not appended, but its region
            // sequence id is still stamped.
            entry.stampRegionSequenceId();
            // Skip the hand-off / cleanup logic below for this event.
            return;
          }
          append(entry);
        } catch (Exception e) {
          // Remember the failure; it is reported to sync futures on a later event.
          this.exception = e;
          return;
        } finally {
          assert scope == NullScope.INSTANCE || !scope.isDetached();
          scope.close();
        }
      } else {
        // The truck carries neither payload kind; fail whatever syncs are collected.
        cleanupOutstandingSyncsOnException(sequence,
          new IllegalStateException("Neither append nor sync"));
        return;
      }

      if (this.exception == null) {
        // Without a pending failure, only hand off at the end of a batch and only if there is
        // at least one collected future.
        if (!endOfBatch || this.syncFuturesCount <= 0) return;

        // Pick the next runner in round-robin order.
        this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length;
        try {
          this.syncRunners[this.syncRunnerIndex].offer(sequence, this.syncFutures,
            this.syncFuturesCount);
        } catch (Exception e) {
          // The hand-off itself failed: request a roll and record the failure.
          requestLogRoll();
          this.exception = new DamagedWALException("Failed offering sync", e);
        }
      }
      // With a failure pending, complete the collected futures with it (wrapped if needed).
      if (this.exception != null) {
        cleanupOutstandingSyncsOnException(sequence,
          this.exception instanceof DamagedWALException?
            this.exception:
            new DamagedWALException("On sync", this.exception));
      }
      attainSafePoint(sequence);
      this.syncFuturesCount = 0;
    } catch (Throwable t) {
      LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
    }
  }

  /** Installs a fresh latch and returns it to the caller. */
  SafePointZigZagLatch attainSafePoint() {
    this.zigzagLatch = new SafePointZigZagLatch();
    return this.zigzagLatch;
  }

  /**
   * If a cocked latch is installed, waits for outstanding syncs to drain, clears any remembered
   * exception and signals the latch via {@code safePointAttained()}. Returns immediately when no
   * latch is installed or it is no longer cocked.
   */
  private void attainSafePoint(final long currentSequence) {
    if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return;
    beforeWaitOnSafePoint();
    try {
      // Keep waiting in very short timed waits while: not shut down, the latch is still cocked,
      // the highest synced sequence is behind currentSequence, and some collected future is
      // not done.
      while (!this.shutdown && this.zigzagLatch.isCocked() &&
          highestSyncedSequence.get() < currentSequence &&
          isOutstandingSyncs()) {
        synchronized (this.safePointWaiter) {
          this.safePointWaiter.wait(0, 1);
        }
      }
      // Clear the remembered failure before signalling the latch.
      this.exception = null;
      this.zigzagLatch.safePointAttained();
    } catch (InterruptedException e) {
      LOG.warn("Interrupted ", e);
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Records {@code lRegionSequenceId} as the oldest unflushed sequence id for every family in
   * {@code familyNameSet} that has no value recorded yet.
   */
  private void updateOldestUnflushedSequenceIds(byte[] encodedRegionName,
      Set<byte[]> familyNameSet, Long lRegionSequenceId) {
    ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
        getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
    for (byte[] familyName : familyNameSet) {
      oldestUnflushedStoreSequenceIdsOfRegion.putIfAbsent(familyName, lRegionSequenceId);
    }
  }

  /**
   * Stamps the entry's region sequence id and writes the entry to the current writer, calling
   * coprocessor and listener hooks around the write.
   *
   * @throws Exception a {@code DamagedWALException} wrapping any failure, after a log roll has
   *   been requested
   */
  void append(final FSWALEntry entry) throws Exception {
    // Hook for tests.
    atHeadOfRingBufferEventHandlerAppend();

    long start = EnvironmentEdgeManager.currentTime();
    byte [] encodedRegionName = entry.getKey().getEncodedRegionName();
    long regionSequenceId = WALKey.NO_SEQUENCE_ID;
    try {
      // The sequence id is stamped before the empty-edit check, so empty edits get one too.
      regionSequenceId = entry.stampRegionSequenceId();
      // Empty edits are not written; this return also skips the numEntries increment below.
      if (entry.getEdit().isEmpty()) {
        return;
      }

      // When preWALWrite returns false and the edit is a replay, the key's scopes are cleared.
      if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
          entry.getEdit())) {
        if (entry.getEdit().isReplay()) {
          entry.getKey().setScopes(null);
        }
      }
      if (!listeners.isEmpty()) {
        for (WALActionsListener i: listeners) {
          i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(),
            entry.getEdit());
        }
      }

      writer.append(entry);
      assert highestUnsyncedSequence < entry.getSequence();
      highestUnsyncedSequence = entry.getSequence();
      Long lRegionSequenceId = Long.valueOf(regionSequenceId);
      highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
      if (entry.isInMemstore()) {
        updateOldestUnflushedSequenceIds(encodedRegionName,
            entry.getFamilyNames(), lRegionSequenceId);
      }

      coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
      // Report the append with the time elapsed since the start of this method.
      postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
    } catch (Exception e) {
      String msg = "Append sequenceId=" + regionSequenceId + ", requesting roll of WAL";
      LOG.warn(msg, e);
      requestLogRoll();
      throw new DamagedWALException(msg, e);
    }
    numEntries.incrementAndGet();
  }

  /** Starts every sync runner thread. */
  @Override
  public void onStart() {
    for (SyncRunner syncRunner: this.syncRunners) syncRunner.start();
  }

  /** Interrupts every sync runner thread. */
  @Override
  public void onShutdown() {
    for (SyncRunner syncRunner: this.syncRunners) syncRunner.interrupt();
  }
}
2228
2229
2230
2231
/**
 * Hook invoked first thing in {@code RingBufferEventHandler.append}. Intentionally empty here;
 * marked {@code @VisibleForTesting} so tests can override it.
 */
@VisibleForTesting
void atHeadOfRingBufferEventHandlerAppend() {
  // Noop
}
2236
2237 private static IOException ensureIOException(final Throwable t) {
2238 return (t instanceof IOException)? (IOException)t: new IOException(t);
2239 }
2240
2241 private static void usage() {
2242 System.err.println("Usage: FSHLog <ARGS>");
2243 System.err.println("Arguments:");
2244 System.err.println(" --dump Dump textual representation of passed one or more files");
2245 System.err.println(" For example: " +
2246 "FSHLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
2247 System.err.println(" --split Split the passed directory of WAL logs");
2248 System.err.println(" For example: " +
2249 "FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR");
2250 }
2251
2252
2253
2254
2255
2256
2257
2258
2259 public static void main(String[] args) throws IOException {
2260 if (args.length < 2) {
2261 usage();
2262 System.exit(-1);
2263 }
2264
2265 if (args[0].compareTo("--dump") == 0) {
2266 WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
2267 } else if (args[0].compareTo("--perf") == 0) {
2268 LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
2269 LOG.fatal("\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " +
2270 args[1]);
2271 System.exit(-1);
2272 } else if (args[0].compareTo("--split") == 0) {
2273 Configuration conf = HBaseConfiguration.create();
2274 for (int i = 1; i < args.length; i++) {
2275 try {
2276 Path logPath = new Path(args[i]);
2277 FSUtils.setFsDefault(conf, logPath);
2278 split(conf, logPath);
2279 } catch (IOException t) {
2280 t.printStackTrace(System.err);
2281 System.exit(-1);
2282 }
2283 }
2284 } else {
2285 usage();
2286 System.exit(-1);
2287 }
2288 }
2289
2290
2291
2292
2293
2294 private Method getGetPipeline(final FSDataOutputStream os) {
2295 Method m = null;
2296 if (os != null) {
2297 Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
2298 .getClass();
2299 try {
2300 m = wrappedStreamClass.getDeclaredMethod("getPipeline",
2301 new Class<?>[] {});
2302 m.setAccessible(true);
2303 } catch (NoSuchMethodException e) {
2304 LOG.info("FileSystem's output stream doesn't support"
2305 + " getPipeline; not available; fsOut="
2306 + wrappedStreamClass.getName());
2307 } catch (SecurityException e) {
2308 LOG.info(
2309 "Doesn't have access to getPipeline on "
2310 + "FileSystems's output stream ; fsOut="
2311 + wrappedStreamClass.getName(), e);
2312 m = null;
2313 }
2314 }
2315 return m;
2316 }
2317
2318
2319
2320
2321 @VisibleForTesting
2322 DatanodeInfo[] getPipeLine() {
2323 if (this.getPipeLine != null && this.hdfs_out != null) {
2324 Object repl;
2325 try {
2326 repl = this.getPipeLine.invoke(getOutputStream(), NO_ARGS);
2327 if (repl instanceof DatanodeInfo[]) {
2328 return ((DatanodeInfo[]) repl);
2329 }
2330 } catch (Exception e) {
2331 LOG.info("Get pipeline failed", e);
2332 }
2333 }
2334 return new DatanodeInfo[0];
2335 }
2336 }