1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.net.ConnectException;
25  import java.net.SocketTimeoutException;
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  import java.util.Comparator;
29  import java.util.HashSet;
30  import java.util.List;
31  import java.util.NavigableMap;
32  import java.util.Random;
33  import java.util.Set;
34  import java.util.UUID;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.PriorityBlockingQueue;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.classification.InterfaceAudience;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.KeyValue;
49  import org.apache.hadoop.hbase.ServerName;
50  import org.apache.hadoop.hbase.Stoppable;
51  import org.apache.hadoop.hbase.client.HConnection;
52  import org.apache.hadoop.hbase.client.HConnectionManager;
53  import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
54  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
55  import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
56  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
57  import org.apache.hadoop.hbase.regionserver.wal.HLog;
58  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
59  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
60  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
61  import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
62  import org.apache.hadoop.hbase.util.Bytes;
63  import org.apache.hadoop.hbase.util.Threads;
64  import org.apache.hadoop.ipc.RemoteException;
65  import org.apache.zookeeper.KeeperException;
66  
/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than 1 slave.
 * For each slave cluster it selects a random number of peers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p/>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * <p/>
 *
 */
79  @InterfaceAudience.Private
80  public class ReplicationSource extends Thread
81      implements ReplicationSourceInterface {
82  
  private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
  // Queue of logs to process
  private PriorityBlockingQueue<Path> queue;
  // container of entries to replicate
  private HLog.Entry[] entriesArray;
  // Connection used to obtain the admin interface of the chosen sink
  private HConnection conn;
  // Helper class for zookeeper
  private ReplicationZookeeper zkHelper;
  // Configuration the replication settings are read from
  private Configuration conf;
  // ratio of region servers to choose from a slave cluster
  private float ratio;
  // Source of randomness used when picking sinks
  private Random random;
  // should we replicate or not?
  private AtomicBoolean replicating;
  // Information parsed out of the name of our znode (peer id, recovered or not,
  // dead region servers)
  private ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;
  // The manager of all sources to which we ping back our progress
  private ReplicationSourceManager manager;
  // Should we stop everything?
  private Stoppable stopper;
  // List of chosen sinks (region servers)
  private List<ServerName> currentPeers;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Max size in bytes of entriesArray
  private long replicationQueueSizeCapacity;
  // Max number of entries in entriesArray
  private int replicationQueueNbCapacity;
  // Our reader for the current log
  private HLog.Reader reader;
  // Last position in the log that we sent to ZooKeeper
  private long lastLoggedPosition = -1;
  // Path of the current log
  private volatile Path currentPath;
  // File system handed to init(); used to stat the current log and to build
  // the log reader helper
  private FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // id of the other cluster
  private UUID peerClusterId;
  // total number of edits we replicated
  private long totalReplicatedEdits = 0;
  // The znode we currently play with
  private String peerClusterZnode;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Current number of entries that we need to replicate
  private int currentNbEntries = 0;
  // Current number of operations (Put/Delete) that we need to replicate
  private int currentNbOperations = 0;
  // Current size of data we need to replicate
  private int currentSize = 0;
  // Indicates if this particular source is running
  private volatile boolean running = true;
  // Metrics for this source
  private MetricsSource metrics;
  // Handle on the log reader helper
  private ReplicationHLogReaderManager repLogReader;
143 
144   /**
145    * Instantiation method used by region servers
146    *
147    * @param conf configuration to use
148    * @param fs file system to use
149    * @param manager replication manager to ping to
150    * @param stopper     the atomic boolean to use to stop the regionserver
151    * @param replicating the atomic boolean that starts/stops replication
152    * @param peerClusterZnode the name of our znode
153    * @throws IOException
154    */
155   public void init(final Configuration conf,
156                    final FileSystem fs,
157                    final ReplicationSourceManager manager,
158                    final Stoppable stopper,
159                    final AtomicBoolean replicating,
160                    final String peerClusterZnode)
161       throws IOException {
162     this.stopper = stopper;
163     this.conf = conf;
164     this.replicationQueueSizeCapacity =
165         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
166     this.replicationQueueNbCapacity =
167         this.conf.getInt("replication.source.nb.capacity", 25000);
168     this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity];
169     for (int i = 0; i < this.replicationQueueNbCapacity; i++) {
170       this.entriesArray[i] = new HLog.Entry();
171     }
172     this.maxRetriesMultiplier =
173         this.conf.getInt("replication.source.maxretriesmultiplier", 10);
174     this.socketTimeoutMultiplier = maxRetriesMultiplier * maxRetriesMultiplier;
175     this.queue =
176         new PriorityBlockingQueue<Path>(
177             conf.getInt("hbase.regionserver.maxlogs", 32),
178             new LogsComparator());
179     this.conn = HConnectionManager.getConnection(conf);
180     this.zkHelper = manager.getRepZkWrapper();
181     this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
182     this.currentPeers = new ArrayList<ServerName>();
183     this.random = new Random();
184     this.replicating = replicating;
185     this.manager = manager;
186     this.sleepForRetries =
187         this.conf.getLong("replication.source.sleepforretries", 1000);
188     this.fs = fs;
189     this.metrics = new MetricsSource(peerClusterZnode);
190     this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
191     try {
192       this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
193     } catch (KeeperException ke) {
194       throw new IOException("Could not read cluster id", ke);
195     }
196     this.peerClusterZnode = peerClusterZnode;
197     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
198     // ReplicationQueueInfo parses the peerId out of the znode for us
199     this.peerId = this.replicationQueueInfo.getPeerId();
200   }
201 
202   /**
203    * Select a number of peers at random using the ratio. Mininum 1.
204    */
205   private void chooseSinks() {
206     this.currentPeers.clear();
207     List<ServerName> addresses = this.zkHelper.getSlavesAddresses(this.peerId);
208     Set<ServerName> setOfAddr = new HashSet<ServerName>();
209     int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
210     LOG.info("Getting " + nbPeers +
211         " rs from peer cluster # " + this.peerId);
212     for (int i = 0; i < nbPeers; i++) {
213       ServerName sn;
214       // Make sure we get one address that we don't already have
215       do {
216         sn = addresses.get(this.random.nextInt(addresses.size()));
217       } while (setOfAddr.contains(sn));
218       LOG.info("Choosing peer " + sn);
219       setOfAddr.add(sn);
220     }
221     this.currentPeers.addAll(setOfAddr);
222   }
223 
224   @Override
225   public void enqueueLog(Path log) {
226     this.queue.put(log);
227     this.metrics.setSizeOfLogQueue(queue.size());
228   }
229 
  /**
   * Main loop of this source. It first waits for sinks and for the id of the
   * peer cluster, then, while the source is active, repeatedly: picks the
   * current log, opens a reader on it, reads a batch of entries and ships
   * them. Every "nothing to do" or failure path sleeps through
   * {@link #sleepForRetries(String, int)} with a growing multiplier and
   * retries. On exit the connection is closed and the metrics are cleared.
   */
  @Override
  public void run() {
    connectToPeers();
    // We were stopped while looping to connect to sinks, just abort
    if (!this.isActive()) {
      metrics.clear();
      return;
    }
    int sleepMultiplier = 1;
    // delay this until we are in an asynchronous thread
    // NOTE(review): this loop only ends once a peer UUID was obtained, it does
    // not look at isActive()
    while (this.peerClusterId == null) {
      this.peerClusterId = zkHelper.getPeerUUID(this.peerId);
      if (this.peerClusterId == null) {
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    // resetting to 1 to reuse later
    sleepMultiplier = 1;

    LOG.info("Replicating "+clusterId + " -> " + peerClusterId);

    // If this is recovered, the queue is already full and the first log
    // normally has a position (unless the RS failed between 2 logs)
    // NOTE(review): queue.peek() returns null on an empty queue, in which case
    // getName() would throw; presumably a recovered queue always holds at
    // least one log at this point -- confirm
    if (this.replicationQueueInfo.isQueueRecovered()) {
      try {
        this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
            this.peerClusterZnode, this.queue.peek().getName()));
      } catch (KeeperException e) {
        this.terminate("Couldn't get the position of this recovered queue " +
            this.peerClusterZnode, e);
      }
    }
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      Path oldPath = getCurrentPath(); //note that in the current scenario,
                                       //oldPath will be null when a log roll
                                       //happens.
      // Get a new path
      boolean hasCurrentPath = getNextPath();
      if (getCurrentPath() != null && oldPath == null) {
        sleepMultiplier = 1; //reset the sleepMultiplier on a path change
      }
      if (!hasCurrentPath) {
        if (sleepForRetries("No log to process", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      boolean currentWALisBeingWrittenTo = false;
      //For WAL files we own (rather than recovered), take a snapshot of whether the
      //current WAL file (this.currentPath) is in use (for writing) NOW!
      //Since the new WAL paths are enqueued only after the prev WAL file
      //is 'closed', presence of an element in the queue means that
      //the previous WAL file was closed, else the file is in use (currentPath)
      //We take the snapshot now so that we are protected against races
      //where a new file gets enqueued while the current file is being processed
      //(and where we just finished reading the current file).
      if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
        currentWALisBeingWrittenTo = true;
      }
      // Open a reader on it
      if (!openReader(sleepMultiplier)) {
        // Reset the sleep multiplier, else it'd be reused for the next file
        sleepMultiplier = 1;
        continue;
      }

      // If we got a null reader but didn't continue, then sleep and continue
      if (this.reader == null) {
        if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      // Start a fresh batch: the counters below are filled by the read
      boolean gotIOE = false;
      currentNbOperations = 0;
      currentNbEntries = 0;
      currentSize = 0;
      try {
        // true means nothing was read and we moved past the current file
        if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
          continue;
        }
      } catch (IOException ioe) {
        LOG.warn(this.peerClusterZnode + " Got: ", ioe);
        gotIOE = true;
        // Only an IOException caused by an EOFException may lead to giving up
        // on the current file
        if (ioe.getCause() instanceof EOFException) {

          boolean considerDumping = false;
          if (this.replicationQueueInfo.isQueueRecovered()) {
            try {
              FileStatus stat = this.fs.getFileStatus(this.currentPath);
              if (stat.getLen() == 0) {
                LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
              }
              // Set whenever the file status could be fetched, empty file or not
              considerDumping = true;
            } catch (IOException e) {
              LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e);
            }
          } else if (currentNbEntries != 0) {
            LOG.warn(this.peerClusterZnode +
                " Got EOF while reading, " + "looks like this file is broken? " + currentPath);
            considerDumping = true;
            // Drop what was read so far, the batch is not shipped
            currentNbEntries = 0;
          }

          // Give up on the file only once the retries reached their maximum
          if (considerDumping &&
              sleepMultiplier == this.maxRetriesMultiplier &&
              processEndOfFile()) {
            continue;
          }
        }
      } finally {
        // Runs after every read attempt, including the paths that 'continue'
        try {
          this.reader = null;
          this.repLogReader.closeReader();
        } catch (IOException e) {
          gotIOE = true;
          LOG.warn("Unable to finalize the tailing of a file", e);
        }
      }

      // If we didn't get anything to replicate, or if we hit a IOE,
      // wait a bit and retry.
      // But if we need to stop, don't bother sleeping
      if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
        // Report the reader position to the manager when it moved since the
        // last report, even though nothing is shipped
        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
          this.manager.logPositionAndCleanOldLogs(this.currentPath,
              this.peerClusterZnode, this.repLogReader.getPosition(),
              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
          this.lastLoggedPosition = this.repLogReader.getPosition();
        }
        if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      sleepMultiplier = 1;
      shipEdits(currentWALisBeingWrittenTo);

    }
    // The source is no longer active: release the connection and the metrics
    if (this.conn != null) {
      try {
        this.conn.close();
      } catch (IOException e) {
        LOG.debug("Attempt to close connection failed", e);
      }
    }
    LOG.debug("Source exiting " + this.peerId);
    metrics.clear();
  }
