1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Comparator;
26  import java.util.List;
27  import java.util.UUID;
28  import java.util.concurrent.PriorityBlockingQueue;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.commons.lang.StringUtils;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.HBaseConfiguration;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.Stoppable;
44  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
45  import org.apache.hadoop.hbase.wal.WAL;
46  import org.apache.hadoop.hbase.wal.WALKey;
47  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
48  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
49  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
50  import org.apache.hadoop.hbase.replication.ReplicationException;
51  import org.apache.hadoop.hbase.replication.ReplicationPeers;
52  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
53  import org.apache.hadoop.hbase.replication.ReplicationQueues;
54  import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
55  import org.apache.hadoop.hbase.replication.WALEntryFilter;
56  import org.apache.hadoop.hbase.util.FSUtils;
57  import org.apache.hadoop.hbase.util.Threads;
58  
59  import com.google.common.collect.Lists;
60  import com.google.common.util.concurrent.ListenableFuture;
61  import com.google.common.util.concurrent.Service;
62  
63  /**
64   * Class that handles the source of a replication stream.
65   * Currently does not handle more than 1 slave cluster.
66   * For each slave cluster it selects a random number of peers
67   * using a replication ratio. For example, if the replication ratio is 0.1
68   * and the slave cluster has 100 region servers, 10 will be selected.
69   * <p/>
70   * A stream is considered down when we cannot contact a region server on the
71   * peer cluster for more than 55 seconds by default.
72   * <p/>
73   *
74   */
75  @InterfaceAudience.Private
76  public class ReplicationSource extends Thread
77      implements ReplicationSourceInterface {
78  
79    public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
80    // Queue of logs to process
81    private PriorityBlockingQueue<Path> queue;
82    private ReplicationQueues replicationQueues;
83    private ReplicationPeers replicationPeers;
84  
85    private Configuration conf;
86    private ReplicationQueueInfo replicationQueueInfo;
87    // id of the peer cluster this source replicates to
88    private String peerId;
89    // The manager of all sources to which we ping back our progress
90    private ReplicationSourceManager manager;
91    // Should we stop everything?
92    private Stoppable stopper;
93    // How long should we sleep for each retry
94    private long sleepForRetries;
95    // Max size in bytes of a batch of WAL entries to ship at once
96    private long replicationQueueSizeCapacity;
97    // Max number of WAL entries in a batch to ship at once
98    private int replicationQueueNbCapacity;
99    // Our reader for the current log. open/close handled by repLogReader
100   private WAL.Reader reader;
101   // Last position in the log that we sent to ZooKeeper
102   private long lastLoggedPosition = -1;
103   // Path of the current log
104   private volatile Path currentPath;
105   private FileSystem fs;
106   // id of this cluster
107   private UUID clusterId;
108   // id of the other cluster
109   private UUID peerClusterId;
110   // total number of edits we replicated
111   private long totalReplicatedEdits = 0;
112   // total number of operations (Put/Delete) we replicated
113   private long totalReplicatedOperations = 0;
114   // The znode we currently play with
115   private String peerClusterZnode;
116   // Maximum number of retries before taking bold actions
117   private int maxRetriesMultiplier;
118   // Current number of operations (Put/Delete) that we need to replicate
119   private int currentNbOperations = 0;
120   // Current size of data we need to replicate
121   private int currentSize = 0;
122   // Indicates if this particular source is running
123   private volatile boolean running = true;
124   // Metrics for this source
125   private MetricsSource metrics;
126   // Handle on the log reader helper
127   private ReplicationWALReaderManager repLogReader;
128   //WARN threshold for the number of queued logs, defaults to 2
129   private int logQueueWarnThreshold;
130   // ReplicationEndpoint which will handle the actual replication
131   private ReplicationEndpoint replicationEndpoint;
132   // A filter (or a chain of filters) for the WAL entries.
133   private WALEntryFilter walEntryFilter;
134   // Context for ReplicationEndpoint#replicate()
135   private ReplicationEndpoint.ReplicateContext replicateContext;
136   // throttler
137   private ReplicationThrottler throttler;
138 
139   /**
140    * Instantiation method used by region servers
141    *
142    * @param conf configuration to use
143    * @param fs file system to use
144    * @param manager replication manager to ping to
145   * @param stopper     the Stoppable to check whether this region server is stopping
146    * @param peerClusterZnode the name of our znode
147    * @param clusterId unique UUID for the cluster
148    * @param replicationEndpoint the replication endpoint implementation
149    * @param metrics metrics for replication source
150    * @throws IOException
151    */
152   @Override
153   public void init(final Configuration conf, final FileSystem fs,
154       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
155       final ReplicationPeers replicationPeers, final Stoppable stopper,
156       final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
157       final MetricsSource metrics)
158           throws IOException {
159     this.stopper = stopper;
160     this.conf = HBaseConfiguration.create(conf);
161     decorateConf();
162     this.replicationQueueSizeCapacity =
163         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
164     this.replicationQueueNbCapacity =
165         this.conf.getInt("replication.source.nb.capacity", 25000);
166     this.sleepForRetries =
167         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
168     this.maxRetriesMultiplier =
169         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
170     this.queue =
171         new PriorityBlockingQueue<Path>(
172             this.conf.getInt("hbase.regionserver.maxlogs", 32),
173             new LogsComparator());
174     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
175     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
176     this.replicationQueues = replicationQueues;
177     this.replicationPeers = replicationPeers;
178     this.manager = manager;
179     this.fs = fs;
180     this.metrics = metrics;
181     this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf);
182     this.clusterId = clusterId;
183 
184     this.peerClusterZnode = peerClusterZnode;
185     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
186     // ReplicationQueueInfo parses the peerId out of the znode for us
187     this.peerId = this.replicationQueueInfo.getPeerId();
188     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
189     this.replicationEndpoint = replicationEndpoint;
190 
191     this.replicateContext = new ReplicationEndpoint.ReplicateContext();
192   }
193 
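      // Helper for init(): if a replication codec is configured, override the RPC codec with it
      // so that WAL cells shipped to the peer are encoded with the replication codec.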
194   private void decorateConf() {
195     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
196     if (StringUtils.isNotEmpty(replicationCodec)) {
197       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
198     }
199   }
200 
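      // Queue up a new WAL to replicate, update the log-queue-size metric and warn if the queue
      // is longer than the configured threshold.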
201   @Override
202   public void enqueueLog(Path log) {
203     this.queue.put(log);
204     int queueSize = queue.size();
205     this.metrics.setSizeOfLogQueue(queueSize);
206     // This will log a warning for each new log that gets created above the warn threshold
207     if (queueSize > this.logQueueWarnThreshold) {
208       LOG.warn("Queue size: " + queueSize +
209         " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
210     }
211   }
212 
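      // Tear-down when this source exits: clear the per-source metrics and stop the replication
      // endpoint if it is still starting or running.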
213   private void uninitialize() {
214     LOG.debug("Source exiting " + this.peerId);
215     metrics.clear();
216     if (replicationEndpoint.state() == Service.State.STARTING
217         || replicationEndpoint.state() == Service.State.RUNNING) {
218       replicationEndpoint.stopAndWait();
219     }
220   }
221 
222   @Override
223   public void run() {
224     // We were stopped while looping to connect to sinks, just abort
225     if (!this.isActive()) {
226       uninitialize();
227       return;
228     }
229 
230     try {
231       // start the endpoint, connect to the cluster
232       Service.State state = replicationEndpoint.start().get();
233       if (state != Service.State.RUNNING) {
234         LOG.warn("ReplicationEndpoint was not started. Exiting");
235         uninitialize();
236         return;
237       }
238     } catch (Exception ex) {
239       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
240       throw new RuntimeException(ex);
241     }
242 
243     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
244     ArrayList<WALEntryFilter> filters = Lists.newArrayList(
245       (WALEntryFilter)new SystemTableWALEntryFilter());
246     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
247     if (filterFromEndpoint != null) {
248       filters.add(filterFromEndpoint);
249     }
250     this.walEntryFilter = new ChainWALEntryFilter(filters);
251 
252     int sleepMultiplier = 1;
253     // delay this until we are in an asynchronous thread
254     while (this.isActive() && this.peerClusterId == null) {
255       this.peerClusterId = replicationEndpoint.getPeerUUID();
256       if (this.isActive() && this.peerClusterId == null) {
257         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
258           sleepMultiplier++;
259         }
260       }
261     }
262     // We were stopped while looping to contact peer's zk ensemble, just abort
263     if (!this.isActive()) {
264       uninitialize();
265       return;
266     }
267 
268     // resetting to 1 to reuse later
269     sleepMultiplier = 1;
270 
271     // In rare cases, the ZooKeeper setting may be messed up. That leads to an incorrect
272     // peerClusterId value, which is the same as the source clusterId
273     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
274       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
275           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
276           + replicationEndpoint.getClass().getName(), null, false);
277     }
278     LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
279 
280     // If this is a recovered queue, the queue is already full and the first log
281     // normally has a position (unless the RS failed between 2 logs)
282     if (this.replicationQueueInfo.isQueueRecovered()) {
283       try {
284         this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
285           this.queue.peek().getName()));
286         if (LOG.isTraceEnabled()) {
287           LOG.trace("Recovered queue started with log " + this.queue.peek() +
288               " at position " + this.repLogReader.getPosition());
289         }
290       } catch (ReplicationException e) {
291         this.terminate("Couldn't get the position of this recovered queue " +
292             this.peerClusterZnode, e);
293       }
294     }
295     // Loop until we close down
296     while (isActive()) {
297       // Sleep until replication is enabled again
298       if (!isPeerEnabled()) {
299         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
300           sleepMultiplier++;
301         }
302         continue;
303       }
304       Path oldPath = getCurrentPath(); //note that in the current scenario,
305                                        //oldPath will be null when a log roll
306                                        //happens.
307       // Get a new path
308       boolean hasCurrentPath = getNextPath();
309       if (getCurrentPath() != null && oldPath == null) {
310         sleepMultiplier = 1; //reset the sleepMultiplier on a path change
311       }
312       if (!hasCurrentPath) {
313         if (sleepForRetries("No log to process", sleepMultiplier)) {
314           sleepMultiplier++;
315         }
316         continue;
317       }
318       boolean currentWALisBeingWrittenTo = false;
319       //For WAL files we own (rather than recovered), take a snapshot of whether the
320       //current WAL file (this.currentPath) is in use (for writing) NOW!
321       //Since the new WAL paths are enqueued only after the prev WAL file
322       //is 'closed', presence of an element in the queue means that
323       //the previous WAL file was closed, else the file is in use (currentPath)
324       //We take the snapshot now so that we are protected against races
325       //where a new file gets enqueued while the current file is being processed
326       //(and where we just finished reading the current file).
327       if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
328         currentWALisBeingWrittenTo = true;
329       }
330       // Open a reader on it
331       if (!openReader(sleepMultiplier)) {
332         // Reset the sleep multiplier, else it'd be reused for the next file
333         sleepMultiplier = 1;
334         continue;
335       }
336 
337       // openReader returned true but the reader may still be null (e.g. the file was moved); sleep and retry
338       if (this.reader == null) {
339         if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
340           sleepMultiplier++;
341         }
342         continue;
343       }
344 
345       boolean gotIOE = false;
346       currentNbOperations = 0;
347       List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
348       currentSize = 0;
349       try {
350         if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
351           continue;
352         }
353       } catch (IOException ioe) {
354         LOG.warn(this.peerClusterZnode + " Got: ", ioe);
355         gotIOE = true;
356         if (ioe.getCause() instanceof EOFException) {
357 
358           boolean considerDumping = false;
359           if (this.replicationQueueInfo.isQueueRecovered()) {
360             try {
361               FileStatus stat = this.fs.getFileStatus(this.currentPath);
362               if (stat.getLen() == 0) {
363                 LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
364               }
365               considerDumping = true;
366             } catch (IOException e) {
367               LOG.warn(this.peerClusterZnode + " Got exception while getting file size: ", e);
368             }
369           }
370 
371           if (considerDumping &&
372               sleepMultiplier == this.maxRetriesMultiplier &&
373               processEndOfFile()) {
374             continue;
375           }
376         }
377       } finally {
378         try {
379           this.reader = null;
380           this.repLogReader.closeReader();
381         } catch (IOException e) {
382           gotIOE = true;
383           LOG.warn("Unable to finalize the tailing of a file", e);
384         }
385       }
386 
387       // If we didn't get anything to replicate, or if we hit a IOE,
388       // wait a bit and retry.
389       // But if we need to stop, don't bother sleeping
390       if (this.isActive() && (gotIOE || entries.isEmpty())) {
391         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
392           this.manager.logPositionAndCleanOldLogs(this.currentPath,
393               this.peerClusterZnode, this.repLogReader.getPosition(),
394               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
395           this.lastLoggedPosition = this.repLogReader.getPosition();
396         }
397         // Reset the sleep multiplier if nothing has actually gone wrong
398         if (!gotIOE) {
399           sleepMultiplier = 1;
400           // if there was nothing to ship and it's not an error
401           // set "ageOfLastShippedOp" to <now> to indicate that we're current
402           this.metrics.setAgeOfLastShippedOp(System.currentTimeMillis());
403         }
404         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
405           sleepMultiplier++;
406         }
407         continue;
408       }
409       sleepMultiplier = 1;
410       shipEdits(currentWALisBeingWrittenTo, entries);
411     }
412     uninitialize();
413   }
414 
415   /**
416    * Read all the entries from the current log file and retain those
417    * that need to be replicated. If nothing was read, process the end of the current file.
418    * @param currentWALisBeingWrittenTo is the current WAL being written to
419    * @param entries resulting entries to be replicated
420    * @return true if we got nothing and went to the next file, false if we got
421    * entries
422    * @throws IOException
423    */
424   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
425       List<WAL.Entry> entries) throws IOException {
426     long seenEntries = 0;
427     if (LOG.isTraceEnabled()) {
428       LOG.trace("Seeking in " + this.currentPath + " at position "
429           + this.repLogReader.getPosition());
430     }
431     this.repLogReader.seek();
432     long positionBeforeRead = this.repLogReader.getPosition();
433     WAL.Entry entry =
434         this.repLogReader.readNextAndSetPosition();
435     while (entry != null) {
436       this.metrics.incrLogEditsRead();
437       seenEntries++;
438 
439       // don't replicate if the log entries have already been consumed by the cluster
440       if (replicationEndpoint.canReplicateToSameCluster()
441           || !entry.getKey().getClusterIds().contains(peerClusterId)) {
442         // Remove all KVs that should not be replicated
443         entry = walEntryFilter.filter(entry);
444         WALEdit edit = null;
445         WALKey logKey = null;
446         if (entry != null) {
447           edit = entry.getEdit();
448           logKey = entry.getKey();
449         }
450 
451         if (edit != null && edit.size() != 0) {
452           //Mark that the current cluster has the change
453           logKey.addClusterId(clusterId);
454           currentNbOperations += countDistinctRowKeys(edit);
455           entries.add(entry);
456           currentSize += entry.getEdit().heapSize();
457         } else {
458           this.metrics.incrLogEditsFiltered();
459         }
460       }
461       // Stop if too many entries or too big
462       if (currentSize >= this.replicationQueueSizeCapacity ||
463           entries.size() >= this.replicationQueueNbCapacity) {
464         break;
465       }
466       try {
467         entry = this.repLogReader.readNextAndSetPosition();
468       } catch (IOException ie) {
469         LOG.debug("Break on IOE: " + ie.getMessage());
470         break;
471       }
472     }
473     metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
474     if (currentWALisBeingWrittenTo) {
475       return false;
476     }
477     // If we didn't get anything and the queue has an object, it means we
478     // hit the end of the file for sure
479     return seenEntries == 0 && processEndOfFile();
480   }
481 
482   /**
483    * Poll for the next path
484    * @return true if a path was obtained, false if not
485    */
486   protected boolean getNextPath() {
487     try {
488       if (this.currentPath == null) {
489         this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
490         this.metrics.setSizeOfLogQueue(queue.size());
491         if (this.currentPath != null) {
492           this.manager.cleanOldLogs(this.currentPath.getName(),
493               this.peerId,
494               this.replicationQueueInfo.isQueueRecovered());
495           if (LOG.isTraceEnabled()) {
496             LOG.trace("New log: " + this.currentPath);
497           }
498         }
499       }
500     } catch (InterruptedException e) {
501       LOG.warn("Interrupted while reading edits", e);
502     }
503     return this.currentPath != null;
504   }
505 
506   /**
507    * Open a reader on the current path
508    *
509    * @param sleepMultiplier by how many times the default sleeping time is augmented
510    * @return true if we should continue with that file, false if we are done with it
511    */
512   protected boolean openReader(int sleepMultiplier) {
513     try {
514       try {
515         if (LOG.isTraceEnabled()) {
516           LOG.trace("Opening log " + this.currentPath);
517         }
518         this.reader = repLogReader.openReader(this.currentPath);
519       } catch (FileNotFoundException fnfe) {
520         if (this.replicationQueueInfo.isQueueRecovered()) {
521           // We didn't find the log in the archive directory, look if it still
522           // exists in the dead RS folder (there could be a chain of failures
523           // to look at)
524           List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
525           LOG.info("Number of dead region servers: " + deadRegionServers.size());
526           final Path rootDir = FSUtils.getRootDir(this.conf);
527           for (String curDeadServerName : deadRegionServers) {
528             final Path deadRsDirectory = new Path(rootDir,
529                 DefaultWALProvider.getWALDirectoryName(curDeadServerName));
530             Path[] locs = new Path[] {
531                 new Path(deadRsDirectory, currentPath.getName()),
532                 new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
533                                           currentPath.getName()),
534             };
535             for (Path possibleLogLocation : locs) {
536               LOG.info("Possible location " + possibleLogLocation.toUri().toString());
537               if (this.manager.getFs().exists(possibleLogLocation)) {
538                 // We found the right new location
539                 LOG.info("Log " + this.currentPath + " still exists at " +
540                     possibleLogLocation);
541                 // Breaking here will make us sleep since reader is null
542                 // TODO why don't we need to set currentPath and call openReader here?
543                 return true;
544               }
545             }
546           }
547           // In the case of disaster/recovery, the HMaster may have been shut down or crashed before
548           // data was flushed from .logs to .oldlogs. Loop through the .logs folders and check whether a match exists
549           if (stopper instanceof ReplicationSyncUp.DummyServer) {
550             // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
551             //      area rather than to the wal area for a particular region server.
552             FileStatus[] rss = fs.listStatus(manager.getLogDir());
553             for (FileStatus rs : rss) {
554               Path p = rs.getPath();
555               FileStatus[] logs = fs.listStatus(p);
556               for (FileStatus log : logs) {
557                 p = new Path(p, log.getPath().getName());
558                 if (p.getName().equals(currentPath.getName())) {
559                   currentPath = p;
560                   LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
561                   // Open the log at the new location
562                   this.openReader(sleepMultiplier);
563                   return true;
564                 }
565               }
566             }
567           }
568 
569           // TODO What happens if the log was missing from every single location?
570           // Although we need to check a couple of times as the log could have
571           // been moved by the master between the checks
572           // It can also happen if a recovered queue wasn't properly cleaned,
573           // such that the znode pointing to a log exists but the log was
574           // deleted a long time ago.
575           // For the moment, we'll throw the IO and processEndOfFile
576           throw new IOException("File from recovered queue is " +
577               "nowhere to be found", fnfe);
578         } else {
579           // If the log was archived, continue reading from there
580           Path archivedLogLocation =
581               new Path(manager.getOldLogDir(), currentPath.getName());
582           if (this.manager.getFs().exists(archivedLogLocation)) {
583             currentPath = archivedLogLocation;
584             LOG.info("Log " + this.currentPath + " was moved to " +
585                 archivedLogLocation);
586             // Open the log at the new location
587             this.openReader(sleepMultiplier);
588 
589           }
590           // TODO What happens if the log is missing in both places?
591         }
592       }
593     } catch (IOException ioe) {
594       if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
595       LOG.warn(this.peerClusterZnode + " Got: ", ioe);
596       this.reader = null;
597       if (ioe.getCause() instanceof NullPointerException) {
598         // Workaround for race condition in HDFS-4380
599         // which throws a NPE if we open a file before any data node has the most recent block
600         // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
601         LOG.warn("Got NPE opening reader, will retry.");
602       } else if (sleepMultiplier == this.maxRetriesMultiplier) {
603         // TODO Need a better way to determine if a file is really gone but
604         // TODO without scanning all logs dir
605         LOG.warn("Waited too long for this file, considering dumping");
606         return !processEndOfFile();
607       }
608     }
609     return true;
610   }
611 
612   /*
613    * Checks whether the current log file is empty and this is not a recovered queue. This handles
614    * the scenario where, in an idle cluster, there is no entry in the current log and we keep
615    * trying to read the log file and getting an EOFException. In the case of a recovered queue the
616    * last log file may be empty, and we don't want to retry that.
617    */
618   private boolean isCurrentLogEmpty() {
619     return (this.repLogReader.getPosition() == 0 &&
620         !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
621   }
622 
623   /**
624    * Do the sleeping logic
625    * @param msg Why we sleep
626    * @param sleepMultiplier by how many times the default sleeping time is augmented
627    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
628    */
629   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
630     try {
631       if (LOG.isTraceEnabled()) {
632         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
633       }
634       Thread.sleep(this.sleepForRetries * sleepMultiplier);
635     } catch (InterruptedException e) {
636       LOG.debug("Interrupted while sleeping between retries");
637       Thread.currentThread().interrupt();
638     }
639     return sleepMultiplier < maxRetriesMultiplier;
640   }
641 
642   /**
643    * Count the number of different row keys in the given edit; an edit can contain cells for
644    * several rows because of mini-batching. We assume that there's at least one Cell in the WALEdit.
645    * @param edit edit to count row keys from
646    * @return number of different row keys
647    */
648   private int countDistinctRowKeys(WALEdit edit) {
649     List<Cell> cells = edit.getCells();
650     int distinctRowKeys = 1;
651     Cell lastCell = cells.get(0);
652     for (int i = 0; i < edit.size(); i++) {
653       if (!CellUtil.matchingRow(cells.get(i), lastCell)) { distinctRowKeys++; }
654       // advance lastCell so each cell is compared with the previous one, not always the first
655       lastCell = cells.get(i);
656     }
657     return distinctRowKeys;
658   }
659 
660   /**
661    * Do the shipping logic
662    * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
663    * written to when this method was called
664    */
665   protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
666     int sleepMultiplier = 1;
667     if (entries.isEmpty()) {
668       LOG.warn("Was given 0 edits to ship");
669       return;
670     }
671     while (this.isActive()) {
672       try {
673         if (this.throttler.isEnabled()) {
674           long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
675           if (sleepTicks > 0) {
676             try {
677               if (LOG.isTraceEnabled()) {
678                 LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
679               }
680               Thread.sleep(sleepTicks);
681             } catch (InterruptedException e) {
682               LOG.debug("Interrupted while sleeping for throttling control");
683               Thread.currentThread().interrupt();
684               // the current thread might have been interrupted so that it terminates;
685               // go directly back to the while() condition to confirm this
686               continue;
687             }
688             // reset throttler's cycle start tick when sleep for throttling occurs
689             this.throttler.resetStartTick();
690           }
691         }
692         replicateContext.setEntries(entries).setSize(currentSize);
693 
694         // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
695         boolean replicated = replicationEndpoint.replicate(replicateContext);
696 
697         if (!replicated) {
698           continue;
699         }
700 
701         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
702           this.manager.logPositionAndCleanOldLogs(this.currentPath,
703               this.peerClusterZnode, this.repLogReader.getPosition(),
704               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
705           this.lastLoggedPosition = this.repLogReader.getPosition();
706         }
707         if (this.throttler.isEnabled()) {
708           this.throttler.addPushSize(currentSize);
709         }
710         this.totalReplicatedEdits += entries.size();
711         this.totalReplicatedOperations += currentNbOperations;
712         this.metrics.shipBatch(this.currentNbOperations, this.currentSize/1024);
713         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
714         if (LOG.isTraceEnabled()) {
715           LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
716               + this.totalReplicatedOperations + " operations");
717         }
718         break;
719       } catch (Exception ex) {
720         LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" +
721             org.apache.hadoop.util.StringUtils.stringifyException(ex));
722         if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
723           sleepMultiplier++;
724         }
725       }
726     }
727   }
728 
729   /**
730    * check whether the peer is enabled or not
731    *
732    * @return true if the peer is enabled, otherwise false
733    */
734   protected boolean isPeerEnabled() {
735     return this.replicationPeers.getStatusOfPeer(this.peerId);
736   }
737 
738   /**
739    * If the queue isn't empty, switch to the next one
740    * Else if this is a recovered queue, it means we're done!
741    * Else we'll just continue to try reading the log file
742    * @return true if we're done with the current file, false if we should
743    * continue trying to read from it
744    */
745   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
746       justification="Yeah, this is how it works")
747   protected boolean processEndOfFile() {
748     if (this.queue.size() != 0) {
749       if (LOG.isTraceEnabled()) {
750         String filesize = "N/A";
751         try {
752           FileStatus stat = this.fs.getFileStatus(this.currentPath);
753           filesize = stat.getLen()+"";
754         } catch (IOException ex) {}
755         LOG.trace("Reached the end of a log, stats: " + getStats() +
756             ", and the length of the file is " + filesize);
757       }
758       this.currentPath = null;
759       this.repLogReader.finishCurrentFile();
760       this.reader = null;
761       return true;
762     } else if (this.replicationQueueInfo.isQueueRecovered()) {
763       this.manager.closeRecoveredQueue(this);
764       LOG.info("Finished recovering the queue with the following stats " + getStats());
765       this.running = false;
766       return true;
767     }
768     return false;
769   }
770 
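      // Start this source as a daemon thread, with an uncaught-exception handler that logs the
      // error together with the WAL currently being processed.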
771   @Override
772   public void startup() {
773     String n = Thread.currentThread().getName();
774     Thread.UncaughtExceptionHandler handler =
775         new Thread.UncaughtExceptionHandler() {
776           @Override
777           public void uncaughtException(final Thread t, final Throwable e) {
778             LOG.error("Unexpected exception in ReplicationSource," +
779               " currentPath=" + currentPath, e);
780           }
781         };
782     Threads.setDaemonThreadRunning(
783         this, n + ".replicationSource," +
784         this.peerClusterZnode, handler);
785   }
786 
787   @Override
788   public void terminate(String reason) {
789     terminate(reason, null);
790   }
791 
792   @Override
793   public void terminate(String reason, Exception cause) {
794     terminate(reason, cause, true);
795   }
796 
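      // Full termination: mark the source as not running, interrupt the thread, stop the
      // replication endpoint, and optionally wait for both to shut down.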
797   public void terminate(String reason, Exception cause, boolean join) {
798     if (cause == null) {
799       LOG.info("Closing source "
800           + this.peerClusterZnode + " because: " + reason);
801 
802     } else {
803       LOG.error("Closing source " + this.peerClusterZnode
804           + " because an error occurred: " + reason, cause);
805     }
806     this.running = false;
807     this.interrupt();
808     ListenableFuture<Service.State> future = null;
809     if (this.replicationEndpoint != null) {
810       future = this.replicationEndpoint.stop();
811     }
812     if (join) {
813       Threads.shutdown(this, this.sleepForRetries);
814       if (future != null) {
815         try {
816           future.get();
817         } catch (Exception e) {
818           LOG.warn("Got exception:" + e);
819         }
820       }
821     }
822   }
823 
824   @Override
825   public String getPeerClusterZnode() {
826     return this.peerClusterZnode;
827   }
828 
829   @Override
830   public String getPeerClusterId() {
831     return this.peerId;
832   }
833 
834   @Override
835   public Path getCurrentPath() {
836     return this.currentPath;
837   }
838 
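      // A source is active as long as the region server is not stopped, the source has not been
      // terminated, and its thread has not been interrupted.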
839   private boolean isActive() {
840     return !this.stopper.isStopped() && this.running && !isInterrupted();
841   }
842 
843   /**
844    * Comparator used to order WAL files by their start time (the timestamp suffix of the file name)
845    */
846   public static class LogsComparator implements Comparator<Path> {
847 
848     @Override
849     public int compare(Path o1, Path o2) {
850       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
851     }
852 
853     /**
854      * Split a path to get the start time
855      * For example: 10.20.20.171%3A60020.1277499063250
856      * @param p path to split
857      * @return start time
858      */
859     private long getTS(Path p) {
860       String[] parts = p.getName().split("\\.");
861       return Long.parseLong(parts[parts.length-1]);
862     }
863   }
864 
865   @Override
866   public String getStats() {
867     long position = this.repLogReader.getPosition();
868     return "Total replicated edits: " + totalReplicatedEdits +
869       ", currently replicating from: " + this.currentPath +
870       " at position: " + position;
871   }
872 }