1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.wal;
20
21 import java.io.EOFException;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.text.ParseException;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashSet;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.NavigableSet;
33 import java.util.Set;
34 import java.util.TreeMap;
35 import java.util.TreeSet;
36 import java.util.UUID;
37 import java.util.concurrent.Callable;
38 import java.util.concurrent.CompletionService;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.ExecutorCompletionService;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.ThreadFactory;
44 import java.util.concurrent.ThreadPoolExecutor;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicLong;
48 import java.util.concurrent.atomic.AtomicReference;
49 import java.util.regex.Matcher;
50 import java.util.regex.Pattern;
51
52 import org.apache.commons.logging.Log;
53 import org.apache.commons.logging.LogFactory;
54 import org.apache.hadoop.conf.Configuration;
55 import org.apache.hadoop.fs.FileAlreadyExistsException;
56 import org.apache.hadoop.fs.FileStatus;
57 import org.apache.hadoop.fs.FileSystem;
58 import org.apache.hadoop.fs.Path;
59 import org.apache.hadoop.fs.PathFilter;
60 import org.apache.hadoop.hbase.Cell;
61 import org.apache.hadoop.hbase.CellScanner;
62 import org.apache.hadoop.hbase.CellUtil;
63 import org.apache.hadoop.hbase.CoordinatedStateException;
64 import org.apache.hadoop.hbase.CoordinatedStateManager;
65 import org.apache.hadoop.hbase.HBaseConfiguration;
66 import org.apache.hadoop.hbase.HConstants;
67 import org.apache.hadoop.hbase.HRegionInfo;
68 import org.apache.hadoop.hbase.HRegionLocation;
69 import org.apache.hadoop.hbase.RemoteExceptionHandler;
70 import org.apache.hadoop.hbase.ServerName;
71 import org.apache.hadoop.hbase.TableName;
72 import org.apache.hadoop.hbase.TableNotFoundException;
73 import org.apache.hadoop.hbase.TableStateManager;
74 import org.apache.hadoop.hbase.classification.InterfaceAudience;
75 import org.apache.hadoop.hbase.client.ConnectionUtils;
76 import org.apache.hadoop.hbase.client.Delete;
77 import org.apache.hadoop.hbase.client.Durability;
78 import org.apache.hadoop.hbase.client.HConnection;
79 import org.apache.hadoop.hbase.client.HConnectionManager;
80 import org.apache.hadoop.hbase.client.Mutation;
81 import org.apache.hadoop.hbase.client.Put;
82 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
83 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
84 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
85 import org.apache.hadoop.hbase.io.HeapSize;
86 import org.apache.hadoop.hbase.master.SplitLogManager;
87 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
88 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
89 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
90 import org.apache.hadoop.hbase.protobuf.RequestConverter;
91 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
92 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
93 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
94 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
95 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
96 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
97 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
98 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
99 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
100 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
101 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
102 import org.apache.hadoop.hbase.regionserver.HRegion;
103 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
104
105 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
106 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
107 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
108 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
109 import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
110 import org.apache.hadoop.hbase.util.Bytes;
111 import org.apache.hadoop.hbase.util.CancelableProgressable;
112 import org.apache.hadoop.hbase.util.ClassSize;
113 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
114 import org.apache.hadoop.hbase.util.FSUtils;
115 import org.apache.hadoop.hbase.util.Pair;
116 import org.apache.hadoop.hbase.util.Threads;
117 import org.apache.hadoop.hbase.wal.WAL.Entry;
118 import org.apache.hadoop.hbase.wal.WAL.Reader;
119 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
120 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
121 import org.apache.hadoop.io.MultipleIOException;
122
123 import com.google.common.annotations.VisibleForTesting;
124 import com.google.common.base.Preconditions;
125 import com.google.common.collect.Lists;
126 import com.google.protobuf.ServiceException;
127
128
129
130
131
132
133 @InterfaceAudience.Private
134 public class WALSplitter {
135 static final Log LOG = LogFactory.getLog(WALSplitter.class);
136
137
138 public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
139
140
141 protected final Path rootDir;
142 protected final FileSystem fs;
143 protected final Configuration conf;
144
145
146
147 PipelineController controller;
148 OutputSink outputSink;
149 EntryBuffers entryBuffers;
150
151 private Set<TableName> disablingOrDisabledTables =
152 new HashSet<TableName>();
153 private BaseCoordinatedStateManager csm;
154 private final WALFactory walFactory;
155
156 private MonitoredTask status;
157
158
159 protected final LastSequenceId sequenceIdChecker;
160
161 protected boolean distributedLogReplay;
162
163
164 protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
165
166
167 protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
168 new ConcurrentHashMap<String, Map<byte[], Long>>();
169
170
171 protected String failedServerName = "";
172
173
174 private final int numWriterThreads;
175
176
177 private final int minBatchSize;
178
179 WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
180 FileSystem fs, LastSequenceId idChecker,
181 CoordinatedStateManager csm, RecoveryMode mode) {
182 this.conf = HBaseConfiguration.create(conf);
183 String codecClassName = conf
184 .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
185 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
186 this.rootDir = rootDir;
187 this.fs = fs;
188 this.sequenceIdChecker = idChecker;
189 this.csm = (BaseCoordinatedStateManager)csm;
190 this.walFactory = factory;
191 this.controller = new PipelineController();
192
193 entryBuffers = new EntryBuffers(controller,
194 this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
195 128*1024*1024));
196
197
198
199 this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
200 this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
201
202 this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
203 if (csm != null && this.distributedLogReplay) {
204 outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
205 } else {
206 if (this.distributedLogReplay) {
207 LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
208 }
209 this.distributedLogReplay = false;
210 outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
211 }
212
213 }
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231 public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
232 Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
233 CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException {
234 WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, cp, mode);
235 return s.splitLogFile(logfile, reporter);
236 }
237
238
239
240
241
242 public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
243 FileSystem fs, Configuration conf, final WALFactory factory) throws IOException {
244 final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
245 Collections.singletonList(logDir), null);
246 List<Path> splits = new ArrayList<Path>();
247 if (logfiles != null && logfiles.length > 0) {
248 for (FileStatus logfile: logfiles) {
249 WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null,
250 RecoveryMode.LOG_SPLITTING);
251 if (s.splitLogFile(logfile, null)) {
252 finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
253 if (s.outputSink.splits != null) {
254 splits.addAll(s.outputSink.splits);
255 }
256 }
257 }
258 }
259 if (!fs.delete(logDir, true)) {
260 throw new IOException("Unable to delete src dir: " + logDir);
261 }
262 return splits;
263 }
264
265
266
267
268
269 boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
270 Preconditions.checkState(status == null);
271 Preconditions.checkArgument(logfile.isFile(),
272 "passed in file status is for something other than a regular file.");
273 boolean isCorrupted = false;
274 boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
275 SPLIT_SKIP_ERRORS_DEFAULT);
276 int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
277 Path logPath = logfile.getPath();
278 boolean outputSinkStarted = false;
279 boolean progress_failed = false;
280 int editsCount = 0;
281 int editsSkipped = 0;
282
283 status =
284 TaskMonitor.get().createStatus(
285 "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
286 Reader in = null;
287 try {
288 long logLength = logfile.getLen();
289 LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
290 LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
291 status.setStatus("Opening log file");
292 if (reporter != null && !reporter.progress()) {
293 progress_failed = true;
294 return false;
295 }
296 try {
297 in = getReader(logfile, skipErrors, reporter);
298 } catch (CorruptedLogFileException e) {
299 LOG.warn("Could not get reader, corrupted log file " + logPath, e);
300 ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
301 isCorrupted = true;
302 }
303 if (in == null) {
304 LOG.warn("Nothing to split in log file " + logPath);
305 return true;
306 }
307 if (csm != null) {
308 try {
309 TableStateManager tsm = csm.getTableStateManager();
310 disablingOrDisabledTables = tsm.getTablesInStates(
311 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
312 } catch (CoordinatedStateException e) {
313 throw new IOException("Can't get disabling/disabled tables", e);
314 }
315 }
316 int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
317 int numOpenedFilesLastCheck = 0;
318 outputSink.setReporter(reporter);
319 outputSink.startWriterThreads();
320 outputSinkStarted = true;
321 Entry entry;
322 Long lastFlushedSequenceId = -1L;
323 ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logPath);
324 failedServerName = (serverName == null) ? "" : serverName.getServerName();
325 while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
326 byte[] region = entry.getKey().getEncodedRegionName();
327 String key = Bytes.toString(region);
328 lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
329 if (lastFlushedSequenceId == null) {
330 if (this.distributedLogReplay) {
331 RegionStoreSequenceIds ids =
332 csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
333 key);
334 if (ids != null) {
335 lastFlushedSequenceId = ids.getLastFlushedSequenceId();
336 }
337 } else if (sequenceIdChecker != null) {
338 RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
339 Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
340 for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
341 maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
342 storeSeqId.getSequenceId());
343 }
344 regionMaxSeqIdInStores.put(key, maxSeqIdInStores);
345 lastFlushedSequenceId = ids.getLastFlushedSequenceId();
346 }
347 if (lastFlushedSequenceId == null) {
348 lastFlushedSequenceId = -1L;
349 }
350 lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
351 }
352 if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
353 editsSkipped++;
354 continue;
355 }
356 entryBuffers.appendEntry(entry);
357 editsCount++;
358 int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
359
360 if (editsCount % interval == 0
361 || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
362 numOpenedFilesLastCheck = this.getNumOpenWriters();
363 String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
364 + " edits, skipped " + editsSkipped + " edits.";
365 status.setStatus("Split " + countsStr);
366 if (reporter != null && !reporter.progress()) {
367 progress_failed = true;
368 return false;
369 }
370 }
371 }
372 } catch (InterruptedException ie) {
373 IOException iie = new InterruptedIOException();
374 iie.initCause(ie);
375 throw iie;
376 } catch (CorruptedLogFileException e) {
377 LOG.warn("Could not parse, corrupted log file " + logPath, e);
378 csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
379 logfile.getPath().getName(), fs);
380 isCorrupted = true;
381 } catch (IOException e) {
382 e = RemoteExceptionHandler.checkIOException(e);
383 throw e;
384 } finally {
385 LOG.debug("Finishing writing output logs and closing down.");
386 try {
387 if (null != in) {
388 in.close();
389 }
390 } catch (IOException exception) {
391 LOG.warn("Could not close wal reader: " + exception.getMessage());
392 LOG.debug("exception details", exception);
393 }
394 try {
395 if (outputSinkStarted) {
396
397
398 progress_failed = true;
399 progress_failed = outputSink.finishWritingAndClose() == null;
400 }
401 } finally {
402 String msg =
403 "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
404 + " regions; edits skipped=" + editsSkipped + "; log file=" + logPath +
405 ", length=" + logfile.getLen() +
406 ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
407 LOG.info(msg);
408 status.markComplete(msg);
409 }
410 }
411 return !progress_failed;
412 }
413
414
415
416
417
418
419
420
421
422
423
424
425 public static void finishSplitLogFile(String logfile,
426 Configuration conf) throws IOException {
427 Path rootdir = FSUtils.getRootDir(conf);
428 Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
429 Path logPath;
430 if (FSUtils.isStartingWithPath(rootdir, logfile)) {
431 logPath = new Path(logfile);
432 } else {
433 logPath = new Path(rootdir, logfile);
434 }
435 finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
436 }
437
438 static void finishSplitLogFile(Path rootdir, Path oldLogDir,
439 Path logPath, Configuration conf) throws IOException {
440 List<Path> processedLogs = new ArrayList<Path>();
441 List<Path> corruptedLogs = new ArrayList<Path>();
442 FileSystem fs;
443 fs = rootdir.getFileSystem(conf);
444 if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
445 corruptedLogs.add(logPath);
446 } else {
447 processedLogs.add(logPath);
448 }
449 archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
450 Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
451 fs.delete(stagingDir, true);
452 }
453
454
455
456
457
458
459
460
461
462
463
464
465
466 private static void archiveLogs(
467 final List<Path> corruptedLogs,
468 final List<Path> processedLogs, final Path oldLogDir,
469 final FileSystem fs, final Configuration conf) throws IOException {
470 final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
471 "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
472
473 if (!fs.mkdirs(corruptDir)) {
474 LOG.info("Unable to mkdir " + corruptDir);
475 }
476 fs.mkdirs(oldLogDir);
477
478
479
480 for (Path corrupted : corruptedLogs) {
481 Path p = new Path(corruptDir, corrupted.getName());
482 if (fs.exists(corrupted)) {
483 if (!fs.rename(corrupted, p)) {
484 LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
485 } else {
486 LOG.warn("Moved corrupted log " + corrupted + " to " + p);
487 }
488 }
489 }
490
491 for (Path p : processedLogs) {
492 Path newPath = FSHLog.getWALArchivePath(oldLogDir, p);
493 if (fs.exists(p)) {
494 if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
495 LOG.warn("Unable to move " + p + " to " + newPath);
496 } else {
497 LOG.info("Archived processed log " + p + " to " + newPath);
498 }
499 }
500 }
501 }
502
503
504
505
506
507
508
509
510
511
512
513
514
  /**
   * Path to a temp file under the RECOVERED_EDITS_DIR of the region named in
   * {@code logEntry}, named for that entry's sequence id. Also ensures the
   * recovered-edits directory exists (when {@code isCreate}), sidelining any
   * leftover regular file occupying the directory's path.
   *
   * @param fs filesystem holding the region directory
   * @param logEntry entry whose table/region/seqnum determine the path
   * @param rootDir HBase root dir
   * @param isCreate if true, create the recovered-edits dir when missing
   * @return Path to file into which to dump split log edits, or null if the
   *   region directory no longer exists (region presumably already split away)
   */
  @SuppressWarnings("deprecation")
  static Path getRegionSplitEditsPath(final FileSystem fs,
      final Entry logEntry, final Path rootDir, boolean isCreate)
  throws IOException {
    Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
    Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
    Path dir = getRegionDirRecoveredEditsDir(regiondir);

    if (!fs.exists(regiondir)) {
      // Region dir gone: nothing to recover into, caller should drop the edits.
      LOG.info("This region's directory doesn't exist: "
          + regiondir.toString() + ". It is very likely that it was" +
          " already split so it's safe to discard those edits.");
      return null;
    }
    if (fs.exists(dir) && fs.isFile(dir)) {
      // A regular FILE sits where the recovered.edits DIRECTORY should be;
      // sideline it so the directory can be created.
      // NOTE(review): the sideline target is the filesystem-root /tmp, not a
      // dir under the hbase root — confirm this is intended.
      Path tmp = new Path("/tmp");
      if (!fs.exists(tmp)) {
        fs.mkdirs(tmp);
      }
      tmp = new Path(tmp,
        HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
      LOG.warn("Found existing old file: " + dir + ". It could be some "
        + "leftover of an old installation. It should be a folder instead. "
        + "So moving it to " + tmp);
      if (!fs.rename(dir, tmp)) {
        LOG.warn("Failed to sideline old file " + dir);
      }
    }

    if (isCreate && !fs.exists(dir)) {
      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
    }
    // Return a temp (".temp"-suffixed) name; it is renamed to its final
    // sequence-id name once the writer is closed cleanly.
    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
    fileName = getTmpRecoveredEditsFileName(fileName);
    return new Path(dir, fileName);
  }
554
  /** @return the given recovered-edits file name with the in-progress ".temp" suffix appended. */
  static String getTmpRecoveredEditsFileName(String fileName) {
    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
  }
558
559
560
561
562
563
564
565
566
567 static Path getCompletedRecoveredEditsFilePath(Path srcPath,
568 Long maximumEditLogSeqNum) {
569 String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
570 return new Path(srcPath.getParent(), fileName);
571 }
572
  /**
   * Render a WAL sequence id as a fixed-width, zero-padded 19-digit string so
   * that recovered-edits file names sort lexicographically in sequence order.
   */
  static String formatRecoveredEditsFileName(final long seqid) {
    return String.format("%019d", seqid);
  }
576
577 private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
578 private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
579
580
581
582
583
584
585
  /**
   * @param regiondir This regions directory in the filesystem.
   * @return The directory that holds recovered edits files for the region
   *   {@code regiondir}.
   */
  public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
    return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
  }
