1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication.regionserver;
21
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Random;
32 import java.util.SortedMap;
33 import java.util.SortedSet;
34 import java.util.TreeSet;
35 import java.util.UUID;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.CopyOnWriteArrayList;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.ThreadPoolExecutor;
41 import java.util.concurrent.TimeUnit;
42
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.hadoop.fs.FileSystem;
47 import org.apache.hadoop.fs.Path;
48 import org.apache.hadoop.hbase.Server;
49 import org.apache.hadoop.hbase.TableDescriptors;
50 import org.apache.hadoop.hbase.classification.InterfaceAudience;
51 import org.apache.hadoop.hbase.regionserver.HRegionServer;
52 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
53 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
54 import org.apache.hadoop.hbase.replication.ReplicationException;
55 import org.apache.hadoop.hbase.replication.ReplicationListener;
56 import org.apache.hadoop.hbase.replication.ReplicationPeer;
57 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
58 import org.apache.hadoop.hbase.replication.ReplicationPeers;
59 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
60 import org.apache.hadoop.hbase.replication.ReplicationQueues;
61 import org.apache.hadoop.hbase.replication.ReplicationTracker;
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
  private static final Log LOG =
      LogFactory.getLog(ReplicationSourceManager.class);
  // All the normal sources, one per enabled peer, reading this RS's own WALs
  private final List<ReplicationSourceInterface> sources;
  // Sources created to drain queues claimed from dead region servers
  private final List<ReplicationSourceInterface> oldsources;
  private final ReplicationQueues replicationQueues;
  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID identifying this (source) cluster
  private final UUID clusterId;
  // The region server this manager belongs to
  private final Server server;
  // WAL names still needed per peer id, for normal sources; all reads and
  // writes of this map are done under synchronized (walsById)
  private final Map<String, SortedSet<String>> walsById;
  // WAL names still needed per recovered queue id (concurrent map, no
  // external lock needed for point reads/removes)
  private final Map<String, SortedSet<String>> walsByIdRecoveredQueues;
  private final Configuration conf;
  private final FileSystem fs;
  // The most recently rolled WAL on this RS; null until the first roll
  private Path latestPath;
  // Directory holding the live WALs
  private final Path logDir;
  // Directory where WALs are archived after being rolled
  private final Path oldLogDir;
  // Milliseconds to wait before claiming a dead RS's queues (see
  // NodeFailoverWorker.run, which adds random jitter on top)
  private final long sleepBeforeFailover;
  // Runs the NodeFailoverWorker queue-transfer tasks
  private final ThreadPoolExecutor executor;
  // Source of the jitter applied to sleepBeforeFailover
  private final Random rand;
  /**
   * Creates a replication manager and sets the watch on all the other
   * registered region servers.
   * @param replicationQueues interface for manipulating replication queues
   * @param replicationPeers interface for manipulating replication peers
   * @param replicationTracker tracker of other region servers / peer changes
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains the live WALs
   * @param oldLogDir the directory where old WALs are archived
   * @param clusterId the UUID of this cluster
   */
  public ReplicationSourceManager(final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
      final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
      final Path oldLogDir, final UUID clusterId) {
    // CopyOnWriteArrayList: these lists are iterated far more often than mutated
    this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.server = server;
    this.walsById = new HashMap<String, SortedSet<String>>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
    this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover =
        conf.getLong("replication.sleep.before.failover", 30000);
    this.clusterId = clusterId;
    // NOTE(review): publishes `this` as a listener before the constructor
    // finishes; callbacks may arrive while later fields are being set up.
    this.replicationTracker.registerListener(this);
    // Return value deliberately ignored — presumably called for its side
    // effect of refreshing the peer-id cache; TODO confirm.
    this.replicationPeers.getAllPeerIds();
    // One worker by default: queue transfers are rare and need not overlap
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
        100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.rand = new Random();
  }
