1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.wal;
20
21 import java.io.EOFException;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.text.ParseException;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashSet;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.NavigableSet;
33 import java.util.Set;
34 import java.util.TreeMap;
35 import java.util.TreeSet;
36 import java.util.UUID;
37 import java.util.concurrent.Callable;
38 import java.util.concurrent.CompletionService;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.ExecutorCompletionService;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.ThreadFactory;
44 import java.util.concurrent.ThreadPoolExecutor;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicLong;
48 import java.util.concurrent.atomic.AtomicReference;
49 import java.util.regex.Matcher;
50 import java.util.regex.Pattern;
51
52 import org.apache.commons.logging.Log;
53 import org.apache.commons.logging.LogFactory;
54 import org.apache.hadoop.conf.Configuration;
55 import org.apache.hadoop.fs.FileAlreadyExistsException;
56 import org.apache.hadoop.fs.FileStatus;
57 import org.apache.hadoop.fs.FileSystem;
58 import org.apache.hadoop.fs.Path;
59 import org.apache.hadoop.fs.PathFilter;
60 import org.apache.hadoop.hbase.Cell;
61 import org.apache.hadoop.hbase.CellScanner;
62 import org.apache.hadoop.hbase.CellUtil;
63 import org.apache.hadoop.hbase.CoordinatedStateException;
64 import org.apache.hadoop.hbase.CoordinatedStateManager;
65 import org.apache.hadoop.hbase.HBaseConfiguration;
66 import org.apache.hadoop.hbase.HConstants;
67 import org.apache.hadoop.hbase.HRegionInfo;
68 import org.apache.hadoop.hbase.HRegionLocation;
69 import org.apache.hadoop.hbase.RemoteExceptionHandler;
70 import org.apache.hadoop.hbase.ServerName;
71 import org.apache.hadoop.hbase.TableName;
72 import org.apache.hadoop.hbase.TableNotFoundException;
73 import org.apache.hadoop.hbase.TableStateManager;
74 import org.apache.hadoop.hbase.classification.InterfaceAudience;
75 import org.apache.hadoop.hbase.client.ConnectionUtils;
76 import org.apache.hadoop.hbase.client.Delete;
77 import org.apache.hadoop.hbase.client.Durability;
78 import org.apache.hadoop.hbase.client.HConnection;
79 import org.apache.hadoop.hbase.client.HConnectionManager;
80 import org.apache.hadoop.hbase.client.Mutation;
81 import org.apache.hadoop.hbase.client.Put;
82 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
83 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
84 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
85 import org.apache.hadoop.hbase.io.HeapSize;
86 import org.apache.hadoop.hbase.master.SplitLogManager;
87 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
88 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
89 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
90 import org.apache.hadoop.hbase.protobuf.RequestConverter;
91 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
92 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
93 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
94 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
95 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
96 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
97 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
98 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
99 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
100 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
101 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
102 import org.apache.hadoop.hbase.regionserver.HRegion;
103 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
104
105 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
106 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
107 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
108 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
109 import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
110 import org.apache.hadoop.hbase.util.Bytes;
111 import org.apache.hadoop.hbase.util.CancelableProgressable;
112 import org.apache.hadoop.hbase.util.ClassSize;
113 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
114 import org.apache.hadoop.hbase.util.FSUtils;
115 import org.apache.hadoop.hbase.util.Pair;
116 import org.apache.hadoop.hbase.util.Threads;
117 import org.apache.hadoop.hbase.wal.WAL.Entry;
118 import org.apache.hadoop.hbase.wal.WAL.Reader;
119 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
120 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
121 import org.apache.hadoop.io.MultipleIOException;
122
123 import com.google.common.annotations.VisibleForTesting;
124 import com.google.common.base.Preconditions;
125 import com.google.common.collect.Lists;
126 import com.google.protobuf.ServiceException;
127 import com.google.protobuf.TextFormat;
128
129
130
131
132
133
134 @InterfaceAudience.Private
135 public class WALSplitter {
private static final Log LOG = LogFactory.getLog(WALSplitter.class);

// Default for "hbase.hlog.split.skip.errors": when false, read errors fail the split instead of
// marking the WAL as corrupted.
public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

// Parameters of the split process.
protected final Path rootDir;
protected final FileSystem fs;
protected final Configuration conf;

// Split pipeline: WAL entries are appended to entryBuffers, drained by the outputSink's writer
// threads, and writer-thread errors are reported back through the controller.
PipelineController controller;
OutputSink outputSink;
EntryBuffers entryBuffers;

// Tables in DISABLED or DISABLING state; refreshed from the table state manager at the start of
// splitLogFile() when a coordinated state manager is available.
private Set<TableName> disablingOrDisabledTables =
new HashSet<TableName>();
// May be null (e.g. in the static split() path); null disables distributed log replay.
private BaseCoordinatedStateManager csm;
private final WALFactory walFactory;

// Status of the single split this instance performs; splitLogFile() requires it to be unset.
private MonitoredTask status;

// Source of last flushed sequence ids per region; may be null.
protected final LastSequenceId sequenceIdChecker;

// True only when LOG_REPLAY mode was requested and a coordinated state manager is present.
protected boolean distributedLogReplay;

// Encoded region name -> last flushed sequence id (-1 when unknown); caches lookups per region.
protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();

// Encoded region name -> (column family -> max sequence id stored for that family).
protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
new ConcurrentHashMap<String, Map<byte[], Long>>();

// Server name derived from the WAL directory of the file being split ("" if not derivable).
protected String failedServerName = "";

// Number of writer threads the output sink starts
// ("hbase.regionserver.hlog.splitlog.writer.threads").
private final int numWriterThreads;

// From "hbase.regionserver.wal.logreplay.batch.size"; presumably the batch size used when
// replaying edits — its use is outside this view, TODO confirm.
private final int minBatchSize;
179
180 WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
181 FileSystem fs, LastSequenceId idChecker,
182 CoordinatedStateManager csm, RecoveryMode mode) {
183 this.conf = HBaseConfiguration.create(conf);
184 String codecClassName = conf
185 .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
186 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
187 this.rootDir = rootDir;
188 this.fs = fs;
189 this.sequenceIdChecker = idChecker;
190 this.csm = (BaseCoordinatedStateManager)csm;
191 this.walFactory = factory;
192 this.controller = new PipelineController();
193
194 entryBuffers = new EntryBuffers(controller,
195 this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
196 128*1024*1024));
197
198
199
200 this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
201 this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
202
203 this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
204 if (csm != null && this.distributedLogReplay) {
205 outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
206 } else {
207 if (this.distributedLogReplay) {
208 LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
209 }
210 this.distributedLogReplay = false;
211 outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
212 }
213
214 }
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232 public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
233 Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
234 CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException {
235 WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, cp, mode);
236 return s.splitLogFile(logfile, reporter);
237 }
238
239
240
241
242
243 public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
244 FileSystem fs, Configuration conf, final WALFactory factory) throws IOException {
245 final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
246 Collections.singletonList(logDir), null);
247 List<Path> splits = new ArrayList<Path>();
248 if (logfiles != null && logfiles.length > 0) {
249 for (FileStatus logfile: logfiles) {
250 WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null,
251 RecoveryMode.LOG_SPLITTING);
252 if (s.splitLogFile(logfile, null)) {
253 finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
254 if (s.outputSink.splits != null) {
255 splits.addAll(s.outputSink.splits);
256 }
257 }
258 }
259 }
260 if (!fs.delete(logDir, true)) {
261 throw new IOException("Unable to delete src dir: " + logDir);
262 }
263 return splits;
264 }
265
266
267
268
269
270 boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
271 Preconditions.checkState(status == null);
272 Preconditions.checkArgument(logfile.isFile(),
273 "passed in file status is for something other than a regular file.");
274 boolean isCorrupted = false;
275 boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
276 SPLIT_SKIP_ERRORS_DEFAULT);
277 int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
278 Path logPath = logfile.getPath();
279 boolean outputSinkStarted = false;
280 boolean progress_failed = false;
281 int editsCount = 0;
282 int editsSkipped = 0;
283
284 status =
285 TaskMonitor.get().createStatus(
286 "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
287 Reader in = null;
288 try {
289 long logLength = logfile.getLen();
290 LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
291 LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
292 status.setStatus("Opening log file");
293 if (reporter != null && !reporter.progress()) {
294 progress_failed = true;
295 return false;
296 }
297 try {
298 in = getReader(logfile, skipErrors, reporter);
299 } catch (CorruptedLogFileException e) {
300 LOG.warn("Could not get reader, corrupted log file " + logPath, e);
301 ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
302 isCorrupted = true;
303 }
304 if (in == null) {
305 LOG.warn("Nothing to split in log file " + logPath);
306 return true;
307 }
308 if (csm != null) {
309 try {
310 TableStateManager tsm = csm.getTableStateManager();
311 disablingOrDisabledTables = tsm.getTablesInStates(
312 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
313 } catch (CoordinatedStateException e) {
314 throw new IOException("Can't get disabling/disabled tables", e);
315 }
316 }
317 int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
318 int numOpenedFilesLastCheck = 0;
319 outputSink.setReporter(reporter);
320 outputSink.startWriterThreads();
321 outputSinkStarted = true;
322 Entry entry;
323 Long lastFlushedSequenceId = -1L;
324 ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logPath);
325 failedServerName = (serverName == null) ? "" : serverName.getServerName();
326 while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
327 byte[] region = entry.getKey().getEncodedRegionName();
328 String encodedRegionNameAsStr = Bytes.toString(region);
329 lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
330 if (lastFlushedSequenceId == null) {
331 if (this.distributedLogReplay) {
332 RegionStoreSequenceIds ids =
333 csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
334 encodedRegionNameAsStr);
335 if (ids != null) {
336 lastFlushedSequenceId = ids.getLastFlushedSequenceId();
337 if (LOG.isDebugEnabled()) {
338 LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
339 TextFormat.shortDebugString(ids));
340 }
341 }
342 } else if (sequenceIdChecker != null) {
343 RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
344 Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
345 for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
346 maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
347 storeSeqId.getSequenceId());
348 }
349 regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
350 lastFlushedSequenceId = ids.getLastFlushedSequenceId();
351 if (LOG.isDebugEnabled()) {
352 LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
353 TextFormat.shortDebugString(ids));
354 }
355 }
356 if (lastFlushedSequenceId == null) {
357 lastFlushedSequenceId = -1L;
358 }
359 lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
360 }
361 if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
362 editsSkipped++;
363 continue;
364 }
365
366 if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
367 editsSkipped++;
368 continue;
369 }
370 entryBuffers.appendEntry(entry);
371 editsCount++;
372 int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
373
374 if (editsCount % interval == 0
375 || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
376 numOpenedFilesLastCheck = this.getNumOpenWriters();
377 String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
378 + " edits, skipped " + editsSkipped + " edits.";
379 status.setStatus("Split " + countsStr);
380 if (reporter != null && !reporter.progress()) {
381 progress_failed = true;
382 return false;
383 }
384 }
385 }
386 } catch (InterruptedException ie) {
387 IOException iie = new InterruptedIOException();
388 iie.initCause(ie);
389 throw iie;
390 } catch (CorruptedLogFileException e) {
391 LOG.warn("Could not parse, corrupted log file " + logPath, e);
392 csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
393 logfile.getPath().getName(), fs);
394 isCorrupted = true;
395 } catch (IOException e) {
396 e = RemoteExceptionHandler.checkIOException(e);
397 throw e;
398 } finally {
399 LOG.debug("Finishing writing output logs and closing down.");
400 try {
401 if (null != in) {
402 in.close();
403 }
404 } catch (IOException exception) {
405 LOG.warn("Could not close wal reader: " + exception.getMessage());
406 LOG.debug("exception details", exception);
407 }
408 try {
409 if (outputSinkStarted) {
410
411
412 progress_failed = true;
413 progress_failed = outputSink.finishWritingAndClose() == null;
414 }
415 } finally {
416 String msg =
417 "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
418 + " regions; edits skipped=" + editsSkipped + "; log file=" + logPath +
419 ", length=" + logfile.getLen() +
420 ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
421 LOG.info(msg);
422 status.markComplete(msg);
423 }
424 }
425 return !progress_failed;
426 }
427
428
429
430
431
432
433
434
435
436
437
438
439 public static void finishSplitLogFile(String logfile,
440 Configuration conf) throws IOException {
441 Path rootdir = FSUtils.getRootDir(conf);
442 Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
443 Path logPath;
444 if (FSUtils.isStartingWithPath(rootdir, logfile)) {
445 logPath = new Path(logfile);
446 } else {
447 logPath = new Path(rootdir, logfile);
448 }
449 finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
450 }
451
452 static void finishSplitLogFile(Path rootdir, Path oldLogDir,
453 Path logPath, Configuration conf) throws IOException {
454 List<Path> processedLogs = new ArrayList<Path>();
455 List<Path> corruptedLogs = new ArrayList<Path>();
456 FileSystem fs;
457 fs = rootdir.getFileSystem(conf);
458 if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
459 corruptedLogs.add(logPath);
460 } else {
461 processedLogs.add(logPath);
462 }
463 archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
464 Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
465 fs.delete(stagingDir, true);
466 }
467
468
469
470
471
472
473
474
475
476
477
478
479
480 private static void archiveLogs(
481 final List<Path> corruptedLogs,
482 final List<Path> processedLogs, final Path oldLogDir,
483 final FileSystem fs, final Configuration conf) throws IOException {
484 final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
485 "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
486
487 if (!fs.mkdirs(corruptDir)) {
488 LOG.info("Unable to mkdir " + corruptDir);
489 }
490 fs.mkdirs(oldLogDir);
491
492
493
494 for (Path corrupted : corruptedLogs) {
495 Path p = new Path(corruptDir, corrupted.getName());
496 if (fs.exists(corrupted)) {
497 if (!fs.rename(corrupted, p)) {
498 LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
499 } else {
500 LOG.warn("Moved corrupted log " + corrupted + " to " + p);
501 }
502 }
503 }
504
505 for (Path p : processedLogs) {
506 Path newPath = FSHLog.getWALArchivePath(oldLogDir, p);
507 if (fs.exists(p)) {
508 if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
509 LOG.warn("Unable to move " + p + " to " + newPath);
510 } else {
511 LOG.info("Archived processed log " + p + " to " + newPath);
512 }
513 }
514 }
515 }
516
517
518
519
520
521
522
523
524
525
526
527
528
529 @SuppressWarnings("deprecation")
530 static Path getRegionSplitEditsPath(final FileSystem fs,
531 final Entry logEntry, final Path rootDir, boolean isCreate)
532 throws IOException {
533 Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
534 String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
535 Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
536 Path dir = getRegionDirRecoveredEditsDir(regiondir);
537
538 if (!fs.exists(regiondir)) {
539 LOG.info("This region's directory doesn't exist: "
540 + regiondir.toString() + ". It is very likely that it was" +
541 " already split so it's safe to discard those edits.");
542 return null;
543 }
544 if (fs.exists(dir) && fs.isFile(dir)) {
545 Path tmp = new Path("/tmp");
546 if (!fs.exists(tmp)) {
547 fs.mkdirs(tmp);
548 }
549 tmp = new Path(tmp,
550 HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
551 LOG.warn("Found existing old file: " + dir + ". It could be some "
552 + "leftover of an old installation. It should be a folder instead. "
553 + "So moving it to " + tmp);
554 if (!fs.rename(dir, tmp)) {
555 LOG.warn("Failed to sideline old file " + dir);
556 }
557 }
558
559 if (isCreate && !fs.exists(dir)) {
560 if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
561 }
562
563
564 String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
565 fileName = getTmpRecoveredEditsFileName(fileName);
566 return new Path(dir, fileName);
567 }
568
569 static String getTmpRecoveredEditsFileName(String fileName) {
570 return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
571 }
572
573
574
575
576
577
578
579
580
581 static Path getCompletedRecoveredEditsFilePath(Path srcPath,
582 Long maximumEditLogSeqNum) {
583 String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
584 return new Path(srcPath.getParent(), fileName);
585 }
586
587 static String formatRecoveredEditsFileName(final long seqid) {
588 return String.format("%019d", seqid);
589 }
590
// Names accepted as finished recovered-edits files: an optionally negative decimal sequence id.
private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
// Suffix marking a recovered-edits file that is still temporary (not yet completed).
private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
593
594
595
596
597
598
599
600 public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
601 return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
602 }
603
604
605
606
607
608
609
610
611
612
613 public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
614 final Path regiondir) throws IOException {
615 NavigableSet<Path> filesSorted = new TreeSet<Path>();
616 Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
617 if (!fs.exists(editsdir))
618 return filesSorted;
619 FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
620 @Override
621 public boolean accept(Path p) {
622 boolean result = false;
623 try {
624
625
626
627
628 Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
629 result = fs.isFile(p) && m.matches();
630
631
632 if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
633 result = false;
634 }
635
636 if (isSequenceIdFile(p)) {
637 result = false;
638 }
639 } catch (IOException e) {
640 LOG.warn("Failed isFile check on " + p);
641 }
642 return result;
643 }
644 });
645 if (files == null) {
646 return filesSorted;
647 }
648 for (FileStatus status : files) {
649 filesSorted.add(status.getPath());
650 }
651 return filesSorted;
652 }
653
654
655
656
657
658
659
660
661
662
663 public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
664 throws IOException {
665 Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
666 + System.currentTimeMillis());
667 if (!fs.rename(edits, moveAsideName)) {
668 LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
669 }
670 return moveAsideName;
671 }
672
// Suffix of the empty marker files that record a region's sequence id ("<seqid>.seqid").
private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
// Older marker-file suffix; still recognized by isSequenceIdFile().
private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
// Both suffixes are this long, so stripping this many characters removes either one.
private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
676
677
678
679
680 @VisibleForTesting
681 public static boolean isSequenceIdFile(final Path file) {
682 return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
683 || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
684 }
685
686
687
688
689
690
691
692
693
694
695 public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir,
696 long newSeqId, long saftyBumper) throws IOException {
697
698 Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
699 long maxSeqId = 0;
700 FileStatus[] files = null;
701 if (fs.exists(editsdir)) {
702 files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
703 @Override
704 public boolean accept(Path p) {
705 return isSequenceIdFile(p);
706 }
707 });
708 if (files != null) {
709 for (FileStatus status : files) {
710 String fileName = status.getPath().getName();
711 try {
712 Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length()
713 - SEQUENCE_ID_FILE_SUFFIX_LENGTH));
714 maxSeqId = Math.max(tmpSeqId, maxSeqId);
715 } catch (NumberFormatException ex) {
716 LOG.warn("Invalid SeqId File Name=" + fileName);
717 }
718 }
719 }
720 }
721 if (maxSeqId > newSeqId) {
722 newSeqId = maxSeqId;
723 }
724 newSeqId += saftyBumper;
725
726
727 Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
728 if (newSeqId != maxSeqId) {
729 try {
730 if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
731 throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
732 }
733 if (LOG.isDebugEnabled()) {
734 LOG.debug("Wrote region seqId=" + newSeqIdFile + " to file, newSeqId=" + newSeqId
735 + ", maxSeqId=" + maxSeqId);
736 }
737 } catch (FileAlreadyExistsException ignored) {
738
739 }
740 }
741
742 if (files != null) {
743 for (FileStatus status : files) {
744 if (newSeqIdFile.equals(status.getPath())) {
745 continue;
746 }
747 fs.delete(status.getPath(), false);
748 }
749 }
750 return newSeqId;
751 }
752
753
754
755
756
757
758
759
760
761 protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
762 throws IOException, CorruptedLogFileException {
763 Path path = file.getPath();
764 long length = file.getLen();
765 Reader in;
766
767
768
769
770 if (length <= 0) {
771 LOG.warn("File " + path + " might be still open, length is 0");
772 }
773
774 try {
775 FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
776 try {
777 in = getReader(path, reporter);
778 } catch (EOFException e) {
779 if (length <= 0) {
780
781
782
783
784
785 LOG.warn("Could not open " + path + " for reading. File is empty", e);
786 return null;
787 } else {
788
789 return null;
790 }
791 }
792 } catch (IOException e) {
793 if (e instanceof FileNotFoundException) {
794
795 LOG.warn("File " + path + " doesn't exist anymore.", e);
796 return null;
797 }
798 if (!skipErrors || e instanceof InterruptedIOException) {
799 throw e;
800 }
801 CorruptedLogFileException t =
802 new CorruptedLogFileException("skipErrors=true Could not open wal " +
803 path + " ignoring");
804 t.initCause(e);
805 throw t;
806 }
807 return in;
808 }
809
810 static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
811 throws CorruptedLogFileException, IOException {
812 try {
813 return in.next();
814 } catch (EOFException eof) {
815
816 LOG.info("EOF from wal " + path + ". continuing");
817 return null;
818 } catch (IOException e) {
819
820
821 if (e.getCause() != null &&
822 (e.getCause() instanceof ParseException ||
823 e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
824 LOG.warn("Parse exception " + e.getCause().toString() + " from wal "
825 + path + ". continuing");
826 return null;
827 }
828 if (!skipErrors) {
829 throw e;
830 }
831 CorruptedLogFileException t =
832 new CorruptedLogFileException("skipErrors=true Ignoring exception" +
833 " while parsing wal " + path + ". Marking as corrupted");
834 t.initCause(e);
835 throw t;
836 }
837 }
838
839
840
841
842
843 protected Writer createWriter(Path logfile)
844 throws IOException {
845 return walFactory.createRecoveredEditsWriter(fs, logfile);
846 }
847
848
849
850
851
852 protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
853 return walFactory.createReader(fs, curLogFile, reporter);
854 }
855
856
857
858
859 private int getNumOpenWriters() {
860 int result = 0;
861 if (this.outputSink != null) {
862 result += this.outputSink.getNumOpenWriters();
863 }
864 return result;
865 }
866
867
868
869
870 public static class PipelineController {
871
872
873 AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
874
875
876
877 public final Object dataAvailable = new Object();
878
879 void writerThreadError(Throwable t) {
880 thrown.compareAndSet(null, t);
881 }
882
883
884
885
886 void checkForErrors() throws IOException {
887 Throwable thrown = this.thrown.get();
888 if (thrown == null) return;
889 if (thrown instanceof IOException) {
890 throw new IOException(thrown);
891 } else {
892 throw new RuntimeException(thrown);
893 }
894 }
895 }
896
897
898
899
900
901
902
903
904 public static class EntryBuffers {
905 PipelineController controller;
906
907 Map<byte[], RegionEntryBuffer> buffers =
908 new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
909
910
911
912
913 Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
914
915 long totalBuffered = 0;
916 long maxHeapUsage;
917
918 public EntryBuffers(PipelineController controller, long maxHeapUsage) {
919 this.controller = controller;
920 this.maxHeapUsage = maxHeapUsage;
921 }
922
923
924
925
926
927
928
929
930 public void appendEntry(Entry entry) throws InterruptedException, IOException {
931 WALKey key = entry.getKey();
932
933 RegionEntryBuffer buffer;
934 long incrHeap;
935 synchronized (this) {
936 buffer = buffers.get(key.getEncodedRegionName());
937 if (buffer == null) {
938 buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
939 buffers.put(key.getEncodedRegionName(), buffer);
940 }
941 incrHeap= buffer.appendEntry(entry);
942 }
943
944
945 synchronized (controller.dataAvailable) {
946 totalBuffered += incrHeap;
947 while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
948 LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
949 controller.dataAvailable.wait(2000);
950 }
951 controller.dataAvailable.notifyAll();
952 }
953 controller.checkForErrors();
954 }
955
956
957
958
959 synchronized RegionEntryBuffer getChunkToWrite() {
960 long biggestSize = 0;
961 byte[] biggestBufferKey = null;
962
963 for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
964 long size = entry.getValue().heapSize();
965 if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
966 biggestSize = size;
967 biggestBufferKey = entry.getKey();
968 }
969 }
970 if (biggestBufferKey == null) {
971 return null;
972 }
973
974 RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
975 currentlyWriting.add(biggestBufferKey);
976 return buffer;
977 }
978
979 void doneWriting(RegionEntryBuffer buffer) {
980 synchronized (this) {
981 boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
982 assert removed;
983 }
984 long size = buffer.heapSize();
985
986 synchronized (controller.dataAvailable) {
987 totalBuffered -= size;
988
989 controller.dataAvailable.notifyAll();
990 }
991 }
992
993 synchronized boolean isRegionCurrentlyWriting(byte[] region) {
994 return currentlyWriting.contains(region);
995 }
996
997 public void waitUntilDrained() {
998 synchronized (controller.dataAvailable) {
999 while (totalBuffered > 0) {
1000 try {
1001 controller.dataAvailable.wait(2000);
1002 } catch (InterruptedException e) {
1003 LOG.warn("Got intrerrupted while waiting for EntryBuffers is drained");
1004 Thread.interrupted();
1005 break;
1006 }
1007 }
1008 }
1009 }
1010 }
1011
1012
1013
1014
1015
1016
1017
1018 public static class RegionEntryBuffer implements HeapSize {
1019 long heapInBuffer = 0;
1020 List<Entry> entryBuffer;
1021 TableName tableName;
1022 byte[] encodedRegionName;
1023
1024 RegionEntryBuffer(TableName tableName, byte[] region) {
1025 this.tableName = tableName;
1026 this.encodedRegionName = region;
1027 this.entryBuffer = new LinkedList<Entry>();
1028 }
1029
1030 long appendEntry(Entry entry) {
1031 internify(entry);
1032 entryBuffer.add(entry);
1033 long incrHeap = entry.getEdit().heapSize() +
1034 ClassSize.align(2 * ClassSize.REFERENCE) +
1035 0;
1036 heapInBuffer += incrHeap;
1037 return incrHeap;
1038 }
1039
1040 private void internify(Entry entry) {
1041 WALKey k = entry.getKey();
1042 k.internTableName(this.tableName);
1043 k.internEncodedRegionName(this.encodedRegionName);
1044 }
1045
1046 @Override
1047 public long heapSize() {
1048 return heapInBuffer;
1049 }
1050
1051 public byte[] getEncodedRegionName() {
1052 return encodedRegionName;
1053 }
1054
1055 public List<Entry> getEntryBuffer() {
1056 return entryBuffer;
1057 }
1058
1059 public TableName getTableName() {
1060 return tableName;
1061 }
1062 }
1063
1064 public static class WriterThread extends Thread {
1065 private volatile boolean shouldStop = false;
1066 private PipelineController controller;
1067 private EntryBuffers entryBuffers;
1068 private OutputSink outputSink = null;
1069
1070 WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
1071 super(Thread.currentThread().getName() + "-Writer-" + i);
1072 this.controller = controller;
1073 this.entryBuffers = entryBuffers;
1074 outputSink = sink;
1075 }
1076
1077 @Override
1078 public void run() {
1079 try {
1080 doRun();
1081 } catch (Throwable t) {
1082 LOG.error("Exiting thread", t);
1083 controller.writerThreadError(t);
1084 }
1085 }
1086
1087 private void doRun() throws IOException {
1088 if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting");
1089 while (true) {
1090 RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
1091 if (buffer == null) {
1092
1093 synchronized (controller.dataAvailable) {
1094 if (shouldStop && !this.outputSink.flush()) {
1095 return;
1096 }
1097 try {
1098 controller.dataAvailable.wait(500);
1099 } catch (InterruptedException ie) {
1100 if (!shouldStop) {
1101 throw new RuntimeException(ie);
1102 }
1103 }
1104 }
1105 continue;
1106 }
1107
1108 assert buffer != null;
1109 try {
1110 writeBuffer(buffer);
1111 } finally {
1112 entryBuffers.doneWriting(buffer);
1113 }
1114 }
1115 }
1116
1117 private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
1118 outputSink.append(buffer);
1119 }
1120
1121 void finish() {
1122 synchronized (controller.dataAvailable) {
1123 shouldStop = true;
1124 controller.dataAvailable.notifyAll();
1125 }
1126 }
1127 }
1128
1129
1130
1131
1132
1133 public static abstract class OutputSink {
1134
1135 protected PipelineController controller;
1136 protected EntryBuffers entryBuffers;
1137
1138 protected Map<byte[], SinkWriter> writers = Collections
1139 .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
1140
1141 protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
1142 .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
1143
1144 protected final List<WriterThread> writerThreads = Lists.newArrayList();
1145
1146
1147 protected final Set<byte[]> blacklistedRegions = Collections
1148 .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
1149
1150 protected boolean closeAndCleanCompleted = false;
1151
1152 protected boolean writersClosed = false;
1153
1154 protected final int numThreads;
1155
1156 protected CancelableProgressable reporter = null;
1157
1158 protected AtomicLong skippedEdits = new AtomicLong();
1159
1160 protected List<Path> splits = null;
1161
1162 public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
1163 numThreads = numWriters;
1164 this.controller = controller;
1165 this.entryBuffers = entryBuffers;
1166 }
1167
1168 void setReporter(CancelableProgressable reporter) {
1169 this.reporter = reporter;
1170 }
1171
1172
1173
1174
1175 public synchronized void startWriterThreads() {
1176 for (int i = 0; i < numThreads; i++) {
1177 WriterThread t = new WriterThread(controller, entryBuffers, this, i);
1178 t.start();
1179 writerThreads.add(t);
1180 }
1181 }
1182
1183
1184
1185
1186
1187 void updateRegionMaximumEditLogSeqNum(Entry entry) {
1188 synchronized (regionMaximumEditLogSeqNum) {
1189 Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
1190 .getEncodedRegionName());
1191 if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
1192 regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
1193 .getLogSeqNum());
1194 }
1195 }
1196 }
1197
1198 Long getRegionMaximumEditLogSeqNum(byte[] region) {
1199 return regionMaximumEditLogSeqNum.get(region);
1200 }
1201
1202
1203
1204
1205 int getNumOpenWriters() {
1206 return this.writers.size();
1207 }
1208
1209 long getSkippedEdits() {
1210 return this.skippedEdits.get();
1211 }
1212
1213
1214
1215
1216
1217
1218 protected boolean finishWriting(boolean interrupt) throws IOException {
1219 LOG.debug("Waiting for split writer threads to finish");
1220 boolean progress_failed = false;
1221 for (WriterThread t : writerThreads) {
1222 t.finish();
1223 }
1224 if (interrupt) {
1225 for (WriterThread t : writerThreads) {
1226 t.interrupt();
1227 }
1228 }
1229
1230 for (WriterThread t : writerThreads) {
1231 if (!progress_failed && reporter != null && !reporter.progress()) {
1232 progress_failed = true;
1233 }
1234 try {
1235 t.join();
1236 } catch (InterruptedException ie) {
1237 IOException iie = new InterruptedIOException();
1238 iie.initCause(ie);
1239 throw iie;
1240 }
1241 }
1242 controller.checkForErrors();
1243 LOG.info(this.writerThreads.size() + " split writers finished; closing...");
1244 return (!progress_failed);
1245 }
1246
1247 public abstract List<Path> finishWritingAndClose() throws IOException;
1248
1249
1250
1251
1252 public abstract Map<byte[], Long> getOutputCounts();
1253
1254
1255
1256
1257 public abstract int getNumberOfRecoveredRegions();
1258
1259
1260
1261
1262
1263 public abstract void append(RegionEntryBuffer buffer) throws IOException;
1264
1265
1266
1267
1268
1269 public boolean flush() throws IOException {
1270 return false;
1271 }
1272
1273
1274
1275
1276
1277
1278
1279 public abstract boolean keepRegionEvent(Entry entry);
1280 }
1281
1282
1283
1284
1285 class LogRecoveredEditsOutputSink extends OutputSink {
1286
1287 public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1288 int numWriters) {
1289
1290
1291
1292
1293
1294 super(controller, entryBuffers, numWriters);
1295 }
1296
1297
1298
1299
1300
1301 @Override
1302 public List<Path> finishWritingAndClose() throws IOException {
1303 boolean isSuccessful = false;
1304 List<Path> result = null;
1305 try {
1306 isSuccessful = finishWriting(false);
1307 } finally {
1308 result = close();
1309 List<IOException> thrown = closeLogWriters(null);
1310 if (thrown != null && !thrown.isEmpty()) {
1311 throw MultipleIOException.createIOException(thrown);
1312 }
1313 }
1314 if (isSuccessful) {
1315 splits = result;
1316 }
1317 return splits;
1318 }
1319
1320
1321
1322
1323
1324 private List<Path> close() throws IOException {
1325 Preconditions.checkState(!closeAndCleanCompleted);
1326
1327 final List<Path> paths = new ArrayList<Path>();
1328 final List<IOException> thrown = Lists.newArrayList();
1329 ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
1330 TimeUnit.SECONDS, new ThreadFactory() {
1331 private int count = 1;
1332
1333 @Override
1334 public Thread newThread(Runnable r) {
1335 Thread t = new Thread(r, "split-log-closeStream-" + count++);
1336 return t;
1337 }
1338 });
1339 CompletionService<Void> completionService =
1340 new ExecutorCompletionService<Void>(closeThreadPool);
1341 for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
1342 if (LOG.isTraceEnabled()) {
1343 LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
1344 }
1345 completionService.submit(new Callable<Void>() {
1346 @Override
1347 public Void call() throws Exception {
1348 WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1349 if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p);
1350 try {
1351 wap.w.close();
1352 } catch (IOException ioe) {
1353 LOG.error("Couldn't close log at " + wap.p, ioe);
1354 thrown.add(ioe);
1355 return null;
1356 }
1357 if (LOG.isDebugEnabled()) {
1358 LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
1359 + " edits, skipped " + wap.editsSkipped + " edits in "
1360 + (wap.nanosSpent / 1000 / 1000) + "ms");
1361 }
1362 if (wap.editsWritten == 0) {
1363
1364 if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
1365 LOG.warn("Failed deleting empty " + wap.p);
1366 throw new IOException("Failed deleting empty " + wap.p);
1367 }
1368 return null;
1369 }
1370
1371 Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1372 regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
1373 try {
1374 if (!dst.equals(wap.p) && fs.exists(dst)) {
1375 LOG.warn("Found existing old edits file. It could be the "
1376 + "result of a previous failed split attempt. Deleting " + dst + ", length="
1377 + fs.getFileStatus(dst).getLen());
1378 if (!fs.delete(dst, false)) {
1379 LOG.warn("Failed deleting of old " + dst);
1380 throw new IOException("Failed deleting of old " + dst);
1381 }
1382 }
1383
1384
1385
1386 if (fs.exists(wap.p)) {
1387 if (!fs.rename(wap.p, dst)) {
1388 throw new IOException("Failed renaming " + wap.p + " to " + dst);
1389 }
1390 LOG.info("Rename " + wap.p + " to " + dst);
1391 }
1392 } catch (IOException ioe) {
1393 LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1394 thrown.add(ioe);
1395 return null;
1396 }
1397 paths.add(dst);
1398 return null;
1399 }
1400 });
1401 }
1402
1403 boolean progress_failed = false;
1404 try {
1405 for (int i = 0, n = this.writers.size(); i < n; i++) {
1406 Future<Void> future = completionService.take();
1407 future.get();
1408 if (!progress_failed && reporter != null && !reporter.progress()) {
1409 progress_failed = true;
1410 }
1411 }
1412 } catch (InterruptedException e) {
1413 IOException iie = new InterruptedIOException();
1414 iie.initCause(e);
1415 throw iie;
1416 } catch (ExecutionException e) {
1417 throw new IOException(e.getCause());
1418 } finally {
1419 closeThreadPool.shutdownNow();
1420 }
1421
1422 if (!thrown.isEmpty()) {
1423 throw MultipleIOException.createIOException(thrown);
1424 }
1425 writersClosed = true;
1426 closeAndCleanCompleted = true;
1427 if (progress_failed) {
1428 return null;
1429 }
1430 return paths;
1431 }
1432
1433 private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1434 if (writersClosed) {
1435 return thrown;
1436 }
1437
1438 if (thrown == null) {
1439 thrown = Lists.newArrayList();
1440 }
1441 try {
1442 for (WriterThread t : writerThreads) {
1443 while (t.isAlive()) {
1444 t.shouldStop = true;
1445 t.interrupt();
1446 try {
1447 t.join(10);
1448 } catch (InterruptedException e) {
1449 IOException iie = new InterruptedIOException();
1450 iie.initCause(e);
1451 throw iie;
1452 }
1453 }
1454 }
1455 } finally {
1456 synchronized (writers) {
1457 WriterAndPath wap = null;
1458 for (SinkWriter tmpWAP : writers.values()) {
1459 try {
1460 wap = (WriterAndPath) tmpWAP;
1461 wap.w.close();
1462 } catch (IOException ioe) {
1463 LOG.error("Couldn't close log at " + wap.p, ioe);
1464 thrown.add(ioe);
1465 continue;
1466 }
1467 LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1468 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1469 }
1470 }
1471 writersClosed = true;
1472 }
1473
1474 return thrown;
1475 }
1476
1477
1478
1479
1480
1481
1482 private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
1483 byte region[] = entry.getKey().getEncodedRegionName();
1484 WriterAndPath ret = (WriterAndPath) writers.get(region);
1485 if (ret != null) {
1486 return ret;
1487 }
1488
1489
1490 if (blacklistedRegions.contains(region)) {
1491 return null;
1492 }
1493 ret = createWAP(region, entry, rootDir);
1494 if (ret == null) {
1495 blacklistedRegions.add(region);
1496 return null;
1497 }
1498 writers.put(region, ret);
1499 return ret;
1500 }
1501
1502
1503
1504
1505 private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
1506 Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1507 if (regionedits == null) {
1508 return null;
1509 }
1510 if (fs.exists(regionedits)) {
1511 LOG.warn("Found old edits file. It could be the "
1512 + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1513 + fs.getFileStatus(regionedits).getLen());
1514 if (!fs.delete(regionedits, false)) {
1515 LOG.warn("Failed delete of old " + regionedits);
1516 }
1517 }
1518 Writer w = createWriter(regionedits);
1519 LOG.debug("Creating writer path=" + regionedits);
1520 return new WriterAndPath(regionedits, w);
1521 }
1522
1523 private void filterCellByStore(Entry logEntry) {
1524 Map<byte[], Long> maxSeqIdInStores =
1525 regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
1526 if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
1527 return;
1528 }
1529
1530
1531 ArrayList<Cell> keptCells = new ArrayList<Cell>(logEntry.getEdit().getCells().size());
1532 for (Cell cell : logEntry.getEdit().getCells()) {
1533 if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
1534 keptCells.add(cell);
1535 } else {
1536 byte[] family = CellUtil.cloneFamily(cell);
1537 Long maxSeqId = maxSeqIdInStores.get(family);
1538
1539
1540 if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getLogSeqNum()) {
1541 keptCells.add(cell);
1542 }
1543 }
1544 }
1545
1546
1547
1548
1549 logEntry.getEdit().setCells(keptCells);
1550 }
1551
1552 @Override
1553 public void append(RegionEntryBuffer buffer) throws IOException {
1554 List<Entry> entries = buffer.entryBuffer;
1555 if (entries.isEmpty()) {
1556 LOG.warn("got an empty buffer, skipping");
1557 return;
1558 }
1559
1560 WriterAndPath wap = null;
1561
1562 long startTime = System.nanoTime();
1563 try {
1564 int editsCount = 0;
1565
1566 for (Entry logEntry : entries) {
1567 if (wap == null) {
1568 wap = getWriterAndPath(logEntry);
1569 if (wap == null) {
1570 if (LOG.isDebugEnabled()) {
1571 LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry);
1572 }
1573 return;
1574 }
1575 }
1576 filterCellByStore(logEntry);
1577 if (!logEntry.getEdit().isEmpty()) {
1578 wap.w.append(logEntry);
1579 this.updateRegionMaximumEditLogSeqNum(logEntry);
1580 editsCount++;
1581 } else {
1582 wap.incrementSkippedEdits(1);
1583 }
1584 }
1585
1586 wap.incrementEdits(editsCount);
1587 wap.incrementNanoTime(System.nanoTime() - startTime);
1588 } catch (IOException e) {
1589 e = RemoteExceptionHandler.checkIOException(e);
1590 LOG.fatal(" Got while writing log entry to log", e);
1591 throw e;
1592 }
1593 }
1594
1595 @Override
1596 public boolean keepRegionEvent(Entry entry) {
1597 ArrayList<Cell> cells = entry.getEdit().getCells();
1598 for (int i = 0; i < cells.size(); i++) {
1599 if (WALEdit.isCompactionMarker(cells.get(i))) {
1600 return true;
1601 }
1602 }
1603 return false;
1604 }
1605
1606
1607
1608
1609 @Override
1610 public Map<byte[], Long> getOutputCounts() {
1611 TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1612 synchronized (writers) {
1613 for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
1614 ret.put(entry.getKey(), entry.getValue().editsWritten);
1615 }
1616 }
1617 return ret;
1618 }
1619
1620 @Override
1621 public int getNumberOfRecoveredRegions() {
1622 return writers.size();
1623 }
1624 }
1625
1626
1627
1628
1629 public abstract static class SinkWriter {
1630
1631 long editsWritten = 0;
1632
1633 long editsSkipped = 0;
1634
1635 long nanosSpent = 0;
1636
1637 void incrementEdits(int edits) {
1638 editsWritten += edits;
1639 }
1640
1641 void incrementSkippedEdits(int skipped) {
1642 editsSkipped += skipped;
1643 }
1644
1645 void incrementNanoTime(long nanos) {
1646 nanosSpent += nanos;
1647 }
1648 }
1649
1650
1651
1652
1653
1654 private final static class WriterAndPath extends SinkWriter {
1655 final Path p;
1656 final Writer w;
1657
1658 WriterAndPath(final Path p, final Writer w) {
1659 this.p = p;
1660 this.w = w;
1661 }
1662 }
1663
1664
1665
1666
1667 class LogReplayOutputSink extends OutputSink {
1668 private static final double BUFFER_THRESHOLD = 0.35;
1669 private static final String KEY_DELIMITER = "#";
1670
1671 private long waitRegionOnlineTimeOut;
1672 private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
1673 private final Map<String, RegionServerWriter> writers =
1674 new ConcurrentHashMap<String, RegionServerWriter>();
1675
1676 private final Map<String, HRegionLocation> onlineRegions =
1677 new ConcurrentHashMap<String, HRegionLocation>();
1678
1679 private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
1680 .synchronizedMap(new TreeMap<TableName, HConnection>());
1681
1682
1683
1684
1685 private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
1686 new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>();
1687 private List<Throwable> thrown = new ArrayList<Throwable>();
1688
1689
1690
1691
1692
1693 private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
1694 private boolean hasEditsInDisablingOrDisabledTables = false;
1695
1696 public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1697 int numWriters) {
1698 super(controller, entryBuffers, numWriters);
1699 this.waitRegionOnlineTimeOut = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
1700 ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
1701 this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
1702 entryBuffers, numWriters);
1703 this.logRecoveredEditsOutputSink.setReporter(reporter);
1704 }
1705
1706 @Override
1707 public void append(RegionEntryBuffer buffer) throws IOException {
1708 List<Entry> entries = buffer.entryBuffer;
1709 if (entries.isEmpty()) {
1710 LOG.warn("got an empty buffer, skipping");
1711 return;
1712 }
1713
1714
1715 if (disablingOrDisabledTables.contains(buffer.tableName)) {
1716
1717 logRecoveredEditsOutputSink.append(buffer);
1718 hasEditsInDisablingOrDisabledTables = true;
1719
1720 addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
1721 return;
1722 }
1723
1724
1725 groupEditsByServer(entries);
1726
1727
1728 String maxLocKey = null;
1729 int maxSize = 0;
1730 List<Pair<HRegionLocation, Entry>> maxQueue = null;
1731 synchronized (this.serverToBufferQueueMap) {
1732 for (Map.Entry<String, List<Pair<HRegionLocation, Entry>>> entry:
1733 serverToBufferQueueMap.entrySet()) {
1734 List<Pair<HRegionLocation, Entry>> curQueue = entry.getValue();
1735 if (curQueue.size() > maxSize) {
1736 maxSize = curQueue.size();
1737 maxQueue = curQueue;
1738 maxLocKey = entry.getKey();
1739 }
1740 }
1741 if (maxSize < minBatchSize
1742 && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
1743
1744 return;
1745 } else if (maxSize > 0) {
1746 this.serverToBufferQueueMap.remove(maxLocKey);
1747 }
1748 }
1749
1750 if (maxSize > 0) {
1751 processWorkItems(maxLocKey, maxQueue);
1752 }
1753 }
1754
1755 private void addToRecoveredRegions(String encodedRegionName) {
1756 if (!recoveredRegions.contains(encodedRegionName)) {
1757 recoveredRegions.add(encodedRegionName);
1758 }
1759 }
1760
1761
1762
1763
1764
1765 private void groupEditsByServer(List<Entry> entries) throws IOException {
1766 Set<TableName> nonExistentTables = null;
1767 Long cachedLastFlushedSequenceId = -1l;
1768 for (Entry entry : entries) {
1769 WALEdit edit = entry.getEdit();
1770 TableName table = entry.getKey().getTablename();
1771
1772 entry.getKey().setScopes(null);
1773 String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
1774
1775 if (nonExistentTables != null && nonExistentTables.contains(table)) {
1776 this.skippedEdits.incrementAndGet();
1777 continue;
1778 }
1779
1780 Map<byte[], Long> maxStoreSequenceIds = null;
1781 boolean needSkip = false;
1782 HRegionLocation loc = null;
1783 String locKey = null;
1784 List<Cell> cells = edit.getCells();
1785 List<Cell> skippedCells = new ArrayList<Cell>();
1786 HConnection hconn = this.getConnectionByTableName(table);
1787
1788 for (Cell cell : cells) {
1789 byte[] row = CellUtil.cloneRow(cell);
1790 byte[] family = CellUtil.cloneFamily(cell);
1791 boolean isCompactionEntry = false;
1792 if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
1793 CompactionDescriptor compaction = WALEdit.getCompaction(cell);
1794 if (compaction != null && compaction.hasRegionName()) {
1795 try {
1796 byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
1797 .toByteArray());
1798 row = regionName[1];
1799 family = compaction.getFamilyName().toByteArray();
1800 isCompactionEntry = true;
1801 } catch (Exception ex) {
1802 LOG.warn("Unexpected exception received, ignoring " + ex);
1803 skippedCells.add(cell);
1804 continue;
1805 }
1806 } else {
1807 skippedCells.add(cell);
1808 continue;
1809 }
1810 }
1811
1812 try {
1813 loc =
1814 locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
1815 encodeRegionNameStr);
1816
1817 if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
1818 loc.getRegionInfo().getEncodedName())) {
1819 LOG.info("Not replaying a compaction marker for an older region: "
1820 + encodeRegionNameStr);
1821 needSkip = true;
1822 }
1823 } catch (TableNotFoundException ex) {
1824
1825 LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
1826 + encodeRegionNameStr);
1827 lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
1828 if (nonExistentTables == null) {
1829 nonExistentTables = new TreeSet<TableName>();
1830 }
1831 nonExistentTables.add(table);
1832 this.skippedEdits.incrementAndGet();
1833 needSkip = true;
1834 break;
1835 }
1836
1837 cachedLastFlushedSequenceId =
1838 lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1839 if (cachedLastFlushedSequenceId != null
1840 && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
1841
1842 this.skippedEdits.incrementAndGet();
1843 needSkip = true;
1844 break;
1845 } else {
1846 if (maxStoreSequenceIds == null) {
1847 maxStoreSequenceIds =
1848 regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
1849 }
1850 if (maxStoreSequenceIds != null) {
1851 Long maxStoreSeqId = maxStoreSequenceIds.get(family);
1852 if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
1853
1854 skippedCells.add(cell);
1855 continue;
1856 }
1857 }
1858 }
1859 }
1860
1861
1862 if (loc == null || needSkip) continue;
1863
1864 if (!skippedCells.isEmpty()) {
1865 cells.removeAll(skippedCells);
1866 }
1867
1868 synchronized (serverToBufferQueueMap) {
1869 locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
1870 List<Pair<HRegionLocation, Entry>> queue = serverToBufferQueueMap.get(locKey);
1871 if (queue == null) {
1872 queue =
1873 Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Entry>>());
1874 serverToBufferQueueMap.put(locKey, queue);
1875 }
1876 queue.add(new Pair<HRegionLocation, Entry>(loc, entry));
1877 }
1878
1879 addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
1880 }
1881 }
1882
1883
1884
1885
1886
1887
1888 private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
1889 TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
1890
1891 HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
1892 if(loc != null) return loc;
1893
1894 loc = hconn.getRegionLocation(table, row, true);
1895 if (loc == null) {
1896 throw new IOException("Can't locate location for row:" + Bytes.toString(row)
1897 + " of table:" + table);
1898 }
1899
1900 if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
1901
1902 lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
1903 HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
1904 if (tmpLoc != null) return tmpLoc;
1905 }
1906
1907 Long lastFlushedSequenceId = -1l;
1908 AtomicBoolean isRecovering = new AtomicBoolean(true);
1909 loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
1910 if (!isRecovering.get()) {
1911
1912
1913 lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
1914 LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
1915 + " because it's not in recovering.");
1916 } else {
1917 Long cachedLastFlushedSequenceId =
1918 lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1919
1920
1921
1922 RegionStoreSequenceIds ids =
1923 csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
1924 loc.getRegionInfo().getEncodedName());
1925 if (ids != null) {
1926 lastFlushedSequenceId = ids.getLastFlushedSequenceId();
1927 Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1928 List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
1929 for (StoreSequenceId id : maxSeqIdInStores) {
1930 storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
1931 }
1932 regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
1933 }
1934
1935 if (cachedLastFlushedSequenceId == null
1936 || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
1937 lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
1938 }
1939 }
1940
1941 onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
1942 return loc;
1943 }
1944
1945 private void processWorkItems(String key, List<Pair<HRegionLocation, Entry>> actions)
1946 throws IOException {
1947 RegionServerWriter rsw = null;
1948
1949 long startTime = System.nanoTime();
1950 try {
1951 rsw = getRegionServerWriter(key);
1952 rsw.sink.replayEntries(actions);
1953
1954
1955 rsw.incrementEdits(actions.size());
1956 rsw.incrementNanoTime(System.nanoTime() - startTime);
1957 } catch (IOException e) {
1958 e = RemoteExceptionHandler.checkIOException(e);
1959 LOG.fatal(" Got while writing log entry to log", e);
1960 throw e;
1961 }
1962 }
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973 private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
1974 final long timeout, AtomicBoolean isRecovering)
1975 throws IOException {
1976 final long endTime = EnvironmentEdgeManager.currentTime() + timeout;
1977 final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1978 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1979 boolean reloadLocation = false;
1980 TableName tableName = loc.getRegionInfo().getTable();
1981 int tries = 0;
1982 Throwable cause = null;
1983 while (endTime > EnvironmentEdgeManager.currentTime()) {
1984 try {
1985
1986 HConnection hconn = getConnectionByTableName(tableName);
1987 if(reloadLocation) {
1988 loc = hconn.getRegionLocation(tableName, row, true);
1989 }
1990 BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
1991 HRegionInfo region = loc.getRegionInfo();
1992 try {
1993 GetRegionInfoRequest request =
1994 RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
1995 GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
1996 if (HRegionInfo.convert(response.getRegionInfo()) != null) {
1997 isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
1998 return loc;
1999 }
2000 } catch (ServiceException se) {
2001 throw ProtobufUtil.getRemoteException(se);
2002 }
2003 } catch (IOException e) {
2004 cause = e.getCause();
2005 if(!(cause instanceof RegionOpeningException)) {
2006 reloadLocation = true;
2007 }
2008 }
2009 long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
2010 try {
2011 Thread.sleep(expectedSleep);
2012 } catch (InterruptedException e) {
2013 throw new IOException("Interrupted when waiting region " +
2014 loc.getRegionInfo().getEncodedName() + " online.", e);
2015 }
2016 tries++;
2017 }
2018
2019 throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
2020 " online for " + timeout + " milliseconds.", cause);
2021 }
2022
2023 @Override
2024 public boolean flush() throws IOException {
2025 String curLoc = null;
2026 int curSize = 0;
2027 List<Pair<HRegionLocation, Entry>> curQueue = null;
2028 synchronized (this.serverToBufferQueueMap) {
2029 for (Map.Entry<String, List<Pair<HRegionLocation, Entry>>> entry :
2030 serverToBufferQueueMap.entrySet()) {
2031 String locationKey = entry.getKey();
2032 curQueue = entry.getValue();
2033 if (!curQueue.isEmpty()) {
2034 curSize = curQueue.size();
2035 curLoc = locationKey;
2036 break;
2037 }
2038 }
2039 if (curSize > 0) {
2040 this.serverToBufferQueueMap.remove(curLoc);
2041 }
2042 }
2043
2044 if (curSize > 0) {
2045 this.processWorkItems(curLoc, curQueue);
2046
2047 synchronized(controller.dataAvailable) {
2048 controller.dataAvailable.notifyAll();
2049 }
2050 return true;
2051 }
2052 return false;
2053 }
2054
2055 @Override
2056 public boolean keepRegionEvent(Entry entry) {
2057 return true;
2058 }
2059
2060 void addWriterError(Throwable t) {
2061 thrown.add(t);
2062 }
2063
2064 @Override
2065 public List<Path> finishWritingAndClose() throws IOException {
2066 try {
2067 if (!finishWriting(false)) {
2068 return null;
2069 }
2070 if (hasEditsInDisablingOrDisabledTables) {
2071 splits = logRecoveredEditsOutputSink.finishWritingAndClose();
2072 } else {
2073 splits = new ArrayList<Path>();
2074 }
2075
2076 return splits;
2077 } finally {
2078 List<IOException> thrown = closeRegionServerWriters();
2079 if (thrown != null && !thrown.isEmpty()) {
2080 throw MultipleIOException.createIOException(thrown);
2081 }
2082 }
2083 }
2084
2085 @Override
2086 int getNumOpenWriters() {
2087 return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
2088 }
2089
2090 private List<IOException> closeRegionServerWriters() throws IOException {
2091 List<IOException> result = null;
2092 if (!writersClosed) {
2093 result = Lists.newArrayList();
2094 try {
2095 for (WriterThread t : writerThreads) {
2096 while (t.isAlive()) {
2097 t.shouldStop = true;
2098 t.interrupt();
2099 try {
2100 t.join(10);
2101 } catch (InterruptedException e) {
2102 IOException iie = new InterruptedIOException();
2103 iie.initCause(e);
2104 throw iie;
2105 }
2106 }
2107 }
2108 } finally {
2109 synchronized (writers) {
2110 for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
2111 String locationKey = entry.getKey();
2112 RegionServerWriter tmpW = entry.getValue();
2113 try {
2114 tmpW.close();
2115 } catch (IOException ioe) {
2116 LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
2117 result.add(ioe);
2118 }
2119 }
2120 }
2121
2122
2123 synchronized (this.tableNameToHConnectionMap) {
2124 for (Map.Entry<TableName, HConnection> entry :
2125 tableNameToHConnectionMap.entrySet()) {
2126 TableName tableName = entry.getKey();
2127 HConnection hconn = entry.getValue();
2128 try {
2129 hconn.clearRegionCache();
2130 hconn.close();
2131 } catch (IOException ioe) {
2132 result.add(ioe);
2133 }
2134 }
2135 }
2136 writersClosed = true;
2137 }
2138 }
2139 return result;
2140 }
2141
2142 @Override
2143 public Map<byte[], Long> getOutputCounts() {
2144 TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
2145 synchronized (writers) {
2146 for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
2147 ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
2148 }
2149 }
2150 return ret;
2151 }
2152
2153 @Override
2154 public int getNumberOfRecoveredRegions() {
2155 return this.recoveredRegions.size();
2156 }
2157
2158
2159
2160
2161
2162
2163 private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
2164 RegionServerWriter ret = writers.get(loc);
2165 if (ret != null) {
2166 return ret;
2167 }
2168
2169 TableName tableName = getTableFromLocationStr(loc);
2170 if(tableName == null){
2171 throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
2172 }
2173
2174 HConnection hconn = getConnectionByTableName(tableName);
2175 synchronized (writers) {
2176 ret = writers.get(loc);
2177 if (ret == null) {
2178 ret = new RegionServerWriter(conf, tableName, hconn);
2179 writers.put(loc, ret);
2180 }
2181 }
2182 return ret;
2183 }
2184
2185 private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
2186 HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
2187 if (hconn == null) {
2188 synchronized (this.tableNameToHConnectionMap) {
2189 hconn = this.tableNameToHConnectionMap.get(tableName);
2190 if (hconn == null) {
2191 hconn = HConnectionManager.getConnection(conf);
2192 this.tableNameToHConnectionMap.put(tableName, hconn);
2193 }
2194 }
2195 }
2196 return hconn;
2197 }
2198 private TableName getTableFromLocationStr(String loc) {
2199
2200
2201
2202 String[] splits = loc.split(KEY_DELIMITER);
2203 if (splits.length != 2) {
2204 return null;
2205 }
2206 return TableName.valueOf(splits[1]);
2207 }
2208 }
2209
2210
2211
2212
2213
2214 private final static class RegionServerWriter extends SinkWriter {
2215 final WALEditsReplaySink sink;
2216
2217 RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
2218 throws IOException {
2219 this.sink = new WALEditsReplaySink(conf, tableName, conn);
2220 }
2221
2222 void close() throws IOException {
2223 }
2224 }
2225
2226 static class CorruptedLogFileException extends Exception {
2227 private static final long serialVersionUID = 1L;
2228
2229 CorruptedLogFileException(String s) {
2230 super(s);
2231 }
2232 }
2233
2234
2235 public static class MutationReplay {
2236 public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
2237 this.type = type;
2238 this.mutation = mutation;
2239 if(this.mutation.getDurability() != Durability.SKIP_WAL) {
2240
2241 this.mutation.setDurability(Durability.ASYNC_WAL);
2242 }
2243 this.nonceGroup = nonceGroup;
2244 this.nonce = nonce;
2245 }
2246
2247 public final MutationType type;
2248 public final Mutation mutation;
2249 public final long nonceGroup;
2250 public final long nonce;
2251 }
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
/**
 * Converts a protobuf {@code WALEntry} and its associated cells into the mutations to replay.
 *
 * <p>Exactly {@code entry.getAssociatedCellCount()} cells are consumed from {@code cells}.
 * Consecutive cells that share both row and type byte are folded into one {@link Delete} (for
 * delete cells) or one {@link Put} (for all other cells). A change of row or type byte starts a
 * new mutation, so a single row may yield several mutations. Mutations are returned in the order
 * their first cell was read.
 *
 * @param entry the WAL entry to decode; when {@code null}, a new empty list is returned and
 *     {@code cells} and {@code logEntry} are left untouched
 * @param cells scanner over the entry's cells; advanced once per associated cell
 * @param logEntry optional out-parameter; when non-null, its first element is set to a
 *     reconstructed {@link HLogKey} and its second to a {@link WALEdit} holding every consumed
 *     cell
 * @param durability durability set on every produced mutation (applied after, and therefore
 *     overriding, the durability assigned by the {@code MutationReplay} constructor)
 * @return the decoded mutations, each wrapped in a {@link MutationReplay}
 * @throws ArrayIndexOutOfBoundsException if {@code cells} runs out before the associated cell
 *     count is reached
 * @throws IOException declared for callers; NOTE(review): no visible statement here throws it
 *     directly, so presumably it is propagated from a callee. Confirm.
 */
