1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Comparator;
26  import java.util.List;
27  import java.util.UUID;
28  import java.util.concurrent.PriorityBlockingQueue;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.commons.lang.StringUtils;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.HBaseConfiguration;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.Stoppable;
44  import org.apache.hadoop.hbase.regionserver.wal.HLog;
45  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
46  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
47  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
48  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
49  import org.apache.hadoop.hbase.replication.ReplicationException;
50  import org.apache.hadoop.hbase.replication.ReplicationPeers;
51  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
52  import org.apache.hadoop.hbase.replication.ReplicationQueues;
53  import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
54  import org.apache.hadoop.hbase.replication.WALEntryFilter;
55  import org.apache.hadoop.hbase.util.Threads;
56  
57  import com.google.common.collect.Lists;
58  import com.google.common.util.concurrent.ListenableFuture;
59  import com.google.common.util.concurrent.Service;
60  
61  /**
62   * Class that handles the source of a replication stream.
63   * Currently it does not handle more than one slave cluster.
64   * For each slave cluster it selects a random subset of peer region servers,
65   * sized by a replication ratio. For example, if the replication ratio is 0.1
66   * and the slave cluster has 100 region servers, 10 of them will be selected.
67   * <p/>
68   * A stream is considered down when we cannot contact a region server on the
69   * peer cluster for more than 55 seconds by default.
70   * <p/>
71   *
72   */
73  @InterfaceAudience.Private
74  public class ReplicationSource extends Thread
75      implements ReplicationSourceInterface {
76  
77    public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
78    // Queue of logs to process
79    private PriorityBlockingQueue<Path> queue;
80    private ReplicationQueues replicationQueues;
81    private ReplicationPeers replicationPeers;
82  
83    private Configuration conf;
84    private ReplicationQueueInfo replicationQueueInfo;
85    // id of the peer cluster this source replicates to
86    private String peerId;
87    // The manager of all sources to which we ping back our progress
88    private ReplicationSourceManager manager;
89    // Should we stop everything?
90    private Stoppable stopper;
91    // How long should we sleep for each retry
92    private long sleepForRetries;
93    // Max size in bytes of entriesArray
94    private long replicationQueueSizeCapacity;
95    // Max number of entries in entriesArray
96    private int replicationQueueNbCapacity;
97    // Our reader for the current log
98    private HLog.Reader reader;
99    // Last position in the log that we sent to ZooKeeper
100   private long lastLoggedPosition = -1;
101   // Path of the current log
102   private volatile Path currentPath;
103   private FileSystem fs;
104   // id of this cluster
105   private UUID clusterId;
106   // id of the other cluster
107   private UUID peerClusterId;
108   // total number of edits we replicated
109   private long totalReplicatedEdits = 0;
110   // total number of operations (Put/Delete) we replicated
111   private long totalReplicatedOperations = 0;
112   // The znode of the replication queue this source works with
113   private String peerClusterZnode;
114   // Maximum number of retries before taking bold actions
115   private int maxRetriesMultiplier;
116   // Current number of operations (Put/Delete) that we need to replicate
117   private int currentNbOperations = 0;
118   // Current size of data we need to replicate
119   private int currentSize = 0;
120   // Indicates if this particular source is running
121   private volatile boolean running = true;
122   // Metrics for this source
123   private MetricsSource metrics;
124   // Handle on the log reader helper
125   private ReplicationHLogReaderManager repLogReader;
126   //WARN threshold for the number of queued logs, defaults to 2
127   private int logQueueWarnThreshold;
128   // ReplicationEndpoint which will handle the actual replication
129   private ReplicationEndpoint replicationEndpoint;
130   // A filter (or a chain of filters) for the WAL entries.
131   private WALEntryFilter walEntryFilter;
132   // Context for ReplicationEndpoint#replicate()
133   private ReplicationEndpoint.ReplicateContext replicateContext;
134   // throttler
135   private ReplicationThrottler throttler;
136 
137   /**
138    * Instantiation method used by region servers
139    *
140    * @param conf configuration to use
141    * @param fs file system to use
142    * @param manager replication manager to which we report progress
       * @param replicationQueues the interface to the persistent replication queues
       * @param replicationPeers the interface to the replication peers
143    * @param stopper the Stoppable used to check whether the region server is stopping
144    * @param peerClusterZnode the name of our replication queue znode
145    * @param clusterId unique UUID of this (source) cluster
146    * @param replicationEndpoint the replication endpoint implementation
147    * @param metrics metrics for replication source
148    * @throws IOException
149    */
150   @Override
151   public void init(final Configuration conf, final FileSystem fs,
152       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
153       final ReplicationPeers replicationPeers, final Stoppable stopper,
154       final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
155       final MetricsSource metrics)
156           throws IOException {
157     this.stopper = stopper;
158     this.conf = HBaseConfiguration.create(conf);
159     decorateConf();
160     this.replicationQueueSizeCapacity =
161         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
162     this.replicationQueueNbCapacity =
163         this.conf.getInt("replication.source.nb.capacity", 25000);
164     this.sleepForRetries =
165         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
166     this.maxRetriesMultiplier =
167         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
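        // WALs are processed oldest first: LogsComparator (defined below) orders queue entries by
        // the timestamp suffix of the WAL file name. The initial capacity only sizes the backing
        // array; a PriorityBlockingQueue is unbounded.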
168     this.queue =
169         new PriorityBlockingQueue<Path>(
170             this.conf.getInt("hbase.regionserver.maxlogs", 32),
171             new LogsComparator());
172     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
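        // A bandwidth of 0 leaves the throttler disabled. The value is divided by 10 because the
        // throttler budgets data per cycle, presumably 100 ms per cycle (ten cycles per second).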
173     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
174     this.replicationQueues = replicationQueues;
175     this.replicationPeers = replicationPeers;
176     this.manager = manager;
177     this.fs = fs;
178     this.metrics = metrics;
179     this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
180     this.clusterId = clusterId;
181 
182     this.peerClusterZnode = peerClusterZnode;
183     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
184     // ReplicationQueueInfo parses the peerId out of the znode for us
185     this.peerId = this.replicationQueueInfo.getPeerId();
186     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
187     this.replicationEndpoint = replicationEndpoint;
188 
189     this.replicateContext = new ReplicationEndpoint.ReplicateContext();
190   }
191 
192   private void decorateConf() {
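        // If a replication-specific codec is configured, apply it as the RPC codec on this
        // source's private copy of the configuration, so connections to the peer use it.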
193     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
194     if (StringUtils.isNotEmpty(replicationCodec)) {
195       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
196     }
197   }
198 
199   @Override
200   public void enqueueLog(Path log) {
201     this.queue.put(log);
202     int queueSize = queue.size();
203     this.metrics.setSizeOfLogQueue(queueSize);
204     // This will log a warning for each new log that gets created above the warn threshold
205     if (queueSize > this.logQueueWarnThreshold) {
206       LOG.warn("Queue size: " + queueSize +
207         " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
208     }
209   }
210 
211   private void uninitialize() {
212     LOG.debug("Source exiting " + this.peerId);
213     metrics.clear();
214     if (replicationEndpoint.state() == Service.State.STARTING
215         || replicationEndpoint.state() == Service.State.RUNNING) {
216       replicationEndpoint.stopAndWait();
217     }
218   }
219 
220   @Override
221   public void run() {
222     // We were stopped while looping to connect to sinks, just abort
223     if (!this.isActive()) {
224       uninitialize();
225       return;
226     }
227 
228     try {
229       // start the endpoint, connect to the cluster
230       Service.State state = replicationEndpoint.start().get();
231       if (state != Service.State.RUNNING) {
232         LOG.warn("ReplicationEndpoint was not started. Exiting");
233         uninitialize();
234         return;
235       }
236     } catch (Exception ex) {
237       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
238       throw new RuntimeException(ex);
239     }
240 
241     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
242     ArrayList<WALEntryFilter> filters = Lists.newArrayList(
243       (WALEntryFilter)new SystemTableWALEntryFilter());
244     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
245     if (filterFromEndpoint != null) {
246       filters.add(filterFromEndpoint);
247     }
248     this.walEntryFilter = new ChainWALEntryFilter(filters);
249 
250     int sleepMultiplier = 1;
251     // delay this until we are in an asynchronous thread
252     while (this.isActive() && this.peerClusterId == null) {
253       this.peerClusterId = replicationEndpoint.getPeerUUID();
254       if (this.isActive() && this.peerClusterId == null) {
255         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
256           sleepMultiplier++;
257         }
258       }
259     }
260     // We were stopped while looping to contact peer's zk ensemble, just abort
261     if (!this.isActive()) {
262       uninitialize();
263       return;
264     }
265 
266     // resetting to 1 to reuse later
267     sleepMultiplier = 1;
268 
269     // In a rare case, the ZooKeeper setting may be misconfigured. That leads to an incorrect
270     // peerClusterId value, one that is the same as the source clusterId
271     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
272       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
273           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
274           + replicationEndpoint.getClass().getName(), null, false);
275     }
276     LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
277 
278     // If this is a recovered queue, it is already full and the first log
279     // normally has a stored position (unless the RS failed between two logs)
280     if (this.replicationQueueInfo.isQueueRecovered()) {
281       try {
282         this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
283           this.queue.peek().getName()));
284         if (LOG.isTraceEnabled()) {
285           LOG.trace("Recovered queue started with log " + this.queue.peek() +
286               " at position " + this.repLogReader.getPosition());
287         }
288       } catch (ReplicationException e) {
289         this.terminate("Couldn't get the position of this recovered queue " +
290             this.peerClusterZnode, e);
291       }
292     }
293     // Loop until we close down
294     while (isActive()) {
295       // Sleep until replication is enabled again
296       if (!isPeerEnabled()) {
297         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
298           sleepMultiplier++;
299         }
300         continue;
301       }
302       Path oldPath = getCurrentPath(); //note that in the current scenario,
303                                        //oldPath will be null when a log roll
304                                        //happens.
305       // Get a new path
306       boolean hasCurrentPath = getNextPath();
307       if (getCurrentPath() != null && oldPath == null) {
308         sleepMultiplier = 1; //reset the sleepMultiplier on a path change
309       }
310       if (!hasCurrentPath) {
311         if (sleepForRetries("No log to process", sleepMultiplier)) {
312           sleepMultiplier++;
313         }
314         continue;
315       }
316       boolean currentWALisBeingWrittenTo = false;
317       //For WAL files we own (rather than recovered), take a snapshot of whether the
318       //current WAL file (this.currentPath) is in use (for writing) NOW!
319       //Since the new WAL paths are enqueued only after the prev WAL file
320       //is 'closed', presence of an element in the queue means that
321       //the previous WAL file was closed, else the file is in use (currentPath)
322       //We take the snapshot now so that we are protected against races
323       //where a new file gets enqueued while the current file is being processed
324       //(and where we just finished reading the current file).
325       if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
326         currentWALisBeingWrittenTo = true;
327       }
328       // Open a reader on it
329       if (!openReader(sleepMultiplier)) {
330         // Reset the sleep multiplier, else it'd be reused for the next file
331         sleepMultiplier = 1;
332         continue;
333       }
334 
335       // If we got a null reader but didn't continue, then sleep and continue
336       if (this.reader == null) {
337         if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
338           sleepMultiplier++;
339         }
340         continue;
341       }
342 
343       boolean gotIOE = false;
344       currentNbOperations = 0;
345       List<HLog.Entry> entries = new ArrayList<HLog.Entry>(1);
346       currentSize = 0;
347       try {
348         if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
349           continue;
350         }
351       } catch (IOException ioe) {
352         LOG.warn(this.peerClusterZnode + " Got: ", ioe);
353         gotIOE = true;
354         if (ioe.getCause() instanceof EOFException) {
355 
356           boolean considerDumping = false;
357           if (this.replicationQueueInfo.isQueueRecovered()) {
358             try {
359               FileStatus stat = this.fs.getFileStatus(this.currentPath);
360               if (stat.getLen() == 0) {
361                 LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
362               }
363               considerDumping = true;
364             } catch (IOException e) {
365               LOG.warn(this.peerClusterZnode + " Got exception while getting file size: ", e);
366             }
367           }
368 
369           if (considerDumping &&
370               sleepMultiplier == this.maxRetriesMultiplier &&
371               processEndOfFile()) {
372             continue;
373           }
374         }
375       } finally {
376         try {
377           this.reader = null;
378           this.repLogReader.closeReader();
379         } catch (IOException e) {
380           gotIOE = true;
381           LOG.warn("Unable to finalize the tailing of a file", e);
382         }
383       }
384 
385       // If we didn't get anything to replicate, or if we hit a IOE,
386       // wait a bit and retry.
387       // But if we need to stop, don't bother sleeping
388       if (this.isActive() && (gotIOE || entries.isEmpty())) {
389         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
390           this.manager.logPositionAndCleanOldLogs(this.currentPath,
391               this.peerClusterZnode, this.repLogReader.getPosition(),
392               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
393           this.lastLoggedPosition = this.repLogReader.getPosition();
394         }
395         // Reset the sleep multiplier if nothing has actually gone wrong
396         if (!gotIOE) {
397           sleepMultiplier = 1;
398           // if there was nothing to ship and it's not an error
399           // set "ageOfLastShippedOp" to <now> to indicate that we're current
400           this.metrics.setAgeOfLastShippedOp(System.currentTimeMillis());
401         }
402         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
403           sleepMultiplier++;
404         }
405         continue;
406       }
407       sleepMultiplier = 1;
408       shipEdits(currentWALisBeingWrittenTo, entries);
409     }
410     uninitialize();
411   }
412 
413   /**
414    * Read all the entries from the current log file and retain those
415    * that need to be replicated; if none are left, process the end of the current file.
416    * @param currentWALisBeingWrittenTo whether the current WAL is still being written to
417    * @param entries resulting entries to be replicated
418    * @return true if we got nothing and went to the next file, false if we got
419    * entries
420    * @throws IOException
421    */
422   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
423       List<HLog.Entry> entries) throws IOException{
424     long seenEntries = 0;
425     if (LOG.isTraceEnabled()) {
426       LOG.trace("Seeking in " + this.currentPath + " at position "
427           + this.repLogReader.getPosition());
428     }
429     this.repLogReader.seek();
430     long positionBeforeRead = this.repLogReader.getPosition();
431     HLog.Entry entry =
432         this.repLogReader.readNextAndSetPosition();
433     while (entry != null) {
434       this.metrics.incrLogEditsRead();
435       seenEntries++;
436 
437       // don't replicate if the log entries have already been consumed by the peer cluster
438       if (replicationEndpoint.canReplicateToSameCluster()
439           || !entry.getKey().getClusterIds().contains(peerClusterId)) {
440         // Remove all KVs that should not be replicated
441         entry = walEntryFilter.filter(entry);
442         WALEdit edit = null;
443         HLogKey logKey = null;
444         if (entry != null) {
445           edit = entry.getEdit();
446           logKey = entry.getKey();
447         }
448 
449         if (edit != null && edit.size() != 0) {
450           //Mark that the current cluster has the change
451           logKey.addClusterId(clusterId);
452           currentNbOperations += countDistinctRowKeys(edit);
453           entries.add(entry);
454           currentSize += entry.getEdit().heapSize();
455         } else {
456           this.metrics.incrLogEditsFiltered();
457         }
458       }
459       // Stop if too many entries or too big
460       if (currentSize >= this.replicationQueueSizeCapacity ||
461           entries.size() >= this.replicationQueueNbCapacity) {
462         break;
463       }
464       try {
465         entry = this.repLogReader.readNextAndSetPosition();
466       } catch (IOException ie) {
467         LOG.debug("Break on IOE: " + ie.getMessage());
468         break;
469       }
470     }
471     metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
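        // Never treat a WAL that is still being written to as finished: more entries may show up,
        // so skip processEndOfFile() and keep reading the same file on the next pass.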
472     if (currentWALisBeingWrittenTo) {
473       return false;
474     }
475     // If we didn't get anything and the queue has an object, it means we
476     // hit the end of the file for sure
477     return seenEntries == 0 && processEndOfFile();
478   }
479 
480   /**
481    * Poll for the next path
482    * @return true if a path was obtained, false if not
483    */
484   protected boolean getNextPath() {
485     try {
486       if (this.currentPath == null) {
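          // Wait up to sleepForRetries ms for a new WAL to be enqueued; if none shows up,
          // currentPath stays null and the caller will sleep and retry.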
487         this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
488         this.metrics.setSizeOfLogQueue(queue.size());
489         if (this.currentPath != null) {
490           this.manager.cleanOldLogs(this.currentPath.getName(),
491               this.peerId,
492               this.replicationQueueInfo.isQueueRecovered());
493           if (LOG.isTraceEnabled()) {
494             LOG.trace("New log: " + this.currentPath);
495           }
496         }
497       }
498     } catch (InterruptedException e) {
499       LOG.warn("Interrupted while reading edits", e);
500     }
501     return this.currentPath != null;
502   }
503 
504   /**
505    * Open a reader on the current path
506    *
507    * @param sleepMultiplier by how many times the default sleeping time is augmented
508    * @return true if we should continue with that file, false if we are done with it
509    */
510   protected boolean openReader(int sleepMultiplier) {
511     try {
512       try {
513         if (LOG.isTraceEnabled()) {
514           LOG.trace("Opening log " + this.currentPath);
515         }
516         this.reader = repLogReader.openReader(this.currentPath);
517       } catch (FileNotFoundException fnfe) {
518         if (this.replicationQueueInfo.isQueueRecovered()) {
519           // We didn't find the log in the archive directory, look if it still
520           // exists in the dead RS folder (there could be a chain of failures
521           // to look at)
522           List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
523           LOG.info("NB dead servers : " + deadRegionServers.size());
524           for (String curDeadServerName : deadRegionServers) {
525             Path deadRsDirectory =
526                 new Path(manager.getLogDir().getParent(), curDeadServerName);
527             Path[] locs = new Path[] {
528                 new Path(deadRsDirectory, currentPath.getName()),
529                 new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT),
530                                           currentPath.getName()),
531             };
532             for (Path possibleLogLocation : locs) {
533               LOG.info("Possible location " + possibleLogLocation.toUri().toString());
534               if (this.manager.getFs().exists(possibleLogLocation)) {
535                 // We found the right new location
536                 LOG.info("Log " + this.currentPath + " still exists at " +
537                     possibleLogLocation);
538                 // Returning true here will make us sleep since the reader is still null
539                 return true;
540               }
541             }
542           }
543           // In a disaster/recovery case, the HMaster may have been shut down or crashed before data
544           // was moved from .logs to .oldlogs. Loop through the .logs folders and check for a match
545           if (stopper instanceof ReplicationSyncUp.DummyServer) {
546             FileStatus[] rss = fs.listStatus(manager.getLogDir());
547             for (FileStatus rs : rss) {
548               Path p = rs.getPath();
549               FileStatus[] logs = fs.listStatus(p);
550               for (FileStatus log : logs) {
551                 p = new Path(p, log.getPath().getName());
552                 if (p.getName().equals(currentPath.getName())) {
553                   currentPath = p;
554                   LOG.info("Log " + this.currentPath + " exists under " + manager.getLogDir());
555                   // Open the log at the new location
556                   this.openReader(sleepMultiplier);
557                   return true;
558                 }
559               }
560             }
561           }
562 
563           // TODO What happens if the log was missing from every single location?
564           // Although we need to check a couple of times as the log could have
565           // been moved by the master between the checks
566           // It can also happen if a recovered queue wasn't properly cleaned,
567           // such that the znode pointing to a log exists but the log was
568           // deleted a long time ago.
569           // For the moment, we'll throw the IOException and let processEndOfFile handle it
570           throw new IOException("File from recovered queue is " +
571               "nowhere to be found", fnfe);
572         } else {
573           // If the log was archived, continue reading from there
574           Path archivedLogLocation =
575               new Path(manager.getOldLogDir(), currentPath.getName());
576           if (this.manager.getFs().exists(archivedLogLocation)) {
577             currentPath = archivedLogLocation;
578             LOG.info("Log " + this.currentPath + " was moved to " +
579                 archivedLogLocation);
580             // Open the log at the new location
581             this.openReader(sleepMultiplier);
582 
583           }
584           // TODO What happens if the log is missing in both places?
585         }
586       }
587     } catch (IOException ioe) {
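          // Hitting EOF on a still-open, empty WAL is expected in an idle cluster; just keep
          // retrying the same file (see isCurrentLogEmpty()).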
588       if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
589       LOG.warn(this.peerClusterZnode + " Got: ", ioe);
590       this.reader = null;
591       if (ioe.getCause() instanceof NullPointerException) {
592         // Workaround for race condition in HDFS-4380
593         // which throws a NPE if we open a file before any data node has the most recent block
594         // Just sleep and retry. Will require re-reading compressed HLogs for compressionContext.
595         LOG.warn("Got NPE opening reader, will retry.");
596       } else if (sleepMultiplier == this.maxRetriesMultiplier) {
597         // TODO Need a better way to determine if a file is really gone but
598         // TODO without scanning all logs dir
599         LOG.warn("Waited too long for this file, considering dumping");
600         return !processEndOfFile();
601       }
602     }
603     return true;
604   }
605 
606   /*
607    * Checks whether the current log file is empty and the queue is not a recovered one. This handles
608    * the scenario where, in an idle cluster, there is no entry in the current log and we keep
609    * trying to read it, getting an EOFException. In the case of a recovered queue the last log
610    * file may legitimately be empty, and we don't want to retry reading it.
611    */
612   private boolean isCurrentLogEmpty() {
613     return (this.repLogReader.getPosition() == 0 &&
614         !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
615   }
616 
617   /**
618    * Do the sleeping logic
619    * @param msg Why we sleep
620    * @param sleepMultiplier by how many times the default sleeping time is augmented
621    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
622    */
623   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
624     try {
625       if (LOG.isTraceEnabled()) {
626         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
627       }
628       Thread.sleep(this.sleepForRetries * sleepMultiplier);
629     } catch (InterruptedException e) {
630       LOG.debug("Interrupted while sleeping between retries");
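          // Restore the interrupt status so isActive() (which checks isInterrupted()) notices it
          // and the source can shut down promptly.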
631       Thread.currentThread().interrupt();
632     }
633     return sleepMultiplier < maxRetriesMultiplier;
634   }
635 
636   /**
637    * Count the number of different row keys in the given edit, since mini-batching can put
638    * several rows into a single WALEdit. We assume that there's at least one Cell in the WALEdit.
639    * @param edit edit to count row keys from
640    * @return number of different row keys
641    */
642   private int countDistinctRowKeys(WALEdit edit) {
643     List<Cell> cells = edit.getCells();
644     int distinctRowKeys = 1;
645     Cell lastCell = cells.get(0);
646     for (int i = 0; i < edit.size(); i++) {
647       if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
648         distinctRowKeys++;
649       }
          // remember the previous cell; otherwise every cell is compared against the first one
          // and edits touching more than two rows are counted incorrectly
          lastCell = cells.get(i);
650     }
651     return distinctRowKeys;
652   }
653 
654   /**
655    * Do the shipping logic
656    * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
657    * written to when this method was called
       * @param entries the WAL entries to ship to the peer cluster
658    */
659   protected void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries) {
660     int sleepMultiplier = 1;
661     if (entries.isEmpty()) {
662       LOG.warn("Was given 0 edits to ship");
663       return;
664     }
665     while (this.isActive()) {
666       try {
667         if (this.throttler.isEnabled()) {
668           long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
669           if (sleepTicks > 0) {
670             try {
671               if (LOG.isTraceEnabled()) {
672                 LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
673               }
674               Thread.sleep(sleepTicks);
675             } catch (InterruptedException e) {
676               LOG.debug("Interrupted while sleeping for throttling control");
677               Thread.currentThread().interrupt();
678               // the current thread might have been interrupted to terminate;
679               // go directly back to while() to confirm this
680               continue;
681             }
682             // reset throttler's cycle start tick when sleep for throttling occurs
683             this.throttler.resetStartTick();
684           }
685         }
686         replicateContext.setEntries(entries).setSize(currentSize);
687 
688         // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
689         boolean replicated = replicationEndpoint.replicate(replicateContext);
690 
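            // A false return means the endpoint could not replicate this batch; loop and retry the
            // same batch rather than advancing the log position.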
691         if (!replicated) {
692           continue;
693         }
694 
695         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
696           this.manager.logPositionAndCleanOldLogs(this.currentPath,
697               this.peerClusterZnode, this.repLogReader.getPosition(),
698               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
699           this.lastLoggedPosition = this.repLogReader.getPosition();
700         }
701         if (this.throttler.isEnabled()) {
702           this.throttler.addPushSize(currentSize);
703         }
704         this.totalReplicatedEdits += entries.size();
705         this.totalReplicatedOperations += currentNbOperations;
706         this.metrics.shipBatch(this.currentNbOperations, this.currentSize/1024);
707         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
708         if (LOG.isTraceEnabled()) {
709           LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
710               + this.totalReplicatedOperations + " operations");
711         }
712         break;
713       } catch (Exception ex) {
714         LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" +
715             org.apache.hadoop.util.StringUtils.stringifyException(ex));
716         if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
717           sleepMultiplier++;
718         }
719       }
720     }
721   }
722 
723   /**
724    * check whether the peer is enabled or not
725    *
726    * @return true if the peer is enabled, otherwise false
727    */
728   protected boolean isPeerEnabled() {
729     return this.replicationPeers.getStatusOfPeer(this.peerId);
730   }
731 
732   /**
733    * If the queue isn't empty, switch to the next log.
734    * Otherwise, if this is a recovered queue, it means we're done.
735    * Otherwise we'll just continue trying to read from the current log file.
736    * @return true if we're done with the current file, false if we should
737    * continue trying to read from it
738    */
739   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
740       justification="Yeah, this is how it works")
741   protected boolean processEndOfFile() {
742     if (this.queue.size() != 0) {
743       if (LOG.isTraceEnabled()) {
744         String filesize = "N/A";
745         try {
746           FileStatus stat = this.fs.getFileStatus(this.currentPath);
747           filesize = stat.getLen()+"";
748         } catch (IOException ex) {}
749         LOG.trace("Reached the end of a log, stats: " + getStats() +
750             ", and the length of the file is " + filesize);
751       }
752       this.currentPath = null;
753       this.repLogReader.finishCurrentFile();
754       this.reader = null;
755       return true;
756     } else if (this.replicationQueueInfo.isQueueRecovered()) {
757       this.manager.closeRecoveredQueue(this);
758       LOG.info("Finished recovering the queue with the following stats " + getStats());
759       this.running = false;
760       return true;
761     }
762     return false;
763   }
764 
765   @Override
766   public void startup() {
767     String n = Thread.currentThread().getName();
768     Thread.UncaughtExceptionHandler handler =
769         new Thread.UncaughtExceptionHandler() {
770           @Override
771           public void uncaughtException(final Thread t, final Throwable e) {
772             LOG.error("Unexpected exception in ReplicationSource," +
773               " currentPath=" + currentPath, e);
774           }
775         };
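        // Run this source as a daemon thread named after the peer znode; the handler above only
        // logs uncaught exceptions, it does not stop the region server.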
776     Threads.setDaemonThreadRunning(
777         this, n + ".replicationSource," +
778         this.peerClusterZnode, handler);
779   }
780 
781   @Override
782   public void terminate(String reason) {
783     terminate(reason, null);
784   }
785 
786   @Override
787   public void terminate(String reason, Exception cause) {
788     terminate(reason, cause, true);
789   }
790 
791   public void terminate(String reason, Exception cause, boolean join) {
792     if (cause == null) {
793       LOG.info("Closing source "
794           + this.peerClusterZnode + " because: " + reason);
795 
796     } else {
797       LOG.error("Closing source " + this.peerClusterZnode
798           + " because an error occurred: " + reason, cause);
799     }
800     this.running = false;
801     this.interrupt();
802     ListenableFuture<Service.State> future = null;
803     if (this.replicationEndpoint != null) {
804       future = this.replicationEndpoint.stop();
805     }
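        // The endpoint's stop() is asynchronous; when join is requested, wait for both this
        // thread and the endpoint shutdown to finish.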
806     if (join) {
807       Threads.shutdown(this, this.sleepForRetries);
808       if (future != null) {
809         try {
810           future.get();
811         } catch (Exception e) {
812           LOG.warn("Got exception:" + e);
813         }
814       }
815     }
816   }
817 
818   @Override
819   public String getPeerClusterZnode() {
820     return this.peerClusterZnode;
821   }
822 
823   @Override
824   public String getPeerClusterId() {
825     return this.peerId;
826   }
827 
828   @Override
829   public Path getCurrentPath() {
830     return this.currentPath;
831   }
832 
833   private boolean isActive() {
834     return !this.stopper.isStopped() && this.running && !isInterrupted();
835   }
836 
837   /**
838    * Comparator used to order logs based on their start time (the timestamp suffix of the name)
839    */
840   public static class LogsComparator implements Comparator<Path> {
841 
842     @Override
843     public int compare(Path o1, Path o2) {
844       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
845     }
846 
847     /**
848      * Split a path to get the start time
849      * For example: 10.20.20.171%3A60020.1277499063250
850      * @param p path to split
851      * @return start time
852      */
853     private long getTS(Path p) {
854       String[] parts = p.getName().split("\\.");
855       return Long.parseLong(parts[parts.length-1]);
856     }
857   }
858 
859   @Override
860   public String getStats() {
861     long position = this.repLogReader.getPosition();
862     return "Total replicated edits: " + totalReplicatedEdits +
863       ", currently replicating from: " + this.currentPath +
864       " at position: " + position;
865   }
866 }