1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
51
52 import org.apache.commons.logging.Log;
53 import org.apache.commons.logging.LogFactory;
54 import org.apache.hadoop.classification.InterfaceAudience;
55 import org.apache.hadoop.conf.Configuration;
56 import org.apache.hadoop.fs.FileStatus;
57 import org.apache.hadoop.fs.FileSystem;
58 import org.apache.hadoop.fs.Path;
59 import org.apache.hadoop.hbase.HConstants;
60 import org.apache.hadoop.hbase.HRegionInfo;
61 import org.apache.hadoop.hbase.HRegionLocation;
62 import org.apache.hadoop.hbase.HTableDescriptor;
63 import org.apache.hadoop.hbase.KeyValue;
64 import org.apache.hadoop.hbase.RemoteExceptionHandler;
65 import org.apache.hadoop.hbase.ServerName;
66 import org.apache.hadoop.hbase.client.ConnectionUtils;
67 import org.apache.hadoop.hbase.client.Delete;
68 import org.apache.hadoop.hbase.client.HConnection;
69 import org.apache.hadoop.hbase.client.HConnectionManager;
70 import org.apache.hadoop.hbase.client.Put;
71 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
72 import org.apache.hadoop.hbase.client.Row;
73 import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
74 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
75 import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
76 import org.apache.hadoop.hbase.io.HeapSize;
77 import org.apache.hadoop.hbase.master.RegionState;
78 import org.apache.hadoop.hbase.master.SplitLogManager;
79 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
80 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
81 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
82 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
83 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
84 import org.apache.hadoop.hbase.regionserver.HRegion;
85 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
86 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
87 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
88 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
89 import org.apache.hadoop.hbase.util.Bytes;
90 import org.apache.hadoop.hbase.util.CancelableProgressable;
91 import org.apache.hadoop.hbase.util.ClassSize;
92 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
93 import org.apache.hadoop.hbase.util.FSUtils;
94 import org.apache.hadoop.hbase.util.Pair;
95 import org.apache.hadoop.hbase.util.Threads;
96 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
97 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
98 import org.apache.hadoop.hbase.zookeeper.ZKTable;
99 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
100 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
101 import org.apache.hadoop.io.MultipleIOException;
102 import org.apache.hadoop.ipc.RemoteException;
103 import org.apache.zookeeper.KeeperException;
104 import org.apache.zookeeper.data.Stat;
105
106 import com.google.common.base.Preconditions;
107 import com.google.common.collect.Lists;
108
109
110
111
112
113
114 @InterfaceAudience.Private
115 public class HLogSplitter {
116 private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
117
118 static final Log LOG = LogFactory.getLog(HLogSplitter.class);
119
120 private boolean hasSplit = false;
121 private long splitTime = 0;
122 private long splitSize = 0;
123
124
125
126 protected final Path rootDir;
127 protected final Path srcDir;
128 protected final Path oldLogDir;
129 protected final FileSystem fs;
130 protected final Configuration conf;
131
132
133
134 OutputSink outputSink;
135 EntryBuffers entryBuffers;
136
137 private Set<String> disablingOrDisabledTables = new HashSet<String>();
138 private ZooKeeperWatcher watcher;
139
140
141
142 protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
143
144
145
146 final Object dataAvailable = new Object();
147
148 private MonitoredTask status;
149
150
151 protected final LastSequenceId sequenceIdChecker;
152
153 protected boolean distributedLogReplay;
154
155
156 Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
157
158
159 private final int numWriterThreads;
160
161
162 private final int minBatchSize;
163
164
165
166
167
168
169
170
171
172
173
174
175
176 public static HLogSplitter createLogSplitter(Configuration conf,
177 final Path rootDir, final Path srcDir,
178 Path oldLogDir, final FileSystem fs) {
179
180 @SuppressWarnings("unchecked")
181 Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
182 .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
183 try {
184 Constructor<? extends HLogSplitter> constructor =
185 splitterClass.getConstructor(
186 Configuration.class,
187 Path.class,
188 Path.class,
189 Path.class,
190 FileSystem.class,
191 LastSequenceId.class);
192 return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs, null);
193 } catch (IllegalArgumentException e) {
194 throw new RuntimeException(e);
195 } catch (InstantiationException e) {
196 throw new RuntimeException(e);
197 } catch (IllegalAccessException e) {
198 throw new RuntimeException(e);
199 } catch (InvocationTargetException e) {
200 throw new RuntimeException(e);
201 } catch (SecurityException e) {
202 throw new RuntimeException(e);
203 } catch (NoSuchMethodException e) {
204 throw new RuntimeException(e);
205 }
206 }
207
/**
 * Creates a splitter without a ZooKeeper watcher. With no watcher, the
 * seven-argument constructor forces distributed log replay off.
 */
public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
    Path oldLogDir, FileSystem fs, LastSequenceId idChecker) {
  this(conf, rootDir, srcDir, oldLogDir, fs, idChecker, (ZooKeeperWatcher) null);
}
212
/**
 * Creates a splitter and chooses its output sink.
 *
 * <p>A LogReplayOutputSink is used only when distributed log replay is enabled
 * in the configuration AND a ZooKeeper watcher is supplied. Otherwise replay
 * is forced off and a LogRecoveredEditsOutputSink is used.
 *
 * @param conf configuration (buffer size, writer thread count, batch size, replay flag)
 * @param rootDir hbase root directory
 * @param srcDir directory holding the hlogs to split (may be null for single-file splits)
 * @param oldLogDir archive directory for processed logs (may be null for single-file splits)
 * @param fs filesystem holding the logs
 * @param idChecker optional source of last flushed sequence ids, may be null
 * @param zkw optional ZooKeeper watcher, may be null
 */
