1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Comparator;
26  import java.util.List;
27  import java.util.UUID;
28  import java.util.concurrent.PriorityBlockingQueue;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.commons.lang.StringUtils;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.HBaseConfiguration;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.Stoppable;
44  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
45  import org.apache.hadoop.hbase.wal.WAL;
46  import org.apache.hadoop.hbase.wal.WALKey;
47  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
48  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
49  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
50  import org.apache.hadoop.hbase.replication.ReplicationException;
51  import org.apache.hadoop.hbase.replication.ReplicationPeers;
52  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
53  import org.apache.hadoop.hbase.replication.ReplicationQueues;
54  import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
55  import org.apache.hadoop.hbase.replication.WALEntryFilter;
56  import org.apache.hadoop.hbase.util.CancelableProgressable;
57  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
58  import org.apache.hadoop.hbase.util.FSUtils;
59  import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
60  import org.apache.hadoop.hbase.util.Threads;
61  
62  import com.google.common.collect.Lists;
63  import com.google.common.util.concurrent.ListenableFuture;
64  import com.google.common.util.concurrent.Service;
65  
66  /**
67   * Class that handles the source of a replication stream.
68   * Currently does not handle more than one slave cluster.
69   * For each slave cluster it selects a random number of peers
70   * using a replication ratio. For example, if the replication ratio is 0.1
71   * and the slave cluster has 100 region servers, 10 will be selected.
72   * <p/>
73   * A stream is considered down when we cannot contact a region server on the
74   * peer cluster for more than 55 seconds by default.
75   * <p/>
76   *
77   */
78  @InterfaceAudience.Private
79  public class ReplicationSource extends Thread
80      implements ReplicationSourceInterface {
81  
82    public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
83    // Queue of logs to process
84    private PriorityBlockingQueue<Path> queue;
85    private ReplicationQueues replicationQueues;
86    private ReplicationPeers replicationPeers;
87  
88    private Configuration conf;
89    private ReplicationQueueInfo replicationQueueInfo;
90    // id of the peer cluster this source replicates to
91    private String peerId;
92    // The manager of all sources to which we ping back our progress
93    private ReplicationSourceManager manager;
94    // Should we stop everything?
95    private Stoppable stopper;
96    // How long should we sleep for each retry
97    private long sleepForRetries;
98    // Max size in bytes of a single batch of entries to replicate
99    private long replicationQueueSizeCapacity;
100   // Max number of entries in a single batch to replicate
101   private int replicationQueueNbCapacity;
102   // Our reader for the current log. open/close handled by repLogReader
103   private WAL.Reader reader;
104   // Last position in the log that we sent to ZooKeeper
105   private long lastLoggedPosition = -1;
106   // Path of the current log
107   private volatile Path currentPath;
108   private FileSystem fs;
109   // id of this cluster
110   private UUID clusterId;
111   // id of the other cluster
112   private UUID peerClusterId;
113   // total number of edits we replicated
114   private long totalReplicatedEdits = 0;
115   // total number of operations we replicated
116   private long totalReplicatedOperations = 0;
117   // The znode of the replication queue this source works on
118   private String peerClusterZnode;
119   // Maximum number of retries before taking bold actions
120   private int maxRetriesMultiplier;
121   // Current number of operations (Put/Delete) that we need to replicate
122   private int currentNbOperations = 0;
123   // Current size of data we need to replicate
124   private int currentSize = 0;
125   // Indicates if this particular source is running
126   private volatile boolean running = true;
127   // Metrics for this source
128   private MetricsSource metrics;
129   // Handle on the log reader helper
130   private ReplicationWALReaderManager repLogReader;
131   //WARN threshold for the number of queued logs, defaults to 2
132   private int logQueueWarnThreshold;
133   // ReplicationEndpoint which will handle the actual replication
134   private ReplicationEndpoint replicationEndpoint;
135   // A filter (or a chain of filters) for the WAL entries.
136   private WALEntryFilter walEntryFilter;
137   // Throttler used to cap the replication bandwidth to this peer
138   private ReplicationThrottler throttler;
139 
140   /**
141    * Initialization method used by region servers
142    *
143    * @param conf configuration to use
144    * @param fs file system to use
145    * @param manager replication manager to ping to
146    * @param stopper     the Stoppable used to check whether the region server is stopping
147    * @param peerClusterZnode the name of our znode
148    * @param clusterId unique UUID for the cluster
149    * @param replicationEndpoint the replication endpoint implementation
150    * @param metrics metrics for replication source
151    * @throws IOException
152    */
153   @Override
154   public void init(final Configuration conf, final FileSystem fs,
155       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
156       final ReplicationPeers replicationPeers, final Stoppable stopper,
157       final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
158       final MetricsSource metrics)
159           throws IOException {
160     this.stopper = stopper;
161     this.conf = HBaseConfiguration.create(conf);
162     decorateConf();
163     this.replicationQueueSizeCapacity =
164         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
165     this.replicationQueueNbCapacity =
166         this.conf.getInt("replication.source.nb.capacity", 25000);
167     this.sleepForRetries =
168         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
169     this.maxRetriesMultiplier =
170         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
171     this.queue =
172         new PriorityBlockingQueue<Path>(
173             this.conf.getInt("hbase.regionserver.maxlogs", 32),
174             new LogsComparator());
175     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
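        // A bandwidth of 0 (the default) effectively disables throttling; the value is divided
        // by 10 below because the ReplicationThrottler budgets bytes per roughly 100ms cycle.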
176     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
177     this.replicationQueues = replicationQueues;
178     this.replicationPeers = replicationPeers;
179     this.manager = manager;
180     this.fs = fs;
181     this.metrics = metrics;
182     this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf);
183     this.clusterId = clusterId;
184 
185     this.peerClusterZnode = peerClusterZnode;
186     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
187     // ReplicationQueueInfo parses the peerId out of the znode for us
188     this.peerId = this.replicationQueueInfo.getPeerId();
189     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
190     this.replicationEndpoint = replicationEndpoint;
191   }
192 
193   private void decorateConf() {
194     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
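        // Replication may be configured with its own codec (for example, one that also ships
        // cell tags); when set, it overrides the generic RPC codec for the connections this
        // source opens to the peer cluster.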
195     if (StringUtils.isNotEmpty(replicationCodec)) {
196       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
197     }
198   }
199 
200   @Override
201   public void enqueueLog(Path log) {
202     this.queue.put(log);
203     int queueSize = queue.size();
204     this.metrics.setSizeOfLogQueue(queueSize);
205     // This will log a warning for each new log that gets created above the warn threshold
206     if (queueSize > this.logQueueWarnThreshold) {
207       LOG.warn("Queue size: " + queueSize +
208         " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
209     }
210   }
211 
212   private void uninitialize() {
213     LOG.debug("Source exiting " + this.peerId);
214     metrics.clear();
215     if (replicationEndpoint.state() == Service.State.STARTING
216         || replicationEndpoint.state() == Service.State.RUNNING) {
217       replicationEndpoint.stopAndWait();
218     }
219   }
220 
221   @Override
222   public void run() {
223     // We were stopped while looping to connect to sinks, just abort
224     if (!this.isActive()) {
225       uninitialize();
226       return;
227     }
228 
229     try {
230       // start the endpoint, connect to the cluster
231       Service.State state = replicationEndpoint.start().get();
232       if (state != Service.State.RUNNING) {
233         LOG.warn("ReplicationEndpoint was not started. Exiting");
234         uninitialize();
235         return;
236       }
237     } catch (Exception ex) {
238       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
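          // Rethrowing ends this source thread; the uncaught exception handler installed in
          // startup() will log the failure.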
239       throw new RuntimeException(ex);
240     }
241 
242     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
243     ArrayList<WALEntryFilter> filters = Lists.newArrayList(
244       (WALEntryFilter)new SystemTableWALEntryFilter());
245     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
246     if (filterFromEndpoint != null) {
247       filters.add(filterFromEndpoint);
248     }
249     this.walEntryFilter = new ChainWALEntryFilter(filters);
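        // The chain applies each filter in order: a filter can drop the whole entry (by returning
        // null) or strip individual cells, and filtered-out entries are skipped when reading below.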
250 
251     int sleepMultiplier = 1;
252     // delay this until we are in an asynchronous thread
253     while (this.isActive() && this.peerClusterId == null) {
254       this.peerClusterId = replicationEndpoint.getPeerUUID();
255       if (this.isActive() && this.peerClusterId == null) {
256         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
257           sleepMultiplier++;
258         }
259       }
260     }
261     // We were stopped while looping to contact peer's zk ensemble, just abort
262     if (!this.isActive()) {
263       uninitialize();
264       return;
265     }
266 
267     // resetting to 1 to reuse later
268     sleepMultiplier = 1;
269 
270     // In rare cases, the ZooKeeper setting may be messed up. That leads to an incorrect
271     // peerClusterId value, one that is the same as the source clusterId
272     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
273       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
274           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
275           + replicationEndpoint.getClass().getName(), null, false);
276     }
277     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
278 
279     // If this is recovered, the queue is already full and the first log
280     // normally has a position (unless the RS failed between 2 logs)
281     if (this.replicationQueueInfo.isQueueRecovered()) {
282       try {
283         this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
284           this.queue.peek().getName()));
285         if (LOG.isTraceEnabled()) {
286           LOG.trace("Recovered queue started with log " + this.queue.peek() +
287               " at position " + this.repLogReader.getPosition());
288         }
289       } catch (ReplicationException e) {
290         this.terminate("Couldn't get the position of this recovered queue " +
291             this.peerClusterZnode, e);
292       }
293     }
294     // Loop until we close down
295     while (isActive()) {
296       // Sleep until replication is enabled again
297       if (!isPeerEnabled()) {
298         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
299           sleepMultiplier++;
300         }
301         continue;
302       }
303       Path oldPath = getCurrentPath(); //note that in the current scenario,
304                                        //oldPath will be null when a log roll
305                                        //happens.
306       // Get a new path
307       boolean hasCurrentPath = getNextPath();
308       if (getCurrentPath() != null && oldPath == null) {
309         sleepMultiplier = 1; //reset the sleepMultiplier on a path change
310       }
311       if (!hasCurrentPath) {
312         if (sleepForRetries("No log to process", sleepMultiplier)) {
313           sleepMultiplier++;
314         }
315         continue;
316       }
317       boolean currentWALisBeingWrittenTo = false;
318       //For WAL files we own (rather than recovered), take a snapshot of whether the
319       //current WAL file (this.currentPath) is in use (for writing) NOW!
320       //Since the new WAL paths are enqueued only after the prev WAL file
321       //is 'closed', presence of an element in the queue means that
322       //the previous WAL file was closed, else the file is in use (currentPath)
323       //We take the snapshot now so that we are protected against races
324       //where a new file gets enqueued while the current file is being processed
325       //(and where we just finished reading the current file).
326       if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
327         currentWALisBeingWrittenTo = true;
328       }
329       // Open a reader on it
330       if (!openReader(sleepMultiplier)) {
331         // Reset the sleep multiplier, else it'd be reused for the next file
332         sleepMultiplier = 1;
333         continue;
334       }
335 
336       // If we got a null reader but didn't continue, then sleep and continue
337       if (this.reader == null) {
338         if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
339           sleepMultiplier++;
340         }
341         continue;
342       }
343 
344       boolean gotIOE = false;
345       currentNbOperations = 0;
346       List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
347       currentSize = 0;
348       try {
349         if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
350           continue;
351         }
352       } catch (IOException ioe) {
353         LOG.warn(this.peerClusterZnode + " Got: ", ioe);
354         gotIOE = true;
355         if (ioe.getCause() instanceof EOFException) {
356 
357           boolean considerDumping = false;
358           if (this.replicationQueueInfo.isQueueRecovered()) {
359             try {
360               FileStatus stat = this.fs.getFileStatus(this.currentPath);
361               if (stat.getLen() == 0) {
362                 LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
363               }
364               considerDumping = true;
365             } catch (IOException e) {
366             LOG.warn(this.peerClusterZnode + " Got exception while getting file size: ", e);
367             }
368           }
369 
370           if (considerDumping &&
371               sleepMultiplier == this.maxRetriesMultiplier &&
372               processEndOfFile()) {
373             continue;
374           }
375         }
376       } finally {
377         try {
378           this.reader = null;
379           this.repLogReader.closeReader();
380         } catch (IOException e) {
381           gotIOE = true;
382           LOG.warn("Unable to finalize the tailing of a file", e);
383         }
384       }
385 
386       // If we didn't get anything to replicate, or if we hit an IOE,
387       // wait a bit and retry.
388       // But if we need to stop, don't bother sleeping
389       if (this.isActive() && (gotIOE || entries.isEmpty())) {
390         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
391           this.manager.logPositionAndCleanOldLogs(this.currentPath,
392               this.peerClusterZnode, this.repLogReader.getPosition(),
393               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
394           this.lastLoggedPosition = this.repLogReader.getPosition();
395         }
396         // Reset the sleep multiplier if nothing has actually gone wrong
397         if (!gotIOE) {
398           sleepMultiplier = 1;
399           // if there was nothing to ship and it's not an error
400           // set "ageOfLastShippedOp" to <now> to indicate that we're current
401           this.metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime());
402         }
403         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
404           sleepMultiplier++;
405         }
406         continue;
407       }
408       sleepMultiplier = 1;
409       shipEdits(currentWALisBeingWrittenTo, entries);
410     }
411     uninitialize();
412   }
413 
414   /**
415    * Read all the entries from the current log file and retain those
416    * that need to be replicated. Otherwise, process the end of the current file.
417    * @param currentWALisBeingWrittenTo whether the current WAL is being written to
418    * @param entries resulting entries to be replicated
419    * @return true if we got nothing and went to the next file, false if we got
420    * entries
421    * @throws IOException
422    */
423   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
424       List<WAL.Entry> entries) throws IOException {
425     long seenEntries = 0;
426     if (LOG.isTraceEnabled()) {
427       LOG.trace("Seeking in " + this.currentPath + " at position "
428           + this.repLogReader.getPosition());
429     }
430     this.repLogReader.seek();
431     long positionBeforeRead = this.repLogReader.getPosition();
432     WAL.Entry entry =
433         this.repLogReader.readNextAndSetPosition();
434     while (entry != null) {
435       this.metrics.incrLogEditsRead();
436       seenEntries++;
437 
438       // don't replicate if the log entries have already been consumed by the cluster
439       if (replicationEndpoint.canReplicateToSameCluster()
440           || !entry.getKey().getClusterIds().contains(peerClusterId)) {
441         // Remove all KVs that should not be replicated
442         entry = walEntryFilter.filter(entry);
443         WALEdit edit = null;
444         WALKey logKey = null;
445         if (entry != null) {
446           edit = entry.getEdit();
447           logKey = entry.getKey();
448         }
449 
450         if (edit != null && edit.size() != 0) {
451           //Mark that the current cluster has the change
452           logKey.addClusterId(clusterId);
453           currentNbOperations += countDistinctRowKeys(edit);
454           entries.add(entry);
455           currentSize += entry.getEdit().heapSize();
456         } else {
457           this.metrics.incrLogEditsFiltered();
458         }
459       }
460       // Stop if too many entries or too big
461       if (currentSize >= this.replicationQueueSizeCapacity ||
462           entries.size() >= this.replicationQueueNbCapacity) {
463         break;
464       }
465       try {
466         entry = this.repLogReader.readNextAndSetPosition();
467       } catch (IOException ie) {
468         LOG.debug("Break on IOE: " + ie.getMessage());
469         break;
470       }
471     }
472     metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
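        // If this WAL is still being written to, never treat a read that returned nothing as
        // end-of-file: more entries may yet be appended, so keep the reader parked at this position.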
473     if (currentWALisBeingWrittenTo) {
474       return false;
475     }
476     // If we didn't get anything and the queue has an object, it means we
477     // hit the end of the file for sure
478     return seenEntries == 0 && processEndOfFile();
479   }
480 
481   /**
482    * Poll for the next path
483    * @return true if a path was obtained, false if not
484    */
485   protected boolean getNextPath() {
486     try {
487       if (this.currentPath == null) {
488         this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
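            // The timed poll keeps an idle source from blocking forever on an empty queue; if no
            // log shows up within sleepForRetries ms, currentPath stays null and the run loop backs off.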
489         this.metrics.setSizeOfLogQueue(queue.size());
490         if (this.currentPath != null) {
491           this.manager.cleanOldLogs(this.currentPath.getName(),
492               this.peerId,
493               this.replicationQueueInfo.isQueueRecovered());
494           if (LOG.isTraceEnabled()) {
495             LOG.trace("New log: " + this.currentPath);
496           }
497         }
498       }
499     } catch (InterruptedException e) {
500       LOG.warn("Interrupted while reading edits", e);
501     }
502     return this.currentPath != null;
503   }
504 
505   /**
506    * Open a reader on the current path
507    *
508    * @param sleepMultiplier multiplier applied to the default sleep time between retries
509    * @return true if we should continue with that file, false if we are over with it
510    */
511   protected boolean openReader(int sleepMultiplier) {
512     try {
513       try {
514         if (LOG.isTraceEnabled()) {
515           LOG.trace("Opening log " + this.currentPath);
516         }
517         this.reader = repLogReader.openReader(this.currentPath);
518       } catch (FileNotFoundException fnfe) {
519         if (this.replicationQueueInfo.isQueueRecovered()) {
520           // We didn't find the log in the archive directory, so check whether it still
521           // exists in the dead RS folder (there could be a chain of failures
522           // to look at)
523           List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
524           LOG.info("Number of dead region servers: " + deadRegionServers.size());
525           final Path rootDir = FSUtils.getRootDir(this.conf);
526           for (String curDeadServerName : deadRegionServers) {
527             final Path deadRsDirectory = new Path(rootDir,
528                 DefaultWALProvider.getWALDirectoryName(curDeadServerName));
529             Path[] locs = new Path[] {
530                 new Path(deadRsDirectory, currentPath.getName()),
531                 new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
532                                           currentPath.getName()),
533             };
534             for (Path possibleLogLocation : locs) {
535               LOG.info("Possible location " + possibleLogLocation.toUri().toString());
536               if (this.manager.getFs().exists(possibleLogLocation)) {
537                 // We found the right new location
538                 LOG.info("Log " + this.currentPath + " still exists at " +
539                     possibleLogLocation);
540                 // Breaking here will make us sleep since reader is null
541                 // TODO why don't we need to set currentPath and call openReader here?
542                 return true;
543               }
544             }
545           }
546           // In case of disaster recovery, the HMaster may have shut down or crashed before the
547           // WALs were moved from .logs to .oldlogs. Loop over .logs folders and check for a match
548           if (stopper instanceof ReplicationSyncUp.DummyServer) {
549             // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
550             //      area rather than to the wal area for a particular region server.
551             FileStatus[] rss = fs.listStatus(manager.getLogDir());
552             for (FileStatus rs : rss) {
553               Path p = rs.getPath();
554               FileStatus[] logs = fs.listStatus(p);
555               for (FileStatus log : logs) {
556                 p = new Path(p, log.getPath().getName());
557                 if (p.getName().equals(currentPath.getName())) {
558                   currentPath = p;
559                   LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
560                   // Open the log at the new location
561                   this.openReader(sleepMultiplier);
562                   return true;
563                 }
564               }
565             }
566           }
567 
568           // TODO What happens if the log was missing from every single location?
569           // Although we need to check a couple of times as the log could have
570           // been moved by the master between the checks
571           // It can also happen if a recovered queue wasn't properly cleaned,
572           // such that the znode pointing to a log exists but the log was
573           // deleted a long time ago.
574           // For the moment, we'll throw the IO and processEndOfFile
575           throw new IOException("File from recovered queue is " +
576               "nowhere to be found", fnfe);
577         } else {
578           // If the log was archived, continue reading from there
579           Path archivedLogLocation =
580               new Path(manager.getOldLogDir(), currentPath.getName());
581           if (this.manager.getFs().exists(archivedLogLocation)) {
582             currentPath = archivedLogLocation;
583             LOG.info("Log " + this.currentPath + " was moved to " +
584                 archivedLogLocation);
585             // Open the log at the new location
586             this.openReader(sleepMultiplier);
587 
588           }
589           // TODO What happens if the log is missing in both places?
590         }
591       }
592     } catch (LeaseNotRecoveredException lnre) {
593       // HBASE-15019 the WAL was not closed due to some hiccup.
594       LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
595       recoverLease(conf, currentPath);
596       this.reader = null;
597     } catch (IOException ioe) {
598       if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
599       LOG.warn(this.peerClusterZnode + " Got: ", ioe);
600       this.reader = null;
601       if (ioe.getCause() instanceof NullPointerException) {
602         // Workaround for race condition in HDFS-4380
603         // which throws a NPE if we open a file before any data node has the most recent block
604         // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
605         LOG.warn("Got NPE opening reader, will retry.");
606       } else if (sleepMultiplier == this.maxRetriesMultiplier) {
607         // TODO Need a better way to determine if a file is really gone but
608         // TODO without scanning all logs dir
609         LOG.warn("Waited too long for this file, considering dumping");
610         return !processEndOfFile();
611       }
612     }
613     return true;
614   }
615 
616   private void recoverLease(final Configuration conf, final Path path) {
617     try {
618       final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
619       FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
620       fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
621         @Override
622         public boolean progress() {
623           LOG.debug("recover WAL lease: " + path);
624           return isActive();
625         }
626       });
627     } catch (IOException e) {
628       LOG.warn("unable to recover lease for WAL: " + path, e);
629     }
630   }
631 
632   /*
633    * Checks whether the current log file is empty and the queue is not a recovered one. This
634    * handles the scenario where, in an idle cluster, there is no entry in the current log and we
635    * keep trying to read the log file and getting an EOFException. In the case of a recovered
636    * queue the last log file may be empty, and we don't want to retry that.
637    */
638   private boolean isCurrentLogEmpty() {
639     return (this.repLogReader.getPosition() == 0 &&
640         !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
641   }
642 
643   /**
644    * Do the sleeping logic
645    * @param msg Why we sleep
646    * @param sleepMultiplier multiplier applied to the default sleep time between retries
647    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
648    */
649   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
650     try {
651       if (LOG.isTraceEnabled()) {
652         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
653       }
654       Thread.sleep(this.sleepForRetries * sleepMultiplier);
655     } catch (InterruptedException e) {
656       LOG.debug("Interrupted while sleeping between retries");
657       Thread.currentThread().interrupt();
658     }
659     return sleepMultiplier < maxRetriesMultiplier;
660   }
661 
662   /**
663    * Count the number of different row keys in the given edit, since a single edit can carry
664    * a mini-batch of mutations. We assume that there's at least one Cell in the WALEdit.
665    * @param edit edit to count row keys from
666    * @return number of different row keys
667    */
668   private int countDistinctRowKeys(WALEdit edit) {
669     List<Cell> cells = edit.getCells();
670     int distinctRowKeys = 1;
671     Cell lastCell = cells.get(0);
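        // This counts row-key changes between adjacent cells, which equals the number of distinct
        // rows as long as cells for the same row are contiguous in the WALEdit (the usual case for
        // a batched mutation).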
672     for (int i = 0; i < edit.size(); i++) {
673       if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
674         distinctRowKeys++;
675       }
676       lastCell = cells.get(i);
677     }
678     return distinctRowKeys;
679   }
680 
681   /**
682    * Do the shipping logic
683    * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
684    * written to when this method was called
685    */
686   protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
687     int sleepMultiplier = 0;
688     if (entries.isEmpty()) {
689       LOG.warn("Was given 0 edits to ship");
690       return;
691     }
692     while (this.isActive()) {
693       try {
694         if (this.throttler.isEnabled()) {
695           long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
696           if (sleepTicks > 0) {
697             try {
698               if (LOG.isTraceEnabled()) {
699                 LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
700               }
701               Thread.sleep(sleepTicks);
702             } catch (InterruptedException e) {
703               LOG.debug("Interrupted while sleeping for throttling control");
704               Thread.currentThread().interrupt();
705               // the current thread might have been interrupted to terminate;
706               // go straight back to the while() condition to confirm this
707               continue;
708             }
709             // reset throttler's cycle start tick when sleep for throttling occurs
710             this.throttler.resetStartTick();
711           }
712         }
713         // create replicateContext here, so the entries can be GC'd upon return from this call stack
714         ReplicationEndpoint.ReplicateContext replicateContext = new ReplicationEndpoint.ReplicateContext();
715         replicateContext.setEntries(entries).setSize(currentSize);
716 
717         long startTimeNs = System.nanoTime();
718         // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
719         boolean replicated = replicationEndpoint.replicate(replicateContext);
720         long endTimeNs = System.nanoTime();
721 
722         if (!replicated) {
723           continue;
724         } else {
725           sleepMultiplier = Math.max(sleepMultiplier-1, 0);
726         }
727 
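            // The WAL position is only recorded after a successful ship; a failed batch is retried
            // in full above, so entries may be re-shipped to the peer but are never skipped.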
728         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
729           this.manager.logPositionAndCleanOldLogs(this.currentPath,
730               this.peerClusterZnode, this.repLogReader.getPosition(),
731               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
732           this.lastLoggedPosition = this.repLogReader.getPosition();
733         }
734         if (this.throttler.isEnabled()) {
735           this.throttler.addPushSize(currentSize);
736         }
737         this.totalReplicatedEdits += entries.size();
738         this.totalReplicatedOperations += currentNbOperations;
739         this.metrics.shipBatch(this.currentNbOperations, this.currentSize/1024);
740         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
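            // Using the last entry's WAL write time here lets the "age of last shipped op" metric
            // serve as a rough measure of replication lag for this source.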
741         if (LOG.isTraceEnabled()) {
742           LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
743               + this.totalReplicatedOperations + " operations in " +
744               ((endTimeNs - startTimeNs)/1000000) + " ms");
745         }
746         break;
747       } catch (Exception ex) {
748         LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" +
749             org.apache.hadoop.util.StringUtils.stringifyException(ex));
750         if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
751           sleepMultiplier++;
752         }
753       }
754     }
755   }
756 
757   /**
758    * Check whether the peer is enabled or not
759    *
760    * @return true if the peer is enabled, otherwise false
761    */
762   protected boolean isPeerEnabled() {
763     return this.replicationPeers.getStatusOfPeer(this.peerId);
764   }
765 
766   /**
767    * If the queue isn't empty, switch to the next one.
768    * Else if this is a recovered queue, it means we're done!
769    * Else we'll just continue to try reading the log file.
770    * @return true if we're done with the current file, false if we should
771    * continue trying to read from it
772    */
773   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
774       justification="Yeah, this is how it works")
775   protected boolean processEndOfFile() {
776     // We presume this means the file we're reading is closed.
777     if (this.queue.size() != 0) {
778       // -1 means the wal wasn't closed cleanly.
779       final long trailerSize = this.repLogReader.currentTrailerSize();
780       final long currentPosition = this.repLogReader.getPosition();
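          // A trailer size of -1 means the WAL was not closed cleanly, so any unparsed tail is only
          // reported below as skipped bytes; a non-negative trailer size means the file was closed
          // cleanly and a position far short of the file length triggers a re-read (HBASE-15983).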
781       FileStatus stat = null;
782       try {
783         stat = fs.getFileStatus(this.currentPath);
784       } catch (IOException exception) {
785         LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
786             + (trailerSize < 0 ? "was not" : "was") + " closed cleanly, stats: " + getStats(), exception);
787         metrics.incrUnknownFileLengthForClosedWAL();
788       }
789       if (stat != null) {
790         if (trailerSize < 0) {
791           if (currentPosition < stat.getLen()) {
792             final long skippedBytes = stat.getLen() - currentPosition;
793             LOG.info("Reached the end of WAL file '" + currentPath + "'. It was not closed cleanly, so we did not parse " + skippedBytes + " bytes of data.");
794             metrics.incrUncleanlyClosedWALs();
795             metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
796           }
797         } else if (currentPosition + trailerSize < stat.getLen()){
798           LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition + ", which is too far away from reported file length " + stat.getLen() +
799             ". Restarting WAL reading (see HBASE-15983 for details). stats: " + getStats());
800           repLogReader.setPosition(0);
801           metrics.incrRestartedWALReading();
802           metrics.incrRepeatedFileBytes(currentPosition);
803           return false;
804         }
805       }
806       if (LOG.isTraceEnabled()) {
807         LOG.trace("Reached the end of a log, stats: " + getStats()
808           + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
809       }
810       this.currentPath = null;
811       this.repLogReader.finishCurrentFile();
812       this.reader = null;
813       metrics.incrCompletedWAL();
814       return true;
815     } else if (this.replicationQueueInfo.isQueueRecovered()) {
816       this.manager.closeRecoveredQueue(this);
817       LOG.info("Finished recovering the queue with the following stats " + getStats());
818       metrics.incrCompletedRecoveryQueue();
819       this.running = false;
820       return true;
821     }
822     return false;
823   }
824 
825   @Override
826   public void startup() {
827     String n = Thread.currentThread().getName();
828     Thread.UncaughtExceptionHandler handler =
829         new Thread.UncaughtExceptionHandler() {
830           @Override
831           public void uncaughtException(final Thread t, final Throwable e) {
832             LOG.error("Unexpected exception in ReplicationSource," +
833               " currentPath=" + currentPath, e);
834           }
835         };
836     Threads.setDaemonThreadRunning(
837         this, n + ".replicationSource," +
838         this.peerClusterZnode, handler);
839   }
840 
841   @Override
842   public void terminate(String reason) {
843     terminate(reason, null);
844   }
845 
846   @Override
847   public void terminate(String reason, Exception cause) {
848     terminate(reason, cause, true);
849   }
850 
851   public void terminate(String reason, Exception cause, boolean join) {
852     if (cause == null) {
853       LOG.info("Closing source "
854           + this.peerClusterZnode + " because: " + reason);
855 
856     } else {
857       LOG.error("Closing source " + this.peerClusterZnode
858           + " because an error occurred: " + reason, cause);
859     }
860     this.running = false;
861     this.interrupt();
862     ListenableFuture<Service.State> future = null;
863     if (this.replicationEndpoint != null) {
864       future = this.replicationEndpoint.stop();
865     }
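        // stop() above is asynchronous; when join is requested we wait for both this source thread
        // and the endpoint shutdown to complete before returning.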
866     if (join) {
867       Threads.shutdown(this, this.sleepForRetries);
868       if (future != null) {
869         try {
870           future.get();
871         } catch (Exception e) {
872           LOG.warn("Got exception while waiting for the replication endpoint to stop: " + e);
873         }
874       }
875     }
876   }
877 
878   @Override
879   public String getPeerClusterZnode() {
880     return this.peerClusterZnode;
881   }
882 
883   @Override
884   public String getPeerClusterId() {
885     return this.peerId;
886   }
887 
888   @Override
889   public Path getCurrentPath() {
890     return this.currentPath;
891   }
892 
893   private boolean isActive() {
894     return !this.stopper.isStopped() && this.running && !isInterrupted();
895   }
896 
897   /**
898    * Comparator used to order logs based on their start time
899    */
900   public static class LogsComparator implements Comparator<Path> {
901 
902     @Override
903     public int compare(Path o1, Path o2) {
904       return Long.compare(getTS(o1), getTS(o2));
905     }
906 
907     /**
908      * Split a path to get the start time
909      * For example: 10.20.20.171%3A60020.1277499063250
910      * @param p path to split
911      * @return start time
912      */
913     private static long getTS(Path p) {
914       int tsIndex = p.getName().lastIndexOf('.') + 1;
915       return Long.parseLong(p.getName().substring(tsIndex));
916     }
917   }
918 
919   @Override
920   public String getStats() {
921     long position = this.repLogReader.getPosition();
922     return "Total replicated edits: " + totalReplicatedEdits +
923       ", currently replicating from: " + this.currentPath +
924       " at position: " + position;
925   }
926 
927   /**
928    * Get Replication Source Metrics
929    * @return sourceMetrics
930    */
931   public MetricsSource getSourceMetrics() {
932     return this.metrics;
933   }
934 }