1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication.regionserver;
21
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Random;
34 import java.util.Set;
35 import java.util.SortedMap;
36 import java.util.SortedSet;
37 import java.util.TreeSet;
38 import java.util.UUID;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.CopyOnWriteArrayList;
41 import java.util.concurrent.LinkedBlockingQueue;
42 import java.util.concurrent.RejectedExecutionException;
43 import java.util.concurrent.ThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.fs.FileSystem;
50 import org.apache.hadoop.fs.Path;
51 import org.apache.hadoop.hbase.Server;
52 import org.apache.hadoop.hbase.TableDescriptors;
53 import org.apache.hadoop.hbase.classification.InterfaceAudience;
54 import org.apache.hadoop.hbase.regionserver.HRegionServer;
55 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
56 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
57 import org.apache.hadoop.hbase.replication.ReplicationException;
58 import org.apache.hadoop.hbase.replication.ReplicationListener;
59 import org.apache.hadoop.hbase.replication.ReplicationPeer;
60 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
61 import org.apache.hadoop.hbase.replication.ReplicationPeers;
62 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
63 import org.apache.hadoop.hbase.replication.ReplicationQueues;
64 import org.apache.hadoop.hbase.replication.ReplicationTracker;
65 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/**
 * Creates, tracks and tears down the replication sources running in this server.
 * One source is created per replication peer (see {@code addSource}); additional
 * "recovered" sources are created by {@code NodeFailoverWorker} for queues claimed
 * from another region server. The manager also records, per queue, which WALs are
 * still queued so that older ones can be dropped once a later WAL has been reached.
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
private static final Log LOG =
LogFactory.getLog(ReplicationSourceManager.class);

// Sources created through addSource(); backed by a CopyOnWriteArrayList.
private final List<ReplicationSourceInterface> sources;

// Sources created by NodeFailoverWorker for claimed queues; backed by a CopyOnWriteArrayList.
private final List<ReplicationSourceInterface> oldsources;
// Persistent queue storage: log positions, queued WAL names, queue claiming.
private final ReplicationQueues replicationQueues;
// Source of region-server/peer change notifications; this manager registers itself as listener.
private final ReplicationTracker replicationTracker;
// Registry of replication peers and their configurations.
private final ReplicationPeers replicationPeers;

// Cluster id handed to every source and endpoint created by this manager.
private final UUID clusterId;

// Hosting server; stopped if a WAL cannot be queued while adding a source.
private final Server server;


// queue id -> WAL group prefix -> sorted WAL names. Plain HashMap; the methods that
// iterate or update it do so while holding synchronized (walsById).
private final Map<String, Map<String, SortedSet<String>>> walsById;

// Same shape as walsById but keyed by recovered queue id; the outer map is a ConcurrentHashMap.
private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
private final Configuration conf;
private final FileSystem fs;

// Latest WAL path seen per WAL group (see preLogRoll); a synchronized set that is
// additionally locked explicitly while being iterated.
private Set<Path> latestPaths;

// Returned by getLogDir().
private final Path logDir;

// Directory used to build the paths of WALs belonging to recovered queues.
private final Path oldLogDir;

// Base delay in ms before claiming another server's queues
// ("replication.sleep.before.failover", default 30000).
private final long sleepBeforeFailover;

// Runs NodeFailoverWorker tasks.
private final ThreadPoolExecutor executor;

// Adds a random extra delay on top of sleepBeforeFailover.
private final Random rand;
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 public ReplicationSourceManager(final ReplicationQueues replicationQueues,
132 final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
133 final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
134 final Path oldLogDir, final UUID clusterId) {
135
136
137 this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
138 this.replicationQueues = replicationQueues;
139 this.replicationPeers = replicationPeers;
140 this.replicationTracker = replicationTracker;
141 this.server = server;
142 this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
143 this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
144 this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
145 this.conf = conf;
146 this.fs = fs;
147 this.logDir = logDir;
148 this.oldLogDir = oldLogDir;
149 this.sleepBeforeFailover =
150 conf.getLong("replication.sleep.before.failover", 30000);
151 this.clusterId = clusterId;
152 this.replicationTracker.registerListener(this);
153 this.replicationPeers.getAllPeerIds();
154
155
156 int nbWorkers = conf.getInt("replication.executor.workers", 1);
157
158
159 this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
160 100, TimeUnit.MILLISECONDS,
161 new LinkedBlockingQueue<Runnable>());
162 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
163 tfb.setNameFormat("ReplicationExecutor-%d");
164 tfb.setDaemon(true);
165 this.executor.setThreadFactory(tfb.build());
166 this.rand = new Random();
167 this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
168 }
169
170
171
172
173
174
175
176
177
178
179
180
181 public void logPositionAndCleanOldLogs(Path log, String id, long position,
182 boolean queueRecovered, boolean holdLogInZK) {
183 String fileName = log.getName();
184 this.replicationQueues.setLogPosition(id, fileName, position);
185 if (holdLogInZK) {
186 return;
187 }
188 cleanOldLogs(fileName, id, queueRecovered);
189 }
190
191
192
193
194
195
196
197
198 public void cleanOldLogs(String key, String id, boolean queueRecovered) {
199 String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
200 if (queueRecovered) {
201 SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
202 if (wals != null && !wals.first().equals(key)) {
203 cleanOldLogs(wals, key, id);
204 }
205 } else {
206 synchronized (this.walsById) {
207 SortedSet<String> wals = walsById.get(id).get(logPrefix);
208 if (wals != null && !wals.first().equals(key)) {
209 cleanOldLogs(wals, key, id);
210 }
211 }
212 }
213 }
214
215 private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
216 SortedSet<String> walSet = wals.headSet(key);
217 LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
218 for (String wal : walSet) {
219 this.replicationQueues.removeLog(id, wal);
220 }
221 walSet.clear();
222 }
223
224
225
226
227
228 protected void init() throws IOException, ReplicationException {
229 for (String id : this.replicationPeers.getPeerIds()) {
230 addSource(id);
231 }
232 List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
233 if (currentReplicators == null || currentReplicators.isEmpty()) {
234 return;
235 }
236 List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
237 LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
238 + otherRegionServers);
239
240
241 for (String rs : currentReplicators) {
242 if (!otherRegionServers.contains(rs)) {
243 transferQueues(rs);
244 }
245 }
246 }
247
248
249
250
251
252
253
254
255 protected ReplicationSourceInterface addSource(String id) throws IOException,
256 ReplicationException {
257 ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
258 ReplicationPeer peer = replicationPeers.getPeer(id);
259 ReplicationSourceInterface src =
260 getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
261 this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
262 synchronized (this.walsById) {
263 this.sources.add(src);
264 Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
265 this.walsById.put(id, walsByGroup);
266
267 synchronized (latestPaths) {
268 if (this.latestPaths.size() > 0) {
269 for (Path logPath : latestPaths) {
270 String name = logPath.getName();
271 String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
272 SortedSet<String> logs = new TreeSet<String>();
273 logs.add(name);
274 walsByGroup.put(walPrefix, logs);
275 try {
276 this.replicationQueues.addLog(id, name);
277 } catch (ReplicationException e) {
278 String message =
279 "Cannot add log to queue when creating a new source, queueId=" + id
280 + ", filename=" + name;
281 server.stop(message);
282 throw e;
283 }
284 src.enqueueLog(logPath);
285 }
286 }
287 }
288 }
289 src.startup();
290 return src;
291 }
292
293
294
295
296
297 public void deleteSource(String peerId, boolean closeConnection) {
298 this.replicationQueues.removeQueue(peerId);
299 if (closeConnection) {
300 this.replicationPeers.peerRemoved(peerId);
301 }
302 }
303
304
305
306
307 public void join() {
308 this.executor.shutdown();
309 if (this.sources.size() == 0) {
310 this.replicationQueues.removeAllQueues();
311 }
312 for (ReplicationSourceInterface source : this.sources) {
313 source.terminate("Region server is closing");
314 }
315 }
316
317
318
319
320
321 protected Map<String, Map<String, SortedSet<String>>> getWALs() {
322 return Collections.unmodifiableMap(walsById);
323 }
324
325
326
327
328
329 protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
330 return Collections.unmodifiableMap(walsByIdRecoveredQueues);
331 }
332
333
334
335
336
337 public List<ReplicationSourceInterface> getSources() {
338 return this.sources;
339 }
340
341
342
343
344
345 public List<ReplicationSourceInterface> getOldSources() {
346 return this.oldsources;
347 }
348
349 @VisibleForTesting
350 List<String> getAllQueues() {
351 return replicationQueues.getAllQueues();
352 }
353
354 void preLogRoll(Path newLog) throws IOException {
355 recordLog(newLog);
356 String logName = newLog.getName();
357 String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
358 synchronized (latestPaths) {
359 Iterator<Path> iterator = latestPaths.iterator();
360 while (iterator.hasNext()) {
361 Path path = iterator.next();
362 if (path.getName().contains(logPrefix)) {
363 iterator.remove();
364 break;
365 }
366 }
367 this.latestPaths.add(newLog);
368 }
369 }
370
371
372
373
374
375
376
377 private void recordLog(Path logPath) throws IOException {
378 String logName = logPath.getName();
379 String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
380
381
382 synchronized (replicationPeers) {
383 for (String id : replicationPeers.getPeerIds()) {
384 try {
385 this.replicationQueues.addLog(id, logName);
386 } catch (ReplicationException e) {
387 throw new IOException("Cannot add log to replication queue"
388 + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
389 }
390 }
391 }
392
393 synchronized (walsById) {
394 for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
395 String peerId = entry.getKey();
396 Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
397 boolean existingPrefix = false;
398 for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
399 SortedSet<String> wals = walsEntry.getValue();
400 if (this.sources.isEmpty()) {
401
402
403 wals.clear();
404 }
405 if (logPrefix.equals(walsEntry.getKey())) {
406 wals.add(logName);
407 existingPrefix = true;
408 }
409 }
410 if (!existingPrefix) {
411
412 LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
413 SortedSet<String> wals = new TreeSet<String>();
414 wals.add(logName);
415 walsByPrefix.put(logPrefix, wals);
416 }
417 }
418 }
419 }
420
421 void postLogRoll(Path newLog) throws IOException {
422
423 for (ReplicationSourceInterface source : this.sources) {
424 source.enqueueLog(newLog);
425 }
426 }
427
428
429
430
431
432
433
434
435
436
437
438 protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
439 final FileSystem fs, final ReplicationSourceManager manager,
440 final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
441 final Server server, final String peerId, final UUID clusterId,
442 final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
443 throws IOException {
444 RegionServerCoprocessorHost rsServerHost = null;
445 TableDescriptors tableDescriptors = null;
446 if (server instanceof HRegionServer) {
447 rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
448 tableDescriptors = ((HRegionServer) server).getTableDescriptors();
449 }
450 ReplicationSourceInterface src;
451 try {
452 @SuppressWarnings("rawtypes")
453 Class c = Class.forName(conf.get("replication.replicationsource.implementation",
454 ReplicationSource.class.getCanonicalName()));
455 src = (ReplicationSourceInterface) c.newInstance();
456 } catch (Exception e) {
457 LOG.warn("Passed replication source implementation throws errors, " +
458 "defaulting to ReplicationSource", e);
459 src = new ReplicationSource();
460 }
461
462 ReplicationEndpoint replicationEndpoint = null;
463 try {
464 String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
465 if (replicationEndpointImpl == null) {
466
467 replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
468 }
469 @SuppressWarnings("rawtypes")
470 Class c = Class.forName(replicationEndpointImpl);
471 replicationEndpoint = (ReplicationEndpoint) c.newInstance();
472 if(rsServerHost != null) {
473 ReplicationEndpoint newReplicationEndPoint = rsServerHost
474 .postCreateReplicationEndPoint(replicationEndpoint);
475 if(newReplicationEndPoint != null) {
476
477 replicationEndpoint = newReplicationEndPoint;
478 }
479 }
480 } catch (Exception e) {
481 LOG.warn("Passed replication endpoint implementation throws errors"
482 + " while initializing ReplicationSource for peer: " + peerId, e);
483 throw new IOException(e);
484 }
485
486 MetricsSource metrics = new MetricsSource(peerId);
487
488 src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
489 clusterId, replicationEndpoint, metrics);
490
491
492 replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
493 fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));
494
495 return src;
496 }
497
498
499
500
501
502
503
504
505
506 private void transferQueues(String rsZnode) {
507 NodeFailoverWorker transfer =
508 new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
509 this.clusterId);
510 try {
511 this.executor.execute(transfer);
512 } catch (RejectedExecutionException ex) {
513 LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
514 }
515 }
516
517
518
519
520
521 public void closeRecoveredQueue(ReplicationSourceInterface src) {
522 LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
523 if (src instanceof ReplicationSource) {
524 ((ReplicationSource) src).getSourceMetrics().clear();
525 }
526 this.oldsources.remove(src);
527 deleteSource(src.getPeerClusterZnode(), false);
528 this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
529 }
530
531
532
533
534
535 public void closeQueue(ReplicationSourceInterface src) {
536 LOG.info("Done with the queue " + src.getPeerClusterZnode());
537 if (src instanceof ReplicationSource) {
538 ((ReplicationSource) src).getSourceMetrics().clear();
539 }
540 this.sources.remove(src);
541 deleteSource(src.getPeerClusterZnode(), true);
542 this.walsById.remove(src.getPeerClusterZnode());
543 }
544
545
546
547
548
549
550 public void removePeer(String id) {
551 LOG.info("Closing the following queue " + id + ", currently have "
552 + sources.size() + " and another "
553 + oldsources.size() + " that were recovered");
554 String terminateMessage = "Replication stream was removed by a user";
555 List<ReplicationSourceInterface> oldSourcesToDelete =
556 new ArrayList<ReplicationSourceInterface>();
557
558
559 synchronized (oldsources) {
560
561 for (ReplicationSourceInterface src : oldsources) {
562 if (id.equals(src.getPeerClusterId())) {
563 oldSourcesToDelete.add(src);
564 }
565 }
566 for (ReplicationSourceInterface src : oldSourcesToDelete) {
567 src.terminate(terminateMessage);
568 closeRecoveredQueue(src);
569 }
570 }
571 LOG.info("Number of deleted recovered sources for " + id + ": "
572 + oldSourcesToDelete.size());
573
574 List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
575
576 synchronized (this.replicationPeers) {
577 for (ReplicationSourceInterface src : this.sources) {
578 if (id.equals(src.getPeerClusterId())) {
579 srcToRemove.add(src);
580 }
581 }
582 if (srcToRemove.size() == 0) {
583 LOG.error("The queue we wanted to close is missing " + id);
584 return;
585 }
586 for (ReplicationSourceInterface toRemove : srcToRemove) {
587 toRemove.terminate(terminateMessage);
588 if (toRemove instanceof ReplicationSource) {
589 ((ReplicationSource) toRemove).getSourceMetrics().clear();
590 }
591 this.sources.remove(toRemove);
592 }
593 deleteSource(id, true);
594 }
595 }
596
597 @Override
598 public void regionServerRemoved(String regionserver) {
599 transferQueues(regionserver);
600 }
601
602 @Override
603 public void peerRemoved(String peerId) {
604 removePeer(peerId);
605 }
606
607 @Override
608 public void peerListChanged(List<String> peerIds) {
609 for (String id : peerIds) {
610 try {
611 boolean added = this.replicationPeers.peerAdded(id);
612 if (added) {
613 addSource(id);
614 }
615 } catch (Exception e) {
616 LOG.error("Error while adding a new peer", e);
617 }
618 }
619 }
620
621
622
623
624
625 class NodeFailoverWorker extends Thread {
626
627 private String rsZnode;
628 private final ReplicationQueues rq;
629 private final ReplicationPeers rp;
630 private final UUID clusterId;
631
632
633
634
635 public NodeFailoverWorker(String rsZnode) {
636 this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
637 }
638
639 public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
640 final ReplicationPeers replicationPeers, final UUID clusterId) {
641 super("Failover-for-"+rsZnode);
642 this.rsZnode = rsZnode;
643 this.rq = replicationQueues;
644 this.rp = replicationPeers;
645 this.clusterId = clusterId;
646 }
647
648 @Override
649 public void run() {
650 if (this.rq.isThisOurZnode(rsZnode)) {
651 return;
652 }
653
654
655 try {
656 Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
657 } catch (InterruptedException e) {
658 LOG.warn("Interrupted while waiting before transferring a queue.");
659 Thread.currentThread().interrupt();
660 }
661
662 if (server.isStopped()) {
663 LOG.info("Not transferring queue since we are shutting down");
664 return;
665 }
666 SortedMap<String, SortedSet<String>> newQueues = null;
667
668 newQueues = this.rq.claimQueues(rsZnode);
669
670
671 if (newQueues.isEmpty()) {
672
673
674 return;
675 }
676
677 for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
678 String peerId = entry.getKey();
679 SortedSet<String> walsSet = entry.getValue();
680 try {
681
682 ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
683 String actualPeerId = replicationQueueInfo.getPeerId();
684 ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
685 ReplicationPeerConfig peerConfig = null;
686 try {
687 peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
688 } catch (ReplicationException ex) {
689 LOG.warn("Received exception while getting replication peer config, skipping replay"
690 + ex);
691 }
692 if (peer == null || peerConfig == null) {
693 LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
694 replicationQueues.removeQueue(peerId);
695 continue;
696 }
697
698 Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
699 walsByIdRecoveredQueues.put(peerId, walsByGroup);
700 for (String wal : walsSet) {
701 String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(wal);
702 SortedSet<String> wals = walsByGroup.get(walPrefix);
703 if (wals == null) {
704 wals = new TreeSet<String>();
705 walsByGroup.put(walPrefix, wals);
706 }
707 wals.add(wal);
708 }
709
710
711 ReplicationSourceInterface src =
712 getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
713 server, peerId, this.clusterId, peerConfig, peer);
714
715
716 synchronized (oldsources) {
717 if (!this.rp.getPeerIds().contains(src.getPeerClusterId())) {
718 src.terminate("Recovered queue doesn't belong to any current peer");
719 closeRecoveredQueue(src);
720 continue;
721 }
722 oldsources.add(src);
723 for (String wal : walsSet) {
724 src.enqueueLog(new Path(oldLogDir, wal));
725 }
726 src.startup();
727 }
728 } catch (IOException e) {
729
730 LOG.error("Failed creating a source", e);
731 }
732 }
733 }
734 }
735
736
737
738
739
740 public Path getOldLogDir() {
741 return this.oldLogDir;
742 }
743
744
745
746
747
748 public Path getLogDir() {
749 return this.logDir;
750 }
751
752
753
754
755
756 public FileSystem getFs() {
757 return this.fs;
758 }
759
760
761
762
763 public String getStats() {
764 StringBuffer stats = new StringBuffer();
765 for (ReplicationSourceInterface source : sources) {
766 stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
767 stats.append(source.getStats() + "\n");
768 }
769 for (ReplicationSourceInterface oldSource : oldsources) {
770 stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
771 stats.append(oldSource.getStats()+ "\n");
772 }
773 return stats.toString();
774 }
775 }