View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Random;
29  import java.util.SortedMap;
30  import java.util.SortedSet;
31  import java.util.TreeSet;
32  import java.util.UUID;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.CopyOnWriteArrayList;
35  import java.util.concurrent.LinkedBlockingQueue;
36  import java.util.concurrent.RejectedExecutionException;
37  import java.util.concurrent.ThreadPoolExecutor;
38  import java.util.concurrent.TimeUnit;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.Server;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.hbase.regionserver.HRegionServer;
48  import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
49  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
50  import org.apache.hadoop.hbase.replication.ReplicationException;
51  import org.apache.hadoop.hbase.replication.ReplicationListener;
52  import org.apache.hadoop.hbase.replication.ReplicationPeer;
53  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
54  import org.apache.hadoop.hbase.replication.ReplicationPeers;
55  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
56  import org.apache.hadoop.hbase.replication.ReplicationQueues;
57  import org.apache.hadoop.hbase.replication.ReplicationTracker;
58  
59  import com.google.common.util.concurrent.ThreadFactoryBuilder;
60  
/**
 * This class is responsible for managing all the replication
 * sources. There are two classes of sources:
 * <ul>
 * <li>Normal sources are persistent and there is one per peer cluster.</li>
 * <li>Old sources are recovered from a failed region server and our
 * only goal is to finish replicating the WAL queue it had up in ZK.</li>
 * </ul>
 *
 * When a region server dies, this class uses a watcher to get notified and it
 * tries to grab a lock in order to transfer all the queues in a local
 * old source.
 *
 * This class implements the ReplicationListener interface so that it can track changes in
 * replication state.
 */