589
590
591
592
593
594
595
596
597
598
599 public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
600 final Path regiondir) throws IOException {
601 NavigableSet<Path> filesSorted = new TreeSet<Path>();
602 Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
603 if (!fs.exists(editsdir))
604 return filesSorted;
605 FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
606 @Override
607 public boolean accept(Path p) {
608 boolean result = false;
609 try {
610
611
612
613
614 Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
615 result = fs.isFile(p) && m.matches();
616
617
618 if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
619 result = false;
620 }
621
622 if (isSequenceIdFile(p)) {
623 result = false;
624 }
625 } catch (IOException e) {
626 LOG.warn("Failed isFile check on " + p);
627 }
628 return result;
629 }
630 });
631 if (files == null) {
632 return filesSorted;
633 }
634 for (FileStatus status : files) {
635 filesSorted.add(status.getPath());
636 }
637 return filesSorted;
638 }
639
640
641
642
643
644
645
646
647
648
649 public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
650 throws IOException {
651 Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
652 + System.currentTimeMillis());
653 if (!fs.rename(edits, moveAsideName)) {
654 LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
655 }
656 return moveAsideName;
657 }
658
659 private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
660 private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
661 private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
662
663
664
665
666 @VisibleForTesting
667 public static boolean isSequenceIdFile(final Path file) {
668 return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
669 || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
670 }
671
672
673
674
675
676
677
678
679
680
  /**
   * Create a marker file under the region's recovered.edits dir recording the
   * region-open sequence id, never moving the recorded id backwards. Older
   * marker files are deleted afterwards.
   *
   * @param fs filesystem to use
   * @param regiondir region directory whose recovered.edits dir holds the marker
   * @param newSeqId proposed sequence id; raised to any larger existing marker
   * @param saftyBumper amount added on top of the chosen id (param name kept as-is)
   * @return long new sequence Id value actually recorded
   * @throws IOException if the marker file cannot be created
   */
  public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir,
      long newSeqId, long saftyBumper) throws IOException {

    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
    long maxSeqId = 0;
    FileStatus[] files = null;
    if (fs.exists(editsdir)) {
      // Scan existing marker files for the largest previously recorded id.
      files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
        @Override
        public boolean accept(Path p) {
          return isSequenceIdFile(p);
        }
      });
      if (files != null) {
        for (FileStatus status : files) {
          String fileName = status.getPath().getName();
          try {
            // Both ".seqid" and "_seqid" suffixes have the same length.
            Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length()
              - SEQUENCE_ID_FILE_SUFFIX_LENGTH));
            maxSeqId = Math.max(tmpSeqId, maxSeqId);
          } catch (NumberFormatException ex) {
            LOG.warn("Invalid SeqId File Name=" + fileName);
          }
        }
      }
    }
    // Never let the recorded sequence id go backwards.
    if (maxSeqId > newSeqId) {
      newSeqId = maxSeqId;
    }
    newSeqId += saftyBumper; // bump up SeqId

    // write a new seqId file
    Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
    if (newSeqId != maxSeqId) {
      try {
        if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Wrote region seqId=" + newSeqIdFile + " to file, newSeqId=" + newSeqId
            + ", maxSeqId=" + maxSeqId);
        }
      } catch (FileAlreadyExistsException ignored) {
        // Another process/thread won the race to write this id; that is fine.
      }
    }
    // Remove stale marker files, keeping only the one just chosen.
    if (files != null) {
      for (FileStatus status : files) {
        if (newSeqIdFile.equals(status.getPath())) {
          continue;
        }
        fs.delete(status.getPath(), false);
      }
    }
    return newSeqId;
  }
