/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;

/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than 1 slave cluster.
 * <p>
 * For each WAL group of the region server it owns, this source keeps a queue of WAL files to
 * replicate and runs a {@link ReplicationSourceWorkerThread} that tails those files, filters the
 * entries and hands them to the configured {@link ReplicationEndpoint}. A source created for a
 * recovered queue (taken over from a dead region server) terminates once all of its queues have
 * been drained.
 * </p>
 */
@InterfaceAudience.Private
public class ReplicationSource extends Thread
    implements ReplicationSourceInterface {

  private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
  // Queues of WAL files to process, one queue per WAL group (walGroupId -> queue)
  private Map<String, PriorityBlockingQueue<Path>> queues =
      new HashMap<String, PriorityBlockingQueue<Path>>();
  // Initial capacity for each per-group queue (the PriorityBlockingQueue itself is unbounded),
  // derived from hbase.regionserver.maxlogs
  private int queueSizePerGroup;
  private ReplicationQueues replicationQueues;
  private ReplicationPeers replicationPeers;

  private Configuration conf;
  private ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;
  // The manager of all sources to which we ping back our progress
  private ReplicationSourceManager manager;
  // Should we stop everything?
  private Stoppable stopper;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Max size in bytes of the entries to ship in one batch
  private long replicationQueueSizeCapacity;
  // Max number of entries to ship in one batch
  private int replicationQueueNbCapacity;
  private FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // id of the other cluster
  private UUID peerClusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // total number of operations (distinct row keys) we replicated
  private AtomicLong totalReplicatedOperations = new AtomicLong(0);
  // The znode we currently play with
  private String peerClusterZnode;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  private volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // WARN threshold for the number of queued logs, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private ReplicationEndpoint replicationEndpoint;
  // A filter (or a chain of filters) for the WAL entries.
  private WALEntryFilter walEntryFilter;
  // Limits the bandwidth we push to the peer
  private ReplicationThrottler throttler;
  private AtomicInteger logQueueSize = new AtomicInteger(0);
  private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
      new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();

  public enum WorkerState {
    RUNNING,
    STOPPED,
    FINISHED  // The worker is done processing a recovered queue
  }

  /**
   * Instantiation method used by region servers.
   *
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to ping to
   * @param replicationQueues store of WAL queues and replication positions
   * @param replicationPeers the peers this cluster replicates to
   * @param stopper the stoppable used to stop the region server
   * @param peerClusterZnode the name of our znode
   * @param clusterId unique UUID for the cluster
   * @param replicationEndpoint the replication endpoint implementation
   * @param metrics metrics for replication source
   * @throws IOException
   */
  @Override
  public void init(final Configuration conf, final FileSystem fs,
      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final Stoppable stopper,
      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
      final MetricsSource metrics)
          throws IOException {
    this.stopper = stopper;
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.replicationQueueSizeCapacity =
        this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
    this.replicationQueueNbCapacity =
        this.conf.getInt("replication.source.nb.capacity", 25000);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.peerClusterZnode = peerClusterZnode;
    this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
    this.replicationEndpoint = replicationEndpoint;
  }
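
  // Example of how the batching and throttling knobs read above interact (illustrative figures,
  // not taken from any particular deployment): with the defaults, a batch is capped at 64 MB
  // (replication.source.size.capacity) or 25000 entries (replication.source.nb.capacity),
  // whichever is hit first. A replication.source.per.peer.node.bandwidth of 100 MB/sec is handed
  // to the ReplicationThrottler as 10 MB per cycle (the division by 10 above), assuming the
  // throttler meters in roughly 100 ms cycles; leaving the bandwidth at 0 keeps throttling off.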

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }
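
  // decorateConf() works on a private copy of the Configuration created in init(), so the codec
  // override only affects this source's connections. As an illustrative example (the chosen codec
  // is an assumption about a typical setup, not something this file mandates): pointing
  // HConstants.REPLICATION_CODEC_CONF_KEY at a codec that serializes cell tags, such as
  // org.apache.hadoop.hbase.codec.KeyValueCodecWithTags, makes the replication RPCs carry tags
  // even when the general client RPC codec does not.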

  @Override
  public void enqueueLog(Path log) {
    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
    if (queue == null) {
      queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
      queues.put(logPrefix, queue);
      if (this.sourceRunning) {
        // New wal group observed after source startup, start a new worker thread to track it.
        // Note: it's possible that a wal is enqueued after sourceRunning is set but before the
        // workers are spawned in run(), so check workerThreads before starting the worker.
        final ReplicationSourceWorkerThread worker =
            new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
        ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
        if (extant != null) {
          LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
        } else {
          LOG.debug("Starting up worker for wal group " + logPrefix);
          worker.startup();
        }
      }
    }
    queue.put(log);
    int queueSize = logQueueSize.incrementAndGet();
    this.metrics.setSizeOfLogQueue(queueSize);
    // This will log a warning for each new wal that gets enqueued above the warn threshold
    if (queue.size() > this.logQueueWarnThreshold) {
      LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
          + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
    }
  }
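
  // WAL grouping, illustrated with a made-up file name (the exact layout depends on the WAL
  // provider in use): a WAL called
  //   host1.example.com%2C16020%2C1469633873689.1469633874000
  // has the prefix "host1.example.com%2C16020%2C1469633873689", so every roll of that WAL lands
  // in the same PriorityBlockingQueue and is replicated in order by a single worker, while WALs
  // belonging to other groups are tracked by their own workers.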

  private void uninitialize() {
    LOG.debug("Source exiting " + this.peerId);
    metrics.clear();
    if (replicationEndpoint.state() == Service.State.STARTING
        || replicationEndpoint.state() == Service.State.RUNNING) {
      replicationEndpoint.stopAndWait();
    }
  }

  @Override
  public void run() {
    // mark we are running now
    this.sourceRunning = true;
    try {
      // start the endpoint, connect to the peer cluster
      Service.State state = replicationEndpoint.start().get();
      if (state != Service.State.RUNNING) {
        LOG.warn("ReplicationEndpoint was not started. Exiting");
        uninitialize();
        return;
      }
    } catch (Exception ex) {
      LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
      throw new RuntimeException(ex);
    }

    // get the WALEntryFilter from the ReplicationEndpoint and add it to the filter chain
    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
      (WALEntryFilter)new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    this.walEntryFilter = new ChainWALEntryFilter(filters);

    int sleepMultiplier = 1;
    // resolve the peer cluster id in the source's own thread, retrying until we get it or stop
    while (this.isSourceActive() && this.peerClusterId == null) {
      this.peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && this.peerClusterId == null) {
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }

    // Guard against a misconfigured peer that points back at this cluster: in that case the
    // resolved peerClusterId equals our own clusterId.
    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
      this.manager.closeQueue(this);
      return;
    }
    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
    // spawn a worker thread for each wal group we already know about
    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
      String walGroupId = entry.getKey();
      PriorityBlockingQueue<Path> queue = entry.getValue();
      final ReplicationSourceWorkerThread worker =
          new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
      ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
      if (extant != null) {
        LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
      } else {
        LOG.debug("Starting up worker for wal group " + walGroupId);
        worker.startup();
      }
    }
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }
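
  // Worked example of the backoff above, using the defaults read in init() (purely illustrative):
  // with sleepForRetries = 1000 ms and maxRetriesMultiplier = 300, successive retries sleep 1s,
  // 2s, 3s, and so on. The backoff is linear, not exponential; callers stop incrementing the
  // multiplier once this method returns false, capping each wait at roughly 300 * 1s = 5 minutes.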

  /**
   * Check whether the peer is enabled or not.
   * @return true if the peer is enabled, otherwise false
   */
  protected boolean isPeerEnabled() {
    return this.replicationPeers.getStatusOfPeer(this.peerId);
  }

  @Override
  public void startup() {
    String n = Thread.currentThread().getName();
    Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(final Thread t, final Throwable e) {
        LOG.error("Unexpected exception in ReplicationSource", e);
      }
    };
    Threads
        .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  public void terminate(String reason, Exception cause, boolean join) {
    if (cause == null) {
      LOG.info("Closing source "
          + this.peerClusterZnode + " because: " + reason);
    } else {
      LOG.error("Closing source " + this.peerClusterZnode
          + " because an error occurred: " + reason, cause);
    }
    this.sourceRunning = false;
    Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
    for (ReplicationSourceWorkerThread worker : workers) {
      worker.setWorkerState(WorkerState.STOPPED);
      worker.interrupt();
    }
    ListenableFuture<Service.State> future = null;
    if (this.replicationEndpoint != null) {
      future = this.replicationEndpoint.stop();
    }
    if (join) {
      for (ReplicationSourceWorkerThread worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
      }
      if (future != null) {
        try {
          future.get(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
          LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
              + this.peerClusterZnode, e);
        }
      }
    }
  }

  @Override
  public String getPeerClusterZnode() {
    return this.peerClusterZnode;
  }

  @Override
  public String getPeerClusterId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) return worker.getCurrentPath();
    }
    return null;
  }

  private boolean isSourceActive() {
    return !this.stopper.isStopped() && this.sourceRunning;
  }

  /**
   * Comparator used to compare logs together based on their start time.
   */
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.compare(getTS(o1), getTS(o2));
    }

    /**
     * Split a path to get the start time.
     * @param p path to split
     * @return start time
     */
    private static long getTS(Path p) {
      int tsIndex = p.getName().lastIndexOf('.') + 1;
      return Long.parseLong(p.getName().substring(tsIndex));
    }
  }
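
  // LogsComparator in practice, with a made-up WAL name (illustrative only): for
  // "host1.example.com%2C16020%2C1469633873689.1469633874000", getTS() takes everything after the
  // last '.', i.e. 1469633874000, the timestamp the WAL was created at. Ordering the per-group
  // queues by this value makes each worker drain WAL files in the order they were rolled.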

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
        .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceWorkerThread worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
            .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log");
      }
    }
    return sb.toString();
  }

  /**
   * Get the metrics of this replication source.
   * @return metrics for this replication source
   */
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  public class ReplicationSourceWorkerThread extends Thread {
    private ReplicationSource source;
    private String walGroupId;
    private PriorityBlockingQueue<Path> queue;
    private ReplicationQueueInfo replicationQueueInfo;
    // Our reader for the current log
    private WAL.Reader reader;
    // Last position in the log that we recorded via the manager
    private long lastLoggedPosition = -1;
    // Path of the current log
    private volatile Path currentPath;
    // Helper that opens, positions and closes the WAL reader
    private ReplicationWALReaderManager repLogReader;
    // Number of distinct row operations in the current batch
    private int currentNbOperations = 0;
    // Heap size of the entries in the current batch
    private int currentSize = 0;
    // Current state of this particular worker
    private WorkerState state;

    public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
        ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
      this.walGroupId = walGroupId;
      this.queue = queue;
      this.replicationQueueInfo = replicationQueueInfo;
      this.repLogReader = new ReplicationWALReaderManager(fs, conf);
      this.source = source;
    }

    @Override
    public void run() {
      setWorkerState(WorkerState.RUNNING);
      // If this is a recovered queue, it already contains all its logs and a position was
      // recorded for the first one; start reading from there.
      if (this.replicationQueueInfo.isQueueRecovered()) {
        try {
          this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
            this.queue.peek().getName()));
          if (LOG.isTraceEnabled()) {
            LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
                + this.repLogReader.getPosition());
          }
        } catch (ReplicationException e) {
          terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
        }
      }
      int sleepMultiplier = 1;
      // Loop until we close down
      while (isWorkerActive()) {
        // Sleep until replication is enabled again
        if (!isPeerEnabled()) {
          if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
            sleepMultiplier++;
          }
          continue;
        }
        // currentPath is null both at startup and right after we finished a file, so a non-null
        // path combined with a null oldPath below means we just moved to a new log.
        Path oldPath = getCurrentPath();
        // Get a new path
        boolean hasCurrentPath = getNextPath();
        if (getCurrentPath() != null && oldPath == null) {
          sleepMultiplier = 1; // reset the sleepMultiplier on a path change
        }
        if (!hasCurrentPath) {
          if (sleepForRetries("No log to process", sleepMultiplier)) {
            sleepMultiplier++;
          }
          continue;
        }
        boolean currentWALisBeingWrittenTo = false;
        // For WAL files we own (rather than recovered), take a snapshot of whether the
        // current WAL file (this.currentPath) is in use (for writing) NOW!
        // Since new WAL paths are enqueued only after the previous WAL file is closed,
        // the presence of an element in the queue means that the previous WAL file was
        // closed; otherwise the file is still in use (currentPath).
        // We take the snapshot now so that we are protected against races where a new file
        // gets enqueued while the current file is being processed (and where we just
        // finished reading the current file).
        if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
          currentWALisBeingWrittenTo = true;
        }
        // Open a reader on it
        if (!openReader(sleepMultiplier)) {
          // Reset the sleep multiplier, else it'd be reused for the next file
          sleepMultiplier = 1;
          continue;
        }

        // If we got a null reader but didn't continue, then sleep and continue
        if (this.reader == null) {
          if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
            sleepMultiplier++;
          }
          continue;
        }

        boolean gotIOE = false;
        currentNbOperations = 0;
        List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
        currentSize = 0;
        try {
          if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
            continue;
          }
        } catch (IOException ioe) {
          LOG.warn(peerClusterZnode + " Got: ", ioe);
          gotIOE = true;
          if (ioe.getCause() instanceof EOFException) {
            boolean considerDumping = false;
            if (this.replicationQueueInfo.isQueueRecovered()) {
              try {
                FileStatus stat = fs.getFileStatus(this.currentPath);
                if (stat.getLen() == 0) {
                  LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
                }
                considerDumping = true;
              } catch (IOException e) {
                LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
              }
            }

            if (considerDumping &&
                sleepMultiplier == maxRetriesMultiplier &&
                processEndOfFile(false)) {
              continue;
            }
          }
        } finally {
          try {
            this.reader = null;
            this.repLogReader.closeReader();
          } catch (IOException e) {
            gotIOE = true;
            LOG.warn("Unable to finalize the tailing of a file", e);
          }
        }

        // If we didn't get anything to replicate, or if we hit an IOE,
        // wait a bit and retry.
        // But if we need to stop, don't bother sleeping.
        if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
          if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
            manager.logPositionAndCleanOldLogs(this.currentPath,
              peerClusterZnode, this.repLogReader.getPosition(),
              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
            this.lastLoggedPosition = this.repLogReader.getPosition();
          }
          if (!gotIOE) {
            sleepMultiplier = 1;
            // if there was nothing to ship and it's not an error
            // set "ageOfLastShippedOp" to <now> to indicate that we're current
            metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
          }
          if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
            sleepMultiplier++;
          }
          continue;
        }
        sleepMultiplier = 1;
        shipEdits(currentWALisBeingWrittenTo, entries);
      }
      if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
        // synchronize so that exactly one last worker cleans up the recovered queue
        synchronized (workerThreads) {
          Threads.sleep(100);
          boolean allOtherTaskDone = true;
          for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
            if (!worker.equals(this) && worker.getWorkerState() != WorkerState.FINISHED) {
              allOtherTaskDone = false;
              break;
            }
          }
          if (allOtherTaskDone) {
            manager.closeRecoveredQueue(this.source);
            LOG.info("Finished recovering queue " + peerClusterZnode
                + " with the following stats: " + getStats());
          }
        }
      }
      // If we are leaving run() without having finished a recovered queue, flag ourselves stopped
      if (state != WorkerState.FINISHED) {
        setWorkerState(WorkerState.STOPPED);
      }
    }

    /**
     * Read all the entries from the current log file and retain those that need to be
     * replicated. Else, process the end of the current file.
     * @param currentWALisBeingWrittenTo is the current WAL being written to
     * @param entries resulting entries to be replicated
     * @return true if we got nothing and went to the next file, false if we got entries
     * @throws IOException
     */
    protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
        List<WAL.Entry> entries) throws IOException {
      long seenEntries = 0;
      if (LOG.isTraceEnabled()) {
        LOG.trace("Seeking in " + this.currentPath + " at position "
            + this.repLogReader.getPosition());
      }
      this.repLogReader.seek();
      long positionBeforeRead = this.repLogReader.getPosition();
      WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
      while (entry != null) {
        metrics.incrLogEditsRead();
        seenEntries++;

        // don't replicate if the log entries have already been consumed by the peer cluster
        if (replicationEndpoint.canReplicateToSameCluster()
            || !entry.getKey().getClusterIds().contains(peerClusterId)) {
          // Remove all cells that should not be replicated
          entry = walEntryFilter.filter(entry);
          WALEdit edit = null;
          WALKey logKey = null;
          if (entry != null) {
            edit = entry.getEdit();
            logKey = entry.getKey();
          }

          if (edit != null && edit.size() != 0) {
            // Mark that the current cluster has the change
            logKey.addClusterId(clusterId);
            currentNbOperations += countDistinctRowKeys(edit);
            entries.add(entry);
            currentSize += entry.getEdit().heapSize();
          } else {
            metrics.incrLogEditsFiltered();
          }
        }
        // Stop if the batch has too many entries or is too big
        if (currentSize >= replicationQueueSizeCapacity
            || entries.size() >= replicationQueueNbCapacity) {
          break;
        }
        try {
          entry = this.repLogReader.readNextAndSetPosition();
        } catch (IOException ie) {
          LOG.debug("Break on IOE: " + ie.getMessage());
          break;
        }
      }
      metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
      if (currentWALisBeingWrittenTo) {
        return false;
      }
      // If we didn't get anything and the queue has an object, it means we
      // hit the end of the file for sure
      return seenEntries == 0 && processEndOfFile(false);
    }

    /**
     * Poll for the next path.
     * @return true if a path was obtained, false if not
     */
    protected boolean getNextPath() {
      try {
        if (this.currentPath == null) {
          this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
          if (this.currentPath != null) {
            // Only decrement the queue-size gauge when we actually dequeued a log;
            // a timed-out poll returns null and must not change the count.
            int queueSize = logQueueSize.decrementAndGet();
            metrics.setSizeOfLogQueue(queueSize);
            // For a recovered queue we must use peerClusterZnode since peerId is a parsed value
            manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
              this.replicationQueueInfo.isQueueRecovered());
            if (LOG.isTraceEnabled()) {
              LOG.trace("New log: " + this.currentPath);
            }
          }
        }
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while reading edits", e);
      }
      return this.currentPath != null;
    }

    /**
     * Open a reader on the current path.
     *
     * @param sleepMultiplier by how many times the default sleeping time is augmented
     * @return true if we should continue with that file, false if we are over with it
     */
    protected boolean openReader(int sleepMultiplier) {
      try {
        try {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Opening log " + this.currentPath);
          }
          this.reader = repLogReader.openReader(this.currentPath);
        } catch (FileNotFoundException fnfe) {
          if (this.replicationQueueInfo.isQueueRecovered()) {
            // The WAL is not at its original location; for a recovered queue, look at the
            // directories of the dead region servers the queue was inherited from (there
            // could be a chain of failures to look at).
            List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
            LOG.info("NB dead servers : " + deadRegionServers.size());
            final Path rootDir = FSUtils.getRootDir(conf);
            for (String curDeadServerName : deadRegionServers) {
              final Path deadRsDirectory = new Path(rootDir,
                  DefaultWALProvider.getWALDirectoryName(curDeadServerName));
              Path[] locs = new Path[] {
                  new Path(deadRsDirectory, currentPath.getName()),
                  new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
                      currentPath.getName()),
              };
              for (Path possibleLogLocation : locs) {
                LOG.info("Possible location " + possibleLogLocation.toUri().toString());
                if (manager.getFs().exists(possibleLogLocation)) {
                  // We found the right new location
                  LOG.info("Log " + this.currentPath + " still exists at " +
                      possibleLogLocation);
                  // Only the ReplicationSyncUp tool switches to the found location and re-opens
                  // immediately; in the normal case we return with a null reader and let the
                  // retry logic in run() handle it.
                  if (stopper instanceof ReplicationSyncUp.DummyServer) {
                    this.currentPath = possibleLogLocation;
                    this.openReader(sleepMultiplier);
                  }
                  return true;
                }
              }
            }
            // When running the ReplicationSyncUp tool, also walk every region server's wal
            // directory under the log dir and look for a file with the same name.
            if (stopper instanceof ReplicationSyncUp.DummyServer) {
              FileStatus[] rss = fs.listStatus(manager.getLogDir());
              for (FileStatus rs : rss) {
                Path p = rs.getPath();
                FileStatus[] logs = fs.listStatus(p);
                for (FileStatus log : logs) {
                  p = new Path(p, log.getPath().getName());
                  if (p.getName().equals(currentPath.getName())) {
                    currentPath = p;
                    LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
                    // Open the log at the new location
                    this.openReader(sleepMultiplier);
                    return true;
                  }
                }
              }
            }

            // The file could not be found in any of the possible locations; give up on opening
            // it here and let the outer IOException handling decide whether to keep retrying or
            // eventually dump the file.
            throw new IOException("File from recovered queue is " +
                "nowhere to be found", fnfe);
          } else {
            // If the log was archived, continue reading from there
            Path archivedLogLocation =
                new Path(manager.getOldLogDir(), currentPath.getName());
            if (manager.getFs().exists(archivedLogLocation)) {
              currentPath = archivedLogLocation;
              LOG.info("Log " + this.currentPath + " was moved to " +
                  archivedLogLocation);
              // Open the log at the new location
              this.openReader(sleepMultiplier);
            }
            // If the log is missing from both places the reader stays null and the caller
            // will sleep and retry.
          }
        }
      } catch (LeaseNotRecoveredException lnre) {
        // The WAL still has an un-recovered lease (its writer died without closing it);
        // try to recover the lease so the file length is finalized, then retry.
        LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
        recoverLease(conf, currentPath);
        this.reader = null;
      } catch (IOException ioe) {
        if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
        LOG.warn(peerClusterZnode + " Got: ", ioe);
        this.reader = null;
        if (ioe.getCause() instanceof NullPointerException) {
          // An NPE from the underlying filesystem can happen when we open a file before any
          // datanode has its most recent block; just log and retry on the next pass.
          LOG.warn("Got NPE opening reader, will retry.");
        } else if (sleepMultiplier >= maxRetriesMultiplier
            && conf.getBoolean("replication.source.eof.autorecovery", false)) {
          // We have retried for a long time; consider dumping the file if it is empty.
          LOG.warn("Waited too long for this file, considering dumping");
          return !processEndOfFile(true);
        }
      }
      return true;
    }

    private void recoverLease(final Configuration conf, final Path path) {
      try {
        final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
        FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
        fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
          @Override
          public boolean progress() {
            LOG.debug("recover WAL lease: " + path);
            return isWorkerActive();
          }
        });
      } catch (IOException e) {
        LOG.warn("unable to recover lease for WAL: " + path, e);
      }
    }

    /*
     * Checks whether the current log file is empty and the queue is not a recovered one. This
     * handles the scenario where, in an idle cluster, there is no entry in the current log and
     * we keep trying to read it and getting EOFException. In the case of a recovered queue the
     * last log file may genuinely be empty, and we don't want to retry that.
     */
    private boolean isCurrentLogEmpty() {
      return (this.repLogReader.getPosition() == 0 &&
          !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
    }

    /**
     * Count the number of different row keys in the given edit because of mini-batching.
     * We assume that there's at least one Cell in the WALEdit.
     * @param edit edit to count row keys from
     * @return number of different row keys
     */
    private int countDistinctRowKeys(WALEdit edit) {
      List<Cell> cells = edit.getCells();
      int distinctRowKeys = 1;
      Cell lastCell = cells.get(0);
      for (int i = 0; i < edit.size(); i++) {
        if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
          distinctRowKeys++;
        }
        // Remember the cell we just looked at; otherwise every cell would be compared against
        // the first one and rows repeated later in the edit would be counted again.
        lastCell = cells.get(i);
      }
      return distinctRowKeys;
    }
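
    // countDistinctRowKeys() on a hypothetical edit whose cells belong to rows
    // [r1, r1, r2, r2, r2, r3] returns 3: the counter starts at 1 for the first row and is
    // bumped on each change of row between adjacent cells. This assumes cells of one row are
    // contiguous in the WALEdit, which is typically the case since cells are appended per
    // mutation; the count only feeds metrics and throttling, so a slight overcount is harmless.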

    /**
     * Do the shipping logic.
     * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
     * written to when this method was called
     */
    protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
      int sleepMultiplier = 0;
      if (entries.isEmpty()) {
        LOG.warn("Was given 0 edits to ship");
        return;
      }
      while (isWorkerActive()) {
        try {
          if (throttler.isEnabled()) {
            long sleepTicks = throttler.getNextSleepInterval(currentSize);
            if (sleepTicks > 0) {
              try {
                if (LOG.isTraceEnabled()) {
                  LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
                }
                Thread.sleep(sleepTicks);
              } catch (InterruptedException e) {
                LOG.debug("Interrupted while sleeping for throttling control");
                Thread.currentThread().interrupt();
                // the current thread might have been interrupted to terminate;
                // go back to while() to confirm this
                continue;
              }
              // reset the throttler's cycle start tick when we slept for throttling
              throttler.resetStartTick();
            }
          }
          // create the replicateContext here, so the entries can be GC'd upon return from this
          // call stack
          ReplicationEndpoint.ReplicateContext replicateContext =
              new ReplicationEndpoint.ReplicateContext();
          replicateContext.setEntries(entries).setSize(currentSize);
          replicateContext.setWalGroupId(walGroupId);

          long startTimeNs = System.nanoTime();
          // Ship the edits through the endpoint; blocks until the batch is replicated or fails
          boolean replicated = replicationEndpoint.replicate(replicateContext);
          long endTimeNs = System.nanoTime();

          if (!replicated) {
            continue;
          } else {
            sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
          }

          if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
            manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
              this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
              currentWALisBeingWrittenTo);
            this.lastLoggedPosition = this.repLogReader.getPosition();
          }
          if (throttler.isEnabled()) {
            throttler.addPushSize(currentSize);
          }
          totalReplicatedEdits.addAndGet(entries.size());
          totalReplicatedOperations.addAndGet(currentNbOperations);
          metrics.shipBatch(currentNbOperations, currentSize / 1024);
          metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
            walGroupId);
          if (LOG.isTraceEnabled()) {
            LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
                + totalReplicatedOperations + " operations in "
                + ((endTimeNs - startTimeNs) / 1000000) + " ms");
          }
          break;
        } catch (Exception ex) {
          LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
              + org.apache.hadoop.util.StringUtils.stringifyException(ex));
          if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
            sleepMultiplier++;
          }
        }
      }
    }

    /**
     * If the queue isn't empty, switch to the next file; else if this is a recovered queue, it
     * means we're done! Else we'll just continue trying to read from the current log file.
     * @return true if we're done with the current file, false if we should continue trying to
     *         read from it
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
        justification = "Yeah, this is how it works")
    protected boolean processEndOfFile(boolean dumpOnlyIfZeroLength) {
      if (this.queue.size() != 0) {
        // Another WAL is waiting in the queue; before moving on, sanity-check how far we read
        // in the current file against its reported length.
        final long trailerSize = this.repLogReader.currentTrailerSize();
        final long currentPosition = this.repLogReader.getPosition();
        FileStatus stat = null;
        try {
          stat = fs.getFileStatus(this.currentPath);
        } catch (IOException exception) {
          LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
              + (trailerSize < 0 ? "was not" : "was") + " closed cleanly"
              + ", stats: " + getStats());
          metrics.incrUnknownFileLengthForClosedWAL();
        }
        if (stat != null) {
          if (trailerSize < 0) {
            if (currentPosition < stat.getLen()) {
              final long skippedBytes = stat.getLen() - currentPosition;
              LOG.info("Reached the end of WAL file '" + currentPath
                  + "'. It was not closed cleanly, so we did not parse " + skippedBytes
                  + " bytes of data.");
              metrics.incrUncleanlyClosedWALs();
              metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
            }
          } else if (currentPosition + trailerSize < stat.getLen()) {
            LOG.warn("Processing end of WAL file '" + currentPath + "'. At position "
                + currentPosition + ", which is too far away from reported file length "
                + stat.getLen() + ". Restarting WAL reading (see HBASE-15983 for details). stats: "
                + getStats());
            repLogReader.setPosition(0);
            metrics.incrRestartedWALReading();
            metrics.incrRepeatedFileBytes(currentPosition);
            return false;
          }
        }
        if (LOG.isTraceEnabled()) {
          LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
              + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
        }
        // If the file length is unknown (the stat above failed), be conservative and do not
        // treat the file as zero length; this also avoids an NPE on a null stat.
        if (dumpOnlyIfZeroLength && (stat == null || stat.getLen() != 0)) {
          return false;
        }
        this.currentPath = null;
        this.repLogReader.finishCurrentFile();
        this.reader = null;
        metrics.incrCompletedWAL();
        return true;
      } else if (this.replicationQueueInfo.isQueueRecovered()) {
        LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
            + peerClusterZnode);
        metrics.incrCompletedRecoveryQueue();
        setWorkerState(WorkerState.FINISHED);
        return true;
      }
      return false;
    }
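
    // Worked example of the length sanity check above (the numbers are made up): suppose we
    // stopped reading at position 1,000,000 in a WAL whose FileStatus reports 1,500,000 bytes.
    // If the reader found no trailer (trailerSize < 0) the file was not closed cleanly, so the
    // remaining 500,000 bytes are skipped and counted in the metrics. If a trailer of, say,
    // 200 bytes was found, then 1,000,000 + 200 < 1,500,000 means we somehow stopped short of
    // the real end, so the position is reset to 0 and the whole file is re-read (HBASE-15983).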

    public void startup() {
      String n = Thread.currentThread().getName();
      Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(final Thread t, final Throwable e) {
          RSRpcServices.exitIfOOME(e);
          LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
              + getCurrentPath(), e);
          stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
        }
      };
      Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
          + peerClusterZnode, handler);
      workerThreads.put(walGroupId, this);
    }

    public Path getCurrentPath() {
      return this.currentPath;
    }

    public long getCurrentPosition() {
      return this.repLogReader.getPosition();
    }

    private boolean isWorkerActive() {
      return !stopper.isStopped() && state == WorkerState.RUNNING && !isInterrupted();
    }

    private void terminate(String reason, Exception cause) {
      if (cause == null) {
        LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
      } else {
        LOG.error("Closing worker for wal group " + this.walGroupId
            + " because an error occurred: " + reason, cause);
      }
      setWorkerState(WorkerState.STOPPED);
      this.interrupt();
      Threads.shutdown(this, sleepForRetries);
      LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
    }

    /**
     * Set the state of this worker.
     * @param state the new state of the worker
     */
    public void setWorkerState(WorkerState state) {
      this.state = state;
    }

    /**
     * Get the current state of this worker.
     * @return the current WorkerState
     */
    public WorkerState getWorkerState() {
      return state;
    }
  }
}