1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication.regionserver;
21
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
52
53
54
55
56
57
58
59
60
61
62
63
64 @InterfaceAudience.Private
65 public class ReplicationSourceManager {
66 private static final Log LOG =
67 LogFactory.getLog(ReplicationSourceManager.class);
68
69 private final List<ReplicationSourceInterface> sources;
70
71 private final List<ReplicationSourceInterface> oldsources;
72
73 private final AtomicBoolean replicating;
74
75 private final ReplicationZookeeper zkHelper;
76 private final ReplicationQueues replicationQueues;
77
78 private final Stoppable stopper;
79
80 private final Map<String, SortedSet<String>> hlogsById;
81 private final Configuration conf;
82 private final FileSystem fs;
83
84 private Path latestPath;
85
86 private final List<String> otherRegionServers = new ArrayList<String>();
87
88 private final Path logDir;
89
90 private final Path oldLogDir;
91
92 private final long sleepBeforeFailover;
93
94 private final ThreadPoolExecutor executor;
95
96 private final Random rand;
97
98
99
100
101
102
103
104
105
106
107
108
109
110 public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
111 final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper,
112 final FileSystem fs, final AtomicBoolean replicating, final Path logDir,
113 final Path oldLogDir) {
114 this.sources = new ArrayList<ReplicationSourceInterface>();
115 this.replicating = replicating;
116 this.zkHelper = zkHelper;
117 this.replicationQueues = replicationQueues;
118 this.stopper = stopper;
119 this.hlogsById = new HashMap<String, SortedSet<String>>();
120 this.oldsources = new ArrayList<ReplicationSourceInterface>();
121 this.conf = conf;
122 this.fs = fs;
123 this.logDir = logDir;
124 this.oldLogDir = oldLogDir;
125 this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
126 this.zkHelper.registerRegionServerListener(
127 new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
128 this.zkHelper.registerRegionServerListener(
129 new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
130 this.zkHelper.listPeersIdsAndWatch();
131
132
133 int nbWorkers = conf.getInt("replication.executor.workers", 1);
134
135
136 this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
137 100, TimeUnit.MILLISECONDS,
138 new LinkedBlockingQueue<Runnable>());
139 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
140 tfb.setNameFormat("ReplicationExecutor-%d");
141 this.executor.setThreadFactory(tfb.build());
142 this.rand = new Random();
143 }
144
145
146
147
148
149
150
151
152
153
154
155
156 public void logPositionAndCleanOldLogs(Path log, String id, long position,
157 boolean queueRecovered, boolean holdLogInZK) {
158 String key = log.getName();
159 this.zkHelper.writeReplicationStatus(key, id, position);
160 if (holdLogInZK) {
161 return;
162 }
163 synchronized (this.hlogsById) {
164 SortedSet<String> hlogs = this.hlogsById.get(id);
165 if (!queueRecovered && !hlogs.first().equals(key)) {
166 SortedSet<String> hlogSet = hlogs.headSet(key);
167 for (String hlog : hlogSet) {
168 this.zkHelper.removeLogFromList(hlog, id);
169 }
170 hlogSet.clear();
171 }
172 }
173 }
174
175
176
177
178
179 public void init() throws IOException {
180 for (String id : this.zkHelper.getPeerClusters().keySet()) {
181 addSource(id);
182 }
183 List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
184 if (currentReplicators == null || currentReplicators.size() == 0) {
185 return;
186 }
187 synchronized (otherRegionServers) {
188 refreshOtherRegionServersList();
189 LOG.info("Current list of replicators: " + currentReplicators
190 + " other RSs: " + otherRegionServers);
191 }
192
193 for (String rs : currentReplicators) {
194 synchronized (otherRegionServers) {
195 if (!this.otherRegionServers.contains(rs)) {
196 transferQueues(rs);
197 }
198 }
199 }
200 }
201
202
203
204
205
206
207
208 public ReplicationSourceInterface addSource(String id) throws IOException {
209 ReplicationSourceInterface src =
210 getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
211 synchronized (this.hlogsById) {
212 this.sources.add(src);
213 this.hlogsById.put(id, new TreeSet<String>());
214
215 if (this.latestPath != null) {
216 String name = this.latestPath.getName();
217 this.hlogsById.get(id).add(name);
218 try {
219 this.zkHelper.addLogToList(name, src.getPeerClusterZnode());
220 } catch (KeeperException ke) {
221 String message = "Cannot add log to zk for" +
222 " replication when creating a new source";
223 stopper.stop(message);
224 throw new IOException(message, ke);
225 }
226 src.enqueueLog(this.latestPath);
227 }
228 }
229 src.startup();
230 return src;
231 }
232
233
234
235
236 public void join() {
237 this.executor.shutdown();
238 if (this.sources.size() == 0) {
239 this.zkHelper.deleteOwnRSZNode();
240 }
241 for (ReplicationSourceInterface source : this.sources) {
242 source.terminate("Region server is closing");
243 }
244 }
245
246
247
248
249
250 protected Map<String, SortedSet<String>> getHLogs() {
251 return Collections.unmodifiableMap(hlogsById);
252 }
253
254
255
256
257
258 public List<ReplicationSourceInterface> getSources() {
259 return this.sources;
260 }
261
262 void preLogRoll(Path newLog) throws IOException {
263 if (!this.replicating.get()) {
264 LOG.warn("Replication stopped, won't add new log");
265 return;
266 }
267
268 synchronized (this.hlogsById) {
269 String name = newLog.getName();
270 for (ReplicationSourceInterface source : this.sources) {
271 try {
272 this.zkHelper.addLogToList(name, source.getPeerClusterZnode());
273 } catch (KeeperException ke) {
274 throw new IOException("Cannot add log to zk for replication", ke);
275 }
276 }
277 for (SortedSet<String> hlogs : this.hlogsById.values()) {
278 if (this.sources.isEmpty()) {
279
280
281 hlogs.clear();
282 }
283 hlogs.add(name);
284 }
285 }
286
287 this.latestPath = newLog;
288 }
289
290 void postLogRoll(Path newLog) throws IOException {
291 if (!this.replicating.get()) {
292 LOG.warn("Replication stopped, won't add new log");
293 return;
294 }
295
296
297 for (ReplicationSourceInterface source : this.sources) {
298 source.enqueueLog(newLog);
299 }
300 }
301
302
303
304
305
306 public ReplicationZookeeper getRepZkWrapper() {
307 return zkHelper;
308 }
309
310
311
312
313
314
315
316
317
318
319
320
321 public ReplicationSourceInterface getReplicationSource(
322 final Configuration conf,
323 final FileSystem fs,
324 final ReplicationSourceManager manager,
325 final Stoppable stopper,
326 final AtomicBoolean replicating,
327 final String peerId) throws IOException {
328 ReplicationSourceInterface src;
329 try {
330 @SuppressWarnings("rawtypes")
331 Class c = Class.forName(conf.get("replication.replicationsource.implementation",
332 ReplicationSource.class.getCanonicalName()));
333 src = (ReplicationSourceInterface) c.newInstance();
334 } catch (Exception e) {
335 LOG.warn("Passed replication source implementation throws errors, " +
336 "defaulting to ReplicationSource", e);
337 src = new ReplicationSource();
338
339 }
340 src.init(conf, fs, manager, stopper, replicating, peerId);
341 return src;
342 }
343
344
345
346
347
348
349
350
351
352 private void transferQueues(String rsZnode) {
353 NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode);
354 try {
355 this.executor.execute(transfer);
356 } catch (RejectedExecutionException ex) {
357 LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
358 }
359 }
360
361
362
363
364
365 public void closeRecoveredQueue(ReplicationSourceInterface src) {
366 LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
367 this.oldsources.remove(src);
368 this.zkHelper.deleteSource(src.getPeerClusterZnode(), false);
369 }
370
371
372
373
374
375
376 public void removePeer(String id) {
377 LOG.info("Closing the following queue " + id + ", currently have "
378 + sources.size() + " and another "
379 + oldsources.size() + " that were recovered");
380 String terminateMessage = "Replication stream was removed by a user";
381 ReplicationSourceInterface srcToRemove = null;
382 List<ReplicationSourceInterface> oldSourcesToDelete =
383 new ArrayList<ReplicationSourceInterface>();
384
385 for (ReplicationSourceInterface src : oldsources) {
386 if (id.equals(src.getPeerClusterId())) {
387 oldSourcesToDelete.add(src);
388 }
389 }
390 for (ReplicationSourceInterface src : oldSourcesToDelete) {
391 src.terminate(terminateMessage);
392 closeRecoveredQueue((src));
393 }
394 LOG.info("Number of deleted recovered sources for " + id + ": "
395 + oldSourcesToDelete.size());
396
397 for (ReplicationSourceInterface src : this.sources) {
398 if (id.equals(src.getPeerClusterId())) {
399 srcToRemove = src;
400 break;
401 }
402 }
403 if (srcToRemove == null) {
404 LOG.error("The queue we wanted to close is missing " + id);
405 return;
406 }
407 srcToRemove.terminate(terminateMessage);
408 this.sources.remove(srcToRemove);
409 this.zkHelper.deleteSource(id, true);
410 }
411
412
413
414
415
416
417
418
419
420 private boolean refreshOtherRegionServersList() {
421 List<String> newRsList = zkHelper.getRegisteredRegionServers();
422 if (newRsList == null) {
423 return false;
424 } else {
425 synchronized (otherRegionServers) {
426 otherRegionServers.clear();
427 otherRegionServers.addAll(newRsList);
428 }
429 }
430 return true;
431 }
432
433
434
435
436
437
438 public class OtherRegionServerWatcher extends ZooKeeperListener {
439
440
441
442
443 public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
444 super(watcher);
445 }
446
447
448
449
450
451 public void nodeCreated(String path) {
452 refreshListIfRightPath(path);
453 }
454
455
456
457
458
459 public void nodeDeleted(String path) {
460 if (stopper.isStopped()) {
461 return;
462 }
463 boolean cont = refreshListIfRightPath(path);
464 if (!cont) {
465 return;
466 }
467 LOG.info(path + " znode expired, trying to lock it");
468 transferQueues(ReplicationZookeeper.getZNodeName(path));
469 }
470
471
472
473
474
475 public void nodeChildrenChanged(String path) {
476 if (stopper.isStopped()) {
477 return;
478 }
479 refreshListIfRightPath(path);
480 }
481
482 private boolean refreshListIfRightPath(String path) {
483 if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) {
484 return false;
485 }
486 return refreshOtherRegionServersList();
487 }
488 }
489
490
491
492
493 public class PeersWatcher extends ZooKeeperListener {
494
495
496
497
498 public PeersWatcher(ZooKeeperWatcher watcher) {
499 super(watcher);
500 }
501
502
503
504
505
506 public void nodeDeleted(String path) {
507 List<String> peers = refreshPeersList(path);
508 if (peers == null) {
509 return;
510 }
511 if (zkHelper.isPeerPath(path)) {
512 String id = ReplicationZookeeper.getZNodeName(path);
513 removePeer(id);
514 }
515 }
516
517
518
519
520
521 public void nodeChildrenChanged(String path) {
522 List<String> peers = refreshPeersList(path);
523 if (peers == null) {
524 return;
525 }
526 for (String id : peers) {
527 try {
528 boolean added = zkHelper.connectToPeer(id);
529 if (added) {
530 addSource(id);
531 }
532 } catch (IOException e) {
533
534 LOG.error("Error while adding a new peer", e);
535 } catch (KeeperException e) {
536 LOG.error("Error while adding a new peer", e);
537 }
538 }
539 }
540
541
542
543
544
545
546
547
548 private List<String> refreshPeersList(String path) {
549 if (!path.startsWith(zkHelper.getPeersZNode())) {
550 return null;
551 }
552 return zkHelper.listPeersIdsAndWatch();
553 }
554 }
555
556
557
558
559
560 class NodeFailoverWorker extends Thread {
561
562 private String rsZnode;
563
564
565
566
567
568 public NodeFailoverWorker(String rsZnode) {
569 super("Failover-for-"+rsZnode);
570 this.rsZnode = rsZnode;
571 }
572
573 @Override
574 public void run() {
575
576
577 try {
578 Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
579 } catch (InterruptedException e) {
580 LOG.warn("Interrupted while waiting before transferring a queue.");
581 Thread.currentThread().interrupt();
582 }
583
584 if (stopper.isStopped()) {
585 LOG.info("Not transferring queue since we are shutting down");
586 return;
587 }
588 SortedMap<String, SortedSet<String>> newQueues = null;
589
590 newQueues = zkHelper.claimQueues(rsZnode);
591
592
593 if (newQueues.isEmpty()) {
594
595
596 return;
597 }
598
599 for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
600 String peerId = entry.getKey();
601 try {
602 ReplicationSourceInterface src = getReplicationSource(conf,
603 fs, ReplicationSourceManager.this, stopper, replicating, peerId);
604 if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
605 src.terminate("Recovered queue doesn't belong to any current peer");
606 break;
607 }
608 oldsources.add(src);
609 for (String hlog : entry.getValue()) {
610 src.enqueueLog(new Path(oldLogDir, hlog));
611 }
612 src.startup();
613 } catch (IOException e) {
614
615 LOG.error("Failed creating a source", e);
616 }
617 }
618 }
619 }
620
621
622
623
624
625 public Path getOldLogDir() {
626 return this.oldLogDir;
627 }
628
629
630
631
632
633 public Path getLogDir() {
634 return this.logDir;
635 }
636
637
638
639
640
641 public FileSystem getFs() {
642 return this.fs;
643 }
644
645
646
647
648 public String getStats() {
649 StringBuffer stats = new StringBuffer();
650 for (ReplicationSourceInterface source : sources) {
651 stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
652 stats.append(source.getStats() + "\n");
653 }
654 for (ReplicationSourceInterface oldSource : oldsources) {
655 stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
656 stats.append(oldSource.getStats()+ "\n");
657 }
658 return stats.toString();
659 }
660 }