View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Random;
29  import java.util.SortedMap;
30  import java.util.SortedSet;
31  import java.util.TreeSet;
32  import java.util.UUID;
33  import java.util.concurrent.LinkedBlockingQueue;
34  import java.util.concurrent.RejectedExecutionException;
35  import java.util.concurrent.ThreadPoolExecutor;
36  import java.util.concurrent.TimeUnit;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FileSystem;
43  import org.apache.hadoop.fs.Path;
44  import org.apache.hadoop.hbase.Stoppable;
45  import org.apache.hadoop.hbase.replication.ReplicationException;
46  import org.apache.hadoop.hbase.replication.ReplicationListener;
47  import org.apache.hadoop.hbase.replication.ReplicationPeers;
48  import org.apache.hadoop.hbase.replication.ReplicationQueues;
49  import org.apache.hadoop.hbase.replication.ReplicationTracker;
50  import org.apache.zookeeper.KeeperException;
51  
52  import com.google.common.util.concurrent.ThreadFactoryBuilder;
53  
/**
 * This class is responsible for managing all the replication
 * sources. There are two classes of sources:
 * <ul>
 * <li> Normal sources are persistent and one per peer cluster</li>
 * <li> Old sources are recovered from a failed region server and our
 * only goal is to finish replicating the HLog queue it had up in ZK</li>
 * </ul>
 *
 * When a region server dies, this class uses a watcher to get notified and it
 * tries to grab a lock in order to transfer all the queues into a local
 * old source.
 *
 * This class implements the ReplicationListener interface so that it can track changes in
 * replication state.
 */