75  @InterfaceAudience.Private
76  public class ReplicationSourceManager implements ReplicationListener {
  private static final Log LOG =
      LogFactory.getLog(ReplicationSourceManager.class);
  // List of all the sources that read this RS's logs
  private final List<ReplicationSourceInterface> sources;
  // List of all the sources we got from died RSs
  private final List<ReplicationSourceInterface> oldsources;
  // Interface to the replication queue state kept in ZK
  private final ReplicationQueues replicationQueues;
  // Tracker that notifies this manager of peer and region server changes
  private final ReplicationTracker replicationTracker;
  // Interface to the registered replication peers
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;
  // All logs we are currently tracking, keyed by peer id.
  // Plain HashMap: readers and writers synchronize on this map itself.
  private final Map<String, SortedSet<String>> walsById;
  // Logs for recovered sources we are currently tracking, keyed by recovered queue id.
  // ConcurrentHashMap: accessed without external locking.
  private final Map<String, SortedSet<String>> walsByIdRecoveredQueues;
  private final Configuration conf;
  private final FileSystem fs;
  // The path to the latest log we saw, for new coming sources
  private Path latestPath;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service used to run queue-failover transfers
  private final ThreadPoolExecutor executor;

  // Used to randomize the failover delay so RSs don't all claim queues at once
  private final Random rand;
109 
110   /**
111    * Creates a replication manager and sets the watch on all the other registered region servers
112    * @param replicationQueues the interface for manipulating replication queues
113    * @param replicationPeers
114    * @param replicationTracker
115    * @param conf the configuration to use
116    * @param server the server for this region server
117    * @param fs the file system to use
118    * @param logDir the directory that contains all wal directories of live RSs
119    * @param oldLogDir the directory where old logs are archived
120    * @param clusterId
121    */
122   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
123       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
124       final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
125       final Path oldLogDir, final UUID clusterId) {
126     //CopyOnWriteArrayList is thread-safe.
127     //Generally, reading is more than modifying.
128     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
129     this.replicationQueues = replicationQueues;
130     this.replicationPeers = replicationPeers;
131     this.replicationTracker = replicationTracker;
132     this.server = server;
133     this.walsById = new HashMap<String, SortedSet<String>>();
134     this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
135     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
136     this.conf = conf;
137     this.fs = fs;
138     this.logDir = logDir;
139     this.oldLogDir = oldLogDir;
140     this.sleepBeforeFailover =
141         conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
142     this.clusterId = clusterId;
143     this.replicationTracker.registerListener(this);
144     this.replicationPeers.getAllPeerIds();
145     // It's preferable to failover 1 RS at a time, but with good zk servers
146     // more could be processed at the same time.
147     int nbWorkers = conf.getInt("replication.executor.workers", 1);
148     // use a short 100ms sleep since this could be done inline with a RS startup
149     // even if we fail, other region servers can take care of it
150     this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
151         100, TimeUnit.MILLISECONDS,
152         new LinkedBlockingQueue<Runnable>());
153     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
154     tfb.setNameFormat("ReplicationExecutor-%d");
155     tfb.setDaemon(true);
156     this.executor.setThreadFactory(tfb.build());
157     this.rand = new Random();
158   }
159 
160   /**
161    * Provide the id of the peer and a log key and this method will figure which
162    * wal it belongs to and will log, for this region server, the current
163    * position. It will also clean old logs from the queue.
164    * @param log Path to the log currently being replicated from
165    * replication status in zookeeper. It will also delete older entries.
166    * @param id id of the peer cluster
167    * @param position current location in the log
168    * @param queueRecovered indicates if this queue comes from another region server
169    * @param holdLogInZK if true then the log is retained in ZK
170    */
171   public void logPositionAndCleanOldLogs(Path log, String id, long position,
172       boolean queueRecovered, boolean holdLogInZK) {
173     String fileName = log.getName();
174     this.replicationQueues.setLogPosition(id, fileName, position);
175     if (holdLogInZK) {
176      return;
177     }
178     cleanOldLogs(fileName, id, queueRecovered);
179   }
180 
181   /**
182    * Cleans a log file and all older files from ZK. Called when we are sure that a
183    * log file is closed and has no more entries.
184    * @param key Path to the log
185    * @param id id of the peer cluster
186    * @param queueRecovered Whether this is a recovered queue
187    */
188   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
189     if (queueRecovered) {
190       SortedSet<String> wals = walsByIdRecoveredQueues.get(id);
191       if (wals != null && !wals.first().equals(key)) {
192         cleanOldLogs(wals, key, id);
193       }
194     } else {
195       synchronized (this.walsById) {
196         SortedSet<String> wals = walsById.get(id);
197         if (!wals.first().equals(key)) {
198           cleanOldLogs(wals, key, id);
199         }
200       }
201     }
202  }
203   
204   private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
205     SortedSet<String> walSet = wals.headSet(key);
206     LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
207     for (String wal : walSet) {
208       this.replicationQueues.removeLog(id, wal);
209     }
210     walSet.clear();
211   }
212 
213   /**
214    * Adds a normal source per registered peer cluster and tries to process all
215    * old region server wal queues
216    */
217   protected void init() throws IOException, ReplicationException {
218     for (String id : this.replicationPeers.getPeerIds()) {
219       addSource(id);
220     }
221     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
222     if (currentReplicators == null || currentReplicators.size() == 0) {
223       return;
224     }
225     List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
226     LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
227         + otherRegionServers);
228 
229     // Look if there's anything to process after a restart
230     for (String rs : currentReplicators) {
231       if (!otherRegionServers.contains(rs)) {
232         transferQueues(rs);
233       }
234     }
235   }
236 
  /**
   * Add a new normal source to this region server
   * @param id the id of the peer cluster
   * @return the source that was created
   * @throws IOException if the underlying replication source cannot be created
   * @throws ReplicationException if the latest wal cannot be registered in ZK
   */
  protected ReplicationSourceInterface addSource(String id) throws IOException,
      ReplicationException {
    ReplicationPeerConfig peerConfig
      = replicationPeers.getReplicationPeerConfig(id);
    ReplicationPeer peer = replicationPeers.getPeer(id);
    ReplicationSourceInterface src =
        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
          this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
    // Hold the walsById lock so a concurrent log roll (preLogRoll also locks
    // walsById) cannot interleave with registering this source's starting wal.
    synchronized (this.walsById) {
      this.sources.add(src);
      this.walsById.put(id, new TreeSet<String>());
      // Add the latest wal to that source's queue
      if (this.latestPath != null) {
        String name = this.latestPath.getName();
        this.walsById.get(id).add(name);
        try {
          this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          // If the wal cannot be recorded in ZK, stop the server rather than
          // continue with a queue that is missing its starting wal.
          String message =
              "Cannot add log to queue when creating a new source, queueId="
                  + src.getPeerClusterZnode() + ", filename=" + name;
          server.stop(message);
          throw e;
        }
        src.enqueueLog(this.latestPath);
      }
    }
    // Start the source only after its queue state has been fully recorded.
    src.startup();
    return src;
  }