public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
Pair<WALKey, WALEdit> logEntry, Durability durability)
throws IOException {
if (entry == null) {
// Nothing to decode: return a new, empty (mutable) list.
return new ArrayList<MutationReplay>();
}

// Prefer the original sequence number when the entry carries one. It becomes the sequence id of
// the reconstructed key below.
long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
int count = entry.getAssociatedCellCount();
List<MutationReplay> mutations = new ArrayList<MutationReplay>();
Cell previousCell = null;
Mutation m = null;
WALKey key = null;
WALEdit val = null;
// Only accumulate cells into a WALEdit when the caller asked for the log entry back.
if (logEntry != null) val = new WALEdit();

for (int i = 0; i < count; i++) {
// Fail loudly if the scanner holds fewer cells than the entry claims.
if (!cells.advance()) {
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
Cell cell = cells.current();
if (val != null) val.add(cell);

boolean isNewRowOrType =
previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
|| !CellUtil.matchingRow(previousCell, cell);
if (isNewRowOrType) {
// Start a new mutation for this row/type run.
if (CellUtil.isDelete(cell)) {
m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
// Deletes are replayed without nonces.
mutations.add(new MutationReplay(
MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
} else {
m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
// Non-delete cells carry the entry's nonce group/nonce when present, else NO_NONCE.
long nonceGroup = entry.getKey().hasNonceGroup()
? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
}
}
// Within a run the type byte is unchanged. Assuming CellUtil.isDelete depends only on the type
// byte, this branch agrees with the class chosen when m was created, so the casts are safe.
if (CellUtil.isDelete(cell)) {
((Delete) m).addDeleteMarker(cell);
} else {
((Put) m).add(cell);
}
// Applied per cell; this overrides the durability set by the MutationReplay constructor.
m.setDurability(durability);
previousCell = cell;
}

// Rebuild the WAL key and hand key + edit back to the caller, if requested.
if (logEntry != null) {
org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKeyProto = entry.getKey();
List<UUID> clusterIds = new ArrayList<UUID>(walKeyProto.getClusterIdsCount());
for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
}
// NOTE(review): an HLogKey (not a plain WALKey) is built here, presumably for consumers that
// expect HLogKey. Confirm. The raw getNonceGroup()/getNonce() values are used without the
// has*() checks applied to Puts above, which relies on the proto defaults matching NO_NONCE.
key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
logEntry.setFirst(key);
logEntry.setSecond(val);
}

return mutations;
}
2335 }