68  @InterfaceAudience.Private
69  public class ReplicationSourceManager implements ReplicationListener {
70    private static final Log LOG =
71        LogFactory.getLog(ReplicationSourceManager.class);
72    // List of all the sources that read this RS's logs
73    private final List<ReplicationSourceInterface> sources;
74    // List of all the sources we got from died RSs
75    private final List<ReplicationSourceInterface> oldsources;
76    private final ReplicationQueues replicationQueues;
77    private final ReplicationTracker replicationTracker;
78    private final ReplicationPeers replicationPeers;
79    // UUID for this cluster
80    private final UUID clusterId;
81    // All about stopping
82    private final Stoppable stopper;
83    // All logs we are currently tracking
84    private final Map<String, SortedSet<String>> hlogsById;
85    private final Configuration conf;
86    private final FileSystem fs;
87    // The path to the latest log we saw, for new coming sources
88    private Path latestPath;
89    // Path to the hlogs directories
90    private final Path logDir;
91    // Path to the hlog archive
92    private final Path oldLogDir;
93    // The number of ms that we wait before moving znodes, HBASE-3596
94    private final long sleepBeforeFailover;
95    // Homemade executer service for replication
96    private final ThreadPoolExecutor executor;
97  
98    private final Random rand;
99  
100 
101   /**
102    * Creates a replication manager and sets the watch on all the other registered region servers
103    * @param replicationQueues the interface for manipulating replication queues
104    * @param replicationPeers
105    * @param replicationTracker
106    * @param conf the configuration to use
107    * @param stopper the stopper object for this region server
108    * @param fs the file system to use
109    * @param logDir the directory that contains all hlog directories of live RSs
110    * @param oldLogDir the directory where old logs are archived
111    * @param clusterId
112    */
113   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
114       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
115       final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
116       final Path oldLogDir, final UUID clusterId) {
117     this.sources = new ArrayList<ReplicationSourceInterface>();
118     this.replicationQueues = replicationQueues;
119     this.replicationPeers = replicationPeers;
120     this.replicationTracker = replicationTracker;
121     this.stopper = stopper;
122     this.hlogsById = new HashMap<String, SortedSet<String>>();
123     this.oldsources = new ArrayList<ReplicationSourceInterface>();
124     this.conf = conf;
125     this.fs = fs;
126     this.logDir = logDir;
127     this.oldLogDir = oldLogDir;
128     this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
129     this.clusterId = clusterId;
130     this.replicationTracker.registerListener(this);
131     this.replicationPeers.getAllPeerIds();
132     // It's preferable to failover 1 RS at a time, but with good zk servers
133     // more could be processed at the same time.
134     int nbWorkers = conf.getInt("replication.executor.workers", 1);
135     // use a short 100ms sleep since this could be done inline with a RS startup
136     // even if we fail, other region servers can take care of it
137     this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
138         100, TimeUnit.MILLISECONDS,
139         new LinkedBlockingQueue<Runnable>());
140     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
141     tfb.setNameFormat("ReplicationExecutor-%d");
142     this.executor.setThreadFactory(tfb.build());
143     this.rand = new Random();
144   }
145 
146   /**
147    * Provide the id of the peer and a log key and this method will figure which
148    * hlog it belongs to and will log, for this region server, the current
149    * position. It will also clean old logs from the queue.
150    * @param log Path to the log currently being replicated from
151    * replication status in zookeeper. It will also delete older entries.
152    * @param id id of the peer cluster
153    * @param position current location in the log
154    * @param queueRecovered indicates if this queue comes from another region server
155    * @param holdLogInZK if true then the log is retained in ZK
156    */
157   public void logPositionAndCleanOldLogs(Path log, String id, long position,
158       boolean queueRecovered, boolean holdLogInZK) {
159     String fileName = log.getName();
160     this.replicationQueues.setLogPosition(id, fileName, position);
161     if (holdLogInZK) {
162      return;
163     }
164     cleanOldLogs(fileName, id, queueRecovered);
165   }
166 
167   /**
168    * Cleans a log file and all older files from ZK. Called when we are sure that a
169    * log file is closed and has no more entries.
170    * @param key Path to the log
171    * @param id id of the peer cluster
172    * @param queueRecovered Whether this is a recovered queue
173    */
174   public void cleanOldLogs(String key,
175                            String id,
176                            boolean queueRecovered) {
177     synchronized (this.hlogsById) {
178       SortedSet<String> hlogs = this.hlogsById.get(id);
179       if (queueRecovered || hlogs.first().equals(key)) {
180         return;
181       }
182       SortedSet<String> hlogSet = hlogs.headSet(key);
183       for (String hlog : hlogSet) {
184         this.replicationQueues.removeLog(id, hlog);
185       }
186       hlogSet.clear();
187     }
188   }
189 
190   /**
191    * Adds a normal source per registered peer cluster and tries to process all
192    * old region server hlog queues
193    */
194   protected void init() throws IOException, ReplicationException {
195     for (String id : this.replicationPeers.getConnectedPeers()) {
196       addSource(id);
197     }
198     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
199     if (currentReplicators == null || currentReplicators.size() == 0) {
200       return;
201     }
202     List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
203     LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
204         + otherRegionServers);
205 
206     // Look if there's anything to process after a restart
207     for (String rs : currentReplicators) {
208       if (!otherRegionServers.contains(rs)) {
209         transferQueues(rs);
210       }
211     }
212   }
213 
214   /**
215    * Add a new normal source to this region server
216    * @param id the id of the peer cluster
217    * @return the source that was created
218    * @throws IOException
219    */
220   protected ReplicationSourceInterface addSource(String id) throws IOException,
221       ReplicationException {
222     ReplicationSourceInterface src =
223         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
224           this.replicationPeers, stopper, id, this.clusterId);
225     synchronized (this.hlogsById) {
226       this.sources.add(src);
227       this.hlogsById.put(id, new TreeSet<String>());
228       // Add the latest hlog to that source's queue
229       if (this.latestPath != null) {
230         String name = this.latestPath.getName();
231         this.hlogsById.get(id).add(name);
232         try {
233           this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
234         } catch (ReplicationException e) {
235           String message =
236               "Cannot add log to queue when creating a new source, queueId="
237                   + src.getPeerClusterZnode() + ", filename=" + name;
238           stopper.stop(message);
239           throw e;
240         }
241         src.enqueueLog(this.latestPath);
242       }
243     }
244     src.startup();
245     return src;
246   }
247 
248   /**
249    * Delete a complete queue of hlogs associated with a peer cluster
250    * @param peerId Id of the peer cluster queue of hlogs to delete
251    */
252   public void deleteSource(String peerId, boolean closeConnection) {
253     this.replicationQueues.removeQueue(peerId);
254     if (closeConnection) {
255       this.replicationPeers.disconnectFromPeer(peerId);
256     }
257   }
258 
259   /**
260    * Terminate the replication on this region server
261    */
262   public void join() {
263     this.executor.shutdown();
264     if (this.sources.size() == 0) {
265       this.replicationQueues.removeAllQueues();
266     }
267     for (ReplicationSourceInterface source : this.sources) {
268       source.terminate("Region server is closing");
269     }
270   }
271 
272   /**
273    * Get a copy of the hlogs of the first source on this rs
274    * @return a sorted set of hlog names
275    */
276   protected Map<String, SortedSet<String>> getHLogs() {
277     return Collections.unmodifiableMap(hlogsById);
278   }
279 
280   /**
281    * Get a list of all the normal sources of this rs
282    * @return lis of all sources
283    */
284   public List<ReplicationSourceInterface> getSources() {
285     return this.sources;
286   }
287 
288   /**
289    * Get a list of all the old sources of this rs
290    * @return list of all old sources
291    */
292   public List<ReplicationSourceInterface> getOldSources() {
293     return this.oldsources;
294   }
295 
296   void preLogRoll(Path newLog) throws IOException {
297 
298     synchronized (this.hlogsById) {
299       String name = newLog.getName();
300       for (ReplicationSourceInterface source : this.sources) {
301         try {
302           this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
303         } catch (ReplicationException e) {
304           throw new IOException("Cannot add log to replication queue with id="
305               + source.getPeerClusterZnode() + ", filename=" + name, e);
306         }
307       }
308       for (SortedSet<String> hlogs : this.hlogsById.values()) {
309         if (this.sources.isEmpty()) {
310           // If there's no slaves, don't need to keep the old hlogs since
311           // we only consider the last one when a new slave comes in
312           hlogs.clear();
313         }
314         hlogs.add(name);
315       }
316     }
317 
318     this.latestPath = newLog;
319   }
320 
321   void postLogRoll(Path newLog) throws IOException {
322     // This only updates the sources we own, not the recovered ones
323     for (ReplicationSourceInterface source : this.sources) {
324       source.enqueueLog(newLog);
325     }
326   }
327 
328   /**
329    * Factory method to create a replication source
330    * @param conf the configuration to use
331    * @param fs the file system to use
332    * @param manager the manager to use
333    * @param stopper the stopper object for this region server
334    * @param peerId the id of the peer cluster
335    * @return the created source
336    * @throws IOException
337    */
338   protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
339       final FileSystem fs, final ReplicationSourceManager manager,
340       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
341       final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
342     ReplicationSourceInterface src;
343     try {
344       @SuppressWarnings("rawtypes")
345       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
346           ReplicationSource.class.getCanonicalName()));
347       src = (ReplicationSourceInterface) c.newInstance();
348     } catch (Exception e) {
349       LOG.warn("Passed replication source implementation throws errors, " +
350           "defaulting to ReplicationSource", e);
351       src = new ReplicationSource();
352 
353     }
354     src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
355     return src;
356   }
357 
358   /**
359    * Transfer all the queues of the specified to this region server.
360    * First it tries to grab a lock and if it works it will move the
361    * znodes and finally will delete the old znodes.
362    *
363    * It creates one old source for any type of source of the old rs.
364    * @param rsZnode
365    */
366   private void transferQueues(String rsZnode) {
367     NodeFailoverWorker transfer =
368         new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
369             this.clusterId);
370     try {
371       this.executor.execute(transfer);
372     } catch (RejectedExecutionException ex) {
373       LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
374     }
375   }
376 
377   /**
378    * Clear the references to the specified old source
379    * @param src source to clear
380    */
381   public void closeRecoveredQueue(ReplicationSourceInterface src) {
382     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
383     this.oldsources.remove(src);
384     deleteSource(src.getPeerClusterZnode(), false);
385   }
386 
387   /**
388    * Thie method first deletes all the recovered sources for the specified
389    * id, then deletes the normal source (deleting all related data in ZK).
390    * @param id The id of the peer cluster
391    */
392   public void removePeer(String id) {
393     LOG.info("Closing the following queue " + id + ", currently have "
394         + sources.size() + " and another "
395         + oldsources.size() + " that were recovered");
396     String terminateMessage = "Replication stream was removed by a user";
397     ReplicationSourceInterface srcToRemove = null;
398     List<ReplicationSourceInterface> oldSourcesToDelete =
399         new ArrayList<ReplicationSourceInterface>();
400     // First close all the recovered sources for this peer
401     for (ReplicationSourceInterface src : oldsources) {
402       if (id.equals(src.getPeerClusterId())) {
403         oldSourcesToDelete.add(src);
404       }
405     }
406     for (ReplicationSourceInterface src : oldSourcesToDelete) {
407       src.terminate(terminateMessage);
408       closeRecoveredQueue((src));
409     }
410     LOG.info("Number of deleted recovered sources for " + id + ": "
411         + oldSourcesToDelete.size());
412     // Now look for the one on this cluster
413     for (ReplicationSourceInterface src : this.sources) {
414       if (id.equals(src.getPeerClusterId())) {
415         srcToRemove = src;
416         break;
417       }
418     }
419     if (srcToRemove == null) {
420       LOG.error("The queue we wanted to close is missing " + id);
421       return;
422     }
423     srcToRemove.terminate(terminateMessage);
424     this.sources.remove(srcToRemove);
425     deleteSource(id, true);
426   }
427 
428   @Override
429   public void regionServerRemoved(String regionserver) {
430     transferQueues(regionserver);
431   }
432 
433   @Override
434   public void peerRemoved(String peerId) {
435     removePeer(peerId);
436   }
437 
438   @Override
439   public void peerListChanged(List<String> peerIds) {
440     for (String id : peerIds) {
441       try {
442         boolean added = this.replicationPeers.connectToPeer(id);
443         if (added) {
444           addSource(id);
445         }
446       } catch (Exception e) {
447         LOG.error("Error while adding a new peer", e);
448       }
449     }
450   }
451 
452   /**
453    * Class responsible to setup new ReplicationSources to take care of the
454    * queues from dead region servers.
455    */
456   class NodeFailoverWorker extends Thread {
457 
458     private String rsZnode;
459     private final ReplicationQueues rq;
460     private final ReplicationPeers rp;
461     private final UUID clusterId;
462 
463     /**
464      *
465      * @param rsZnode
466      */
467     public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
468         final ReplicationPeers replicationPeers, final UUID clusterId) {
469       super("Failover-for-"+rsZnode);
470       this.rsZnode = rsZnode;
471       this.rq = replicationQueues;
472       this.rp = replicationPeers;
473       this.clusterId = clusterId;
474     }
475 
476     @Override
477     public void run() {
478       if (this.rq.isThisOurZnode(rsZnode)) {
479         return;
480       }
481       // Wait a bit before transferring the queues, we may be shutting down.
482       // This sleep may not be enough in some cases.
483       try {
484         Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
485       } catch (InterruptedException e) {
486         LOG.warn("Interrupted while waiting before transferring a queue.");
487         Thread.currentThread().interrupt();
488       }
489       // We try to lock that rs' queue directory
490       if (stopper.isStopped()) {
491         LOG.info("Not transferring queue since we are shutting down");
492         return;
493       }
494       SortedMap<String, SortedSet<String>> newQueues = null;
495 
496       newQueues = this.rq.claimQueues(rsZnode);
497 
498       // Copying over the failed queue is completed.
499       if (newQueues.isEmpty()) {
500         // We either didn't get the lock or the failed region server didn't have any outstanding
501         // HLogs to replicate, so we are done.
502         return;
503       }
504 
505       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
506         String peerId = entry.getKey();
507         try {
508           ReplicationSourceInterface src =
509               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
510                 stopper, peerId, this.clusterId);
511           if (!this.rp.getConnectedPeers().contains((src.getPeerClusterId()))) {
512             src.terminate("Recovered queue doesn't belong to any current peer");
513             break;
514           }
515           oldsources.add(src);
516           for (String hlog : entry.getValue()) {
517             src.enqueueLog(new Path(oldLogDir, hlog));
518           }
519           src.startup();
520         } catch (IOException e) {
521           // TODO manage it
522           LOG.error("Failed creating a source", e);
523         }
524       }
525     }
526   }
527 
528   /**
529    * Get the directory where hlogs are archived
530    * @return the directory where hlogs are archived
531    */
532   public Path getOldLogDir() {
533     return this.oldLogDir;
534   }
535 
536   /**
537    * Get the directory where hlogs are stored by their RSs
538    * @return the directory where hlogs are stored by their RSs
539    */
540   public Path getLogDir() {
541     return this.logDir;
542   }
543 
544   /**
545    * Get the handle on the local file system
546    * @return Handle on the local file system
547    */
548   public FileSystem getFs() {
549     return this.fs;
550   }
551 
552   /**
553    * Get a string representation of all the sources' metrics
554    */
555   public String getStats() {
556     StringBuffer stats = new StringBuffer();
557     for (ReplicationSourceInterface source : sources) {
558       stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
559       stats.append(source.getStats() + "\n");
560     }
561     for (ReplicationSourceInterface oldSource : oldsources) {
562       stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
563       stats.append(oldSource.getStats()+ "\n");
564     }
565     return stats.toString();
566   }
567 }