273 
274   /**
275    * Delete a complete queue of wals associated with a peer cluster
276    * @param peerId Id of the peer cluster queue of wals to delete
277    */
278   public void deleteSource(String peerId, boolean closeConnection) {
279     this.replicationQueues.removeQueue(peerId);
280     if (closeConnection) {
281       this.replicationPeers.peerRemoved(peerId);
282     }
283   }
284 
285   /**
286    * Terminate the replication on this region server
287    */
288   public void join() {
289     this.executor.shutdown();
290     if (this.sources.size() == 0) {
291       this.replicationQueues.removeAllQueues();
292     }
293     for (ReplicationSourceInterface source : this.sources) {
294       source.terminate("Region server is closing");
295     }
296   }
297 
298   /**
299    * Get a copy of the wals of the first source on this rs
300    * @return a sorted set of wal names
301    */
302   protected Map<String, SortedSet<String>> getWALs() {
303     return Collections.unmodifiableMap(walsById);
304   }
305   
306   /**
307    * Get a copy of the wals of the recovered sources on this rs
308    * @return a sorted set of wal names
309    */
310   protected Map<String, SortedSet<String>> getWalsByIdRecoveredQueues() {
311     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
312   }
313 
314   /**
315    * Get a list of all the normal sources of this rs
316    * @return lis of all sources
317    */
318   public List<ReplicationSourceInterface> getSources() {
319     return this.sources;
320   }
321 
322   /**
323    * Get a list of all the old sources of this rs
324    * @return list of all old sources
325    */
326   public List<ReplicationSourceInterface> getOldSources() {
327     return this.oldsources;
328   }
329 
330   void preLogRoll(Path newLog) throws IOException {
331     synchronized (this.walsById) {
332       String name = newLog.getName();
333       for (ReplicationSourceInterface source : this.sources) {
334         try {
335           this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
336         } catch (ReplicationException e) {
337           throw new IOException("Cannot add log to replication queue with id="
338               + source.getPeerClusterZnode() + ", filename=" + name, e);
339         }
340       }
341       for (SortedSet<String> wals : this.walsById.values()) {
342         if (this.sources.isEmpty()) {
343           // If there's no slaves, don't need to keep the old wals since
344           // we only consider the last one when a new slave comes in
345           wals.clear();
346         }
347         wals.add(name);
348       }
349     }
350 
351     this.latestPath = newLog;
352   }
353 
354   void postLogRoll(Path newLog) throws IOException {
355     // This only updates the sources we own, not the recovered ones
356     for (ReplicationSourceInterface source : this.sources) {
357       source.enqueueLog(newLog);
358     }
359   }
360 
361   /**
362    * Factory method to create a replication source
363    * @param conf the configuration to use
364    * @param fs the file system to use
365    * @param manager the manager to use
366    * @param server the server object for this region server
367    * @param peerId the id of the peer cluster
368    * @return the created source
369    * @throws IOException
370    */
371   protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
372       final FileSystem fs, final ReplicationSourceManager manager,
373       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
374       final Server server, final String peerId, final UUID clusterId,
375       final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
376           throws IOException {
377     RegionServerCoprocessorHost rsServerHost = null;
378     if (server instanceof HRegionServer) {
379       rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
380     }
381     ReplicationSourceInterface src;
382     try {
383       @SuppressWarnings("rawtypes")
384       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
385           ReplicationSource.class.getCanonicalName()));
386       src = (ReplicationSourceInterface) c.newInstance();
387     } catch (Exception e) {
388       LOG.warn("Passed replication source implementation throws errors, " +
389           "defaulting to ReplicationSource", e);
390       src = new ReplicationSource();
391     }
392 
393     ReplicationEndpoint replicationEndpoint = null;
394     try {
395       String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
396       if (replicationEndpointImpl == null) {
397         // Default to HBase inter-cluster replication endpoint
398         replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
399       }
400       @SuppressWarnings("rawtypes")
401       Class c = Class.forName(replicationEndpointImpl);
402       replicationEndpoint = (ReplicationEndpoint) c.newInstance();
403       if(rsServerHost != null) {
404         ReplicationEndpoint newReplicationEndPoint = rsServerHost
405             .postCreateReplicationEndPoint(replicationEndpoint);
406         if(newReplicationEndPoint != null) {
407           // Override the newly created endpoint from the hook with configured end point
408           replicationEndpoint = newReplicationEndPoint;
409         }
410       }
411     } catch (Exception e) {
412       LOG.warn("Passed replication endpoint implementation throws errors", e);
413       throw new IOException(e);
414     }
415 
416     MetricsSource metrics = new MetricsSource(peerId);
417     // init replication source
418     src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
419       clusterId, replicationEndpoint, metrics);
420 
421     // init replication endpoint
422     replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
423       fs, peerConfig, peerId, clusterId, replicationPeer, metrics));
424 
425     return src;
426   }
427 
428   /**
429    * Transfer all the queues of the specified to this region server.
430    * First it tries to grab a lock and if it works it will move the
431    * znodes and finally will delete the old znodes.
432    *
433    * It creates one old source for any type of source of the old rs.
434    * @param rsZnode
435    */
436   private void transferQueues(String rsZnode) {
437     NodeFailoverWorker transfer =
438         new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
439             this.clusterId);
440     try {
441       this.executor.execute(transfer);
442     } catch (RejectedExecutionException ex) {
443       LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
444     }
445   }
446 
447   /**
448    * Clear the references to the specified old source
449    * @param src source to clear
450    */
451   public void closeRecoveredQueue(ReplicationSourceInterface src) {
452     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
453     this.oldsources.remove(src);
454     deleteSource(src.getPeerClusterZnode(), false);
455     this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
456   }
457 
458   /**
459    * Thie method first deletes all the recovered sources for the specified
460    * id, then deletes the normal source (deleting all related data in ZK).
461    * @param id The id of the peer cluster
462    */
463   public void removePeer(String id) {
464     LOG.info("Closing the following queue " + id + ", currently have "
465         + sources.size() + " and another "
466         + oldsources.size() + " that were recovered");
467     String terminateMessage = "Replication stream was removed by a user";
468     ReplicationSourceInterface srcToRemove = null;
469     List<ReplicationSourceInterface> oldSourcesToDelete =
470         new ArrayList<ReplicationSourceInterface>();
471     // First close all the recovered sources for this peer
472     for (ReplicationSourceInterface src : oldsources) {
473       if (id.equals(src.getPeerClusterId())) {
474         oldSourcesToDelete.add(src);
475       }
476     }
477     for (ReplicationSourceInterface src : oldSourcesToDelete) {
478       src.terminate(terminateMessage);
479       closeRecoveredQueue((src));
480     }
481     LOG.info("Number of deleted recovered sources for " + id + ": "
482         + oldSourcesToDelete.size());
483     // Now look for the one on this cluster
484     for (ReplicationSourceInterface src : this.sources) {
485       if (id.equals(src.getPeerClusterId())) {
486         srcToRemove = src;
487         break;
488       }
489     }
490     if (srcToRemove == null) {
491       LOG.error("The queue we wanted to close is missing " + id);
492       return;
493     }
494     srcToRemove.terminate(terminateMessage);
495     this.sources.remove(srcToRemove);
496     deleteSource(id, true);
497   }
498 
  /**
   * A region server disappeared: try to claim and replay its replication queues.
   */
  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(regionserver);
  }

  /**
   * A peer was removed: terminate its sources and delete its queues.
   */
  @Override
  public void peerRemoved(String peerId) {
    removePeer(peerId);
  }
