1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.io.OutputStream;
24 import java.lang.reflect.InvocationTargetException;
25 import java.lang.reflect.Method;
26 import java.net.URLEncoder;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.SortedMap;
34 import java.util.TreeMap;
35 import java.util.TreeSet;
36 import java.util.UUID;
37 import java.util.concurrent.ConcurrentSkipListMap;
38 import java.util.concurrent.CopyOnWriteArrayList;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.concurrent.atomic.AtomicLong;
42
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45 import org.apache.hadoop.classification.InterfaceAudience;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.fs.FSDataOutputStream;
48 import org.apache.hadoop.fs.FileStatus;
49 import org.apache.hadoop.fs.FileSystem;
50 import org.apache.hadoop.fs.Path;
51 import org.apache.hadoop.fs.Syncable;
52 import org.apache.hadoop.hbase.HBaseConfiguration;
53 import org.apache.hadoop.hbase.HConstants;
54 import org.apache.hadoop.hbase.HRegionInfo;
55 import org.apache.hadoop.hbase.HTableDescriptor;
56 import org.apache.hadoop.hbase.KeyValue;
57 import org.apache.hadoop.hbase.exceptions.FailedLogCloseException;
58 import org.apache.hadoop.hbase.util.Bytes;
59 import org.apache.hadoop.hbase.util.ClassSize;
60 import org.apache.hadoop.hbase.util.DrainBarrier;
61 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
62 import org.apache.hadoop.hbase.util.FSUtils;
63 import org.apache.hadoop.hbase.util.HasThread;
64 import org.apache.hadoop.hbase.util.Threads;
65 import org.apache.hadoop.util.StringUtils;
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 @InterfaceAudience.Private
107 class FSHLog implements HLog, Syncable {
108 static final Log LOG = LogFactory.getLog(FSHLog.class);
109
110 private final FileSystem fs;
111 private final Path rootDir;
112 private final Path dir;
113 private final Configuration conf;
114
115 private List<WALActionsListener> listeners =
116 new CopyOnWriteArrayList<WALActionsListener>();
117 private final long optionalFlushInterval;
118 private final long blocksize;
119 private final String prefix;
120 private final AtomicLong unflushedEntries = new AtomicLong(0);
121 private volatile long syncedTillHere = 0;
122 private long lastDeferredTxid;
123 private final Path oldLogDir;
124 private volatile boolean logRollRunning;
125
126 private WALCoprocessorHost coprocessorHost;
127
128 private FSDataOutputStream hdfs_out;
129
130
131 private int minTolerableReplication;
132 private Method getNumCurrentReplicas;
133 final static Object [] NO_ARGS = new Object []{};
134
135
136 private DrainBarrier closeBarrier = new DrainBarrier();
137
138
139
140
141 Writer writer;
142
143
144
145
146 final SortedMap<Long, Path> outputfiles =
147 Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
148
149
150
151
152
153
154
155 private final Object oldestSeqNumsLock = new Object();
156
157
158
159
160
161 private final Object rollWriterLock = new Object();
162
163
164
165
166 private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedSeqNums =
167 new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
168
169
170
171
172
173 private final Map<byte[], Long> oldestFlushingSeqNums =
174 new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
175
176 private volatile boolean closed = false;
177
178 private final AtomicLong logSeqNum = new AtomicLong(0);
179
180 private boolean forMeta = false;
181
182
183 private volatile long filenum = -1;
184
185
186 private final AtomicInteger numEntries = new AtomicInteger(0);
187
188
189
190
191
192 private AtomicInteger consecutiveLogRolls = new AtomicInteger(0);
193 private final int lowReplicationRollLimit;
194
195
196
197
198 private volatile boolean lowReplicationRollEnabled = true;
199
200
201
202 private final long logrollsize;
203
204
205
206
207 private final Object updateLock = new Object();
208 private final Object flushLock = new Object();
209
210 private final boolean enabled;
211
212
213
214
215
216
217 private final int maxLogs;
218
219
220
221
222 private final LogSyncer logSyncer;
223
224
225 private final int closeErrorsTolerated;
226
227 private final AtomicInteger closeErrorCount = new AtomicInteger();
228 private final MetricsWAL metrics;
229
230
231
232
233
234
235
236
237
238
/**
 * Creates a log under {@code root/logDir} that archives into
 * {@link HConstants#HREGION_OLDLOGDIR_NAME}. No listeners are registered, the default file
 * prefix is used, the log is not flagged as a meta log, and construction fails if the log
 * directory already exists.
 *
 * @param fs file system to create the log on
 * @param root directory that {@code logDir} is resolved against
 * @param logDir name of the directory that holds the live log files
 * @param conf configuration to read the log settings from
 * @throws IOException if the directories or the first log file cannot be created
 */
public FSHLog(final FileSystem fs, final Path root, final String logDir,
    final Configuration conf) throws IOException {
  this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf,
      null /* listeners */, true /* failIfLogDirExists */, null /* prefix */,
      false /* forMeta */);
}
245
246
247
248
249
250
251
252
253
254
255
/**
 * Creates a log under {@code root/logDir} that archives into {@code root/oldLogDir}. No
 * listeners are registered, the default file prefix is used, the log is not flagged as a
 * meta log, and construction fails if the log directory already exists.
 *
 * @param fs file system to create the log on
 * @param root directory that {@code logDir} and {@code oldLogDir} are resolved against
 * @param logDir name of the directory that holds the live log files
 * @param oldLogDir name of the directory rolled logs are archived to
 * @param conf configuration to read the log settings from
 * @throws IOException if the directories or the first log file cannot be created
 */
