1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.io.OutputStream;
24 import java.lang.reflect.InvocationTargetException;
25 import java.lang.reflect.Method;
26 import java.net.URLEncoder;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.SortedMap;
34 import java.util.TreeMap;
35 import java.util.TreeSet;
36 import java.util.UUID;
37 import java.util.concurrent.ConcurrentSkipListMap;
38 import java.util.concurrent.CopyOnWriteArrayList;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.concurrent.atomic.AtomicLong;
42
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45 import org.apache.hadoop.classification.InterfaceAudience;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.fs.FSDataOutputStream;
48 import org.apache.hadoop.fs.FileStatus;
49 import org.apache.hadoop.fs.FileSystem;
50 import org.apache.hadoop.fs.Path;
51 import org.apache.hadoop.fs.Syncable;
52 import org.apache.hadoop.hbase.HBaseConfiguration;
53 import org.apache.hadoop.hbase.HConstants;
54 import org.apache.hadoop.hbase.HRegionInfo;
55 import org.apache.hadoop.hbase.HTableDescriptor;
56 import org.apache.hadoop.hbase.KeyValue;
57 import org.apache.hadoop.hbase.exceptions.FailedLogCloseException;
58 import org.apache.hadoop.hbase.util.Bytes;
59 import org.apache.hadoop.hbase.util.ClassSize;
60 import org.apache.hadoop.hbase.util.DrainBarrier;
61 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
62 import org.apache.hadoop.hbase.util.FSUtils;
63 import org.apache.hadoop.hbase.util.HasThread;
64 import org.apache.hadoop.hbase.util.Threads;
65 import org.apache.hadoop.util.StringUtils;
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 @InterfaceAudience.Private
107 class FSHLog implements HLog, Syncable {
108 static final Log LOG = LogFactory.getLog(FSHLog.class);
109
110 private final FileSystem fs;
111 private final Path rootDir;
112 private final Path dir;
113 private final Configuration conf;
114
115 private List<WALActionsListener> listeners =
116 new CopyOnWriteArrayList<WALActionsListener>();
117 private final long optionalFlushInterval;
118 private final long blocksize;
119 private final String prefix;
120 private final AtomicLong unflushedEntries = new AtomicLong(0);
121 private volatile long syncedTillHere = 0;
122 private long lastDeferredTxid;
123 private final Path oldLogDir;
124 private volatile boolean logRollRunning;
125
126 private WALCoprocessorHost coprocessorHost;
127
128 private FSDataOutputStream hdfs_out;
129
130
131 private int minTolerableReplication;
132 private Method getNumCurrentReplicas;
133 final static Object [] NO_ARGS = new Object []{};
134
135
136 private DrainBarrier closeBarrier = new DrainBarrier();
137
138
139
140
141 Writer writer;
142
143
144
145
146 final SortedMap<Long, Path> outputfiles =
147 Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
148
149
150
151
152
153
154
155 private final Object oldestSeqNumsLock = new Object();
156
157
158
159
160
161 private final Object rollWriterLock = new Object();
162
163
164
165
166 private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedSeqNums =
167 new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
168
169
170
171
172
173 private final Map<byte[], Long> oldestFlushingSeqNums =
174 new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
175
176 private volatile boolean closed = false;
177
178 private final AtomicLong logSeqNum = new AtomicLong(0);
179
180 private boolean forMeta = false;
181
182
183 private volatile long filenum = -1;
184
185
186 private final AtomicInteger numEntries = new AtomicInteger(0);
187
188
189
190
191
192 private AtomicInteger consecutiveLogRolls = new AtomicInteger(0);
193 private final int lowReplicationRollLimit;
194
195
196
197
198 private volatile boolean lowReplicationRollEnabled = true;
199
200
201
202 private final long logrollsize;
203
204
205
206
207 private final Object updateLock = new Object();
208 private final Object flushLock = new Object();
209
210 private final boolean enabled;
211
212
213
214
215
216
217 private final int maxLogs;
218
219
220
221
222 private final LogSyncer logSyncer;
223
224
225 private final int closeErrorsTolerated;
226
227 private final AtomicInteger closeErrorCount = new AtomicInteger();
228 private final MetricsWAL metrics;
229
230
231
232
233
234
235
236
237
238
/**
 * Creates a log in {@code root/logDir} that archives into the default old-log
 * directory, with no listeners and the default file-name prefix.
 *
 * @param fs filesystem to write to
 * @param root root directory both log directories are resolved against
 * @param logDir name of the live log directory; must not already exist
 * @param conf configuration to read settings from
 * @throws IOException if the directories cannot be prepared or the first
 *         writer cannot be created
 */
public FSHLog(final FileSystem fs, final Path root, final String logDir,
    final Configuration conf) throws IOException {
  this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf,
      null,    // listeners
      true,    // failIfLogDirExists
      null,    // prefix
      false);  // forMeta
}
245
246
247
248
249
250
251
252
253
254
255
/**
 * Creates a log in {@code root/logDir} that archives into
 * {@code root/oldLogDir}, with no listeners and the default file-name prefix.
 *
 * @param fs filesystem to write to
 * @param root root directory both log directories are resolved against
 * @param logDir name of the live log directory; must not already exist
 * @param oldLogDir name of the archive directory
 * @param conf configuration to read settings from
 * @throws IOException if the directories cannot be prepared or the first
 *         writer cannot be created
 */
