1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Comparator;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.UUID;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.PriorityBlockingQueue;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicLong;
35  
36  import org.apache.commons.lang.StringUtils;
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.fs.FileStatus;
41  import org.apache.hadoop.fs.FileSystem;
42  import org.apache.hadoop.fs.Path;
43  import org.apache.hadoop.hbase.Cell;
44  import org.apache.hadoop.hbase.CellUtil;
45  import org.apache.hadoop.hbase.HBaseConfiguration;
46  import org.apache.hadoop.hbase.HConstants;
47  import org.apache.hadoop.hbase.Stoppable;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.classification.InterfaceAudience;
50  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
51  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
52  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
53  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
54  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
55  import org.apache.hadoop.hbase.replication.ReplicationException;
56  import org.apache.hadoop.hbase.replication.ReplicationPeers;
57  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
58  import org.apache.hadoop.hbase.replication.ReplicationQueues;
59  import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
60  import org.apache.hadoop.hbase.replication.WALEntryFilter;
61  import org.apache.hadoop.hbase.util.Bytes;
62  import org.apache.hadoop.hbase.util.CancelableProgressable;
63  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
64  import org.apache.hadoop.hbase.util.FSUtils;
65  import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
66  import org.apache.hadoop.hbase.util.Threads;
67  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
68  import org.apache.hadoop.hbase.wal.WAL;
69  import org.apache.hadoop.hbase.wal.WALKey;
70  
71  import com.google.common.collect.Lists;
72  import com.google.common.util.concurrent.ListenableFuture;
73  import com.google.common.util.concurrent.Service;
74  
75  /**
76   * Class that handles the source of a replication stream.
77   * Currently does not handle more than 1 slave.
78   * For each slave cluster it selects a random number of peers
79   * using a replication ratio. For example, if the replication ratio is 0.1
80   * and the slave cluster has 100 region servers, 10 will be selected.
81   * <p>
82   * A stream is considered down when we cannot contact a region server on the
83   * peer cluster for more than 55 seconds by default.
84   * </p>
85   *
86   */
87  @InterfaceAudience.Private
88  public class ReplicationSource extends Thread
89      implements ReplicationSourceInterface {
90  
91    private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
92    // Queues of logs to process, each entry in the format walGroupId->queue,
93    // each representing the queue for one wal group
94    private Map<String, PriorityBlockingQueue<Path>> queues =
95        new HashMap<String, PriorityBlockingQueue<Path>>();
96    // per group queue size, keep no more than this number of logs in each wal group
97    private int queueSizePerGroup;
98    private ReplicationQueues replicationQueues;
99    private ReplicationPeers replicationPeers;
100 
101   private Configuration conf;
102   private ReplicationQueueInfo replicationQueueInfo;
103   // id of the peer cluster this source replicates to
104   private String peerId;
105   // The manager of all sources to which we ping back our progress
106   private ReplicationSourceManager manager;
107   // Should we stop everything?
108   private Stoppable stopper;
109   // How long should we sleep for each retry
110   private long sleepForRetries;
111   // Max size in bytes of entriesArray
112   private long replicationQueueSizeCapacity;
113   // Max number of entries in entriesArray
114   private int replicationQueueNbCapacity;
115   private FileSystem fs;
116   // id of this cluster
117   private UUID clusterId;
118   // id of the other cluster
119   private UUID peerClusterId;
120   // total number of edits we replicated
121   private AtomicLong totalReplicatedEdits = new AtomicLong(0);
122   // total number of operations we replicated
123   private AtomicLong totalReplicatedOperations = new AtomicLong(0);
124   // The znode we currently play with
125   private String peerClusterZnode;
126   // Maximum number of retries before taking bold actions
127   private int maxRetriesMultiplier;
128   // Indicates if this particular source is running
129   private volatile boolean sourceRunning = false;
130   // Metrics for this source
131   private MetricsSource metrics;
132   //WARN threshold for the number of queued logs, defaults to 2
133   private int logQueueWarnThreshold;
134   // ReplicationEndpoint which will handle the actual replication
135   private ReplicationEndpoint replicationEndpoint;
136   // A filter (or a chain of filters) for the WAL entries.
137   private WALEntryFilter walEntryFilter;
138   // throttler
139   private ReplicationThrottler throttler;
140   private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
141       new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
142 
143   /**
144    * Instantiation method used by region servers
145    *
146    * @param conf configuration to use
147    * @param fs file system to use
148    * @param manager replication manager to ping to
149    * @param stopper     the stopper object used to signal that the region server is stopping
150    * @param peerClusterZnode the name of our znode
151    * @param clusterId unique UUID for the cluster
152    * @param replicationEndpoint the replication endpoint implementation
153    * @param metrics metrics for replication source
154    * @throws IOException
155    */
156   @Override
157   public void init(final Configuration conf, final FileSystem fs,
158       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
159       final ReplicationPeers replicationPeers, final Stoppable stopper,
160       final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
161       final MetricsSource metrics)
162           throws IOException {
163     this.stopper = stopper;
164     this.conf = HBaseConfiguration.create(conf);
165     decorateConf();
166     this.replicationQueueSizeCapacity =
167         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
168     this.replicationQueueNbCapacity =
169         this.conf.getInt("replication.source.nb.capacity", 25000);
170     this.sleepForRetries =
171         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
172     this.maxRetriesMultiplier =
173         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
174     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
175     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
176     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
177     this.replicationQueues = replicationQueues;
178     this.replicationPeers = replicationPeers;
179     this.manager = manager;
180     this.fs = fs;
181     this.metrics = metrics;
182     this.clusterId = clusterId;
183 
184     this.peerClusterZnode = peerClusterZnode;
185     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
186     // ReplicationQueueInfo parses the peerId out of the znode for us
187     this.peerId = this.replicationQueueInfo.getPeerId();
188     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
189     this.replicationEndpoint = replicationEndpoint;
190   }
191 
192   private void decorateConf() {
193     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
194     if (StringUtils.isNotEmpty(replicationCodec)) {
195       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
196     }
197   }
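
For context, decorateConf() only touches this source's private copy of the configuration (created by HBaseConfiguration.create(conf) in init()): if a replication-specific codec is configured, it overrides the generic RPC codec for this source alone. A minimal sketch of that effect, assuming the operator set the codec to KeyValueCodecWithTags (the class name here is only an illustrative assumption):

    // Hedged sketch: the replication codec, if set, becomes the RPC codec for this source only.
    Configuration sourceConf = HBaseConfiguration.create();
    sourceConf.set(HConstants.REPLICATION_CODEC_CONF_KEY,
        "org.apache.hadoop.hbase.codec.KeyValueCodecWithTags");  // assumed operator setting
    // After the equivalent of decorateConf() runs, sourceConf.get(HConstants.RPC_CODEC_CONF_KEY)
    // would return the same class name, leaving other Configuration instances untouched.
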
198 
199   @Override
200   public void enqueueLog(Path log) {
201     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
202     PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
203     if (queue == null) {
204       queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
205       queues.put(logPrefix, queue);
206       if (this.sourceRunning) {
207         // new wal group observed after source startup, start a new worker thread to track it
208         // notice: it's possible that a log is enqueued after this.running is set but before the
209         // worker thread is launched, so it's necessary to check workerThreads before starting one
210         final ReplicationSourceWorkerThread worker =
211             new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
212         ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
213         if (extant != null) {
214           LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
215         } else {
216           LOG.debug("Starting up worker for wal group " + logPrefix);
217           worker.startup();
218         }
219       }
220     }
221     queue.put(log);
222     this.metrics.incrSizeOfLogQueue();
223     // This will log a warning for each new log that gets created above the warn threshold
224     int queueSize = queue.size();
225     if (queueSize > this.logQueueWarnThreshold) {
226       LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
227           + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
228     }
229   }
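
To make the grouping above concrete: the wal group id is, in effect, the WAL file name without its trailing timestamp, so successive rolls of the same WAL share one queue and one worker. A small sketch with hypothetical file names:

    // Hedged sketch: two rolls of the same WAL resolve to the same wal group id.
    Path firstRoll = new Path("host1%2C16020%2C1469633244506.1469633245000");   // hypothetical
    Path secondRoll = new Path("host1%2C16020%2C1469633244506.1469633299000");  // hypothetical
    String group1 = DefaultWALProvider.getWALPrefixFromWALName(firstRoll.getName());
    String group2 = DefaultWALProvider.getWALPrefixFromWALName(secondRoll.getName());
    // group1.equals(group2) is expected to be true, so both paths land in queues.get(group1).
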
230 
231   @Override
232   public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
233       throws ReplicationException {
234     String peerId = peerClusterZnode;
235     if (peerId.contains("-")) {
236       // peerClusterZnode will be in the form peerId + "-" + rsZNode.
237       // A peerId will not have "-" in its name, see HBASE-11394
238       peerId = peerClusterZnode.split("-")[0];
239     }
240     Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs();
241     if (tableCFMap != null) {
242       List<String> tableCfs = tableCFMap.get(tableName);
243       if (tableCFMap.containsKey(tableName)
244           && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
245         this.replicationQueues.addHFileRefs(peerId, files);
246         metrics.incrSizeOfHFileRefsQueue(files.size());
247       } else {
248         LOG.debug("HFiles belonging to table " + tableName + " family "
249             + Bytes.toString(family) + " will not be replicated to peer id " + peerId);
250       }
251     } else {
252       // the user has not explicitly defined any table CFs for replication, which means
253       // replicate all the data
254       this.replicationQueues.addHFileRefs(peerId, files);
255       metrics.incrSizeOfHFileRefsQueue(files.size());
256     }
257   }
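
A brief sketch of the znode parsing used above (the znode name is hypothetical): a peer id never contains '-' (see HBASE-11394), so for a recovered queue named peerId + "-" + rsZNode the first dash-delimited token is always the peer id.

    // Hedged sketch of the peer id extraction above.
    String znode = "2-host1,16020,1469633244506";              // hypothetical recovered-queue znode
    String parsedPeerId = znode.contains("-") ? znode.split("-")[0] : znode;
    // parsedPeerId == "2"; for a normal (non-recovered) queue the znode is the peer id itself.
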
258 
259   private void uninitialize() {
260     LOG.debug("Source exiting " + this.peerId);
261     metrics.clear();
262     if (replicationEndpoint.state() == Service.State.STARTING
263         || replicationEndpoint.state() == Service.State.RUNNING) {
264       replicationEndpoint.stopAndWait();
265     }
266   }
267 
268   @Override
269   public void run() {
270     // mark we are running now
271     this.sourceRunning = true;
272     try {
273       // start the endpoint, connect to the cluster
274       Service.State state = replicationEndpoint.start().get();
275       if (state != Service.State.RUNNING) {
276         LOG.warn("ReplicationEndpoint was not started. Exiting");
277         uninitialize();
278         return;
279       }
280     } catch (Exception ex) {
281       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
282       throw new RuntimeException(ex);
283     }
284 
285     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
286     ArrayList<WALEntryFilter> filters = Lists.newArrayList(
287       (WALEntryFilter)new SystemTableWALEntryFilter());
288     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
289     if (filterFromEndpoint != null) {
290       filters.add(filterFromEndpoint);
291     }
292     this.walEntryFilter = new ChainWALEntryFilter(filters);
293 
294     int sleepMultiplier = 1;
295     // delay this until we are in an asynchronous thread
296     while (this.isSourceActive() && this.peerClusterId == null) {
297       this.peerClusterId = replicationEndpoint.getPeerUUID();
298       if (this.isSourceActive() && this.peerClusterId == null) {
299         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
300           sleepMultiplier++;
301         }
302       }
303     }
304 
305     // In a rare case, the zookeeper setting may be messed up. That leads to an incorrect
306     // peerClusterId value, which is the same as the source clusterId
307     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
308       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
309           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
310           + replicationEndpoint.getClass().getName(), null, false);
311     }
312     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
313     // start workers
314     for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
315       String walGroupId = entry.getKey();
316       PriorityBlockingQueue<Path> queue = entry.getValue();
317       final ReplicationSourceWorkerThread worker =
318           new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
319       ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
320       if (extant != null) {
321         LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
322       } else {
323         LOG.debug("Starting up worker for wal group " + walGroupId);
324         worker.startup();
325       }
326     }
327   }
328 
329   /**
330    * Do the sleeping logic
331    * @param msg Why we sleep
332    * @param sleepMultiplier by how many times the default sleeping time is augmented
333    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
334    */
335   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
336     try {
337       if (LOG.isTraceEnabled()) {
338         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
339       }
340       Thread.sleep(this.sleepForRetries * sleepMultiplier);
341     } catch (InterruptedException e) {
342       LOG.debug("Interrupted while sleeping between retries");
343       Thread.currentThread().interrupt();
344     }
345     return sleepMultiplier < maxRetriesMultiplier;
346   }
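
Callers use this method as a simple linear back-off: each failed attempt sleeps sleepForRetries * sleepMultiplier and bumps the multiplier while it stays below maxRetriesMultiplier (with the defaults read in init(), 1 second steps capped at 300). A minimal sketch of the calling pattern used throughout this class; the predicate is hypothetical:

    // Hedged sketch of the retry pattern: linear back-off capped by maxRetriesMultiplier.
    int sleepMultiplier = 1;
    while (someRetriableConditionHolds()) {              // hypothetical predicate
      if (sleepForRetries("Condition not met yet", sleepMultiplier)) {
        sleepMultiplier++;                               // sleeps grow 1s, 2s, 3s, ... up to 300s
      }
    }
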
347 
348   /**
349    * check whether the peer is enabled or not
350    *
351    * @return true if the peer is enabled, otherwise false
352    */
353   protected boolean isPeerEnabled() {
354     return this.replicationPeers.getStatusOfPeer(this.peerId);
355   }
356 
357   @Override
358   public void startup() {
359     String n = Thread.currentThread().getName();
360     Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
361       @Override
362       public void uncaughtException(final Thread t, final Throwable e) {
363         LOG.error("Unexpected exception in ReplicationSource", e);
364       }
365     };
366     Threads
367         .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
368   }
369 
370   @Override
371   public void terminate(String reason) {
372     terminate(reason, null);
373   }
374 
375   @Override
376   public void terminate(String reason, Exception cause) {
377     terminate(reason, cause, true);
378   }
379 
380   public void terminate(String reason, Exception cause, boolean join) {
381     if (cause == null) {
382       LOG.info("Closing source "
383           + this.peerClusterZnode + " because: " + reason);
384 
385     } else {
386       LOG.error("Closing source " + this.peerClusterZnode
387           + " because an error occurred: " + reason, cause);
388     }
389     this.sourceRunning = false;
390     Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
391     for (ReplicationSourceWorkerThread worker : workers) {
392       worker.setWorkerRunning(false);
393       worker.interrupt();
394     }
395     ListenableFuture<Service.State> future = null;
396     if (this.replicationEndpoint != null) {
397       future = this.replicationEndpoint.stop();
398     }
399     if (join) {
400       for (ReplicationSourceWorkerThread worker : workers) {
401         Threads.shutdown(worker, this.sleepForRetries);
402         LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
403       }
404       if (future != null) {
405         try {
406           future.get();
407         } catch (Exception e) {
408           LOG.warn("Got exception:" + e);
409         }
410       }
411     }
412   }
413 
414   @Override
415   public String getPeerClusterZnode() {
416     return this.peerClusterZnode;
417   }
418 
419   @Override
420   public String getPeerClusterId() {
421     return this.peerId;
422   }
423 
424   @Override
425   public Path getCurrentPath() {
426     // only for testing
427     for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
428       if (worker.getCurrentPath() != null) return worker.getCurrentPath();
429     }
430     return null;
431   }
432 
433   private boolean isSourceActive() {
434     return !this.stopper.isStopped() && this.sourceRunning;
435   }
436 
437   /**
438    * Comparator used to order logs based on their start time
439    */
440   public static class LogsComparator implements Comparator<Path> {
441 
442     @Override
443     public int compare(Path o1, Path o2) {
444       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
445     }
446 
447     /**
448      * Split a path to get the start time
449      * For example: 10.20.20.171%3A60020.1277499063250
450      * @param p path to split
451      * @return start time
452      */
453     private static long getTS(Path p) {
454       int tsIndex = p.getName().lastIndexOf('.') + 1;
455       return Long.parseLong(p.getName().substring(tsIndex));
456     }
457   }
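
Using the example name from the javadoc above, the comparator orders WALs by the numeric timestamp that follows the last '.', so an older roll sorts ahead of a newer one (the second file name is hypothetical):

    // Hedged sketch: ordering is by the trailing timestamp in the WAL file name.
    Path older = new Path("10.20.20.171%3A60020.1277499063250");
    Path newer = new Path("10.20.20.171%3A60020.1277499999999");          // hypothetical later roll
    boolean olderFirst = new LogsComparator().compare(older, newer) < 0;  // expected: true
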
458 
459   @Override
460   public String getStats() {
461     StringBuilder sb = new StringBuilder();
462     sb.append("Total replicated edits: ").append(totalReplicatedEdits)
463         .append(", current progress: \n");
464     for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
465       String walGroupId = entry.getKey();
466       ReplicationSourceWorkerThread worker = entry.getValue();
467       long position = worker.getCurrentPosition();
468       Path currentPath = worker.getCurrentPath();
469       sb.append("walGroup [").append(walGroupId).append("]: ");
470       if (currentPath != null) {
471         sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
472             .append(position).append("\n");
473       } else {
474         sb.append("no replication ongoing, waiting for new log");
475       }
476     }
477     return sb.toString();
478   }
479 
480   /**
481    * Get Replication Source Metrics
482    * @return sourceMetrics
483    */
484   public MetricsSource getSourceMetrics() {
485     return this.metrics;
486   }
487 
488   public class ReplicationSourceWorkerThread extends Thread {
489     ReplicationSource source;
490     String walGroupId;
491     PriorityBlockingQueue<Path> queue;
492     ReplicationQueueInfo replicationQueueInfo;
493     // Our reader for the current log. open/close handled by repLogReader
494     private WAL.Reader reader;
495     // Last position in the log that we sent to ZooKeeper
496     private long lastLoggedPosition = -1;
497     // Path of the current log
498     private volatile Path currentPath;
499     // Handle on the log reader helper
500     private ReplicationWALReaderManager repLogReader;
501     // Current number of operations (Put/Delete) that we need to replicate
502     private int currentNbOperations = 0;
503     // Current size of data we need to replicate
504     private int currentSize = 0;
505     // Indicates whether this particular worker is running
506     private boolean workerRunning = true;
507     // Current number of hfiles that we need to replicate
508     private long currentNbHFiles = 0;
509 
510     public ReplicationSourceWorkerThread(String walGroupId,
511         PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
512         ReplicationSource source) {
513       this.walGroupId = walGroupId;
514       this.queue = queue;
515       this.replicationQueueInfo = replicationQueueInfo;
516       this.repLogReader = new ReplicationWALReaderManager(fs, conf);
517       this.source = source;
518     }
519 
520     @Override
521     public void run() {
522       // If this is recovered, the queue is already full and the first log
523       // normally has a position (unless the RS failed between 2 logs)
524       if (this.replicationQueueInfo.isQueueRecovered()) {
525         try {
526           this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
527             this.queue.peek().getName()));
528           if (LOG.isTraceEnabled()) {
529             LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
530                 + this.repLogReader.getPosition());
531           }
532         } catch (ReplicationException e) {
533           terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
534         }
535       }
536       // Loop until we close down
537       while (isWorkerActive()) {
538         int sleepMultiplier = 1;
539         // Sleep until replication is enabled again
540         if (!isPeerEnabled()) {
541           if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
542             sleepMultiplier++;
543           }
544           continue;
545         }
546         Path oldPath = getCurrentPath(); //note that in the current scenario,
547                                          //oldPath will be null when a log roll
548                                          //happens.
549         // Get a new path
550         boolean hasCurrentPath = getNextPath();
551         if (getCurrentPath() != null && oldPath == null) {
552           sleepMultiplier = 1; //reset the sleepMultiplier on a path change
553         }
554         if (!hasCurrentPath) {
555           if (sleepForRetries("No log to process", sleepMultiplier)) {
556             sleepMultiplier++;
557           }
558           continue;
559         }
560         boolean currentWALisBeingWrittenTo = false;
561         //For WAL files we own (rather than recovered), take a snapshot of whether the
562         //current WAL file (this.currentPath) is in use (for writing) NOW!
563         //Since the new WAL paths are enqueued only after the prev WAL file
564         //is 'closed', presence of an element in the queue means that
565         //the previous WAL file was closed, else the file is in use (currentPath)
566         //We take the snapshot now so that we are protected against races
567         //where a new file gets enqueued while the current file is being processed
568         //(and where we just finished reading the current file).
569         if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
570           currentWALisBeingWrittenTo = true;
571         }
572         // Open a reader on it
573         if (!openReader(sleepMultiplier)) {
574           // Reset the sleep multiplier, else it'd be reused for the next file
575           sleepMultiplier = 1;
576           continue;
577         }
578 
579         // If we got a null reader but didn't continue, then sleep and continue
580         if (this.reader == null) {
581           if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
582             sleepMultiplier++;
583           }
584           continue;
585         }
586 
587         boolean gotIOE = false;
588         currentNbOperations = 0;
589         currentNbHFiles = 0;
590         List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
591         currentSize = 0;
592         try {
593           if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
594             continue;
595           }
596         } catch (IOException ioe) {
597           LOG.warn(peerClusterZnode + " Got: ", ioe);
598           gotIOE = true;
599           if (ioe.getCause() instanceof EOFException) {
600 
601             boolean considerDumping = false;
602             if (this.replicationQueueInfo.isQueueRecovered()) {
603               try {
604                 FileStatus stat = fs.getFileStatus(this.currentPath);
605                 if (stat.getLen() == 0) {
606                   LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
607                 }
608                 considerDumping = true;
609               } catch (IOException e) {
610                 LOG.warn(peerClusterZnode + " Got exception while getting file size: ", e);
611               }
612             }
613 
614             if (considerDumping &&
615                 sleepMultiplier == maxRetriesMultiplier &&
616                 processEndOfFile()) {
617               continue;
618             }
619           }
620         } finally {
621           try {
622             this.reader = null;
623             this.repLogReader.closeReader();
624           } catch (IOException e) {
625             gotIOE = true;
626             LOG.warn("Unable to finalize the tailing of a file", e);
627           }
628         }
629 
630         // If we didn't get anything to replicate, or if we hit a IOE,
631         // wait a bit and retry.
632         // But if we need to stop, don't bother sleeping
633         if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
634           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
635             manager.logPositionAndCleanOldLogs(this.currentPath,
636                 peerClusterZnode, this.repLogReader.getPosition(),
637                 this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
638             this.lastLoggedPosition = this.repLogReader.getPosition();
639           }
640           // Reset the sleep multiplier if nothing has actually gone wrong
641           if (!gotIOE) {
642             sleepMultiplier = 1;
643             // if there was nothing to ship and it's not an error
644             // set "ageOfLastShippedOp" to <now> to indicate that we're current
645             metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
646           }
647           if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
648             sleepMultiplier++;
649           }
650           continue;
651         }
652         sleepMultiplier = 1;
653         shipEdits(currentWALisBeingWrittenTo, entries);
654       }
655       if (replicationQueueInfo.isQueueRecovered()) {
656         // use synchronized to make sure the last remaining thread will clean the queue
657         synchronized (workerThreads) {
658           Threads.sleep(100); // wait a short while for other worker threads to fully exit
659           boolean allOtherTaskDone = true;
660           for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
661             if (!worker.equals(this) && worker.isAlive()) {
662               allOtherTaskDone = false;
663               break;
664             }
665           }
666           if (allOtherTaskDone) {
667             manager.closeRecoveredQueue(this.source);
668             LOG.info("Finished recovering queue " + peerClusterZnode
669                 + " with the following stats: " + getStats());
670           }
671         }
672       }
673     }
674 
675     /**
676      * Read all the entries from the current log file and retain those that need to be replicated.
677      * Otherwise, process the end of the current file.
678      * @param currentWALisBeingWrittenTo is the current WAL being written to
679      * @param entries resulting entries to be replicated
680      * @return true if we got nothing and went to the next file, false if we got entries
681      * @throws IOException
682      */
683     protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
684         List<WAL.Entry> entries) throws IOException {
685       long seenEntries = 0;
686       if (LOG.isTraceEnabled()) {
687         LOG.trace("Seeking in " + this.currentPath + " at position "
688             + this.repLogReader.getPosition());
689       }
690       this.repLogReader.seek();
691       long positionBeforeRead = this.repLogReader.getPosition();
692       WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
693       while (entry != null) {
694         metrics.incrLogEditsRead();
695         seenEntries++;
696 
697         // don't replicate if the log entries have already been consumed by the cluster
698         if (replicationEndpoint.canReplicateToSameCluster()
699             || !entry.getKey().getClusterIds().contains(peerClusterId)) {
700           // Remove all KVs that should not be replicated
701           entry = walEntryFilter.filter(entry);
702           WALEdit edit = null;
703           WALKey logKey = null;
704           if (entry != null) {
705             edit = entry.getEdit();
706             logKey = entry.getKey();
707           }
708 
709           if (edit != null && edit.size() != 0) {
710             // Mark that the current cluster has the change
711             logKey.addClusterId(clusterId);
712             currentNbOperations += countDistinctRowKeys(edit);
713             entries.add(entry);
714             currentSize += entry.getEdit().heapSize();
715           } else {
716             metrics.incrLogEditsFiltered();
717           }
718         }
719         // Stop if too many entries or too big
720         // FIXME check the relationship between single wal group and overall
721         if (currentSize >= replicationQueueSizeCapacity
722             || entries.size() >= replicationQueueNbCapacity) {
723           break;
724         }
725         try {
726           entry = this.repLogReader.readNextAndSetPosition();
727         } catch (IOException ie) {
728           LOG.debug("Break on IOE: " + ie.getMessage());
729           break;
730         }
731       }
732       metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
733       if (currentWALisBeingWrittenTo) {
734         return false;
735       }
736       // If we didn't get anything and the queue has an object, it means we
737       // hit the end of the file for sure
738       return seenEntries == 0 && processEndOfFile();
739     }
740 
741     private void cleanUpHFileRefs(WALEdit edit) throws IOException {
742       String peerId = peerClusterZnode;
743       if (peerId.contains("-")) {
744         // peerClusterZnode will be in the form peerId + "-" + rsZNode.
745         // A peerId will not have "-" in its name, see HBASE-11394
746         peerId = peerClusterZnode.split("-")[0];
747       }
748       List<Cell> cells = edit.getCells();
749       for (int i = 0; i < cells.size(); i++) {
750         Cell cell = cells.get(i);
751         if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
752           BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
753           List<StoreDescriptor> stores = bld.getStoresList();
754           for (int j = 0; j < stores.size(); j++) {
755             List<String> storeFileList = stores.get(j).getStoreFileList();
756             manager.cleanUpHFileRefs(peerId, storeFileList);
757             metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
758           }
759         }
760       }
761     }
762 
763     /**
764      * Poll for the next path
765      * @return true if a path was obtained, false if not
766      */
767     protected boolean getNextPath() {
768       try {
769         if (this.currentPath == null) {
770           this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
771           metrics.decrSizeOfLogQueue();
772           if (this.currentPath != null) {
773             // For recovered queue: must use peerClusterZnode since peerId is a parsed value
774             manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
775               this.replicationQueueInfo.isQueueRecovered());
776             if (LOG.isTraceEnabled()) {
777               LOG.trace("New log: " + this.currentPath);
778             }
779           }
780         }
781       } catch (InterruptedException e) {
782         LOG.warn("Interrupted while reading edits", e);
783       }
784       return this.currentPath != null;
785     }
786 
787     /**
788      * Open a reader on the current path
789      *
790      * @param sleepMultiplier by how many times the default sleeping time is augmented
791      * @return true if we should continue with that file, false if we are over with it
792      */
793     protected boolean openReader(int sleepMultiplier) {
794       try {
795         try {
796           if (LOG.isTraceEnabled()) {
797             LOG.trace("Opening log " + this.currentPath);
798           }
799           this.reader = repLogReader.openReader(this.currentPath);
800         } catch (FileNotFoundException fnfe) {
801           if (this.replicationQueueInfo.isQueueRecovered()) {
802             // We didn't find the log in the archive directory, look if it still
803             // exists in the dead RS folder (there could be a chain of failures
804             // to look at)
805             List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
806             LOG.info("NB dead servers : " + deadRegionServers.size());
807             final Path rootDir = FSUtils.getRootDir(conf);
808             for (String curDeadServerName : deadRegionServers) {
809               final Path deadRsDirectory = new Path(rootDir,
810                   DefaultWALProvider.getWALDirectoryName(curDeadServerName));
811               Path[] locs = new Path[] {
812                   new Path(deadRsDirectory, currentPath.getName()),
813                   new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
814                                             currentPath.getName()),
815               };
816               for (Path possibleLogLocation : locs) {
817                 LOG.info("Possible location " + possibleLogLocation.toUri().toString());
818                 if (manager.getFs().exists(possibleLogLocation)) {
819                   // We found the right new location
820                   LOG.info("Log " + this.currentPath + " still exists at " +
821                       possibleLogLocation);
822                   // Breaking here will make us sleep since reader is null
823                   // TODO why don't we need to set currentPath and call openReader here?
824                   return true;
825                 }
826               }
827             }
828             // In the case of disaster/recovery, HMaster may be shutdown/crashed before flushing
829             // data from .logs to .oldlogs. Loop through the .logs folders and check whether a match exists
830             if (stopper instanceof ReplicationSyncUp.DummyServer) {
831               // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
832               //      area rather than to the wal area for a particular region server.
833               FileStatus[] rss = fs.listStatus(manager.getLogDir());
834               for (FileStatus rs : rss) {
835                 Path p = rs.getPath();
836                 FileStatus[] logs = fs.listStatus(p);
837                 for (FileStatus log : logs) {
838                   p = new Path(p, log.getPath().getName());
839                   if (p.getName().equals(currentPath.getName())) {
840                     currentPath = p;
841                     LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
842                     // Open the log at the new location
843                     this.openReader(sleepMultiplier);
844                     return true;
845                   }
846                 }
847               }
848             }
849 
850             // TODO What happens if the log was missing from every single location?
851             // Although we need to check a couple of times as the log could have
852             // been moved by the master between the checks
853             // It can also happen if a recovered queue wasn't properly cleaned,
854             // such that the znode pointing to a log exists but the log was
855             // deleted a long time ago.
856             // For the moment, we'll throw the IO and processEndOfFile
857             throw new IOException("File from recovered queue is " +
858                 "nowhere to be found", fnfe);
859           } else {
860             // If the log was archived, continue reading from there
861             Path archivedLogLocation =
862                 new Path(manager.getOldLogDir(), currentPath.getName());
863             if (manager.getFs().exists(archivedLogLocation)) {
864               currentPath = archivedLogLocation;
865               LOG.info("Log " + this.currentPath + " was moved to " +
866                   archivedLogLocation);
867               // Open the log at the new location
868               this.openReader(sleepMultiplier);
869 
870             }
871             // TODO What happens if the log is missing in both places?
872           }
873         }
874       } catch (LeaseNotRecoveredException lnre) {
875         // HBASE-15019 the WAL was not closed due to some hiccup.
876         LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
877         recoverLease(conf, currentPath);
878         this.reader = null;
879       } catch (IOException ioe) {
880         if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
881         LOG.warn(peerClusterZnode + " Got: ", ioe);
882         this.reader = null;
883         if (ioe.getCause() instanceof NullPointerException) {
884           // Workaround for race condition in HDFS-4380
885           // which throws a NPE if we open a file before any data node has the most recent block
886           // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
887           LOG.warn("Got NPE opening reader, will retry.");
888         } else if (sleepMultiplier >= maxRetriesMultiplier) {
889           // TODO Need a better way to determine if a file is really gone but
890           // TODO without scanning all logs dir
891           LOG.warn("Waited too long for this file, considering dumping");
892           return !processEndOfFile();
893         }
894       }
895       return true;
896     }
897 
898     private void recoverLease(final Configuration conf, final Path path) {
899       try {
900         final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
901         FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
902         fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
903           @Override
904           public boolean progress() {
905             LOG.debug("recover WAL lease: " + path);
906             return isWorkerActive();
907           }
908         });
909       } catch (IOException e) {
910         LOG.warn("unable to recover lease for WAL: " + path, e);
911       }
912     }
913 
914     /*
915      * Checks whether the current log file is empty, and that this is not a recovered queue. This is
916      * to handle the scenario where, in an idle cluster, there is no entry in the current log and we
917      * keep trying to read the log file and get EOFException. In case of a recovered queue the last log
918      * file may be empty, and we don't want to retry that.
919      */
920     private boolean isCurrentLogEmpty() {
921       return (this.repLogReader.getPosition() == 0 &&
922           !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
923     }
924 
925     /**
926      * Count the number of different row keys in the given edit because of mini-batching. We assume
927      * that there's at least one Cell in the WALEdit.
928      * @param edit edit to count row keys from
929      * @return number of different row keys
930      */
931     private int countDistinctRowKeys(WALEdit edit) {
932       List<Cell> cells = edit.getCells();
933       int distinctRowKeys = 1;
934       int totalHFileEntries = 0;
935       Cell lastCell = cells.get(0);
936 
937       for (int i = 0; i < edit.size(); i++) {
938         // Count HFiles to be replicated
939         if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
940           try {
941             BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
942             List<StoreDescriptor> stores = bld.getStoresList();
943             for (int j = 0; j < stores.size(); j++) {
944               totalHFileEntries += stores.get(j).getStoreFileList().size();
945             }
946           } catch (IOException e) {
947             LOG.error("Failed to deserialize bulk load entry from wal edit. "
948                 + "Its hfile count will not be added into metrics.");
949           }
950         }
951 
952         if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
953           distinctRowKeys++;
954         }
955         lastCell = cells.get(i);
956       }
957       currentNbHFiles += totalHFileEntries;
958       return distinctRowKeys + totalHFileEntries;
959     }
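
To make the counting concrete: cells that share a row collapse into a single operation, and each store file referenced by a bulk load marker adds one more (and is also counted into currentNbHFiles). A minimal sketch, assuming org.apache.hadoop.hbase.KeyValue is available:

    // Hedged sketch: three cells over two rows count as two operations (no bulk load markers here).
    WALEdit sample = new WALEdit();
    sample.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"), Bytes.toBytes("a"), Bytes.toBytes("v")));
    sample.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"), Bytes.toBytes("b"), Bytes.toBytes("v")));
    sample.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("f"), Bytes.toBytes("a"), Bytes.toBytes("v")));
    // countDistinctRowKeys(sample) would return 2; a BULK_LOAD marker cell referencing N store
    // files would add N on top of that.
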
960 
961     /**
962      * Do the shipping logic
963      * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
964      * written to when this method was called
965      */
966     protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
967       int sleepMultiplier = 0;
968       if (entries.isEmpty()) {
969         LOG.warn("Was given 0 edits to ship");
970         return;
971       }
972       while (isWorkerActive()) {
973         try {
974           if (throttler.isEnabled()) {
975             long sleepTicks = throttler.getNextSleepInterval(currentSize);
976             if (sleepTicks > 0) {
977               try {
978                 if (LOG.isTraceEnabled()) {
979                   LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
980                 }
981                 Thread.sleep(sleepTicks);
982               } catch (InterruptedException e) {
983                 LOG.debug("Interrupted while sleeping for throttling control");
984                 Thread.currentThread().interrupt();
985                 // the current thread might have been interrupted to terminate;
986                 // go directly back to the while() to confirm this
987                 continue;
988               }
989               // reset throttler's cycle start tick when sleep for throttling occurs
990               throttler.resetStartTick();
991             }
992           }
993           // create replicateContext here, so the entries can be GC'd upon return from this call
994           // stack
995           ReplicationEndpoint.ReplicateContext replicateContext =
996               new ReplicationEndpoint.ReplicateContext();
997           replicateContext.setEntries(entries).setSize(currentSize);
998           replicateContext.setWalGroupId(walGroupId);
999 
1000           long startTimeNs = System.nanoTime();
1001           // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
1002           boolean replicated = replicationEndpoint.replicate(replicateContext);
1003           long endTimeNs = System.nanoTime();
1004 
1005           if (!replicated) {
1006             continue;
1007           } else {
1008             sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
1009           }
1010 
1011           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
1012             //Clean up hfile references
1013             int size = entries.size();
1014             for (int i = 0; i < size; i++) {
1015               cleanUpHFileRefs(entries.get(i).getEdit());
1016             }
1017             //Log and clean up WAL logs
1018             manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
1019               this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
1020               currentWALisBeingWrittenTo);
1021             this.lastLoggedPosition = this.repLogReader.getPosition();
1022           }
1023           if (throttler.isEnabled()) {
1024             throttler.addPushSize(currentSize);
1025           }
1026           totalReplicatedEdits.addAndGet(entries.size());
1027           totalReplicatedOperations.addAndGet(currentNbOperations);
1028           // FIXME check relationship between wal group and overall
1029           metrics.shipBatch(currentNbOperations, currentSize / 1024, currentNbHFiles);
1030           metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
1031             walGroupId);
1032           if (LOG.isTraceEnabled()) {
1033             LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
1034                 + totalReplicatedOperations + " operations in "
1035                 + ((endTimeNs - startTimeNs) / 1000000) + " ms");
1036           }
1037           break;
1038         } catch (Exception ex) {
1039           LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
1040               + org.apache.hadoop.util.StringUtils.stringifyException(ex));
1041           if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
1042             sleepMultiplier++;
1043           }
1044         }
1045       }
1046     }
1047 
1048     /**
1049      * If the queue isn't empty, switch to the next one. Else, if this is a recovered queue, it means
1050      * we're done! Otherwise we'll just continue trying to read from the current log file.
1051      * @return true if we're done with the current file, false if we should continue trying to read
1052      *         from it
1053      */
1054     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
1055         justification = "Yeah, this is how it works")
1056     protected boolean processEndOfFile() {
1057       if (this.queue.size() != 0) {
1058         if (LOG.isTraceEnabled()) {
1059           String filesize = "N/A";
1060           try {
1061             FileStatus stat = fs.getFileStatus(this.currentPath);
1062             filesize = stat.getLen() + "";
1063           } catch (IOException ex) {
1064           }
1065           LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
1066               + ", and the length of the file is " + filesize);
1067         }
1068         this.currentPath = null;
1069         this.repLogReader.finishCurrentFile();
1070         this.reader = null;
1071         return true;
1072       } else if (this.replicationQueueInfo.isQueueRecovered()) {
1073         LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
1074             + peerClusterZnode);
1075         workerRunning = false;
1076         return true;
1077       }
1078       return false;
1079     }
1080 
1081     public void startup() {
1082       String n = Thread.currentThread().getName();
1083       Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
1084         @Override
1085         public void uncaughtException(final Thread t, final Throwable e) {
1086           LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
1087               + getCurrentPath(), e);
1088         }
1089       };
1090       Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
1091           + peerClusterZnode, handler);
1092       workerThreads.put(walGroupId, this);
1093     }
1094 
1095     public Path getCurrentPath() {
1096       return this.currentPath;
1097     }
1098 
1099     public long getCurrentPosition() {
1100       return this.repLogReader.getPosition();
1101     }
1102 
1103     private boolean isWorkerActive() {
1104       return !stopper.isStopped() && workerRunning && !isInterrupted();
1105     }
1106 
1107     private void terminate(String reason, Exception cause) {
1108       if (cause == null) {
1109         LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
1110 
1111       } else {
1112         LOG.error("Closing worker for wal group " + this.walGroupId
1113             + " because an error occurred: " + reason, cause);
1114       }
1115       this.interrupt();
1116       Threads.shutdown(this, sleepForRetries);
1117       LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
1118     }
1119 
1120     public void setWorkerRunning(boolean workerRunning) {
1121       this.workerRunning = workerRunning;
1122     }
1123   }
1124 }