161
162
163
164
165
166
167
168
169
170
171
172
173 public void logPositionAndCleanOldLogs(Path log, String id, long position,
174 boolean queueRecovered, boolean holdLogInZK) {
175 String fileName = log.getName();
176 this.replicationQueues.setLogPosition(id, fileName, position);
177 if (holdLogInZK) {
178 return;
179 }
180 cleanOldLogs(fileName, id, queueRecovered);
181 }
182
183
184
185
186
187
188
189
190 public void cleanOldLogs(String key, String id, boolean queueRecovered) {
191 if (queueRecovered) {
192 SortedSet<String> wals = walsByIdRecoveredQueues.get(id);
193 if (wals != null && !wals.first().equals(key)) {
194 cleanOldLogs(wals, key, id);
195 }
196 } else {
197 synchronized (this.walsById) {
198 SortedSet<String> wals = walsById.get(id);
199 if (!wals.first().equals(key)) {
200 cleanOldLogs(wals, key, id);
201 }
202 }
203 }
204 }
205
206 private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
207 SortedSet<String> walSet = wals.headSet(key);
208 LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
209 for (String wal : walSet) {
210 this.replicationQueues.removeLog(id, wal);
211 }
212 walSet.clear();
213 }
214
215
216
217
218
219 protected void init() throws IOException, ReplicationException {
220 for (String id : this.replicationPeers.getPeerIds()) {
221 addSource(id);
222 }
223 List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
224 if (currentReplicators == null || currentReplicators.size() == 0) {
225 return;
226 }
227 List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
228 LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
229 + otherRegionServers);
230
231
232 for (String rs : currentReplicators) {
233 if (!otherRegionServers.contains(rs)) {
234 transferQueues(rs);
235 }
236 }
237 }
238
239
240
241
242
243
244
  /**
   * Adds a normal source per registered peer cluster, seeds it with the most
   * recently rolled WAL (if any), and starts it.
   * @param id the id of the peer cluster
   * @return the created and started source
   * @throws IOException if the source cannot be instantiated
   * @throws ReplicationException if the peer config cannot be read, or the
   *     current WAL cannot be recorded in the replication queue (the server
   *     is stopped in that case before rethrowing)
   */
  protected ReplicationSourceInterface addSource(String id) throws IOException,
      ReplicationException {
    ReplicationPeerConfig peerConfig
        = replicationPeers.getReplicationPeerConfig(id);
    ReplicationPeer peer = replicationPeers.getPeer(id);
    ReplicationSourceInterface src =
        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
          this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
    // Hold the walsById monitor so a concurrent log roll (preLogRoll) cannot
    // slip in between registering this source and seeding it with latestPath.
    synchronized (this.walsById) {
      this.sources.add(src);
      this.walsById.put(id, new TreeSet<String>());
      // If this RS has already rolled a WAL, the new source must pick it up
      // both in memory and in ZK before starting.
      if (this.latestPath != null) {
        String name = this.latestPath.getName();
        this.walsById.get(id).add(name);
        try {
          this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          // Failing to record the WAL in ZK would silently lose edits for
          // this peer — abort the whole server instead.
          String message =
              "Cannot add log to queue when creating a new source, queueId="
                  + src.getPeerClusterZnode() + ", filename=" + name;
          server.stop(message);
          throw e;
        }
        src.enqueueLog(this.latestPath);
      }
    }
    // Start outside the lock: startup spawns the reader thread.
    src.startup();
    return src;
  }
