
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is responsible for managing all the replication
 * sources. There are two classes of sources:
 * <ul>
 * <li>Normal sources are persistent, one per peer cluster.</li>
 * <li>Old sources are recovered from a failed region server; their
 * only goal is to finish replicating the HLog queue that server had in ZK.</li>
 * </ul>
 *
 * When a region server dies, this class uses a watcher to get notified and
 * tries to grab a lock in order to transfer all the queues to a local
 * old source.
 *
 * This class implements the ReplicationListener interface so that it can track changes in
 * replication state.
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
  private static final Log LOG =
      LogFactory.getLog(ReplicationSourceManager.class);
  // List of all the sources that read this RS's logs
  private final List<ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;
  private final ReplicationQueues replicationQueues;
  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Stoppable stopper;
  // All logs we are currently tracking
  private final Map<String, SortedSet<String>> hlogsById;
  private final Configuration conf;
  private final FileSystem fs;
  // The path to the latest log we saw, for newly created sources
  private Path latestPath;
  // Path to the hlogs directories
  private final Path logDir;
  // Path to the hlog archive
  private final Path oldLogDir;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private final Random rand;

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param replicationTracker the tracker of replication state changes
   * @param conf the configuration to use
   * @param stopper the stopper object for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all hlog directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId the UUID of this cluster
   */
  public ReplicationSourceManager(final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
      final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
      final Path oldLogDir, final UUID clusterId) {
    // CopyOnWriteArrayList is thread-safe.
    // Reads vastly outnumber modifications, so it is a good fit here.
    this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.stopper = stopper;
    this.hlogsById = new HashMap<String, SortedSet<String>>();
    this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
    this.clusterId = clusterId;
    this.replicationTracker.registerListener(this);
    this.replicationPeers.getAllPeerIds();
    // It's preferable to fail over one RS at a time, but with good ZK servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // Use a short 100ms keep-alive since this could be done inline with a RS startup;
    // even if we fail, other region servers can take care of it.
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
        100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    this.executor.setThreadFactory(tfb.build());
    this.rand = new Random();
  }
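
  // A minimal lifecycle sketch (hypothetical caller; the real wiring lives in
  // the region server's replication service), assuming queues, peers, tracker,
  // stopper, fs, logDir, oldLogDir and clusterId are already built:
  //
  //   ReplicationSourceManager manager = new ReplicationSourceManager(
  //       queues, peers, tracker, conf, stopper, fs, logDir, oldLogDir, clusterId);
  //   manager.init();                     // one source per connected peer
  //   manager.preLogRoll(newHLog);        // register a freshly rolled hlog
  //   manager.postLogRoll(newHLog);       // hand it to the live sources
  //   manager.logPositionAndCleanOldLogs( // report progress and prune old logs
  //       newHLog, peerId, position, false, false);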

  /**
   * Provide the id of the peer and a log key and this method will figure out which
   * hlog it belongs to and will record, for this region server, the current
   * position. It will also clean old logs from the queue.
   * @param log Path to the log currently being replicated
   * @param id id of the peer cluster
   * @param position current location in the log
   * @param queueRecovered indicates if this queue comes from another region server
   * @param holdLogInZK if true then the log is retained in ZK
   */
  public void logPositionAndCleanOldLogs(Path log, String id, long position,
      boolean queueRecovered, boolean holdLogInZK) {
    String fileName = log.getName();
    this.replicationQueues.setLogPosition(id, fileName, position);
    if (holdLogInZK) {
      return;
    }
    cleanOldLogs(fileName, id, queueRecovered);
  }

  /**
   * Cleans a log file and all older files from ZK. Called when we are sure that a
   * log file is closed and has no more entries.
   * @param key name of the log file; all strictly older logs are removed
   * @param id id of the peer cluster
   * @param queueRecovered Whether this is a recovered queue
   */
  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
    synchronized (this.hlogsById) {
      SortedSet<String> hlogs = this.hlogsById.get(id);
      if (queueRecovered || hlogs.first().equals(key)) {
        return;
      }
      SortedSet<String> hlogSet = hlogs.headSet(key);
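      // hlogSet is a view backed by the TreeSet stored in hlogsById, so
      // clearing it below also drops these entries from the tracked set.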
      for (String hlog : hlogSet) {
        this.replicationQueues.removeLog(id, hlog);
      }
      hlogSet.clear();
    }
  }

  /**
   * Adds a normal source per registered peer cluster and tries to process all
   * old region server hlog queues
   */
  protected void init() throws IOException, ReplicationException {
    for (String id : this.replicationPeers.getConnectedPeers()) {
      addSource(id);
    }
    List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
    if (currentReplicators == null || currentReplicators.isEmpty()) {
      return;
    }
    List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
    LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
        + otherRegionServers);

    // Check if there is anything to process after a restart
    for (String rs : currentReplicators) {
      if (!otherRegionServers.contains(rs)) {
        transferQueues(rs);
      }
    }
  }

  /**
   * Add a new normal source to this region server
   * @param id the id of the peer cluster
   * @return the source that was created
   * @throws IOException
   * @throws ReplicationException if the latest log cannot be added to the queue
   */
  protected ReplicationSourceInterface addSource(String id) throws IOException,
      ReplicationException {
    ReplicationSourceInterface src =
        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
          this.replicationPeers, stopper, id, this.clusterId);
    synchronized (this.hlogsById) {
      this.sources.add(src);
      this.hlogsById.put(id, new TreeSet<String>());
      // Add the latest hlog to that source's queue
      if (this.latestPath != null) {
        String name = this.latestPath.getName();
        this.hlogsById.get(id).add(name);
        try {
          this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          String message =
              "Cannot add log to queue when creating a new source, queueId="
                  + src.getPeerClusterZnode() + ", filename=" + name;
          stopper.stop(message);
          throw e;
        }
        src.enqueueLog(this.latestPath);
      }
    }
    src.startup();
    return src;
  }

  /**
   * Delete a complete queue of hlogs associated with a peer cluster
   * @param peerId Id of the peer cluster queue of hlogs to delete
   * @param closeConnection whether to disconnect from the peer cluster as well
   */
  public void deleteSource(String peerId, boolean closeConnection) {
    this.replicationQueues.removeQueue(peerId);
    if (closeConnection) {
      this.replicationPeers.disconnectFromPeer(peerId);
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
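    // With no sources, the queues for this region server hold nothing another
    // server would need to recover, so they are all removed.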
    if (this.sources.isEmpty()) {
      this.replicationQueues.removeAllQueues();
    }
    for (ReplicationSourceInterface source : this.sources) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Get the hlogs tracked for every peer on this rs, keyed by peer id
   * @return an unmodifiable view of the map of hlog names by peer id
   */
  protected Map<String, SortedSet<String>> getHLogs() {
    return Collections.unmodifiableMap(hlogsById);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return this.sources;
  }

  /**
   * Get a list of all the old sources of this rs
   * @return list of all old sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  void preLogRoll(Path newLog) throws IOException {
    synchronized (this.hlogsById) {
      String name = newLog.getName();
      for (ReplicationSourceInterface source : this.sources) {
        try {
          this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          throw new IOException("Cannot add log to replication queue with id="
              + source.getPeerClusterZnode() + ", filename=" + name, e);
        }
      }
      for (SortedSet<String> hlogs : this.hlogsById.values()) {
        if (this.sources.isEmpty()) {
          // If there are no slaves, don't keep the old hlogs since
          // we only consider the last one when a new slave comes in
          hlogs.clear();
        }
        hlogs.add(name);
      }
    }

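    // Remember the newest log so that sources created later can start from it.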
    this.latestPath = newLog;
  }

  void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources) {
      source.enqueueLog(newLog);
    }
  }

  /**
   * Factory method to create a replication source
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager to use
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param stopper the stopper object for this region server
   * @param peerId the id of the peer cluster
   * @param clusterId the UUID of this cluster
   * @return the created source
   * @throws IOException
   */
  protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
      final FileSystem fs, final ReplicationSourceManager manager,
      final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
      final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
    ReplicationSourceInterface src;
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
          ReplicationSource.class.getCanonicalName()));
      src = (ReplicationSourceInterface) c.newInstance();
    } catch (Exception e) {
      LOG.warn("Passed replication source implementation cannot be instantiated, " +
          "defaulting to ReplicationSource", e);
      src = new ReplicationSource();
    }
    src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
    return src;
  }
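
  // The source implementation is pluggable via configuration. A minimal
  // sketch, assuming a hypothetical MyReplicationSource that implements
  // ReplicationSourceInterface and has a no-argument constructor:
  //
  //   conf.set("replication.replicationsource.implementation",
  //       MyReplicationSource.class.getCanonicalName());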

  /**
   * Transfer all the queues of the specified region server to this one.
   * First it tries to grab a lock, and if it succeeds it moves the
   * znodes and finally deletes the old ones.
   *
   * It creates one old source for any type of source of the old rs.
   * @param rsZnode znode of the dead region server
   */
  private void transferQueues(String rsZnode) {
    NodeFailoverWorker transfer =
        new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
            this.clusterId);
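    // The executor is shut down in join(), so a rejected execution here
    // usually just means this region server is closing.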
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
    }
  }

  /**
   * Clear the references to the specified old source
   * @param src source to clear
   */
  public void closeRecoveredQueue(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
    this.oldsources.remove(src);
    deleteSource(src.getPeerClusterZnode(), false);
  }

  /**
   * This method first deletes all the recovered sources for the specified
   * id, then deletes the normal source (deleting all related data in ZK).
   * @param id The id of the peer cluster
   */
  public void removePeer(String id) {
    LOG.info("Closing the following queue " + id + ", currently have "
        + sources.size() + " and another "
        + oldsources.size() + " that were recovered");
    String terminateMessage = "Replication stream was removed by a user";
    ReplicationSourceInterface srcToRemove = null;
    List<ReplicationSourceInterface> oldSourcesToDelete =
        new ArrayList<ReplicationSourceInterface>();
    // First close all the recovered sources for this peer
    for (ReplicationSourceInterface src : oldsources) {
      if (id.equals(src.getPeerClusterId())) {
        oldSourcesToDelete.add(src);
      }
    }
    for (ReplicationSourceInterface src : oldSourcesToDelete) {
      src.terminate(terminateMessage);
      closeRecoveredQueue(src);
    }
    LOG.info("Number of deleted recovered sources for " + id + ": "
        + oldSourcesToDelete.size());
    // Now look for the one on this cluster
    for (ReplicationSourceInterface src : this.sources) {
      if (id.equals(src.getPeerClusterId())) {
        srcToRemove = src;
        break;
      }
    }
    if (srcToRemove == null) {
      LOG.error("The queue we wanted to close is missing " + id);
      return;
    }
    srcToRemove.terminate(terminateMessage);
    this.sources.remove(srcToRemove);
    deleteSource(id, true);
  }

  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(regionserver);
  }

  @Override
  public void peerRemoved(String peerId) {
    removePeer(peerId);
  }

  @Override
  public void peerListChanged(List<String> peerIds) {
    for (String id : peerIds) {
      try {
        boolean added = this.replicationPeers.connectToPeer(id);
        if (added) {
          addSource(id);
        }
      } catch (Exception e) {
        LOG.error("Error while adding a new peer", e);
      }
    }
  }

  /**
   * Class responsible for setting up new ReplicationSources to take care of the
   * queues from dead region servers.
   */
  class NodeFailoverWorker extends Thread {

    private String rsZnode;
    private final ReplicationQueues rq;
    private final ReplicationPeers rp;
    private final UUID clusterId;

    /**
     * @param rsZnode znode of the dead region server whose queues to claim
     */
    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
        final ReplicationPeers replicationPeers, final UUID clusterId) {
      super("Failover-for-" + rsZnode);
      this.rsZnode = rsZnode;
      this.rq = replicationQueues;
      this.rp = replicationPeers;
      this.clusterId = clusterId;
    }

    @Override
    public void run() {
      if (this.rq.isThisOurZnode(rsZnode)) {
        return;
      }
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
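      // The random component adds jitter so that region servers racing for
      // the same queues are less likely to attempt the claim at the same moment.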
      try {
        Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      if (stopper.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      // Try to lock the dead rs' queue directory and claim its queues
      SortedMap<String, SortedSet<String>> newQueues = this.rq.claimQueues(rsZnode);
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the failed region server didn't have any outstanding
        // HLogs to replicate, so we are done.
        return;
      }

      for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
        String peerId = entry.getKey();
        try {
          ReplicationSourceInterface src =
              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
                stopper, peerId, this.clusterId);
          if (!this.rp.getConnectedPeers().contains(src.getPeerClusterId())) {
            src.terminate("Recovered queue doesn't belong to any current peer");
            break;
          }
          oldsources.add(src);
          for (String hlog : entry.getValue()) {
            src.enqueueLog(new Path(oldLogDir, hlog));
          }
          src.startup();
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  /**
   * Get the directory where hlogs are archived
   * @return the directory where hlogs are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where hlogs are stored by their RSs
   * @return the directory where hlogs are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the file system used by this manager
   * @return handle on the file system used by this manager
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    for (ReplicationSourceInterface source : sources) {
      stats.append("Normal source for cluster ").append(source.getPeerClusterId()).append(": ");
      stats.append(source.getStats()).append("\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) ")
          .append(oldSource.getPeerClusterId()).append(": ");
      stats.append(oldSource.getStats()).append("\n");
    }
    return stats.toString();
  }
}