View Javadoc

1   /*
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.replication.regionserver;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Random;
30  import java.util.SortedMap;
31  import java.util.SortedSet;
32  import java.util.TreeSet;
33  import java.util.concurrent.LinkedBlockingQueue;
34  import java.util.concurrent.RejectedExecutionException;
35  import java.util.concurrent.ThreadPoolExecutor;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FileSystem;
43  import org.apache.hadoop.fs.Path;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.Stoppable;
46  import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
47  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
48  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
49  import org.apache.zookeeper.KeeperException;
50  
51  import com.google.common.util.concurrent.ThreadFactoryBuilder;
52  
/**
 * This class is responsible for managing all the replication
 * sources. There are two classes of sources:
 * <ul>
 * <li> Normal sources are persistent and one per peer cluster</li>
 * <li> Old sources are recovered from a failed region server and our
 * only goal is to finish replicating the HLog queue it had up in ZK</li>
 * </ul>
 *
 * When a region server dies, this class uses a watcher to get notified and it
 * tries to grab a lock in order to transfer all the queues in a local
 * old source.
 */
public class ReplicationSourceManager {
  private static final Log LOG =
      LogFactory.getLog(ReplicationSourceManager.class);
  // List of all the sources that read this RS's logs
  private final List<ReplicationSourceInterface> sources;
  // List of all the sources we got from died RSs
  private final List<ReplicationSourceInterface> oldsources;
  // Indicates if we are currently replicating
  private final AtomicBoolean replicating;
  // Helper for zookeeper
  private final ReplicationZookeeper zkHelper;
  // All about stopping
  private final Stoppable stopper;
  // All logs we are currently tracking, keyed by peer cluster id
  private final Map<String, SortedSet<String>> hlogsById;
  private final Configuration conf;
  private final FileSystem fs;
  // The path to the latest log we saw, for new coming sources
  private Path latestPath;
  // List of all the other region servers in this cluster
  private final List<String> otherRegionServers = new ArrayList<String>();
  // Path to the hlogs directories
  private final Path logDir;
  // Path to the hlog archive
  private final Path oldLogDir;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  // Used to add jitter to the failover sleep in NodeFailoverWorker
  private final Random rand;

96  
97    /**
98     * Creates a replication manager and sets the watch on all the other
99     * registered region servers
100    * @param zkHelper the zk helper for replication
101    * @param conf the configuration to use
102    * @param stopper the stopper object for this region server
103    * @param fs the file system to use
104    * @param replicating the status of the replication on this cluster
105    * @param logDir the directory that contains all hlog directories of live RSs
106    * @param oldLogDir the directory where old logs are archived
107    */
108   public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
109                                   final Configuration conf,
110                                   final Stoppable stopper,
111                                   final FileSystem fs,
112                                   final AtomicBoolean replicating,
113                                   final Path logDir,
114                                   final Path oldLogDir) {
115     this.sources = new ArrayList<ReplicationSourceInterface>();
116     this.replicating = replicating;
117     this.zkHelper = zkHelper;
118     this.stopper = stopper;
119     this.hlogsById = new HashMap<String, SortedSet<String>>();
120     this.oldsources = new ArrayList<ReplicationSourceInterface>();
121     this.conf = conf;
122     this.fs = fs;
123     this.logDir = logDir;
124     this.oldLogDir = oldLogDir;
125     this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
126     this.zkHelper.registerRegionServerListener(
127         new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
128     this.zkHelper.registerRegionServerListener(
129         new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
130     this.zkHelper.listPeersIdsAndWatch();
131     // It's preferable to failover 1 RS at a time, but with good zk servers
132     // more could be processed at the same time.
133     int nbWorkers = conf.getInt("replication.executor.workers", 1);
134     // use a short 100ms sleep since this could be done inline with a RS startup
135     // even if we fail, other region servers can take care of it
136     this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
137         100, TimeUnit.MILLISECONDS,
138         new LinkedBlockingQueue<Runnable>());
139     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
140     tfb.setNameFormat("ReplicationExecutor-%d");
141     tfb.setDaemon(true);
142     this.executor.setThreadFactory(tfb.build());
143     this.rand = new Random();
144   }
145 
146   /**
147    * Provide the id of the peer and a log key and this method will figure which
148    * hlog it belongs to and will log, for this region server, the current
149    * position. It will also clean old logs from the queue.
150    * @param log Path to the log currently being replicated from
151    * replication status in zookeeper. It will also delete older entries.
152    * @param id id of the peer cluster
153    * @param position current location in the log
154    * @param queueRecovered indicates if this queue comes from another region server
155    * @param holdLogInZK if true then the log is retained in ZK
156    */
157   public void logPositionAndCleanOldLogs(Path log, String id, long position, 
158       boolean queueRecovered, boolean holdLogInZK) {
159     String key = log.getName();
160     LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
161     this.zkHelper.writeReplicationStatus(key, id, position);
162     if (holdLogInZK) {
163      return;
164     }
165     cleanOldLogs(key, id, queueRecovered);
166   }
167 
168   /**
169    * Cleans a log file and all older files from ZK. Called when we are sure that a
170    * log file is closed and has no more entries.
171    * @param key Path to the log
172    * @param id id of the peer cluster
173    * @param queueRecovered Whether this is a recovered queue
174    */
175   public void cleanOldLogs(String key,
176                            String id,
177                            boolean queueRecovered) {
178     synchronized (this.hlogsById) {
179       SortedSet<String> hlogs = this.hlogsById.get(id);
180       if (queueRecovered || hlogs.first().equals(key)) {
181         return;
182       }
183       SortedSet<String> hlogSet = hlogs.headSet(key);
184       for (String hlog : hlogSet) {
185         this.zkHelper.removeLogFromList(hlog, id);
186       }
187       hlogSet.clear();
188     }
189   }
190 
191   /**
192    * Adds a normal source per registered peer cluster and tries to process all
193    * old region server hlog queues
194    */
195   public void init() throws IOException {
196     for (String id : this.zkHelper.getPeerClusters().keySet()) {
197       addSource(id);
198     }
199     List<String> currentReplicators = this.zkHelper.getListOfReplicators();
200     if (currentReplicators == null || currentReplicators.size() == 0) {
201       return;
202     }
203     synchronized (otherRegionServers) {
204       refreshOtherRegionServersList();
205       LOG.info("Current list of replicators: " + currentReplicators
206           + " other RSs: " + otherRegionServers);
207     }
208     // Look if there's anything to process after a restart
209     for (String rs : currentReplicators) {
210       synchronized (otherRegionServers) {
211         if (!this.otherRegionServers.contains(rs)) {
212           transferQueues(rs);
213         }
214       }
215     }
216   }
217 
218   /**
219    * Add a new normal source to this region server
220    * @param id the id of the peer cluster
221    * @return the source that was created
222    * @throws IOException
223    */
224   public ReplicationSourceInterface addSource(String id) throws IOException {
225     ReplicationSourceInterface src =
226         getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
227     synchronized (this.hlogsById) {
228       this.sources.add(src);
229       this.hlogsById.put(id, new TreeSet<String>());
230       // Add the latest hlog to that source's queue
231       if (this.latestPath != null) {
232         String name = this.latestPath.getName();
233         this.hlogsById.get(id).add(name);
234         try {
235           this.zkHelper.addLogToList(name, src.getPeerClusterZnode());
236         } catch (KeeperException ke) {
237           String message = "Cannot add log to zk for" +
238             " replication when creating a new source";
239           stopper.stop(message);
240           throw new IOException(message, ke);
241         }
242         src.enqueueLog(this.latestPath);
243       }
244     }
245     src.startup();
246     return src;
247   }
248 
249   /**
250    * Terminate the replication on this region server
251    */
252   public void join() {
253     this.executor.shutdown();
254     if (this.sources.size() == 0) {
255       this.zkHelper.deleteOwnRSZNode();
256     }
257     for (ReplicationSourceInterface source : this.sources) {
258       source.terminate("Region server is closing");
259     }
260   }
261 
  /**
   * Get an unmodifiable view of the hlogs tracked for every peer cluster,
   * keyed by peer cluster id. Note this is a live view over {@code hlogsById},
   * not a snapshot, and the contained sets are still mutable.
   * @return a map of peer cluster id to the sorted set of hlog names
   */
  protected Map<String, SortedSet<String>> getHLogs() {
    return Collections.unmodifiableMap(hlogsById);
  }
269 
270   /**
271    * Get a list of all the normal sources of this rs
272    * @return lis of all sources
273    */
274   public List<ReplicationSourceInterface> getSources() {
275     return this.sources;
276   }
277 
278   /**
279    * Get a list of all the old sources of this rs
280    * @return list of all old sources
281    */
282   public List<ReplicationSourceInterface> getOldSources() {
283     return this.oldsources;
284   }
285 
286   void preLogRoll(Path newLog) throws IOException {
287     if (!this.replicating.get()) {
288       LOG.warn("Replication stopped, won't add new log");
289       return;
290     }
291 
292     synchronized (this.hlogsById) {
293       String name = newLog.getName();
294       for (ReplicationSourceInterface source : this.sources) {
295         try {
296           this.zkHelper.addLogToList(name, source.getPeerClusterZnode());
297         } catch (KeeperException ke) {
298           throw new IOException("Cannot add log to zk for replication", ke);
299         }
300       }
301       for (SortedSet<String> hlogs : this.hlogsById.values()) {
302         if (this.sources.isEmpty()) {
303           // If there's no slaves, don't need to keep the old hlogs since
304           // we only consider the last one when a new slave comes in
305           hlogs.clear();
306         }
307         hlogs.add(name);
308       }
309     }
310 
311     this.latestPath = newLog;
312   }
313 
314   void postLogRoll(Path newLog) throws IOException {
315     if (!this.replicating.get()) {
316       LOG.warn("Replication stopped, won't add new log");
317       return;
318     }
319 
320     // This only updates the sources we own, not the recovered ones
321     for (ReplicationSourceInterface source : this.sources) {
322       source.enqueueLog(newLog);    
323     }
324   }
325 
326   /**
327    * Get the ZK help of this manager
328    * @return the helper
329    */
330   public ReplicationZookeeper getRepZkWrapper() {
331     return zkHelper;
332   }
333 
334   /**
335    * Factory method to create a replication source
336    * @param conf the configuration to use
337    * @param fs the file system to use
338    * @param manager the manager to use
339    * @param stopper the stopper object for this region server
340    * @param replicating the status of the replication on this cluster
341    * @param peerId the id of the peer cluster
342    * @return the created source
343    * @throws IOException
344    */
345   public ReplicationSourceInterface getReplicationSource(
346       final Configuration conf,
347       final FileSystem fs,
348       final ReplicationSourceManager manager,
349       final Stoppable stopper,
350       final AtomicBoolean replicating,
351       final String peerId) throws IOException {
352     ReplicationSourceInterface src;
353     try {
354       @SuppressWarnings("rawtypes")
355       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
356           ReplicationSource.class.getCanonicalName()));
357       src = (ReplicationSourceInterface) c.newInstance();
358     } catch (Exception e) {
359       LOG.warn("Passed replication source implementation throws errors, " +
360           "defaulting to ReplicationSource", e);
361       src = new ReplicationSource();
362 
363     }
364     src.init(conf, fs, manager, stopper, replicating, peerId);
365     return src;
366   }
367 
368   /**
369    * Transfer all the queues of the specified to this region server.
370    * First it tries to grab a lock and if it works it will move the
371    * znodes and finally will delete the old znodes.
372    *
373    * It creates one old source for any type of source of the old rs.
374    * @param rsZnode
375    */
376   public void transferQueues(String rsZnode) {
377     NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode);
378     try {
379       this.executor.execute(transfer);
380     } catch (RejectedExecutionException ex) {
381       LOG.info("Cancelling the transfer of " + rsZnode +
382           " because of " + ex.getMessage());
383     }
384   }
385 
386   /**
387    * Clear the references to the specified old source
388    * @param src source to clear
389    */
390   public void closeRecoveredQueue(ReplicationSourceInterface src) {
391     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
392     this.oldsources.remove(src);
393     this.zkHelper.deleteSource(src.getPeerClusterZnode(), false);
394   }
395 
396   /**
397    * Thie method first deletes all the recovered sources for the specified
398    * id, then deletes the normal source (deleting all related data in ZK).
399    * @param id The id of the peer cluster
400    */
401   public void removePeer(String id) {
402     LOG.info("Closing the following queue " + id + ", currently have "
403         + sources.size() + " and another "
404         + oldsources.size() + " that were recovered");
405     String terminateMessage = "Replication stream was removed by a user";
406     ReplicationSourceInterface srcToRemove = null;
407     List<ReplicationSourceInterface> oldSourcesToDelete =
408         new ArrayList<ReplicationSourceInterface>();
409     // First close all the recovered sources for this peer
410     for (ReplicationSourceInterface src : oldsources) {
411       if (id.equals(src.getPeerClusterId())) {
412         oldSourcesToDelete.add(src);
413       }
414     }
415     for (ReplicationSourceInterface src : oldSourcesToDelete) {
416       src.terminate(terminateMessage);
417       closeRecoveredQueue((src));
418     }
419     LOG.info("Number of deleted recovered sources for " + id + ": "
420         + oldSourcesToDelete.size());
421     // Now look for the one on this cluster
422     for (ReplicationSourceInterface src : this.sources) {
423       if (id.equals(src.getPeerClusterId())) {
424         srcToRemove = src;
425         break;
426       }
427     }
428     if (srcToRemove == null) {
429       LOG.error("The queue we wanted to close is missing " + id);
430       return;
431     }
432     srcToRemove.terminate(terminateMessage);
433     this.sources.remove(srcToRemove);
434     this.zkHelper.deleteSource(id, true);
435   }
436 
437   /**
438    * Reads the list of region servers from ZK and atomically clears our
439    * local view of it and replaces it with the updated list.
440    * 
441    * @return true if the local list of the other region servers was updated
442    * with the ZK data (even if it was empty),
443    * false if the data was missing in ZK
444    */
445   private boolean refreshOtherRegionServersList() {
446     List<String> newRsList = zkHelper.getRegisteredRegionServers();
447     if (newRsList == null) {
448       return false;
449     } else {
450       synchronized (otherRegionServers) {
451         otherRegionServers.clear();
452         otherRegionServers.addAll(newRsList);
453       }
454     }
455     return true;
456   }
457 
458   /**
459    * Watcher used to be notified of the other region server's death
460    * in the local cluster. It initiates the process to transfer the queues
461    * if it is able to grab the lock.
462    */
463   public class OtherRegionServerWatcher extends ZooKeeperListener {
464 
465     /**
466      * Construct a ZooKeeper event listener.
467      */
468     public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
469       super(watcher);
470     }
471 
472     /**
473      * Called when a new node has been created.
474      * @param path full path of the new node
475      */
476     public void nodeCreated(String path) {
477       refreshListIfRightPath(path);
478     }
479 
480     /**
481      * Called when a node has been deleted
482      * @param path full path of the deleted node
483      */
484     public void nodeDeleted(String path) {
485       if (stopper.isStopped()) {
486         return;
487       }
488       boolean cont = refreshListIfRightPath(path);
489       if (!cont) {
490         return;
491       }
492       LOG.info(path + " znode expired, trying to lock it");
493       transferQueues(ReplicationZookeeper.getZNodeName(path));
494     }
495 
496     /**
497      * Called when an existing node has a child node added or removed.
498      * @param path full path of the node whose children have changed
499      */
500     public void nodeChildrenChanged(String path) {
501       if (stopper.isStopped()) {
502         return;
503       }
504       refreshListIfRightPath(path);
505     }
506 
507     private boolean refreshListIfRightPath(String path) {
508       if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) {
509         return false;
510       }
511       return refreshOtherRegionServersList();
512     }
513   }
514 
515   /**
516    * Watcher used to follow the creation and deletion of peer clusters.
517    */
518   public class PeersWatcher extends ZooKeeperListener {
519 
520     /**
521      * Construct a ZooKeeper event listener.
522      */
523     public PeersWatcher(ZooKeeperWatcher watcher) {
524       super(watcher);
525     }
526 
527     /**
528      * Called when a node has been deleted
529      * @param path full path of the deleted node
530      */
531     public void nodeDeleted(String path) {
532       List<String> peers = refreshPeersList(path);
533       if (peers == null) {
534         return;
535       }
536       String id = ReplicationZookeeper.getZNodeName(path);
537       removePeer(id);
538     }
539 
540     /**
541      * Called when an existing node has a child node added or removed.
542      * @param path full path of the node whose children have changed
543      */
544     public void nodeChildrenChanged(String path) {
545       List<String> peers = refreshPeersList(path);
546       if (peers == null) {
547         return;
548       }
549       for (String id : peers) {
550         try {
551           boolean added = zkHelper.connectToPeer(id);
552           if (added) {
553             addSource(id);
554           }
555         } catch (IOException e) {
556           // TODO manage better than that ?
557           LOG.error("Error while adding a new peer", e);
558         } catch (KeeperException e) {
559           LOG.error("Error while adding a new peer", e);
560         }
561       }
562     }
563 
564     /**
565      * Verify if this event is meant for us, and if so then get the latest
566      * peers' list from ZK. Also reset the watches.
567      * @param path path to check against
568      * @return A list of peers' identifiers if the event concerns this watcher,
569      * else null.
570      */
571     private List<String> refreshPeersList(String path) {
572       if (!path.startsWith(zkHelper.getPeersZNode())) {
573         return null;
574       }
575       return zkHelper.listPeersIdsAndWatch();
576     }
577   }
578 
579   /**
580    * Class responsible to setup new ReplicationSources to take care of the
581    * queues from dead region servers.
582    */
583   class NodeFailoverWorker extends Thread {
584 
585     private String rsZnode;
586 
587     /**
588      *
589      * @param rsZnode
590      */
591     public NodeFailoverWorker(String rsZnode) {
592       super("Failover-for-"+rsZnode);
593       this.rsZnode = rsZnode;
594     }
595 
596     @Override
597     public void run() {
598       // We could end up checking if this is us
599       if (zkHelper.isThisOurZnode(this.rsZnode)) {
600         return;
601       }
602       // Wait a bit before transferring the queues, we may be shutting down.
603       // This sleep may not be enough in some cases.
604       try {
605         Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
606       } catch (InterruptedException e) {
607         LOG.warn("Interrupted while waiting before transferring a queue.");
608         Thread.currentThread().interrupt();
609       }
610       // We try to lock that rs' queue directory
611       if (stopper.isStopped()) {
612         LOG.info("Not transferring queue since we are shutting down");
613         return;
614       }
615       SortedMap<String, SortedSet<String>> newQueues = null;
616 
617       // check whether there is multi support. If yes, use it.
618       if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
619         LOG.info("Atomically moving " + rsZnode + "'s hlogs to my queue");
620         newQueues = zkHelper.copyQueuesFromRSUsingMulti(rsZnode);
621       } else {
622         LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
623         if (!zkHelper.lockOtherRS(rsZnode)) {
624           return;
625         }
626         newQueues = zkHelper.copyQueuesFromRS(rsZnode);
627         zkHelper.deleteRsQueues(rsZnode);
628       }
629       // process of copying over the failed queue is completed.
630       if (newQueues.isEmpty()) {
631         return;
632       }
633 
634       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
635         String peerId = entry.getKey();
636         try {
637           ReplicationSourceInterface src = getReplicationSource(conf,
638               fs, ReplicationSourceManager.this, stopper, replicating, peerId);
639           if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
640             src.terminate("Recovered queue doesn't belong to any current peer");
641             break;
642           }
643           oldsources.add(src);
644           for (String hlog : entry.getValue()) {
645             src.enqueueLog(new Path(oldLogDir, hlog));
646           }
647           src.startup();
648         } catch (IOException e) {
649           // TODO manage it
650           LOG.error("Failed creating a source", e);
651         }
652       }
653     }
654   }
655 
656   /**
657    * Get the directory where hlogs are archived
658    * @return the directory where hlogs are archived
659    */
660   public Path getOldLogDir() {
661     return this.oldLogDir;
662   }
663 
664   /**
665    * Get the directory where hlogs are stored by their RSs
666    * @return the directory where hlogs are stored by their RSs
667    */
668   public Path getLogDir() {
669     return this.logDir;
670   }
671 
672   /**
673    * Get the handle on the local file system
674    * @return Handle on the local file system
675    */
676   public FileSystem getFs() {
677     return this.fs;
678   }
679 }