275
276
277
278
279
280 public void deleteSource(String peerId, boolean closeConnection) {
281 this.replicationQueues.removeQueue(peerId);
282 if (closeConnection) {
283 this.replicationPeers.peerRemoved(peerId);
284 }
285 }
286
287
288
289
290 public void join() {
291 this.executor.shutdown();
292 if (this.sources.size() == 0) {
293 this.replicationQueues.removeAllQueues();
294 }
295 for (ReplicationSourceInterface source : this.sources) {
296 source.terminate("Region server is closing");
297 }
298 }
299
300
301
302
303
304 protected Map<String, SortedSet<String>> getWALs() {
305 return Collections.unmodifiableMap(walsById);
306 }
307
308
309
310
311
312 protected Map<String, SortedSet<String>> getWalsByIdRecoveredQueues() {
313 return Collections.unmodifiableMap(walsByIdRecoveredQueues);
314 }
315
316
317
318
319
320 public List<ReplicationSourceInterface> getSources() {
321 return this.sources;
322 }
323
324
325
326
327
328 public List<ReplicationSourceInterface> getOldSources() {
329 return this.oldsources;
330 }
331
332 @VisibleForTesting
333 List<String> getAllQueues() {
334 return replicationQueues.getAllQueues();
335 }
336
337 void preLogRoll(Path newLog) throws IOException {
338 synchronized (this.walsById) {
339 String name = newLog.getName();
340 for (ReplicationSourceInterface source : this.sources) {
341 try {
342 this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
343 } catch (ReplicationException e) {
344 throw new IOException("Cannot add log to replication queue with id="
345 + source.getPeerClusterZnode() + ", filename=" + name, e);
346 }
347 }
348 for (SortedSet<String> wals : this.walsById.values()) {
349 if (this.sources.isEmpty()) {
350
351
352 wals.clear();
353 }
354 wals.add(name);
355 }
356 }
357
358 this.latestPath = newLog;
359 }
360
361 void postLogRoll(Path newLog) throws IOException {
362
363 for (ReplicationSourceInterface source : this.sources) {
364 source.enqueueLog(newLog);
365 }
366 }
367
368
369
370
371
372
373
374
375
376
377
  /**
   * Factory method to create a replication source. Instantiates the
   * configured {@code ReplicationSourceInterface} implementation (falling
   * back to {@link ReplicationSource}), creates and initializes the
   * replication endpoint (after letting region server coprocessors replace
   * it), and wires both together with a metrics source.
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager that owns the source
   * @param server the server this source runs on
   * @param peerId the id of the peer cluster (or recovered queue id)
   * @param clusterId the UUID of this cluster
   * @param peerConfig the configuration of the peer
   * @param replicationPeer the peer object
   * @return the created source (not yet started)
   * @throws IOException if the replication endpoint cannot be instantiated
   */
  protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
      final FileSystem fs, final ReplicationSourceManager manager,
      final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
      final Server server, final String peerId, final UUID clusterId,
      final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
      throws IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    TableDescriptors tableDescriptors = null;
    // Coprocessor host and table descriptors are only available when running
    // inside an actual region server (not in standalone/test Server impls).
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    ReplicationSourceInterface src;
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
        ReplicationSource.class.getCanonicalName()));
      src = (ReplicationSourceInterface) c.newInstance();
    } catch (Exception e) {
      // Deliberate best-effort fallback: a broken custom implementation must
      // not disable replication entirely.
      LOG.warn("Passed replication source implementation throws errors, " +
          "defaulting to ReplicationSource", e);
      src = new ReplicationSource();
    }

    ReplicationEndpoint replicationEndpoint = null;
    try {
      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
      if (replicationEndpointImpl == null) {
        // Default to the endpoint that ships edits to another HBase cluster.
        replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
      }
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(replicationEndpointImpl);
      replicationEndpoint = (ReplicationEndpoint) c.newInstance();
      if(rsServerHost != null) {
        // Coprocessors may wrap or replace the endpoint.
        ReplicationEndpoint newReplicationEndPoint = rsServerHost
            .postCreateReplicationEndPoint(replicationEndpoint);
        if(newReplicationEndPoint != null) {
          replicationEndpoint = newReplicationEndPoint;
        }
      }
    } catch (Exception e) {
      // Unlike the source, a bad endpoint is fatal for this peer.
      LOG.warn("Passed replication endpoint implementation throws errors", e);
      throw new IOException(e);
    }

    MetricsSource metrics = new MetricsSource(peerId);
    // Initialize the source first, then the endpoint — the endpoint context
    // carries the same metrics instance.
    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
      clusterId, replicationEndpoint, metrics);

    replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
      fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));

    return src;
  }