738
739
740
741
742
743
744
745
746
  /**
   * Create a new {@link Reader} for reading logs to split, recovering the
   * file's lease first.
   *
   * @param file WAL file to open
   * @param skipErrors if true, open failures become CorruptedLogFileException
   *   instead of propagating
   * @param reporter progress callback passed through to lease recovery/open
   * @return a reader, or null when there is nothing to split (empty file,
   *   truncated trailing data, or file has disappeared)
   * @throws CorruptedLogFileException when skipErrors is set and the open fails
   */
  protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
      throws IOException, CorruptedLogFileException {
    Path path = file.getPath();
    long length = file.getLen();
    Reader in;

    // A zero length may just mean the file is still open for append (HDFS can
    // report 0 until close), so only warn here; the EOF handling below decides.
    if (length <= 0) {
      LOG.warn("File " + path + " might be still open, length is 0");
    }

    try {
      // Make sure nobody else can write to the file before we read it.
      FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
      try {
        in = getReader(path, reporter);
      } catch (EOFException e) {
        if (length <= 0) {
          // Empty file: nothing to split, not an error.
          LOG.warn("Could not open " + path + " for reading. File is empty", e);
          return null;
        } else {
          // Non-empty file hitting EOF on open: presumably a partially written
          // trailer; treat as nothing-to-split rather than an error.
          return null;
        }
      }
    } catch (IOException e) {
      if (e instanceof FileNotFoundException) {
        // File was moved/deleted out from under us; nothing to do.
        LOG.warn("File " + path + " doesn't exist anymore.", e);
        return null;
      }
      if (!skipErrors || e instanceof InterruptedIOException) {
        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
      }
      CorruptedLogFileException t =
        new CorruptedLogFileException("skipErrors=true Could not open wal " +
            path + " ignoring");
      t.initCause(e);
      throw t;
    }
    return in;
  }
795
796 static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
797 throws CorruptedLogFileException, IOException {
798 try {
799 return in.next();
800 } catch (EOFException eof) {
801
802 LOG.info("EOF from wal " + path + ". continuing");
803 return null;
804 } catch (IOException e) {
805
806
807 if (e.getCause() != null &&
808 (e.getCause() instanceof ParseException ||
809 e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
810 LOG.warn("Parse exception " + e.getCause().toString() + " from wal "
811 + path + ". continuing");
812 return null;
813 }
814 if (!skipErrors) {
815 throw e;
816 }
817 CorruptedLogFileException t =
818 new CorruptedLogFileException("skipErrors=true Ignoring exception" +
819 " while parsing wal " + path + ". Marking as corrupted");
820 t.initCause(e);
821 throw t;
822 }
823 }
824
825
826
827
828
  /**
   * Create a new {@link Writer} for writing recovered-edits output.
   * @param logfile path the recovered edits will be written to
   * @return a new Writer instance; the caller is responsible for closing it.
   */
  protected Writer createWriter(Path logfile)
      throws IOException {
    return walFactory.createRecoveredEditsWriter(fs, logfile);
  }
833
834
835
836
837
  /**
   * Create a new {@link Reader} for reading logs to split.
   * @param curLogFile the WAL file to open
   * @param reporter progress callback forwarded to the factory; may be null
   * @return a new Reader instance; the caller is responsible for closing it.
   */
  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
    return walFactory.createReader(fs, curLogFile, reporter);
  }
