1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
52  
53  /**
54   * This class is responsible to manage all the replication
55   * sources. There are two classes of sources:
56   * <li> Normal sources are persistent and one per peer cluster</li>
57   * <li> Old sources are recovered from a failed region server and our
58   * only goal is to finish replicating the HLog queue it had up in ZK</li>
59   *
60   * When a region server dies, this class uses a watcher to get notified and it
61   * tries to grab a lock in order to transfer all the queues in a local
62   * old source.
63   */
64  @InterfaceAudience.Private
65  public class ReplicationSourceManager {
66    private static final Log LOG =
67        LogFactory.getLog(ReplicationSourceManager.class);
68    // List of all the sources that read this RS's logs
69    private final List<ReplicationSourceInterface> sources;
70    // List of all the sources we got from died RSs
71    private final List<ReplicationSourceInterface> oldsources;
72    // Indicates if we are currently replicating
73    private final AtomicBoolean replicating;
74    // Helper for zookeeper
75    private final ReplicationZookeeper zkHelper;
76    private final ReplicationQueues replicationQueues;
77    // All about stopping
78    private final Stoppable stopper;
79    // All logs we are currently tracking
80    private final Map<String, SortedSet<String>> hlogsById;
81    private final Configuration conf;
82    private final FileSystem fs;
83    // The path to the latest log we saw, for new coming sources
84    private Path latestPath;
85    // List of all the other region servers in this cluster
86    private final List<String> otherRegionServers = new ArrayList<String>();
87    // Path to the hlogs directories
88    private final Path logDir;
89    // Path to the hlog archive
90    private final Path oldLogDir;
91    // The number of ms that we wait before moving znodes, HBASE-3596
92    private final long sleepBeforeFailover;
93    // Homemade executer service for replication
94    private final ThreadPoolExecutor executor;
95  
96    private final Random rand;
97  
98  
99    /**
100    * Creates a replication manager and sets the watch on all the other registered region servers
101    * @param zkHelper the zk helper for replication
102    * @param replicationQueues the interface for manipulating replication queues
103    * @param conf the configuration to use
104    * @param stopper the stopper object for this region server
105    * @param fs the file system to use
106    * @param replicating the status of the replication on this cluster
107    * @param logDir the directory that contains all hlog directories of live RSs
108    * @param oldLogDir the directory where old logs are archived
109    */
110   public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
111       final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper,
112       final FileSystem fs, final AtomicBoolean replicating, final Path logDir,
113       final Path oldLogDir) {
114     this.sources = new ArrayList<ReplicationSourceInterface>();
115     this.replicating = replicating;
116     this.zkHelper = zkHelper;
117     this.replicationQueues = replicationQueues;
118     this.stopper = stopper;
119     this.hlogsById = new HashMap<String, SortedSet<String>>();
120     this.oldsources = new ArrayList<ReplicationSourceInterface>();
121     this.conf = conf;
122     this.fs = fs;
123     this.logDir = logDir;
124     this.oldLogDir = oldLogDir;
125     this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
126     this.zkHelper.registerRegionServerListener(
127         new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
128     this.zkHelper.registerRegionServerListener(
129         new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
130     this.zkHelper.listPeersIdsAndWatch();
131     // It's preferable to failover 1 RS at a time, but with good zk servers
132     // more could be processed at the same time.
133     int nbWorkers = conf.getInt("replication.executor.workers", 1);
134     // use a short 100ms sleep since this could be done inline with a RS startup
135     // even if we fail, other region servers can take care of it
136     this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
137         100, TimeUnit.MILLISECONDS,
138         new LinkedBlockingQueue<Runnable>());
139     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
140     tfb.setNameFormat("ReplicationExecutor-%d");
141     this.executor.setThreadFactory(tfb.build());
142     this.rand = new Random();
143   }
144 
145   /**
146    * Provide the id of the peer and a log key and this method will figure which
147    * hlog it belongs to and will log, for this region server, the current
148    * position. It will also clean old logs from the queue.
149    * @param log Path to the log currently being replicated from
150    * replication status in zookeeper. It will also delete older entries.
151    * @param id id of the peer cluster
152    * @param position current location in the log
153    * @param queueRecovered indicates if this queue comes from another region server
154    * @param holdLogInZK if true then the log is retained in ZK
155    */
156   public void logPositionAndCleanOldLogs(Path log, String id, long position, 
157       boolean queueRecovered, boolean holdLogInZK) {
158     String key = log.getName();
159     this.zkHelper.writeReplicationStatus(key, id, position);
160     if (holdLogInZK) {
161      return;
162     }
163     synchronized (this.hlogsById) {
164       SortedSet<String> hlogs = this.hlogsById.get(id);
165       if (!queueRecovered && !hlogs.first().equals(key)) {
166         SortedSet<String> hlogSet = hlogs.headSet(key);
167         for (String hlog : hlogSet) {
168           this.zkHelper.removeLogFromList(hlog, id);
169         }
170         hlogSet.clear();
171       }
172     }
173   }
174 
175   /**
176    * Adds a normal source per registered peer cluster and tries to process all
177    * old region server hlog queues
178    */
179   public void init() throws IOException {
180     for (String id : this.zkHelper.getPeerClusters().keySet()) {
181       addSource(id);
182     }
183     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
184     if (currentReplicators == null || currentReplicators.size() == 0) {
185       return;
186     }
187     synchronized (otherRegionServers) {
188       refreshOtherRegionServersList();
189       LOG.info("Current list of replicators: " + currentReplicators
190           + " other RSs: " + otherRegionServers);
191     }
192     // Look if there's anything to process after a restart
193     for (String rs : currentReplicators) {
194       synchronized (otherRegionServers) {
195         if (!this.otherRegionServers.contains(rs)) {
196           transferQueues(rs);
197         }
198       }
199     }
200   }
201 
202   /**
203    * Add a new normal source to this region server
204    * @param id the id of the peer cluster
205    * @return the source that was created
206    * @throws IOException
207    */
208   public ReplicationSourceInterface addSource(String id) throws IOException {
209     ReplicationSourceInterface src =
210         getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
211     synchronized (this.hlogsById) {
212       this.sources.add(src);
213       this.hlogsById.put(id, new TreeSet<String>());
214       // Add the latest hlog to that source's queue
215       if (this.latestPath != null) {
216         String name = this.latestPath.getName();
217         this.hlogsById.get(id).add(name);
218         try {
219           this.zkHelper.addLogToList(name, src.getPeerClusterZnode());
220         } catch (KeeperException ke) {
221           String message = "Cannot add log to zk for" +
222             " replication when creating a new source";
223           stopper.stop(message);
224           throw new IOException(message, ke);
225         }
226         src.enqueueLog(this.latestPath);
227       }
228     }
229     src.startup();
230     return src;
231   }
232 
233   /**
234    * Terminate the replication on this region server
235    */
236   public void join() {
237     this.executor.shutdown();
238     if (this.sources.size() == 0) {
239       this.zkHelper.deleteOwnRSZNode();
240     }
241     for (ReplicationSourceInterface source : this.sources) {
242       source.terminate("Region server is closing");
243     }
244   }
245 
246   /**
247    * Get a copy of the hlogs of the first source on this rs
248    * @return a sorted set of hlog names
249    */
250   protected Map<String, SortedSet<String>> getHLogs() {
251     return Collections.unmodifiableMap(hlogsById);
252   }
253 
254   /**
255    * Get a list of all the normal sources of this rs
256    * @return lis of all sources
257    */
258   public List<ReplicationSourceInterface> getSources() {
259     return this.sources;
260   }
261 
262   void preLogRoll(Path newLog) throws IOException {
263     if (!this.replicating.get()) {
264       LOG.warn("Replication stopped, won't add new log");
265       return;
266     }
267 
268     synchronized (this.hlogsById) {
269       String name = newLog.getName();
270       for (ReplicationSourceInterface source : this.sources) {
271         try {
272           this.zkHelper.addLogToList(name, source.getPeerClusterZnode());
273         } catch (KeeperException ke) {
274           throw new IOException("Cannot add log to zk for replication", ke);
275         }
276       }
277       for (SortedSet<String> hlogs : this.hlogsById.values()) {
278         if (this.sources.isEmpty()) {
279           // If there's no slaves, don't need to keep the old hlogs since
280           // we only consider the last one when a new slave comes in
281           hlogs.clear();
282         }
283         hlogs.add(name);
284       }
285     }
286 
287     this.latestPath = newLog;
288   }
289 
290   void postLogRoll(Path newLog) throws IOException {
291     if (!this.replicating.get()) {
292       LOG.warn("Replication stopped, won't add new log");
293       return;
294     }
295 
296     // This only updates the sources we own, not the recovered ones
297     for (ReplicationSourceInterface source : this.sources) {
298       source.enqueueLog(newLog);    
299     }
300   }
301 
302   /**
303    * Get the ZK help of this manager
304    * @return the helper
305    */
306   public ReplicationZookeeper getRepZkWrapper() {
307     return zkHelper;
308   }
309 
310   /**
311    * Factory method to create a replication source
312    * @param conf the configuration to use
313    * @param fs the file system to use
314    * @param manager the manager to use
315    * @param stopper the stopper object for this region server
316    * @param replicating the status of the replication on this cluster
317    * @param peerId the id of the peer cluster
318    * @return the created source
319    * @throws IOException
320    */
321   public ReplicationSourceInterface getReplicationSource(
322       final Configuration conf,
323       final FileSystem fs,
324       final ReplicationSourceManager manager,
325       final Stoppable stopper,
326       final AtomicBoolean replicating,
327       final String peerId) throws IOException {
328     ReplicationSourceInterface src;
329     try {
330       @SuppressWarnings("rawtypes")
331       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
332           ReplicationSource.class.getCanonicalName()));
333       src = (ReplicationSourceInterface) c.newInstance();
334     } catch (Exception e) {
335       LOG.warn("Passed replication source implementation throws errors, " +
336           "defaulting to ReplicationSource", e);
337       src = new ReplicationSource();
338 
339     }
340     src.init(conf, fs, manager, stopper, replicating, peerId);
341     return src;
342   }
343 
344   /**
345    * Transfer all the queues of the specified to this region server.
346    * First it tries to grab a lock and if it works it will move the
347    * znodes and finally will delete the old znodes.
348    *
349    * It creates one old source for any type of source of the old rs.
350    * @param rsZnode
351    */
352   private void transferQueues(String rsZnode) {
353     NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode);
354     try {
355       this.executor.execute(transfer);
356     } catch (RejectedExecutionException ex) {
357       LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
358     }
359   }
360 
361   /**
362    * Clear the references to the specified old source
363    * @param src source to clear
364    */
365   public void closeRecoveredQueue(ReplicationSourceInterface src) {
366     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
367     this.oldsources.remove(src);
368     this.zkHelper.deleteSource(src.getPeerClusterZnode(), false);
369   }
370 
371   /**
372    * Thie method first deletes all the recovered sources for the specified
373    * id, then deletes the normal source (deleting all related data in ZK).
374    * @param id The id of the peer cluster
375    */
376   public void removePeer(String id) {
377     LOG.info("Closing the following queue " + id + ", currently have "
378         + sources.size() + " and another "
379         + oldsources.size() + " that were recovered");
380     String terminateMessage = "Replication stream was removed by a user";
381     ReplicationSourceInterface srcToRemove = null;
382     List<ReplicationSourceInterface> oldSourcesToDelete =
383         new ArrayList<ReplicationSourceInterface>();
384     // First close all the recovered sources for this peer
385     for (ReplicationSourceInterface src : oldsources) {
386       if (id.equals(src.getPeerClusterId())) {
387         oldSourcesToDelete.add(src);
388       }
389     }
390     for (ReplicationSourceInterface src : oldSourcesToDelete) {
391       src.terminate(terminateMessage);
392       closeRecoveredQueue((src));
393     }
394     LOG.info("Number of deleted recovered sources for " + id + ": "
395         + oldSourcesToDelete.size());
396     // Now look for the one on this cluster
397     for (ReplicationSourceInterface src : this.sources) {
398       if (id.equals(src.getPeerClusterId())) {
399         srcToRemove = src;
400         break;
401       }
402     }
403     if (srcToRemove == null) {
404       LOG.error("The queue we wanted to close is missing " + id);
405       return;
406     }
407     srcToRemove.terminate(terminateMessage);
408     this.sources.remove(srcToRemove);
409     this.zkHelper.deleteSource(id, true);
410   }
411 
412   /**
413    * Reads the list of region servers from ZK and atomically clears our
414    * local view of it and replaces it with the updated list.
415    * 
416    * @return true if the local list of the other region servers was updated
417    * with the ZK data (even if it was empty),
418    * false if the data was missing in ZK
419    */
420   private boolean refreshOtherRegionServersList() {
421     List<String> newRsList = zkHelper.getRegisteredRegionServers();
422     if (newRsList == null) {
423       return false;
424     } else {
425       synchronized (otherRegionServers) {
426         otherRegionServers.clear();
427         otherRegionServers.addAll(newRsList);
428       }
429     }
430     return true;
431   }
432 
433   /**
434    * Watcher used to be notified of the other region server's death
435    * in the local cluster. It initiates the process to transfer the queues
436    * if it is able to grab the lock.
437    */
438   public class OtherRegionServerWatcher extends ZooKeeperListener {
439 
440     /**
441      * Construct a ZooKeeper event listener.
442      */
443     public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
444       super(watcher);
445     }
446 
447     /**
448      * Called when a new node has been created.
449      * @param path full path of the new node
450      */
451     public void nodeCreated(String path) {
452       refreshListIfRightPath(path);
453     }
454 
455     /**
456      * Called when a node has been deleted
457      * @param path full path of the deleted node
458      */
459     public void nodeDeleted(String path) {
460       if (stopper.isStopped()) {
461         return;
462       }
463       boolean cont = refreshListIfRightPath(path);
464       if (!cont) {
465         return;
466       }
467       LOG.info(path + " znode expired, trying to lock it");
468       transferQueues(ReplicationZookeeper.getZNodeName(path));
469     }
470 
471     /**
472      * Called when an existing node has a child node added or removed.
473      * @param path full path of the node whose children have changed
474      */
475     public void nodeChildrenChanged(String path) {
476       if (stopper.isStopped()) {
477         return;
478       }
479       refreshListIfRightPath(path);
480     }
481 
482     private boolean refreshListIfRightPath(String path) {
483       if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) {
484         return false;
485       }
486       return refreshOtherRegionServersList();
487     }
488   }
489 
490   /**
491    * Watcher used to follow the creation and deletion of peer clusters.
492    */
493   public class PeersWatcher extends ZooKeeperListener {
494 
495     /**
496      * Construct a ZooKeeper event listener.
497      */
498     public PeersWatcher(ZooKeeperWatcher watcher) {
499       super(watcher);
500     }
501 
502     /**
503      * Called when a node has been deleted
504      * @param path full path of the deleted node
505      */
506     public void nodeDeleted(String path) {
507       List<String> peers = refreshPeersList(path);
508       if (peers == null) {
509         return;
510       }
511       if (zkHelper.isPeerPath(path)) {
512         String id = ReplicationZookeeper.getZNodeName(path);
513         removePeer(id);
514       }
515     }
516 
517     /**
518      * Called when an existing node has a child node added or removed.
519      * @param path full path of the node whose children have changed
520      */
521     public void nodeChildrenChanged(String path) {
522       List<String> peers = refreshPeersList(path);
523       if (peers == null) {
524         return;
525       }
526       for (String id : peers) {
527         try {
528           boolean added = zkHelper.connectToPeer(id);
529           if (added) {
530             addSource(id);
531           }
532         } catch (IOException e) {
533           // TODO manage better than that ?
534           LOG.error("Error while adding a new peer", e);
535         } catch (KeeperException e) {
536           LOG.error("Error while adding a new peer", e);
537         }
538       }
539     }
540 
541     /**
542      * Verify if this event is meant for us, and if so then get the latest
543      * peers' list from ZK. Also reset the watches.
544      * @param path path to check against
545      * @return A list of peers' identifiers if the event concerns this watcher,
546      * else null.
547      */
548     private List<String> refreshPeersList(String path) {
549       if (!path.startsWith(zkHelper.getPeersZNode())) {
550         return null;
551       }
552       return zkHelper.listPeersIdsAndWatch();
553     }
554   }
555 
556   /**
557    * Class responsible to setup new ReplicationSources to take care of the
558    * queues from dead region servers.
559    */
560   class NodeFailoverWorker extends Thread {
561 
562     private String rsZnode;
563 
564     /**
565      *
566      * @param rsZnode
567      */
568     public NodeFailoverWorker(String rsZnode) {
569       super("Failover-for-"+rsZnode);
570       this.rsZnode = rsZnode;
571     }
572 
573     @Override
574     public void run() {
575       // Wait a bit before transferring the queues, we may be shutting down.
576       // This sleep may not be enough in some cases.
577       try {
578         Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
579       } catch (InterruptedException e) {
580         LOG.warn("Interrupted while waiting before transferring a queue.");
581         Thread.currentThread().interrupt();
582       }
583       // We try to lock that rs' queue directory
584       if (stopper.isStopped()) {
585         LOG.info("Not transferring queue since we are shutting down");
586         return;
587       }
588       SortedMap<String, SortedSet<String>> newQueues = null;
589 
590       newQueues = zkHelper.claimQueues(rsZnode);
591 
592       // Copying over the failed queue is completed.
593       if (newQueues.isEmpty()) {
594         // We either didn't get the lock or the failed region server didn't have any outstanding
595         // HLogs to replicate, so we are done.
596         return;
597       }
598 
599       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
600         String peerId = entry.getKey();
601         try {
602           ReplicationSourceInterface src = getReplicationSource(conf,
603               fs, ReplicationSourceManager.this, stopper, replicating, peerId);
604           if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
605             src.terminate("Recovered queue doesn't belong to any current peer");
606             break;
607           }
608           oldsources.add(src);
609           for (String hlog : entry.getValue()) {
610             src.enqueueLog(new Path(oldLogDir, hlog));
611           }
612           src.startup();
613         } catch (IOException e) {
614           // TODO manage it
615           LOG.error("Failed creating a source", e);
616         }
617       }
618     }
619   }
620 
621   /**
622    * Get the directory where hlogs are archived
623    * @return the directory where hlogs are archived
624    */
625   public Path getOldLogDir() {
626     return this.oldLogDir;
627   }
628 
629   /**
630    * Get the directory where hlogs are stored by their RSs
631    * @return the directory where hlogs are stored by their RSs
632    */
633   public Path getLogDir() {
634     return this.logDir;
635   }
636 
637   /**
638    * Get the handle on the local file system
639    * @return Handle on the local file system
640    */
641   public FileSystem getFs() {
642     return this.fs;
643   }
644 
645   /**
646    * Get a string representation of all the sources' metrics
647    */
648   public String getStats() {
649     StringBuffer stats = new StringBuffer();
650     for (ReplicationSourceInterface source : sources) {
651       stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
652       stats.append(source.getStats() + "\n");
653     }
654     for (ReplicationSourceInterface oldSource : oldsources) {
655       stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
656       stats.append(oldSource.getStats()+ "\n");
657     }
658     return stats.toString();
659   }
660 }