436
437
438
439
440
441
442
443
444
445 private void transferQueues(String rsZnode) {
446 NodeFailoverWorker transfer =
447 new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
448 this.clusterId);
449 try {
450 this.executor.execute(transfer);
451 } catch (RejectedExecutionException ex) {
452 LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
453 }
454 }
455
456
457
458
459
460 public void closeRecoveredQueue(ReplicationSourceInterface src) {
461 LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
462 this.oldsources.remove(src);
463 deleteSource(src.getPeerClusterZnode(), false);
464 this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
465 }
466
467
468
469
470
471
  /**
   * Terminates and removes every source (normal and recovered) replicating
   * to the given peer, then deletes the peer's queue.
   * @param id the id of the peer cluster being removed
   */
  public void removePeer(String id) {
    LOG.info("Closing the following queue " + id + ", currently have "
        + sources.size() + " and another "
        + oldsources.size() + " that were recovered");
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete =
        new ArrayList<ReplicationSourceInterface>();
    // Collect first, then terminate, all under the oldsources lock so a
    // concurrent NodeFailoverWorker cannot add a source for this peer
    // while we are tearing it down.
    synchronized (oldsources) {
      for (ReplicationSourceInterface src : oldsources) {
        if (id.equals(src.getPeerClusterId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        closeRecoveredQueue(src);
      }
    }
    LOG.info("Number of deleted recovered sources for " + id + ": "
        + oldSourcesToDelete.size());
    // Now the normal sources for this peer.
    List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
    for (ReplicationSourceInterface src : this.sources) {
      if (id.equals(src.getPeerClusterId())) {
        srcToRemove.add(src);
      }
    }
    if (srcToRemove.size() == 0) {
      // NOTE(review): this early return also skips deleteSource(id, true)
      // below, leaving the peer's queue znode in place — confirm intended.
      LOG.error("The queue we wanted to close is missing " + id);
      return;
    }
    for (ReplicationSourceInterface toRemove : srcToRemove) {
      toRemove.terminate(terminateMessage);
      this.sources.remove(toRemove);
    }
    deleteSource(id, true);
  }
512
513 @Override
514 public void regionServerRemoved(String regionserver) {
515 transferQueues(regionserver);
516 }
517
518 @Override
519 public void peerRemoved(String peerId) {
520 removePeer(peerId);
521 }
522
523 @Override
524 public void peerListChanged(List<String> peerIds) {
525 for (String id : peerIds) {
526 try {
527 boolean added = this.replicationPeers.peerAdded(id);
528 if (added) {
529 addSource(id);
530 }
531 } catch (Exception e) {
532 LOG.error("Error while adding a new peer", e);
533 }
534 }
535 }
536
537
538
539
540
  /**
   * Worker that claims the replication queues of a dead region server and
   * spins up a recovered source per claimed queue. Scheduled by
   * {@link #transferQueues(String)} on the manager's executor.
   */
  class NodeFailoverWorker extends Thread {

    private String rsZnode;
    private final ReplicationQueues rq;
    private final ReplicationPeers rp;
    private final UUID clusterId;

    /**
     * Builds a worker using the manager's own queue/peer handles.
     * @param rsZnode znode of the dead region server
     */
    public NodeFailoverWorker(String rsZnode) {
      this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
    }

    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
        final ReplicationPeers replicationPeers, final UUID clusterId) {
      super("Failover-for-"+rsZnode);
      this.rsZnode = rsZnode;
      this.rq = replicationQueues;
      this.rp = replicationPeers;
      this.clusterId = clusterId;
    }

    @Override
    public void run() {
      // Never claim our own queues.
      if (this.rq.isThisOurZnode(rsZnode)) {
        return;
      }
      // Wait a random jittered delay so that not every surviving RS races
      // for the same dead server's queues at once.
      try {
        Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      // We try to lock that rs' queue directory
      if (server.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      SortedMap<String, SortedSet<String>> newQueues = null;

      newQueues = this.rq.claimQueues(rsZnode);

      // Empty result: another RS claimed the queues first, or there was
      // nothing to claim.
      if (newQueues.isEmpty()) {
        return;
      }

      for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
        String peerId = entry.getKey();
        SortedSet<String> walsSet = entry.getValue();
        try {
          // A claimed queue id encodes the original peer id plus the chain
          // of dead servers it passed through; extract the real peer id.
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
          String actualPeerId = replicationQueueInfo.getPeerId();
          ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
          ReplicationPeerConfig peerConfig = null;
          try {
            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
          } catch (ReplicationException ex) {
            LOG.warn("Received exception while getting replication peer config, skipping replay"
                + ex);
          }
          // Peer no longer exists (or config unreadable): drop the orphaned
          // queue instead of replaying it.
          if (peer == null || peerConfig == null) {
            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
            replicationQueues.removeQueue(peerId);
            continue;
          }

          ReplicationSourceInterface src =
              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
                server, peerId, this.clusterId, peerConfig, peer);
          // Synchronized against removePeer(): a peer removed while we were
          // claiming must not gain a new recovered source.
          synchronized (oldsources) {
            if (!this.rp.getPeerIds().contains(src.getPeerClusterId())) {
              src.terminate("Recovered queue doesn't belong to any current peer");
              closeRecoveredQueue(src);
              continue;
            }
            oldsources.add(src);
            for (String wal : walsSet) {
              // Recovered WALs live in the archive directory.
              src.enqueueLog(new Path(oldLogDir, wal));
            }
            src.startup();
          }
          walsByIdRecoveredQueues.put(peerId, walsSet);
        } catch (IOException e) {
          // The source creation failed for this queue; other queues may
          // still succeed, so log and continue.
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }
639
640
641
642
643
644 public Path getOldLogDir() {
645 return this.oldLogDir;
646 }
647
648
649
650
651
652 public Path getLogDir() {
653 return this.logDir;
654 }
655
656
657
658
659
660 public FileSystem getFs() {
661 return this.fs;
662 }
663
664
665
666
667 public String getStats() {
668 StringBuffer stats = new StringBuffer();
669 for (ReplicationSourceInterface source : sources) {
670 stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
671 stats.append(source.getStats() + "\n");
672 }
673 for (ReplicationSourceInterface oldSource : oldsources) {
674 stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
675 stats.append(oldSource.getStats()+ "\n");
676 }
677 return stats.toString();
678 }
679 }