841
842
843
844
845 private int getNumOpenWriters() {
846 int result = 0;
847 if (this.outputSink != null) {
848 result += this.outputSink.getNumOpenWriters();
849 }
850 return result;
851 }
852
853
854
855
856 public static class PipelineController {
857
858
859 AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
860
861
862
863 public final Object dataAvailable = new Object();
864
865 void writerThreadError(Throwable t) {
866 thrown.compareAndSet(null, t);
867 }
868
869
870
871
872 void checkForErrors() throws IOException {
873 Throwable thrown = this.thrown.get();
874 if (thrown == null) return;
875 if (thrown instanceof IOException) {
876 throw new IOException(thrown);
877 } else {
878 throw new RuntimeException(thrown);
879 }
880 }
881 }
882
883
884
885
886
887
888
889
  /**
   * Accumulates WAL edits into one buffer per region while accounting their
   * total heap size. appendEntry() (single reader thread) applies back
   * pressure when the configured heap limit is exceeded; getChunkToWrite() /
   * doneWriting() are used by the writer threads.
   */
  public static class EntryBuffers {
    PipelineController controller;

    // Pending edits per region, keyed by encoded region name.
    Map<byte[], RegionEntryBuffer> buffers =
      new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);

    // Regions whose buffer is currently checked out by some writer thread;
    // getChunkToWrite() never hands the same region to two writers at once.
    Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);

    // Total heap bytes buffered across all regions (guarded by dataAvailable).
    long totalBuffered = 0;
    // Back-pressure threshold for appendEntry().
    long maxHeapUsage;

    public EntryBuffers(PipelineController controller, long maxHeapUsage) {
      this.controller = controller;
      this.maxHeapUsage = maxHeapUsage;
    }

    /**
     * Append a log entry into the corresponding region buffer.
     * Blocks if the total heap usage has crossed the configured threshold.
     *
     * @throws InterruptedException if interrupted while waiting for space
     * @throws IOException if a writer thread has previously failed
     */
    public void appendEntry(Entry entry) throws InterruptedException, IOException {
      WALKey key = entry.getKey();

      RegionEntryBuffer buffer;
      long incrHeap;
      synchronized (this) {
        buffer = buffers.get(key.getEncodedRegionName());
        if (buffer == null) {
          buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
          buffers.put(key.getEncodedRegionName(), buffer);
        }
        incrHeap= buffer.appendEntry(entry);
      }

      // If we crossed the heap threshold, wait for writer threads to drain;
      // bail out of the wait if a writer has already failed.
      synchronized (controller.dataAvailable) {
        totalBuffered += incrHeap;
        while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
          LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
          controller.dataAvailable.wait(2000);
        }
        controller.dataAvailable.notifyAll();
      }
      controller.checkForErrors();
    }

    /**
     * @return the largest region buffer not currently being written, removing
     *   it from the pending map and marking the region as in-flight; null if
     *   no buffer is available.
     */
    synchronized RegionEntryBuffer getChunkToWrite() {
      long biggestSize = 0;
      byte[] biggestBufferKey = null;

      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
        long size = entry.getValue().heapSize();
        if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
          biggestSize = size;
          biggestBufferKey = entry.getKey();
        }
      }
      if (biggestBufferKey == null) {
        return null;
      }

      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
      currentlyWriting.add(biggestBufferKey);
      return buffer;
    }

    // Called by a writer thread after it has drained a buffer obtained from
    // getChunkToWrite(): releases the region and credits back the heap usage.
    void doneWriting(RegionEntryBuffer buffer) {
      synchronized (this) {
        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
        assert removed;
      }
      long size = buffer.heapSize();

      synchronized (controller.dataAvailable) {
        totalBuffered -= size;
        // Wake the reader (and other writers) now that space was freed.
        controller.dataAvailable.notifyAll();
      }
    }

    // True while some writer thread holds this region's buffer.
    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
      return currentlyWriting.contains(region);
    }

    // Block until all buffered edits have been picked up and written out.
    public void waitUntilDrained() {
      synchronized (controller.dataAvailable) {
        while (totalBuffered > 0) {
          try {
            controller.dataAvailable.wait(2000);
          } catch (InterruptedException e) {
            // NOTE(review): Thread.interrupted() CLEARS the interrupt flag
            // instead of re-asserting it (Thread.currentThread().interrupt())
            // — confirm callers don't rely on the flag staying set.
            LOG.warn("Got intrerrupted while waiting for EntryBuffers is drained");
            Thread.interrupted();
            break;
          }
        }
      }
    }
  }
997
998
999
1000
1001
1002
1003
1004 public static class RegionEntryBuffer implements HeapSize {
1005 long heapInBuffer = 0;
1006 List<Entry> entryBuffer;
1007 TableName tableName;
1008 byte[] encodedRegionName;
1009
1010 RegionEntryBuffer(TableName tableName, byte[] region) {
1011 this.tableName = tableName;
1012 this.encodedRegionName = region;
1013 this.entryBuffer = new LinkedList<Entry>();
1014 }
1015
1016 long appendEntry(Entry entry) {
1017 internify(entry);
1018 entryBuffer.add(entry);
1019 long incrHeap = entry.getEdit().heapSize() +
1020 ClassSize.align(2 * ClassSize.REFERENCE) +
1021 0;
1022 heapInBuffer += incrHeap;
1023 return incrHeap;
1024 }
1025
1026 private void internify(Entry entry) {
1027 WALKey k = entry.getKey();
1028 k.internTableName(this.tableName);
1029 k.internEncodedRegionName(this.encodedRegionName);
1030 }
1031
1032 @Override
1033 public long heapSize() {
1034 return heapInBuffer;
1035 }
1036
1037 public byte[] getEncodedRegionName() {
1038 return encodedRegionName;
1039 }
1040
1041 public List<Entry> getEntryBuffer() {
1042 return entryBuffer;
1043 }
1044
1045 public TableName getTableName() {
1046 return tableName;
1047 }
1048 }
1049
  /**
   * Consumer thread of the splitting pipeline: repeatedly pulls the largest
   * available region buffer off the shared EntryBuffers and appends it to the
   * output sink, until finish() is called and no buffered work remains.
   */
  public static class WriterThread extends Thread {
    // Set by finish(); read under the dataAvailable monitor.
    private volatile boolean shouldStop = false;
    private PipelineController controller;
    private EntryBuffers entryBuffers;
    private OutputSink outputSink = null;

    WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
      super(Thread.currentThread().getName() + "-Writer-" + i);
      this.controller = controller;
      this.entryBuffers = entryBuffers;
      outputSink = sink;
    }

    @Override
    public void run()  {
      try {
        doRun();
      } catch (Throwable t) {
        // Record the failure so the controller can rethrow it on the main
        // splitting thread; this writer thread then exits.
        LOG.error("Exiting thread", t);
        controller.writerThreadError(t);
      }
    }

    // Main loop: grab a region buffer and write it; when none is available,
    // wait for more data, or exit once stopped and the sink reports no
    // remaining buffered output to flush.
    private void doRun() throws IOException {
      LOG.debug("Writer thread " + this + ": starting");
      while (true) {
        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available; wait for more to show up.
          synchronized (controller.dataAvailable) {
            if (shouldStop && !this.outputSink.flush()) {
              return;
            }
            try {
              controller.dataAvailable.wait(500);
            } catch (InterruptedException ie) {
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          // Always release the region, even on failure, so other writers are
          // not blocked on it forever.
          entryBuffers.doneWriting(buffer);
        }
      }
    }

    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
      outputSink.append(buffer);
    }

    // Ask this thread to exit once all remaining buffers are flushed.
    void finish() {
      synchronized (controller.dataAvailable) {
        shouldStop = true;
        controller.dataAvailable.notifyAll();
      }
    }
  }
1114
1115
1116
1117
1118
1119 public static abstract class OutputSink {
1120
1121 protected PipelineController controller;
1122 protected EntryBuffers entryBuffers;
1123
1124 protected Map<byte[], SinkWriter> writers = Collections
1125 .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
1126
1127 protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
1128 .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
1129
1130 protected final List<WriterThread> writerThreads = Lists.newArrayList();
1131
1132
1133 protected final Set<byte[]> blacklistedRegions = Collections
1134 .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
1135
1136 protected boolean closeAndCleanCompleted = false;
1137
1138 protected boolean writersClosed = false;
1139
1140 protected final int numThreads;
1141
1142 protected CancelableProgressable reporter = null;
1143
1144 protected AtomicLong skippedEdits = new AtomicLong();
1145
1146 protected List<Path> splits = null;
1147
1148 public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
1149 numThreads = numWriters;
1150 this.controller = controller;
1151 this.entryBuffers = entryBuffers;
1152 }
1153
1154 void setReporter(CancelableProgressable reporter) {
1155 this.reporter = reporter;
1156 }
1157
1158
1159
1160
1161 public synchronized void startWriterThreads() {
1162 for (int i = 0; i < numThreads; i++) {
1163 WriterThread t = new WriterThread(controller, entryBuffers, this, i);
1164 t.start();
1165 writerThreads.add(t);
1166 }
1167 }
1168
1169
1170
1171
1172
1173 void updateRegionMaximumEditLogSeqNum(Entry entry) {
1174 synchronized (regionMaximumEditLogSeqNum) {
1175 Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
1176 .getEncodedRegionName());
1177 if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
1178 regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
1179 .getLogSeqNum());
1180 }
1181 }
1182 }
1183
1184 Long getRegionMaximumEditLogSeqNum(byte[] region) {
1185 return regionMaximumEditLogSeqNum.get(region);
1186 }
1187
1188
1189
1190
1191 int getNumOpenWriters() {
1192 return this.writers.size();
1193 }
1194
1195 long getSkippedEdits() {
1196 return this.skippedEdits.get();
1197 }
1198
1199
1200
1201
1202
1203
1204 protected boolean finishWriting(boolean interrupt) throws IOException {
1205 LOG.debug("Waiting for split writer threads to finish");
1206 boolean progress_failed = false;
1207 for (WriterThread t : writerThreads) {
1208 t.finish();
1209 }
1210 if (interrupt) {
1211 for (WriterThread t : writerThreads) {
1212 t.interrupt();
1213 }
1214 }
1215
1216 for (WriterThread t : writerThreads) {
1217 if (!progress_failed && reporter != null && !reporter.progress()) {
1218 progress_failed = true;
1219 }
1220 try {
1221 t.join();
1222 } catch (InterruptedException ie) {
1223 IOException iie = new InterruptedIOException();
1224 iie.initCause(ie);
1225 throw iie;
1226 }
1227 }
1228 controller.checkForErrors();
1229 LOG.info("Split writers finished");
1230 return (!progress_failed);
1231 }
1232
1233 public abstract List<Path> finishWritingAndClose() throws IOException;
1234
1235
1236
1237
1238 public abstract Map<byte[], Long> getOutputCounts();
1239
1240
1241
1242
1243 public abstract int getNumberOfRecoveredRegions();
1244
1245
1246
1247
1248
1249 public abstract void append(RegionEntryBuffer buffer) throws IOException;
1250
1251
1252
1253
1254
1255 public boolean flush() throws IOException {
1256 return false;
1257 }
1258 }
1259
1260
1261
1262
/**
 * Sink that writes each region's recovered edits into per-region files: paths
 * are obtained from {@code getRegionSplitEditsPath} while writing and renamed
 * to their completed names (from {@code getCompletedRecoveredEditsFilePath})
 * on close.
 */
