/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;

/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than one slave cluster.
 * For each slave cluster it selects a random number of peers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 *
 */
@InterfaceAudience.Private
public class ReplicationSource extends Thread
    implements ReplicationSourceInterface {

  private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
  // Queue of logs to process
  private PriorityBlockingQueue<Path> queue;
  private ReplicationQueues replicationQueues;
  private ReplicationPeers replicationPeers;

  private Configuration conf;
  private ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;
  // The manager of all sources to which we ping back our progress
  private ReplicationSourceManager manager;
  // Should we stop everything?
  private Stoppable stopper;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Max size in bytes of entriesArray
  private long replicationQueueSizeCapacity;
  // Max number of entries in entriesArray
  private int replicationQueueNbCapacity;
  // Our reader for the current log. open/close handled by repLogReader
  private WAL.Reader reader;
  // Last position in the log that we sent to ZooKeeper
  private long lastLoggedPosition = -1;
  // Path of the current log
  private volatile Path currentPath;
  private FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // id of the other cluster
  private UUID peerClusterId;
  // total number of edits we replicated
  private long totalReplicatedEdits = 0;
  // total number of operations (Put/Delete) we replicated
  private long totalReplicatedOperations = 0;
  // The znode we currently play with
  private String peerClusterZnode;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Current number of operations (Put/Delete) that we need to replicate
  private int currentNbOperations = 0;
  // Current size of data we need to replicate
  private int currentSize = 0;
  // Indicates if this particular source is running
  private volatile boolean running = true;
  // Metrics for this source
  private MetricsSource metrics;
  // Handle on the log reader helper
  private ReplicationWALReaderManager repLogReader;
  // WARN threshold for the number of queued logs, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private ReplicationEndpoint replicationEndpoint;
  // A filter (or a chain of filters) for the WAL entries.
  private WALEntryFilter walEntryFilter;
  // throttler
  private ReplicationThrottler throttler;

  /**
   * Instantiation method used by region servers
   *
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to ping to
   * @param replicationQueues the queues from which this source reads its list of WALs and positions
   * @param replicationPeers the peers interface used to query the state of the slave cluster
   * @param stopper the stoppable used to check whether the region server is shutting down
   * @param peerClusterZnode the name of our znode
   * @param clusterId unique UUID for the cluster
   * @param replicationEndpoint the replication endpoint implementation
   * @param metrics metrics for replication source
   * @throws IOException
   */
  @Override
  public void init(final Configuration conf, final FileSystem fs,
      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final Stoppable stopper,
      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
      final MetricsSource metrics)
          throws IOException {
    this.stopper = stopper;
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.replicationQueueSizeCapacity =
        this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
    this.replicationQueueNbCapacity =
        this.conf.getInt("replication.source.nb.capacity", 25000);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.queue =
        new PriorityBlockingQueue<Path>(
            this.conf.getInt("hbase.regionserver.maxlogs", 32),
            new LogsComparator());
    long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf);
    this.clusterId = clusterId;

    this.peerClusterZnode = peerClusterZnode;
    this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
    this.replicationEndpoint = replicationEndpoint;
  }

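  // If a replication-specific codec is configured (REPLICATION_CODEC_CONF_KEY), copy it into the
  // RPC codec key of this source's private copy of the configuration.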
  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

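  // Adds a new WAL to this source's queue and updates the queue-size metric, warning when the
  // backlog grows past the configured threshold.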
  @Override
  public void enqueueLog(Path log) {
    this.queue.put(log);
    int queueSize = queue.size();
    this.metrics.setSizeOfLogQueue(queueSize);
    // This will log a warning for each new log that gets created above the warn threshold
    if (queueSize > this.logQueueWarnThreshold) {
      LOG.warn("Queue size: " + queueSize +
        " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
    }
  }

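  // Cleanup performed when the source thread exits: clear the metrics and stop the
  // replication endpoint if it was started.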
  private void uninitialize() {
    LOG.debug("Source exiting " + this.peerId);
    metrics.clear();
    if (replicationEndpoint.state() == Service.State.STARTING
        || replicationEndpoint.state() == Service.State.RUNNING) {
      replicationEndpoint.stopAndWait();
    }
  }

  @Override
  public void run() {
    // We were stopped while looping to connect to sinks, just abort
    if (!this.isActive()) {
      uninitialize();
      return;
    }

    try {
      // start the endpoint, connect to the cluster
      Service.State state = replicationEndpoint.start().get();
      if (state != Service.State.RUNNING) {
        LOG.warn("ReplicationEndpoint was not started. Exiting");
        uninitialize();
        return;
      }
    } catch (Exception ex) {
      LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
      throw new RuntimeException(ex);
    }

    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
      (WALEntryFilter)new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    this.walEntryFilter = new ChainWALEntryFilter(filters);

    int sleepMultiplier = 1;
    // delay this until we are in an asynchronous thread
    while (this.isActive() && this.peerClusterId == null) {
      this.peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isActive() && this.peerClusterId == null) {
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    // We were stopped while looping to contact peer's zk ensemble, just abort
    if (!this.isActive()) {
      uninitialize();
      return;
    }

    // resetting to 1 to reuse later
    sleepMultiplier = 1;

    // In rare cases, the zookeeper setting may be messed up. That leads to an incorrect
    // peerClusterId value, which is the same as the source clusterId
    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
    }
    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);

    // If this is recovered, the queue is already full and the first log
    // normally has a position (unless the RS failed between 2 logs)
    if (this.replicationQueueInfo.isQueueRecovered()) {
      try {
        this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
          this.queue.peek().getName()));
        if (LOG.isTraceEnabled()) {
          LOG.trace("Recovered queue started with log " + this.queue.peek() +
              " at position " + this.repLogReader.getPosition());
        }
      } catch (ReplicationException e) {
        this.terminate("Couldn't get the position of this recovered queue " +
            this.peerClusterZnode, e);
      }
    }
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      Path oldPath = getCurrentPath(); // note that in the current scenario,
                                       // oldPath will be null when a log roll
                                       // happens.
      // Get a new path
      boolean hasCurrentPath = getNextPath();
      if (getCurrentPath() != null && oldPath == null) {
        sleepMultiplier = 1; // reset the sleepMultiplier on a path change
      }
      if (!hasCurrentPath) {
        if (sleepForRetries("No log to process", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      boolean currentWALisBeingWrittenTo = false;
      // For WAL files we own (rather than recovered), take a snapshot of whether the
      // current WAL file (this.currentPath) is in use (for writing) NOW!
      // Since the new WAL paths are enqueued only after the prev WAL file
      // is 'closed', presence of an element in the queue means that
      // the previous WAL file was closed, else the file is in use (currentPath)
      // We take the snapshot now so that we are protected against races
      // where a new file gets enqueued while the current file is being processed
      // (and where we just finished reading the current file).
      if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
        currentWALisBeingWrittenTo = true;
      }
      // Open a reader on it
      if (!openReader(sleepMultiplier)) {
        // Reset the sleep multiplier, else it'd be reused for the next file
        sleepMultiplier = 1;
        continue;
      }

      // If we got a null reader but didn't continue, then sleep and continue
      if (this.reader == null) {
        if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      boolean gotIOE = false;
      currentNbOperations = 0;
      List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
      currentSize = 0;
      try {
        if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
          continue;
        }
      } catch (IOException ioe) {
        LOG.warn(this.peerClusterZnode + " Got: ", ioe);
        gotIOE = true;
        if (ioe.getCause() instanceof EOFException) {

          boolean considerDumping = false;
          if (this.replicationQueueInfo.isQueueRecovered()) {
            try {
              FileStatus stat = this.fs.getFileStatus(this.currentPath);
              if (stat.getLen() == 0) {
                LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
              }
              considerDumping = true;
            } catch (IOException e) {
              LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e);
            }
          }

          if (considerDumping &&
              sleepMultiplier == this.maxRetriesMultiplier &&
              processEndOfFile()) {
            continue;
          }
        }
      } finally {
        try {
          this.reader = null;
          this.repLogReader.closeReader();
        } catch (IOException e) {
          gotIOE = true;
          LOG.warn("Unable to finalize the tailing of a file", e);
        }
      }

      // If we didn't get anything to replicate, or if we hit an IOE,
      // wait a bit and retry.
      // But if we need to stop, don't bother sleeping
      if (this.isActive() && (gotIOE || entries.isEmpty())) {
        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
          this.manager.logPositionAndCleanOldLogs(this.currentPath,
              this.peerClusterZnode, this.repLogReader.getPosition(),
              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
          this.lastLoggedPosition = this.repLogReader.getPosition();
        }
        // Reset the sleep multiplier if nothing has actually gone wrong
        if (!gotIOE) {
          sleepMultiplier = 1;
          // if there was nothing to ship and it's not an error
          // set "ageOfLastShippedOp" to <now> to indicate that we're current
          this.metrics.setAgeOfLastShippedOp(System.currentTimeMillis());
        }
        if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      sleepMultiplier = 1;
      shipEdits(currentWALisBeingWrittenTo, entries);
    }
    uninitialize();
  }

  /**
   * Read all the entries from the current log file and retain those
   * that need to be replicated. Otherwise, process the end of the current file.
   * @param currentWALisBeingWrittenTo is the current WAL being written to
   * @param entries resulting entries to be replicated
   * @return true if we got nothing and went to the next file, false if we got
   * entries
   * @throws IOException
   */
  protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
      List<WAL.Entry> entries) throws IOException {
    long seenEntries = 0;
    if (LOG.isTraceEnabled()) {
      LOG.trace("Seeking in " + this.currentPath + " at position "
          + this.repLogReader.getPosition());
    }
    this.repLogReader.seek();
    long positionBeforeRead = this.repLogReader.getPosition();
    WAL.Entry entry =
        this.repLogReader.readNextAndSetPosition();
    while (entry != null) {
      this.metrics.incrLogEditsRead();
      seenEntries++;

      // don't replicate if the log entries have already been consumed by the cluster
      if (replicationEndpoint.canReplicateToSameCluster()
          || !entry.getKey().getClusterIds().contains(peerClusterId)) {
        // Remove all KVs that should not be replicated
        entry = walEntryFilter.filter(entry);
        WALEdit edit = null;
        WALKey logKey = null;
        if (entry != null) {
          edit = entry.getEdit();
          logKey = entry.getKey();
        }

        if (edit != null && edit.size() != 0) {
          // Mark that the current cluster has the change
          logKey.addClusterId(clusterId);
          currentNbOperations += countDistinctRowKeys(edit);
          entries.add(entry);
          currentSize += entry.getEdit().heapSize();
        } else {
          this.metrics.incrLogEditsFiltered();
        }
      }
      // Stop if too many entries or too big
      if (currentSize >= this.replicationQueueSizeCapacity ||
          entries.size() >= this.replicationQueueNbCapacity) {
        break;
      }
      try {
        entry = this.repLogReader.readNextAndSetPosition();
      } catch (IOException ie) {
        LOG.debug("Break on IOE: " + ie.getMessage());
        break;
      }
    }
    metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
    if (currentWALisBeingWrittenTo) {
      return false;
    }
    // If we didn't get anything and the queue has an object, it means we
    // hit the end of the file for sure
    return seenEntries == 0 && processEndOfFile();
  }

  /**
   * Poll for the next path
   * @return true if a path was obtained, false if not
   */
  protected boolean getNextPath() {
    try {
      if (this.currentPath == null) {
        this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
        this.metrics.setSizeOfLogQueue(queue.size());
        if (this.currentPath != null) {
          this.manager.cleanOldLogs(this.currentPath.getName(),
              this.peerId,
              this.replicationQueueInfo.isQueueRecovered());
          if (LOG.isTraceEnabled()) {
            LOG.trace("New log: " + this.currentPath);
          }
        }
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while reading edits", e);
    }
    return this.currentPath != null;
  }

  /**
   * Open a reader on the current path
   *
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return true if we should continue with that file, false if we are over with it
   */
  protected boolean openReader(int sleepMultiplier) {
    try {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Opening log " + this.currentPath);
        }
        this.reader = repLogReader.openReader(this.currentPath);
      } catch (FileNotFoundException fnfe) {
        if (this.replicationQueueInfo.isQueueRecovered()) {
          // We didn't find the log in the archive directory, look if it still
          // exists in the dead RS folder (there could be a chain of failures
          // to look at)
          List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
          LOG.info("NB dead servers : " + deadRegionServers.size());
          final Path rootDir = FSUtils.getRootDir(this.conf);
          for (String curDeadServerName : deadRegionServers) {
            final Path deadRsDirectory = new Path(rootDir,
                DefaultWALProvider.getWALDirectoryName(curDeadServerName));
            Path[] locs = new Path[] {
                new Path(deadRsDirectory, currentPath.getName()),
                new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
                                          currentPath.getName()),
            };
            for (Path possibleLogLocation : locs) {
              LOG.info("Possible location " + possibleLogLocation.toUri().toString());
              if (this.manager.getFs().exists(possibleLogLocation)) {
                // We found the right new location
                LOG.info("Log " + this.currentPath + " still exists at " +
                    possibleLogLocation);
                // Breaking here will make us sleep since reader is null
                // TODO why don't we need to set currentPath and call openReader here?
                return true;
              }
            }
          }
          // In the case of disaster/recovery, the HMaster may have been shutdown/crashed before
          // flushing data from .logs to .oldlogs. Loop over the .logs folders and check whether a
          // match exists
          if (stopper instanceof ReplicationSyncUp.DummyServer) {
            // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
            //      area rather than to the wal area for a particular region server.
            FileStatus[] rss = fs.listStatus(manager.getLogDir());
            for (FileStatus rs : rss) {
              Path p = rs.getPath();
              FileStatus[] logs = fs.listStatus(p);
              for (FileStatus log : logs) {
                p = new Path(p, log.getPath().getName());
                if (p.getName().equals(currentPath.getName())) {
                  currentPath = p;
                  LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
                  // Open the log at the new location
                  this.openReader(sleepMultiplier);
                  return true;
                }
              }
            }
          }

          // TODO What happens if the log was missing from every single location?
          // Although we need to check a couple of times as the log could have
          // been moved by the master between the checks
          // It can also happen if a recovered queue wasn't properly cleaned,
          // such that the znode pointing to a log exists but the log was
          // deleted a long time ago.
          // For the moment, we'll throw the IO and processEndOfFile
          throw new IOException("File from recovered queue is " +
              "nowhere to be found", fnfe);
        } else {
          // If the log was archived, continue reading from there
          Path archivedLogLocation =
              new Path(manager.getOldLogDir(), currentPath.getName());
          if (this.manager.getFs().exists(archivedLogLocation)) {
            currentPath = archivedLogLocation;
            LOG.info("Log " + this.currentPath + " was moved to " +
                archivedLogLocation);
            // Open the log at the new location
            this.openReader(sleepMultiplier);

          }
          // TODO What happens if the log is missing in both places?
        }
      }
    } catch (IOException ioe) {
      if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
      LOG.warn(this.peerClusterZnode + " Got: ", ioe);
      this.reader = null;
      if (ioe.getCause() instanceof NullPointerException) {
        // Workaround for race condition in HDFS-4380
        // which throws a NPE if we open a file before any data node has the most recent block
        // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
        LOG.warn("Got NPE opening reader, will retry.");
      } else if (sleepMultiplier == this.maxRetriesMultiplier) {
        // TODO Need a better way to determine if a file is really gone but
        // TODO without scanning all logs dir
        LOG.warn("Waited too long for this file, considering dumping");
        return !processEndOfFile();
      }
    }
    return true;
  }

  /*
   * Checks whether the current log file is empty and is not from a recovered queue. This handles
   * the scenario where, in an idle cluster, there is no entry in the current log and we keep
   * trying to read the log file, getting EOFException each time. In the case of a recovered queue
   * the last log file may be empty, and we don't want to retry that.
   */
  private boolean isCurrentLogEmpty() {
    return (this.repLogReader.getPosition() == 0 &&
        !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Count the number of different row keys in the given edit, which can touch several rows
   * because of mini-batching. We assume that there's at least one Cell in the WALEdit.
   * @param edit edit to count row keys from
   * @return number of different row keys
   */
  private int countDistinctRowKeys(WALEdit edit) {
    List<Cell> cells = edit.getCells();
    int distinctRowKeys = 1;
    Cell lastCell = cells.get(0);
    for (int i = 0; i < edit.size(); i++) {
      if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
        distinctRowKeys++;
      }
      // Compare each cell against the previous one (not just the first) so every row change
      // in the edit is counted.
      lastCell = cells.get(i);
    }
    return distinctRowKeys;
  }

  /**
   * Do the shipping logic
   * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
   * written to when this method was called
   * @param entries the WAL entries to ship to the peer cluster
   */
  protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      LOG.warn("Was given 0 edits to ship");
      return;
    }
    while (this.isActive()) {
      try {
        if (this.throttler.isEnabled()) {
          long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
          if (sleepTicks > 0) {
            try {
              if (LOG.isTraceEnabled()) {
                LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
              }
              Thread.sleep(sleepTicks);
            } catch (InterruptedException e) {
              LOG.debug("Interrupted while sleeping for throttling control");
              Thread.currentThread().interrupt();
              // current thread might have been interrupted to terminate;
              // go directly back to while() to confirm this
              continue;
            }
            // reset throttler's cycle start tick when sleep for throttling occurs
            this.throttler.resetStartTick();
          }
        }
        // create replicateContext here, so the entries can be GC'd upon return from this call stack
        ReplicationEndpoint.ReplicateContext replicateContext = new ReplicationEndpoint.ReplicateContext();
        replicateContext.setEntries(entries).setSize(currentSize);

        long startTimeNs = System.nanoTime();
        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
        boolean replicated = replicationEndpoint.replicate(replicateContext);
        long endTimeNs = System.nanoTime();

        if (!replicated) {
          continue;
        } else {
          sleepMultiplier = Math.max(sleepMultiplier-1, 0);
        }

        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
          this.manager.logPositionAndCleanOldLogs(this.currentPath,
              this.peerClusterZnode, this.repLogReader.getPosition(),
              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
          this.lastLoggedPosition = this.repLogReader.getPosition();
        }
        if (this.throttler.isEnabled()) {
          this.throttler.addPushSize(currentSize);
        }
        this.totalReplicatedEdits += entries.size();
        this.totalReplicatedOperations += currentNbOperations;
        this.metrics.shipBatch(this.currentNbOperations, this.currentSize/1024);
        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
              + this.totalReplicatedOperations + " operations in " +
              ((endTimeNs - startTimeNs)/1000000) + " ms");
        }
        break;
      } catch (Exception ex) {
        LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" +
            org.apache.hadoop.util.StringUtils.stringifyException(ex));
        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * check whether the peer is enabled or not
   *
   * @return true if the peer is enabled, otherwise false
   */
  protected boolean isPeerEnabled() {
    return this.replicationPeers.getStatusOfPeer(this.peerId);
  }

  /**
   * If the queue isn't empty, switch to the next one.
   * Else, if this is a recovered queue, it means we're done!
   * Else, we'll just continue to try reading the log file.
   * @return true if we're done with the current file, false if we should
   * continue trying to read from it
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
      justification="Yeah, this is how it works")
  protected boolean processEndOfFile() {
    if (this.queue.size() != 0) {
      if (LOG.isTraceEnabled()) {
        String filesize = "N/A";
        try {
          FileStatus stat = this.fs.getFileStatus(this.currentPath);
          filesize = stat.getLen()+"";
        } catch (IOException ex) {}
        LOG.trace("Reached the end of a log, stats: " + getStats() +
            ", and the length of the file is " + filesize);
      }
      this.currentPath = null;
      this.repLogReader.finishCurrentFile();
      this.reader = null;
      return true;
    } else if (this.replicationQueueInfo.isQueueRecovered()) {
      this.manager.closeRecoveredQueue(this);
      LOG.info("Finished recovering the queue with the following stats " + getStats());
      this.running = false;
      return true;
    }
    return false;
  }

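  // Starts the worker thread as a daemon; any uncaught exception is logged along with the
  // current WAL path to ease debugging.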
  @Override
  public void startup() {
    String n = Thread.currentThread().getName();
    Thread.UncaughtExceptionHandler handler =
        new Thread.UncaughtExceptionHandler() {
          @Override
          public void uncaughtException(final Thread t, final Throwable e) {
            LOG.error("Unexpected exception in ReplicationSource," +
              " currentPath=" + currentPath, e);
          }
        };
    Threads.setDaemonThreadRunning(
        this, n + ".replicationSource," +
        this.peerClusterZnode, handler);
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

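  // Stops this source and its endpoint; when join is true, blocks until both the worker thread
  // and the endpoint have shut down.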
  public void terminate(String reason, Exception cause, boolean join) {
    if (cause == null) {
      LOG.info("Closing source "
          + this.peerClusterZnode + " because: " + reason);

    } else {
      LOG.error("Closing source " + this.peerClusterZnode
          + " because an error occurred: " + reason, cause);
    }
    this.running = false;
    this.interrupt();
    ListenableFuture<Service.State> future = null;
    if (this.replicationEndpoint != null) {
      future = this.replicationEndpoint.stop();
    }
    if (join) {
      Threads.shutdown(this, this.sleepForRetries);
      if (future != null) {
        try {
          future.get();
        } catch (Exception e) {
          LOG.warn("Got exception:" + e);
        }
      }
    }
  }

  @Override
  public String getPeerClusterZnode() {
    return this.peerClusterZnode;
  }

  @Override
  public String getPeerClusterId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    return this.currentPath;
  }

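  // The source is active until it is explicitly stopped, its region server is stopped, or its
  // worker thread is interrupted.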
  private boolean isActive() {
    return !this.stopper.isStopped() && this.running && !isInterrupted();
  }

  /**
   * Comparator used to compare logs together based on their start time
   */
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
    }

    /**
     * Split a path to get the start time
     * For example: 10.20.20.171%3A60020.1277499063250
     * @param p path to split
     * @return start time
     */
    private long getTS(Path p) {
      String[] parts = p.getName().split("\\.");
      return Long.parseLong(parts[parts.length-1]);
    }
  }

  @Override
  public String getStats() {
    long position = this.repLogReader.getPosition();
    return "Total replicated edits: " + totalReplicatedEdits +
      ", currently replicating from: " + this.currentPath +
      " at position: " + position;
  }

  /**
   * Get Replication Source Metrics
   * @return sourceMetrics
   */
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }
}