390 
391   /**
392    * Read all the entries from the current log files and retain those
393    * that need to be replicated. Else, process the end of the current file.
394    * @param currentWALisBeingWrittenTo is the current WAL being written to
395    * @return true if we got nothing and went to the next file, false if we got
396    * entries
397    * @throws IOException
398    */
399   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
400       throws IOException{
401     long seenEntries = 0;
402     this.repLogReader.seek();
403     HLog.Entry entry =
404         this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
405     while (entry != null) {
406       WALEdit edit = entry.getEdit();
407       this.metrics.incrLogEditsRead();
408       seenEntries++;
409       // Remove all KVs that should not be replicated
410       HLogKey logKey = entry.getKey();
411       // don't replicate if the log entries originated in the peer
412       if (!logKey.getClusterId().equals(peerClusterId)) {
413         removeNonReplicableEdits(entry);
414         // Don't replicate catalog entries, if the WALEdit wasn't
415         // containing anything to replicate and if we're currently not set to replicate
416         if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
417             Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
418             edit.size() != 0 && replicating.get()) {
419           // Only set the clusterId if is a local key.
420           // This ensures that the originator sets the cluster id
421           // and all replicas retain the initial cluster id.
422           // This is *only* place where a cluster id other than the default is set.
423           if (HConstants.DEFAULT_CLUSTER_ID == logKey.getClusterId()) {
424             logKey.setClusterId(this.clusterId);
425           }
426           currentNbOperations += countDistinctRowKeys(edit);
427           currentNbEntries++;
428           currentSize += entry.getEdit().size();
429         } else {
430           this.metrics.incrLogEditsFiltered();
431         }
432       }
433       // Stop if too many entries or too big
434       if (currentSize >= this.replicationQueueSizeCapacity ||
435           currentNbEntries >= this.replicationQueueNbCapacity) {
436         break;
437       }
438       try {
439         entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
440       } catch (IOException ie) {
441         LOG.debug("Break on IOE: " + ie.getMessage());
442         break;
443       }
444     }
445     if (currentWALisBeingWrittenTo) {
446       return false;
447     }
448     // If we didn't get anything and the queue has an object, it means we
449     // hit the end of the file for sure
450     return seenEntries == 0 && processEndOfFile();
451   }
452 
453   private void connectToPeers() {
454     // Connect to peer cluster first, unless we have to stop
455     while (this.isActive() && this.currentPeers.size() == 0) {
456 
457       try {
458         chooseSinks();
459         Thread.sleep(this.sleepForRetries);
460       } catch (InterruptedException e) {
461         LOG.error("Interrupted while trying to connect to sinks", e);
462       }
463     }
464   }
465 
466   /**
467    * Poll for the next path
468    * @return true if a path was obtained, false if not
469    */
470   protected boolean getNextPath() {
471     try {
472       if (this.currentPath == null) {
473         this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
474         this.metrics.setSizeOfLogQueue(queue.size());
475       }
476     } catch (InterruptedException e) {
477       LOG.warn("Interrupted while reading edits", e);
478     }
479     return this.currentPath != null;
480   }
481 
482   /**
483    * Open a reader on the current path
484    *
485    * @param sleepMultiplier by how many times the default sleeping time is augmented
486    * @return true if we should continue with that file, false if we are over with it
487    */
488   protected boolean openReader(int sleepMultiplier) {
489     try {
490       try {
491         this.reader = repLogReader.openReader(this.currentPath);
492       } catch (FileNotFoundException fnfe) {
493         if (this.replicationQueueInfo.isQueueRecovered()) {
494           // We didn't find the log in the archive directory, look if it still
495           // exists in the dead RS folder (there could be a chain of failures
496           // to look at)
497           List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
498           LOG.info("NB dead servers : " + deadRegionServers.size());
499           for (String curDeadServerName : deadRegionServers) {
500             Path deadRsDirectory =
501                 new Path(manager.getLogDir().getParent(), curDeadServerName);
502             Path[] locs = new Path[] {
503                 new Path(deadRsDirectory, currentPath.getName()),
504                 new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT),
505                                           currentPath.getName()),
506             };
507             for (Path possibleLogLocation : locs) {
508               LOG.info("Possible location " + possibleLogLocation.toUri().toString());
509               if (this.manager.getFs().exists(possibleLogLocation)) {
510                 // We found the right new location
511                 LOG.info("Log " + this.currentPath + " still exists at " +
512                     possibleLogLocation);
513                 // Breaking here will make us sleep since reader is null
514                 return true;
515               }
516             }
517           }
518           // TODO What happens if the log was missing from every single location?
519           // Although we need to check a couple of times as the log could have
520           // been moved by the master between the checks
521           // It can also happen if a recovered queue wasn't properly cleaned,
522           // such that the znode pointing to a log exists but the log was
523           // deleted a long time ago.
524           // For the moment, we'll throw the IO and processEndOfFile
525           throw new IOException("File from recovered queue is " +
526               "nowhere to be found", fnfe);
527         } else {
528           // If the log was archived, continue reading from there
529           Path archivedLogLocation =
530               new Path(manager.getOldLogDir(), currentPath.getName());
531           if (this.manager.getFs().exists(archivedLogLocation)) {
532             currentPath = archivedLogLocation;
533             LOG.info("Log " + this.currentPath + " was moved to " +
534                 archivedLogLocation);
535             // Open the log at the new location
536             this.openReader(sleepMultiplier);
537 
538           }
539           // TODO What happens the log is missing in both places?
540         }
541       }
542     } catch (IOException ioe) {
543       if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
544       LOG.warn(this.peerClusterZnode + " Got: ", ioe);
545       this.reader = null;
546       if (ioe.getCause() instanceof NullPointerException) {
547         // Workaround for race condition in HDFS-4380
548         // which throws a NPE if we open a file before any data node has the most recent block
549         // Just sleep and retry. Will require re-reading compressed HLogs for compressionContext.
550         LOG.warn("Got NPE opening reader, will retry.");
551       } else if (sleepMultiplier == this.maxRetriesMultiplier) {
552         // TODO Need a better way to determine if a file is really gone but
553         // TODO without scanning all logs dir
554         LOG.warn("Waited too long for this file, considering dumping");
555         return !processEndOfFile();
556       }
557     }
558     return true;
559   }
560 
561   /*
562    * Checks whether the current log file is empty, and it is not a recovered queue. This is to
563    * handle scenario when in an idle cluster, there is no entry in the current log and we keep on
564    * trying to read the log file and get EOFEception. In case of a recovered queue the last log file
565    * may be empty, and we don't want to retry that.
566    */
567   private boolean isCurrentLogEmpty() {
568     return (this.repLogReader.getPosition() == 0 &&
569         !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
570   }
571   
572   /**
573    * Do the sleeping logic
574    * @param msg Why we sleep
575    * @param sleepMultiplier by how many times the default sleeping time is augmented
576    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
577    */
578   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
579     try {
580       LOG.debug(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
581       Thread.sleep(this.sleepForRetries * sleepMultiplier);
582     } catch (InterruptedException e) {
583       LOG.debug("Interrupted while sleeping between retries");
584     }
585     return sleepMultiplier < maxRetriesMultiplier;
586   }
587 
588   /**
589    * We only want KVs that are scoped other than local
590    * @param entry The entry to check for replication
591    */
592   protected void removeNonReplicableEdits(HLog.Entry entry) {
593     NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
594     List<KeyValue> kvs = entry.getEdit().getKeyValues();
595     for (int i = kvs.size()-1; i >= 0; i--) {
596       KeyValue kv = kvs.get(i);
597       // The scope will be null or empty if
598       // there's nothing to replicate in that WALEdit
599       if (scopes == null || !scopes.containsKey(kv.getFamily())) {
600         kvs.remove(i);
601       }
602     }
603   }
604 
605   /**
606    * Count the number of different row keys in the given edit because of
607    * mini-batching. We assume that there's at least one KV in the WALEdit.
608    * @param edit edit to count row keys from
609    * @return number of different row keys
610    */
611   private int countDistinctRowKeys(WALEdit edit) {
612     List<KeyValue> kvs = edit.getKeyValues();
613     int distinctRowKeys = 1;
614     KeyValue lastKV = kvs.get(0);
615     for (int i = 0; i < edit.size(); i++) {
616       if (!kvs.get(i).matchingRow(lastKV)) {
617         distinctRowKeys++;
618       }
619     }
620     return distinctRowKeys;
621   }
622 
  /**
   * Do the shipping logic: send the first <code>currentNbEntries</code>
   * entries of <code>entriesArray</code> to a randomly picked sink and keep
   * retrying for as long as this source is active and the shipment failed.
   * After a successful shipment the reader position is reported to the
   * manager (when it changed) and the metrics are updated.
   * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
   * written to when this method was called
   */
  protected void shipEdits(boolean currentWALisBeingWrittenTo) {
    // Local to this method, independent from the multiplier used by run()
    int sleepMultiplier = 1;
    if (this.currentNbEntries == 0) {
      LOG.warn("Was given 0 edits to ship");
      return;
    }
    while (this.isActive()) {
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      try {
        AdminService.BlockingInterface rrs = getRS();
        // Only the filled part of the container is sent
        ReplicationProtbufUtil.replicateWALEntry(rrs,
            Arrays.copyOf(this.entriesArray, currentNbEntries));
        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
          this.manager.logPositionAndCleanOldLogs(this.currentPath,
              this.peerClusterZnode, this.repLogReader.getPosition(),
              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
          this.lastLoggedPosition = this.repLogReader.getPosition();
        }
        this.totalReplicatedEdits += currentNbEntries;
        this.metrics.shipBatch(this.currentNbOperations);
        // Fed with the write time of the last entry of the batch
        this.metrics.setAgeOfLastShippedOp(
            this.entriesArray[currentNbEntries-1].getKey().getWriteTime());
        // Shipped, we are done with this batch
        break;

      } catch (IOException ioe) {
        // Didn't ship anything, but must still age the last time we did
        this.metrics.refreshAgeOfLastShippedOp();
        if (ioe instanceof RemoteException) {
          ioe = ((RemoteException) ioe).unwrapRemoteException();
          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
          // Any other remote error goes straight to the slave check below
          // without sleeping here
          if (ioe instanceof TableNotFoundException) {
            if (sleepForRetries("A table is missing in the peer cluster. "
                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
              sleepMultiplier++;
            }
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means we waited for more than 60s and nothing
            // happened, the cluster is alive and calling it right away
            // even for a test just makes things worse.
            sleepForRetries("Encountered a SocketTimeoutException. Since the " +
              "call to the remote cluster timed out, which is usually " +
              "caused by a machine failure or a massive slowdown",
              this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException) {
            LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
            chooseSinks();
          } else {
            LOG.warn("Can't replicate because of a local or network error: ", ioe);
          }
        }

        try {
          boolean down;
          // Spin while the slave is down and we're not asked to shutdown/close
          do {
            down = isSlaveDown();
            if (down) {
              // Once the multiplier reached its maximum, pick new sinks on
              // every round instead of sleeping longer
              if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
                sleepMultiplier++;
              } else {
                chooseSinks();
              }
            }
          } while (this.isActive() && down );
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while trying to contact the peer cluster");
        }
      }
    }
  }
705 
706   /**
707    * check whether the peer is enabled or not
708    *
709    * @return true if the peer is enabled, otherwise false
710    */
711   protected boolean isPeerEnabled() {
712     return this.replicating.get() &&
713         this.zkHelper.getPeerEnabled(this.peerId);
714   }
715 
716   /**
717    * If the queue isn't empty, switch to the next one
718    * Else if this is a recovered queue, it means we're done!
719    * Else we'll just continue to try reading the log file
720    * @return true if we're done with the current file, false if we should
721    * continue trying to read from it
722    */
723   protected boolean processEndOfFile() {
724     if (this.queue.size() != 0) {
725       this.currentPath = null;
726       this.repLogReader.finishCurrentFile();
727       this.reader = null;
728       return true;
729     } else if (this.replicationQueueInfo.isQueueRecovered()) {
730       this.manager.closeRecoveredQueue(this);
731       LOG.info("Finished recovering the queue");
732       this.running = false;
733       return true;
734     }
735     return false;
736   }
737 
738   public void startup() {
739     String n = Thread.currentThread().getName();
740     Thread.UncaughtExceptionHandler handler =
741         new Thread.UncaughtExceptionHandler() {
742           public void uncaughtException(final Thread t, final Throwable e) {
743             LOG.error("Unexpected exception in ReplicationSource," +
744               " currentPath=" + currentPath, e);
745           }
746         };
747     Threads.setDaemonThreadRunning(
748         this, n + ".replicationSource," +
749         this.peerClusterZnode, handler);
750   }
751 
752   public void terminate(String reason) {
753     terminate(reason, null);
754   }
755 
756   public void terminate(String reason, Exception cause) {
757     if (cause == null) {
758       LOG.info("Closing source "
759           + this.peerClusterZnode + " because: " + reason);
760 
761     } else {
762       LOG.error("Closing source " + this.peerClusterZnode
763           + " because an error occurred: " + reason, cause);
764     }
765     this.running = false;
766     Threads.shutdown(this, this.sleepForRetries);
767   }
768 
769   /**
770    * Get a new region server at random from this peer
771    * @return
772    * @throws IOException
773    */
774   private AdminService.BlockingInterface getRS() throws IOException {
775     if (this.currentPeers.size() == 0) {
776       throw new IOException(this.peerClusterZnode + " has 0 region servers");
777     }
778     ServerName address =
779         currentPeers.get(random.nextInt(this.currentPeers.size()));
780     return this.conn.getAdmin(address);
781   }
782 
783   /**
784    * Check if the slave is down by trying to establish a connection
785    * @return true if down, false if up
786    * @throws InterruptedException
787    */
788   public boolean isSlaveDown() throws InterruptedException {
789     final CountDownLatch latch = new CountDownLatch(1);
790     Thread pingThread = new Thread() {
791       public void run() {
792         try {
793           AdminService.BlockingInterface rrs = getRS();
794           // Dummy call which should fail
795           ProtobufUtil.getServerInfo(rrs);
796           latch.countDown();
797         } catch (IOException ex) {
798           if (ex instanceof RemoteException) {
799             ex = ((RemoteException) ex).unwrapRemoteException();
800           }
801           LOG.info("Slave cluster looks down: " + ex.getMessage());
802         }
803       }
804     };
805     pingThread.start();
806     // awaits returns true if countDown happened
807     boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
808     pingThread.interrupt();
809     return down;
810   }
811 
812   public String getPeerClusterZnode() {
813     return this.peerClusterZnode;
814   }
815 
816   public String getPeerClusterId() {
817     return this.peerId;
818   }
819 
820   public Path getCurrentPath() {
821     return this.currentPath;
822   }
823 
824   private boolean isActive() {
825     return !this.stopper.isStopped() && this.running;
826   }
827 
828   /**
829    * Comparator used to compare logs together based on their start time
830    */
831   public static class LogsComparator implements Comparator<Path> {
832 
833     @Override
834     public int compare(Path o1, Path o2) {
835       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
836     }
837 
838     /**
839      * Split a path to get the start time
840      * For example: 10.20.20.171%3A60020.1277499063250
841      * @param p path to split
842      * @return start time
843      */
844     private long getTS(Path p) {
845       String[] parts = p.getName().split("\\.");
846       return Long.parseLong(parts[parts.length-1]);
847     }
848   }
849 
850   @Override
851   public String getStats() {
852     String position = "N/A";
853     try {
854       if (this.reader != null) {
855         position = this.reader.getPosition()+"";
856       }
857     } catch (IOException ioe) {
858     }
859     return "Total replicated edits: " + totalReplicatedEdits +
860       ", currently replicating from: " + this.currentPath +
861       " at position: " + position;
862   }
863 }