1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Comparator;
26  import java.util.List;
27  import java.util.UUID;
28  import java.util.concurrent.PriorityBlockingQueue;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.commons.lang.StringUtils;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.KeyValue;
42  import org.apache.hadoop.hbase.Stoppable;
43  import org.apache.hadoop.hbase.regionserver.wal.HLog;
44  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
45  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
46  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
47  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
48  import org.apache.hadoop.hbase.replication.ReplicationException;
49  import org.apache.hadoop.hbase.replication.ReplicationPeers;
50  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
51  import org.apache.hadoop.hbase.replication.ReplicationQueues;
52  import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
53  import org.apache.hadoop.hbase.replication.WALEntryFilter;
54  import org.apache.hadoop.hbase.util.Threads;
55  
56  import com.google.common.collect.Lists;
57  import com.google.common.util.concurrent.ListenableFuture;
58  import com.google.common.util.concurrent.Service;
59  
60  /**
61   * Class that handles the source of a replication stream.
62   * Currently does not handle more than one slave cluster.
63   * For each slave cluster it selects a random number of peers
64   * using a replication ratio. For example, if the replication ratio is 0.1
65   * and the slave cluster has 100 region servers, 10 will be selected.
66   * <p/>
67   * A stream is considered down when we cannot contact a region server on the
68   * peer cluster for more than 55 seconds by default.
69   * <p/>
70   *
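 * For reference, a non-normative summary of the configuration keys read in {@code init} below,
 * with the defaults used in this class (the code is authoritative):
 * <pre>
 * replication.source.size.capacity            64MB   max bytes buffered before shipping a batch
 * replication.source.nb.capacity              25000  max entries buffered before shipping a batch
 * replication.source.maxretriesmultiplier     10     cap on the retry sleep multiplier
 * replication.source.sleepforretries          1000   base sleep in ms between retries
 * replication.source.per.peer.node.bandwidth  0      per-peer bandwidth budget for the throttler
 * replication.source.log.queue.warn           2      WARN threshold for the number of queued logs
 * </pre>
 * <p/>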
71   */
72  @InterfaceAudience.Private
73  public class ReplicationSource extends Thread
74      implements ReplicationSourceInterface {
75  
76    public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
77    // Queue of logs to process
78    private PriorityBlockingQueue<Path> queue;
79    private ReplicationQueues replicationQueues;
80    private ReplicationPeers replicationPeers;
81  
82    private Configuration conf;
83    private ReplicationQueueInfo replicationQueueInfo;
84    // id of the peer cluster this source replicates to
85    private String peerId;
86    // The manager of all sources to which we ping back our progress
87    private ReplicationSourceManager manager;
88    // Should we stop everything?
89    private Stoppable stopper;
90    // How long should we sleep for each retry
91    private long sleepForRetries;
92    // Max size in bytes of the entries we read from a log before shipping them
93    private long replicationQueueSizeCapacity;
94    // Max number of entries we read from a log before shipping them
95    private int replicationQueueNbCapacity;
96    // Our reader for the current log
97    private HLog.Reader reader;
98    // Last position in the log that we sent to ZooKeeper
99    private long lastLoggedPosition = -1;
100   // Path of the current log
101   private volatile Path currentPath;
102   private FileSystem fs;
103   // id of this cluster
104   private UUID clusterId;
105   // id of the other cluster
106   private UUID peerClusterId;
107   // total number of edits we replicated
108   private long totalReplicatedEdits = 0;
109   // total number of operations (Put/Delete) we replicated
110   private long totalReplicatedOperations = 0;
111   // The znode we currently play with
112   private String peerClusterZnode;
113   // Maximum number of retries before taking bold actions
114   private int maxRetriesMultiplier;
115   // Current number of operations (Put/Delete) that we need to replicate
116   private int currentNbOperations = 0;
117   // Current size of data we need to replicate
118   private int currentSize = 0;
119   // Indicates if this particular source is running
120   private volatile boolean running = true;
121   // Metrics for this source
122   private MetricsSource metrics;
123   // Handle on the log reader helper
124   private ReplicationHLogReaderManager repLogReader;
125   //WARN threshold for the number of queued logs, defaults to 2
126   private int logQueueWarnThreshold;
127   // ReplicationEndpoint which will handle the actual replication
128   private ReplicationEndpoint replicationEndpoint;
129   // A filter (or a chain of filters) for the WAL entries.
130   private WALEntryFilter walEntryFilter;
131   // Context for ReplicationEndpoint#replicate()
132   private ReplicationEndpoint.ReplicateContext replicateContext;
133   // throttler
134   private ReplicationThrottler throttler;
135 
136   /**
137    * Instantiation method used by region servers
138    *
139    * @param conf configuration to use
140    * @param fs file system to use
141    * @param manager replication manager to ping to
       * @param replicationQueues the interface to the replication queues
       * @param replicationPeers the interface to the replication peers
142    * @param stopper the Stoppable to check for stop requests
143    * @param peerClusterZnode the name of our znode
144    * @param clusterId unique UUID for the cluster
145    * @param replicationEndpoint the replication endpoint implementation
146    * @param metrics metrics for replication source
147    * @throws IOException
148    */
149   @Override
150   public void init(final Configuration conf, final FileSystem fs,
151       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
152       final ReplicationPeers replicationPeers, final Stoppable stopper,
153       final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
154       final MetricsSource metrics)
155           throws IOException {
156     this.stopper = stopper;
157     this.conf = conf;
158     decorateConf();
159     this.replicationQueueSizeCapacity =
160         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
161     this.replicationQueueNbCapacity =
162         this.conf.getInt("replication.source.nb.capacity", 25000);
163     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
164     this.queue =
165         new PriorityBlockingQueue<Path>(
166             this.conf.getInt("hbase.regionserver.maxlogs", 32),
167             new LogsComparator());
168     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
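    // The bandwidth cap is configured in bytes per second; the throttler is handed a tenth of it
    // because it budgets pushes over short cycles (the cycle length is defined in ReplicationThrottler).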
169     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
170     this.replicationQueues = replicationQueues;
171     this.replicationPeers = replicationPeers;
172     this.manager = manager;
173     this.sleepForRetries =
174         this.conf.getLong("replication.source.sleepforretries", 1000);
175     this.fs = fs;
176     this.metrics = metrics;
177     this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
178     this.clusterId = clusterId;
179 
180     this.peerClusterZnode = peerClusterZnode;
181     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
182     // ReplicationQueueInfo parses the peerId out of the znode for us
183     this.peerId = this.replicationQueueInfo.getPeerId();
184     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
185     this.replicationEndpoint = replicationEndpoint;
186 
187     this.replicateContext = new ReplicationEndpoint.ReplicateContext();
188   }
189 
190   private void decorateConf() {
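    // If a replication-specific codec is configured, use it for the RPCs made by this source so
    // that replicated cells are serialized with that codec rather than the cluster-wide default.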
191     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
192     if (StringUtils.isNotEmpty(replicationCodec)) {
193       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
194     }
195   }
196 
197   @Override
198   public void enqueueLog(Path log) {
199     this.queue.put(log);
200     int queueSize = queue.size();
201     this.metrics.setSizeOfLogQueue(queueSize);
202     // This will log a warning for each new log that gets created above the warn threshold
203     if (queueSize > this.logQueueWarnThreshold) {
204       LOG.warn("Queue size: " + queueSize +
205         " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
206     }
207   }
208 
209   private void uninitialize() {
210     LOG.debug("Source exiting " + this.peerId);
211     metrics.clear();
212     if (replicationEndpoint.state() == Service.State.STARTING
213         || replicationEndpoint.state() == Service.State.RUNNING) {
214       replicationEndpoint.stopAndWait();
215     }
216   }
217 
218   @Override
219   public void run() {
220     // We were stopped while looping to connect to sinks, just abort
221     if (!this.isActive()) {
222       uninitialize();
223       return;
224     }
225 
226     try {
227       // start the endpoint, connect to the cluster
228       Service.State state = replicationEndpoint.start().get();
229       if (state != Service.State.RUNNING) {
230         LOG.warn("ReplicationEndpoint was not started. Exiting");
231         uninitialize();
232         return;
233       }
234     } catch (Exception ex) {
235       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
236       throw new RuntimeException(ex);
237     }
238 
239     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
240     ArrayList<WALEntryFilter> filters = Lists.newArrayList(
241       (WALEntryFilter)new SystemTableWALEntryFilter());
242     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
243     if (filterFromEndpoint != null) {
244       filters.add(filterFromEndpoint);
245     }
246     this.walEntryFilter = new ChainWALEntryFilter(filters);
247 
248     int sleepMultiplier = 1;
249     // delay this until we are in an asynchronous thread
250     while (this.isActive() && this.peerClusterId == null) {
251       this.peerClusterId = replicationEndpoint.getPeerUUID();
252       if (this.isActive() && this.peerClusterId == null) {
253         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
254           sleepMultiplier++;
255         }
256       }
257     }
258     // We were stopped while looping to contact peer's zk ensemble, just abort
259     if (!this.isActive()) {
260       uninitialize();
261       return;
262     }
263 
264     // resetting to 1 to reuse later
265     sleepMultiplier = 1;
266 
267     // In rare cases, the zookeeper settings may be messed up. That leads to an incorrect
268     // peerClusterId value, i.e. the same as the source clusterId
269     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
270       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
271           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
272           + replicationEndpoint.getClass().getName(), null, false);
273     }
274     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
275 
276     // If this is recovered, the queue is already full and the first log
277     // normally has a position (unless the RS failed between 2 logs)
278     if (this.replicationQueueInfo.isQueueRecovered()) {
279       try {
280         this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
281           this.queue.peek().getName()));
282         if (LOG.isTraceEnabled()) {
283           LOG.trace("Recovered queue started with log " + this.queue.peek() +
284               " at position " + this.repLogReader.getPosition());
285         }
286       } catch (ReplicationException e) {
287         this.terminate("Couldn't get the position of this recovered queue " +
288             this.peerClusterZnode, e);
289       }
290     }
291     // Loop until we close down
292     while (isActive()) {
293       // Sleep until replication is enabled again
294       if (!isPeerEnabled()) {
295         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
296           sleepMultiplier++;
297         }
298         continue;
299       }
300       Path oldPath = getCurrentPath(); //note that in the current scenario,
301                                        //oldPath will be null when a log roll
302                                        //happens.
303       // Get a new path
304       boolean hasCurrentPath = getNextPath();
305       if (getCurrentPath() != null && oldPath == null) {
306         sleepMultiplier = 1; //reset the sleepMultiplier on a path change
307       }
308       if (!hasCurrentPath) {
309         if (sleepForRetries("No log to process", sleepMultiplier)) {
310           sleepMultiplier++;
311         }
312         continue;
313       }
314       boolean currentWALisBeingWrittenTo = false;
315       //For WAL files we own (rather than recovered), take a snapshot of whether the
316       //current WAL file (this.currentPath) is in use (for writing) NOW!
317       //Since the new WAL paths are enqueued only after the prev WAL file
318       //is 'closed', presence of an element in the queue means that
319       //the previous WAL file was closed, else the file is in use (currentPath)
320       //We take the snapshot now so that we are protected against races
321       //where a new file gets enqueued while the current file is being processed
322       //(and where we just finished reading the current file).
323       if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
324         currentWALisBeingWrittenTo = true;
325       }
326       // Open a reader on it
327       if (!openReader(sleepMultiplier)) {
328         // Reset the sleep multiplier, else it'd be reused for the next file
329         sleepMultiplier = 1;
330         continue;
331       }
332 
333       // If we got a null reader but didn't continue, then sleep and continue
334       if (this.reader == null) {
335         if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
336           sleepMultiplier++;
337         }
338         continue;
339       }
340 
341       boolean gotIOE = false;
342       currentNbOperations = 0;
343       List<HLog.Entry> entries = new ArrayList<HLog.Entry>(1);
344       currentSize = 0;
345       try {
346         if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
347           continue;
348         }
349       } catch (IOException ioe) {
350         LOG.warn(this.peerClusterZnode + " Got: ", ioe);
351         gotIOE = true;
352         if (ioe.getCause() instanceof EOFException) {
353 
354           boolean considerDumping = false;
355           if (this.replicationQueueInfo.isQueueRecovered()) {
356             try {
357               FileStatus stat = this.fs.getFileStatus(this.currentPath);
358               if (stat.getLen() == 0) {
359                 LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
360               }
361               considerDumping = true;
362             } catch (IOException e) {
363               LOG.warn(this.peerClusterZnode + " Got exception while getting file size: ", e);
364             }
365           }
366 
367           if (considerDumping &&
368               sleepMultiplier == this.maxRetriesMultiplier &&
369               processEndOfFile()) {
370             continue;
371           }
372         }
373       } finally {
374         try {
375           this.reader = null;
376           this.repLogReader.closeReader();
377         } catch (IOException e) {
378           gotIOE = true;
379           LOG.warn("Unable to finalize the tailing of a file", e);
380         }
381       }
382 
383       // If we didn't get anything to replicate, or if we hit an IOE,
384       // wait a bit and retry.
385       // But if we need to stop, don't bother sleeping
386       if (this.isActive() && (gotIOE || entries.isEmpty())) {
387         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
388           this.manager.logPositionAndCleanOldLogs(this.currentPath,
389               this.peerClusterZnode, this.repLogReader.getPosition(),
390               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
391           this.lastLoggedPosition = this.repLogReader.getPosition();
392         }
393         // Reset the sleep multiplier if nothing has actually gone wrong
394         if (!gotIOE) {
395           sleepMultiplier = 1;
396         }
397         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
398           sleepMultiplier++;
399         }
400         continue;
401       }
402       sleepMultiplier = 1;
403       shipEdits(currentWALisBeingWrittenTo, entries);
404     }
405     uninitialize();
406   }
407 
408   /**
409    * Read all the entries from the current log files and retain those
410    * that need to be replicated. Else, process the end of the current file.
411    * @param currentWALisBeingWrittenTo is the current WAL being written to
412    * @param entries resulting entries to be replicated
413    * @return true if we got nothing and went to the next file, false if we got
414    * entries
415    * @throws IOException
416    */
417   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
418       List<HLog.Entry> entries) throws IOException{
419     long seenEntries = 0;
420     if (LOG.isTraceEnabled()) {
421       LOG.trace("Seeking in " + this.currentPath + " at position "
422           + this.repLogReader.getPosition());
423     }
424     this.repLogReader.seek();
425     long positionBeforeRead = this.repLogReader.getPosition();
426     HLog.Entry entry =
427         this.repLogReader.readNextAndSetPosition();
428     while (entry != null) {
429       this.metrics.incrLogEditsRead();
430       seenEntries++;
431 
432       // don't replicate if the log entries have already been consumed by the peer cluster
433       if (replicationEndpoint.canReplicateToSameCluster()
434           || !entry.getKey().getClusterIds().contains(peerClusterId)) {
435         // Remove all KVs that should not be replicated
436         entry = walEntryFilter.filter(entry);
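        // The filter chain may null out the whole entry or just strip some of its KVs;
        // both cases are handled below (a null/empty edit only bumps the filtered-edits metric).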
437         WALEdit edit = null;
438         HLogKey logKey = null;
439         if (entry != null) {
440           edit = entry.getEdit();
441           logKey = entry.getKey();
442         }
443 
444         if (edit != null && edit.size() != 0) {
445           //Mark that the current cluster has the change
446           logKey.addClusterId(clusterId);
447           currentNbOperations += countDistinctRowKeys(edit);
448           entries.add(entry);
449           currentSize += entry.getEdit().heapSize();
450         } else {
451           this.metrics.incrLogEditsFiltered();
452         }
453       }
454       // Stop if too many entries or too big
455       if (currentSize >= this.replicationQueueSizeCapacity ||
456           entries.size() >= this.replicationQueueNbCapacity) {
457         break;
458       }
459       try {
460         entry = this.repLogReader.readNextAndSetPosition();
461       } catch (IOException ie) {
462         LOG.debug("Break on IOE: " + ie.getMessage());
463         break;
464       }
465     }
466     metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
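    // If the WAL we just read is still being written to, never treat this as end-of-file:
    // more entries may be appended, so returning false keeps the caller on the current file.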
467     if (currentWALisBeingWrittenTo) {
468       return false;
469     }
470     // If we didn't get anything and the queue has an object, it means we
471     // hit the end of the file for sure
472     return seenEntries == 0 && processEndOfFile();
473   }
474 
475   /**
476    * Poll for the next path
477    * @return true if a path was obtained, false if not
478    */
479   protected boolean getNextPath() {
480     try {
481       if (this.currentPath == null) {
482         this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
483         this.metrics.setSizeOfLogQueue(queue.size());
484         if (this.currentPath != null) {
485           this.manager.cleanOldLogs(this.currentPath.getName(),
486               this.peerId,
487               this.replicationQueueInfo.isQueueRecovered());
488           if (LOG.isTraceEnabled()) {
489             LOG.trace("New log: " + this.currentPath);
490           }
491         }
492       }
493     } catch (InterruptedException e) {
494       LOG.warn("Interrupted while reading edits", e);
495     }
496     return this.currentPath != null;
497   }
498 
499   /**
500    * Open a reader on the current path
501    *
502    * @param sleepMultiplier by how many times the default sleeping time is augmented
503    * @return true if we should continue with that file, false if we are over with it
504    */
505   protected boolean openReader(int sleepMultiplier) {
506     try {
507       try {
508         if (LOG.isTraceEnabled()) {
509           LOG.trace("Opening log " + this.currentPath);
510         }
511         this.reader = repLogReader.openReader(this.currentPath);
512       } catch (FileNotFoundException fnfe) {
513         if (this.replicationQueueInfo.isQueueRecovered()) {
514           // We didn't find the log in the archive directory, look if it still
515           // exists in the dead RS folder (there could be a chain of failures
516           // to look at)
517           List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
518           LOG.info("NB dead servers : " + deadRegionServers.size());
519           for (String curDeadServerName : deadRegionServers) {
520             Path deadRsDirectory =
521                 new Path(manager.getLogDir().getParent(), curDeadServerName);
522             Path[] locs = new Path[] {
523                 new Path(deadRsDirectory, currentPath.getName()),
524                 new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT),
525                                           currentPath.getName()),
526             };
527             for (Path possibleLogLocation : locs) {
528               LOG.info("Possible location " + possibleLogLocation.toUri().toString());
529               if (this.manager.getFs().exists(possibleLogLocation)) {
530                 // We found the right new location
531                 LOG.info("Log " + this.currentPath + " still exists at " +
532                     possibleLogLocation);
533                 // Breaking here will make us sleep since reader is null
534                 return true;
535               }
536             }
537           }
538           // In the case of disaster/recovery, the HMaster may be shut down or crash before
539           // flushing data from .logs to .oldlogs. Loop through the .logs folders for a match
540           if (stopper instanceof ReplicationSyncUp.DummyServer) {
541             FileStatus[] rss = fs.listStatus(manager.getLogDir());
542             for (FileStatus rs : rss) {
543               Path p = rs.getPath();
544               FileStatus[] logs = fs.listStatus(p);
545               for (FileStatus log : logs) {
546                 p = new Path(p, log.getPath().getName());
547                 if (p.getName().equals(currentPath.getName())) {
548                   currentPath = p;
549                   LOG.info("Log " + this.currentPath + " exists under " + manager.getLogDir());
550                   // Open the log at the new location
551                   this.openReader(sleepMultiplier);
552                   return true;
553                 }
554               }
555             }
556           }
557 
558           // TODO What happens if the log was missing from every single location?
559           // Although we need to check a couple of times as the log could have
560           // been moved by the master between the checks
561           // It can also happen if a recovered queue wasn't properly cleaned,
562           // such that the znode pointing to a log exists but the log was
563           // deleted a long time ago.
564           // For the moment, we'll throw the IO and processEndOfFile
565           throw new IOException("File from recovered queue is " +
566               "nowhere to be found", fnfe);
567         } else {
568           // If the log was archived, continue reading from there
569           Path archivedLogLocation =
570               new Path(manager.getOldLogDir(), currentPath.getName());
571           if (this.manager.getFs().exists(archivedLogLocation)) {
572             currentPath = archivedLogLocation;
573             LOG.info("Log " + this.currentPath + " was moved to " +
574                 archivedLogLocation);
575             // Open the log at the new location
576             this.openReader(sleepMultiplier);
577 
578           }
579           // TODO What happens if the log is missing in both places?
580         }
581       }
582     } catch (IOException ioe) {
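      // An EOF on an empty log that is still being written to (idle cluster) is expected:
      // keep the current file and let the caller retry later.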
583       if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
584       LOG.warn(this.peerClusterZnode + " Got: ", ioe);
585       this.reader = null;
586       if (ioe.getCause() instanceof NullPointerException) {
587         // Workaround for race condition in HDFS-4380
588         // which throws an NPE if we open a file before any data node has the most recent block
589         // Just sleep and retry. Will require re-reading compressed HLogs for compressionContext.
590         LOG.warn("Got NPE opening reader, will retry.");
591       } else if (sleepMultiplier == this.maxRetriesMultiplier) {
592         // TODO Need a better way to determine if a file is really gone but
593         // TODO without scanning all logs dir
594         LOG.warn("Waited too long for this file, considering dumping");
595         return !processEndOfFile();
596       }
597     }
598     return true;
599   }
600 
601   /*
602    * Checks whether the current log file is empty and this is not a recovered queue. This handles
603    * the scenario where, in an idle cluster, there is no entry in the current log and we keep
604    * trying to read the log file and getting an EOFException. In the case of a recovered queue
605    * the last log file may be empty, and we don't want to retry in that case.
606    */
607   private boolean isCurrentLogEmpty() {
608     return (this.repLogReader.getPosition() == 0 &&
609         !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
610   }
611 
612   /**
613    * Do the sleeping logic
614    * @param msg Why we sleep
615    * @param sleepMultiplier by how many times the default sleeping time is augmented
616    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
617    */
618   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
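    // Linear backoff: sleep sleepForRetries * sleepMultiplier ms. Callers keep incrementing the
    // multiplier only while this returns true, i.e. until it reaches maxRetriesMultiplier.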
619     try {
620       if (LOG.isTraceEnabled()) {
621         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
622       }
623       Thread.sleep(this.sleepForRetries * sleepMultiplier);
624     } catch (InterruptedException e) {
625       LOG.debug("Interrupted while sleeping between retries");
626       Thread.currentThread().interrupt();
627     }
628     return sleepMultiplier < maxRetriesMultiplier;
629   }
630 
631   /**
632    * Count the number of different row keys in the given edit because of
633    * mini-batching. We assume that there's at least one KV in the WALEdit.
634    * @param edit edit to count row keys from
635    * @return number of different row keys
636    */
637   private int countDistinctRowKeys(WALEdit edit) {
638     List<KeyValue> kvs = edit.getKeyValues();
639     int distinctRowKeys = 1;
640     KeyValue lastKV = kvs.get(0);
641     for (int i = 0; i < edit.size(); i++) {
642       if (!CellUtil.matchingRow(kvs.get(i), lastKV)) {
643         distinctRowKeys++;
644       }
        // advance the reference KV so that we count row changes, not KVs differing from the first row
        lastKV = kvs.get(i);
645     }
646     return distinctRowKeys;
647   }
648 
649   /**
650    * Do the shipping logic
651    * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
652    * written to when this method was called
653    */
654   protected void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries) {
655     int sleepMultiplier = 1;
656     if (entries.isEmpty()) {
657       LOG.warn("Was given 0 edits to ship");
658       return;
659     }
660     while (this.isActive()) {
661       try {
662         if (this.throttler.isEnabled()) {
663           long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
664           if (sleepTicks > 0) {
665             try {
666               if (LOG.isTraceEnabled()) {
667                 LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
668               }
669               Thread.sleep(sleepTicks);
670             } catch (InterruptedException e) {
671               LOG.debug("Interrupted while sleeping for throttling control");
672               Thread.currentThread().interrupt();
673               // the current thread might have been interrupted in order to terminate;
674               // go straight back to while() to confirm this
675               continue;
676             }
677             // reset throttler's cycle start tick when sleep for throttling occurs
678             this.throttler.resetStartTick();
679           }
680         }
681         replicateContext.setEntries(entries).setSize(currentSize);
682 
683         // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
684         boolean replicated = replicationEndpoint.replicate(replicateContext);
685 
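        // A false return from the endpoint means the batch was not shipped;
        // loop around and re-send the same batch rather than advancing our position.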
686         if (!replicated) {
687           continue;
688         }
689 
690         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
691           this.manager.logPositionAndCleanOldLogs(this.currentPath,
692               this.peerClusterZnode, this.repLogReader.getPosition(),
693               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
694           this.lastLoggedPosition = this.repLogReader.getPosition();
695         }
696         if (this.throttler.isEnabled()) {
697           this.throttler.addPushSize(currentSize);
698         }
699         this.totalReplicatedEdits += entries.size();
700         this.totalReplicatedOperations += currentNbOperations;
701         this.metrics.shipBatch(this.currentNbOperations, this.currentSize/1024);
702         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
703         if (LOG.isTraceEnabled()) {
704           LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
705               + this.totalReplicatedOperations + " operations");
706         }
707         break;
708       } catch (Exception ex) {
709         LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" + ex);
710         if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
711           sleepMultiplier++;
712         }
713       }
714     }
715   }
716 
717   /**
718    * check whether the peer is enabled or not
719    *
720    * @return true if the peer is enabled, otherwise false
721    */
722   protected boolean isPeerEnabled() {
723     return this.replicationPeers.getStatusOfPeer(this.peerId);
724   }
725 
726   /**
727    * If the queue isn't empty, switch to the next log file.
728    * Else, if this is a recovered queue, it means we're done!
729    * Otherwise we'll just keep trying to read from the current log file.
730    * @return true if we're done with the current file, false if we should
731    * continue trying to read from it
732    */
733   protected boolean processEndOfFile() {
734     if (this.queue.size() != 0) {
735       if (LOG.isTraceEnabled()) {
736         String filesize = "N/A";
737         try {
738           FileStatus stat = this.fs.getFileStatus(this.currentPath);
739           filesize = stat.getLen()+"";
740         } catch (IOException ex) {} // ignored: the length is only used for the trace message
741         LOG.trace("Reached the end of a log, stats: " + getStats() +
742             ", and the length of the file is " + filesize);
743       }
744       this.currentPath = null;
745       this.repLogReader.finishCurrentFile();
746       this.reader = null;
747       return true;
748     } else if (this.replicationQueueInfo.isQueueRecovered()) {
749       this.manager.closeRecoveredQueue(this);
750       LOG.info("Finished recovering the queue with the following stats " + getStats());
751       this.running = false;
752       return true;
753     }
754     return false;
755   }
756 
757   @Override
758   public void startup() {
759     String n = Thread.currentThread().getName();
760     Thread.UncaughtExceptionHandler handler =
761         new Thread.UncaughtExceptionHandler() {
762           @Override
763           public void uncaughtException(final Thread t, final Throwable e) {
764             LOG.error("Unexpected exception in ReplicationSource," +
765               " currentPath=" + currentPath, e);
766           }
767         };
768     Threads.setDaemonThreadRunning(
769         this, n + ".replicationSource," +
770         this.peerClusterZnode, handler);
771   }
772 
773   @Override
774   public void terminate(String reason) {
775     terminate(reason, null);
776   }
777 
778   @Override
779   public void terminate(String reason, Exception cause) {
780     terminate(reason, cause, true);
781   }
782 
783   public void terminate(String reason, Exception cause, boolean join) {
784     if (cause == null) {
785       LOG.info("Closing source "
786           + this.peerClusterZnode + " because: " + reason);
787 
788     } else {
789       LOG.error("Closing source " + this.peerClusterZnode
790           + " because an error occurred: " + reason, cause);
791     }
792     this.running = false;
793     this.interrupt();
794     ListenableFuture<Service.State> future = null;
795     if (this.replicationEndpoint != null) {
796       future = this.replicationEndpoint.stop();
797     }
798     if (join) {
799       Threads.shutdown(this, this.sleepForRetries);
800       if (future != null) {
801         try {
802           future.get();
803         } catch (Exception e) {
804           LOG.warn("Got exception:" + e);
805         }
806       }
807     }
808   }
809 
810   @Override
811   public String getPeerClusterZnode() {
812     return this.peerClusterZnode;
813   }
814 
815   @Override
816   public String getPeerClusterId() {
817     return this.peerId;
818   }
819 
820   @Override
821   public Path getCurrentPath() {
822     return this.currentPath;
823   }
824 
825   private boolean isActive() {
826     return !this.stopper.isStopped() && this.running && !isInterrupted();
827   }
828 
829   /**
830    * Comparator used to compare logs together based on their start time
831    */
832   public static class LogsComparator implements Comparator<Path> {
833 
834     @Override
835     public int compare(Path o1, Path o2) {
836       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
837     }
838 
839     /**
840      * Split a path to get the start time
841      * For example: 10.20.20.171%3A60020.1277499063250
842      * @param p path to split
843      * @return start time
844      */
845     private long getTS(Path p) {
846       String[] parts = p.getName().split("\\.");
847       return Long.parseLong(parts[parts.length-1]);
848     }
849   }
850 
851   @Override
852   public String getStats() {
853     long position = this.repLogReader.getPosition();
854     return "Total replicated edits: " + totalReplicatedEdits +
855       ", currently replicating from: " + this.currentPath +
856       " at position: " + position;
857   }
858 }