1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Comparator;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.UUID;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.PriorityBlockingQueue;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicInteger;
35  import java.util.concurrent.atomic.AtomicLong;
36  
37  import org.apache.commons.lang.StringUtils;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FileStatus;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.CellUtil;
47  import org.apache.hadoop.hbase.HBaseConfiguration;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.Stoppable;
50  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
51  import org.apache.hadoop.hbase.wal.WAL;
52  import org.apache.hadoop.hbase.wal.WALKey;
53  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
54  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
55  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
56  import org.apache.hadoop.hbase.replication.ReplicationException;
57  import org.apache.hadoop.hbase.replication.ReplicationPeers;
58  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
59  import org.apache.hadoop.hbase.replication.ReplicationQueues;
60  import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
61  import org.apache.hadoop.hbase.replication.WALEntryFilter;
62  import org.apache.hadoop.hbase.util.Bytes;
63  import org.apache.hadoop.hbase.util.CancelableProgressable;
64  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
65  import org.apache.hadoop.hbase.util.FSUtils;
66  import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
67  import org.apache.hadoop.hbase.util.Threads;
68  
69  import com.google.common.collect.Lists;
70  import com.google.common.util.concurrent.ListenableFuture;
71  import com.google.common.util.concurrent.Service;
72  
73  /**
74   * Class that handles the source of a replication stream.
75   * Currently does not handle more than one slave cluster.
76   * For each slave cluster it selects a random number of peers
77   * using a replication ratio. For example, if the replication ratio is 0.1
78   * and the slave cluster has 100 region servers, 10 will be selected.
79   * <p>
80   * A stream is considered down when we cannot contact a region server on the
81   * peer cluster for more than 55 seconds by default.
82   * </p>
83   *
84   */
85  @InterfaceAudience.Private
86  public class ReplicationSource extends Thread
87      implements ReplicationSourceInterface {
88  
89    private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
90  // Queues of logs to process, one entry per wal group in the form walGroupId->queue,
91  // each representing the queue of WALs for that wal group
92    private Map<String, PriorityBlockingQueue<Path>> queues =
93        new HashMap<String, PriorityBlockingQueue<Path>>();
94    // per group queue size, keep no more than this number of logs in each wal group
95    private int queueSizePerGroup;
96    private ReplicationQueues replicationQueues;
97    private ReplicationPeers replicationPeers;
98  
99    private Configuration conf;
100   private ReplicationQueueInfo replicationQueueInfo;
101   // id of the peer cluster this source replicates to
102   private String peerId;
103   // The manager of all sources to which we ping back our progress
104   private ReplicationSourceManager manager;
105   // Should we stop everything?
106   private Stoppable stopper;
107   // How long should we sleep for each retry
108   private long sleepForRetries;
109   // Max size in bytes of entriesArray
110   private long replicationQueueSizeCapacity;
111   // Max number of entries in entriesArray
112   private int replicationQueueNbCapacity;
113   private FileSystem fs;
114   // id of this cluster
115   private UUID clusterId;
116   // id of the other cluster
117   private UUID peerClusterId;
118   // total number of edits we replicated
119   private AtomicLong totalReplicatedEdits = new AtomicLong(0);
120   // total number of operations (Put/Delete) we replicated
121   private AtomicLong totalReplicatedOperations = new AtomicLong(0);
122   // The znode we currently play with
123   private String peerClusterZnode;
124   // Maximum number of retries before taking bold actions
125   private int maxRetriesMultiplier;
126   // Indicates if this particular source is running
127   private volatile boolean sourceRunning = false;
128   // Metrics for this source
129   private MetricsSource metrics;
130   //WARN threshold for the number of queued logs, defaults to 2
131   private int logQueueWarnThreshold;
132   // ReplicationEndpoint which will handle the actual replication
133   private ReplicationEndpoint replicationEndpoint;
134   // A filter (or a chain of filters) for the WAL entries.
135   private WALEntryFilter walEntryFilter;
136   // throttles the rate at which edits are shipped to the peer cluster
137   private ReplicationThrottler throttler;
138   private AtomicInteger logQueueSize = new AtomicInteger(0);
139   private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
140       new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
141 
142   /**
143    * Instantiation method used by region servers
144    *
145    * @param conf configuration to use
146    * @param fs file system to use
147    * @param manager replication manager to ping to
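       * @param replicationQueues the interface to the replication queues for this region server
       * @param replicationPeers the interface to the replication peers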
148    * @param stopper     the Stoppable that signals when the region server is stopping
149    * @param peerClusterZnode the name of our znode
150    * @param clusterId unique UUID for the cluster
151    * @param replicationEndpoint the replication endpoint implementation
152    * @param metrics metrics for replication source
153    * @throws IOException
154    */
155   @Override
156   public void init(final Configuration conf, final FileSystem fs,
157       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
158       final ReplicationPeers replicationPeers, final Stoppable stopper,
159       final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
160       final MetricsSource metrics)
161           throws IOException {
162     this.stopper = stopper;
163     this.conf = HBaseConfiguration.create(conf);
164     decorateConf();
165     this.replicationQueueSizeCapacity =
166         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
167     this.replicationQueueNbCapacity =
168         this.conf.getInt("replication.source.nb.capacity", 25000);
169     this.sleepForRetries =
170         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
171     this.maxRetriesMultiplier =
172         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per retry
173     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
174     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
175     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
176     this.replicationQueues = replicationQueues;
177     this.replicationPeers = replicationPeers;
178     this.manager = manager;
179     this.fs = fs;
180     this.metrics = metrics;
181     this.clusterId = clusterId;
182 
183     this.peerClusterZnode = peerClusterZnode;
184     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
185     // ReplicationQueueInfo parses the peerId out of the znode for us
186     this.peerId = this.replicationQueueInfo.getPeerId();
187     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
188     this.replicationEndpoint = replicationEndpoint;
189   }
190 
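      // If a replication codec is configured, use it as the RPC codec as well so that WAL cells
      // are serialized with that codec when edits are shipped to the peer.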
191   private void decorateConf() {
192     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
193     if (StringUtils.isNotEmpty(replicationCodec)) {
194       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
195     }
196   }
197 
198   @Override
199   public void enqueueLog(Path log) {
200     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
201     PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
202     if (queue == null) {
203       queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
204       queues.put(logPrefix, queue);
205       if (this.sourceRunning) {
206         // new wal group observed after source startup, start a new worker thread to track it
207         // notice: it's possible that a log is enqueued after this.sourceRunning is set but before
208         // the worker thread is launched, so check workerThreads before starting the worker
209         final ReplicationSourceWorkerThread worker =
210             new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
211         ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
212         if (extant != null) {
213           LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
214         } else {
215           LOG.debug("Starting up worker for wal group " + logPrefix);
216           worker.startup();
217         }
218       }
219     }
220     queue.put(log);
221     int queueSize = logQueueSize.incrementAndGet();
222     this.metrics.setSizeOfLogQueue(queueSize);
223     // This will log a warning for each new log that gets created above the warn threshold
224     if (queue.size() > this.logQueueWarnThreshold) {
225       LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
226           + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
227     }
228   }
229 
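      // Clears the metrics and stops the replication endpoint when this source exits.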
230   private void uninitialize() {
231     LOG.debug("Source exiting " + this.peerId);
232     metrics.clear();
233     if (replicationEndpoint.state() == Service.State.STARTING
234         || replicationEndpoint.state() == Service.State.RUNNING) {
235       replicationEndpoint.stopAndWait();
236     }
237   }
238 
239   @Override
240   public void run() {
241     // mark we are running now
242     this.sourceRunning = true;
243     try {
244       // start the endpoint, connect to the cluster
245       Service.State state = replicationEndpoint.start().get();
246       if (state != Service.State.RUNNING) {
247         LOG.warn("ReplicationEndpoint was not started. Exiting");
248         uninitialize();
249         return;
250       }
251     } catch (Exception ex) {
252       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
253       throw new RuntimeException(ex);
254     }
255 
256     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
257     ArrayList<WALEntryFilter> filters = Lists.newArrayList(
258       (WALEntryFilter)new SystemTableWALEntryFilter());
259     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
260     if (filterFromEndpoint != null) {
261       filters.add(filterFromEndpoint);
262     }
263     this.walEntryFilter = new ChainWALEntryFilter(filters);
264 
265     int sleepMultiplier = 1;
266     // delay this until we are in an asynchronous thread
267     while (this.isSourceActive() && this.peerClusterId == null) {
268       this.peerClusterId = replicationEndpoint.getPeerUUID();
269       if (this.isSourceActive() && this.peerClusterId == null) {
270         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
271           sleepMultiplier++;
272         }
273       }
274     }
275 
276     // In rare cases the ZooKeeper settings may be messed up, which leads to an incorrect
277     // peerClusterId value that is the same as the source clusterId
278     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
279       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
280           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
281           + replicationEndpoint.getClass().getName(), null, false);
282     }
283     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
284     // start workers
285     for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
286       String walGroupId = entry.getKey();
287       PriorityBlockingQueue<Path> queue = entry.getValue();
288       final ReplicationSourceWorkerThread worker =
289           new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
290       ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
291       if (extant != null) {
292         LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
293       } else {
294         LOG.debug("Starting up worker for wal group " + walGroupId);
295         worker.startup();
296       }
297     }
298   }
299 
300   /**
301    * Do the sleeping logic
302    * @param msg Why we sleep
303    * @param sleepMultiplier by how many times the default sleeping time is augmented
304    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
305    */
306   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
307     try {
308       if (LOG.isTraceEnabled()) {
309         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
310       }
311       Thread.sleep(this.sleepForRetries * sleepMultiplier);
312     } catch (InterruptedException e) {
313       LOG.debug("Interrupted while sleeping between retries");
314       Thread.currentThread().interrupt();
315     }
316     return sleepMultiplier < maxRetriesMultiplier;
317   }
318 
319   /**
320    * check whether the peer is enabled or not
321    *
322    * @return true if the peer is enabled, otherwise false
323    */
324   protected boolean isPeerEnabled() {
325     return this.replicationPeers.getStatusOfPeer(this.peerId);
326   }
327 
328   @Override
329   public void startup() {
330     String n = Thread.currentThread().getName();
331     Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
332       @Override
333       public void uncaughtException(final Thread t, final Throwable e) {
334         LOG.error("Unexpected exception in ReplicationSource", e);
335       }
336     };
337     Threads
338         .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
339   }
340 
341   @Override
342   public void terminate(String reason) {
343     terminate(reason, null);
344   }
345 
346   @Override
347   public void terminate(String reason, Exception cause) {
348     terminate(reason, cause, true);
349   }
350 
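      // If join is true, wait for the worker threads and the replication endpoint to shut down.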
351   public void terminate(String reason, Exception cause, boolean join) {
352     if (cause == null) {
353       LOG.info("Closing source "
354           + this.peerClusterZnode + " because: " + reason);
355 
356     } else {
357       LOG.error("Closing source " + this.peerClusterZnode
358           + " because an error occurred: " + reason, cause);
359     }
360     this.sourceRunning = false;
361     Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
362     for (ReplicationSourceWorkerThread worker : workers) {
363       worker.setWorkerRunning(false);
364       worker.interrupt();
365     }
366     ListenableFuture<Service.State> future = null;
367     if (this.replicationEndpoint != null) {
368       future = this.replicationEndpoint.stop();
369     }
370     if (join) {
371       for (ReplicationSourceWorkerThread worker : workers) {
372         Threads.shutdown(worker, this.sleepForRetries);
373         LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
374       }
375       if (future != null) {
376         try {
377           future.get();
378         } catch (Exception e) {
379           LOG.warn("Got exception:" + e);
380         }
381       }
382     }
383   }
384 
385   @Override
386   public String getPeerClusterZnode() {
387     return this.peerClusterZnode;
388   }
389 
390   @Override
391   public String getPeerClusterId() {
392     return this.peerId;
393   }
394 
395   @Override
396   public Path getCurrentPath() {
397     // only for testing
398     for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
399       if (worker.getCurrentPath() != null) return worker.getCurrentPath();
400     }
401     return null;
402   }
403 
404   private boolean isSourceActive() {
405     return !this.stopper.isStopped() && this.sourceRunning;
406   }
407 
408   /**
409    * Comparator used to order WALs by the start timestamp encoded in their names
410    */
411   public static class LogsComparator implements Comparator<Path> {
412 
413     @Override
414     public int compare(Path o1, Path o2) {
415       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
416     }
417 
418     /**
419      * Split a path to get the start time
420      * For example: 10.20.20.171%3A60020.1277499063250
421      * @param p path to split
422      * @return start time
423      */
424     private static long getTS(Path p) {
425       int tsIndex = p.getName().lastIndexOf('.') + 1;
426       return Long.parseLong(p.getName().substring(tsIndex));
427     }
428   }
429 
430   @Override
431   public String getStats() {
432     StringBuilder sb = new StringBuilder();
433     sb.append("Total replicated edits: ").append(totalReplicatedEdits)
434         .append(", current progress: \n");
435     for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
436       String walGroupId = entry.getKey();
437       ReplicationSourceWorkerThread worker = entry.getValue();
438       long position = worker.getCurrentPosition();
439       Path currentPath = worker.getCurrentPath();
440       sb.append("walGroup [").append(walGroupId).append("]: ");
441       if (currentPath != null) {
442         sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
443             .append(position).append("\n");
444       } else {
445         sb.append("no replication ongoing, waiting for new log");
446       }
447     }
448     return sb.toString();
449   }
450 
451   /**
452    * Get Replication Source Metrics
453    * @return sourceMetrics
454    */
455   public MetricsSource getSourceMetrics() {
456     return this.metrics;
457   }
458 
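      // Worker that tails the WALs of a single wal group, reads entries in batches and ships them
      // to the peer cluster via the replication endpoint.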
459   public class ReplicationSourceWorkerThread extends Thread {
460     private ReplicationSource source;
461     private String walGroupId;
462     private PriorityBlockingQueue<Path> queue;
463     private ReplicationQueueInfo replicationQueueInfo;
464     // Our reader for the current log. open/close handled by repLogReader
465     private WAL.Reader reader;
466     // Last position in the log that we sent to ZooKeeper
467     private long lastLoggedPosition = -1;
468     // Path of the current log
469     private volatile Path currentPath;
470     // Handle on the log reader helper
471     private ReplicationWALReaderManager repLogReader;
472     // Current number of operations (Put/Delete) that we need to replicate
473     private int currentNbOperations = 0;
474     // Current size of data we need to replicate
475     private int currentSize = 0;
476     // Indicates whether this particular worker is running
477     private boolean workerRunning = true;
478 
479     public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
480         ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
481       this.walGroupId = walGroupId;
482       this.queue = queue;
483       this.replicationQueueInfo = replicationQueueInfo;
484       this.repLogReader = new ReplicationWALReaderManager(fs, conf);
485       this.source = source;
486     }
487 
488     @Override
489     public void run() {
490       // If this queue is recovered, it is already fully populated and the first log
491       // normally has a saved position (unless the RS failed between two logs)
492       if (this.replicationQueueInfo.isQueueRecovered()) {
493         try {
494           this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
495             this.queue.peek().getName()));
496           if (LOG.isTraceEnabled()) {
497             LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
498                 + this.repLogReader.getPosition());
499           }
500         } catch (ReplicationException e) {
501           terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
502         }
503       }
504       // Loop until we close down
505       while (isWorkerActive()) {
506         int sleepMultiplier = 1;
507         // Sleep until replication is enabled again
508         if (!isPeerEnabled()) {
509           if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
510             sleepMultiplier++;
511           }
512           continue;
513         }
514         Path oldPath = getCurrentPath(); //note that in the current scenario,
515                                          //oldPath will be null when a log roll
516                                          //happens.
517         // Get a new path
518         boolean hasCurrentPath = getNextPath();
519         if (getCurrentPath() != null && oldPath == null) {
520           sleepMultiplier = 1; //reset the sleepMultiplier on a path change
521         }
522         if (!hasCurrentPath) {
523           if (sleepForRetries("No log to process", sleepMultiplier)) {
524             sleepMultiplier++;
525           }
526           continue;
527         }
528         boolean currentWALisBeingWrittenTo = false;
529         //For WAL files we own (rather than recovered), take a snapshot of whether the
530         //current WAL file (this.currentPath) is in use (for writing) NOW!
531         //Since the new WAL paths are enqueued only after the prev WAL file
532         //is 'closed', presence of an element in the queue means that
533         //the previous WAL file was closed, else the file is in use (currentPath)
534         //We take the snapshot now so that we are protected against races
535         //where a new file gets enqueued while the current file is being processed
536         //(and where we just finished reading the current file).
537         if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
538           currentWALisBeingWrittenTo = true;
539         }
540         // Open a reader on it
541         if (!openReader(sleepMultiplier)) {
542           // Reset the sleep multiplier, else it'd be reused for the next file
543           sleepMultiplier = 1;
544           continue;
545         }
546 
547         // openReader told us to keep going but the reader is still null: sleep and retry
548         if (this.reader == null) {
549           if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
550             sleepMultiplier++;
551           }
552           continue;
553         }
554 
555         boolean gotIOE = false;
556         currentNbOperations = 0;
557         List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
558         currentSize = 0;
559         try {
560           if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
561             continue;
562           }
563         } catch (IOException ioe) {
564           LOG.warn(peerClusterZnode + " Got: ", ioe);
565           gotIOE = true;
566           if (ioe.getCause() instanceof EOFException) {
567 
568             boolean considerDumping = false;
569             if (this.replicationQueueInfo.isQueueRecovered()) {
570               try {
571                 FileStatus stat = fs.getFileStatus(this.currentPath);
572                 if (stat.getLen() == 0) {
573                   LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
574                 }
575                 considerDumping = true;
576               } catch (IOException e) {
577                 LOG.warn(peerClusterZnode + " Got exception while getting file size: ", e);
578               }
579             }
580 
581             if (considerDumping &&
582                 sleepMultiplier == maxRetriesMultiplier &&
583                 processEndOfFile()) {
584               continue;
585             }
586           }
587         } finally {
588           try {
589             this.reader = null;
590             this.repLogReader.closeReader();
591           } catch (IOException e) {
592             gotIOE = true;
593             LOG.warn("Unable to finalize the tailing of a file", e);
594           }
595         }
596 
597         // If we didn't get anything to replicate, or if we hit an IOE,
598         // wait a bit and retry.
599         // But if we need to stop, don't bother sleeping
600         if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
601           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
602             manager.logPositionAndCleanOldLogs(this.currentPath,
603                 peerClusterZnode, this.repLogReader.getPosition(),
604                 this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
605             this.lastLoggedPosition = this.repLogReader.getPosition();
606           }
607           // Reset the sleep multiplier if nothing has actually gone wrong
608           if (!gotIOE) {
609             sleepMultiplier = 1;
610             // if there was nothing to ship and it's not an error
611             // set "ageOfLastShippedOp" to <now> to indicate that we're current
612             metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
613           }
614           if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
615             sleepMultiplier++;
616           }
617           continue;
618         }
619         sleepMultiplier = 1;
620         shipEdits(currentWALisBeingWrittenTo, entries);
621       }
622       if (replicationQueueInfo.isQueueRecovered()) {
623         // synchronize to make sure only the last remaining worker cleans up the queue
624         synchronized (workerThreads) {
625           Threads.sleep(100); // wait a short while for the other worker threads to fully exit
626           boolean allOtherTaskDone = true;
627           for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
628             if (!worker.equals(this) && worker.isAlive()) {
629               allOtherTaskDone = false;
630               break;
631             }
632           }
633           if (allOtherTaskDone) {
634             manager.closeRecoveredQueue(this.source);
635             LOG.info("Finished recovering queue " + peerClusterZnode
636                 + " with the following stats: " + getStats());
637           }
638         }
639       }
640     }
641 
642     /**
643      * Read all the entries from the current log file and retain those that need to be replicated.
644      * Else, process the end of the current file.
645      * @param currentWALisBeingWrittenTo is the current WAL being written to
646      * @param entries resulting entries to be replicated
647      * @return true if we got nothing and went to the next file, false if we got entries
648      * @throws IOException
649      */
650     protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
651         List<WAL.Entry> entries) throws IOException {
652       long seenEntries = 0;
653       if (LOG.isTraceEnabled()) {
654         LOG.trace("Seeking in " + this.currentPath + " at position "
655             + this.repLogReader.getPosition());
656       }
657       this.repLogReader.seek();
658       long positionBeforeRead = this.repLogReader.getPosition();
659       WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
660       while (entry != null) {
661         metrics.incrLogEditsRead();
662         seenEntries++;
663 
664         // don't replicate entries that originated in, or already passed through, the peer cluster
665         if (replicationEndpoint.canReplicateToSameCluster()
666             || !entry.getKey().getClusterIds().contains(peerClusterId)) {
667           // Remove all KVs that should not be replicated
668           entry = walEntryFilter.filter(entry);
669           WALEdit edit = null;
670           WALKey logKey = null;
671           if (entry != null) {
672             edit = entry.getEdit();
673             logKey = entry.getKey();
674           }
675 
676           if (edit != null && edit.size() != 0) {
677             // Mark that the current cluster has the change
678             logKey.addClusterId(clusterId);
679             currentNbOperations += countDistinctRowKeys(edit);
680             entries.add(entry);
681             currentSize += entry.getEdit().heapSize();
682           } else {
683             metrics.incrLogEditsFiltered();
684           }
685         }
686         // Stop if too many entries or too big
687         // FIXME check the relationship between single wal group and overall
688         if (currentSize >= replicationQueueSizeCapacity
689             || entries.size() >= replicationQueueNbCapacity) {
690           break;
691         }
692         try {
693           entry = this.repLogReader.readNextAndSetPosition();
694         } catch (IOException ie) {
695           LOG.debug("Break on IOE: " + ie.getMessage());
696           break;
697         }
698       }
699       metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
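          // the current WAL is still being written to, so running out of entries here does not mean
          // we reached the end of the file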
700       if (currentWALisBeingWrittenTo) {
701         return false;
702       }
703       // If we didn't get anything and the queue has an object, it means we
704       // hit the end of the file for sure
705       return seenEntries == 0 && processEndOfFile();
706     }
707 
708     /**
709      * Poll for the next path
710      * @return true if a path was obtained, false if not
711      */
712     protected boolean getNextPath() {
713       try {
714         if (this.currentPath == null) {
715           this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
716           int queueSize = logQueueSize.decrementAndGet();
717           metrics.setSizeOfLogQueue(queueSize);
718           if (this.currentPath != null) {
719             // For recovered queue: must use peerClusterZnode since peerId is a parsed value
720             manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
721               this.replicationQueueInfo.isQueueRecovered());
722             if (LOG.isTraceEnabled()) {
723               LOG.trace("New log: " + this.currentPath);
724             }
725           }
726         }
727       } catch (InterruptedException e) {
728         LOG.warn("Interrupted while reading edits", e);
729       }
730       return this.currentPath != null;
731     }
732 
733     /**
734      * Open a reader on the current path
735      *
736      * @param sleepMultiplier by how many times the default sleeping time is augmented
737      * @return true if we should continue with that file, false if we are over with it
738      */
739     protected boolean openReader(int sleepMultiplier) {
740       try {
741         try {
742           if (LOG.isTraceEnabled()) {
743             LOG.trace("Opening log " + this.currentPath);
744           }
745           this.reader = repLogReader.openReader(this.currentPath);
746         } catch (FileNotFoundException fnfe) {
747           if (this.replicationQueueInfo.isQueueRecovered()) {
748             // We didn't find the log in the archive directory, look if it still
749             // exists in the dead RS folder (there could be a chain of failures
750             // to look at)
751             List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
752             LOG.info("NB dead servers : " + deadRegionServers.size());
753             final Path rootDir = FSUtils.getRootDir(conf);
754             for (String curDeadServerName : deadRegionServers) {
755               final Path deadRsDirectory = new Path(rootDir,
756                   DefaultWALProvider.getWALDirectoryName(curDeadServerName));
757               Path[] locs = new Path[] {
758                   new Path(deadRsDirectory, currentPath.getName()),
759                   new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
760                                             currentPath.getName()),
761               };
762               for (Path possibleLogLocation : locs) {
763                 LOG.info("Possible location " + possibleLogLocation.toUri().toString());
764                 if (manager.getFs().exists(possibleLogLocation)) {
765                   // We found the right new location
766                   LOG.info("Log " + this.currentPath + " still exists at " +
767                       possibleLogLocation);
768                   // Breaking here will make us sleep since reader is null
769                   // TODO why don't we need to set currentPath and call openReader here?
770                   return true;
771                 }
772               }
773             }
774             // In the case of disaster recovery, the HMaster may have been shut down or crashed before
775             // flushing data from .logs to .oldlogs. Loop through the .logs folders and check for a match
776             if (stopper instanceof ReplicationSyncUp.DummyServer) {
777               // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
778               //      area rather than to the wal area for a particular region server.
779               FileStatus[] rss = fs.listStatus(manager.getLogDir());
780               for (FileStatus rs : rss) {
781                 Path p = rs.getPath();
782                 FileStatus[] logs = fs.listStatus(p);
783                 for (FileStatus log : logs) {
784                   p = new Path(p, log.getPath().getName());
785                   if (p.getName().equals(currentPath.getName())) {
786                     currentPath = p;
787                     LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
788                     // Open the log at the new location
789                     this.openReader(sleepMultiplier);
790                     return true;
791                   }
792                 }
793               }
794             }
795 
796             // TODO What happens if the log was missing from every single location?
797             // Although we need to check a couple of times as the log could have
798             // been moved by the master between the checks
799             // It can also happen if a recovered queue wasn't properly cleaned,
800             // such that the znode pointing to a log exists but the log was
801             // deleted a long time ago.
802             // For the moment, we'll throw the IO and processEndOfFile
803             throw new IOException("File from recovered queue is " +
804                 "nowhere to be found", fnfe);
805           } else {
806             // If the log was archived, continue reading from there
807             Path archivedLogLocation =
808                 new Path(manager.getOldLogDir(), currentPath.getName());
809             if (manager.getFs().exists(archivedLogLocation)) {
810               currentPath = archivedLogLocation;
811               LOG.info("Log " + this.currentPath + " was moved to " +
812                   archivedLogLocation);
813               // Open the log at the new location
814               this.openReader(sleepMultiplier);
815 
816             }
817             // TODO What happens if the log is missing in both places?
818           }
819         }
820       } catch (LeaseNotRecoveredException lnre) {
821         // HBASE-15019 the WAL was not closed due to some hiccup.
822         LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
823         recoverLease(conf, currentPath);
824         this.reader = null;
825       } catch (IOException ioe) {
826         if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
827         LOG.warn(peerClusterZnode + " Got: ", ioe);
828         this.reader = null;
829         if (ioe.getCause() instanceof NullPointerException) {
830           // Workaround for race condition in HDFS-4380
831           // which throws a NPE if we open a file before any data node has the most recent block
832           // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
833           LOG.warn("Got NPE opening reader, will retry.");
834         } else if (sleepMultiplier >= maxRetriesMultiplier) {
835           // TODO Need a better way to determine if a file is really gone but
836           // TODO without scanning all logs dir
837           LOG.warn("Waited too long for this file, considering dumping");
838           return !processEndOfFile();
839         }
840       }
841       return true;
842     }
843 
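        // Ask the filesystem to recover the lease on a WAL that was never closed so it becomes
        // readable; the CancelableProgressable cancels the recovery once this worker stops.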
844     private void recoverLease(final Configuration conf, final Path path) {
845       try {
846         final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
847         FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
848         fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
849           @Override
850           public boolean progress() {
851             LOG.debug("recover WAL lease: " + path);
852             return isWorkerActive();
853           }
854         });
855       } catch (IOException e) {
856         LOG.warn("unable to recover lease for WAL: " + path, e);
857       }
858     }
859 
860     /*
861      * Checks whether the current log file is empty and this is not a recovered queue. This handles
862      * the scenario where, in an idle cluster, there is no entry in the current log and we keep
863      * trying to read it and getting EOFException. In the case of a recovered queue the last log
864      * file may be empty, and we don't want to retry that.
865      */
866     private boolean isCurrentLogEmpty() {
867       return (this.repLogReader.getPosition() == 0 &&
868           !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
869     }
870 
871     /**
872      * Count the number of different row keys in the given edit; because of mini-batching a single
873      * WALEdit can span multiple rows. We assume that there's at least one Cell in the WALEdit.
874      * @param edit edit to count row keys from
875      * @return number of different row keys
876      */
877     private int countDistinctRowKeys(WALEdit edit) {
878       List<Cell> cells = edit.getCells();
879       int distinctRowKeys = 1;
880       // compare each cell with the previous one: the count goes up every time the row changes
881       for (int i = 1; i < edit.size(); i++) {
882         if (!CellUtil.matchingRow(cells.get(i), cells.get(i - 1))) {
883           distinctRowKeys++;
884         }
885       }
886       return distinctRowKeys;
887     }
888 
889     /**
890      * Do the shipping logic
891      * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
892      * written to when this method was called
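         * @param entries the filtered WAL entries to ship to the peer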
893      */
894     protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
895       int sleepMultiplier = 0;
896       if (entries.isEmpty()) {
897         LOG.warn("Was given 0 edits to ship");
898         return;
899       }
900       while (isWorkerActive()) {
901         try {
902           if (throttler.isEnabled()) {
903             long sleepTicks = throttler.getNextSleepInterval(currentSize);
904             if (sleepTicks > 0) {
905               try {
906                 if (LOG.isTraceEnabled()) {
907                   LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
908                 }
909                 Thread.sleep(sleepTicks);
910               } catch (InterruptedException e) {
911                 LOG.debug("Interrupted while sleeping for throttling control");
912                 Thread.currentThread().interrupt();
913                 // the current thread might have been interrupted to ask it to terminate;
914                 // go straight back to the while() condition to confirm this
915                 continue;
916               }
917               // reset throttler's cycle start tick when sleep for throttling occurs
918               throttler.resetStartTick();
919             }
920           }
921           // create replicateContext here, so the entries can be GC'd upon return from this call
922           // stack
923           ReplicationEndpoint.ReplicateContext replicateContext =
924               new ReplicationEndpoint.ReplicateContext();
925           replicateContext.setEntries(entries).setSize(currentSize);
926           replicateContext.setWalGroupId(walGroupId);
927 
928           long startTimeNs = System.nanoTime();
929           // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
930           boolean replicated = replicationEndpoint.replicate(replicateContext);
931           long endTimeNs = System.nanoTime();
932 
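              // the endpoint could not replicate this batch; loop around and retry the same entries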
933           if (!replicated) {
934             continue;
935           } else {
936             sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
937           }
938 
939           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
940             manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
941               this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
942               currentWALisBeingWrittenTo);
943             this.lastLoggedPosition = this.repLogReader.getPosition();
944           }
945           if (throttler.isEnabled()) {
946             throttler.addPushSize(currentSize);
947           }
948           totalReplicatedEdits.addAndGet(entries.size());
949           totalReplicatedOperations.addAndGet(currentNbOperations);
950           // FIXME check relationship between wal group and overall
951           metrics.shipBatch(currentNbOperations, currentSize / 1024);
952           metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
953             walGroupId);
954           if (LOG.isTraceEnabled()) {
955             LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
956                 + totalReplicatedOperations + " operations in "
957                 + ((endTimeNs - startTimeNs) / 1000000) + " ms");
958           }
959           break;
960         } catch (Exception ex) {
961           LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
962               + org.apache.hadoop.util.StringUtils.stringifyException(ex));
963           if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
964             sleepMultiplier++;
965           }
966         }
967       }
968     }
969 
970     /**
971      * If the queue isn't empty, switch to the next file. Else, if this is a recovered queue, it means
972      * we're done! Otherwise we'll just continue trying to read from the current log file.
973      * @return true if we're done with the current file, false if we should continue trying to read
974      *         from it
975      */
976     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
977         justification = "Yeah, this is how it works")
978     protected boolean processEndOfFile() {
979       // We presume this means the file we're reading is closed.
980       if (this.queue.size() != 0) {
981         // -1 means the wal wasn't closed cleanly.
982         final long trailerSize = this.repLogReader.currentTrailerSize();
983         final long currentPosition = this.repLogReader.getPosition();
984         FileStatus stat = null;
985         try {
986           stat = fs.getFileStatus(this.currentPath);
987         } catch (IOException exception) {
988           LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " + (trailerSize < 0 ? "was not" : "was") + " closed cleanly"
989               + ", stats: " + getStats());
990           metrics.incrUnknownFileLengthForClosedWAL();
991         }
992         if (stat != null) {
993           if (trailerSize < 0) {
994             if (currentPosition < stat.getLen()) {
995               final long skippedBytes = stat.getLen() - currentPosition;
996               LOG.info("Reached the end of WAL file '" + currentPath + "'. It was not closed cleanly, so we did not parse " + skippedBytes + " bytes of data.");
997               metrics.incrUncleanlyClosedWALs();
998               metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
999             }
1000           } else if (currentPosition + trailerSize < stat.getLen()){
1001             LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition + ", which is too far away from reported file length " + stat.getLen() +
1002                 ". Restarting WAL reading (see HBASE-15983 for details). stats: " + getStats());
1003             repLogReader.setPosition(0);
1004             metrics.incrRestartedWALReading();
1005             metrics.incrRepeatedFileBytes(currentPosition);
1006             return false;
1007           }
1008         }
1009         if (LOG.isTraceEnabled()) {
1010           LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
1011               + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
1012         }
1013         this.currentPath = null;
1014         this.repLogReader.finishCurrentFile();
1015         this.reader = null;
1016         metrics.incrCompletedWAL();
1017         return true;
1018       } else if (this.replicationQueueInfo.isQueueRecovered()) {
1019         LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
1020             + peerClusterZnode);
1021         metrics.incrCompletedRecoveryQueue();
1022         workerRunning = false;
1023         return true;
1024       }
1025       return false;
1026     }
1027 
1028     public void startup() {
1029       String n = Thread.currentThread().getName();
1030       Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
1031         @Override
1032         public void uncaughtException(final Thread t, final Throwable e) {
1033           LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
1034               + getCurrentPath(), e);
1035         }
1036       };
1037       Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
1038           + peerClusterZnode, handler);
1039       workerThreads.put(walGroupId, this);
1040     }
1041 
1042     public Path getCurrentPath() {
1043       return this.currentPath;
1044     }
1045 
1046     public long getCurrentPosition() {
1047       return this.repLogReader.getPosition();
1048     }
1049 
1050     private boolean isWorkerActive() {
1051       return !stopper.isStopped() && workerRunning && !isInterrupted();
1052     }
1053 
1054     private void terminate(String reason, Exception cause) {
1055       if (cause == null) {
1056         LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
1057 
1058       } else {
1059         LOG.error("Closing worker for wal group " + this.walGroupId
1060             + " because an error occurred: " + reason, cause);
1061       }
1062       this.interrupt();
1063       Threads.shutdown(this, sleepForRetries);
1064       LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
1065     }
1066 
1067     public void setWorkerRunning(boolean workerRunning) {
1068       this.workerRunning = workerRunning;
1069     }
1070   }
1071 }