1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import com.google.common.cache.Cache;
22  import com.google.common.cache.CacheBuilder;
23  import com.google.common.cache.CacheLoader;
24  import com.google.common.cache.LoadingCache;
25  import com.google.common.collect.Lists;
26  import com.google.common.util.concurrent.ListenableFuture;
27  import com.google.common.util.concurrent.Service;
28
29  import java.io.EOFException;
30  import java.io.FileNotFoundException;
31  import java.io.IOException;
32  import java.util.ArrayList;
33  import java.util.Collection;
34  import java.util.Comparator;
35  import java.util.HashMap;
36  import java.util.HashSet;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Set;
40  import java.util.UUID;
41  import java.util.concurrent.ConcurrentHashMap;
42  import java.util.concurrent.PriorityBlockingQueue;
43  import java.util.concurrent.TimeUnit;
44  import java.util.concurrent.atomic.AtomicLong;
45
46  import org.apache.commons.lang.StringUtils;
47  import org.apache.commons.logging.Log;
48  import org.apache.commons.logging.LogFactory;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.fs.FileStatus;
51  import org.apache.hadoop.fs.FileSystem;
52  import org.apache.hadoop.fs.Path;
53  import org.apache.hadoop.hbase.Cell;
54  import org.apache.hadoop.hbase.CellUtil;
55  import org.apache.hadoop.hbase.HBaseConfiguration;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.MetaTableAccessor;
58  import org.apache.hadoop.hbase.Stoppable;
59  import org.apache.hadoop.hbase.TableName;
60  import org.apache.hadoop.hbase.classification.InterfaceAudience;
61  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
62  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
63  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
64  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
65  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
66  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
67  import org.apache.hadoop.hbase.replication.ReplicationException;
68  import org.apache.hadoop.hbase.replication.ReplicationPeers;
69  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
70  import org.apache.hadoop.hbase.replication.ReplicationQueues;
71  import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
72  import org.apache.hadoop.hbase.replication.WALEntryFilter;
73  import org.apache.hadoop.hbase.util.Bytes;
74  import org.apache.hadoop.hbase.util.CancelableProgressable;
75  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
76  import org.apache.hadoop.hbase.util.FSUtils;
77  import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
78  import org.apache.hadoop.hbase.util.Threads;
79  import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
80  import org.apache.hadoop.hbase.wal.WAL;
81  import org.apache.hadoop.hbase.wal.WALKey;
82
83  /**
84   * Class that handles the source of a replication stream.
85   * Currently does not handle more than one slave cluster.
86   * For each slave cluster it selects a random subset of peers
87   * using a replication ratio. For example, if the replication ratio is 0.1
88   * and the slave cluster has 100 region servers, 10 will be selected.
89   * <p>
90   * A stream is considered down when we cannot contact a region server on the
91   * peer cluster for more than 55 seconds by default.
92   * </p>
93   *
94   */
95  @InterfaceAudience.Private
96  public class ReplicationSource extends Thread
97      implements ReplicationSourceInterface {
98
99    private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
100   // Queues of logs to process, entry in format of walGroupId->queue,
101   // each represents the queue for one wal group
102   private Map<String, PriorityBlockingQueue<Path>> queues =
103       new HashMap<String, PriorityBlockingQueue<Path>>();
104   // per group queue size, keep no more than this number of logs in each wal group
105   private int queueSizePerGroup;
106   private ReplicationQueues replicationQueues;
107   private ReplicationPeers replicationPeers;
108
109   private Configuration conf;
110   private ReplicationQueueInfo replicationQueueInfo;
111   // id of the peer cluster this source replicates to
112   private String peerId;
113
114   String actualPeerId;
115   // The manager of all sources to which we ping back our progress
116   private ReplicationSourceManager manager;
117   // Should we stop everything?
118   private Stoppable stopper;
119   // How long should we sleep for each retry
120   private long sleepForRetries;
121   // Max size in bytes of entriesArray
122   private long replicationQueueSizeCapacity;
123   // Max number of entries in entriesArray
124   private int replicationQueueNbCapacity;
125   private FileSystem fs;
126   // id of this cluster
127   private UUID clusterId;
128   // id of the other cluster
129   private UUID peerClusterId;
130   // total number of edits we replicated
131   private AtomicLong totalReplicatedEdits = new AtomicLong(0);
132   // total number of operations we replicated
133   private AtomicLong totalReplicatedOperations = new AtomicLong(0);
134   // The znode we currently play with
135   private String peerClusterZnode;
136   // Maximum number of retries before taking bold actions
137   private int maxRetriesMultiplier;
138   // Indicates if this particular source is running
139   private volatile boolean sourceRunning = false;
140   // Metrics for this source
141   private MetricsSource metrics;
142   //WARN threshold for the number of queued logs, defaults to 2
143   private int logQueueWarnThreshold;
144   // ReplicationEndpoint which will handle the actual replication
145   private ReplicationEndpoint replicationEndpoint;
146   // A filter (or a chain of filters) for the WAL entries.
147   private WALEntryFilter walEntryFilter;
148   // throttler
149   private ReplicationThrottler throttler;
150   private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
151       new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
152
153   /**
154    * Instantiation method used by region servers
155    *
156    * @param conf configuration to use
157    * @param fs file system to use
158    * @param manager replication manager to ping to
159    * @param stopper     the stoppable used to stop the region server
160    * @param peerClusterZnode the name of our znode
161    * @param clusterId unique UUID for the cluster
162    * @param replicationEndpoint the replication endpoint implementation
163    * @param metrics metrics for replication source
164    * @throws IOException
165    */
166   @Override
167   public void init(final Configuration conf, final FileSystem fs,
168       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
169       final ReplicationPeers replicationPeers, final Stoppable stopper,
170       final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
171       final MetricsSource metrics)
172           throws IOException {
173     this.stopper = stopper;
174     this.conf = HBaseConfiguration.create(conf);
175     decorateConf();
176     this.replicationQueueSizeCapacity =
177         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
178     this.replicationQueueNbCapacity =
179         this.conf.getInt("replication.source.nb.capacity", 25000);
180     this.sleepForRetries =
181         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
182     this.maxRetriesMultiplier =
183         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
184     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
185     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
186     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
187     this.replicationQueues = replicationQueues;
188     this.replicationPeers = replicationPeers;
189     this.manager = manager;
190     this.fs = fs;
191     this.metrics = metrics;
192     this.clusterId = clusterId;
193
194     this.peerClusterZnode = peerClusterZnode;
195     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
196     // ReplicationQueueInfo parses the peerId out of the znode for us
197     this.peerId = this.replicationQueueInfo.getPeerId();
198     ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
199     this.actualPeerId = replicationQueueInfo.getPeerId();
200     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
201     this.replicationEndpoint = replicationEndpoint;
202   }
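
  // A minimal tuning sketch, assuming the keys read above are set on the region server's
  // Configuration before sources are created; the values below are illustrative only:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setLong("replication.source.size.capacity", 1024 * 1024 * 32);  // max bytes per shipped batch
  //   conf.setInt("replication.source.nb.capacity", 10000);                // max entries per shipped batch
  //   conf.setLong("replication.source.sleepforretries", 500);             // base retry sleep, in ms
  //   conf.setInt("replication.source.maxretriesmultiplier", 300);         // cap on the backoff multiplier
  //   conf.setLong("replication.source.per.peer.node.bandwidth", 0);       // 0 disables throttling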
203
204   private void decorateConf() {
205     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
206     if (StringUtils.isNotEmpty(replicationCodec)) {
207       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
208     }
209   }
210
211   @Override
212   public void enqueueLog(Path log) {
213     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
214     PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
215     if (queue == null) {
216       queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
217       queues.put(logPrefix, queue);
218       if (this.sourceRunning) {
219         // new wal group observed after source startup, start a new worker thread to track it
220         // notice: it's possible that a log is enqueued after this.sourceRunning is set but before
221         // the worker thread is launched, so it's necessary to check workerThreads before starting one
222         final ReplicationSourceWorkerThread worker =
223             new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
224         ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
225         if (extant != null) {
226           LOG.debug("Someone has beaten us to starting a worker thread for wal group " + logPrefix);
227         } else {
228           LOG.debug("Starting up worker for wal group " + logPrefix);
229           worker.startup();
230         }
231       }
232     }
233     queue.put(log);
234     this.metrics.incrSizeOfLogQueue();
235     // This will log a warning for each new log that gets created above the warn threshold
236     int queueSize = queue.size();
237     if (queueSize > this.logQueueWarnThreshold) {
238       LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
239           + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
240     }
241   }
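
  // Illustrative sketch of how a WAL file maps to a wal group; the file name and the exact
  // prefix format are assumptions, not taken from this source:
  //
  //   Path log = new Path("hdfs://nn/hbase/WALs/rs1,16020,1469992861000/"
  //       + "rs1%2C16020%2C1469992861000.regiongroup-0.1469993000000");
  //   String group = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
  //   // assumed result: "rs1%2C16020%2C1469992861000.regiongroup-0" -- the key into 'queues',
  //   // so each wal group gets its own PriorityBlockingQueue and worker thread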
242
243   @Override
244   public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
245       throws ReplicationException {
246     String peerId = peerClusterZnode;
247     if (peerId.contains("-")) {
248       // peerClusterZnode will be in the form peerId + "-" + rsZNode.
249       // A peerId will not have "-" in its name, see HBASE-11394
250       peerId = peerClusterZnode.split("-")[0];
251     }
252     Map<TableName, List<String>> tableCFMap = replicationPeers.getConnectedPeer(peerId).getTableCFs();
253     if (tableCFMap != null) {
254       List<String> tableCfs = tableCFMap.get(tableName);
255       if (tableCFMap.containsKey(tableName)
256           && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
257         this.replicationQueues.addHFileRefs(peerId, files);
258         metrics.incrSizeOfHFileRefsQueue(files.size());
259       } else {
260         LOG.debug("HFiles belonging to table " + tableName + " family "
261             + Bytes.toString(family) + " will not be replicated to peer id " + peerId);
262       }
263     } else {
264       // the user has not defined any table CFs for this peer, which means replicate all the
265       // data
266       this.replicationQueues.addHFileRefs(peerId, files);
267       metrics.incrSizeOfHFileRefsQueue(files.size());
268     }
269   }
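
  // Illustrative example of the peerClusterZnode parsing above (names are made up):
  //
  //   recovered queue:  peerClusterZnode = "2-rs1,16020,1469992861000"  ->  peerId = "2"
  //   normal queue:     peerClusterZnode = "2"                          ->  peerId = "2"
  //
  // Per HBASE-11394 a peer id never contains "-", so taking the text before the first "-" is safe.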
270
271   private void uninitialize() {
272     LOG.debug("Source exiting " + this.peerId);
273     metrics.clear();
274     if (replicationEndpoint.state() == Service.State.STARTING
275         || replicationEndpoint.state() == Service.State.RUNNING) {
276       replicationEndpoint.stopAndWait();
277     }
278   }
279
280   @Override
281   public void run() {
282     // mark we are running now
283     this.sourceRunning = true;
284     try {
285       // start the endpoint, connect to the cluster
286       Service.State state = replicationEndpoint.start().get();
287       if (state != Service.State.RUNNING) {
288         LOG.warn("ReplicationEndpoint was not started. Exiting");
289         uninitialize();
290         return;
291       }
292     } catch (Exception ex) {
293       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
294       throw new RuntimeException(ex);
295     }
296
297     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
298     ArrayList<WALEntryFilter> filters = Lists.newArrayList(
299       (WALEntryFilter)new SystemTableWALEntryFilter());
300     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
301     if (filterFromEndpoint != null) {
302       filters.add(filterFromEndpoint);
303     }
304     this.walEntryFilter = new ChainWALEntryFilter(filters);
305
306     int sleepMultiplier = 1;
307     // delay this until we are in an asynchronous thread
308     while (this.isSourceActive() && this.peerClusterId == null) {
309       this.peerClusterId = replicationEndpoint.getPeerUUID();
310       if (this.isSourceActive() && this.peerClusterId == null) {
311         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
312           sleepMultiplier++;
313         }
314       }
315     }
316
317     // In a rare case the zookeeper setting may be messed up. That leads to an incorrect
318     // peerClusterId value, which is the same as the source clusterId
319     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
320       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
321           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
322           + replicationEndpoint.getClass().getName(), null, false);
323     }
324     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
325     // start workers
326     for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
327       String walGroupId = entry.getKey();
328       PriorityBlockingQueue<Path> queue = entry.getValue();
329       final ReplicationSourceWorkerThread worker =
330           new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
331       ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
332       if (extant != null) {
333         LOG.debug("Someone has beaten us to starting a worker thread for wal group " + walGroupId);
334       } else {
335         LOG.debug("Starting up worker for wal group " + walGroupId);
336         worker.startup();
337       }
338     }
339   }
340
341   /**
342    * Do the sleeping logic
343    * @param msg Why we sleep
344    * @param sleepMultiplier by how many times the default sleeping time is augmented
345    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
346    */
347   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
348     try {
349       if (LOG.isTraceEnabled()) {
350         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
351       }
352       Thread.sleep(this.sleepForRetries * sleepMultiplier);
353     } catch (InterruptedException e) {
354       LOG.debug("Interrupted while sleeping between retries");
355       Thread.currentThread().interrupt();
356     }
357     return sleepMultiplier < maxRetriesMultiplier;
358   }
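
  // Worked example with the defaults read in init(): sleepForRetries = 1000 ms and
  // maxRetriesMultiplier = 300. A caller looping on
  //   if (sleepForRetries("...", sleepMultiplier)) { sleepMultiplier++; }
  // sleeps 1s, 2s, 3s, ... and stops escalating once the multiplier reaches 300, so a single
  // sleep never exceeds 300 * 1000 ms, i.e. the "5 minutes @ 1 sec per" noted in init().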
359
360   /**
361    * check whether the peer is enabled or not
362    *
363    * @return true if the peer is enabled, otherwise false
364    */
365   protected boolean isPeerEnabled() {
366     return this.replicationPeers.getStatusOfPeer(this.peerId);
367   }
368
369   @Override
370   public void startup() {
371     String n = Thread.currentThread().getName();
372     Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
373       @Override
374       public void uncaughtException(final Thread t, final Throwable e) {
375         LOG.error("Unexpected exception in ReplicationSource", e);
376       }
377     };
378     Threads
379         .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
380   }
381
382   @Override
383   public void terminate(String reason) {
384     terminate(reason, null);
385   }
386
387   @Override
388   public void terminate(String reason, Exception cause) {
389     terminate(reason, cause, true);
390   }
391
392   public void terminate(String reason, Exception cause, boolean join) {
393     if (cause == null) {
394       LOG.info("Closing source "
395           + this.peerClusterZnode + " because: " + reason);
396
397     } else {
398       LOG.error("Closing source " + this.peerClusterZnode
399           + " because an error occurred: " + reason, cause);
400     }
401     this.sourceRunning = false;
402     Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
403     for (ReplicationSourceWorkerThread worker : workers) {
404       worker.setWorkerRunning(false);
405       worker.interrupt();
406     }
407     ListenableFuture<Service.State> future = null;
408     if (this.replicationEndpoint != null) {
409       future = this.replicationEndpoint.stop();
410     }
411     if (join) {
412       for (ReplicationSourceWorkerThread worker : workers) {
413         Threads.shutdown(worker, this.sleepForRetries);
414         LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
415       }
416       if (future != null) {
417         try {
418           future.get();
419         } catch (Exception e) {
420           LOG.warn("Got exception while waiting for the replication endpoint to stop", e);
421         }
422       }
423     }
424   }
425
426   @Override
427   public String getPeerClusterZnode() {
428     return this.peerClusterZnode;
429   }
430
431   @Override
432   public String getPeerClusterId() {
433     return this.peerId;
434   }
435
436   @Override
437   public Path getCurrentPath() {
438     // only for testing
439     for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
440       if (worker.getCurrentPath() != null) return worker.getCurrentPath();
441     }
442     return null;
443   }
444
445   private boolean isSourceActive() {
446     return !this.stopper.isStopped() && this.sourceRunning;
447   }
448
449   /**
450    * Comparator used to compare logs together based on their start time
451    */
452   public static class LogsComparator implements Comparator<Path> {
453
454     @Override
455     public int compare(Path o1, Path o2) {
456       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
457     }
458 
459     /**
460      * Split a path to get the start time.
461      * For example: 10.20.20.171%3A60020.1277499063250
462      * @param p path to split
463      * @return start time
464      */
465     private static long getTS(Path p) {
466       int tsIndex = p.getName().lastIndexOf('.') + 1;
467       return Long.parseLong(p.getName().substring(tsIndex));
468     }
469   }
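
  // Hypothetical usage of LogsComparator (not in the original source): WAL names end with their
  // creation timestamp, which getTS() extracts, so logs are ordered oldest-first in each queue.
  //
  //   LogsComparator cmp = new LogsComparator();
  //   Path older = new Path("10.20.20.171%3A60020.1277499063250");
  //   Path newer = new Path("10.20.20.171%3A60020.1277499999999");
  //   assert cmp.compare(older, newer) < 0;  // older timestamp sorts first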
470
471   @Override
472   public String getStats() {
473     StringBuilder sb = new StringBuilder();
474     sb.append("Total replicated edits: ").append(totalReplicatedEdits)
475         .append(", current progress: \n");
476     for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
477       String walGroupId = entry.getKey();
478       ReplicationSourceWorkerThread worker = entry.getValue();
479       long position = worker.getCurrentPosition();
480       Path currentPath = worker.getCurrentPath();
481       sb.append("walGroup [").append(walGroupId).append("]: ");
482       if (currentPath != null) {
483         sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
484             .append(position).append("\n");
485       } else {
486         sb.append("no replication ongoing, waiting for new log\n");
487       }
488     }
489     return sb.toString();
490   }
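
  // Example of the string produced above (all values are illustrative):
  //
  //   Total replicated edits: 1024, current progress:
  //   walGroup [rs1%2C16020%2C1469992861000]: currently replicating from:
  //       hdfs://.../rs1%2C16020%2C1469992861000.1469993000000 at position: 65536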
491
492   /**
493    * Get Replication Source Metrics
494    * @return sourceMetrics
495    */
496   public MetricsSource getSourceMetrics() {
497     return this.metrics;
498   }
499
500   public class ReplicationSourceWorkerThread extends Thread {
501     ReplicationSource source;
502     String walGroupId;
503     PriorityBlockingQueue<Path> queue;
504     ReplicationQueueInfo replicationQueueInfo;
505     // Our reader for the current log. open/close handled by repLogReader
506     private WAL.Reader reader;
507     // Last position in the log that we sent to ZooKeeper
508     private long lastLoggedPosition = -1;
509     // Path of the current log
510     private volatile Path currentPath;
511     // Handle on the log reader helper
512     private ReplicationWALReaderManager repLogReader;
513     // Current number of operations (Put/Delete) that we need to replicate
514     private int currentNbOperations = 0;
515     // Current size of data we need to replicate
516     private int currentSize = 0;
517     // Indicates whether this particular worker is running
518     private boolean workerRunning = true;
519     // Current number of hfiles that we need to replicate
520     private long currentNbHFiles = 0;
521
522     // Use guava cache to set ttl for each key
523     private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
524         .expireAfterAccess(1, TimeUnit.DAYS).build(
525         new CacheLoader<String, Boolean>() {
526           @Override
527           public Boolean load(String key) throws Exception {
528             return false;
529           }
530         }
531     );
532
533     public ReplicationSourceWorkerThread(String walGroupId,
534         PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
535         ReplicationSource source) {
536       this.walGroupId = walGroupId;
537       this.queue = queue;
538       this.replicationQueueInfo = replicationQueueInfo;
539       this.repLogReader = new ReplicationWALReaderManager(fs, conf);
540       this.source = source;
541     }
542
543     @Override
544     public void run() {
545       // If this is a recovered queue, it is already full and the first log
546       // normally has a position (unless the RS failed between 2 logs)
547       if (this.replicationQueueInfo.isQueueRecovered()) {
548         try {
549           this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
550             this.queue.peek().getName()));
551           if (LOG.isTraceEnabled()) {
552             LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
553                 + this.repLogReader.getPosition());
554           }
555         } catch (ReplicationException e) {
556           terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
557         }
558       }
559       // Loop until we close down
560       while (isWorkerActive()) {
561         int sleepMultiplier = 1;
562         // Sleep until replication is enabled again
563         if (!isPeerEnabled()) {
564           if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
565             sleepMultiplier++;
566           }
567           continue;
568         }
569         Path oldPath = getCurrentPath(); //note that in the current scenario,
570                                          //oldPath will be null when a log roll
571                                          //happens.
572         // Get a new path
573         boolean hasCurrentPath = getNextPath();
574         if (getCurrentPath() != null && oldPath == null) {
575           sleepMultiplier = 1; //reset the sleepMultiplier on a path change
576         }
577         if (!hasCurrentPath) {
578           if (sleepForRetries("No log to process", sleepMultiplier)) {
579             sleepMultiplier++;
580           }
581           continue;
582         }
583         boolean currentWALisBeingWrittenTo = false;
584         //For WAL files we own (rather than recovered), take a snapshot of whether the
585         //current WAL file (this.currentPath) is in use (for writing) NOW!
586         //Since the new WAL paths are enqueued only after the prev WAL file
587         //is 'closed', presence of an element in the queue means that
588         //the previous WAL file was closed, else the file is in use (currentPath)
589         //We take the snapshot now so that we are protected against races
590         //where a new file gets enqueued while the current file is being processed
591         //(and where we just finished reading the current file).
592         if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
593           currentWALisBeingWrittenTo = true;
594         }
595         // Open a reader on it
596         if (!openReader(sleepMultiplier)) {
597           // Reset the sleep multiplier, else it'd be reused for the next file
598           sleepMultiplier = 1;
599           continue;
600         }
601
602         // If we got a null reader but didn't continue, then sleep and continue
603         if (this.reader == null) {
604           if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
605             sleepMultiplier++;
606           }
607           continue;
608         }
609
610         boolean gotIOE = false;
611         currentNbOperations = 0;
612         currentNbHFiles = 0;
613         List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
614
615         Map<String, Long> lastPositionsForSerialScope = new HashMap<>();
616         currentSize = 0;
617         try {
618           if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries,
619               lastPositionsForSerialScope)) {
620             for (Map.Entry<String, Long> entry : lastPositionsForSerialScope.entrySet()) {
621               waitingUntilCanPush(entry);
622             }
623             try {
624               MetaTableAccessor
625                   .updateReplicationPositions(manager.getConnection(), actualPeerId,
626                       lastPositionsForSerialScope);
627             } catch (IOException e) {
628               LOG.error("updateReplicationPositions fail", e);
629               stopper.stop("updateReplicationPositions fail");
630             }
631
632             continue;
633           }
634         } catch (IOException ioe) {
635           LOG.warn(peerClusterZnode + " Got: ", ioe);
636           gotIOE = true;
637           if (ioe.getCause() instanceof EOFException) {
638
639             boolean considerDumping = false;
640             if (this.replicationQueueInfo.isQueueRecovered()) {
641               try {
642                 FileStatus stat = fs.getFileStatus(this.currentPath);
643                 if (stat.getLen() == 0) {
644                   LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
645                 }
646                 considerDumping = true;
647               } catch (IOException e) {
648                 LOG.warn(peerClusterZnode + " Got exception while getting file size: ", e);
649               }
650             }
651
652             if (considerDumping &&
653                 sleepMultiplier == maxRetriesMultiplier &&
654                 processEndOfFile()) {
655               continue;
656             }
657           }
658         } finally {
659           try {
660             this.reader = null;
661             this.repLogReader.closeReader();
662           } catch (IOException e) {
663             gotIOE = true;
664             LOG.warn("Unable to finalize the tailing of a file", e);
665           }
666         }
667         for(Map.Entry<String, Long> entry: lastPositionsForSerialScope.entrySet()) {
668           waitingUntilCanPush(entry);
669         }
670         // If we didn't get anything to replicate, or if we hit a IOE,
671         // wait a bit and retry.
672         // But if we need to stop, don't bother sleeping
673         if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
674           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
675
676             // Save positions to meta table before zk.
677             if (!gotIOE) {
678               try {
679                 MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
680                     lastPositionsForSerialScope);
681               } catch (IOException e) {
682                 LOG.error("updateReplicationPositions fail", e);
683                 stopper.stop("updateReplicationPositions fail");
684               }
685             }
686
687             manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
688                 this.repLogReader.getPosition(),
689                 this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
690
691             this.lastLoggedPosition = this.repLogReader.getPosition();
692           }
693           // Reset the sleep multiplier if nothing has actually gone wrong
694           if (!gotIOE) {
695             sleepMultiplier = 1;
696             // if there was nothing to ship and it's not an error
697             // set "ageOfLastShippedOp" to <now> to indicate that we're current
698             metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
699           }
700           if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
701             sleepMultiplier++;
702           }
703           continue;
704         }
705         shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope);
706       }
707       if (replicationQueueInfo.isQueueRecovered()) {
708         // use synchronized to make sure only the last remaining thread cleans the queue
709         synchronized (workerThreads) {
710           Threads.sleep(100); // wait a short while for other worker threads to fully exit
711           boolean allOtherTaskDone = true;
712           for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
713             if (!worker.equals(this) && worker.isAlive()) {
714               allOtherTaskDone = false;
715               break;
716             }
717           }
718           if (allOtherTaskDone) {
719             manager.closeRecoveredQueue(this.source);
720             LOG.info("Finished recovering queue " + peerClusterZnode
721                 + " with the following stats: " + getStats());
722           }
723         }
724       }
725     }
726
727     private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
728       String key = entry.getKey();
729       long seq = entry.getValue();
730       boolean deleteKey = false;
731       if (seq <= 0) {
732         // There is a REGION_CLOSE marker, we cannot continue skipping after this entry.
733         deleteKey = true;
734         seq = -seq;
735       }
736
737       if (!canSkipWaitingSet.getUnchecked(key)) {
738         try {
739           manager.waitUntilCanBePushed(Bytes.toBytes(key), seq, actualPeerId);
740         } catch (Exception e) {
741           LOG.error("waitUntilCanBePushed fail", e);
742           stopper.stop("waitUntilCanBePushed fail");
743         }
744         canSkipWaitingSet.put(key, true);
745       }
746       if (deleteKey) {
747         canSkipWaitingSet.invalidate(key);
748       }
749     }
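
    // Sign convention used above, shown by example (the region name is made up):
    //   ("abc123", 100)   -> wait until sequence id 100 of region abc123 can be pushed, then
    //                        cache the region so later entries of the same section skip the wait
    //   ("abc123", -100)  -> same wait, but the negative value marks a REGION_CLOSE, so the
    //                        cached "can skip" flag is invalidated and the next section waits again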
750
751     /**
752      * Read all the entries from the current log file and retain those that need to be replicated.
753      * Otherwise, process the end of the current file.
754      * @param currentWALisBeingWrittenTo is the current WAL being written to
755      * @param entries resulting entries to be replicated
756      * @param lastPosition save the last sequenceid for each region if the table has
757      *                     serial-replication scope
758      * @return true if we got nothing and went to the next file, false if we got entries
759      * @throws IOException
760      */
761     protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
762         List<WAL.Entry> entries, Map<String, Long> lastPosition) throws IOException {
763       long seenEntries = 0;
764       if (LOG.isTraceEnabled()) {
765         LOG.trace("Seeking in " + this.currentPath + " at position "
766             + this.repLogReader.getPosition());
767       }
768       this.repLogReader.seek();
769       long positionBeforeRead = this.repLogReader.getPosition();
770       WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
771       while (entry != null) {
772         metrics.incrLogEditsRead();
773         seenEntries++;
774
775         if (entry.hasSerialReplicationScope()) {
776           String key = Bytes.toString(entry.getKey().getEncodedRegionName());
777           lastPosition.put(key, entry.getKey().getSequenceId());
778           if (entry.getEdit().getCells().size() > 0) {
779             WALProtos.RegionEventDescriptor maybeEvent =
780                 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
781             if (maybeEvent != null && maybeEvent.getEventType()
782                 == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
783               // In serial replication, if we move a region to another RS and move it back, we may
784               // read logs crossing two sections. We should break at REGION_CLOSE and push the first
785               // section first, so we do not miss the middle section belonging to the other RS.
786               // In a worker thread, if we can push the first log of a region, we can push all logs
787               // in the same region without waiting until we read a close marker, because the next time
788               // we read logs in this region it must be a new section and not adjacent to this
789               // one. Mark the sequence id negative to record the close.
790               lastPosition.put(key, -entry.getKey().getSequenceId());
791               break;
792             }
793           }
794         }
795
796         // don't replicate if the log entries have already been consumed by the cluster
797         if (replicationEndpoint.canReplicateToSameCluster()
798             || !entry.getKey().getClusterIds().contains(peerClusterId)) {
799           // Remove all KVs that should not be replicated
800           entry = walEntryFilter.filter(entry);
801           WALEdit edit = null;
802           WALKey logKey = null;
803           if (entry != null) {
804             edit = entry.getEdit();
805             logKey = entry.getKey();
806           }
807
808           if (edit != null && edit.size() != 0) {
809             // Mark that the current cluster has the change
810             logKey.addClusterId(clusterId);
811             currentNbOperations += countDistinctRowKeys(edit);
812             entries.add(entry);
813             currentSize += entry.getEdit().heapSize();
814             currentSize += calculateTotalSizeOfStoreFiles(edit);
815           } else {
816             metrics.incrLogEditsFiltered();
817           }
818         }
819         // Stop if too many entries or too big
820         // FIXME check the relationship between single wal group and overall
821         if (currentSize >= replicationQueueSizeCapacity
822             || entries.size() >= replicationQueueNbCapacity) {
823           break;
824         }
825
826         try {
827           entry = this.repLogReader.readNextAndSetPosition();
828         } catch (IOException ie) {
829           LOG.debug("Break on IOE: " + ie.getMessage());
830           break;
831         }
832       }
833       metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
834       if (currentWALisBeingWrittenTo) {
835         return false;
836       }
837       // If we didn't get anything and the queue has an object, it means we
838       // hit the end of the file for sure
839       return seenEntries == 0 && processEndOfFile();
840     }
841
842     /**
843      * Calculate the total size of all the store files
844      * @param edit edit to extract bulk load store file sizes from
845      * @return the total size of the store files
846      */
847     private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
848       List<Cell> cells = edit.getCells();
849       int totalStoreFilesSize = 0;
850
851       int totalCells = edit.size();
852       for (int i = 0; i < totalCells; i++) {
853         if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
854           try {
855             BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
856             List<StoreDescriptor> stores = bld.getStoresList();
857             int totalStores = stores.size();
858             for (int j = 0; j < totalStores; j++) {
859               totalStoreFilesSize += stores.get(j).getStoreFileSizeBytes();
860             }
861           } catch (IOException e) {
862             LOG.error("Failed to deserialize bulk load entry from wal edit. "
863                 + "Size of HFiles part of cell will not be considered in replication "
864                 + "request size calculation.", e);
865           }
866         }
867       }
868       return totalStoreFilesSize;
869     }
870
871     private void cleanUpHFileRefs(WALEdit edit) throws IOException {
872       String peerId = peerClusterZnode;
873       if (peerId.contains("-")) {
874         // peerClusterZnode will be in the form peerId + "-" + rsZNode.
875         // A peerId will not have "-" in its name, see HBASE-11394
876         peerId = peerClusterZnode.split("-")[0];
877       }
878       List<Cell> cells = edit.getCells();
879       int totalCells = cells.size();
880       for (int i = 0; i < totalCells; i++) {
881         Cell cell = cells.get(i);
882         if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
883           BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
884           List<StoreDescriptor> stores = bld.getStoresList();
885           int totalStores = stores.size();
886           for (int j = 0; j < totalStores; j++) {
887             List<String> storeFileList = stores.get(j).getStoreFileList();
888             manager.cleanUpHFileRefs(peerId, storeFileList);
889             metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
890           }
891         }
892       }
893     }
894
895     /**
896      * Poll for the next path
897      * @return true if a path was obtained, false if not
898      */
899     protected boolean getNextPath() {
900       try {
901         if (this.currentPath == null) {
902           this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
903           metrics.decrSizeOfLogQueue();
904           if (this.currentPath != null) {
905             // For recovered queue: must use peerClusterZnode since peerId is a parsed value
906             manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
907               this.replicationQueueInfo.isQueueRecovered());
908             if (LOG.isTraceEnabled()) {
909               LOG.trace("New log: " + this.currentPath);
910             }
911           }
912         }
913       } catch (InterruptedException e) {
914         LOG.warn("Interrupted while reading edits", e);
915       }
916       return this.currentPath != null;
917     }
918
919     /**
920      * Open a reader on the current path
921      *
922      * @param sleepMultiplier by how many times the default sleeping time is augmented
923      * @return true if we should continue with that file, false if we are over with it
924      */
925     protected boolean openReader(int sleepMultiplier) {
926       try {
927         try {
928           if (LOG.isTraceEnabled()) {
929             LOG.trace("Opening log " + this.currentPath);
930           }
931           this.reader = repLogReader.openReader(this.currentPath);
932         } catch (FileNotFoundException fnfe) {
933           if (this.replicationQueueInfo.isQueueRecovered()) {
934             // We didn't find the log in the archive directory, look if it still
935             // exists in the dead RS folder (there could be a chain of failures
936             // to look at)
937             List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
938             LOG.info("Number of dead region servers: " + deadRegionServers.size());
939             final Path rootDir = FSUtils.getRootDir(conf);
940             for (String curDeadServerName : deadRegionServers) {
941               final Path deadRsDirectory = new Path(rootDir,
942                 AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
943               Path[] locs = new Path[] { new Path(deadRsDirectory, currentPath.getName()),
944                 new Path(deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT),
945                   currentPath.getName()) };
946               for (Path possibleLogLocation : locs) {
947                 LOG.info("Possible location " + possibleLogLocation.toUri().toString());
948                 if (manager.getFs().exists(possibleLogLocation)) {
949                   // We found the right new location
950                   LOG.info("Log " + this.currentPath + " still exists at " +
951                       possibleLogLocation);
952                   // Breaking here will make us sleep since reader is null
953                   // TODO why don't we need to set currentPath and call openReader here?
954                   return true;
955                 }
956               }
957             }
958             // In the case of disaster/recovery, the HMaster may have been shut down or crashed before
959             // flushing data from .logs to .oldlogs. Loop over the .logs folders and check whether a match exists
960             if (stopper instanceof ReplicationSyncUp.DummyServer) {
961               // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
962               //      area rather than to the wal area for a particular region server.
963               FileStatus[] rss = fs.listStatus(manager.getLogDir());
964               for (FileStatus rs : rss) {
965                 Path p = rs.getPath();
966                 FileStatus[] logs = fs.listStatus(p);
967                 for (FileStatus log : logs) {
968                   p = new Path(p, log.getPath().getName());
969                   if (p.getName().equals(currentPath.getName())) {
970                     currentPath = p;
971                     LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
972                     // Open the log at the new location
973                     this.openReader(sleepMultiplier);
974                     return true;
975                   }
976                 }
977               }
978             }
979
980             // TODO What happens if the log was missing from every single location?
981             // Although we need to check a couple of times as the log could have
982             // been moved by the master between the checks
983             // It can also happen if a recovered queue wasn't properly cleaned,
984             // such that the znode pointing to a log exists but the log was
985             // deleted a long time ago.
986             // For the moment, we'll throw the IO and processEndOfFile
987             throw new IOException("File from recovered queue is " +
988                 "nowhere to be found", fnfe);
989           } else {
990             // If the log was archived, continue reading from there
991             Path archivedLogLocation =
992                 new Path(manager.getOldLogDir(), currentPath.getName());
993             if (manager.getFs().exists(archivedLogLocation)) {
994               currentPath = archivedLogLocation;
995               LOG.info("Log " + this.currentPath + " was moved to " +
996                   archivedLogLocation);
997               // Open the log at the new location
998               this.openReader(sleepMultiplier);
999
1000             }
1001             // TODO What happens if the log is missing in both places?
1002           }
1003         }
1004       } catch (LeaseNotRecoveredException lnre) {
1005         // HBASE-15019 the WAL was not closed due to some hiccup.
1006         LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
1007         recoverLease(conf, currentPath);
1008         this.reader = null;
1009       } catch (IOException ioe) {
1010         if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
1011         LOG.warn(peerClusterZnode + " Got: ", ioe);
1012         this.reader = null;
1013         if (ioe.getCause() instanceof NullPointerException) {
1014           // Workaround for race condition in HDFS-4380
1015           // which throws a NPE if we open a file before any data node has the most recent block
1016           // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
1017           LOG.warn("Got NPE opening reader, will retry.");
1018         } else if (sleepMultiplier >= maxRetriesMultiplier) {
1019           // TODO Need a better way to determine if a file is really gone but
1020           // TODO without scanning all logs dir
1021           LOG.warn("Waited too long for this file, considering dumping");
1022           return !processEndOfFile();
1023         }
1024       }
1025       return true;
1026     }
1027
1028     private void recoverLease(final Configuration conf, final Path path) {
1029       try {
1030         final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
1031         FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
1032         fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
1033           @Override
1034           public boolean progress() {
1035             LOG.debug("recover WAL lease: " + path);
1036             return isWorkerActive();
1037           }
1038         });
1039       } catch (IOException e) {
1040         LOG.warn("unable to recover lease for WAL: " + path, e);
1041       }
1042     }
1043
1044     /*
1045      * Checks whether the current log file is empty and the queue is not a recovered one. This is to
1046      * handle the scenario where, in an idle cluster, there is no entry in the current log and we keep
1047      * trying to read the log file and getting EOFException. In the case of a recovered queue the last
1048      * log file may be empty, and we don't want to retry that.
1049      */
1050     private boolean isCurrentLogEmpty() {
1051       return (this.repLogReader.getPosition() == 0 &&
1052           !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
1053     }
1054
1055     /**
1056      * Count the number of different row keys in the given edit because of mini-batching. We assume
1057      * that there's at least one Cell in the WALEdit.
1058      * @param edit edit to count row keys from
1059      * @return number of different row keys
1060      */
1061     private int countDistinctRowKeys(WALEdit edit) {
1062       List<Cell> cells = edit.getCells();
1063       int distinctRowKeys = 1;
1064       int totalHFileEntries = 0;
1065       Cell lastCell = cells.get(0);
1066
1067       int totalCells = edit.size();
1068       for (int i = 0; i < totalCells; i++) {
1069         // Count HFiles to be replicated
1070         if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
1071           try {
1072             BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
1073             List<StoreDescriptor> stores = bld.getStoresList();
1074             int totalStores = stores.size();
1075             for (int j = 0; j < totalStores; j++) {
1076               totalHFileEntries += stores.get(j).getStoreFileList().size();
1077             }
1078           } catch (IOException e) {
1079             LOG.error("Failed to deserialize bulk load entry from wal edit. "
1080                 + "Its hfile count will not be added to the metric.", e);
1081           }
1082         }
1083
1084         if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
1085           distinctRowKeys++;
1086         }
1087         lastCell = cells.get(i);
1088       }
1089       currentNbHFiles += totalHFileEntries;
1090       return distinctRowKeys + totalHFileEntries;
1091     }
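
    // Worked example (illustrative): for a WALEdit whose cells touch rows r1, r1, r2 the method
    // returns 2 distinct rows. Any BULK_LOAD descriptor cell additionally contributes the number
    // of store files it lists, added both to the return value and to currentNbHFiles.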
1092
1093     /**
1094      * Do the shipping logic
1095      * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
1096      * written to when this method was called
1097      */
1098     protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries,
1099         Map<String, Long> lastPositionsForSerialScope) {
1100       int sleepMultiplier = 0;
1101       if (entries.isEmpty()) {
1102         LOG.warn("Was given 0 edits to ship");
1103         return;
1104       }
1105       while (isWorkerActive()) {
1106         try {
1107           if (throttler.isEnabled()) {
1108             long sleepTicks = throttler.getNextSleepInterval(currentSize);
1109             if (sleepTicks > 0) {
1110               try {
1111                 if (LOG.isTraceEnabled()) {
1112                   LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
1113                 }
1114                 Thread.sleep(sleepTicks);
1115               } catch (InterruptedException e) {
1116                 LOG.debug("Interrupted while sleeping for throttling control");
1117                 Thread.currentThread().interrupt();
1118                 // the current thread might have been interrupted to terminate;
1119                 // go straight back to the while() condition to confirm this
1120                 continue;
1121               }
1122               // reset throttler's cycle start tick when sleep for throttling occurs
1123               throttler.resetStartTick();
1124             }
1125           }
1126           // create replicateContext here, so the entries can be GC'd upon return from this call
1127           // stack
1128           ReplicationEndpoint.ReplicateContext replicateContext =
1129               new ReplicationEndpoint.ReplicateContext();
1130           replicateContext.setEntries(entries).setSize(currentSize);
1131           replicateContext.setWalGroupId(walGroupId);
1132
1133           long startTimeNs = System.nanoTime();
1134           // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
1135           boolean replicated = replicationEndpoint.replicate(replicateContext);
1136           long endTimeNs = System.nanoTime();
1137
1138           if (!replicated) {
1139             continue;
1140           } else {
1141             sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
1142           }
1143
1144           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
1145             //Clean up hfile references
1146             int size = entries.size();
1147             for (int i = 0; i < size; i++) {
1148               cleanUpHFileRefs(entries.get(i).getEdit());
1149             }
1150
1151             // Save positions to meta table before zk.
1152             try {
1153               MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
1154                   lastPositionsForSerialScope);
1155             } catch (IOException e) {
1156               LOG.error("updateReplicationPositions fail", e);
1157               stopper.stop("updateReplicationPositions fail");
1158             }
1159
1160             //Log and clean up WAL logs
1161             manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
1162               this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
1163               currentWALisBeingWrittenTo);
1164             this.lastLoggedPosition = this.repLogReader.getPosition();
1165           }
1166           if (throttler.isEnabled()) {
1167             throttler.addPushSize(currentSize);
1168           }
1169           totalReplicatedEdits.addAndGet(entries.size());
1170           totalReplicatedOperations.addAndGet(currentNbOperations);
1171           // FIXME check relationship between wal group and overall
1172           metrics.shipBatch(currentNbOperations, currentSize, currentNbHFiles);
1173           metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
1174             walGroupId);
1175           if (LOG.isTraceEnabled()) {
1176             LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
1177                 + totalReplicatedOperations + " operations in "
1178                 + ((endTimeNs - startTimeNs) / 1000000) + " ms");
1179           }
1180           break;
1181         } catch (Exception ex) {
1182           LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
1183               + org.apache.hadoop.util.StringUtils.stringifyException(ex));
1184           if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
1185             sleepMultiplier++;
1186           }
1187         }
1188       }
1189     }
1190
1191     /**
1192      * If the queue isn't empty, switch to the next one. Else, if this is a recovered queue, it means
1193      * we're done! Otherwise we'll just continue trying to read the log file.
1194      * @return true if we're done with the current file, false if we should continue trying to read
1195      *         from it
1196      */
1197     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
1198         justification = "Yeah, this is how it works")
1199     protected boolean processEndOfFile() {
1200       if (this.queue.size() != 0) {
1201         if (LOG.isTraceEnabled()) {
1202           String filesize = "N/A";
1203           try {
1204             FileStatus stat = fs.getFileStatus(this.currentPath);
1205             filesize = stat.getLen() + "";
1206           } catch (IOException ex) {
1207           }
1208           LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
1209               + ", and the length of the file is " + filesize);
1210         }
1211         this.currentPath = null;
1212         this.repLogReader.finishCurrentFile();
1213         this.reader = null;
1214         return true;
1215       } else if (this.replicationQueueInfo.isQueueRecovered()) {
1216         LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
1217             + peerClusterZnode);
1218         workerRunning = false;
1219         return true;
1220       }
1221       return false;
1222     }
1223
1224     public void startup() {
1225       String n = Thread.currentThread().getName();
1226       Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
1227         @Override
1228         public void uncaughtException(final Thread t, final Throwable e) {
1229           LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
1230               + getCurrentPath(), e);
1231         }
1232       };
1233       Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
1234           + peerClusterZnode, handler);
1235       workerThreads.put(walGroupId, this);
1236     }
1237
1238     public Path getCurrentPath() {
1239       return this.currentPath;
1240     }
1241
1242     public long getCurrentPosition() {
1243       return this.repLogReader.getPosition();
1244     }
1245
1246     private boolean isWorkerActive() {
1247       return !stopper.isStopped() && workerRunning && !isInterrupted();
1248     }
1249
1250     private void terminate(String reason, Exception cause) {
1251       if (cause == null) {
1252         LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
1253
1254       } else {
1255         LOG.error("Closing worker for wal group " + this.walGroupId
1256             + " because an error occurred: " + reason, cause);
1257       }
1258       this.interrupt();
1259       Threads.shutdown(this, sleepForRetries);
1260       LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
1261     }
1262
1263     public void setWorkerRunning(boolean workerRunning) {
1264       this.workerRunning = workerRunning;
1265     }
1266   }
1267 }