public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
    Path oldLogDir, FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw) {
  this.conf = conf;
  this.rootDir = rootDir;
  this.srcDir = srcDir;
  this.oldLogDir = oldLogDir;
  this.fs = fs;
  this.sequenceIdChecker = idChecker;
  this.watcher = zkw;

  entryBuffers = new EntryBuffers(
      conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
          128 * 1024 * 1024));

  this.minBatchSize = conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 512);
  this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);

  this.numWriterThreads = conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
  if (zkw != null && this.distributedLogReplay) {
    outputSink = new LogReplayOutputSink(numWriterThreads);
  } else {
    if (this.distributedLogReplay) {
      // Replay needs the watcher (e.g. to look up last flushed sequence ids), so fall back.
      LOG.info("ZooKeeperWatcher is passed in as NULL so disable distributedLogReplay.");
    }
    this.distributedLogReplay = false;
    outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
  }
}
242
243
244
245
246
247
248
249
250
251 public List<Path> splitLog()
252 throws IOException {
253 return splitLog((CountDownLatch) null);
254 }
255
256
257
258
259
260
261
262
263
264
265 public List<Path> splitLog(CountDownLatch latch)
266 throws IOException {
267 Preconditions.checkState(!hasSplit,
268 "An HLogSplitter instance may only be used once");
269 hasSplit = true;
270
271 status = TaskMonitor.get().createStatus(
272 "Splitting logs in " + srcDir);
273
274 long startTime = EnvironmentEdgeManager.currentTimeMillis();
275
276 status.setStatus("Determining files to split...");
277 List<Path> splits = null;
278 if (!fs.exists(srcDir)) {
279
280 status.markComplete("No log directory existed to split.");
281 return splits;
282 }
283 FileStatus[] logfiles = fs.listStatus(srcDir);
284 if (logfiles == null || logfiles.length == 0) {
285
286 return splits;
287 }
288 logAndReport("Splitting " + logfiles.length + " hlog(s) in "
289 + srcDir.toString());
290 splits = splitLog(logfiles, latch);
291
292 splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
293 String msg = "hlog file splitting completed in " + splitTime +
294 " ms for " + srcDir.toString();
295 status.markComplete(msg);
296 LOG.info(msg);
297 return splits;
298 }
299
300 private void logAndReport(String msg) {
301 status.setStatus(msg);
302 LOG.info(msg);
303 }
304
305
306
307
308 public long getTime() {
309 return this.splitTime;
310 }
311
312
313
314
315 public long getSize() {
316 return this.splitSize;
317 }
318
319
320
321
322
323 Map<byte[], Long> getOutputCounts() {
324 Preconditions.checkState(hasSplit);
325 return outputSink.getOutputCounts();
326 }
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350 private List<Path> splitLog(final FileStatus[] logfiles, CountDownLatch latch)
351 throws IOException {
352 List<Path> processedLogs = new ArrayList<Path>(logfiles.length);
353 List<Path> corruptedLogs = new ArrayList<Path>(logfiles.length);
354 List<Path> splits;
355
356 boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
357
358 countTotalBytes(logfiles);
359 splitSize = 0;
360
361 outputSink.startWriterThreads();
362
363 try {
364 int i = 0;
365 for (FileStatus log : logfiles) {
366 Path logPath = log.getPath();
367 long logLength = log.getLen();
368 splitSize += logLength;
369 logAndReport("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
370 + ": " + logPath + ", length=" + logLength);
371 Reader in = null;
372 try {
373
374
375
376
377
378 in = getReader(fs, log, conf, skipErrors, null);
379 if (in != null) {
380 parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);
381 }
382 processedLogs.add(logPath);
383 } catch (CorruptedLogFileException e) {
384 LOG.info("Got while parsing hlog " + logPath +
385 ". Marking as corrupted", e);
386 corruptedLogs.add(logPath);
387 } finally {
388 if (in != null) {
389 try {
390 in.close();
391 } catch (IOException e) {
392 LOG.warn("Close log reader threw exception -- continuing", e);
393 }
394 }
395 }
396 }
397 status.setStatus("Log splits complete. Checking for orphaned logs.");
398
399 if (latch != null) {
400 try {
401 latch.await();
402 } catch (InterruptedException ie) {
403 LOG.warn("wait for latch interrupted");
404 Thread.currentThread().interrupt();
405 }
406 }
407 FileStatus[] currFiles = fs.listStatus(srcDir);
408 if (currFiles.length > processedLogs.size()
409 + corruptedLogs.size()) {
410 throw new OrphanHLogAfterSplitException(
411 "Discovered orphan hlog after split. Maybe the "
412 + "HRegionServer was not dead when we started");
413 }
414 } finally {
415 status.setStatus("Finishing writing output logs and closing down.");
416 splits = outputSink.finishWritingAndClose();
417 }
418 status.setStatus("Archiving logs after completed split");
419 archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
420 return splits;
421 }
422
423
424
425
426 private static long countTotalBytes(FileStatus[] logfiles) {
427 long ret = 0;
428 for (FileStatus stat : logfiles) {
429 ret += stat.getLen();
430 }
431 return ret;
432 }
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450 static public boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
451 Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
452 ZooKeeperWatcher zkw)
453 throws IOException {
454 HLogSplitter s = new HLogSplitter(conf, rootDir, null, null
455 return s.splitLogFile(logfile, reporter);
456 }
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471 static public boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
472 Configuration conf, CancelableProgressable reporter)
473 throws IOException {
474 return HLogSplitter.splitLogFile(rootDir, logfile, fs, conf, reporter, null, null);
475 }
476
477 public boolean splitLogFile(FileStatus logfile,
478 CancelableProgressable reporter) throws IOException {
479 boolean isCorrupted = false;
480 Preconditions.checkState(status == null);
481 boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
482 HLog.SPLIT_SKIP_ERRORS_DEFAULT);
483 int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
484 Path logPath = logfile.getPath();
485 boolean outputSinkStarted = false;
486 boolean progress_failed = false;
487 int editsCount = 0;
488 int editsSkipped = 0;
489
490 try {
491 status = TaskMonitor.get().createStatus(
492 "Splitting log file " + logfile.getPath() +
493 "into a temporary staging area.");
494 long logLength = logfile.getLen();
495 LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
496 LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
497 status.setStatus("Opening log file");
498 if (reporter != null && !reporter.progress()) {
499 progress_failed = true;
500 return false;
501 }
502 Reader in = null;
503 try {
504 in = getReader(fs, logfile, conf, skipErrors, reporter);
505 } catch (CorruptedLogFileException e) {
506 LOG.warn("Could not get reader, corrupted log file " + logPath, e);
507 ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
508 isCorrupted = true;
509 }
510 if (in == null) {
511 status.markComplete("Was nothing to split in log file");
512 LOG.warn("Nothing to split in log file " + logPath);
513 return true;
514 }
515 if(watcher != null) {
516 try {
517 disablingOrDisabledTables = ZKTable.getDisabledOrDisablingTables(watcher);
518 } catch (KeeperException e) {
519 throw new IOException("Can't get disabling/disabled tables", e);
520 }
521 }
522 int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
523 int numOpenedFilesLastCheck = 0;
524 outputSink.setReporter(reporter);
525 outputSink.startWriterThreads();
526 outputSinkStarted = true;
527 Entry entry;
528 Long lastFlushedSequenceId = -1L;
529 ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logPath);
530 String serverNameStr = (serverName == null) ? "" : serverName.getServerName();
531 while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
532 byte[] region = entry.getKey().getEncodedRegionName();
533 String key = Bytes.toString(region);
534 lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
535 if (lastFlushedSequenceId == null) {
536 if (this.distributedLogReplay) {
537 lastFlushedSequenceId = SplitLogManager.getLastFlushedSequenceId(this.watcher,
538 serverNameStr, key);
539 } else if (sequenceIdChecker != null) {
540 lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
541 }
542 if (lastFlushedSequenceId != null) {
543 lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
544 } else {
545 lastFlushedSequenceId = -1L;
546 }
547 }
548 if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
549 editsSkipped++;
550 continue;
551 }
552 entryBuffers.appendEntry(entry);
553 editsCount++;
554 int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
555
556 if (editsCount % interval == 0
557 || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
558 numOpenedFilesLastCheck = this.getNumOpenWriters();
559 String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
560 + " edits, skipped " + editsSkipped + " edits.";
561 status.setStatus("Split " + countsStr);
562 if (reporter != null && !reporter.progress()) {
563 progress_failed = true;
564 return false;
565 }
566 }
567 }
568 } catch (InterruptedException ie) {
569 IOException iie = new InterruptedIOException();
570 iie.initCause(ie);
571 throw iie;
572 } catch (CorruptedLogFileException e) {
573 LOG.warn("Could not parse, corrupted log file " + logPath, e);
574 ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
575 isCorrupted = true;
576 } catch (IOException e) {
577 e = RemoteExceptionHandler.checkIOException(e);
578 throw e;
579 } finally {
580 LOG.info("Finishing writing output logs and closing down.");
581 if (outputSinkStarted) {
582 progress_failed = outputSink.finishWritingAndClose() == null;
583 }
584 String msg = "Processed " + editsCount + " edits across "
585 + outputSink.getNumberOfRecoveredRegions() + " regions; log file=" + logPath
586 + " is corrupted = " + isCorrupted + " progress failed = " + progress_failed;
587 LOG.info(msg);
588 status.markComplete(msg);
589 }
590 return !progress_failed;
591 }
592
593
594
595
596
597
598
599
600
601
602
603
604 public static void finishSplitLogFile(String logfile, Configuration conf)
605 throws IOException {
606 Path rootdir = FSUtils.getRootDir(conf);
607 Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
608 finishSplitLogFile(rootdir, oldLogDir, logfile, conf);
609 }
610
611 public static void finishSplitLogFile(Path rootdir, Path oldLogDir,
612 String logfile, Configuration conf) throws IOException {
613 List<Path> processedLogs = new ArrayList<Path>();
614 List<Path> corruptedLogs = new ArrayList<Path>();
615 FileSystem fs;
616 fs = rootdir.getFileSystem(conf);
617 Path logPath = null;
618 if (FSUtils.isStartingWithPath(rootdir, logfile)) {
619 logPath = new Path(logfile);
620 } else {
621 logPath = new Path(rootdir, logfile);
622 }
623 if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
624 corruptedLogs.add(logPath);
625 } else {
626 processedLogs.add(logPath);
627 }
628 archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf);
629 Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
630 fs.delete(stagingDir, true);
631 }
632
633
634
635
636
637
638
639
640
641
642
643
644
645 private static void archiveLogs(
646 final Path srcDir,
647 final List<Path> corruptedLogs,
648 final List<Path> processedLogs, final Path oldLogDir,
649 final FileSystem fs, final Configuration conf) throws IOException {
650 final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
651 "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
652
653 if (!fs.mkdirs(corruptDir)) {
654 LOG.info("Unable to mkdir " + corruptDir);
655 }
656 fs.mkdirs(oldLogDir);
657
658
659
660 for (Path corrupted : corruptedLogs) {
661 Path p = new Path(corruptDir, corrupted.getName());
662 if (fs.exists(corrupted)) {
663 if (!fs.rename(corrupted, p)) {
664 LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
665 } else {
666 LOG.warn("Moved corrupted log " + corrupted + " to " + p);
667 }
668 }
669 }
670
671 for (Path p : processedLogs) {
672 Path newPath = FSHLog.getHLogArchivePath(oldLogDir, p);
673 if (fs.exists(p)) {
674 if (!fs.rename(p, newPath)) {
675 LOG.warn("Unable to move " + p + " to " + newPath);
676 } else {
677 LOG.debug("Archived processed log " + p + " to " + newPath);
678 }
679 }
680 }
681
682
683
684 if (srcDir != null && !fs.delete(srcDir, true)) {
685 throw new IOException("Unable to delete src dir: " + srcDir);
686 }
687 }
688
689
690
691
692
693
694
695
696
697
698
699
700
701 @SuppressWarnings("deprecation")
702 static Path getRegionSplitEditsPath(final FileSystem fs,
703 final Entry logEntry, final Path rootDir, boolean isCreate)
704 throws IOException {
705 Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
706 String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
707 Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
708 Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
709
710 if (!fs.exists(regiondir)) {
711 LOG.info("This region's directory doesn't exist: "
712 + regiondir.toString() + ". It is very likely that it was" +
713 " already split so it's safe to discard those edits.");
714 return null;
715 }
716 if (fs.exists(dir) && fs.isFile(dir)) {
717 Path tmp = new Path("/tmp");
718 if (!fs.exists(tmp)) {
719 fs.mkdirs(tmp);
720 }
721 tmp = new Path(tmp,
722 HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
723 LOG.warn("Found existing old file: " + dir + ". It could be some "
724 + "leftover of an old installation. It should be a folder instead. "
725 + "So moving it to " + tmp);
726 if (!fs.rename(dir, tmp)) {
727 LOG.warn("Failed to sideline old file " + dir);
728 }
729 }
730
731 if (isCreate && !fs.exists(dir)) {
732 if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
733 }
734
735
736 String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
737 fileName = getTmpRecoveredEditsFileName(fileName);
738 return new Path(dir, fileName);
739 }
740
741 static String getTmpRecoveredEditsFileName(String fileName) {
742 return fileName + HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
743 }
744
745
746
747
748
749
750
751
752
753 static Path getCompletedRecoveredEditsFilePath(Path srcPath,
754 Long maximumEditLogSeqNum) {
755 String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
756 return new Path(srcPath.getParent(), fileName);
757 }
758
759 static String formatRecoveredEditsFileName(final long seqid) {
760 return String.format("%019d", seqid);
761 }
762
763
764
765
766
767
768
769
770
771
772
773
774
775 private void parseHLog(final Reader in, Path path,
776 EntryBuffers entryBuffers, final FileSystem fs,
777 final Configuration conf, boolean skipErrors)
778 throws IOException, CorruptedLogFileException {
779 int editsCount = 0;
780 try {
781 Entry entry;
782 while ((entry = getNextLogLine(in, path, skipErrors)) != null) {
783 entryBuffers.appendEntry(entry);
784 editsCount++;
785 }
786 } catch (InterruptedException ie) {
787 IOException t = new InterruptedIOException();
788 t.initCause(ie);
789 throw t;
790 } finally {
791 LOG.debug("Pushed=" + editsCount + " entries from " + path);
792 }
793 }
794
795
796
797
798
799
800
801
802
803
804
805 protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
806 boolean skipErrors, CancelableProgressable reporter)
807 throws IOException, CorruptedLogFileException {
808 Path path = file.getPath();
809 long length = file.getLen();
810 Reader in;
811
812
813
814
815
816 if (length <= 0) {
817 LOG.warn("File " + path + " might be still open, length is 0");
818 }
819
820 try {
821 FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
822 try {
823 in = getReader(fs, path, conf, reporter);
824 } catch (EOFException e) {
825 if (length <= 0) {
826
827
828
829
830
831 LOG.warn("Could not open " + path + " for reading. File is empty", e);
832 return null;
833 } else {
834
835 return null;
836 }
837 }
838 } catch (IOException e) {
839 if (!skipErrors || e instanceof InterruptedIOException) {
840 throw e;
841 }
842 CorruptedLogFileException t =
843 new CorruptedLogFileException("skipErrors=true Could not open hlog " +
844 path + " ignoring");
845 t.initCause(e);
846 throw t;
847 }
848 return in;
849 }
850
851 static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
852 throws CorruptedLogFileException, IOException {
853 try {
854 return in.next();
855 } catch (EOFException eof) {
856
857 LOG.info("EOF from hlog " + path + ". continuing");
858 return null;
859 } catch (IOException e) {
860
861
862 if (e.getCause() != null &&
863 (e.getCause() instanceof ParseException ||
864 e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
865 LOG.warn("Parse exception " + e.getCause().toString() + " from hlog "
866 + path + ". continuing");
867 return null;
868 }
869 if (!skipErrors) {
870 throw e;
871 }
872 CorruptedLogFileException t =
873 new CorruptedLogFileException("skipErrors=true Ignoring exception" +
874 " while parsing hlog " + path + ". Marking as corrupted");
875 t.initCause(e);
876 throw t;
877 }
878 }
879
880
881 private void writerThreadError(Throwable t) {
882 thrown.compareAndSet(null, t);
883 }
884
885
886
887
888 private void checkForErrors() throws IOException {
889 Throwable thrown = this.thrown.get();
890 if (thrown == null) return;
891 if (thrown instanceof IOException) {
892 throw (IOException)thrown;
893 } else {
894 throw new RuntimeException(thrown);
895 }
896 }
897
898
899
900 protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
901 throws IOException {
902 return HLogFactory.createWriter(fs, logfile, conf);
903 }
904
905
906
907
908 protected Reader getReader(FileSystem fs, Path curLogFile,
909 Configuration conf, CancelableProgressable reporter) throws IOException {
910 return HLogFactory.createReader(fs, curLogFile, conf, reporter);
911 }
912
913
914
915
916
917 private int getNumOpenWriters() {
918 int result = 0;
919 if (this.outputSink != null) {
920 result += this.outputSink.getNumOpenWriters();
921 }
922 return result;
923 }
924
925
926
927
928
929
930
931
932 class EntryBuffers {
933 Map<byte[], RegionEntryBuffer> buffers =
934 new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
935
936
937
938
939 Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
940
941 long totalBuffered = 0;
942 long maxHeapUsage;
943
944 EntryBuffers(long maxHeapUsage) {
945 this.maxHeapUsage = maxHeapUsage;
946 }
947
948
949
950
951
952
953
954
955 void appendEntry(Entry entry) throws InterruptedException, IOException {
956 HLogKey key = entry.getKey();
957
958 RegionEntryBuffer buffer;
959 long incrHeap;
960 synchronized (this) {
961 buffer = buffers.get(key.getEncodedRegionName());
962 if (buffer == null) {
963 buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
964 buffers.put(key.getEncodedRegionName(), buffer);
965 }
966 incrHeap= buffer.appendEntry(entry);
967 }
968
969
970 synchronized (dataAvailable) {
971 totalBuffered += incrHeap;
972 while (totalBuffered > maxHeapUsage && thrown.get() == null) {
973 LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
974 dataAvailable.wait(2000);
975 }
976 dataAvailable.notifyAll();
977 }
978 checkForErrors();
979 }
980
981
982
983
984 synchronized RegionEntryBuffer getChunkToWrite() {
985 long biggestSize = 0;
986 byte[] biggestBufferKey = null;
987
988 for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
989 long size = entry.getValue().heapSize();
990 if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
991 biggestSize = size;
992 biggestBufferKey = entry.getKey();
993 }
994 }
995 if (biggestBufferKey == null) {
996 return null;
997 }
998
999 RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
1000 currentlyWriting.add(biggestBufferKey);
1001 return buffer;
1002 }
1003
1004 void doneWriting(RegionEntryBuffer buffer) {
1005 synchronized (this) {
1006 boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
1007 assert removed;
1008 }
1009 long size = buffer.heapSize();
1010
1011 synchronized (dataAvailable) {
1012 totalBuffered -= size;
1013
1014 dataAvailable.notifyAll();
1015 }
1016 }
1017
1018 synchronized boolean isRegionCurrentlyWriting(byte[] region) {
1019 return currentlyWriting.contains(region);
1020 }
1021 }
1022
1023
1024
1025
1026
1027
1028
1029 static class RegionEntryBuffer implements HeapSize {
1030 long heapInBuffer = 0;
1031 List<Entry> entryBuffer;
1032 byte[] tableName;
1033 byte[] encodedRegionName;
1034
1035 RegionEntryBuffer(byte[] table, byte[] region) {
1036 this.tableName = table;
1037 this.encodedRegionName = region;
1038 this.entryBuffer = new LinkedList<Entry>();
1039 }
1040
1041 long appendEntry(Entry entry) {
1042 internify(entry);
1043 entryBuffer.add(entry);
1044 long incrHeap = entry.getEdit().heapSize() +
1045 ClassSize.align(2 * ClassSize.REFERENCE) +
1046 0;
1047 heapInBuffer += incrHeap;
1048 return incrHeap;
1049 }
1050
1051 private void internify(Entry entry) {
1052 HLogKey k = entry.getKey();
1053 k.internTableName(this.tableName);
1054 k.internEncodedRegionName(this.encodedRegionName);
1055 }
1056
1057 public long heapSize() {
1058 return heapInBuffer;
1059 }
1060 }
1061
1062
1063 class WriterThread extends Thread {
1064 private volatile boolean shouldStop = false;
1065 private OutputSink outputSink = null;
1066
1067 WriterThread(OutputSink sink, int i) {
1068 super("WriterThread-" + i);
1069 outputSink = sink;
1070 }
1071
1072 public void run() {
1073 try {
1074 doRun();
1075 } catch (Throwable t) {
1076 LOG.error("Error in log splitting write thread", t);
1077 writerThreadError(t);
1078 }
1079 }
1080
1081 private void doRun() throws IOException {
1082 LOG.debug("Writer thread " + this + ": starting");
1083 while (true) {
1084 RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
1085 if (buffer == null) {
1086
1087 synchronized (dataAvailable) {
1088 if (shouldStop && !this.outputSink.flush()) {
1089 return;
1090 }
1091 try {
1092 dataAvailable.wait(500);
1093 } catch (InterruptedException ie) {
1094 if (!shouldStop) {
1095 throw new RuntimeException(ie);
1096 }
1097 }
1098 }
1099 continue;
1100 }
1101
1102 assert buffer != null;
1103 try {
1104 writeBuffer(buffer);
1105 } finally {
1106 entryBuffers.doneWriting(buffer);
1107 }
1108 }
1109 }
1110
1111
1112 private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
1113 outputSink.append(buffer);
1114 }
1115
1116 void finish() {
1117 synchronized (dataAvailable) {
1118 shouldStop = true;
1119 dataAvailable.notifyAll();
1120 }
1121 }
1122 }
1123
1124 Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
1125 List<String> components = new ArrayList<String>(10);
1126 do {
1127 components.add(edits.getName());
1128 edits = edits.getParent();
1129 } while (edits.depth() > rootdir.depth());
1130 Path ret = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
1131 for (int i = components.size() - 1; i >= 0; i--) {
1132 ret = new Path(ret, components.get(i));
1133 }
1134 try {
1135 if (fs.exists(ret)) {
1136 LOG.warn("Found existing old temporary edits file. It could be the "
1137 + "result of a previous failed split attempt. Deleting "
1138 + ret + ", length="
1139 + fs.getFileStatus(ret).getLen());
1140 if (!fs.delete(ret, false)) {
1141 LOG.warn("Failed delete of old " + ret);
1142 }
1143 }
1144 Path dir = ret.getParent();
1145 if (!fs.exists(dir)) {
1146 if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
1147 }
1148 } catch (IOException e) {
1149 LOG.warn("Could not prepare temp staging area ", e);
1150
1151 }
1152 return ret;
1153 }
1154
1155
1156
1157
1158
1159 abstract class OutputSink {
1160
1161 protected Map<byte[], SinkWriter> writers = Collections
1162 .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
1163
1164 protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
1165 .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
1166
1167 protected final List<WriterThread> writerThreads = Lists.newArrayList();
1168
1169
1170 protected final Set<byte[]> blacklistedRegions = Collections
1171 .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
1172
1173 protected boolean closeAndCleanCompleted = false;
1174
1175 protected boolean writersClosed = false;
1176
1177 protected final int numThreads;
1178
1179 protected CancelableProgressable reporter = null;
1180
1181 protected AtomicLong skippedEdits = new AtomicLong();
1182
1183 public OutputSink(int numWriters) {
1184 numThreads = numWriters;
1185 }
1186
1187 void setReporter(CancelableProgressable reporter) {
1188 this.reporter = reporter;
1189 }
1190
1191
1192
1193
1194 synchronized void startWriterThreads() {
1195 for (int i = 0; i < numThreads; i++) {
1196 WriterThread t = new WriterThread(this, i);
1197 t.start();
1198 writerThreads.add(t);
1199 }
1200 }
1201
1202
1203
1204
1205
1206 void updateRegionMaximumEditLogSeqNum(Entry entry) {
1207 synchronized (regionMaximumEditLogSeqNum) {
1208 Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
1209 .getEncodedRegionName());
1210 if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
1211 regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
1212 .getLogSeqNum());
1213 }
1214 }
1215 }
1216
1217 Long getRegionMaximumEditLogSeqNum(byte[] region) {
1218 return regionMaximumEditLogSeqNum.get(region);
1219 }
1220
1221
1222
1223
1224 int getNumOpenWriters() {
1225 return this.writers.size();
1226 }
1227
1228 long getSkippedEdits() {
1229 return this.skippedEdits.get();
1230 }
1231
1232
1233
1234
1235
1236
1237 protected boolean finishWriting() throws IOException {
1238 LOG.info("Waiting for split writer threads to finish");
1239 boolean progress_failed = false;
1240 for (WriterThread t : writerThreads) {
1241 t.finish();
1242 }
1243 for (WriterThread t : writerThreads) {
1244 if (!progress_failed && reporter != null && !reporter.progress()) {
1245 progress_failed = true;
1246 }
1247 try {
1248 t.join();
1249 } catch (InterruptedException ie) {
1250 IOException iie = new InterruptedIOException();
1251 iie.initCause(ie);
1252 throw iie;
1253 }
1254 checkForErrors();
1255 }
1256 LOG.info("Split writers finished");
1257 return (!progress_failed);
1258 }
1259
1260 abstract List<Path> finishWritingAndClose() throws IOException;
1261
1262
1263
1264
1265 abstract Map<byte[], Long> getOutputCounts();
1266
1267
1268
1269
1270 abstract int getNumberOfRecoveredRegions();
1271
1272
1273
1274
1275
1276 abstract void append(RegionEntryBuffer buffer) throws IOException;
1277
1278
1279
1280
1281
1282 protected boolean flush() throws IOException {
1283 return false;
1284 }
1285 }
1286
1287
1288
1289
1290 class LogRecoveredEditsOutputSink extends OutputSink {
1291
1292 public LogRecoveredEditsOutputSink(int numWriters) {
1293
1294
1295
1296
1297
1298 super(numWriters);
1299 }
1300
1301
1302
1303
1304
1305 @Override
1306 List<Path> finishWritingAndClose() throws IOException {
1307 boolean isSuccessful = false;
1308 List<Path> result = null;
1309 try {
1310 isSuccessful = finishWriting();
1311 } finally {
1312 result = close();
1313 List<IOException> thrown = closeLogWriters(null);
1314 if (thrown != null && !thrown.isEmpty()) {
1315 throw MultipleIOException.createIOException(thrown);
1316 }
1317 }
1318 return (isSuccessful) ? result : null;
1319 }
1320
1321
1322
1323
1324
1325 private List<Path> close() throws IOException {
1326 Preconditions.checkState(!closeAndCleanCompleted);
1327
1328 final List<Path> paths = new ArrayList<Path>();
1329 final List<IOException> thrown = Lists.newArrayList();
1330 ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
1331 TimeUnit.SECONDS, new ThreadFactory() {
1332 private int count = 1;
1333
1334 public Thread newThread(Runnable r) {
1335 Thread t = new Thread(r, "split-log-closeStream-" + count++);
1336 return t;
1337 }
1338 });
1339 CompletionService<Void> completionService = new ExecutorCompletionService<Void>(
1340 closeThreadPool);
1341 for (final Map.Entry<byte[], ? extends SinkWriter> writersEntry : writers.entrySet()) {
1342 completionService.submit(new Callable<Void>() {
1343 public Void call() throws Exception {
1344 WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1345 try {
1346 wap.w.close();
1347 } catch (IOException ioe) {
1348 LOG.error("Couldn't close log at " + wap.p, ioe);
1349 thrown.add(ioe);
1350 return null;
1351 }
1352 LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1353 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1354
1355 if (wap.editsWritten == 0) {
1356
1357 if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
1358 LOG.warn("Failed deleting empty " + wap.p);
1359 throw new IOException("Failed deleting empty " + wap.p);
1360 }
1361 return null;
1362 }
1363
1364 Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1365 regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
1366 try {
1367 if (!dst.equals(wap.p) && fs.exists(dst)) {
1368 LOG.warn("Found existing old edits file. It could be the "
1369 + "result of a previous failed split attempt. Deleting " + dst + ", length="
1370 + fs.getFileStatus(dst).getLen());
1371 if (!fs.delete(dst, false)) {
1372 LOG.warn("Failed deleting of old " + dst);
1373 throw new IOException("Failed deleting of old " + dst);
1374 }
1375 }
1376
1377
1378
1379 if (fs.exists(wap.p)) {
1380 if (!fs.rename(wap.p, dst)) {
1381 throw new IOException("Failed renaming " + wap.p + " to " + dst);
1382 }
1383 LOG.debug("Rename " + wap.p + " to " + dst);
1384 }
1385 } catch (IOException ioe) {
1386 LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1387 thrown.add(ioe);
1388 return null;
1389 }
1390 paths.add(dst);
1391 return null;
1392 }
1393 });
1394 }
1395
1396 boolean progress_failed = false;
1397 try {
1398 for (int i = 0, n = this.writers.size(); i < n; i++) {
1399 Future<Void> future = completionService.take();
1400 future.get();
1401 if (!progress_failed && reporter != null && !reporter.progress()) {
1402 progress_failed = true;
1403 }
1404 }
1405 } catch (InterruptedException e) {
1406 IOException iie = new InterruptedIOException();
1407 iie.initCause(e);
1408 throw iie;
1409 } catch (ExecutionException e) {
1410 throw new IOException(e.getCause());
1411 } finally {
1412 closeThreadPool.shutdownNow();
1413 }
1414
1415 if (!thrown.isEmpty()) {
1416 throw MultipleIOException.createIOException(thrown);
1417 }
1418 writersClosed = true;
1419 closeAndCleanCompleted = true;
1420 if (progress_failed) {
1421 return null;
1422 }
1423 return paths;
1424 }
1425
1426 private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1427 if (writersClosed) {
1428 return thrown;
1429 }
1430
1431 if (thrown == null) {
1432 thrown = Lists.newArrayList();
1433 }
1434 try {
1435 for (WriterThread t : writerThreads) {
1436 while (t.isAlive()) {
1437 t.shouldStop = true;
1438 t.interrupt();
1439 try {
1440 t.join(10);
1441 } catch (InterruptedException e) {
1442 IOException iie = new InterruptedIOException();
1443 iie.initCause(e);
1444 throw iie;
1445 }
1446 }
1447 }
1448 } finally {
1449 synchronized (writers) {
1450 WriterAndPath wap = null;
1451 for (SinkWriter tmpWAP : writers.values()) {
1452 try {
1453 wap = (WriterAndPath) tmpWAP;
1454 wap.w.close();
1455 } catch (IOException ioe) {
1456 LOG.error("Couldn't close log at " + wap.p, ioe);
1457 thrown.add(ioe);
1458 continue;
1459 }
1460 LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1461 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1462 }
1463 }
1464 writersClosed = true;
1465 }
1466
1467 return thrown;
1468 }
1469
1470
1471
1472
1473
1474
1475 private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
1476 byte region[] = entry.getKey().getEncodedRegionName();
1477 WriterAndPath ret = (WriterAndPath) writers.get(region);
1478 if (ret != null) {
1479 return ret;
1480 }
1481
1482
1483 if (blacklistedRegions.contains(region)) {
1484 return null;
1485 }
1486 ret = createWAP(region, entry, rootDir, fs, conf);
1487 if (ret == null) {
1488 blacklistedRegions.add(region);
1489 return null;
1490 }
1491 writers.put(region, ret);
1492 return ret;
1493 }
1494
1495 private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
1496 Configuration conf) throws IOException {
1497 Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1498 if (regionedits == null) {
1499 return null;
1500 }
1501 if (fs.exists(regionedits)) {
1502 LOG.warn("Found old edits file. It could be the "
1503 + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1504 + fs.getFileStatus(regionedits).getLen());
1505 if (!fs.delete(regionedits, false)) {
1506 LOG.warn("Failed delete of old " + regionedits);
1507 }
1508 }
1509 Writer w = createWriter(fs, regionedits, conf);
1510 LOG.debug("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
1511 return (new WriterAndPath(regionedits, w));
1512 }
1513
1514 void append(RegionEntryBuffer buffer) throws IOException {
1515 List<Entry> entries = buffer.entryBuffer;
1516 if (entries.isEmpty()) {
1517 LOG.warn("got an empty buffer, skipping");
1518 return;
1519 }
1520
1521 WriterAndPath wap = null;
1522
1523 long startTime = System.nanoTime();
1524 try {
1525 int editsCount = 0;
1526
1527 for (Entry logEntry : entries) {
1528 if (wap == null) {
1529 wap = getWriterAndPath(logEntry);
1530 if (wap == null) {
1531
1532 return;
1533 }
1534 }
1535 wap.w.append(logEntry);
1536 this.updateRegionMaximumEditLogSeqNum(logEntry);
1537 editsCount++;
1538 }
1539
1540 wap.incrementEdits(editsCount);
1541 wap.incrementNanoTime(System.nanoTime() - startTime);
1542 } catch (IOException e) {
1543 e = RemoteExceptionHandler.checkIOException(e);
1544 LOG.fatal(" Got while writing log entry to log", e);
1545 throw e;
1546 }
1547 }
1548
1549
1550
1551
1552 Map<byte[], Long> getOutputCounts() {
1553 TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1554 synchronized (writers) {
1555 for (Map.Entry<byte[], ? extends SinkWriter> entry : writers.entrySet()) {
1556 ret.put(entry.getKey(), entry.getValue().editsWritten);
1557 }
1558 }
1559 return ret;
1560 }
1561
1562 @Override
1563 int getNumberOfRecoveredRegions() {
1564 return writers.size();
1565 }
1566 }
1567
1568
1569
1570
1571 private abstract static class SinkWriter {
1572
1573 long editsWritten = 0;
1574
1575 long nanosSpent = 0;
1576
1577 void incrementEdits(int edits) {
1578 editsWritten += edits;
1579 }
1580
1581 void incrementNanoTime(long nanos) {
1582 nanosSpent += nanos;
1583 }
1584 }
1585
1586
1587
1588
1589
1590 private final static class WriterAndPath extends SinkWriter {
1591 final Path p;
1592 final Writer w;
1593
1594 WriterAndPath(final Path p, final Writer w) {
1595 this.p = p;
1596 this.w = w;
1597 }
1598 }
1599
1600
1601
1602
1603 class LogReplayOutputSink extends OutputSink {
1604 private static final double BUFFER_THRESHOLD = 0.35;
1605 private static final String KEY_DELIMITER = "#";
1606
1607 private long waitRegionOnlineTimeOut;
1608 private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
1609 private final Map<String, RegionServerWriter> writers =
1610 new ConcurrentHashMap<String, RegionServerWriter>();
1611
1612 private final Set<String> onlineRegions = Collections.synchronizedSet(new HashSet<String>());
1613
1614 private Map<byte[], HConnection> tableNameToHConnectionMap = Collections
1615 .synchronizedMap(new TreeMap<byte[], HConnection>(Bytes.BYTES_COMPARATOR));
1616
1617
1618
1619
1620 private Map<String, List<Pair<HRegionLocation, Row>>> serverToBufferQueueMap =
1621 new ConcurrentHashMap<String, List<Pair<HRegionLocation, Row>>>();
1622 private List<Throwable> thrown = new ArrayList<Throwable>();
1623
1624
1625
1626
1627
1628 private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
1629 private boolean hasEditsInDisablingOrDisabledTables = false;
1630
1631 public LogReplayOutputSink(int numWriters) {
1632 super(numWriters);
1633
1634 this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout",
1635 SplitLogManager.DEFAULT_TIMEOUT);
1636 this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
1637 this.logRecoveredEditsOutputSink.setReporter(reporter);
1638 }
1639
1640 void append(RegionEntryBuffer buffer) throws IOException {
1641 List<Entry> entries = buffer.entryBuffer;
1642 if (entries.isEmpty()) {
1643 LOG.warn("got an empty buffer, skipping");
1644 return;
1645 }
1646
1647
1648 if (disablingOrDisabledTables.contains(Bytes.toString(buffer.tableName))) {
1649
1650 logRecoveredEditsOutputSink.append(buffer);
1651 hasEditsInDisablingOrDisabledTables = true;
1652
1653 addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
1654 return;
1655 }
1656
1657
1658 groupEditsByServer(entries);
1659
1660
1661 String maxLocKey = null;
1662 int maxSize = 0;
1663 List<Pair<HRegionLocation, Row>> maxQueue = null;
1664 synchronized (this.serverToBufferQueueMap) {
1665 for (String key : this.serverToBufferQueueMap.keySet()) {
1666 List<Pair<HRegionLocation, Row>> curQueue = this.serverToBufferQueueMap.get(key);
1667 if (curQueue.size() > maxSize) {
1668 maxSize = curQueue.size();
1669 maxQueue = curQueue;
1670 maxLocKey = key;
1671 }
1672 }
1673 if (maxSize < minBatchSize
1674 && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
1675
1676 return;
1677 } else if (maxSize > 0) {
1678 this.serverToBufferQueueMap.remove(maxLocKey);
1679 }
1680 }
1681
1682 if (maxSize > 0) {
1683 processWorkItems(maxLocKey, maxQueue);
1684 }
1685 }
1686
1687 private void addToRecoveredRegions(String encodedRegionName) {
1688 if (!recoveredRegions.contains(encodedRegionName)) {
1689 recoveredRegions.add(encodedRegionName);
1690 }
1691 }
1692
1693
1694
1695
1696
1697 private void groupEditsByServer(List<Entry> entries) throws IOException {
1698 Set<byte[]> nonExistentTables = null;
1699 Long cachedLastFlushedSequenceId = -1l;
1700 for (HLog.Entry entry : entries) {
1701 WALEdit edit = entry.getEdit();
1702 byte[] table = entry.getKey().getTablename();
1703 String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
1704
1705 if (nonExistentTables != null && nonExistentTables.contains(table)) {
1706 this.skippedEdits.incrementAndGet();
1707 continue;
1708 }
1709 boolean needSkip = false;
1710 Put put = null;
1711 Delete del = null;
1712 KeyValue lastKV = null;
1713 HRegionLocation loc = null;
1714 Row preRow = null;
1715 HRegionLocation preLoc = null;
1716 Row lastAddedRow = null;
1717 String preKey = null;
1718 List<KeyValue> kvs = edit.getKeyValues();
1719 HConnection hconn = this.getConnectionByTableName(table);
1720
1721 for (KeyValue kv : kvs) {
1722
1723
1724
1725
1726 if (kv.matchingFamily(WALEdit.METAFAMILY)) continue;
1727
1728 if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
1729 if (preRow != null) {
1730 synchronized (serverToBufferQueueMap) {
1731 List<Pair<HRegionLocation, Row>> queue = serverToBufferQueueMap.get(preKey);
1732 if (queue == null) {
1733 queue = Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Row>>());
1734 serverToBufferQueueMap.put(preKey, queue);
1735 }
1736 queue.add(new Pair<HRegionLocation, Row>(preLoc, preRow));
1737 lastAddedRow = preRow;
1738 }
1739
1740 addToRecoveredRegions(preLoc.getRegionInfo().getEncodedName());
1741 }
1742
1743 try {
1744 loc = locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow());
1745 } catch (TableNotFoundException ex) {
1746
1747 LOG.info("Table " + Bytes.toString(table)
1748 + " doesn't exist. Skip log replay for region " + encodeRegionNameStr);
1749 lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
1750 if (nonExistentTables == null) {
1751 nonExistentTables = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
1752 }
1753 nonExistentTables.add(table);
1754 this.skippedEdits.incrementAndGet();
1755 needSkip = true;
1756 break;
1757 }
1758 cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo()
1759 .getEncodedName());
1760 if (cachedLastFlushedSequenceId != null
1761 && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
1762
1763 this.skippedEdits.incrementAndGet();
1764 needSkip = true;
1765 break;
1766 }
1767
1768 if (kv.isDelete()) {
1769 del = new Delete(kv.getRow());
1770 del.setClusterId(entry.getKey().getClusterId());
1771 preRow = del;
1772 } else {
1773 put = new Put(kv.getRow());
1774 put.setClusterId(entry.getKey().getClusterId());
1775 preRow = put;
1776 }
1777 preKey = loc.getHostnamePort() + KEY_DELIMITER + Bytes.toString(table);
1778 preLoc = loc;
1779 }
1780 if (kv.isDelete()) {
1781 del.addDeleteMarker(kv);
1782 } else {
1783 put.add(kv);
1784 }
1785 lastKV = kv;
1786 }
1787
1788
1789 if(needSkip) continue;
1790
1791
1792 if (preRow != null && lastAddedRow != preRow) {
1793 synchronized (serverToBufferQueueMap) {
1794 List<Pair<HRegionLocation, Row>> queue = serverToBufferQueueMap.get(preKey);
1795 if (queue == null) {
1796 queue = Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Row>>());
1797 serverToBufferQueueMap.put(preKey, queue);
1798 }
1799 queue.add(new Pair<HRegionLocation, Row>(preLoc, preRow));
1800 }
1801
1802 addToRecoveredRegions(preLoc.getRegionInfo().getEncodedName());
1803 }
1804 }
1805 }
1806
1807
1808
1809
1810
1811
1812 private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
1813 byte[] table, byte[] row) throws IOException {
1814 HRegionLocation loc = hconn.getRegionLocation(table, row, false);
1815 if (loc == null) {
1816 throw new IOException("Can't locate location for row:" + Bytes.toString(row)
1817 + " of table:" + Bytes.toString(table));
1818 }
1819 if (onlineRegions.contains(loc.getRegionInfo().getEncodedName())) {
1820 return loc;
1821 }
1822
1823 Long lastFlushedSequenceId = -1l;
1824 loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut);
1825 Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo()
1826 .getEncodedName());
1827
1828 onlineRegions.add(loc.getRegionInfo().getEncodedName());
1829
1830
1831 lastFlushedSequenceId = SplitLogManager.getLastFlushedSequenceId(watcher, loc
1832 .getServerName().getServerName(), loc.getRegionInfo().getEncodedName());
1833 if (cachedLastFlushedSequenceId == null
1834 || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
1835 lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
1836 } else if (loc.getRegionInfo().isRecovering() == false) {
1837
1838
1839 lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
1840 LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
1841 + " because it's not in recovering.");
1842 }
1843
1844 return loc;
1845 }
1846
1847 private void processWorkItems(String key, List<Pair<HRegionLocation, Row>> actions)
1848 throws IOException {
1849 RegionServerWriter rsw = null;
1850
1851 long startTime = System.nanoTime();
1852 try {
1853 rsw = getRegionServerWriter(key);
1854 rsw.sink.replayEntries(actions);
1855
1856
1857 rsw.incrementEdits(actions.size());
1858 rsw.incrementNanoTime(System.nanoTime() - startTime);
1859 } catch (IOException e) {
1860 e = RemoteExceptionHandler.checkIOException(e);
1861 LOG.fatal(" Got while writing log entry to log", e);
1862 throw e;
1863 }
1864 }
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874 private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
1875 final long timeout)
1876 throws IOException {
1877 final long endTime = EnvironmentEdgeManager.currentTimeMillis() + timeout;
1878 final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1879 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1880 boolean reloadLocation = false;
1881 byte[] tableName = loc.getRegionInfo().getTableName();
1882 int tries = 0;
1883 Throwable cause = null;
1884 while (endTime > EnvironmentEdgeManager.currentTimeMillis()) {
1885 try {
1886
1887 HConnection hconn = getConnectionByTableName(tableName);
1888 if(reloadLocation) {
1889 loc = hconn.getRegionLocation(tableName, row, true);
1890 }
1891 BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
1892 HRegionInfo region = loc.getRegionInfo();
1893 if((region =ProtobufUtil.getRegionInfo(remoteSvr, region.getRegionName())) != null) {
1894 loc.getRegionInfo().setRecovering(region.isRecovering());
1895 return loc;
1896 }
1897 } catch (IOException e) {
1898 cause = e.getCause();
1899 if(!(cause instanceof RegionOpeningException)) {
1900 reloadLocation = true;
1901 }
1902 }
1903 long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
1904 try {
1905 Thread.sleep(expectedSleep);
1906 } catch (InterruptedException e) {
1907 Thread.currentThread().interrupt();
1908 throw new IOException("Interrupted when waiting regon " +
1909 loc.getRegionInfo().getEncodedName() + " online.", e);
1910 }
1911 tries++;
1912 }
1913
1914 throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
1915 " online for " + timeout + " milliseconds.", cause);
1916 }
1917
1918 @Override
1919 protected boolean flush() throws IOException {
1920 String curLoc = null;
1921 int curSize = 0;
1922 List<Pair<HRegionLocation, Row>> curQueue = null;
1923 synchronized (this.serverToBufferQueueMap) {
1924 for (String locationKey : this.serverToBufferQueueMap.keySet()) {
1925 curQueue = this.serverToBufferQueueMap.get(locationKey);
1926 if (!curQueue.isEmpty()) {
1927 curSize = curQueue.size();
1928 curLoc = locationKey;
1929 break;
1930 }
1931 }
1932 if (curSize > 0) {
1933 this.serverToBufferQueueMap.remove(curLoc);
1934 }
1935 }
1936
1937 if (curSize > 0) {
1938 this.processWorkItems(curLoc, curQueue);
1939 dataAvailable.notifyAll();
1940 return true;
1941 }
1942 return false;
1943 }
1944
1945 void addWriterError(Throwable t) {
1946 thrown.add(t);
1947 }
1948
1949 @Override
1950 List<Path> finishWritingAndClose() throws IOException {
1951 List<Path> result = new ArrayList<Path>();
1952 try {
1953 if (!finishWriting()) {
1954 return null;
1955 }
1956 if (hasEditsInDisablingOrDisabledTables) {
1957 result = logRecoveredEditsOutputSink.finishWritingAndClose();
1958 }
1959
1960 return result;
1961 } finally {
1962 List<IOException> thrown = closeRegionServerWriters();
1963 if (thrown != null && !thrown.isEmpty()) {
1964 throw MultipleIOException.createIOException(thrown);
1965 }
1966 }
1967 }
1968
1969 @Override
1970 int getNumOpenWriters() {
1971 return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
1972 }
1973
1974 private List<IOException> closeRegionServerWriters() throws IOException {
1975 List<IOException> result = null;
1976 if (!writersClosed) {
1977 result = Lists.newArrayList();
1978 try {
1979 for (WriterThread t : writerThreads) {
1980 while (t.isAlive()) {
1981 t.shouldStop = true;
1982 t.interrupt();
1983 try {
1984 t.join(10);
1985 } catch (InterruptedException e) {
1986 IOException iie = new InterruptedIOException();
1987 iie.initCause(e);
1988 throw iie;
1989 }
1990 }
1991 }
1992 } finally {
1993 synchronized (writers) {
1994 for (String locationKey : writers.keySet()) {
1995 RegionServerWriter tmpW = writers.get(locationKey);
1996 try {
1997 tmpW.close();
1998 } catch (IOException ioe) {
1999 LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
2000 result.add(ioe);
2001 }
2002 }
2003 }
2004
2005
2006 synchronized (this.tableNameToHConnectionMap) {
2007 for (byte[] tableName : this.tableNameToHConnectionMap.keySet()) {
2008 HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
2009 try {
2010 hconn.close();
2011 } catch (IOException ioe) {
2012 result.add(ioe);
2013 }
2014 }
2015 }
2016 writersClosed = true;
2017 }
2018 }
2019 return result;
2020 }
2021
2022 Map<byte[], Long> getOutputCounts() {
2023 TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
2024 synchronized (writers) {
2025 for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
2026 ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
2027 }
2028 }
2029 return ret;
2030 }
2031
2032 @Override
2033 int getNumberOfRecoveredRegions() {
2034 return this.recoveredRegions.size();
2035 }
2036
2037
2038
2039
2040
2041
2042 private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
2043 RegionServerWriter ret = writers.get(loc);
2044 if (ret != null) {
2045 return ret;
2046 }
2047
2048 String tableName = getTableFromLocationStr(loc);
2049 if(tableName.isEmpty()){
2050 LOG.warn("Invalid location string:" + loc + " found.");
2051 }
2052
2053 HConnection hconn = getConnectionByTableName(Bytes.toBytes(tableName));
2054 synchronized (writers) {
2055 ret = writers.get(loc);
2056 if (ret == null) {
2057 ret = new RegionServerWriter(conf, Bytes.toBytes(tableName), hconn);
2058 writers.put(loc, ret);
2059 }
2060 }
2061 return ret;
2062 }
2063
2064 private HConnection getConnectionByTableName(final byte[] tableName) throws IOException {
2065 HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
2066 if (hconn == null) {
2067 synchronized (this.tableNameToHConnectionMap) {
2068 hconn = this.tableNameToHConnectionMap.get(tableName);
2069 if (hconn == null) {
2070 hconn = HConnectionManager.createConnection(conf);
2071 this.tableNameToHConnectionMap.put(tableName, hconn);
2072 }
2073 }
2074 }
2075 return hconn;
2076 }
2077
2078 private String getTableFromLocationStr(String loc) {
2079
2080
2081
2082 String[] splits = loc.split(KEY_DELIMITER);
2083 if (splits.length != 2) {
2084 return "";
2085 }
2086 return splits[1];
2087 }
2088 }
2089
2090
2091
2092
2093
2094 private final static class RegionServerWriter extends SinkWriter {
2095 final WALEditsReplaySink sink;
2096
2097 RegionServerWriter(final Configuration conf, final byte[] tableName, final HConnection conn)
2098 throws IOException {
2099 this.sink = new WALEditsReplaySink(conf, tableName, conn);
2100 }
2101
2102 void close() throws IOException {
2103 }
2104 }
2105
2106 static class CorruptedLogFileException extends Exception {
2107 private static final long serialVersionUID = 1L;
2108
2109 CorruptedLogFileException(String s) {
2110 super(s);
2111 }
2112 }
2113 }