/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;

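/**
 * Class that handles the source of a replication stream.
 * A source is created per peer cluster queue (identified by its znode). It tails the WALs
 * queued for that peer, filters the entries through a chain of {@link WALEntryFilter}s and
 * ships the remaining edits to the peer through a {@link ReplicationEndpoint}, recording the
 * last replicated position so that a recovered queue can resume where it left off.
 */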
@InterfaceAudience.Private
public class ReplicationSource extends Thread
    implements ReplicationSourceInterface {

  public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
  // Queue of WALs to process, ordered by creation timestamp
  private PriorityBlockingQueue<Path> queue;
  private ReplicationQueues replicationQueues;
  private ReplicationPeers replicationPeers;

  private Configuration conf;
  private ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;
  // The manager of all sources to which we ping back our progress
  private ReplicationSourceManager manager;
  // Should we stop everything?
  private Stoppable stopper;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Max size in bytes of the entries in one replication batch
  private long replicationQueueSizeCapacity;
  // Max number of entries in one replication batch
  private int replicationQueueNbCapacity;
  // Our reader for the current log; opening/closing is handled by repLogReader
  private WAL.Reader reader;
  // Last position in the log that we have recorded as replicated
  private long lastLoggedPosition = -1;
  // Path of the current log
  private volatile Path currentPath;
  private FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // id of the peer cluster
  private UUID peerClusterId;
  // total number of edits we replicated
  private long totalReplicatedEdits = 0;
  // total number of operations we replicated
  private long totalReplicatedOperations = 0;
  // The znode of the queue we are working on
  private String peerClusterZnode;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Current number of operations (Put/Delete) that we need to replicate
  private int currentNbOperations = 0;
  // Current size of data we need to replicate
  private int currentSize = 0;
  // Indicates if this particular source is running
  private volatile boolean running = true;
  // Metrics for this source
  private MetricsSource metrics;
  // Handle on the log reader helper
  private ReplicationWALReaderManager repLogReader;
  // WARN threshold for the number of queued logs, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private ReplicationEndpoint replicationEndpoint;
  // A filter (or a chain of filters) for the WAL entries
  private WALEntryFilter walEntryFilter;
  // Throttler to cap the replication bandwidth used per peer
  private ReplicationThrottler throttler;

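  /**
   * Instantiation method used by region servers.
   *
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to ping to
   * @param replicationQueues queue tracker for the WALs assigned to this source
   * @param replicationPeers tracker for the state of the replication peers
   * @param stopper used to check whether the region server is shutting down
   * @param peerClusterZnode the name of our znode
   * @param clusterId unique UUID of this cluster
   * @param replicationEndpoint the replication endpoint implementation
   * @param metrics metrics for this replication source
   */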
  @Override
  public void init(final Configuration conf, final FileSystem fs,
      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final Stoppable stopper,
      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
      final MetricsSource metrics)
          throws IOException {
    this.stopper = stopper;
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.replicationQueueSizeCapacity =
        this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
    this.replicationQueueNbCapacity =
        this.conf.getInt("replication.source.nb.capacity", 25000);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.queue =
        new PriorityBlockingQueue<Path>(
            this.conf.getInt("hbase.regionserver.maxlogs", 32),
            new LogsComparator());
    long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    this.throttler = new ReplicationThrottler((double) bandwidth / 10.0);
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf);
    this.clusterId = clusterId;

    this.peerClusterZnode = peerClusterZnode;
    this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);

    this.peerId = this.replicationQueueInfo.getPeerId();
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
    this.replicationEndpoint = replicationEndpoint;
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  @Override
  public void enqueueLog(Path log) {
    this.queue.put(log);
    int queueSize = queue.size();
    this.metrics.setSizeOfLogQueue(queueSize);

    if (queueSize > this.logQueueWarnThreshold) {
      LOG.warn("Queue size: " + queueSize
          + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
    }
  }

  private void uninitialize() {
    LOG.debug("Source exiting " + this.peerId);
    metrics.clear();
    if (replicationEndpoint.state() == Service.State.STARTING
        || replicationEndpoint.state() == Service.State.RUNNING) {
      replicationEndpoint.stopAndWait();
    }
  }

  @Override
  public void run() {

    if (!this.isActive()) {
      uninitialize();
      return;
    }

    try {
      // start the endpoint, connect to the cluster
      Service.State state = replicationEndpoint.start().get();
      if (state != Service.State.RUNNING) {
        LOG.warn("ReplicationEndpoint was not started. Exiting");
        uninitialize();
        return;
      }
    } catch (Exception ex) {
      LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
      throw new RuntimeException(ex);
    }

    // get the WALEntryFilter from the ReplicationEndpoint and add it to the default filters
    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
        (WALEntryFilter) new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    this.walEntryFilter = new ChainWALEntryFilter(filters);

    int sleepMultiplier = 1;
    // Keep trying to contact the peer until we get its UUID or we are asked to stop
    while (this.isActive() && this.peerClusterId == null) {
      this.peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isActive() && this.peerClusterId == null) {
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    // We were stopped while looping to contact the peer's zk ensemble, just abort
    if (!this.isActive()) {
      uninitialize();
      return;
    }

    // resetting to 1 to reuse later
    sleepMultiplier = 1;

    // Replicating to the same cluster is only legal if the endpoint explicitly allows it
    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
    }
    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);

    // If this queue is recovered, the first log normally already has a stored position
    if (this.replicationQueueInfo.isQueueRecovered()) {
      try {
        this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
            this.queue.peek().getName()));
        if (LOG.isTraceEnabled()) {
          LOG.trace("Recovered queue started with log " + this.queue.peek()
              + " at position " + this.repLogReader.getPosition());
        }
      } catch (ReplicationException e) {
        this.terminate("Couldn't get the position of this recovered queue " +
            this.peerClusterZnode, e);
      }
    }

    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      Path oldPath = getCurrentPath();

      // Get a new path if we don't already have one
      boolean hasCurrentPath = getNextPath();
      if (getCurrentPath() != null && oldPath == null) {
        sleepMultiplier = 1; // reset the sleep multiplier on a path change
      }
      if (!hasCurrentPath) {
        if (sleepForRetries("No log to process", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      boolean currentWALisBeingWrittenTo = false;
      // For WALs we own (rather than recovered ones), snapshot whether the current file is
      // still being written to NOW: new WAL paths are enqueued only after the previous one
      // is closed, so an empty queue means the file we are reading is the active WAL.
      if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
        currentWALisBeingWrittenTo = true;
      }
      // Open a reader on it
      if (!openReader(sleepMultiplier)) {
        // Reset the sleep multiplier, else it'd be reused for the next file
        sleepMultiplier = 1;
        continue;
      }

      // If we got a null reader but didn't continue, then sleep and retry
      if (this.reader == null) {
        if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      boolean gotIOE = false;
      currentNbOperations = 0;
      List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
      currentSize = 0;
      try {
        if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
          continue;
        }
      } catch (IOException ioe) {
        LOG.warn(this.peerClusterZnode + " Got: ", ioe);
        gotIOE = true;
        if (ioe.getCause() instanceof EOFException) {

          boolean considerDumping = false;
          if (this.replicationQueueInfo.isQueueRecovered()) {
            try {
              FileStatus stat = this.fs.getFileStatus(this.currentPath);
              if (stat.getLen() == 0) {
                LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
              }
              considerDumping = true;
            } catch (IOException e) {
              LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e);
            }
          }

          if (considerDumping &&
              sleepMultiplier == this.maxRetriesMultiplier &&
              processEndOfFile()) {
            continue;
          }
        }
      } finally {
        try {
          this.reader = null;
          this.repLogReader.closeReader();
        } catch (IOException e) {
          gotIOE = true;
          LOG.warn("Unable to finalize the tailing of a file", e);
        }
      }

      // If we didn't get anything to replicate, or if we hit an IOE,
      // record our position, wait a bit and retry.
      if (this.isActive() && (gotIOE || entries.isEmpty())) {
        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
          this.manager.logPositionAndCleanOldLogs(this.currentPath,
              this.peerClusterZnode, this.repLogReader.getPosition(),
              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
          this.lastLoggedPosition = this.repLogReader.getPosition();
        }
        // Reset the sleep multiplier if nothing went wrong
        if (!gotIOE) {
          sleepMultiplier = 1;
          // There was nothing to ship and it was not an error: mark the age of the last
          // shipped op as "now" to indicate that we are current.
          this.metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime());
        }
        if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      sleepMultiplier = 1;
      shipEdits(currentWALisBeingWrittenTo, entries);
    }
    uninitialize();
  }

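  /**
   * Read all the entries from the current log file that need to be replicated,
   * or detect that we reached the end of the current file.
   * @param currentWALisBeingWrittenTo whether the current WAL is still being written to
   * @param entries list filled with the entries to replicate
   * @return true if we reached the end of the file and moved on, false if we read entries
   * @throws IOException if reading the WAL fails
   */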
  protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
      List<WAL.Entry> entries) throws IOException {
    long seenEntries = 0;
    if (LOG.isTraceEnabled()) {
      LOG.trace("Seeking in " + this.currentPath + " at position "
          + this.repLogReader.getPosition());
    }
    this.repLogReader.seek();
    long positionBeforeRead = this.repLogReader.getPosition();
    WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
    while (entry != null) {
      this.metrics.incrLogEditsRead();
      seenEntries++;

      // don't replicate if the log entries have already been consumed by the peer cluster
      if (replicationEndpoint.canReplicateToSameCluster()
          || !entry.getKey().getClusterIds().contains(peerClusterId)) {
        // Remove all cells that should not be replicated
        entry = walEntryFilter.filter(entry);
        WALEdit edit = null;
        WALKey logKey = null;
        if (entry != null) {
          edit = entry.getEdit();
          logKey = entry.getKey();
        }

        if (edit != null && edit.size() != 0) {
          // Mark that the current cluster has the change
          logKey.addClusterId(clusterId);
          currentNbOperations += countDistinctRowKeys(edit);
          entries.add(entry);
          currentSize += entry.getEdit().heapSize();
        } else {
          this.metrics.incrLogEditsFiltered();
        }
      }
      // Stop if the batch is too big or holds too many entries
      if (currentSize >= this.replicationQueueSizeCapacity ||
          entries.size() >= this.replicationQueueNbCapacity) {
        break;
      }
      try {
        entry = this.repLogReader.readNextAndSetPosition();
      } catch (IOException ie) {
        LOG.debug("Break on IOE: " + ie.getMessage());
        break;
      }
    }
    metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
    if (currentWALisBeingWrittenTo) {
      return false;
    }
    // If we didn't get anything and the queue has an object, it means we
    // hit the end of the file for sure
    return seenEntries == 0 && processEndOfFile();
  }

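  /**
   * Poll for the next path in the queue.
   * @return true if a path was obtained or is already set, false if not
   */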
  protected boolean getNextPath() {
    try {
      if (this.currentPath == null) {
        this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
        this.metrics.setSizeOfLogQueue(queue.size());
        if (this.currentPath != null) {
          this.manager.cleanOldLogs(this.currentPath.getName(),
              this.peerId,
              this.replicationQueueInfo.isQueueRecovered());
          if (LOG.isTraceEnabled()) {
            LOG.trace("New log: " + this.currentPath);
          }
        }
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while reading edits", e);
    }
    return this.currentPath != null;
  }

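  /**
   * Open a reader on the current path, handling the cases where the file has been moved
   * (archived, or still sitting in a dead region server's directory for recovered queues).
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return true if we should continue with that file, false if we are over the retry limit
   */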
  protected boolean openReader(int sleepMultiplier) {
    try {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Opening log " + this.currentPath);
        }
        this.reader = repLogReader.openReader(this.currentPath);
      } catch (FileNotFoundException fnfe) {
        if (this.replicationQueueInfo.isQueueRecovered()) {
          // We didn't find the log; look whether it still exists in one of the dead
          // region servers' directories (there could be a chain of failures to look at)
          List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
          LOG.info("NB dead servers : " + deadRegionServers.size());
          final Path rootDir = FSUtils.getRootDir(this.conf);
          for (String curDeadServerName : deadRegionServers) {
            final Path deadRsDirectory = new Path(rootDir,
                DefaultWALProvider.getWALDirectoryName(curDeadServerName));
            Path[] locs = new Path[] {
                new Path(deadRsDirectory, currentPath.getName()),
                new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
                    currentPath.getName()),
            };
            for (Path possibleLogLocation : locs) {
              LOG.info("Possible location " + possibleLogLocation.toUri().toString());
              if (this.manager.getFs().exists(possibleLogLocation)) {
                // We found the right new location
                LOG.info("Log " + this.currentPath + " still exists at " +
                    possibleLogLocation);
                // Returning here with a null reader makes the caller sleep and retry
                return true;
              }
            }
          }
          // When run by the ReplicationSyncUp tool, the stopper is a DummyServer; in that
          // case also scan the live region servers' log directories for a matching file.
          if (stopper instanceof ReplicationSyncUp.DummyServer) {
            FileStatus[] rss = fs.listStatus(manager.getLogDir());
            for (FileStatus rs : rss) {
              Path p = rs.getPath();
              FileStatus[] logs = fs.listStatus(p);
              for (FileStatus log : logs) {
                p = new Path(p, log.getPath().getName());
                if (p.getName().equals(currentPath.getName())) {
                  currentPath = p;
                  LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
                  // Open the log at the new location
                  this.openReader(sleepMultiplier);
                  return true;
                }
              }
            }
          }

          // The log was not found in any of the possible locations; give up on it
          throw new IOException("File from recovered queue is " +
              "nowhere to be found", fnfe);
        } else {
          // If the log was archived, continue reading from there
          Path archivedLogLocation =
              new Path(manager.getOldLogDir(), currentPath.getName());
          if (this.manager.getFs().exists(archivedLogLocation)) {
            currentPath = archivedLogLocation;
            LOG.info("Log " + this.currentPath + " was moved to " +
                archivedLogLocation);
            // Open the log at the new location
            this.openReader(sleepMultiplier);
          }
        }
      }
    } catch (LeaseNotRecoveredException lnre) {
      // The lease on the WAL has not been recovered yet; try to recover it and retry
      LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
      recoverLease(conf, currentPath);
      this.reader = null;
    } catch (IOException ioe) {
      if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
      LOG.warn(this.peerClusterZnode + " Got: ", ioe);
      this.reader = null;
      if (ioe.getCause() instanceof NullPointerException) {
        // A transient NPE can surface while the WAL is still being written to;
        // just log and retry on the next loop iteration.
        LOG.warn("Got NPE opening reader, will retry.");
      } else if (sleepMultiplier == this.maxRetriesMultiplier) {
        // We waited the maximum number of retries for this file; consider dumping it
        LOG.warn("Waited too long for this file, considering dumping");
        return !processEndOfFile();
      }
    }
    return true;
  }

  private void recoverLease(final Configuration conf, final Path path) {
    try {
      final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
      FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
      fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
        @Override
        public boolean progress() {
          LOG.debug("recover WAL lease: " + path);
          return isActive();
        }
      });
    } catch (IOException e) {
      LOG.warn("unable to recover lease for WAL: " + path, e);
    }
  }

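  /**
   * Check whether the current log appears to be empty: we are at position 0 of a
   * non-recovered source whose queue is empty, i.e. the file is still being written to.
   * @return true if the current log is likely empty
   */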
  private boolean isCurrentLogEmpty() {
    return (this.repLogReader.getPosition() == 0 &&
        !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
  }

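  /**
   * Do the sleeping logic.
   * @param msg why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return true if sleepMultiplier is &lt; maxRetriesMultiplier
   */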
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

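  /**
   * Count the number of distinct row keys in the given edit, which is what gets reported
   * as the number of operations. Assumes the edit contains at least one Cell.
   * @param edit edit to count row keys from
   * @return number of distinct row keys
   */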
  private int countDistinctRowKeys(WALEdit edit) {
    List<Cell> cells = edit.getCells();
    int distinctRowKeys = 1;
    Cell lastCell = cells.get(0);
    for (int i = 0; i < edit.size(); i++) {
      if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
        distinctRowKeys++;
      }
      lastCell = cells.get(i);
    }
    return distinctRowKeys;
  }

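  /**
   * Do the shipping logic: ship the batch through the replication endpoint, retrying until
   * it succeeds or the source is stopped, then record the new log position and update metrics.
   * @param currentWALisBeingWrittenTo whether the current WAL is (seemingly) being written to
   * @param entries entries to ship
   */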
  protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      LOG.warn("Was given 0 edits to ship");
      return;
    }
    while (this.isActive()) {
      try {
        if (this.throttler.isEnabled()) {
          long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
          if (sleepTicks > 0) {
            try {
              if (LOG.isTraceEnabled()) {
                LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
              }
              Thread.sleep(sleepTicks);
            } catch (InterruptedException e) {
              LOG.debug("Interrupted while sleeping for throttling control");
              Thread.currentThread().interrupt();
              // The thread may have been interrupted to terminate;
              // go back to the while() condition to confirm this.
              continue;
            }
            // Reset the throttler's cycle start tick when we slept for throttling
            this.throttler.resetStartTick();
          }
        }

        ReplicationEndpoint.ReplicateContext replicateContext =
            new ReplicationEndpoint.ReplicateContext();
        replicateContext.setEntries(entries).setSize(currentSize);

        long startTimeNs = System.nanoTime();
        // Send the edits to the endpoint; blocks until they are shipped or it fails
        boolean replicated = replicationEndpoint.replicate(replicateContext);
        long endTimeNs = System.nanoTime();

        if (!replicated) {
          continue;
        } else {
          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
        }

        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
          this.manager.logPositionAndCleanOldLogs(this.currentPath,
              this.peerClusterZnode, this.repLogReader.getPosition(),
              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
          this.lastLoggedPosition = this.repLogReader.getPosition();
        }
        if (this.throttler.isEnabled()) {
          this.throttler.addPushSize(currentSize);
        }
        this.totalReplicatedEdits += entries.size();
        this.totalReplicatedOperations += currentNbOperations;
        this.metrics.shipBatch(this.currentNbOperations, this.currentSize / 1024);
        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime());
        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
              + this.totalReplicatedOperations + " operations in "
              + ((endTimeNs - startTimeNs) / 1000000) + " ms");
        }
        break;
      } catch (Exception ex) {
        LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
            + org.apache.hadoop.util.StringUtils.stringifyException(ex));
        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

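  /**
   * Check whether the peer's replication is enabled.
   * @return true if the peer is enabled, otherwise false
   */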
  protected boolean isPeerEnabled() {
    return this.replicationPeers.getStatusOfPeer(this.peerId);
  }

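  /**
   * If the queue isn't empty, switch to the next file; else, if this is a recovered queue,
   * we are done. Otherwise keep trying to read from the current file.
   * @return true if we are done with the current file, false if we should continue reading it
   */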
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
      justification = "Yeah, this is how it works")
  protected boolean processEndOfFile() {
    // A non-empty queue means the file we were reading is closed, so we can move on.
    if (this.queue.size() != 0) {
      // A trailer size of -1 means the WAL was not closed cleanly.
      final long trailerSize = this.repLogReader.currentTrailerSize();
      final long currentPosition = this.repLogReader.getPosition();
      FileStatus stat = null;
      try {
        stat = fs.getFileStatus(this.currentPath);
      } catch (IOException exception) {
        LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
            + (trailerSize < 0 ? "was not" : "was") + " closed cleanly"
            + ", stats: " + getStats());
        metrics.incrUnknownFileLengthForClosedWAL();
      }
      if (stat != null) {
        if (trailerSize < 0) {
          if (currentPosition < stat.getLen()) {
            final long skippedBytes = stat.getLen() - currentPosition;
            LOG.info("Reached the end of WAL file '" + currentPath
                + "'. It was not closed cleanly, so we did not parse " + skippedBytes
                + " bytes of data.");
            metrics.incrUncleanlyClosedWALs();
            metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
          }
        } else if (currentPosition + trailerSize < stat.getLen()) {
          LOG.warn("Processing end of WAL file '" + currentPath + "'. At position "
              + currentPosition + ", which is too far away from reported file length "
              + stat.getLen() + ". Restarting WAL reading (see HBASE-15983 for details). stats: "
              + getStats());
          repLogReader.setPosition(0);
          metrics.incrRestartedWALReading();
          metrics.incrRepeatedFileBytes(currentPosition);
          return false;
        }
      }
      if (LOG.isTraceEnabled()) {
        LOG.trace("Reached the end of a log, stats: " + getStats()
            + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
      }
      this.currentPath = null;
      this.repLogReader.finishCurrentFile();
      this.reader = null;
      metrics.incrCompletedWAL();
      return true;
    } else if (this.replicationQueueInfo.isQueueRecovered()) {
      this.manager.closeRecoveredQueue(this);
      LOG.info("Finished recovering the queue with the following stats " + getStats());
      metrics.incrCompletedRecoveryQueue();
      this.running = false;
      return true;
    }
    return false;
  }

  @Override
  public void startup() {
    String n = Thread.currentThread().getName();
    Thread.UncaughtExceptionHandler handler =
        new Thread.UncaughtExceptionHandler() {
          @Override
          public void uncaughtException(final Thread t, final Throwable e) {
            LOG.error("Unexpected exception in ReplicationSource,"
                + " currentPath=" + currentPath, e);
          }
        };
    Threads.setDaemonThreadRunning(
        this, n + ".replicationSource," + this.peerClusterZnode, handler);
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  public void terminate(String reason, Exception cause, boolean join) {
    if (cause == null) {
      LOG.info("Closing source "
          + this.peerClusterZnode + " because: " + reason);
    } else {
      LOG.error("Closing source " + this.peerClusterZnode
          + " because an error occurred: " + reason, cause);
    }
    this.running = false;
    this.interrupt();
    ListenableFuture<Service.State> future = null;
    if (this.replicationEndpoint != null) {
      future = this.replicationEndpoint.stop();
    }
    if (join) {
      Threads.shutdown(this, this.sleepForRetries);
      if (future != null) {
        try {
          future.get();
        } catch (Exception e) {
          LOG.warn("Got exception: " + e);
        }
      }
    }
  }

  @Override
  public String getPeerClusterZnode() {
    return this.peerClusterZnode;
  }

  @Override
  public String getPeerClusterId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    return this.currentPath;
  }

  private boolean isActive() {
    return !this.stopper.isStopped() && this.running && !isInterrupted();
  }

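  /**
   * Comparator used to order WALs by the timestamp embedded in their file names.
   */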
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
    }

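    /**
     * Split a path to get its start time; the WAL file name ends with its creation
     * timestamp after the last dot.
     * @param p path to split
     * @return start time
     */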
    private static long getTS(Path p) {
      int tsIndex = p.getName().lastIndexOf('.') + 1;
      return Long.parseLong(p.getName().substring(tsIndex));
    }
  }

  @Override
  public String getStats() {
    long position = this.repLogReader.getPosition();
    return "Total replicated edits: " + totalReplicatedEdits
        + ", currently replicating from: " + this.currentPath
        + " at position: " + position;
  }

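  /**
   * Get the metrics of this replication source.
   * @return the source metrics
   */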
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }
}