class LogRecoveredEditsOutputSink extends OutputSink {

  public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
      int numWriters) {
    super(controller, entryBuffers, numWriters);
  }

  /**
   * Stops the writer threads, then closes/renames every output file even if
   * finishing failed, so no writer leaks.
   * @return the completed recovered-edits paths, or null on failure
   */
  @Override
  public List<Path> finishWritingAndClose() throws IOException {
    boolean isSuccessful = false;
    List<Path> result = null;
    try {
      isSuccessful = finishWriting(false);
    } finally {
      result = close();
      List<IOException> thrown = closeLogWriters(null);
      if (thrown != null && !thrown.isEmpty()) {
        throw MultipleIOException.createIOException(thrown);
      }
    }
    if (isSuccessful) {
      splits = result;
    }
    return splits;
  }

  /**
   * Closes all output streams in parallel, deletes outputs with zero edits, and
   * renames the rest to their completed (sequence-numbered) file names.
   * @return the list of completed paths, or null if progress was denied
   */
  private List<Path> close() throws IOException {
    Preconditions.checkState(!closeAndCleanCompleted);

    final List<Path> paths = new ArrayList<Path>();
    final List<IOException> thrown = Lists.newArrayList();
    ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
      TimeUnit.SECONDS, new ThreadFactory() {
        private int count = 1;

        @Override
        public Thread newThread(Runnable r) {
          Thread t = new Thread(r, "split-log-closeStream-" + count++);
          return t;
        }
      });
    CompletionService<Void> completionService =
      new ExecutorCompletionService<Void>(closeThreadPool);
    for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
      LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
      completionService.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
          LOG.debug("Closing " + wap.p);
          try {
            wap.w.close();
          } catch (IOException ioe) {
            // Collect rather than throw, so the other closes still run.
            LOG.error("Couldn't close log at " + wap.p, ioe);
            thrown.add(ioe);
            return null;
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
                + " edits, skipped " + wap.editsSkipped + " edits in "
                + (wap.nanosSpent / 1000 / 1000) + "ms");
          }
          if (wap.editsWritten == 0) {
            // Nothing was written: remove the empty file instead of renaming it.
            if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
              LOG.warn("Failed deleting empty " + wap.p);
              throw new IOException("Failed deleting empty  " + wap.p);
            }
            return null;
          }

          Path dst = getCompletedRecoveredEditsFilePath(wap.p,
            regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
          try {
            if (!dst.equals(wap.p) && fs.exists(dst)) {
              // A leftover from an earlier failed split attempt; replace it.
              LOG.warn("Found existing old edits file. It could be the "
                  + "result of a previous failed split attempt. Deleting " + dst + ", length="
                  + fs.getFileStatus(dst).getLen());
              if (!fs.delete(dst, false)) {
                LOG.warn("Failed deleting of old " + dst);
                throw new IOException("Failed deleting of old " + dst);
              }
            }
            // Rename only if the source file still exists.
            if (fs.exists(wap.p)) {
              if (!fs.rename(wap.p, dst)) {
                throw new IOException("Failed renaming " + wap.p + " to " + dst);
              }
              LOG.info("Rename " + wap.p + " to " + dst);
            }
          } catch (IOException ioe) {
            LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
            thrown.add(ioe);
            return null;
          }
          paths.add(dst);
          return null;
        }
      });
    }

    boolean progress_failed = false;
    try {
      // Wait for every submitted close, polling the reporter as results arrive.
      for (int i = 0, n = this.writers.size(); i < n; i++) {
        Future<Void> future = completionService.take();
        future.get();
        if (!progress_failed && reporter != null && !reporter.progress()) {
          progress_failed = true;
        }
      }
    } catch (InterruptedException e) {
      IOException iie = new InterruptedIOException();
      iie.initCause(e);
      throw iie;
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    } finally {
      closeThreadPool.shutdownNow();
    }

    if (!thrown.isEmpty()) {
      throw MultipleIOException.createIOException(thrown);
    }
    writersClosed = true;
    closeAndCleanCompleted = true;
    if (progress_failed) {
      return null;
    }
    return paths;
  }

  /**
   * Stops any still-running writer threads and closes every writer, collecting
   * per-writer IOExceptions instead of throwing on the first one.
   * @param thrown list to collect exceptions into; created if null
   * @return the (possibly newly created) exception list
   */
  private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
    if (writersClosed) {
      return thrown;
    }

    if (thrown == null) {
      thrown = Lists.newArrayList();
    }
    try {
      // Keep nudging each thread (stop flag + interrupt) until it dies.
      for (WriterThread t : writerThreads) {
        while (t.isAlive()) {
          t.shouldStop = true;
          t.interrupt();
          try {
            t.join(10);
          } catch (InterruptedException e) {
            IOException iie = new InterruptedIOException();
            iie.initCause(e);
            throw iie;
          }
        }
      }
    } finally {
      synchronized (writers) {
        WriterAndPath wap = null;
        for (SinkWriter tmpWAP : writers.values()) {
          try {
            wap = (WriterAndPath) tmpWAP;
            wap.w.close();
          } catch (IOException ioe) {
            LOG.error("Couldn't close log at " + wap.p, ioe);
            thrown.add(ioe);
            continue;
          }
          LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
              + (wap.nanosSpent / 1000 / 1000) + "ms)");
        }
      }
      writersClosed = true;
    }

    return thrown;
  }

  /**
   * Gets (or lazily creates) the writer for the entry's region.
   * @return null when the region is blacklisted because creating its writer
   *         previously failed
   */
  private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
    byte region[] = entry.getKey().getEncodedRegionName();
    WriterAndPath ret = (WriterAndPath) writers.get(region);
    if (ret != null) {
      return ret;
    }
    // Don't retry regions we already failed to create a writer for.
    if (blacklistedRegions.contains(region)) {
      return null;
    }
    ret = createWAP(region, entry, rootDir);
    if (ret == null) {
      blacklistedRegions.add(region);
      return null;
    }
    writers.put(region, ret);
    return ret;
  }

  /**
   * Creates a new recovered-edits writer for the region, replacing any leftover
   * file from a previous failed split attempt.
   * @return null if no edits path could be determined for the entry
   */
  private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
    Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
    if (regionedits == null) {
      return null;
    }
    if (fs.exists(regionedits)) {
      LOG.warn("Found old edits file. It could be the "
          + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
          + fs.getFileStatus(regionedits).getLen());
      if (!fs.delete(regionedits, false)) {
        LOG.warn("Failed delete of old " + regionedits);
      }
    }
    Writer w = createWriter(regionedits);
    LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
    return (new WriterAndPath(regionedits, w));
  }

  /**
   * Removes non-meta cells whose sequence number is at or below the recorded
   * per-store maximum in {@code regionMaxSeqIdInStores} (i.e. cells presumably
   * already persisted by a store flush).
   */
  private void filterCellByStore(Entry logEntry) {
    Map<byte[], Long> maxSeqIdInStores =
        regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
    if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
      return;
    }
    List<Cell> skippedCells = new ArrayList<Cell>();
    for (Cell cell : logEntry.getEdit().getCells()) {
      if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
        byte[] family = CellUtil.cloneFamily(cell);
        Long maxSeqId = maxSeqIdInStores.get(family);
        // Skip the cell when its store has already flushed past this seq num.
        if (maxSeqId != null && maxSeqId.longValue() >= logEntry.getKey().getLogSeqNum()) {
          skippedCells.add(cell);
        }
      }
    }
    if (!skippedCells.isEmpty()) {
      logEntry.getEdit().getCells().removeAll(skippedCells);
    }
  }

  /**
   * Writes one region's buffered entries through that region's writer,
   * filtering already-flushed cells first.
   */
  @Override
  public void append(RegionEntryBuffer buffer) throws IOException {
    List<Entry> entries = buffer.entryBuffer;
    if (entries.isEmpty()) {
      LOG.warn("got an empty buffer, skipping");
      return;
    }

    WriterAndPath wap = null;

    long startTime = System.nanoTime();
    try {
      int editsCount = 0;

      for (Entry logEntry : entries) {
        if (wap == null) {
          wap = getWriterAndPath(logEntry);
          if (wap == null) {
            // Region is blacklisted; drop this buffer's edits.
            if (LOG.isDebugEnabled()) {
              LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry);
            }
            return;
          }
        }
        filterCellByStore(logEntry);
        if (!logEntry.getEdit().isEmpty()) {
          wap.w.append(logEntry);
          this.updateRegionMaximumEditLogSeqNum(logEntry);
          editsCount++;
        } else {
          // Every cell was filtered out; count the whole entry as skipped.
          wap.incrementSkippedEdits(1);
        }
      }
      // Stats are updated once per buffer, not per entry.
      wap.incrementEdits(editsCount);
      wap.incrementNanoTime(System.nanoTime() - startTime);
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
      LOG.fatal(" Got while writing log entry to log", e);
      throw e;
    }
  }

  /** @return a map from encoded region name to the number of edits written */
  @Override
  public Map<byte[], Long> getOutputCounts() {
    TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
    synchronized (writers) {
      for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
        ret.put(entry.getKey(), entry.getValue().editsWritten);
      }
    }
    return ret;
  }

  @Override
  public int getNumberOfRecoveredRegions() {
    return writers.size();
  }
}
1584
1585
1586
1587
1588 public abstract static class SinkWriter {
1589
1590 long editsWritten = 0;
1591
1592 long editsSkipped = 0;
1593
1594 long nanosSpent = 0;
1595
1596 void incrementEdits(int edits) {
1597 editsWritten += edits;
1598 }
1599
1600 void incrementSkippedEdits(int skipped) {
1601 editsSkipped += skipped;
1602 }
1603
1604 void incrementNanoTime(long nanos) {
1605 nanosSpent += nanos;
1606 }
1607 }
1608
1609
1610
1611
1612
1613 private final static class WriterAndPath extends SinkWriter {
1614 final Path p;
1615 final Writer w;
1616
1617 WriterAndPath(final Path p, final Writer w) {
1618 this.p = p;
1619 this.w = w;
1620 }
1621 }
1622
1623
1624
1625
/**
 * Sink used in distributed log replay: instead of writing recovered-edits
 * files, it groups edits by the server currently hosting each region and
 * replays them through {@link WALEditsReplaySink}. Edits of disabling/disabled
 * tables are diverted to an embedded {@link LogRecoveredEditsOutputSink}.
 */