public FSHLog(final FileSystem fs, final Path root, final String logDir,
    final String oldLogDir, final Configuration conf) throws IOException {
  this(fs, root, logDir, oldLogDir, conf,
      null,    // listeners
      true,    // failIfLogDirExists
      null,    // prefix
      false);  // forMeta
}
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
/**
 * Creates a log in {@code root/logDir} that archives into the default old-log
 * directory, registering the given listeners before the first roll.
 *
 * @param fs filesystem to write to
 * @param root root directory both log directories are resolved against
 * @param logDir name of the live log directory; must not already exist
 * @param conf configuration to read settings from
 * @param listeners listeners to register, or null for none
 * @param prefix log file-name prefix; null or empty selects "hlog"
 * @throws IOException if the directories cannot be prepared or the first
 *         writer cannot be created
 */
public FSHLog(final FileSystem fs, final Path root, final String logDir,
    final Configuration conf, final List<WALActionsListener> listeners,
    final String prefix) throws IOException {
  this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
      true,    // failIfLogDirExists
      prefix,
      false);  // forMeta
}
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311 public FSHLog(final FileSystem fs, final Path root, final String logDir,
312 final String oldLogDir, final Configuration conf,
313 final List<WALActionsListener> listeners,
314 final boolean failIfLogDirExists, final String prefix, boolean forMeta)
315 throws IOException {
316 super();
317 this.fs = fs;
318 this.rootDir = root;
319 this.dir = new Path(this.rootDir, logDir);
320 this.oldLogDir = new Path(this.rootDir, oldLogDir);
321 this.forMeta = forMeta;
322 this.conf = conf;
323
324 if (listeners != null) {
325 for (WALActionsListener i: listeners) {
326 registerWALActionsListener(i);
327 }
328 }
329
330 this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
331 FSUtils.getDefaultBlockSize(this.fs, this.dir));
332
333 float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
334 this.logrollsize = (long)(this.blocksize * multi);
335 this.optionalFlushInterval =
336 conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
337
338 this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
339 this.minTolerableReplication = conf.getInt(
340 "hbase.regionserver.hlog.tolerable.lowreplication",
341 FSUtils.getDefaultReplication(fs, this.dir));
342 this.lowReplicationRollLimit = conf.getInt(
343 "hbase.regionserver.hlog.lowreplication.rolllimit", 5);
344 this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
345 this.closeErrorsTolerated = conf.getInt(
346 "hbase.regionserver.logroll.errors.tolerated", 0);
347
348 this.logSyncer = new LogSyncer(this.optionalFlushInterval);
349
350 LOG.info("HLog configuration: blocksize=" +
351 StringUtils.byteDesc(this.blocksize) +
352 ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
353 ", enabled=" + this.enabled +
354 ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
355
356 this.prefix = prefix == null || prefix.isEmpty() ?
357 "hlog" : URLEncoder.encode(prefix, "UTF8");
358
359 boolean dirExists = false;
360 if (failIfLogDirExists && (dirExists = this.fs.exists(dir))) {
361 throw new IOException("Target HLog directory already exists: " + dir);
362 }
363 if (!dirExists && !fs.mkdirs(dir)) {
364 throw new IOException("Unable to mkdir " + dir);
365 }
366
367 if (!fs.exists(this.oldLogDir)) {
368 if (!fs.mkdirs(this.oldLogDir)) {
369 throw new IOException("Unable to mkdir " + this.oldLogDir);
370 }
371 }
372
373 rollWriter();
374
375
376 this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
377
378
379 if (this.optionalFlushInterval > 0) {
380 Threads.setDaemonThreadRunning(logSyncer.getThread(), Thread.currentThread().getName()
381 + ".logSyncer");
382 } else {
383 LOG.info("hbase.regionserver.optionallogflushinterval is set as "
384 + this.optionalFlushInterval + ". Deferred log syncing won't work. "
385 + "Any Mutation, marked to be deferred synced, will be flushed immediately.");
386 }
387 coprocessorHost = new WALCoprocessorHost(this, conf);
388
389 this.metrics = new MetricsWAL();
390 }
391
392
393
394
395
396 private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
397 Method m = null;
398 if (os != null) {
399 Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
400 .getClass();
401 try {
402 m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas",
403 new Class<?>[] {});
404 m.setAccessible(true);
405 } catch (NoSuchMethodException e) {
406 LOG.info("FileSystem's output stream doesn't support"
407 + " getNumCurrentReplicas; --HDFS-826 not available; fsOut="
408 + wrappedStreamClass.getName());
409 } catch (SecurityException e) {
410 LOG.info("Doesn't have access to getNumCurrentReplicas on "
411 + "FileSystems's output stream --HDFS-826 not available; fsOut="
412 + wrappedStreamClass.getName(), e);
413 m = null;
414 }
415 }
416 if (m != null) {
417 if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas--HDFS-826");
418 }
419 return m;
420 }
421
422 @Override
423 public void registerWALActionsListener(final WALActionsListener listener) {
424 this.listeners.add(listener);
425 }
426
427 @Override
428 public boolean unregisterWALActionsListener(final WALActionsListener listener) {
429 return this.listeners.remove(listener);
430 }
431
432 @Override
433 public long getFilenum() {
434 return this.filenum;
435 }
436
437 @Override
438 public void setSequenceNumber(final long newvalue) {
439 for (long id = this.logSeqNum.get(); id < newvalue &&
440 !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
441
442
443 LOG.debug("Changed sequenceid from " + id + " to " + newvalue);
444 }
445 }
446
447 @Override
448 public long getSequenceNumber() {
449 return logSeqNum.get();
450 }
451
452
453
454
455
456
457
458
459
460 OutputStream getOutputStream() {
461 return this.hdfs_out.getWrappedStream();
462 }
463
464 @Override
465 public byte [][] rollWriter() throws FailedLogCloseException, IOException {
466 return rollWriter(false);
467 }
468
469 @Override
470 public byte [][] rollWriter(boolean force)
471 throws FailedLogCloseException, IOException {
472 synchronized (rollWriterLock) {
473
474 if (!force && this.writer != null && this.numEntries.get() <= 0) {
475 return null;
476 }
477 byte [][] regionsToFlush = null;
478 if (closed) {
479 LOG.debug("HLog closed. Skipping rolling of writer");
480 return null;
481 }
482 try {
483 this.logRollRunning = true;
484 if (!closeBarrier.beginOp()) {
485 LOG.debug("HLog closing. Skipping rolling of writer");
486 return regionsToFlush;
487 }
488
489
490 long currentFilenum = this.filenum;
491 Path oldPath = null;
492 if (currentFilenum > 0) {
493
494 oldPath = computeFilename(currentFilenum);
495 }
496 this.filenum = System.currentTimeMillis();
497 Path newPath = computeFilename();
498
499
500 if (!this.listeners.isEmpty()) {
501 for (WALActionsListener i : this.listeners) {
502 i.preLogRoll(oldPath, newPath);
503 }
504 }
505 FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
506
507 FSDataOutputStream nextHdfsOut = null;
508 if (nextWriter instanceof ProtobufLogWriter) {
509 nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
510 }
511
512 Path oldFile = null;
513 int oldNumEntries = 0;
514 synchronized (updateLock) {
515
516 oldNumEntries = this.numEntries.get();
517 oldFile = cleanupCurrentWriter(currentFilenum);
518 this.writer = nextWriter;
519 this.hdfs_out = nextHdfsOut;
520 this.numEntries.set(0);
521 }
522 LOG.info("Rolled log" + (oldFile != null ? " for file=" + FSUtils.getPath(oldFile)
523 + ", entries=" + oldNumEntries + ", filesize=" + this.fs.getFileStatus(oldFile).getLen()
524 : "" ) + "; new path=" + FSUtils.getPath(newPath));
525
526
527 if (!this.listeners.isEmpty()) {
528 for (WALActionsListener i : this.listeners) {
529 i.postLogRoll(oldPath, newPath);
530 }
531 }
532
533
534 if (getNumLogFiles() > 0) {
535 cleanOldLogs();
536 regionsToFlush = getRegionsToForceFlush();
537 }
538 } finally {
539 this.logRollRunning = false;
540 closeBarrier.endOp();
541 }
542 return regionsToFlush;
543 }
544 }
545
546
547
548
549
550
551
552
553
554
555
556 protected Writer createWriterInstance(final FileSystem fs, final Path path,
557 final Configuration conf) throws IOException {
558 if (forMeta) {
559
560 }
561 return HLogFactory.createWriter(fs, path, conf);
562 }
563
564
565
566
567
568
569
570
571 private void cleanOldLogs() throws IOException {
572 long oldestOutstandingSeqNum = Long.MAX_VALUE;
573 synchronized (oldestSeqNumsLock) {
574 Long oldestFlushing = (oldestFlushingSeqNums.size() > 0)
575 ? Collections.min(oldestFlushingSeqNums.values()) : Long.MAX_VALUE;
576 Long oldestUnflushed = (oldestUnflushedSeqNums.size() > 0)
577 ? Collections.min(oldestUnflushedSeqNums.values()) : Long.MAX_VALUE;
578 oldestOutstandingSeqNum = Math.min(oldestFlushing, oldestUnflushed);
579 }
580
581
582
583 TreeSet<Long> sequenceNumbers = new TreeSet<Long>(this.outputfiles.headMap(
584 oldestOutstandingSeqNum).keySet());
585
586 if (LOG.isDebugEnabled()) {
587 if (sequenceNumbers.size() > 0) {
588 LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove" +
589 " out of total " + this.outputfiles.size() + ";" +
590 " oldest outstanding sequenceid is " + oldestOutstandingSeqNum);
591 }
592 }
593 for (Long seq : sequenceNumbers) {
594 archiveLogFile(this.outputfiles.remove(seq), seq);
595 }
596 }
597
598
599
600
601
602
603
604
605 static byte[][] findMemstoresWithEditsEqualOrOlderThan(
606 final long walSeqNum, final Map<byte[], Long> regionsToSeqNums) {
607 List<byte[]> regions = null;
608 for (Map.Entry<byte[], Long> e : regionsToSeqNums.entrySet()) {
609 if (e.getValue().longValue() <= walSeqNum) {
610 if (regions == null) regions = new ArrayList<byte[]>();
611 regions.add(e.getKey());
612 }
613 }
614 return regions == null ? null : regions
615 .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
616 }
617
618 private byte[][] getRegionsToForceFlush() throws IOException {
619
620
621 byte [][] regions = null;
622 int logCount = getNumLogFiles();
623 if (logCount > this.maxLogs && logCount > 0) {
624
625 synchronized (oldestSeqNumsLock) {
626 regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
627 this.oldestUnflushedSeqNums);
628 }
629 if (regions != null) {
630 StringBuilder sb = new StringBuilder();
631 for (int i = 0; i < regions.length; i++) {
632 if (i > 0) sb.append(", ");
633 sb.append(Bytes.toStringBinary(regions[i]));
634 }
635 LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
636 this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
637 sb.toString());
638 }
639 }
640 return regions;
641 }
642
643
644
645
646
647
648
649 Path cleanupCurrentWriter(final long currentfilenum) throws IOException {
650 Path oldFile = null;
651 if (this.writer != null) {
652
653 try {
654
655
656 if (this.unflushedEntries.get() != this.syncedTillHere) {
657 LOG.debug("cleanupCurrentWriter " +
658 " waiting for transactions to get synced " +
659 " total " + this.unflushedEntries.get() +
660 " synced till here " + syncedTillHere);
661 sync();
662 }
663 this.writer.close();
664 this.writer = null;
665 closeErrorCount.set(0);
666 } catch (IOException e) {
667 LOG.error("Failed close of HLog writer", e);
668 int errors = closeErrorCount.incrementAndGet();
669 if (errors <= closeErrorsTolerated && !hasDeferredEntries()) {
670 LOG.warn("Riding over HLog close failure! error count="+errors);
671 } else {
672 if (hasDeferredEntries()) {
673 LOG.error("Aborting due to unflushed edits in HLog");
674 }
675
676
677
678 FailedLogCloseException flce =
679 new FailedLogCloseException("#" + currentfilenum);
680 flce.initCause(e);
681 throw flce;
682 }
683 }
684 if (currentfilenum >= 0) {
685 oldFile = computeFilename(currentfilenum);
686 this.outputfiles.put(Long.valueOf(this.logSeqNum.get()), oldFile);
687 }
688 }
689 return oldFile;
690 }
691
692 private void archiveLogFile(final Path p, final Long seqno) throws IOException {
693 Path newPath = getHLogArchivePath(this.oldLogDir, p);
694 LOG.info("moving old hlog file " + FSUtils.getPath(p) +
695 " whose highest sequenceid is " + seqno + " to " +
696 FSUtils.getPath(newPath));
697
698
699 if (!this.listeners.isEmpty()) {
700 for (WALActionsListener i : this.listeners) {
701 i.preLogArchive(p, newPath);
702 }
703 }
704 if (!this.fs.rename(p, newPath)) {
705 throw new IOException("Unable to rename " + p + " to " + newPath);
706 }
707
708 if (!this.listeners.isEmpty()) {
709 for (WALActionsListener i : this.listeners) {
710 i.postLogArchive(p, newPath);
711 }
712 }
713 }
714
715
716
717
718
719
720 protected Path computeFilename() {
721 return computeFilename(this.filenum);
722 }
723
724
725
726
727
728
729
730 protected Path computeFilename(long filenum) {
731 if (filenum < 0) {
732 throw new RuntimeException("hlog file number can't be < 0");
733 }
734 String child = prefix + "." + filenum;
735 if (forMeta) {
736 child += HLog.META_HLOG_FILE_EXTN;
737 }
738 return new Path(dir, child);
739 }
740
741 @Override
742 public void closeAndDelete() throws IOException {
743 close();
744 if (!fs.exists(this.dir)) return;
745 FileStatus[] files = fs.listStatus(this.dir);
746 if (files != null) {
747 for(FileStatus file : files) {
748
749 Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
750
751 if (!this.listeners.isEmpty()) {
752 for (WALActionsListener i : this.listeners) {
753 i.preLogArchive(file.getPath(), p);
754 }
755 }
756
757 if (!fs.rename(file.getPath(),p)) {
758 throw new IOException("Unable to rename " + file.getPath() + " to " + p);
759 }
760
761 if (!this.listeners.isEmpty()) {
762 for (WALActionsListener i : this.listeners) {
763 i.postLogArchive(file.getPath(), p);
764 }
765 }
766 }
767 LOG.debug("Moved " + files.length + " log files to " +
768 FSUtils.getPath(this.oldLogDir));
769 }
770 if (!fs.delete(dir, true)) {
771 LOG.info("Unable to delete " + dir);
772 }
773 }
774
775 @Override
776 public void close() throws IOException {
777 if (this.closed) {
778 return;
779 }
780
781 if (this.optionalFlushInterval > 0) {
782 try {
783 logSyncer.close();
784
785 logSyncer.join(this.optionalFlushInterval * 2);
786 } catch (InterruptedException e) {
787 LOG.error("Exception while waiting for syncer thread to die", e);
788 Thread.currentThread().interrupt();
789 }
790 }
791 try {
792
793 closeBarrier.stopAndDrainOps();
794 } catch (InterruptedException e) {
795 LOG.error("Exception while waiting for cache flushes and log rolls", e);
796 Thread.currentThread().interrupt();
797 }
798
799
800 if (!this.listeners.isEmpty()) {
801 for (WALActionsListener i : this.listeners) {
802 i.logCloseRequested();
803 }
804 }
805 synchronized (updateLock) {
806 this.closed = true;
807 if (LOG.isDebugEnabled()) {
808 LOG.debug("closing hlog writer in " + this.dir.toString());
809 }
810 if (this.writer != null) {
811 this.writer.close();
812 this.writer = null;
813 }
814 }
815 }
816
817
818
819
820
821
822
823
824
825 protected HLogKey makeKey(byte[] encodedRegionName, byte[] tableName, long seqnum,
826 long now, UUID clusterId) {
827 return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterId);
828 }
829
830 @Override
831 public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
832 final long now, HTableDescriptor htd)
833 throws IOException {
834 append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd);
835 }
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863 private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
864 final long now, HTableDescriptor htd, boolean doSync)
865 throws IOException {
866 if (edits.isEmpty()) return this.unflushedEntries.get();;
867 if (this.closed) {
868 throw new IOException("Cannot append; log is closed");
869 }
870 long txid = 0;
871 synchronized (this.updateLock) {
872 long seqNum = obtainSeqNum();
873
874
875
876
877
878
879
880 byte [] encodedRegionName = info.getEncodedNameAsBytes();
881 this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
882 HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
883 doWrite(info, logKey, edits, htd);
884 this.numEntries.incrementAndGet();
885 txid = this.unflushedEntries.incrementAndGet();
886 if (htd.isDeferredLogFlush()) {
887 lastDeferredTxid = txid;
888 }
889 }
890
891
892 if (doSync &&
893 (info.isMetaRegion() ||
894 !htd.isDeferredLogFlush())) {
895
896 this.sync(txid);
897 }
898 return txid;
899 }
900
901 @Override
902 public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
903 UUID clusterId, final long now, HTableDescriptor htd)
904 throws IOException {
905 return append(info, tableName, edits, clusterId, now, htd, false);
906 }
907
908 @Override
909 public long append(HRegionInfo info, byte [] tableName, WALEdit edits,
910 UUID clusterId, final long now, HTableDescriptor htd)
911 throws IOException {
912 return append(info, tableName, edits, clusterId, now, htd, true);
913 }
914
915
916
917
918
919
920
921
922
923 class LogSyncer extends HasThread {
924
925 private final long optionalFlushInterval;
926
927 private final AtomicBoolean closeLogSyncer = new AtomicBoolean(false);
928
929
930
931
932
933
934
935 private List<Entry> pendingWrites = new LinkedList<Entry>();
936
937 LogSyncer(long optionalFlushInterval) {
938 this.optionalFlushInterval = optionalFlushInterval;
939 }
940
941 @Override
942 public void run() {
943 try {
944
945
946 while(!this.isInterrupted() && !closeLogSyncer.get()) {
947
948 try {
949 if (unflushedEntries.get() <= syncedTillHere) {
950 synchronized (closeLogSyncer) {
951 closeLogSyncer.wait(this.optionalFlushInterval);
952 }
953 }
954
955
956
957 sync();
958 } catch (IOException e) {
959 LOG.error("Error while syncing, requesting close of hlog ", e);
960 requestLogRoll();
961 Threads.sleep(this.optionalFlushInterval);
962 }
963 }
964 } catch (InterruptedException e) {
965 LOG.debug(getName() + " interrupted while waiting for sync requests");
966 } finally {
967 LOG.info(getName() + " exiting");
968 }
969 }
970
971
972
973
974 synchronized void append(Entry e) throws IOException {
975 pendingWrites.add(e);
976 }
977
978
979
980 synchronized List<Entry> getPendingWrites() {
981 List<Entry> save = this.pendingWrites;
982 this.pendingWrites = new LinkedList<Entry>();
983 return save;
984 }
985
986
987 void hlogFlush(Writer writer, List<Entry> pending) throws IOException {
988 if (pending == null) return;
989
990
991 for (Entry e : pending) {
992 writer.append(e);
993 }
994 }
995
996 void close() {
997 synchronized (closeLogSyncer) {
998 closeLogSyncer.set(true);
999 closeLogSyncer.notifyAll();
1000 }
1001 }
1002 }
1003
1004
1005 private void syncer() throws IOException {
1006 syncer(this.unflushedEntries.get());
1007 }
1008
1009
1010 private void syncer(long txid) throws IOException {
1011
1012
1013 if (txid <= this.syncedTillHere) {
1014 return;
1015 }
1016 Writer tempWriter;
1017 synchronized (this.updateLock) {
1018 if (this.closed) return;
1019
1020
1021
1022
1023 tempWriter = this.writer;
1024 }
1025 try {
1026 long doneUpto;
1027 long now = EnvironmentEdgeManager.currentTimeMillis();
1028
1029
1030
1031
1032 IOException ioe = null;
1033 List<Entry> pending = null;
1034 synchronized (flushLock) {
1035 if (txid <= this.syncedTillHere) {
1036 return;
1037 }
1038 doneUpto = this.unflushedEntries.get();
1039 pending = logSyncer.getPendingWrites();
1040 try {
1041 logSyncer.hlogFlush(tempWriter, pending);
1042 } catch(IOException io) {
1043 ioe = io;
1044 LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
1045 }
1046 }
1047 if (ioe != null && pending != null) {
1048 synchronized (this.updateLock) {
1049 synchronized (flushLock) {
1050
1051 tempWriter = this.writer;
1052 logSyncer.hlogFlush(tempWriter, pending);
1053 }
1054 }
1055 }
1056
1057 if (txid <= this.syncedTillHere) {
1058 return;
1059 }
1060 try {
1061 if (tempWriter != null) tempWriter.sync();
1062 } catch(IOException ex) {
1063 synchronized (this.updateLock) {
1064
1065
1066
1067 tempWriter = this.writer;
1068 if (tempWriter != null) tempWriter.sync();
1069 }
1070 }
1071 this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
1072
1073 this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
1074
1075
1076
1077 if (!this.logRollRunning) {
1078 checkLowReplication();
1079 try {
1080 if (tempWriter.getLength() > this.logrollsize) {
1081 requestLogRoll();
1082 }
1083 } catch (IOException x) {
1084 LOG.debug("Log roll failed and will be retried. (This is not an error)");
1085 }
1086 }
1087 } catch (IOException e) {
1088 LOG.fatal("Could not sync. Requesting roll of hlog", e);
1089 requestLogRoll();
1090 throw e;
1091 }
1092 }
1093
1094 private void checkLowReplication() {
1095
1096
1097 try {
1098 int numCurrentReplicas = getLogReplication();
1099 if (numCurrentReplicas != 0
1100 && numCurrentReplicas < this.minTolerableReplication) {
1101 if (this.lowReplicationRollEnabled) {
1102 if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
1103 LOG.warn("HDFS pipeline error detected. " + "Found "
1104 + numCurrentReplicas + " replicas but expecting no less than "
1105 + this.minTolerableReplication + " replicas. "
1106 + " Requesting close of hlog.");
1107 requestLogRoll();
1108
1109
1110
1111 this.consecutiveLogRolls.getAndIncrement();
1112 } else {
1113 LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
1114 + "the total number of live datanodes is lower than the tolerable replicas.");
1115 this.consecutiveLogRolls.set(0);
1116 this.lowReplicationRollEnabled = false;
1117 }
1118 }
1119 } else if (numCurrentReplicas >= this.minTolerableReplication) {
1120
1121 if (!this.lowReplicationRollEnabled) {
1122
1123
1124
1125 if (this.numEntries.get() <= 1) {
1126 return;
1127 }
1128
1129
1130 this.lowReplicationRollEnabled = true;
1131 LOG.info("LowReplication-Roller was enabled.");
1132 }
1133 }
1134 } catch (Exception e) {
1135 LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
1136 " still proceeding ahead...");
1137 }
1138 }
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152 int getLogReplication()
1153 throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
1154 if (this.getNumCurrentReplicas != null && this.hdfs_out != null) {
1155 Object repl = this.getNumCurrentReplicas.invoke(getOutputStream(), NO_ARGS);
1156 if (repl instanceof Integer) {
1157 return ((Integer)repl).intValue();
1158 }
1159 }
1160 return 0;
1161 }
1162
1163 boolean canGetCurReplicas() {
1164 return this.getNumCurrentReplicas != null;
1165 }
1166
1167 @Override
1168 public void hsync() throws IOException {
1169 syncer();
1170 }
1171
1172 @Override
1173 public void hflush() throws IOException {
1174 syncer();
1175 }
1176
1177 @Override
1178 public void sync() throws IOException {
1179 syncer();
1180 }
1181
1182 @Override
1183 public void sync(long txid) throws IOException {
1184 syncer(txid);
1185 }
1186
1187 private void requestLogRoll() {
1188 if (!this.listeners.isEmpty()) {
1189 for (WALActionsListener i: this.listeners) {
1190 i.logRollRequested();
1191 }
1192 }
1193 }
1194
1195
1196 protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
1197 HTableDescriptor htd)
1198 throws IOException {
1199 if (!this.enabled) {
1200 return;
1201 }
1202 if (!this.listeners.isEmpty()) {
1203 for (WALActionsListener i: this.listeners) {
1204 i.visitLogEntryBeforeWrite(htd, logKey, logEdit);
1205 }
1206 }
1207 try {
1208 long now = EnvironmentEdgeManager.currentTimeMillis();
1209
1210 if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
1211 if (logEdit.isReplay()) {
1212
1213 logKey.setScopes(null);
1214 }
1215
1216 logSyncer.append(new FSHLog.Entry(logKey, logEdit));
1217 }
1218 long took = EnvironmentEdgeManager.currentTimeMillis() - now;
1219 coprocessorHost.postWALWrite(info, logKey, logEdit);
1220 long len = 0;
1221 for (KeyValue kv : logEdit.getKeyValues()) {
1222 len += kv.getLength();
1223 }
1224 this.metrics.finishAppend(took, len);
1225 } catch (IOException e) {
1226 LOG.fatal("Could not append. Requesting close of hlog", e);
1227 requestLogRoll();
1228 throw e;
1229 }
1230 }
1231
1232
1233
1234 int getNumEntries() {
1235 return numEntries.get();
1236 }
1237
1238 @Override
1239 public long obtainSeqNum() {
1240 return this.logSeqNum.incrementAndGet();
1241 }
1242
1243
1244 int getNumLogFiles() {
1245 return outputfiles.size();
1246 }
1247
1248 @Override
1249 public Long startCacheFlush(final byte[] encodedRegionName) {
1250 Long oldRegionSeqNum = null;
1251 if (!closeBarrier.beginOp()) {
1252 return null;
1253 }
1254 synchronized (oldestSeqNumsLock) {
1255 oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
1256 if (oldRegionSeqNum != null) {
1257 Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
1258 assert oldValue == null : "Flushing map not cleaned up for "
1259 + Bytes.toString(encodedRegionName);
1260 }
1261 }
1262 if (oldRegionSeqNum == null) {
1263
1264
1265
1266
1267
1268 LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
1269 + Bytes.toString(encodedRegionName) + "]");
1270 }
1271 return obtainSeqNum();
1272 }
1273
1274 @Override
1275 public void completeCacheFlush(final byte [] encodedRegionName)
1276 {
1277 synchronized (oldestSeqNumsLock) {
1278 this.oldestFlushingSeqNums.remove(encodedRegionName);
1279 }
1280 closeBarrier.endOp();
1281 }
1282
1283 @Override
1284 public void abortCacheFlush(byte[] encodedRegionName) {
1285 Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
1286 synchronized (oldestSeqNumsLock) {
1287 seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName);
1288 if (seqNumBeforeFlushStarts != null) {
1289 currentSeqNum =
1290 this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts);
1291 }
1292 }
1293 closeBarrier.endOp();
1294 if ((currentSeqNum != null)
1295 && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
1296 String errorStr = "Region " + Bytes.toString(encodedRegionName) +
1297 "acquired edits out of order current memstore seq=" + currentSeqNum
1298 + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
1299 LOG.error(errorStr);
1300 assert false : errorStr;
1301 Runtime.getRuntime().halt(1);
1302 }
1303 }
1304
1305 @Override
1306 public boolean isLowReplicationRollEnabled() {
1307 return lowReplicationRollEnabled;
1308 }
1309
1310
1311
1312
1313
1314
1315 protected Path getDir() {
1316 return dir;
1317 }
1318
1319 static Path getHLogArchivePath(Path oldLogDir, Path p) {
1320 return new Path(oldLogDir, p.getName());
1321 }
1322
1323 static String formatRecoveredEditsFileName(final long seqid) {
1324 return String.format("%019d", seqid);
1325 }
1326
1327 public static final long FIXED_OVERHEAD = ClassSize.align(
1328 ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
1329 ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1330
1331 private static void usage() {
1332 System.err.println("Usage: HLog <ARGS>");
1333 System.err.println("Arguments:");
1334 System.err.println(" --dump Dump textual representation of passed one or more files");
1335 System.err.println(" For example: HLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
1336 System.err.println(" --split Split the passed directory of WAL logs");
1337 System.err.println(" For example: HLog --split hdfs://example.com:9000/hbase/.logs/DIR");
1338 }
1339
1340 private static void split(final Configuration conf, final Path p)
1341 throws IOException {
1342 FileSystem fs = FileSystem.get(conf);
1343 if (!fs.exists(p)) {
1344 throw new FileNotFoundException(p.toString());
1345 }
1346 final Path baseDir = FSUtils.getRootDir(conf);
1347 final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1348 if (!fs.getFileStatus(p).isDir()) {
1349 throw new IOException(p + " is not a directory");
1350 }
1351
1352 HLogSplitter logSplitter = HLogSplitter.createLogSplitter(
1353 conf, baseDir, p, oldLogDir, fs);
1354 logSplitter.splitLog();
1355 }
1356
1357 @Override
1358 public WALCoprocessorHost getCoprocessorHost() {
1359 return coprocessorHost;
1360 }
1361
1362
1363 boolean hasDeferredEntries() {
1364 return lastDeferredTxid > syncedTillHere;
1365 }
1366
1367 @Override
1368 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
1369 Long result = oldestUnflushedSeqNums.get(encodedRegionName);
1370 return result == null ? HConstants.NO_SEQNUM : result.longValue();
1371 }
1372
1373
1374
1375
1376
1377
1378
1379
1380 public static void main(String[] args) throws IOException {
1381 if (args.length < 2) {
1382 usage();
1383 System.exit(-1);
1384 }
1385
1386 if (args[0].compareTo("--dump") == 0) {
1387 HLogPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
1388 } else if (args[0].compareTo("--split") == 0) {
1389 Configuration conf = HBaseConfiguration.create();
1390 for (int i = 1; i < args.length; i++) {
1391 try {
1392 Path logPath = new Path(args[i]);
1393 FSUtils.setFsDefault(conf, logPath);
1394 split(conf, logPath);
1395 } catch (Throwable t) {
1396 t.printStackTrace(System.err);
1397 System.exit(-1);
1398 }
1399 }
1400 } else {
1401 usage();
1402 System.exit(-1);
1403 }
1404 }
1405 }