1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Comparator;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.UUID;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.PriorityBlockingQueue;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicInteger;
35  import java.util.concurrent.atomic.AtomicLong;
36  
37  import org.apache.commons.lang.StringUtils;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FileStatus;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.CellUtil;
47  import org.apache.hadoop.hbase.HBaseConfiguration;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.Stoppable;
50  import org.apache.hadoop.hbase.regionserver.RSRpcServices;
51  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
52  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
53  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
54  import org.apache.hadoop.hbase.replication.ReplicationException;
55  import org.apache.hadoop.hbase.replication.ReplicationPeers;
56  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
57  import org.apache.hadoop.hbase.replication.ReplicationQueues;
58  import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
59  import org.apache.hadoop.hbase.replication.WALEntryFilter;
60  import org.apache.hadoop.hbase.util.CancelableProgressable;
61  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
62  import org.apache.hadoop.hbase.util.FSUtils;
63  import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
64  import org.apache.hadoop.hbase.util.Threads;
65  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
66  import org.apache.hadoop.hbase.wal.WAL;
67  import org.apache.hadoop.hbase.wal.WALKey;
68  
69  import com.google.common.collect.Lists;
70  import com.google.common.util.concurrent.ListenableFuture;
71  import com.google.common.util.concurrent.Service;
72  
73  /**
74   * Class that handles the source of a replication stream.
75   * Currently does not handle more than one slave cluster.
76   * For each slave cluster it selects a random number of peers
77   * using a replication ratio. For example, if the replication ratio is 0.1
78   * and the slave cluster has 100 region servers, 10 will be selected.
79   * <p>
80   * A stream is considered down when we cannot contact a region server on the
81   * peer cluster for more than 55 seconds by default.
82   * </p>
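     * <p>
     * A rough sketch of how the main tuning knobs read in {@code init} could be overridden
     * (the keys and defaults come from {@code init}; the override values below are hypothetical):
     * <pre>
     * Configuration conf = HBaseConfiguration.create();
     * // max bytes per shipped batch, default 64 MB
     * conf.setLong("replication.source.size.capacity", 16 * 1024 * 1024);
     * // max entries per shipped batch, default 25000
     * conf.setInt("replication.source.nb.capacity", 5000);
     * // sleep between retries (ms, default 1000) and the cap on the retry multiplier (default 300)
     * conf.setLong("replication.source.sleepforretries", 1000);
     * conf.setInt("replication.source.maxretriesmultiplier", 300);
     * </pre>
     * </p>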
83   *
84   */
85  @InterfaceAudience.Private
86  public class ReplicationSource extends Thread
87      implements ReplicationSourceInterface {
88  
89    private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
90  // Queues of logs to process, keyed as walGroupId -> queue;
91  // each entry is the queue of WALs for one wal group
92    private Map<String, PriorityBlockingQueue<Path>> queues =
93        new HashMap<String, PriorityBlockingQueue<Path>>();
94    // per group queue size, keep no more than this number of logs in each wal group
95    private int queueSizePerGroup;
96    private ReplicationQueues replicationQueues;
97    private ReplicationPeers replicationPeers;
98  
99    private Configuration conf;
100   private ReplicationQueueInfo replicationQueueInfo;
101   // id of the peer cluster this source replicates to
102   private String peerId;
103   // The manager of all sources to which we ping back our progress
104   private ReplicationSourceManager manager;
105   // Should we stop everything?
106   private Stoppable stopper;
107   // How long should we sleep for each retry
108   private long sleepForRetries;
109   // Max size in bytes of a single batch of WAL entries to ship
110   private long replicationQueueSizeCapacity;
111   // Max number of WAL entries in a single batch to ship
112   private int replicationQueueNbCapacity;
113   private FileSystem fs;
114   // id of this cluster
115   private UUID clusterId;
116   // id of the other cluster
117   private UUID peerClusterId;
118   // total number of edits we replicated
119   private AtomicLong totalReplicatedEdits = new AtomicLong(0);
120   // total number of operations (Puts/Deletes) we replicated
121   private AtomicLong totalReplicatedOperations = new AtomicLong(0);
122   // The znode of the replication queue this source works on
123   private String peerClusterZnode;
124   // Maximum number of retries before taking bold actions
125   private int maxRetriesMultiplier;
126   // Indicates if this particular source is running
127   private volatile boolean sourceRunning = false;
128   // Metrics for this source
129   private MetricsSource metrics;
130   //WARN threshold for the number of queued logs, defaults to 2
131   private int logQueueWarnThreshold;
132   // ReplicationEndpoint which will handle the actual replication
133   private ReplicationEndpoint replicationEndpoint;
134   // A filter (or a chain of filters) for the WAL entries.
135   private WALEntryFilter walEntryFilter;
136   // throttler
137   private ReplicationThrottler throttler;
138   private AtomicInteger logQueueSize = new AtomicInteger(0);
139   private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
140       new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
141 
142   /**
143    * Instantiation method used by region servers
144    *
145    * @param conf configuration to use
146    * @param fs file system to use
147    * @param manager replication manager to ping to
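       * @param replicationQueues the replication queues interface, used to read stored WAL positions
       * @param replicationPeers the replication peers interface, used to look up peer state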
148    * @param stopper the stopper to check whether the region server is shutting down
149    * @param peerClusterZnode the name of our znode
150    * @param clusterId unique UUID for the cluster
151    * @param replicationEndpoint the replication endpoint implementation
152    * @param metrics metrics for replication source
153    * @throws IOException
154    */
155   @Override
156   public void init(final Configuration conf, final FileSystem fs,
157       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
158       final ReplicationPeers replicationPeers, final Stoppable stopper,
159       final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
160       final MetricsSource metrics)
161           throws IOException {
162     this.stopper = stopper;
163     this.conf = HBaseConfiguration.create(conf);
164     decorateConf();
165     this.replicationQueueSizeCapacity =
166         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
167     this.replicationQueueNbCapacity =
168         this.conf.getInt("replication.source.nb.capacity", 25000);
169     this.sleepForRetries =
170         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
171     this.maxRetriesMultiplier =
172         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
173     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
174     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
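    // hand the throttler a tenth of the per-second bandwidth, since it meters pushes in 100 ms cycles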
175     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
176     this.replicationQueues = replicationQueues;
177     this.replicationPeers = replicationPeers;
178     this.manager = manager;
179     this.fs = fs;
180     this.metrics = metrics;
181     this.clusterId = clusterId;
182 
183     this.peerClusterZnode = peerClusterZnode;
184     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
185     // ReplicationQueueInfo parses the peerId out of the znode for us
186     this.peerId = this.replicationQueueInfo.getPeerId();
187     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
188     this.replicationEndpoint = replicationEndpoint;
189   }
190 
191   private void decorateConf() {
192     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
193     if (StringUtils.isNotEmpty(replicationCodec)) {
194       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
195     }
196   }
197 
198   @Override
199   public void enqueueLog(Path log) {
200     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
201     PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
202     if (queue == null) {
203       queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
204       queues.put(logPrefix, queue);
205       if (this.sourceRunning) {
206         // new wal group observed after source startup, start a new worker thread to track it
207         // note: a log may be enqueued after this.sourceRunning is set but before the worker
208         // threads are launched, so check workerThreads before starting a new worker
209         final ReplicationSourceWorkerThread worker =
210             new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
211         ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
212         if (extant != null) {
213           LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
214         } else {
215           LOG.debug("Starting up worker for wal group " + logPrefix);
216           worker.startup();
217         }
218       }
219     }
220     queue.put(log);
221     int queueSize = logQueueSize.incrementAndGet();
222     this.metrics.setSizeOfLogQueue(queueSize);
223     // This will log a warning for each new log that gets created above the warn threshold
224     if (queue.size() > this.logQueueWarnThreshold) {
225       LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
226           + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
227     }
228   }
229 
230   private void uninitialize() {
231     LOG.debug("Source exiting " + this.peerId);
232     metrics.clear();
233     if (replicationEndpoint.state() == Service.State.STARTING
234         || replicationEndpoint.state() == Service.State.RUNNING) {
235       replicationEndpoint.stopAndWait();
236     }
237   }
238 
239   @Override
240   public void run() {
241     // mark we are running now
242     this.sourceRunning = true;
243     try {
244       // start the endpoint, connect to the cluster
245       Service.State state = replicationEndpoint.start().get();
246       if (state != Service.State.RUNNING) {
247         LOG.warn("ReplicationEndpoint was not started. Exiting");
248         uninitialize();
249         return;
250       }
251     } catch (Exception ex) {
252       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
253       throw new RuntimeException(ex);
254     }
255 
256     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
257     ArrayList<WALEntryFilter> filters = Lists.newArrayList(
258       (WALEntryFilter)new SystemTableWALEntryFilter());
259     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
260     if (filterFromEndpoint != null) {
261       filters.add(filterFromEndpoint);
262     }
263     this.walEntryFilter = new ChainWALEntryFilter(filters);
264 
265     int sleepMultiplier = 1;
266     // delay this until we are in an asynchronous thread
267     while (this.isSourceActive() && this.peerClusterId == null) {
268       this.peerClusterId = replicationEndpoint.getPeerUUID();
269       if (this.isSourceActive() && this.peerClusterId == null) {
270         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
271           sleepMultiplier++;
272         }
273       }
274     }
275 
276     // In a rare case the zookeeper setting may be messed up. That leads to an incorrect
277     // peerClusterId value, which is the same as the source clusterId
278     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
279       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
280           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
281           + replicationEndpoint.getClass().getName(), null, false);
282       this.manager.closeQueue(this);
283       return;
284     }
285     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
286     // start workers
287     for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
288       String walGroupId = entry.getKey();
289       PriorityBlockingQueue<Path> queue = entry.getValue();
290       final ReplicationSourceWorkerThread worker =
291           new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
292       ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
293       if (extant != null) {
294         LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
295       } else {
296         LOG.debug("Starting up worker for wal group " + walGroupId);
297         worker.startup();
298       }
299     }
300   }
301 
302   /**
303    * Do the sleeping logic
304    * @param msg Why we sleep
305    * @param sleepMultiplier by how many times the default sleeping time is augmented
306    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
307    */
308   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
309     try {
310       if (LOG.isTraceEnabled()) {
311         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
312       }
313       Thread.sleep(this.sleepForRetries * sleepMultiplier);
314     } catch (InterruptedException e) {
315       LOG.debug("Interrupted while sleeping between retries");
316       Thread.currentThread().interrupt();
317     }
318     return sleepMultiplier < maxRetriesMultiplier;
319   }
320 
321   /**
322    * check whether the peer is enabled or not
323    *
324    * @return true if the peer is enabled, otherwise false
325    */
326   protected boolean isPeerEnabled() {
327     return this.replicationPeers.getStatusOfPeer(this.peerId);
328   }
329 
330   @Override
331   public void startup() {
332     String n = Thread.currentThread().getName();
333     Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
334       @Override
335       public void uncaughtException(final Thread t, final Throwable e) {
336         LOG.error("Unexpected exception in ReplicationSource", e);
337       }
338     };
339     Threads
340         .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
341   }
342 
343   @Override
344   public void terminate(String reason) {
345     terminate(reason, null);
346   }
347 
348   @Override
349   public void terminate(String reason, Exception cause) {
350     terminate(reason, cause, true);
351   }
352 
353   public void terminate(String reason, Exception cause, boolean join) {
354     if (cause == null) {
355       LOG.info("Closing source "
356           + this.peerClusterZnode + " because: " + reason);
357 
358     } else {
359       LOG.error("Closing source " + this.peerClusterZnode
360           + " because an error occurred: " + reason, cause);
361     }
362     this.sourceRunning = false;
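    // ask every per-wal-group worker to stop first, then shut down the shared replication endpoint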
363     Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
364     for (ReplicationSourceWorkerThread worker : workers) {
365       worker.setWorkerRunning(false);
366       worker.interrupt();
367     }
368     ListenableFuture<Service.State> future = null;
369     if (this.replicationEndpoint != null) {
370       future = this.replicationEndpoint.stop();
371     }
372     if (join) {
373       for (ReplicationSourceWorkerThread worker : workers) {
374         Threads.shutdown(worker, this.sleepForRetries);
375         LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
376       }
377       if (future != null) {
378         try {
379           future.get(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
380         } catch (Exception e) {
381           LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
382               + this.peerClusterZnode,
383             e);
384         }
385       }
386     }
387   }
388 
389   @Override
390   public String getPeerClusterZnode() {
391     return this.peerClusterZnode;
392   }
393 
394   @Override
395   public String getPeerClusterId() {
396     return this.peerId;
397   }
398 
399   @Override
400   public Path getCurrentPath() {
401     // only for testing
402     for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
403       if (worker.getCurrentPath() != null) return worker.getCurrentPath();
404     }
405     return null;
406   }
407 
408   private boolean isSourceActive() {
409     return !this.stopper.isStopped() && this.sourceRunning;
410   }
411 
412   /**
413    * Comparator used to compare logs together based on their start time
414    */
415   public static class LogsComparator implements Comparator<Path> {
416 
417     @Override
418     public int compare(Path o1, Path o2) {
419       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
420     }
421 
422     /**
423      * Split a path to get the start time
424      * For example: 10.20.20.171%3A60020.1277499063250
425      * @param p path to split
426      * @return start time
427      */
428     private static long getTS(Path p) {
429       int tsIndex = p.getName().lastIndexOf('.') + 1;
430       return Long.parseLong(p.getName().substring(tsIndex));
431     }
432   }
433 
434   @Override
435   public String getStats() {
436     StringBuilder sb = new StringBuilder();
437     sb.append("Total replicated edits: ").append(totalReplicatedEdits)
438         .append(", current progress: \n");
439     for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
440       String walGroupId = entry.getKey();
441       ReplicationSourceWorkerThread worker = entry.getValue();
442       long position = worker.getCurrentPosition();
443       Path currentPath = worker.getCurrentPath();
444       sb.append("walGroup [").append(walGroupId).append("]: ");
445       if (currentPath != null) {
446         sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
447             .append(position).append("\n");
448       } else {
449         sb.append("no replication ongoing, waiting for new log");
450       }
451     }
452     return sb.toString();
453   }
454 
455   /**
456    * Get Replication Source Metrics
457    * @return sourceMetrics
458    */
459   public MetricsSource getSourceMetrics() {
460     return this.metrics;
461   }
462 
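      /**
       * Per-WAL-group worker: polls the queue of WALs for its WAL group, reads and filters
       * entries, and ships them to the peer cluster through the shared ReplicationEndpoint.
       */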
463   public class ReplicationSourceWorkerThread extends Thread {
464     private ReplicationSource source;
465     private String walGroupId;
466     private PriorityBlockingQueue<Path> queue;
467     private ReplicationQueueInfo replicationQueueInfo;
468     // Our reader for the current log. open/close handled by repLogReader
469     private WAL.Reader reader;
470     // Last position in the log that we sent to ZooKeeper
471     private long lastLoggedPosition = -1;
472     // Path of the current log
473     private volatile Path currentPath;
474     // Handle on the log reader helper
475     private ReplicationWALReaderManager repLogReader;
476     // Current number of operations (Put/Delete) that we need to replicate
477     private int currentNbOperations = 0;
478     // Current size of data we need to replicate
479     private int currentSize = 0;
480     // Indicates whether this particular worker is running
481     private boolean workerRunning = true;
482 
483     public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
484         ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
485       this.walGroupId = walGroupId;
486       this.queue = queue;
487       this.replicationQueueInfo = replicationQueueInfo;
488       this.repLogReader = new ReplicationWALReaderManager(fs, conf);
489       this.source = source;
490     }
491 
492     @Override
493     public void run() {
494       // If this is recovered, the queue is already full and the first log
495       // normally has a position (unless the RS failed between 2 logs)
496       if (this.replicationQueueInfo.isQueueRecovered()) {
497         try {
498           this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
499             this.queue.peek().getName()));
500           if (LOG.isTraceEnabled()) {
501             LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
502                 + this.repLogReader.getPosition());
503           }
504         } catch (ReplicationException e) {
505           terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
506         }
507       }
508       // Loop until we close down
509       while (isWorkerActive()) {
510         int sleepMultiplier = 1;
511         // Sleep until replication is enabled again
512         if (!isPeerEnabled()) {
513           if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
514             sleepMultiplier++;
515           }
516           continue;
517         }
518         Path oldPath = getCurrentPath(); //note that in the current scenario,
519                                          //oldPath will be null when a log roll
520                                          //happens.
521         // Get a new path
522         boolean hasCurrentPath = getNextPath();
523         if (getCurrentPath() != null && oldPath == null) {
524           sleepMultiplier = 1; //reset the sleepMultiplier on a path change
525         }
526         if (!hasCurrentPath) {
527           if (sleepForRetries("No log to process", sleepMultiplier)) {
528             sleepMultiplier++;
529           }
530           continue;
531         }
532         boolean currentWALisBeingWrittenTo = false;
533         //For WAL files we own (rather than recovered), take a snapshot of whether the
534         //current WAL file (this.currentPath) is in use (for writing) NOW!
535         //Since the new WAL paths are enqueued only after the prev WAL file
536         //is 'closed', presence of an element in the queue means that
537         //the previous WAL file was closed, else the file is in use (currentPath)
538         //We take the snapshot now so that we are protected against races
539         //where a new file gets enqueued while the current file is being processed
540         //(and where we just finished reading the current file).
541         if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
542           currentWALisBeingWrittenTo = true;
543         }
544         // Open a reader on it
545         if (!openReader(sleepMultiplier)) {
546           // Reset the sleep multiplier, else it'd be reused for the next file
547           sleepMultiplier = 1;
548           continue;
549         }
550 
551         // openReader() kept us on this file but did not produce a reader; sleep and retry
552         if (this.reader == null) {
553           if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
554             sleepMultiplier++;
555           }
556           continue;
557         }
558 
559         boolean gotIOE = false;
560         currentNbOperations = 0;
561         List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
562         currentSize = 0;
563         try {
564           if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
565             continue;
566           }
567         } catch (IOException ioe) {
568           LOG.warn(peerClusterZnode + " Got: ", ioe);
569           gotIOE = true;
570           if (ioe.getCause() instanceof EOFException) {
571 
572             boolean considerDumping = false;
573             if (this.replicationQueueInfo.isQueueRecovered()) {
574               try {
575                 FileStatus stat = fs.getFileStatus(this.currentPath);
576                 if (stat.getLen() == 0) {
577                   LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
578                 }
579                 considerDumping = true;
580               } catch (IOException e) {
581                 LOG.warn(peerClusterZnode + " Got exception while getting file size: ", e);
582               }
583             }
584 
585             if (considerDumping &&
586                 sleepMultiplier == maxRetriesMultiplier &&
587                 processEndOfFile()) {
588               continue;
589             }
590           }
591         } finally {
592           try {
593             this.reader = null;
594             this.repLogReader.closeReader();
595           } catch (IOException e) {
596             gotIOE = true;
597             LOG.warn("Unable to finalize the tailing of a file", e);
598           }
599         }
600 
601         // If we didn't get anything to replicate, or if we hit an IOE,
602         // wait a bit and retry.
603         // But if we need to stop, don't bother sleeping
604         if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
605           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
606             manager.logPositionAndCleanOldLogs(this.currentPath,
607                 peerClusterZnode, this.repLogReader.getPosition(),
608                 this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
609             this.lastLoggedPosition = this.repLogReader.getPosition();
610           }
611           // Reset the sleep multiplier if nothing has actually gone wrong
612           if (!gotIOE) {
613             sleepMultiplier = 1;
614             // if there was nothing to ship and it's not an error
615             // set "ageOfLastShippedOp" to <now> to indicate that we're current
616             metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
617           }
618           if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
619             sleepMultiplier++;
620           }
621           continue;
622         }
623         sleepMultiplier = 1;
624         shipEdits(currentWALisBeingWrittenTo, entries);
625       }
626       if (replicationQueueInfo.isQueueRecovered()) {
627         // use synchronize to make sure one last thread will clean the queue
628         synchronized (workerThreads) {
629           Threads.sleep(100);// wait a short while for other worker thread to fully exit
630           boolean allOtherTaskDone = true;
631           for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
632             if (!worker.equals(this) && worker.isAlive()) {
633               allOtherTaskDone = false;
634               break;
635             }
636           }
637           if (allOtherTaskDone) {
638             manager.closeRecoveredQueue(this.source);
639             LOG.info("Finished recovering queue " + peerClusterZnode
640                 + " with the following stats: " + getStats());
641           }
642         }
643       }
644     }
645 
646     /**
647      * Read all the entries from the current log file and retain those that need to be replicated.
648      * Otherwise, process the end of the current file.
649      * @param currentWALisBeingWrittenTo is the current WAL being written to
650      * @param entries resulting entries to be replicated
651      * @return true if we got nothing and went to the next file, false if we got entries
652      * @throws IOException
653      */
654     protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
655         List<WAL.Entry> entries) throws IOException {
656       long seenEntries = 0;
657       if (LOG.isTraceEnabled()) {
658         LOG.trace("Seeking in " + this.currentPath + " at position "
659             + this.repLogReader.getPosition());
660       }
661       this.repLogReader.seek();
662       long positionBeforeRead = this.repLogReader.getPosition();
663       WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
664       while (entry != null) {
665         metrics.incrLogEditsRead();
666         seenEntries++;
667 
668         // skip entries that have already been through the peer cluster (unless the endpoint allows it)
669         if (replicationEndpoint.canReplicateToSameCluster()
670             || !entry.getKey().getClusterIds().contains(peerClusterId)) {
671           // Remove all KVs that should not be replicated
672           entry = walEntryFilter.filter(entry);
673           WALEdit edit = null;
674           WALKey logKey = null;
675           if (entry != null) {
676             edit = entry.getEdit();
677             logKey = entry.getKey();
678           }
679 
680           if (edit != null && edit.size() != 0) {
681             // Mark that the current cluster has the change
682             logKey.addClusterId(clusterId);
683             currentNbOperations += countDistinctRowKeys(edit);
684             entries.add(entry);
685             currentSize += entry.getEdit().heapSize();
686           } else {
687             metrics.incrLogEditsFiltered();
688           }
689         }
690         // Stop if too many entries or too big
691         // FIXME check the relationship between single wal group and overall
692         if (currentSize >= replicationQueueSizeCapacity
693             || entries.size() >= replicationQueueNbCapacity) {
694           break;
695         }
696         try {
697           entry = this.repLogReader.readNextAndSetPosition();
698         } catch (IOException ie) {
699           LOG.debug("Break on IOE: " + ie.getMessage());
700           break;
701         }
702       }
703       metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
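      // a WAL that is still being written to can never be declared finished; just ship what we read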
704       if (currentWALisBeingWrittenTo) {
705         return false;
706       }
707       // If we didn't get anything and the queue has an object, it means we
708       // hit the end of the file for sure
709       return seenEntries == 0 && processEndOfFile();
710     }
711 
712     /**
713      * Poll for the next path
714      * @return true if a path was obtained, false if not
715      */
716     protected boolean getNextPath() {
717       try {
718         if (this.currentPath == null) {
719           this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
720           int queueSize = logQueueSize.decrementAndGet();
721           metrics.setSizeOfLogQueue(queueSize);
722           if (this.currentPath != null) {
723             // For recovered queue: must use peerClusterZnode since peerId is a parsed value
724             manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
725               this.replicationQueueInfo.isQueueRecovered());
726             if (LOG.isTraceEnabled()) {
727               LOG.trace("New log: " + this.currentPath);
728             }
729           }
730         }
731       } catch (InterruptedException e) {
732         LOG.warn("Interrupted while reading edits", e);
733       }
734       return this.currentPath != null;
735     }
736 
737     /**
738      * Open a reader on the current path
739      *
740      * @param sleepMultiplier by how many times the default sleeping time is augmented
741      * @return true if we should continue with that file, false if we are over with it
742      */
743     protected boolean openReader(int sleepMultiplier) {
744       try {
745         try {
746           if (LOG.isTraceEnabled()) {
747             LOG.trace("Opening log " + this.currentPath);
748           }
749           this.reader = repLogReader.openReader(this.currentPath);
750         } catch (FileNotFoundException fnfe) {
751           if (this.replicationQueueInfo.isQueueRecovered()) {
752             // We didn't find the log in the archive directory, look if it still
753             // exists in the dead RS folder (there could be a chain of failures
754             // to look at)
755             List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
756             LOG.info("NB dead servers : " + deadRegionServers.size());
757             final Path rootDir = FSUtils.getRootDir(conf);
758             for (String curDeadServerName : deadRegionServers) {
759               final Path deadRsDirectory = new Path(rootDir,
760                   DefaultWALProvider.getWALDirectoryName(curDeadServerName));
761               Path[] locs = new Path[] {
762                   new Path(deadRsDirectory, currentPath.getName()),
763                   new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
764                                             currentPath.getName()),
765               };
766               for (Path possibleLogLocation : locs) {
767                 LOG.info("Possible location " + possibleLogLocation.toUri().toString());
768                 if (manager.getFs().exists(possibleLogLocation)) {
769                   // We found the right new location
770                   LOG.info("Log " + this.currentPath + " still exists at " +
771                       possibleLogLocation);
772                   // Returning here leaves the reader null, so the caller will sleep and retry
773                   // TODO why don't we need to set currentPath and call openReader here?
774                   return true;
775                 }
776               }
777             }
778             // In the case of disaster recovery, the HMaster may have been shut down or crashed before
779             // flushing data from .logs to .oldlogs. Loop over the .logs folders and check for a match
780             if (stopper instanceof ReplicationSyncUp.DummyServer) {
781               // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
782               //      area rather than to the wal area for a particular region server.
783               FileStatus[] rss = fs.listStatus(manager.getLogDir());
784               for (FileStatus rs : rss) {
785                 Path p = rs.getPath();
786                 FileStatus[] logs = fs.listStatus(p);
787                 for (FileStatus log : logs) {
788                   p = new Path(p, log.getPath().getName());
789                   if (p.getName().equals(currentPath.getName())) {
790                     currentPath = p;
791                     LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
792                     // Open the log at the new location
793                     this.openReader(sleepMultiplier);
794                     return true;
795                   }
796                 }
797               }
798             }
799 
800             // TODO What happens if the log was missing from every single location?
801             // Although we need to check a couple of times as the log could have
802             // been moved by the master between the checks
803             // It can also happen if a recovered queue wasn't properly cleaned,
804             // such that the znode pointing to a log exists but the log was
805             // deleted a long time ago.
806             // For the moment, we'll throw the IO and processEndOfFile
807             throw new IOException("File from recovered queue is " +
808                 "nowhere to be found", fnfe);
809           } else {
810             // If the log was archived, continue reading from there
811             Path archivedLogLocation =
812                 new Path(manager.getOldLogDir(), currentPath.getName());
813             if (manager.getFs().exists(archivedLogLocation)) {
814               currentPath = archivedLogLocation;
815               LOG.info("Log " + this.currentPath + " was moved to " +
816                   archivedLogLocation);
817               // Open the log at the new location
818               this.openReader(sleepMultiplier);
819 
820             }
821             // TODO What happens if the log is missing in both places?
822           }
823         }
824       } catch (LeaseNotRecoveredException lnre) {
825         // HBASE-15019 the WAL was not closed due to some hiccup.
826         LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
827         recoverLease(conf, currentPath);
828         this.reader = null;
829       } catch (IOException ioe) {
830         if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
831         LOG.warn(peerClusterZnode + " Got: ", ioe);
832         this.reader = null;
833         if (ioe.getCause() instanceof NullPointerException) {
834           // Workaround for race condition in HDFS-4380
835           // which throws a NPE if we open a file before any data node has the most recent block
836           // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
837           LOG.warn("Got NPE opening reader, will retry.");
838         } else if (sleepMultiplier >= maxRetriesMultiplier) {
839           // TODO Need a better way to determine if a file is really gone but
840           // TODO without scanning all logs dir
841           LOG.warn("Waited too long for this file, considering dumping");
842           return !processEndOfFile();
843         }
844       }
845       return true;
846     }
847 
848     private void recoverLease(final Configuration conf, final Path path) {
849       try {
850         final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
851         FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
852         fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
853           @Override
854           public boolean progress() {
855             LOG.debug("recover WAL lease: " + path);
856             return isWorkerActive();
857           }
858         });
859       } catch (IOException e) {
860         LOG.warn("unable to recover lease for WAL: " + path, e);
861       }
862     }
863 
864     /*
865      * Checks whether the current log file is empty and this is not a recovered queue. This handles
866      * the scenario where, in an idle cluster, there is no entry in the current log and we keep
867      * trying to read it, getting an EOFException each time. For a recovered queue the last log
868      * file may be empty, and we don't want to retry in that case.
869      */
870     private boolean isCurrentLogEmpty() {
871       return (this.repLogReader.getPosition() == 0 &&
872           !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
873     }
874 
875     /**
876      * Count the number of different row keys in the given edit; an edit can span several rows
877      * because of mini-batching. We assume that there's at least one Cell in the WALEdit.
878      * @param edit edit to count row keys from
879      * @return number of different row keys
880      */
881     private int countDistinctRowKeys(WALEdit edit) {
882       List<Cell> cells = edit.getCells();
883       int distinctRowKeys = 1;
884       // compare each cell's row with the previous cell's; each mismatch starts a new row key
885       for (int i = 1; i < cells.size(); i++) {
886         if (!CellUtil.matchingRow(cells.get(i), cells.get(i - 1))) {
887           distinctRowKeys++;
888         }
889       }
890       return distinctRowKeys;
891     }
892 
893     /**
894      * Do the shipping logic
895      * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
896      * written to when this method was called
897      */
898     protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
899       int sleepMultiplier = 0;
900       if (entries.isEmpty()) {
901         LOG.warn("Was given 0 edits to ship");
902         return;
903       }
904       while (isWorkerActive()) {
905         try {
906           if (throttler.isEnabled()) {
907             long sleepTicks = throttler.getNextSleepInterval(currentSize);
908             if (sleepTicks > 0) {
909               try {
910                 if (LOG.isTraceEnabled()) {
911                   LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
912                 }
913                 Thread.sleep(sleepTicks);
914               } catch (InterruptedException e) {
915                 LOG.debug("Interrupted while sleeping for throttling control");
916                 Thread.currentThread().interrupt();
917                 // the current thread may have been interrupted in order to terminate;
918                 // go straight back to the while() condition to confirm this
919                 continue;
920               }
921               // reset throttler's cycle start tick when sleep for throttling occurs
922               throttler.resetStartTick();
923             }
924           }
925           // create replicateContext here, so the entries can be GC'd upon return from this call
926           // stack
927           ReplicationEndpoint.ReplicateContext replicateContext =
928               new ReplicationEndpoint.ReplicateContext();
929           replicateContext.setEntries(entries).setSize(currentSize);
930           replicateContext.setWalGroupId(walGroupId);
931 
932           long startTimeNs = System.nanoTime();
933           // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
934           boolean replicated = replicationEndpoint.replicate(replicateContext);
935           long endTimeNs = System.nanoTime();
936 
937           if (!replicated) {
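          // a false return means the peer did not acknowledge the batch; loop and retry the same entries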
938             continue;
939           } else {
940             sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
941           }
942 
943           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
944             manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
945               this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
946               currentWALisBeingWrittenTo);
947             this.lastLoggedPosition = this.repLogReader.getPosition();
948           }
949           if (throttler.isEnabled()) {
950             throttler.addPushSize(currentSize);
951           }
952           totalReplicatedEdits.addAndGet(entries.size());
953           totalReplicatedOperations.addAndGet(currentNbOperations);
954           // FIXME check relationship between wal group and overall
955           metrics.shipBatch(currentNbOperations, currentSize / 1024);
956           metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
957             walGroupId);
958           if (LOG.isTraceEnabled()) {
959             LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
960                 + totalReplicatedOperations + " operations in "
961                 + ((endTimeNs - startTimeNs) / 1000000) + " ms");
962           }
963           break;
964         } catch (Exception ex) {
965           LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
966               + org.apache.hadoop.util.StringUtils.stringifyException(ex));
967           if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
968             sleepMultiplier++;
969           }
970         }
971       }
972     }
973 
974     /**
975      * If the queue isn't empty, switch to the next file. Else, if this is a recovered queue, it
976      * means we're done! Otherwise we just continue trying to read the current log file.
977      * @return true if we're done with the current file, false if we should continue trying to read
978      *         from it
979      */
980     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
981         justification = "Yeah, this is how it works")
982     protected boolean processEndOfFile() {
983       // We presume this means the file we're reading is closed.
984       if (this.queue.size() != 0) {
985         // -1 means the wal wasn't closed cleanly.
986         final long trailerSize = this.repLogReader.currentTrailerSize();
987         final long currentPosition = this.repLogReader.getPosition();
988         FileStatus stat = null;
989         try {
990           stat = fs.getFileStatus(this.currentPath);
991         } catch (IOException exception) {
992           LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
993               + (trailerSize < 0 ? "was not" : "was") + " closed cleanly, stats: " + getStats());
994           metrics.incrUnknownFileLengthForClosedWAL();
995         }
996         if (stat != null) {
997           if (trailerSize < 0) {
998             if (currentPosition < stat.getLen()) {
999               final long skippedBytes = stat.getLen() - currentPosition;
1000               LOG.info("Reached the end of WAL file '" + currentPath + "'. It was not closed cleanly, so we did not parse " + skippedBytes + " bytes of data.");
1001               metrics.incrUncleanlyClosedWALs();
1002               metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
1003             }
1004           } else if (currentPosition + trailerSize < stat.getLen()){
1005             LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition + ", which is too far away from reported file length " + stat.getLen() +
1006                 ". Restarting WAL reading (see HBASE-15983 for details). stats: " + getStats());
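            // rewind to the start of this WAL; bytes up to currentPosition will be read again and
            // are accounted for below as repeated bytes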
1007             repLogReader.setPosition(0);
1008             metrics.incrRestartedWALReading();
1009             metrics.incrRepeatedFileBytes(currentPosition);
1010             return false;
1011           }
1012         }
1013         if (LOG.isTraceEnabled()) {
1014           LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
1015               + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
1016         }
1017         this.currentPath = null;
1018         this.repLogReader.finishCurrentFile();
1019         this.reader = null;
1020         metrics.incrCompletedWAL();
1021         return true;
1022       } else if (this.replicationQueueInfo.isQueueRecovered()) {
1023         LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
1024             + peerClusterZnode);
1025         metrics.incrCompletedRecoveryQueue();
1026         workerRunning = false;
1027         return true;
1028       }
1029       return false;
1030     }
1031 
1032     public void startup() {
1033       String n = Thread.currentThread().getName();
1034       Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
1035         @Override
1036         public void uncaughtException(final Thread t, final Throwable e) {
1037           RSRpcServices.exitIfOOME(e);
1038           LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
1039               + getCurrentPath(), e);
1040           stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
1041         }
1042       };
1043       Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
1044           + peerClusterZnode, handler);
1045       workerThreads.put(walGroupId, this);
1046     }
1047 
1048     public Path getCurrentPath() {
1049       return this.currentPath;
1050     }
1051 
1052     public long getCurrentPosition() {
1053       return this.repLogReader.getPosition();
1054     }
1055 
1056     private boolean isWorkerActive() {
1057       return !stopper.isStopped() && workerRunning && !isInterrupted();
1058     }
1059 
1060     private void terminate(String reason, Exception cause) {
1061       if (cause == null) {
1062         LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
1063 
1064       } else {
1065         LOG.error("Closing worker for wal group " + this.walGroupId
1066             + " because an error occurred: " + reason, cause);
1067       }
1068       this.interrupt();
1069       Threads.shutdown(this, sleepForRetries);
1070       LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
1071     }
1072 
1073     public void setWorkerRunning(boolean workerRunning) {
1074       this.workerRunning = workerRunning;
1075     }
1076   }
1077 }