public FSHLog(final FileSystem fs, final Path root, final String logDir,
    final String oldLogDir, final Configuration conf) throws IOException {
  this(fs, root, logDir, oldLogDir, conf,
      null /* listeners */, true /* failIfLogDirExists */, null /* prefix */,
      false /* forMeta */);
}
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
/**
 * Creates a log under {@code root/logDir} that archives into
 * {@link HConstants#HREGION_OLDLOGDIR_NAME}, registering the given listeners and using the
 * given file name prefix. The log is not flagged as a meta log and construction fails if the
 * log directory already exists.
 *
 * @param fs file system to create the log on
 * @param root directory that {@code logDir} is resolved against
 * @param logDir name of the directory that holds the live log files
 * @param conf configuration to read the log settings from
 * @param listeners listeners to register before the first log file is created; may be null
 * @param prefix file name prefix; null or empty selects the default
 * @throws IOException if the directories or the first log file cannot be created
 */
public FSHLog(final FileSystem fs, final Path root, final String logDir,
    final Configuration conf, final List<WALActionsListener> listeners,
    final String prefix) throws IOException {
  this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf,
      listeners, true /* failIfLogDirExists */, prefix, false /* forMeta */);
}
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311 public FSHLog(final FileSystem fs, final Path root, final String logDir,
312 final String oldLogDir, final Configuration conf,
313 final List<WALActionsListener> listeners,
314 final boolean failIfLogDirExists, final String prefix, boolean forMeta)
315 throws IOException {
316 super();
317 this.fs = fs;
318 this.rootDir = root;
319 this.dir = new Path(this.rootDir, logDir);
320 this.oldLogDir = new Path(this.rootDir, oldLogDir);
321 this.forMeta = forMeta;
322 this.conf = conf;
323
324 if (listeners != null) {
325 for (WALActionsListener i: listeners) {
326 registerWALActionsListener(i);
327 }
328 }
329
330 this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
331 FSUtils.getDefaultBlockSize(this.fs, this.dir));
332
333 float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
334 this.logrollsize = (long)(this.blocksize * multi);
335 this.optionalFlushInterval =
336 conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
337
338 this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
339 this.minTolerableReplication = conf.getInt(
340 "hbase.regionserver.hlog.tolerable.lowreplication",
341 FSUtils.getDefaultReplication(fs, this.dir));
342 this.lowReplicationRollLimit = conf.getInt(
343 "hbase.regionserver.hlog.lowreplication.rolllimit", 5);
344 this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
345 this.closeErrorsTolerated = conf.getInt(
346 "hbase.regionserver.logroll.errors.tolerated", 0);
347
348 this.logSyncer = new LogSyncer(this.optionalFlushInterval);
349
350 LOG.info("WAL/HLog configuration: blocksize=" +
351 StringUtils.byteDesc(this.blocksize) +
352 ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
353 ", enabled=" + this.enabled +
354 ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
355
356 this.prefix = prefix == null || prefix.isEmpty() ?
357 "hlog" : URLEncoder.encode(prefix, "UTF8");
358
359 boolean dirExists = false;
360 if (failIfLogDirExists && (dirExists = this.fs.exists(dir))) {
361 throw new IOException("Target HLog directory already exists: " + dir);
362 }
363 if (!dirExists && !fs.mkdirs(dir)) {
364 throw new IOException("Unable to mkdir " + dir);
365 }
366
367 if (!fs.exists(this.oldLogDir)) {
368 if (!fs.mkdirs(this.oldLogDir)) {
369 throw new IOException("Unable to mkdir " + this.oldLogDir);
370 }
371 }
372
373 rollWriter();
374
375
376 this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
377
378
379 if (this.optionalFlushInterval > 0) {
380 Threads.setDaemonThreadRunning(logSyncer.getThread(), Thread.currentThread().getName()
381 + ".logSyncer");
382 } else {
383 LOG.info("hbase.regionserver.optionallogflushinterval is set as "
384 + this.optionalFlushInterval + ". Deferred log syncing won't work. "
385 + "Any Mutation, marked to be deferred synced, will be flushed immediately.");
386 }
387 coprocessorHost = new WALCoprocessorHost(this, conf);
388
389 this.metrics = new MetricsWAL();
390 }
391
392
393
394
395
396 private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
397 Method m = null;
398 if (os != null) {
399 Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
400 .getClass();
401 try {
402 m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas",
403 new Class<?>[] {});
404 m.setAccessible(true);
405 } catch (NoSuchMethodException e) {
406 LOG.info("FileSystem's output stream doesn't support"
407 + " getNumCurrentReplicas; --HDFS-826 not available; fsOut="
408 + wrappedStreamClass.getName());
409 } catch (SecurityException e) {
410 LOG.info("Doesn't have access to getNumCurrentReplicas on "
411 + "FileSystems's output stream --HDFS-826 not available; fsOut="
412 + wrappedStreamClass.getName(), e);
413 m = null;
414 }
415 }
416 if (m != null) {
417 if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas--HDFS-826");
418 }
419 return m;
420 }
421
422 @Override
423 public void registerWALActionsListener(final WALActionsListener listener) {
424 this.listeners.add(listener);
425 }
426
427 @Override
428 public boolean unregisterWALActionsListener(final WALActionsListener listener) {
429 return this.listeners.remove(listener);
430 }
431
432 @Override
433 public long getFilenum() {
434 return this.filenum;
435 }
436
437 @Override
438 public void setSequenceNumber(final long newvalue) {
439 for (long id = this.logSeqNum.get(); id < newvalue &&
440 !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
441
442
443 LOG.debug("Changed sequenceid from " + id + " to " + newvalue);
444 }
445 }
446
447 @Override
448 public long getSequenceNumber() {
449 return logSeqNum.get();
450 }
451
452
453
454
455
456
457
458
459
460 OutputStream getOutputStream() {
461 return this.hdfs_out.getWrappedStream();
462 }
463
464 @Override
465 public byte [][] rollWriter() throws FailedLogCloseException, IOException {
466 return rollWriter(false);
467 }
468
469 @Override
470 public byte [][] rollWriter(boolean force)
471 throws FailedLogCloseException, IOException {
472 synchronized (rollWriterLock) {
473
474 if (!force && this.writer != null && this.numEntries.get() <= 0) {
475 return null;
476 }
477 byte [][] regionsToFlush = null;
478 if (closed) {
479 LOG.debug("HLog closed. Skipping rolling of writer");
480 return null;
481 }
482 try {
483 this.logRollRunning = true;
484 if (!closeBarrier.beginOp()) {
485 LOG.debug("HLog closing. Skipping rolling of writer");
486 return regionsToFlush;
487 }
488
489
490 long currentFilenum = this.filenum;
491 Path oldPath = null;
492 if (currentFilenum > 0) {
493
494 oldPath = computeFilename(currentFilenum);
495 }
496 this.filenum = System.currentTimeMillis();
497 Path newPath = computeFilename();
498
499
500 if (!this.listeners.isEmpty()) {
501 for (WALActionsListener i : this.listeners) {
502 i.preLogRoll(oldPath, newPath);
503 }
504 }
505 FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
506
507 FSDataOutputStream nextHdfsOut = null;
508 if (nextWriter instanceof ProtobufLogWriter) {
509 nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
510 }
511
512 Path oldFile = null;
513 int oldNumEntries = 0;
514 synchronized (updateLock) {
515
516 oldNumEntries = this.numEntries.get();
517 oldFile = cleanupCurrentWriter(currentFilenum);
518 this.writer = nextWriter;
519 this.hdfs_out = nextHdfsOut;
520 this.numEntries.set(0);
521 }
522 if (oldFile == null) LOG.info("New WAL " + FSUtils.getPath(newPath));
523 else LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries=" + oldNumEntries +
524 ", filesize=" + StringUtils.humanReadableInt(this.fs.getFileStatus(oldFile).getLen()) +
525 "; new WAL " + FSUtils.getPath(newPath));
526
527
528 if (!this.listeners.isEmpty()) {
529 for (WALActionsListener i : this.listeners) {
530 i.postLogRoll(oldPath, newPath);
531 }
532 }
533
534
535 if (getNumLogFiles() > 0) {
536 cleanOldLogs();
537 regionsToFlush = getRegionsToForceFlush();
538 }
539 } finally {
540 this.logRollRunning = false;
541 closeBarrier.endOp();
542 }
543 return regionsToFlush;
544 }
545 }
546
547
548
549
550
551
552
553
554
555
556
557 protected Writer createWriterInstance(final FileSystem fs, final Path path,
558 final Configuration conf) throws IOException {
559 if (forMeta) {
560
561 }
562 return HLogFactory.createWriter(fs, path, conf);
563 }
564
565
566
567
568
569
570
571
572 private void cleanOldLogs() throws IOException {
573 long oldestOutstandingSeqNum = Long.MAX_VALUE;
574 synchronized (oldestSeqNumsLock) {
575 Long oldestFlushing = (oldestFlushingSeqNums.size() > 0)
576 ? Collections.min(oldestFlushingSeqNums.values()) : Long.MAX_VALUE;
577 Long oldestUnflushed = (oldestUnflushedSeqNums.size() > 0)
578 ? Collections.min(oldestUnflushedSeqNums.values()) : Long.MAX_VALUE;
579 oldestOutstandingSeqNum = Math.min(oldestFlushing, oldestUnflushed);
580 }
581
582
583
584 TreeSet<Long> sequenceNumbers = new TreeSet<Long>(this.outputfiles.headMap(
585 oldestOutstandingSeqNum).keySet());
586
587 if (LOG.isDebugEnabled()) {
588 if (sequenceNumbers.size() > 0) {
589 LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove" +
590 " out of total " + this.outputfiles.size() + ";" +
591 " oldest outstanding sequenceid is " + oldestOutstandingSeqNum);
592 }
593 }
594 for (Long seq : sequenceNumbers) {
595 archiveLogFile(this.outputfiles.remove(seq), seq);
596 }
597 }
598
599
600
601
602
603
604
605
606 static byte[][] findMemstoresWithEditsEqualOrOlderThan(
607 final long walSeqNum, final Map<byte[], Long> regionsToSeqNums) {
608 List<byte[]> regions = null;
609 for (Map.Entry<byte[], Long> e : regionsToSeqNums.entrySet()) {
610 if (e.getValue().longValue() <= walSeqNum) {
611 if (regions == null) regions = new ArrayList<byte[]>();
612 regions.add(e.getKey());
613 }
614 }
615 return regions == null ? null : regions
616 .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
617 }
618
619 private byte[][] getRegionsToForceFlush() throws IOException {
620
621
622 byte [][] regions = null;
623 int logCount = getNumLogFiles();
624 if (logCount > this.maxLogs && logCount > 0) {
625
626 synchronized (oldestSeqNumsLock) {
627 regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
628 this.oldestUnflushedSeqNums);
629 }
630 if (regions != null) {
631 StringBuilder sb = new StringBuilder();
632 for (int i = 0; i < regions.length; i++) {
633 if (i > 0) sb.append(", ");
634 sb.append(Bytes.toStringBinary(regions[i]));
635 }
636 LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
637 this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
638 sb.toString());
639 }
640 }
641 return regions;
642 }
643
644
645
646
647
648
649
650 Path cleanupCurrentWriter(final long currentfilenum) throws IOException {
651 Path oldFile = null;
652 if (this.writer != null) {
653
654 try {
655
656
657 if (this.unflushedEntries.get() != this.syncedTillHere) {
658 LOG.debug("cleanupCurrentWriter " +
659 " waiting for transactions to get synced " +
660 " total " + this.unflushedEntries.get() +
661 " synced till here " + syncedTillHere);
662 sync();
663 }
664 this.writer.close();
665 this.writer = null;
666 closeErrorCount.set(0);
667 } catch (IOException e) {
668 LOG.error("Failed close of HLog writer", e);
669 int errors = closeErrorCount.incrementAndGet();
670 if (errors <= closeErrorsTolerated && !hasDeferredEntries()) {
671 LOG.warn("Riding over HLog close failure! error count="+errors);
672 } else {
673 if (hasDeferredEntries()) {
674 LOG.error("Aborting due to unflushed edits in HLog");
675 }
676
677
678
679 FailedLogCloseException flce =
680 new FailedLogCloseException("#" + currentfilenum);
681 flce.initCause(e);
682 throw flce;
683 }
684 }
685 if (currentfilenum >= 0) {
686 oldFile = computeFilename(currentfilenum);
687 this.outputfiles.put(Long.valueOf(this.logSeqNum.get()), oldFile);
688 }
689 }
690 return oldFile;
691 }
692
693 private void archiveLogFile(final Path p, final Long seqno) throws IOException {
694 Path newPath = getHLogArchivePath(this.oldLogDir, p);
695 LOG.info("moving old hlog file " + FSUtils.getPath(p) +
696 " whose highest sequenceid is " + seqno + " to " +
697 FSUtils.getPath(newPath));
698
699
700 if (!this.listeners.isEmpty()) {
701 for (WALActionsListener i : this.listeners) {
702 i.preLogArchive(p, newPath);
703 }
704 }
705 if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
706 throw new IOException("Unable to rename " + p + " to " + newPath);
707 }
708
709 if (!this.listeners.isEmpty()) {
710 for (WALActionsListener i : this.listeners) {
711 i.postLogArchive(p, newPath);
712 }
713 }
714 }
715
716
717
718
719
720
721 protected Path computeFilename() {
722 return computeFilename(this.filenum);
723 }
724
725
726
727
728
729
730
731 protected Path computeFilename(long filenum) {
732 if (filenum < 0) {
733 throw new RuntimeException("hlog file number can't be < 0");
734 }
735 String child = prefix + "." + filenum;
736 if (forMeta) {
737 child += HLog.META_HLOG_FILE_EXTN;
738 }
739 return new Path(dir, child);
740 }
741
742 @Override
743 public void closeAndDelete() throws IOException {
744 close();
745 if (!fs.exists(this.dir)) return;
746 FileStatus[] files = fs.listStatus(this.dir);
747 if (files != null) {
748 for(FileStatus file : files) {
749
750 Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
751
752 if (!this.listeners.isEmpty()) {
753 for (WALActionsListener i : this.listeners) {
754 i.preLogArchive(file.getPath(), p);
755 }
756 }
757
758 if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
759 throw new IOException("Unable to rename " + file.getPath() + " to " + p);
760 }
761
762 if (!this.listeners.isEmpty()) {
763 for (WALActionsListener i : this.listeners) {
764 i.postLogArchive(file.getPath(), p);
765 }
766 }
767 }
768 LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.oldLogDir));
769 }
770 if (!fs.delete(dir, true)) {
771 LOG.info("Unable to delete " + dir);
772 }
773 }
774
775 @Override
776 public void close() throws IOException {
777 if (this.closed) {
778 return;
779 }
780
781 if (this.optionalFlushInterval > 0) {
782 try {
783 logSyncer.close();
784
785 logSyncer.join(this.optionalFlushInterval * 2);
786 } catch (InterruptedException e) {
787 LOG.error("Exception while waiting for syncer thread to die", e);
788 Thread.currentThread().interrupt();
789 }
790 }
791 try {
792
793 closeBarrier.stopAndDrainOps();
794 } catch (InterruptedException e) {
795 LOG.error("Exception while waiting for cache flushes and log rolls", e);
796 Thread.currentThread().interrupt();
797 }
798
799
800 if (!this.listeners.isEmpty()) {
801 for (WALActionsListener i : this.listeners) {
802 i.logCloseRequested();
803 }
804 }
805 synchronized (updateLock) {
806 this.closed = true;
807 if (LOG.isDebugEnabled()) {
808 LOG.debug("Closing WAL writer in " + this.dir.toString());
809 }
810 if (this.writer != null) {
811 this.writer.close();
812 this.writer = null;
813 }
814 }
815 }
816
817
818
819
820
821
822
823
824
825 protected HLogKey makeKey(byte[] encodedRegionName, byte[] tableName, long seqnum,
826 long now, UUID clusterId) {
827 return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterId);
828 }
829
830 @Override
831 public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
832 final long now, HTableDescriptor htd)
833 throws IOException {
834 append(info, tableName, edits, now, htd, true);
835 }
836
837 @Override
838 public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
839 final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException {
840 append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore);
841 }
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869 private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
870 final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore)
871 throws IOException {
872 if (edits.isEmpty()) return this.unflushedEntries.get();
873 if (this.closed) {
874 throw new IOException("Cannot append; log is closed");
875 }
876 long txid = 0;
877 synchronized (this.updateLock) {
878 long seqNum = obtainSeqNum();
879
880
881
882
883
884
885
886 byte [] encodedRegionName = info.getEncodedNameAsBytes();
887 if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
888 HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
889 doWrite(info, logKey, edits, htd);
890 this.numEntries.incrementAndGet();
891 txid = this.unflushedEntries.incrementAndGet();
892 if (htd.isDeferredLogFlush()) {
893 lastDeferredTxid = txid;
894 }
895 }
896
897
898 if (doSync &&
899 (info.isMetaRegion() ||
900 !htd.isDeferredLogFlush())) {
901
902 this.sync(txid);
903 }
904 return txid;
905 }
906
907 @Override
908 public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
909 UUID clusterId, final long now, HTableDescriptor htd)
910 throws IOException {
911 return append(info, tableName, edits, clusterId, now, htd, false, true);
912 }
913
914
915
916
917
918
919
920
921
922 class LogSyncer extends HasThread {
923
924 private final long optionalFlushInterval;
925
926 private final AtomicBoolean closeLogSyncer = new AtomicBoolean(false);
927
928
929
930
931
932
933
934 private List<Entry> pendingWrites = new LinkedList<Entry>();
935
936 LogSyncer(long optionalFlushInterval) {
937 this.optionalFlushInterval = optionalFlushInterval;
938 }
939
940 @Override
941 public void run() {
942 try {
943
944
945 while(!this.isInterrupted() && !closeLogSyncer.get()) {
946
947 try {
948 if (unflushedEntries.get() <= syncedTillHere) {
949 synchronized (closeLogSyncer) {
950 closeLogSyncer.wait(this.optionalFlushInterval);
951 }
952 }
953
954
955
956 sync();
957 } catch (IOException e) {
958 LOG.error("Error while syncing, requesting close of hlog ", e);
959 requestLogRoll();
960 Threads.sleep(this.optionalFlushInterval);
961 }
962 }
963 } catch (InterruptedException e) {
964 LOG.debug(getName() + " interrupted while waiting for sync requests");
965 } finally {
966 LOG.info(getName() + " exiting");
967 }
968 }
969
970
971
972
973 synchronized void append(Entry e) throws IOException {
974 pendingWrites.add(e);
975 }
976
977
978
979 synchronized List<Entry> getPendingWrites() {
980 List<Entry> save = this.pendingWrites;
981 this.pendingWrites = new LinkedList<Entry>();
982 return save;
983 }
984
985
986 void hlogFlush(Writer writer, List<Entry> pending) throws IOException {
987 if (pending == null) return;
988
989
990 for (Entry e : pending) {
991 writer.append(e);
992 }
993 }
994
995 void close() {
996 synchronized (closeLogSyncer) {
997 closeLogSyncer.set(true);
998 closeLogSyncer.notifyAll();
999 }
1000 }
1001 }
1002
1003
1004 private void syncer() throws IOException {
1005 syncer(this.unflushedEntries.get());
1006 }
1007
1008
1009 private void syncer(long txid) throws IOException {
1010
1011
1012 if (txid <= this.syncedTillHere) {
1013 return;
1014 }
1015 Writer tempWriter;
1016 synchronized (this.updateLock) {
1017 if (this.closed) return;
1018
1019
1020
1021
1022 tempWriter = this.writer;
1023 }
1024 try {
1025 long doneUpto;
1026 long now = EnvironmentEdgeManager.currentTimeMillis();
1027
1028
1029
1030
1031 IOException ioe = null;
1032 List<Entry> pending = null;
1033 synchronized (flushLock) {
1034 if (txid <= this.syncedTillHere) {
1035 return;
1036 }
1037 doneUpto = this.unflushedEntries.get();
1038 pending = logSyncer.getPendingWrites();
1039 try {
1040 logSyncer.hlogFlush(tempWriter, pending);
1041 } catch(IOException io) {
1042 ioe = io;
1043 LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
1044 }
1045 }
1046 if (ioe != null && pending != null) {
1047 synchronized (this.updateLock) {
1048 synchronized (flushLock) {
1049
1050 tempWriter = this.writer;
1051 logSyncer.hlogFlush(tempWriter, pending);
1052 }
1053 }
1054 }
1055
1056 if (txid <= this.syncedTillHere) {
1057 return;
1058 }
1059 try {
1060 if (tempWriter != null) tempWriter.sync();
1061 } catch(IOException ex) {
1062 synchronized (this.updateLock) {
1063
1064
1065
1066 tempWriter = this.writer;
1067 if (tempWriter != null) tempWriter.sync();
1068 }
1069 }
1070 this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
1071
1072 this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
1073
1074
1075
1076 if (!this.logRollRunning) {
1077 checkLowReplication();
1078 try {
1079 if (tempWriter.getLength() > this.logrollsize) {
1080 requestLogRoll();
1081 }
1082 } catch (IOException x) {
1083 LOG.debug("Log roll failed and will be retried. (This is not an error)");
1084 }
1085 }
1086 } catch (IOException e) {
1087 LOG.fatal("Could not sync. Requesting roll of hlog", e);
1088 requestLogRoll();
1089 throw e;
1090 }
1091 }
1092
1093 private void checkLowReplication() {
1094
1095
1096 try {
1097 int numCurrentReplicas = getLogReplication();
1098 if (numCurrentReplicas != 0
1099 && numCurrentReplicas < this.minTolerableReplication) {
1100 if (this.lowReplicationRollEnabled) {
1101 if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
1102 LOG.warn("HDFS pipeline error detected. " + "Found "
1103 + numCurrentReplicas + " replicas but expecting no less than "
1104 + this.minTolerableReplication + " replicas. "
1105 + " Requesting close of hlog.");
1106 requestLogRoll();
1107
1108
1109
1110 this.consecutiveLogRolls.getAndIncrement();
1111 } else {
1112 LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
1113 + "the total number of live datanodes is lower than the tolerable replicas.");
1114 this.consecutiveLogRolls.set(0);
1115 this.lowReplicationRollEnabled = false;
1116 }
1117 }
1118 } else if (numCurrentReplicas >= this.minTolerableReplication) {
1119
1120 if (!this.lowReplicationRollEnabled) {
1121
1122
1123
1124 if (this.numEntries.get() <= 1) {
1125 return;
1126 }
1127
1128
1129 this.lowReplicationRollEnabled = true;
1130 LOG.info("LowReplication-Roller was enabled.");
1131 }
1132 }
1133 } catch (Exception e) {
1134 LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
1135 " still proceeding ahead...");
1136 }
1137 }
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151 int getLogReplication()
1152 throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
1153 if (this.getNumCurrentReplicas != null && this.hdfs_out != null) {
1154 Object repl = this.getNumCurrentReplicas.invoke(getOutputStream(), NO_ARGS);
1155 if (repl instanceof Integer) {
1156 return ((Integer)repl).intValue();
1157 }
1158 }
1159 return 0;
1160 }
1161
1162 boolean canGetCurReplicas() {
1163 return this.getNumCurrentReplicas != null;
1164 }
1165
1166 @Override
1167 public void hsync() throws IOException {
1168 syncer();
1169 }
1170
1171 @Override
1172 public void hflush() throws IOException {
1173 syncer();
1174 }
1175
1176 @Override
1177 public void sync() throws IOException {
1178 syncer();
1179 }
1180
1181 @Override
1182 public void sync(long txid) throws IOException {
1183 syncer(txid);
1184 }
1185
1186 private void requestLogRoll() {
1187 if (!this.listeners.isEmpty()) {
1188 for (WALActionsListener i: this.listeners) {
1189 i.logRollRequested();
1190 }
1191 }
1192 }
1193
1194
1195 protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
1196 HTableDescriptor htd)
1197 throws IOException {
1198 if (!this.enabled) {
1199 return;
1200 }
1201 if (!this.listeners.isEmpty()) {
1202 for (WALActionsListener i: this.listeners) {
1203 i.visitLogEntryBeforeWrite(htd, logKey, logEdit);
1204 }
1205 }
1206 try {
1207 long now = EnvironmentEdgeManager.currentTimeMillis();
1208
1209 if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
1210 if (logEdit.isReplay()) {
1211
1212 logKey.setScopes(null);
1213 }
1214
1215 logSyncer.append(new FSHLog.Entry(logKey, logEdit));
1216 }
1217 long took = EnvironmentEdgeManager.currentTimeMillis() - now;
1218 coprocessorHost.postWALWrite(info, logKey, logEdit);
1219 long len = 0;
1220 for (KeyValue kv : logEdit.getKeyValues()) {
1221 len += kv.getLength();
1222 }
1223 this.metrics.finishAppend(took, len);
1224 } catch (IOException e) {
1225 LOG.fatal("Could not append. Requesting close of hlog", e);
1226 requestLogRoll();
1227 throw e;
1228 }
1229 }
1230
1231
1232
1233 int getNumEntries() {
1234 return numEntries.get();
1235 }
1236
1237 @Override
1238 public long obtainSeqNum() {
1239 return this.logSeqNum.incrementAndGet();
1240 }
1241
1242
1243 int getNumLogFiles() {
1244 return outputfiles.size();
1245 }
1246
1247 @Override
1248 public Long startCacheFlush(final byte[] encodedRegionName) {
1249 Long oldRegionSeqNum = null;
1250 if (!closeBarrier.beginOp()) {
1251 return null;
1252 }
1253 synchronized (oldestSeqNumsLock) {
1254 oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
1255 if (oldRegionSeqNum != null) {
1256 Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
1257 assert oldValue == null : "Flushing map not cleaned up for "
1258 + Bytes.toString(encodedRegionName);
1259 }
1260 }
1261 if (oldRegionSeqNum == null) {
1262
1263
1264
1265
1266
1267 LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
1268 + Bytes.toString(encodedRegionName) + "]");
1269 }
1270 return obtainSeqNum();
1271 }
1272
1273 @Override
1274 public void completeCacheFlush(final byte [] encodedRegionName)
1275 {
1276 synchronized (oldestSeqNumsLock) {
1277 this.oldestFlushingSeqNums.remove(encodedRegionName);
1278 }
1279 closeBarrier.endOp();
1280 }
1281
1282 @Override
1283 public void abortCacheFlush(byte[] encodedRegionName) {
1284 Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
1285 synchronized (oldestSeqNumsLock) {
1286 seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName);
1287 if (seqNumBeforeFlushStarts != null) {
1288 currentSeqNum =
1289 this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts);
1290 }
1291 }
1292 closeBarrier.endOp();
1293 if ((currentSeqNum != null)
1294 && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
1295 String errorStr = "Region " + Bytes.toString(encodedRegionName) +
1296 "acquired edits out of order current memstore seq=" + currentSeqNum
1297 + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
1298 LOG.error(errorStr);
1299 assert false : errorStr;
1300 Runtime.getRuntime().halt(1);
1301 }
1302 }
1303
1304 @Override
1305 public boolean isLowReplicationRollEnabled() {
1306 return lowReplicationRollEnabled;
1307 }
1308
1309
1310
1311
1312
1313
1314 protected Path getDir() {
1315 return dir;
1316 }
1317
1318 static Path getHLogArchivePath(Path oldLogDir, Path p) {
1319 return new Path(oldLogDir, p.getName());
1320 }
1321
1322 static String formatRecoveredEditsFileName(final long seqid) {
1323 return String.format("%019d", seqid);
1324 }
1325
1326 public static final long FIXED_OVERHEAD = ClassSize.align(
1327 ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
1328 ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1329
1330 private static void usage() {
1331 System.err.println("Usage: HLog <ARGS>");
1332 System.err.println("Arguments:");
1333 System.err.println(" --dump Dump textual representation of passed one or more files");
1334 System.err.println(" For example: HLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
1335 System.err.println(" --split Split the passed directory of WAL logs");
1336 System.err.println(" For example: HLog --split hdfs://example.com:9000/hbase/.logs/DIR");
1337 }
1338
1339 private static void split(final Configuration conf, final Path p)
1340 throws IOException {
1341 FileSystem fs = FileSystem.get(conf);
1342 if (!fs.exists(p)) {
1343 throw new FileNotFoundException(p.toString());
1344 }
1345 final Path baseDir = FSUtils.getRootDir(conf);
1346 final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1347 if (!fs.getFileStatus(p).isDir()) {
1348 throw new IOException(p + " is not a directory");
1349 }
1350
1351 HLogSplitter logSplitter = HLogSplitter.createLogSplitter(
1352 conf, baseDir, p, oldLogDir, fs);
1353 logSplitter.splitLog();
1354 }
1355
1356 @Override
1357 public WALCoprocessorHost getCoprocessorHost() {
1358 return coprocessorHost;
1359 }
1360
1361
1362 boolean hasDeferredEntries() {
1363 return lastDeferredTxid > syncedTillHere;
1364 }
1365
1366 @Override
1367 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
1368 Long result = oldestUnflushedSeqNums.get(encodedRegionName);
1369 return result == null ? HConstants.NO_SEQNUM : result.longValue();
1370 }
1371
1372
1373
1374
1375
1376
1377
1378
1379 public static void main(String[] args) throws IOException {
1380 if (args.length < 2) {
1381 usage();
1382 System.exit(-1);
1383 }
1384
1385 if (args[0].compareTo("--dump") == 0) {
1386 HLogPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
1387 } else if (args[0].compareTo("--split") == 0) {
1388 Configuration conf = HBaseConfiguration.create();
1389 for (int i = 1; i < args.length; i++) {
1390 try {
1391 Path logPath = new Path(args[i]);
1392 FSUtils.setFsDefault(conf, logPath);
1393 split(conf, logPath);
1394 } catch (Throwable t) {
1395 t.printStackTrace(System.err);
1396 System.exit(-1);
1397 }
1398 }
1399 } else {
1400 usage();
1401 System.exit(-1);
1402 }
1403 }
1404 }