508 
509   @Override
510   public void peerListChanged(List<String> peerIds) {
511     for (String id : peerIds) {
512       try {
513         boolean added = this.replicationPeers.peerAdded(id);
514         if (added) {
515           addSource(id);
516         }
517       } catch (Exception e) {
518         LOG.error("Error while adding a new peer", e);
519       }
520     }
521   }
522 
523   /**
524    * Class responsible to setup new ReplicationSources to take care of the
525    * queues from dead region servers.
526    */
527   class NodeFailoverWorker extends Thread {
528 
529     private String rsZnode;
530     private final ReplicationQueues rq;
531     private final ReplicationPeers rp;
532     private final UUID clusterId;
533 
534     /**
535      *
536      * @param rsZnode
537      */
538     public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
539         final ReplicationPeers replicationPeers, final UUID clusterId) {
540       super("Failover-for-"+rsZnode);
541       this.rsZnode = rsZnode;
542       this.rq = replicationQueues;
543       this.rp = replicationPeers;
544       this.clusterId = clusterId;
545     }
546 
547     @Override
548     public void run() {
549       if (this.rq.isThisOurZnode(rsZnode)) {
550         return;
551       }
552       // Wait a bit before transferring the queues, we may be shutting down.
553       // This sleep may not be enough in some cases.
554       try {
555         Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
556       } catch (InterruptedException e) {
557         LOG.warn("Interrupted while waiting before transferring a queue.");
558         Thread.currentThread().interrupt();
559       }
560       // We try to lock that rs' queue directory
561       if (server.isStopped()) {
562         LOG.info("Not transferring queue since we are shutting down");
563         return;
564       }
565       SortedMap<String, SortedSet<String>> newQueues = null;
566 
567       newQueues = this.rq.claimQueues(rsZnode);
568 
569       // Copying over the failed queue is completed.
570       if (newQueues.isEmpty()) {
571         // We either didn't get the lock or the failed region server didn't have any outstanding
572         // WALs to replicate, so we are done.
573         return;
574       }
575 
576       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
577         String peerId = entry.getKey();
578         try {
579           // there is not an actual peer defined corresponding to peerId for the failover.
580           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
581           String actualPeerId = replicationQueueInfo.getPeerId();
582           ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
583           ReplicationPeerConfig peerConfig = null;
584           try {
585             peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
586           } catch (ReplicationException ex) {
587             LOG.warn("Received exception while getting replication peer config, skipping replay"
588                 + ex);
589           }
590           if (peer == null || peerConfig == null) {
591             LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
592             continue;
593           }
594 
595           ReplicationSourceInterface src =
596               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
597                 server, peerId, this.clusterId, peerConfig, peer);
598           if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
599             src.terminate("Recovered queue doesn't belong to any current peer");
600             break;
601           }
602           oldsources.add(src);
603           SortedSet<String> walsSet = entry.getValue();
604           for (String wal : walsSet) {
605             src.enqueueLog(new Path(oldLogDir, wal));
606           }
607           src.startup();
608           walsByIdRecoveredQueues.put(peerId, walsSet);
609         } catch (IOException e) {
610           // TODO manage it
611           LOG.error("Failed creating a source", e);
612         }
613       }
614     }
615   }
616 
617   /**
618    * Get the directory where wals are archived
619    * @return the directory where wals are archived
620    */
621   public Path getOldLogDir() {
622     return this.oldLogDir;
623   }
624 
625   /**
626    * Get the directory where wals are stored by their RSs
627    * @return the directory where wals are stored by their RSs
628    */
629   public Path getLogDir() {
630     return this.logDir;
631   }
632 
633   /**
634    * Get the handle on the local file system
635    * @return Handle on the local file system
636    */
637   public FileSystem getFs() {
638     return this.fs;
639   }
640 
641   /**
642    * Get a string representation of all the sources' metrics
643    */
644   public String getStats() {
645     StringBuffer stats = new StringBuffer();
646     for (ReplicationSourceInterface source : sources) {
647       stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
648       stats.append(source.getStats() + "\n");
649     }
650     for (ReplicationSourceInterface oldSource : oldsources) {
651       stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
652       stats.append(oldSource.getStats()+ "\n");
653     }
654     return stats.toString();
655   }
656 }