class LogReplayOutputSink extends OutputSink {
  // Replay a batch early once buffered data exceeds this fraction of maxHeapUsage.
  private static final double BUFFER_THRESHOLD = 0.35;
  // Separates "host:port" from the table name in writer/queue keys.
  private static final String KEY_DELIMITER = "#";

  private long waitRegionOnlineTimeOut;
  // Encoded names of regions whose edits have been queued or written.
  private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
  // One replay writer per "host:port#table" key.
  private final Map<String, RegionServerWriter> writers =
      new ConcurrentHashMap<String, RegionServerWriter>();
  // Locations already resolved via locateRegionAndRefreshLastFlushedSequenceId,
  // keyed by encoded region name.
  private final Map<String, HRegionLocation> onlineRegions =
      new ConcurrentHashMap<String, HRegionLocation>();

  private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
      .synchronizedMap(new TreeMap<TableName, HConnection>());
  // "host:port#table" -> queue of (location, entry) pairs awaiting replay.
  private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
      new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>();
  private List<Throwable> thrown = new ArrayList<Throwable>();

  // Handles edits of disabling/disabled tables, which are written out as
  // recovered-edits files rather than replayed.
  private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
  private boolean hasEditsInDisablingOrDisabledTables = false;

  public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
      int numWriters) {
    super(controller, entryBuffers, numWriters);
    this.waitRegionOnlineTimeOut = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
      ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
    this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
      entryBuffers, numWriters);
    this.logRecoveredEditsOutputSink.setReporter(reporter);
  }

  @Override
  public void append(RegionEntryBuffer buffer) throws IOException {
    List<Entry> entries = buffer.entryBuffer;
    if (entries.isEmpty()) {
      LOG.warn("got an empty buffer, skipping");
      return;
    }

    // Edits of disabling/disabled tables go to the recovered-edits sink instead
    // of being replayed.
    if (disablingOrDisabledTables.contains(buffer.tableName)) {
      logRecoveredEditsOutputSink.append(buffer);
      hasEditsInDisablingOrDisabledTables = true;
      // Still count the region as recovered.
      addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
      return;
    }

    // Bucket the entries into per-server queues.
    groupEditsByServer(entries);

    // Pick the largest queue and replay it if it is big enough, or if overall
    // buffered data is pressing against the heap limit.
    String maxLocKey = null;
    int maxSize = 0;
    List<Pair<HRegionLocation, Entry>> maxQueue = null;
    synchronized (this.serverToBufferQueueMap) {
      for (String key : this.serverToBufferQueueMap.keySet()) {
        List<Pair<HRegionLocation, Entry>> curQueue = this.serverToBufferQueueMap.get(key);
        if (curQueue.size() > maxSize) {
          maxSize = curQueue.size();
          maxQueue = curQueue;
          maxLocKey = key;
        }
      }
      if (maxSize < minBatchSize
          && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
        // Batch still small and memory pressure low; wait for more edits.
        return;
      } else if (maxSize > 0) {
        this.serverToBufferQueueMap.remove(maxLocKey);
      }
    }

    if (maxSize > 0) {
      processWorkItems(maxLocKey, maxQueue);
    }
  }

  private void addToRecoveredRegions(String encodedRegionName) {
    if (!recoveredRegions.contains(encodedRegionName)) {
      recoveredRegions.add(encodedRegionName);
    }
  }

  /**
   * Groups WAL entries into {@code serverToBufferQueueMap} by the server
   * currently hosting each entry's region, dropping entries/cells whose data
   * has already been flushed and entries of non-existent tables.
   */
  private void groupEditsByServer(List<Entry> entries) throws IOException {
    Set<TableName> nonExistentTables = null;
    Long cachedLastFlushedSequenceId = -1l;
    for (Entry entry : entries) {
      WALEdit edit = entry.getEdit();
      TableName table = entry.getKey().getTablename();
      // Drop the key's scopes before replay.
      entry.getKey().setScopes(null);
      String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
      // Skip entries of tables already found to be missing.
      if (nonExistentTables != null && nonExistentTables.contains(table)) {
        this.skippedEdits.incrementAndGet();
        continue;
      }

      Map<byte[], Long> maxStoreSequenceIds = null;
      boolean needSkip = false;
      HRegionLocation loc = null;
      String locKey = null;
      List<Cell> cells = edit.getCells();
      List<Cell> skippedCells = new ArrayList<Cell>();
      HConnection hconn = this.getConnectionByTableName(table);

      for (Cell cell : cells) {
        byte[] row = CellUtil.cloneRow(cell);
        byte[] family = CellUtil.cloneFamily(cell);
        boolean isCompactionEntry = false;
        if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
          // Among meta-family cells, only compaction markers carrying a region
          // name are kept; everything else is dropped.
          CompactionDescriptor compaction = WALEdit.getCompaction(cell);
          if (compaction != null && compaction.hasRegionName()) {
            try {
              byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
                .toByteArray());
              // Route the marker using the row/family from the descriptor.
              row = regionName[1];
              family = compaction.getFamilyName().toByteArray();
              isCompactionEntry = true;
            } catch (Exception ex) {
              LOG.warn("Unexpected exception received, ignoring " + ex);
              skippedCells.add(cell);
              continue;
            }
          } else {
            skippedCells.add(cell);
            continue;
          }
        }

        try {
          loc =
              locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
                encodeRegionNameStr);
          // Compaction markers are only replayed against the exact region they
          // were written for.
          if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
            loc.getRegionInfo().getEncodedName())) {
            LOG.info("Not replaying a compaction marker for an older region: "
                + encodeRegionNameStr);
            needSkip = true;
          }
        } catch (TableNotFoundException ex) {
          // Table no longer exists; skip the rest of this region's edits.
          LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
              + encodeRegionNameStr);
          lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
          if (nonExistentTables == null) {
            nonExistentTables = new TreeSet<TableName>();
          }
          nonExistentTables.add(table);
          this.skippedEdits.incrementAndGet();
          needSkip = true;
          break;
        }

        cachedLastFlushedSequenceId =
            lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
        if (cachedLastFlushedSequenceId != null
            && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
          // The whole entry is at or below the region's flushed seq num; skip it.
          this.skippedEdits.incrementAndGet();
          needSkip = true;
          break;
        } else {
          if (maxStoreSequenceIds == null) {
            maxStoreSequenceIds =
                regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
          }
          if (maxStoreSequenceIds != null) {
            Long maxStoreSeqId = maxStoreSequenceIds.get(family);
            if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
              // Skip the cell when its store is unknown or already flushed past
              // this sequence number.
              skippedCells.add(cell);
              continue;
            }
          }
        }
      }

      // Nothing usable located, or the whole entry was flagged for skipping.
      if (loc == null || needSkip) continue;

      if (!skippedCells.isEmpty()) {
        cells.removeAll(skippedCells);
      }

      synchronized (serverToBufferQueueMap) {
        locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
        List<Pair<HRegionLocation, Entry>> queue = serverToBufferQueueMap.get(locKey);
        if (queue == null) {
          queue =
              Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Entry>>());
          serverToBufferQueueMap.put(locKey, queue);
        }
        queue.add(new Pair<HRegionLocation, Entry>(loc, entry));
      }
      // Record the destination region as recovered.
      addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
    }
  }

  /**
   * Locates the region currently hosting {@code row}, waits for it to come
   * online, and refreshes its last-flushed sequence ids from the split-log
   * coordination. Successful lookups are cached in {@code onlineRegions}.
   */
  private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
      TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
    // Fast path: already resolved for this region.
    HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
    if(loc != null) return loc;

    loc = hconn.getRegionLocation(table, row, true);
    if (loc == null) {
      throw new IOException("Can't locate location for row:" + Bytes.toString(row)
          + " of table:" + table);
    }
    // The row now lives in a different region than the edit was written for:
    // mark the original region fully flushed so its edits get skipped.
    if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
      lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
      HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
      if (tmpLoc != null) return tmpLoc;
    }

    Long lastFlushedSequenceId = -1l;
    AtomicBoolean isRecovering = new AtomicBoolean(true);
    loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
    if (!isRecovering.get()) {
      // Region is online but not recovering: mark it fully flushed so replay
      // skips it.
      lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
      LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
          + " because it's not in recovering.");
    } else {
      Long cachedLastFlushedSequenceId =
          lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());

      // Pull the region's flushed sequence ids (overall and per-store) from the
      // split-log worker coordination.
      RegionStoreSequenceIds ids =
          csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
            loc.getRegionInfo().getEncodedName());
      if (ids != null) {
        lastFlushedSequenceId = ids.getLastFlushedSequenceId();
        Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
        List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
        for (StoreSequenceId id : maxSeqIdInStores) {
          storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
        }
        regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
      }
      // Only move the cached value forward.
      if (cachedLastFlushedSequenceId == null
          || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
        lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
      }
    }

    onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
    return loc;
  }

  /** Replays one queued batch of (location, entry) pairs through the writer for {@code key}. */
  private void processWorkItems(String key, List<Pair<HRegionLocation, Entry>> actions)
      throws IOException {
    RegionServerWriter rsw = null;

    long startTime = System.nanoTime();
    try {
      rsw = getRegionServerWriter(key);
      rsw.sink.replayEntries(actions);

      // Stats are updated once per batch.
      rsw.incrementEdits(actions.size());
      rsw.incrementNanoTime(System.nanoTime() - startTime);
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
      LOG.fatal(" Got while writing log entry to log", e);
      throw e;
    }
  }

  /**
   * Waits (with exponential backoff) until the server confirms the region is
   * online, re-resolving the location after failures that are not
   * RegionOpeningException.
   * @param loc last known location of the region
   * @param row a row belonging to the region
   * @param timeout maximum wait in milliseconds
   * @param isRecovering set from the server's response; defaults to true when
   *          the response carries no recovering flag
   * @return the (possibly refreshed) location
   * @throws IOException on interrupt or once the timeout expires
   */
  private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
      final long timeout, AtomicBoolean isRecovering)
      throws IOException {
    final long endTime = EnvironmentEdgeManager.currentTime() + timeout;
    final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    boolean reloadLocation = false;
    TableName tableName = loc.getRegionInfo().getTable();
    int tries = 0;
    Throwable cause = null;
    while (endTime > EnvironmentEdgeManager.currentTime()) {
      try {
        HConnection hconn = getConnectionByTableName(tableName);
        if(reloadLocation) {
          loc = hconn.getRegionLocation(tableName, row, true);
        }
        BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
        HRegionInfo region = loc.getRegionInfo();
        try {
          GetRegionInfoRequest request =
              RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
          GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
          if (HRegionInfo.convert(response.getRegionInfo()) != null) {
            isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
            return loc;
          }
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        }
      } catch (IOException e) {
        cause = e.getCause();
        // Re-resolve the location next time unless the region is just opening.
        if(!(cause instanceof RegionOpeningException)) {
          reloadLocation = true;
        }
      }
      long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
      try {
        Thread.sleep(expectedSleep);
      } catch (InterruptedException e) {
        throw new IOException("Interrupted when waiting region " +
            loc.getRegionInfo().getEncodedName() + " online.", e);
      }
      tries++;
    }

    throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
        " online for " + timeout + " milliseconds.", cause);
  }

  /**
   * Replays the first non-empty buffered queue, if any.
   * @return true when a batch was replayed
   */
  @Override
  public boolean flush() throws IOException {
    String curLoc = null;
    int curSize = 0;
    List<Pair<HRegionLocation, Entry>> curQueue = null;
    synchronized (this.serverToBufferQueueMap) {
      for (String locationKey : this.serverToBufferQueueMap.keySet()) {
        curQueue = this.serverToBufferQueueMap.get(locationKey);
        if (!curQueue.isEmpty()) {
          curSize = curQueue.size();
          curLoc = locationKey;
          break;
        }
      }
      if (curSize > 0) {
        this.serverToBufferQueueMap.remove(curLoc);
      }
    }

    if (curSize > 0) {
      this.processWorkItems(curLoc, curQueue);
      // Wake any writer thread waiting on the controller.
      synchronized(controller.dataAvailable) {
        controller.dataAvailable.notifyAll();
      }
      return true;
    }
    return false;
  }

  void addWriterError(Throwable t) {
    thrown.add(t);
  }

  @Override
  public List<Path> finishWritingAndClose() throws IOException {
    try {
      if (!finishWriting(false)) {
        return null;
      }
      // The embedded sink produced files only if disabling/disabled-table edits
      // were seen.
      if (hasEditsInDisablingOrDisabledTables) {
        splits = logRecoveredEditsOutputSink.finishWritingAndClose();
      } else {
        splits = new ArrayList<Path>();
      }
      return splits;
    } finally {
      List<IOException> thrown = closeRegionServerWriters();
      if (thrown != null && !thrown.isEmpty()) {
        throw MultipleIOException.createIOException(thrown);
      }
    }
  }

  @Override
  int getNumOpenWriters() {
    // Include writers held by the embedded recovered-edits sink.
    return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
  }

  /**
   * Stops the writer threads, closes every region-server writer and every
   * per-table connection, collecting IOExceptions instead of failing fast.
   * @return the collected exceptions, or null if writers were already closed
   */
  private List<IOException> closeRegionServerWriters() throws IOException {
    List<IOException> result = null;
    if (!writersClosed) {
      result = Lists.newArrayList();
      try {
        // Keep nudging each thread (stop flag + interrupt) until it dies.
        for (WriterThread t : writerThreads) {
          while (t.isAlive()) {
            t.shouldStop = true;
            t.interrupt();
            try {
              t.join(10);
            } catch (InterruptedException e) {
              IOException iie = new InterruptedIOException();
              iie.initCause(e);
              throw iie;
            }
          }
        }
      } finally {
        synchronized (writers) {
          for (String locationKey : writers.keySet()) {
            RegionServerWriter tmpW = writers.get(locationKey);
            try {
              tmpW.close();
            } catch (IOException ioe) {
              LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
              result.add(ioe);
            }
          }
        }

        // Close the per-table connections opened during replay.
        synchronized (this.tableNameToHConnectionMap) {
          for (TableName tableName : this.tableNameToHConnectionMap.keySet()) {
            HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
            try {
              hconn.clearRegionCache();
              hconn.close();
            } catch (IOException ioe) {
              result.add(ioe);
            }
          }
        }
        writersClosed = true;
      }
    }
    return result;
  }

  /** @return a map from location-key bytes to the number of edits replayed there */
  @Override
  public Map<byte[], Long> getOutputCounts() {
    TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
    synchronized (writers) {
      for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
        ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
      }
    }
    return ret;
  }

  @Override
  public int getNumberOfRecoveredRegions() {
    return this.recoveredRegions.size();
  }

  /**
   * Gets (or lazily, thread-safely creates) the replay writer for a
   * "host:port#table" location key.
   */
  private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
    RegionServerWriter ret = writers.get(loc);
    if (ret != null) {
      return ret;
    }

    TableName tableName = getTableFromLocationStr(loc);
    if(tableName == null){
      throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
    }

    HConnection hconn = getConnectionByTableName(tableName);
    synchronized (writers) {
      // Re-check under the lock so only one writer is created per key.
      ret = writers.get(loc);
      if (ret == null) {
        ret = new RegionServerWriter(conf, tableName, hconn);
        writers.put(loc, ret);
      }
    }
    return ret;
  }

  /** Gets (or lazily, thread-safely creates) the shared connection for a table. */
  private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
    HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
    if (hconn == null) {
      synchronized (this.tableNameToHConnectionMap) {
        hconn = this.tableNameToHConnectionMap.get(tableName);
        if (hconn == null) {
          hconn = HConnectionManager.getConnection(conf);
          this.tableNameToHConnectionMap.put(tableName, hconn);
        }
      }
    }
    return hconn;
  }

  /**
   * Extracts the table name from a "host:port#table" location key.
   * @return null when the key does not split into exactly two parts
   */
  private TableName getTableFromLocationStr(String loc) {
    String[] splits = loc.split(KEY_DELIMITER);
    if (splits.length != 2) {
      return null;
    }
    return TableName.valueOf(splits[1]);
  }
}
2157
2158
2159
2160
2161
2162 private final static class RegionServerWriter extends SinkWriter {
2163 final WALEditsReplaySink sink;
2164
2165 RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
2166 throws IOException {
2167 this.sink = new WALEditsReplaySink(conf, tableName, conn);
2168 }
2169
2170 void close() throws IOException {
2171 }
2172 }
2173
2174 static class CorruptedLogFileException extends Exception {
2175 private static final long serialVersionUID = 1L;
2176
2177 CorruptedLogFileException(String s) {
2178 super(s);
2179 }
2180 }
2181
2182
2183 public static class MutationReplay {
2184 public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
2185 this.type = type;
2186 this.mutation = mutation;
2187 if(this.mutation.getDurability() != Durability.SKIP_WAL) {
2188
2189 this.mutation.setDurability(Durability.ASYNC_WAL);
2190 }
2191 this.nonceGroup = nonceGroup;
2192 this.nonce = nonce;
2193 }
2194
2195 public final MutationType type;
2196 public final Mutation mutation;
2197 public final long nonceGroup;
2198 public final long nonce;
2199 }
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
  /**
   * Reconstructs the {@link Mutation}s (Puts and Deletes) contained in one replicated
   * WAL entry. Consecutive cells sharing the same row and cell-type byte are grouped
   * into a single mutation; Puts carry the nonce group/nonce from the WAL key while
   * Deletes always use {@link HConstants#NO_NONCE}.
   *
   * @param entry the WAL entry protobuf; if {@code null} an empty list is returned
   * @param cells scanner positioned over the cells associated with {@code entry};
   *          must yield at least {@code entry.getAssociatedCellCount()} cells
   * @param logEntry if non-{@code null}, populated as a side effect with a rebuilt
   *          {@link HLogKey} (first) and a {@link WALEdit} holding all cells (second)
   * @param durability durability applied to every reconstructed mutation
   * @return the list of mutations recovered from the entry, in cell order
   * @throws IOException on errors advancing the cell scanner
   */
  public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
      Pair<WALKey, WALEdit> logEntry, Durability durability)
          throws IOException {
    if (entry == null) {
      // return an empty array
      return new ArrayList<MutationReplay>();
    }

    // Prefer the original sequence number when the entry was re-sequenced on replay.
    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
      entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
    int count = entry.getAssociatedCellCount();
    List<MutationReplay> mutations = new ArrayList<MutationReplay>();
    Cell previousCell = null;
    Mutation m = null;
    WALKey key = null;
    WALEdit val = null;
    // Only materialize a WALEdit when the caller asked for the (key, edit) pair.
    if (logEntry != null) val = new WALEdit();

    for (int i = 0; i < count; i++) {
      // The scanner must produce exactly `count` cells; running out early means the
      // entry and its cell block are out of sync.
      if (!cells.advance()) {
        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
      }
      Cell cell = cells.current();
      if (val != null) val.add(cell);

      // Start a new mutation whenever the row or the cell-type byte changes;
      // otherwise keep appending to the current one.
      boolean isNewRowOrType =
          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
              || !CellUtil.matchingRow(previousCell, cell);
      if (isNewRowOrType) {
        // Create new mutation for this row/type run.
        if (CellUtil.isDelete(cell)) {
          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Deletes don't have nonces.
          mutations.add(new MutationReplay(
              MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
        } else {
          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Puts might come from increment or append, so carry the entry's nonces.
          long nonceGroup = entry.getKey().hasNonceGroup()
              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
        }
      }
      if (CellUtil.isDelete(cell)) {
        ((Delete) m).addDeleteMarker(cell);
      } else {
        ((Put) m).add(cell);
      }
      // NOTE(review): m cannot be null here — the first iteration always takes the
      // isNewRowOrType branch — so this guard (placed after the dereference above)
      // is redundant; kept as-is to preserve behavior byte-for-byte.
      if (m != null) {
        m.setDurability(durability);
      }
      previousCell = cell;
    }

    // Rebuild the WAL key from the protobuf and hand back the (key, edit) pair.
    if (logEntry != null) {
      org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKeyProto = entry.getKey();
      List<UUID> clusterIds = new ArrayList<UUID>(walKeyProto.getClusterIdsCount());
      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
      }
      // Use replaySeqId (not the raw log sequence number) so re-sequenced entries
      // keep their original ordering on replay.
      key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
              walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
              clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce());
      logEntry.setFirst(key);
      logEntry.setSecond(val);
    }

    return mutations;
  }
2285 }