/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is responsible for managing all the replication
 * sources. There are two classes of sources:
 * <li> Normal sources are persistent and one per peer cluster</li>
 * <li> Old sources are recovered from a failed region server and their
 * only goal is to finish replicating the HLog queue it had up in ZK</li>
 *
 * When a region server dies, this class uses a watcher to get notified and it
 * tries to grab a lock in order to transfer all of its queues to local
 * old sources.
 *
 * This class implements the ReplicationListener interface so that it can track changes in
 * replication state.
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
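
  // A hedged lifecycle sketch, illustrative only: the region server wires this
  // manager up itself, so the variable names below are assumptions.
  //
  //   ReplicationSourceManager mgr = new ReplicationSourceManager(queues, peers,
  //       tracker, conf, stopper, fs, logDir, oldLogDir, clusterId);
  //   mgr.init();               // one normal source per peer + claim dead RS queues
  //   mgr.preLogRoll(newHlog);  // bookkeeping before each hlog roll
  //   mgr.postLogRoll(newHlog); // make every source tail the rolled hlog
  //   mgr.join();               // terminate all sources at shutdown
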
  private static final Log LOG =
      LogFactory.getLog(ReplicationSourceManager.class);
  // List of all the sources that read this RS's logs
  private final List<ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;
  private final ReplicationQueues replicationQueues;
  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Stoppable stopper;
  // All logs we are currently tracking
  private final Map<String, SortedSet<String>> hlogsById;
  // Logs for recovered sources we are currently tracking
  private final Map<String, SortedSet<String>> hlogsByIdRecoveredQueues;
  private final Configuration conf;
  private final FileSystem fs;
  // The path to the latest log we saw, for newly created sources
  private Path latestPath;
  // Path to the hlogs directories
  private final Path logDir;
  // Path to the hlog archive
  private final Path oldLogDir;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Home-made executor service for replication
  private final ThreadPoolExecutor executor;

  private final Random rand;

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param replicationTracker the tracker used to watch replication state changes
   * @param conf the configuration to use
   * @param stopper the stopper object for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all hlog directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId the UUID of the local cluster
   */
  public ReplicationSourceManager(final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
      final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
      final Path oldLogDir, final UUID clusterId) {
    // CopyOnWriteArrayList is thread-safe.
    // Reads far outnumber modifications here, the pattern it is optimized for.
    this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.stopper = stopper;
    this.hlogsById = new HashMap<String, SortedSet<String>>();
    this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
    this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
    this.clusterId = clusterId;
    this.replicationTracker.registerListener(this);
    this.replicationPeers.getAllPeerIds();
    // It's preferable to failover 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
        100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    this.executor.setThreadFactory(tfb.build());
    this.rand = new Random();
  }
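
  // A minimal sketch of the two tuning knobs read above; the key names come
  // from this constructor and the values shown are just the defaults.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setLong("replication.sleep.before.failover", 2000); // ms before claiming queues
  //   conf.setInt("replication.executor.workers", 1);          // failover worker threads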

  /**
   * Provide the id of the peer and a log key and this method will figure which
   * hlog it belongs to and will log, for this region server, the current
   * position. It will also clean old logs from the queue.
   * @param log Path to the log currently being replicated
   * @param id id of the peer cluster
   * @param position current location in the log
   * @param queueRecovered indicates if this queue comes from another region server
   * @param holdLogInZK if true then the log is retained in ZK
   */
  public void logPositionAndCleanOldLogs(Path log, String id, long position,
      boolean queueRecovered, boolean holdLogInZK) {
    String fileName = log.getName();
    this.replicationQueues.setLogPosition(id, fileName, position);
    if (holdLogInZK) {
      return;
    }
    cleanOldLogs(fileName, id, queueRecovered);
  }
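
  // Hedged call sketch with hypothetical values ('manager' and 'log' assumed in
  // scope): record that peer "1" is done with 'log' up to byte offset 1024,
  // then trim strictly-older logs from ZK:
  //   manager.logPositionAndCleanOldLogs(log, "1", 1024L, false, false);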

  /**
   * Cleans a log file and all older files from ZK. Called when we are sure that a
   * log file is closed and has no more entries.
   * @param key Path to the log
   * @param id id of the peer cluster
   * @param queueRecovered Whether this is a recovered queue
   */
  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
    if (queueRecovered) {
      SortedSet<String> hlogs = hlogsByIdRecoveredQueues.get(id);
      if (hlogs != null && !hlogs.first().equals(key)) {
        cleanOldLogs(hlogs, key, id);
      }
    } else {
      synchronized (this.hlogsById) {
        SortedSet<String> hlogs = hlogsById.get(id);
        if (!hlogs.first().equals(key)) {
          cleanOldLogs(hlogs, key, id);
        }
      }
    }
  }

  private void cleanOldLogs(SortedSet<String> hlogs, String key, String id) {
    SortedSet<String> hlogSet = hlogs.headSet(key);
    LOG.debug("Removing " + hlogSet.size() + " logs in the list: " + hlogSet);
    for (String hlog : hlogSet) {
      this.replicationQueues.removeLog(id, hlog);
    }
    hlogSet.clear();
  }
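
  // Note on the headSet() call above (standard java.util.TreeSet behaviour):
  //   SortedSet<String> hlogs = new TreeSet<String>(Arrays.asList("a", "b", "c"));
  //   hlogs.headSet("c");         // -> ["a", "b"], strictly less than "c"
  //   hlogs.headSet("c").clear(); // the view is backed: hlogs is now ["c"]
  // so the log named 'key' itself is kept, and clearing the view keeps the
  // in-memory set consistent with the logs removed from the replication queue.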

  /**
   * Adds a normal source per registered peer cluster and tries to process all
   * old region server hlog queues
   */
  protected void init() throws IOException, ReplicationException {
    for (String id : this.replicationPeers.getPeerIds()) {
      addSource(id);
    }
    List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
    if (currentReplicators == null || currentReplicators.size() == 0) {
      return;
    }
    List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
    LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
        + otherRegionServers);

    // Look if there's anything to process after a restart
    for (String rs : currentReplicators) {
      if (!otherRegionServers.contains(rs)) {
        transferQueues(rs);
      }
    }
  }

  /**
   * Add a new normal source to this region server
   * @param id the id of the peer cluster
   * @return the source that was created
   * @throws IOException if the source cannot be created
   */
  protected ReplicationSourceInterface addSource(String id) throws IOException,
      ReplicationException {
    ReplicationPeerConfig peerConfig
      = replicationPeers.getReplicationPeerConfig(id);
    ReplicationPeer peer = replicationPeers.getPeer(id);
    ReplicationSourceInterface src =
        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
          this.replicationPeers, stopper, id, this.clusterId, peerConfig, peer);
    synchronized (this.hlogsById) {
      this.sources.add(src);
      this.hlogsById.put(id, new TreeSet<String>());
      // Add the latest hlog to that source's queue
      if (this.latestPath != null) {
        String name = this.latestPath.getName();
        this.hlogsById.get(id).add(name);
        try {
          this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          String message =
              "Cannot add log to queue when creating a new source, queueId="
                  + src.getPeerClusterZnode() + ", filename=" + name;
          stopper.stop(message);
          throw e;
        }
        src.enqueueLog(this.latestPath);
      }
    }
    src.startup();
    return src;
  }

  /**
   * Delete a complete queue of hlogs associated with a peer cluster
   * @param peerId Id of the peer cluster queue of hlogs to delete
   * @param closeConnection whether to also disconnect from the peer cluster
   */
  public void deleteSource(String peerId, boolean closeConnection) {
    this.replicationQueues.removeQueue(peerId);
    if (closeConnection) {
      this.replicationPeers.peerRemoved(peerId);
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    if (this.sources.size() == 0) {
      this.replicationQueues.removeAllQueues();
    }
    for (ReplicationSourceInterface source : this.sources) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Get a copy of the hlogs of the normal sources on this rs
   * @return a map of peer cluster id to a sorted set of hlog names
   */
  protected Map<String, SortedSet<String>> getHLogs() {
    return Collections.unmodifiableMap(hlogsById);
  }

  /**
   * Get a copy of the hlogs of the recovered sources on this rs
   * @return a map of recovered queue id to a sorted set of hlog names
   */
  protected Map<String, SortedSet<String>> getHlogsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(hlogsByIdRecoveredQueues);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return this.sources;
  }

  /**
   * Get a list of all the old sources of this rs
   * @return list of all old sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  void preLogRoll(Path newLog) throws IOException {
    synchronized (this.hlogsById) {
      String name = newLog.getName();
      for (ReplicationSourceInterface source : this.sources) {
        try {
          this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          throw new IOException("Cannot add log to replication queue with id="
              + source.getPeerClusterZnode() + ", filename=" + name, e);
        }
      }
      for (SortedSet<String> hlogs : this.hlogsById.values()) {
        if (this.sources.isEmpty()) {
          // If there are no slaves, we don't need to keep the old hlogs since
          // we only consider the last one when a new slave comes in
          hlogs.clear();
        }
        hlogs.add(name);
      }
    }

    this.latestPath = newLog;
  }

  void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources) {
      source.enqueueLog(newLog);
    }
  }
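
  // Ordering note (hedged, inferred from the pre/post naming): preLogRoll(Path)
  // is expected to run before the rolled hlog accepts writes, so ZK and the
  // in-memory sets already track it, and postLogRoll(Path) afterwards, so each
  // source only starts tailing a log that is fully registered.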

  /**
   * Factory method to create a replication source
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager to use
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param stopper the stopper object for this region server
   * @param peerId the id of the peer cluster
   * @param clusterId the UUID of the local cluster
   * @param peerConfig the configuration of the peer cluster
   * @param replicationPeer the peer to replicate to
   * @return the created source
   * @throws IOException if the replication endpoint cannot be instantiated
   */
  protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
      final FileSystem fs, final ReplicationSourceManager manager,
      final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
      final Stoppable stopper, final String peerId, final UUID clusterId,
      final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
          throws IOException {
    ReplicationSourceInterface src;
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
          ReplicationSource.class.getCanonicalName()));
      src = (ReplicationSourceInterface) c.newInstance();
    } catch (Exception e) {
      LOG.warn("Passed replication source implementation cannot be instantiated, " +
          "defaulting to ReplicationSource", e);
      src = new ReplicationSource();
    }

    ReplicationEndpoint replicationEndpoint = null;
    try {
      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
      if (replicationEndpointImpl == null) {
        // Default to HBase inter-cluster replication endpoint
        replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
      }
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(replicationEndpointImpl);
      replicationEndpoint = (ReplicationEndpoint) c.newInstance();
    } catch (Exception e) {
      LOG.warn("Passed replication endpoint implementation cannot be instantiated", e);
      throw new IOException(e);
    }

    MetricsSource metrics = new MetricsSource(peerId);
    // init replication source
    src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId,
      clusterId, replicationEndpoint, metrics);

    // init replication endpoint
    replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
      fs, peerConfig, peerId, clusterId, replicationPeer, metrics));

    return src;
  }
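
  // Hedged configuration sketch: both implementations above are pluggable. The
  // source class comes from the configuration key read in the code above:
  //   conf.set("replication.replicationsource.implementation",
  //       "org.example.MyReplicationSource"); // hypothetical class
  // and the endpoint is chosen per peer; assuming ReplicationPeerConfig exposes
  // a setter mirroring the getter used above:
  //   peerConfig.setReplicationEndpointImpl(MyEndpoint.class.getName());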

  /**
   * Transfer all the queues of the specified region server to this one.
   * First it tries to grab a lock and if it works it will move the
   * znodes and finally will delete the old znodes.
   *
   * It creates one old source for any type of source of the old rs.
   * @param rsZnode znode of the dead region server whose queues to claim
   */
  private void transferQueues(String rsZnode) {
    NodeFailoverWorker transfer =
        new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
            this.clusterId);
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
    }
  }

  /**
   * Clear the references to the specified old source
   * @param src source to clear
   */
  public void closeRecoveredQueue(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
    this.oldsources.remove(src);
    deleteSource(src.getPeerClusterZnode(), false);
    this.hlogsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
  }

  /**
   * This method first deletes all the recovered sources for the specified
   * id, then deletes the normal source (deleting all related data in ZK).
   * @param id The id of the peer cluster
   */
  public void removePeer(String id) {
    LOG.info("Closing the following queue " + id + ", currently have "
        + sources.size() + " and another "
        + oldsources.size() + " that were recovered");
    String terminateMessage = "Replication stream was removed by a user";
    ReplicationSourceInterface srcToRemove = null;
    List<ReplicationSourceInterface> oldSourcesToDelete =
        new ArrayList<ReplicationSourceInterface>();
    // First close all the recovered sources for this peer
    for (ReplicationSourceInterface src : oldsources) {
      if (id.equals(src.getPeerClusterId())) {
        oldSourcesToDelete.add(src);
      }
    }
    for (ReplicationSourceInterface src : oldSourcesToDelete) {
      src.terminate(terminateMessage);
      closeRecoveredQueue(src);
    }
    LOG.info("Number of deleted recovered sources for " + id + ": "
        + oldSourcesToDelete.size());
    // Now look for the one on this cluster
    for (ReplicationSourceInterface src : this.sources) {
      if (id.equals(src.getPeerClusterId())) {
        srcToRemove = src;
        break;
      }
    }
    if (srcToRemove == null) {
      LOG.error("The queue we wanted to close is missing " + id);
      return;
    }
    srcToRemove.terminate(terminateMessage);
    this.sources.remove(srcToRemove);
    deleteSource(id, true);
  }

  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(regionserver);
  }

  @Override
  public void peerRemoved(String peerId) {
    removePeer(peerId);
  }

  @Override
  public void peerListChanged(List<String> peerIds) {
    for (String id : peerIds) {
      try {
        boolean added = this.replicationPeers.peerAdded(id);
        if (added) {
          addSource(id);
        }
      } catch (Exception e) {
        LOG.error("Error while adding a new peer", e);
      }
    }
  }

  /**
   * Class responsible for setting up new ReplicationSources to take care of the
   * queues from dead region servers.
   */
  class NodeFailoverWorker extends Thread {

    private String rsZnode;
    private final ReplicationQueues rq;
    private final ReplicationPeers rp;
    private final UUID clusterId;

    /**
     * @param rsZnode znode of the dead region server whose queues to claim
     */
    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
        final ReplicationPeers replicationPeers, final UUID clusterId) {
      super("Failover-for-"+rsZnode);
      this.rsZnode = rsZnode;
      this.rq = replicationQueues;
      this.rp = replicationPeers;
      this.clusterId = clusterId;
    }

    @Override
    public void run() {
      if (this.rq.isThisOurZnode(rsZnode)) {
        return;
      }
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
      try {
        Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
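      // e.g. with the default replication.sleep.before.failover of 2000 the
      // sleep above is uniform in [2000, 4000) ms, staggering the failover
      // workers racing from different surviving region servers.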
      if (stopper.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      // We try to lock the dead rs' queue directory and claim its queues
      SortedMap<String, SortedSet<String>> newQueues = this.rq.claimQueues(rsZnode);

      // Copying over the failed queues is complete at this point.
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the failed region server didn't have any outstanding
        // HLogs to replicate, so we are done.
        return;
      }

      for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
        String peerId = entry.getKey();
        try {
          // The id of a recovered queue embeds the name of the dead region
          // server, so it may not match an actual peer id directly; extract
          // the real peer id before looking the peer up.
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
          String actualPeerId = replicationQueueInfo.getPeerId();
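          // e.g. a claimed queue id of "2-host1,60020,1419" (hypothetical dead
          // server name) would yield an actual peer id of "2".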
          ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
          ReplicationPeerConfig peerConfig = null;
          try {
            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
          } catch (ReplicationException ex) {
            LOG.warn("Received exception while getting replication peer config, skipping replay",
                ex);
          }
          if (peer == null || peerConfig == null) {
            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + rsZnode);
            continue;
          }

          ReplicationSourceInterface src =
              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
                stopper, peerId, this.clusterId, peerConfig, peer);
          if (!this.rp.getPeerIds().contains(src.getPeerClusterId())) {
            src.terminate("Recovered queue doesn't belong to any current peer");
            break;
          }
          oldsources.add(src);
          SortedSet<String> hlogsSet = entry.getValue();
          for (String hlog : hlogsSet) {
            src.enqueueLog(new Path(oldLogDir, hlog));
          }
          src.startup();
          hlogsByIdRecoveredQueues.put(peerId, hlogsSet);
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  /**
   * Get the directory where hlogs are archived
   * @return the directory where hlogs are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where hlogs are stored by their RSs
   * @return the directory where hlogs are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    for (ReplicationSourceInterface source : sources) {
      stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }
}