1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Comparator;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.UUID;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.PriorityBlockingQueue;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicInteger;
35  import java.util.concurrent.atomic.AtomicLong;
36  
37  import org.apache.commons.lang.StringUtils;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.fs.FileStatus;
42  import org.apache.hadoop.fs.FileSystem;
43  import org.apache.hadoop.fs.Path;
44  import org.apache.hadoop.hbase.Cell;
45  import org.apache.hadoop.hbase.CellUtil;
46  import org.apache.hadoop.hbase.HBaseConfiguration;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.Stoppable;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
52  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
53  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
54  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
55  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
56  import org.apache.hadoop.hbase.replication.ReplicationException;
57  import org.apache.hadoop.hbase.replication.ReplicationPeers;
58  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
59  import org.apache.hadoop.hbase.replication.ReplicationQueues;
60  import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
61  import org.apache.hadoop.hbase.replication.WALEntryFilter;
62  import org.apache.hadoop.hbase.util.Bytes;
63  import org.apache.hadoop.hbase.util.CancelableProgressable;
64  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
65  import org.apache.hadoop.hbase.util.FSUtils;
66  import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
67  import org.apache.hadoop.hbase.util.Threads;
68  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
69  import org.apache.hadoop.hbase.wal.WAL;
70  import org.apache.hadoop.hbase.wal.WALKey;
71  
72  import com.google.common.collect.Lists;
73  import com.google.common.util.concurrent.ListenableFuture;
74  import com.google.common.util.concurrent.Service;
75  
76  /**
77   * Class that handles the source of a replication stream.
78   * Currently does not handle more than 1 slave.
79   * For each slave cluster it selects a random number of peers
80   * using a replication ratio. For example, if the replication ratio is 0.1
81   * and the slave cluster has 100 region servers, 10 will be selected.
82   * <p>
83   * A stream is considered down when we cannot contact a region server on the
84   * peer cluster for more than 55 seconds by default.
85   * </p>
86   *
87   */
88  @InterfaceAudience.Private
89  public class ReplicationSource extends Thread
90      implements ReplicationSourceInterface {
91  
92    private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
93    // Queues of logs to process, entry in format of walGroupId->queue,
94  // each represents a queue for one wal group
95    private Map<String, PriorityBlockingQueue<Path>> queues =
96        new HashMap<String, PriorityBlockingQueue<Path>>();
97    // per group queue size, keep no more than this number of logs in each wal group
98    private int queueSizePerGroup;
99    private ReplicationQueues replicationQueues;
100   private ReplicationPeers replicationPeers;
101 
102   private Configuration conf;
103   private ReplicationQueueInfo replicationQueueInfo;
104   // id of the peer cluster this source replicates to
105   private String peerId;
106   // The manager of all sources to which we ping back our progress
107   private ReplicationSourceManager manager;
108   // Should we stop everything?
109   private Stoppable stopper;
110   // How long should we sleep for each retry
111   private long sleepForRetries;
112   // Max size in bytes of each batch of WAL entries shipped to the peer
113   private long replicationQueueSizeCapacity;
114   // Max number of entries in each batch of WAL entries shipped to the peer
115   private int replicationQueueNbCapacity;
116   private FileSystem fs;
117   // id of this cluster
118   private UUID clusterId;
119   // id of the other cluster
120   private UUID peerClusterId;
121   // total number of edits we replicated
122   private AtomicLong totalReplicatedEdits = new AtomicLong(0);
123   // total number of operations (Put/Delete) we replicated
124   private AtomicLong totalReplicatedOperations = new AtomicLong(0);
125   // The znode we currently play with
126   private String peerClusterZnode;
127   // Maximum number of retries before taking bold actions
128   private int maxRetriesMultiplier;
129   // Indicates if this particular source is running
130   private volatile boolean sourceRunning = false;
131   // Metrics for this source
132   private MetricsSource metrics;
133   // WARN threshold for the number of queued logs, defaults to 2
134   private int logQueueWarnThreshold;
135   // ReplicationEndpoint which will handle the actual replication
136   private ReplicationEndpoint replicationEndpoint;
137   // A filter (or a chain of filters) for the WAL entries.
138   private WALEntryFilter walEntryFilter;
139   // throttler
140   private ReplicationThrottler throttler;
141   private AtomicInteger logQueueSize = new AtomicInteger(0);
142   private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
143       new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
144 
145   /**
146    * Instantiation method used by region servers
147    *
148    * @param conf configuration to use
149    * @param fs file system to use
150    * @param manager replication manager to ping to
151    * @param stopper the stopper to check whether the region server is shutting down
152    * @param peerClusterZnode the name of our znode
153    * @param clusterId unique UUID for the cluster
154    * @param replicationEndpoint the replication endpoint implementation
155    * @param metrics metrics for replication source
156    * @throws IOException
157    */
158   @Override
159   public void init(final Configuration conf, final FileSystem fs,
160       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
161       final ReplicationPeers replicationPeers, final Stoppable stopper,
162       final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
163       final MetricsSource metrics)
164           throws IOException {
165     this.stopper = stopper;
166     this.conf = HBaseConfiguration.create(conf);
167     decorateConf();
168     this.replicationQueueSizeCapacity =
169         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
170     this.replicationQueueNbCapacity =
171         this.conf.getInt("replication.source.nb.capacity", 25000);
172     this.sleepForRetries =
173         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
174     this.maxRetriesMultiplier =
175         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
176     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
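        // "replication.source.per.peer.node.bandwidth" caps how many bytes per second this source
        // may push to its peer; the default of 0 effectively leaves the throttler disabled.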
177     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
178     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
179     this.replicationQueues = replicationQueues;
180     this.replicationPeers = replicationPeers;
181     this.manager = manager;
182     this.fs = fs;
183     this.metrics = metrics;
184     this.clusterId = clusterId;
185 
186     this.peerClusterZnode = peerClusterZnode;
187     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
188     // ReplicationQueueInfo parses the peerId out of the znode for us
189     this.peerId = this.replicationQueueInfo.getPeerId();
190     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
191     this.replicationEndpoint = replicationEndpoint;
192   }
193 
194   private void decorateConf() {
195     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
196     if (StringUtils.isNotEmpty(replicationCodec)) {
197       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
198     }
199   }
200 
201   @Override
202   public void enqueueLog(Path log) {
203     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
204     PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
205     if (queue == null) {
206       queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
207       queues.put(logPrefix, queue);
208       if (this.sourceRunning) {
209         // new wal group observed after source startup, start a new worker thread to track it
210         // notice: it's possible that a log is enqueued after this.sourceRunning is set but before
211         // the worker thread is launched, so it's necessary to check workerThreads before starting one
212         final ReplicationSourceWorkerThread worker =
213             new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
214         ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
215         if (extant != null) {
216           LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
217         } else {
218           LOG.debug("Starting up worker for wal group " + logPrefix);
219           worker.startup();
220         }
221       }
222     }
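        // LogsComparator orders WALs by the timestamp suffix of their file names, so the oldest
        // log in this group is always polled first by the worker thread.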
223     queue.put(log);
224     int queueSize = logQueueSize.incrementAndGet();
225     this.metrics.setSizeOfLogQueue(queueSize);
226     // This will log a warning for each new log that gets created above the warn threshold
227     if (queue.size() > this.logQueueWarnThreshold) {
228       LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
229           + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
230     }
231   }
232 
233   @Override
234   public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
235       throws ReplicationException {
236     String peerId = peerClusterZnode;
237     if (peerId.contains("-")) {
238       // peerClusterZnode will be in the form peerId + "-" + rsZNode.
239       // A peerId will not have "-" in its name, see HBASE-11394
240       peerId = peerClusterZnode.split("-")[0];
241     }
242     Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs();
243     if (tableCFMap != null) {
244       List<String> tableCfs = tableCFMap.get(tableName);
245       if (tableCFMap.containsKey(tableName)
246           && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
247         this.replicationQueues.addHFileRefs(peerId, files);
248         metrics.incrSizeOfHFileRefsQueue(files.size());
249       } else {
250         LOG.debug("HFiles belonging to the table " + tableName + " family "
251             + Bytes.toString(family) + " will not be replicated to peer id " + peerId);
252       }
253     } else {
254       // The user has not restricted replication to any table CFs, which means all the
255       // data should be replicated
256       this.replicationQueues.addHFileRefs(peerId, files);
257       metrics.incrSizeOfHFileRefsQueue(files.size());
258     }
259   }
260 
261   private void uninitialize() {
262     LOG.debug("Source exiting " + this.peerId);
263     metrics.clear();
264     if (replicationEndpoint.state() == Service.State.STARTING
265         || replicationEndpoint.state() == Service.State.RUNNING) {
266       replicationEndpoint.stopAndWait();
267     }
268   }
269 
270   @Override
271   public void run() {
272     // mark we are running now
273     this.sourceRunning = true;
274     try {
275       // start the endpoint, connect to the cluster
276       Service.State state = replicationEndpoint.start().get();
277       if (state != Service.State.RUNNING) {
278         LOG.warn("ReplicationEndpoint was not started. Exiting");
279         uninitialize();
280         return;
281       }
282     } catch (Exception ex) {
283       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
284       throw new RuntimeException(ex);
285     }
286 
287     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
288     ArrayList<WALEntryFilter> filters = Lists.newArrayList(
289       (WALEntryFilter)new SystemTableWALEntryFilter());
290     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
291     if (filterFromEndpoint != null) {
292       filters.add(filterFromEndpoint);
293     }
294     this.walEntryFilter = new ChainWALEntryFilter(filters);
295 
296     int sleepMultiplier = 1;
297     // delay this until we are in an asynchronous thread
298     while (this.isSourceActive() && this.peerClusterId == null) {
299       this.peerClusterId = replicationEndpoint.getPeerUUID();
300       if (this.isSourceActive() && this.peerClusterId == null) {
301         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
302           sleepMultiplier++;
303         }
304       }
305     }
306 
307     // In rare cases, the zookeeper setting may be messed up. That leads to an incorrect
308     // peerClusterId value, which is the same as the source clusterId
309     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
310       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
311           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
312           + replicationEndpoint.getClass().getName(), null, false);
313     }
314     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
315     // start workers
316     for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
317       String walGroupId = entry.getKey();
318       PriorityBlockingQueue<Path> queue = entry.getValue();
319       final ReplicationSourceWorkerThread worker =
320           new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
321       ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
322       if (extant != null) {
323         LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
324       } else {
325         LOG.debug("Starting up worker for wal group " + walGroupId);
326         worker.startup();
327       }
328     }
329   }
330 
331   /**
332    * Do the sleeping logic
333    * @param msg Why we sleep
334    * @param sleepMultiplier by how many times the default sleeping time is augmented
335    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
336    */
337   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
338     try {
339       if (LOG.isTraceEnabled()) {
340         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
341       }
342       Thread.sleep(this.sleepForRetries * sleepMultiplier);
343     } catch (InterruptedException e) {
344       LOG.debug("Interrupted while sleeping between retries");
345       Thread.currentThread().interrupt();
346     }
347     return sleepMultiplier < maxRetriesMultiplier;
348   }
349 
350   /**
351    * check whether the peer is enabled or not
352    *
353    * @return true if the peer is enabled, otherwise false
354    */
355   protected boolean isPeerEnabled() {
356     return this.replicationPeers.getStatusOfPeer(this.peerId);
357   }
358 
359   @Override
360   public void startup() {
361     String n = Thread.currentThread().getName();
362     Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
363       @Override
364       public void uncaughtException(final Thread t, final Throwable e) {
365         LOG.error("Unexpected exception in ReplicationSource", e);
366       }
367     };
368     Threads
369         .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
370   }
371 
372   @Override
373   public void terminate(String reason) {
374     terminate(reason, null);
375   }
376 
377   @Override
378   public void terminate(String reason, Exception cause) {
379     terminate(reason, cause, true);
380   }
381 
382   public void terminate(String reason, Exception cause, boolean join) {
383     if (cause == null) {
384       LOG.info("Closing source "
385           + this.peerClusterZnode + " because: " + reason);
386 
387     } else {
388       LOG.error("Closing source " + this.peerClusterZnode
389           + " because an error occurred: " + reason, cause);
390     }
391     this.sourceRunning = false;
392     Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
393     for (ReplicationSourceWorkerThread worker : workers) {
394       worker.setWorkerRunning(false);
395       worker.interrupt();
396     }
397     ListenableFuture<Service.State> future = null;
398     if (this.replicationEndpoint != null) {
399       future = this.replicationEndpoint.stop();
400     }
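        // The endpoint is stopped asynchronously; when join is requested, wait below for both the
        // worker threads and the endpoint shutdown to complete.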
401     if (join) {
402       for (ReplicationSourceWorkerThread worker : workers) {
403         Threads.shutdown(worker, this.sleepForRetries);
404         LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
405       }
406       if (future != null) {
407         try {
408           future.get();
409         } catch (Exception e) {
410           LOG.warn("Got exception:" + e);
411         }
412       }
413     }
414   }
415 
416   @Override
417   public String getPeerClusterZnode() {
418     return this.peerClusterZnode;
419   }
420 
421   @Override
422   public String getPeerClusterId() {
423     return this.peerId;
424   }
425 
426   @Override
427   public Path getCurrentPath() {
428     // only for testing
429     for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
430       if (worker.getCurrentPath() != null) return worker.getCurrentPath();
431     }
432     return null;
433   }
434 
435   private boolean isSourceActive() {
436     return !this.stopper.isStopped() && this.sourceRunning;
437   }
438 
439   /**
440    * Comparator used to compare logs based on their start time
441    */
442   public static class LogsComparator implements Comparator<Path> {
443 
444     @Override
445     public int compare(Path o1, Path o2) {
446       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
447     }
448 
449     /**
450      * Split a path to get the start time
451      * For example: 10.20.20.171%3A60020.1277499063250
452      * @param p path to split
453      * @return start time
454      */
455     private static long getTS(Path p) {
456       int tsIndex = p.getName().lastIndexOf('.') + 1;
457       return Long.parseLong(p.getName().substring(tsIndex));
458     }
459   }
460 
461   @Override
462   public String getStats() {
463     StringBuilder sb = new StringBuilder();
464     sb.append("Total replicated edits: ").append(totalReplicatedEdits)
465         .append(", current progress: \n");
466     for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
467       String walGroupId = entry.getKey();
468       ReplicationSourceWorkerThread worker = entry.getValue();
469       long position = worker.getCurrentPosition();
470       Path currentPath = worker.getCurrentPath();
471       sb.append("walGroup [").append(walGroupId).append("]: ");
472       if (currentPath != null) {
473         sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
474             .append(position).append("\n");
475       } else {
476         sb.append("no replication ongoing, waiting for new log");
477       }
478     }
479     return sb.toString();
480   }
481 
482   /**
483    * Get Replication Source Metrics
484    * @return sourceMetrics
485    */
486   public MetricsSource getSourceMetrics() {
487     return this.metrics;
488   }
489 
490   public class ReplicationSourceWorkerThread extends Thread {
491     ReplicationSource source;
492     String walGroupId;
493     PriorityBlockingQueue<Path> queue;
494     ReplicationQueueInfo replicationQueueInfo;
495     // Our reader for the current log. open/close handled by repLogReader
496     private WAL.Reader reader;
497     // Last position in the log that we sent to ZooKeeper
498     private long lastLoggedPosition = -1;
499     // Path of the current log
500     private volatile Path currentPath;
501     // Handle on the log reader helper
502     private ReplicationWALReaderManager repLogReader;
503     // Current number of operations (Put/Delete) that we need to replicate
504     private int currentNbOperations = 0;
505     // Current size of data we need to replicate
506     private int currentSize = 0;
507     // Indicates whether this particular worker is running
508     private boolean workerRunning = true;
509     // Current number of hfiles that we need to replicate
510     private long currentNbHFiles = 0;
511 
512     public ReplicationSourceWorkerThread(String walGroupId,
513         PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
514       this.walGroupId = walGroupId;
515       this.queue = queue;
516       this.replicationQueueInfo = replicationQueueInfo;
517       this.repLogReader = new ReplicationWALReaderManager(fs, conf);
518       this.source = source;
519     }
520 
521     @Override
522     public void run() {
523       // If this is a recovered queue, it is already full and the first log
524       // normally has a position (unless the RS failed between 2 logs)
525       if (this.replicationQueueInfo.isQueueRecovered()) {
526         try {
527           this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
528             this.queue.peek().getName()));
529           if (LOG.isTraceEnabled()) {
530             LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
531                 + this.repLogReader.getPosition());
532           }
533         } catch (ReplicationException e) {
534           terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
535         }
536       }
537       // Loop until we close down
538       while (isWorkerActive()) {
539         int sleepMultiplier = 1;
540         // Sleep until replication is enabled again
541         if (!isPeerEnabled()) {
542           if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
543             sleepMultiplier++;
544           }
545           continue;
546         }
547         Path oldPath = getCurrentPath(); //note that in the current scenario,
548                                          //oldPath will be null when a log roll
549                                          //happens.
550         // Get a new path
551         boolean hasCurrentPath = getNextPath();
552         if (getCurrentPath() != null && oldPath == null) {
553           sleepMultiplier = 1; //reset the sleepMultiplier on a path change
554         }
555         if (!hasCurrentPath) {
556           if (sleepForRetries("No log to process", sleepMultiplier)) {
557             sleepMultiplier++;
558           }
559           continue;
560         }
561         boolean currentWALisBeingWrittenTo = false;
562         //For WAL files we own (rather than recovered), take a snapshot of whether the
563         //current WAL file (this.currentPath) is in use (for writing) NOW!
564         //Since the new WAL paths are enqueued only after the prev WAL file
565         //is 'closed', presence of an element in the queue means that
566         //the previous WAL file was closed, else the file is in use (currentPath)
567         //We take the snapshot now so that we are protected against races
568         //where a new file gets enqueued while the current file is being processed
569         //(and where we just finished reading the current file).
570         if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
571           currentWALisBeingWrittenTo = true;
572         }
573         // Open a reader on it
574         if (!openReader(sleepMultiplier)) {
575           // Reset the sleep multiplier, else it'd be reused for the next file
576           sleepMultiplier = 1;
577           continue;
578         }
579 
580         // If we got a null reader but didn't continue, then sleep and continue
581         if (this.reader == null) {
582           if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
583             sleepMultiplier++;
584           }
585           continue;
586         }
587 
588         boolean gotIOE = false;
589         currentNbOperations = 0;
590         currentNbHFiles = 0;
591         List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
592         currentSize = 0;
593         try {
594           if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
595             continue;
596           }
597         } catch (IOException ioe) {
598           LOG.warn(peerClusterZnode + " Got: ", ioe);
599           gotIOE = true;
600           if (ioe.getCause() instanceof EOFException) {
601 
602             boolean considerDumping = false;
603             if (this.replicationQueueInfo.isQueueRecovered()) {
604               try {
605                 FileStatus stat = fs.getFileStatus(this.currentPath);
606                 if (stat.getLen() == 0) {
607                   LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
608                 }
609                 considerDumping = true;
610               } catch (IOException e) {
611                 LOG.warn(peerClusterZnode + " Got exception while getting file size: ", e);
612               }
613             }
614 
615             if (considerDumping &&
616                 sleepMultiplier == maxRetriesMultiplier &&
617                 processEndOfFile()) {
618               continue;
619             }
620           }
621         } finally {
622           try {
623             this.reader = null;
624             this.repLogReader.closeReader();
625           } catch (IOException e) {
626             gotIOE = true;
627             LOG.warn("Unable to finalize the tailing of a file", e);
628           }
629         }
630 
631         // If we didn't get anything to replicate, or if we hit an IOE,
632         // wait a bit and retry.
633         // But if we need to stop, don't bother sleeping
634         if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
635           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
636             manager.logPositionAndCleanOldLogs(this.currentPath,
637                 peerClusterZnode, this.repLogReader.getPosition(),
638                 this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
639             this.lastLoggedPosition = this.repLogReader.getPosition();
640           }
641           // Reset the sleep multiplier if nothing has actually gone wrong
642           if (!gotIOE) {
643             sleepMultiplier = 1;
644             // if there was nothing to ship and it's not an error
645             // set "ageOfLastShippedOp" to <now> to indicate that we're current
646             metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
647           }
648           if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
649             sleepMultiplier++;
650           }
651           continue;
652         }
653         sleepMultiplier = 1;
654         shipEdits(currentWALisBeingWrittenTo, entries);
655       }
656       if (replicationQueueInfo.isQueueRecovered()) {
657         // synchronize to make sure only the last worker thread cleans up the queue
658         synchronized (workerThreads) {
659           Threads.sleep(100);// wait a short while for other worker threads to fully exit
660           boolean allOtherTaskDone = true;
661           for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
662             if (!worker.equals(this) && worker.isAlive()) {
663               allOtherTaskDone = false;
664               break;
665             }
666           }
667           if (allOtherTaskDone) {
668             manager.closeRecoveredQueue(this.source);
669             LOG.info("Finished recovering queue " + peerClusterZnode
670                 + " with the following stats: " + getStats());
671           }
672         }
673       }
674     }
675 
676     /**
677      * Read all the entries from the current log file and retain those that need to be replicated.
678      * Otherwise, process the end of the current file.
679      * @param currentWALisBeingWrittenTo is the current WAL being written to
680      * @param entries resulting entries to be replicated
681      * @return true if we got nothing and went to the next file, false if we got entries
682      * @throws IOException
683      */
684     protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
685         List<WAL.Entry> entries) throws IOException {
686       long seenEntries = 0;
687       if (LOG.isTraceEnabled()) {
688         LOG.trace("Seeking in " + this.currentPath + " at position "
689             + this.repLogReader.getPosition());
690       }
691       this.repLogReader.seek();
692       long positionBeforeRead = this.repLogReader.getPosition();
693       WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
694       while (entry != null) {
695         metrics.incrLogEditsRead();
696         seenEntries++;
697 
698         // don't replicate if the log entries have already been consumed by the peer cluster
699         if (replicationEndpoint.canReplicateToSameCluster()
700             || !entry.getKey().getClusterIds().contains(peerClusterId)) {
701           // Remove all KVs that should not be replicated
702           entry = walEntryFilter.filter(entry);
703           WALEdit edit = null;
704           WALKey logKey = null;
705           if (entry != null) {
706             edit = entry.getEdit();
707             logKey = entry.getKey();
708           }
709 
710           if (edit != null && edit.size() != 0) {
711             // Mark that the current cluster has the change
712             logKey.addClusterId(clusterId);
713             currentNbOperations += countDistinctRowKeys(edit);
714             entries.add(entry);
715             currentSize += entry.getEdit().heapSize();
716           } else {
717             metrics.incrLogEditsFiltered();
718           }
719         }
720         // Stop if too many entries or too big
721         // FIXME check the relationship between single wal group and overall
722         if (currentSize >= replicationQueueSizeCapacity
723             || entries.size() >= replicationQueueNbCapacity) {
724           break;
725         }
726         try {
727           entry = this.repLogReader.readNextAndSetPosition();
728         } catch (IOException ie) {
729           LOG.debug("Break on IOE: " + ie.getMessage());
730           break;
731         }
732       }
733       metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
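          // A WAL that is still being written to is never treated as finished here: more entries
          // may still appear in it, so return false rather than calling processEndOfFile().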
734       if (currentWALisBeingWrittenTo) {
735         return false;
736       }
737       // If we didn't get anything and the queue has an object, it means we
738       // hit the end of the file for sure
739       return seenEntries == 0 && processEndOfFile();
740     }
741 
742     private void cleanUpHFileRefs(WALEdit edit) throws IOException {
743       String peerId = peerClusterZnode;
744       if (peerId.contains("-")) {
745         // peerClusterZnode will be in the form peerId + "-" + rsZNode.
746         // A peerId will not have "-" in its name, see HBASE-11394
747         peerId = peerClusterZnode.split("-")[0];
748       }
749       List<Cell> cells = edit.getCells();
750       for (int i = 0; i < cells.size(); i++) {
751         Cell cell = cells.get(i);
752         if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
753           BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
754           List<StoreDescriptor> stores = bld.getStoresList();
755           for (int j = 0; j < stores.size(); j++) {
756             List<String> storeFileList = stores.get(j).getStoreFileList();
757             manager.cleanUpHFileRefs(peerId, storeFileList);
758             metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
759           }
760         }
761       }
762     }
763 
764     /**
765      * Poll for the next path
766      * @return true if a path was obtained, false if not
767      */
768     protected boolean getNextPath() {
769       try {
770         if (this.currentPath == null) {
771           this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
772           int queueSize = logQueueSize.decrementAndGet();
773           metrics.setSizeOfLogQueue(queueSize);
774           if (this.currentPath != null) {
775             // For recovered queue: must use peerClusterZnode since peerId is a parsed value
776             manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
777               this.replicationQueueInfo.isQueueRecovered());
778             if (LOG.isTraceEnabled()) {
779               LOG.trace("New log: " + this.currentPath);
780             }
781           }
782         }
783       } catch (InterruptedException e) {
784         LOG.warn("Interrupted while reading edits", e);
785       }
786       return this.currentPath != null;
787     }
788 
789     /**
790      * Open a reader on the current path
791      *
792      * @param sleepMultiplier by how many times the default sleeping time is augmented
793      * @return true if we should continue with that file, false if we are done with it
794      */
795     protected boolean openReader(int sleepMultiplier) {
796       try {
797         try {
798           if (LOG.isTraceEnabled()) {
799             LOG.trace("Opening log " + this.currentPath);
800           }
801           this.reader = repLogReader.openReader(this.currentPath);
802         } catch (FileNotFoundException fnfe) {
803           if (this.replicationQueueInfo.isQueueRecovered()) {
804             // We didn't find the log in the archive directory, look if it still
805             // exists in the dead RS folder (there could be a chain of failures
806             // to look at)
807             List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
808             LOG.info("NB dead servers : " + deadRegionServers.size());
809             final Path rootDir = FSUtils.getRootDir(conf);
810             for (String curDeadServerName : deadRegionServers) {
811               final Path deadRsDirectory = new Path(rootDir,
812                   DefaultWALProvider.getWALDirectoryName(curDeadServerName));
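              // The dead RS's WAL may still be under its normal WAL directory, or under the
              // "-splitting" directory if log splitting for that server has already started.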
813               Path[] locs = new Path[] {
814                   new Path(deadRsDirectory, currentPath.getName()),
815                   new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
816                                             currentPath.getName()),
817               };
818               for (Path possibleLogLocation : locs) {
819                 LOG.info("Possible location " + possibleLogLocation.toUri().toString());
820                 if (manager.getFs().exists(possibleLogLocation)) {
821                   // We found the right new location
822                   LOG.info("Log " + this.currentPath + " still exists at " +
823                       possibleLogLocation);
824                   // Breaking here will make us sleep since reader is null
825                   // TODO why don't we need to set currentPath and call openReader here?
826                   return true;
827                 }
828               }
829             }
830             // In the case of disaster/recovery, HMaster may have been shut down or crashed before
831             // flushing data from .logs to .oldlogs. Loop through the .logs folders and check for a match
832             if (stopper instanceof ReplicationSyncUp.DummyServer) {
833               // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
834               //      area rather than to the wal area for a particular region server.
835               FileStatus[] rss = fs.listStatus(manager.getLogDir());
836               for (FileStatus rs : rss) {
837                 Path p = rs.getPath();
838                 FileStatus[] logs = fs.listStatus(p);
839                 for (FileStatus log : logs) {
840                   p = new Path(p, log.getPath().getName());
841                   if (p.getName().equals(currentPath.getName())) {
842                     currentPath = p;
843                     LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
844                     // Open the log at the new location
845                     this.openReader(sleepMultiplier);
846                     return true;
847                   }
848                 }
849               }
850             }
851 
852             // TODO What happens if the log was missing from every single location?
853             // Although we need to check a couple of times as the log could have
854             // been moved by the master between the checks
855             // It can also happen if a recovered queue wasn't properly cleaned,
856             // such that the znode pointing to a log exists but the log was
857             // deleted a long time ago.
858             // For the moment, we'll throw the IO and processEndOfFile
859             throw new IOException("File from recovered queue is " +
860                 "nowhere to be found", fnfe);
861           } else {
862             // If the log was archived, continue reading from there
863             Path archivedLogLocation =
864                 new Path(manager.getOldLogDir(), currentPath.getName());
865             if (manager.getFs().exists(archivedLogLocation)) {
866               currentPath = archivedLogLocation;
867               LOG.info("Log " + this.currentPath + " was moved to " +
868                   archivedLogLocation);
869               // Open the log at the new location
870               this.openReader(sleepMultiplier);
871 
872             }
873           // TODO What happens if the log is missing in both places?
874           }
875         }
876       } catch (LeaseNotRecoveredException lnre) {
877         // HBASE-15019 the WAL was not closed due to some hiccup.
878         LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
879         recoverLease(conf, currentPath);
880         this.reader = null;
881       } catch (IOException ioe) {
882         if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
883         LOG.warn(peerClusterZnode + " Got: ", ioe);
884         this.reader = null;
885         if (ioe.getCause() instanceof NullPointerException) {
886           // Workaround for race condition in HDFS-4380
887           // which throws an NPE if we open a file before any data node has the most recent block
888           // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
889           LOG.warn("Got NPE opening reader, will retry.");
890         } else if (sleepMultiplier >= maxRetriesMultiplier) {
891           // TODO Need a better way to determine if a file is really gone but
892           // TODO without scanning all logs dir
893           LOG.warn("Waited too long for this file, considering dumping");
894           return !processEndOfFile();
895         }
896       }
897       return true;
898     }
899 
900     private void recoverLease(final Configuration conf, final Path path) {
901       try {
902         final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
903         FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
904         fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
905           @Override
906           public boolean progress() {
907             LOG.debug("recover WAL lease: " + path);
908             return isWorkerActive();
909           }
910         });
911       } catch (IOException e) {
912         LOG.warn("unable to recover lease for WAL: " + path, e);
913       }
914     }
915 
916     /*
917      * Checks whether the current log file is empty and this is not a recovered queue. This is to
918      * handle the scenario where, in an idle cluster, there is no entry in the current log and we
919      * keep trying to read the log file and get an EOFException. In case of a recovered queue the
920      * last log file may be empty, and we don't want to retry that.
921      */
922     private boolean isCurrentLogEmpty() {
923       return (this.repLogReader.getPosition() == 0 &&
924           !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
925     }
926 
927     /**
928      * Count the number of different row keys in the given edit because of mini-batching. We assume
929      * that there's at least one Cell in the WALEdit.
930      * @param edit edit to count row keys from
931      * @return number of different row keys
932      */
933     private int countDistinctRowKeys(WALEdit edit) {
934       List<Cell> cells = edit.getCells();
935       int distinctRowKeys = 1;
936       int totalHFileEntries = 0;
937       Cell lastCell = cells.get(0);
938 
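          // Cells for the same row are assumed to be adjacent within a WALEdit, so a row change
          // between neighbouring cells marks a new distinct row.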
939       for (int i = 0; i < edit.size(); i++) {
940         // Count HFiles to be replicated
941         if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
942           try {
943             BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
944             List<StoreDescriptor> stores = bld.getStoresList();
945             for (int j = 0; j < stores.size(); j++) {
946               totalHFileEntries += stores.get(j).getStoreFileList().size();
947             }
948           } catch (IOException e) {
949             LOG.error("Failed to deserialize bulk load entry from wal edit. "
950                 + "Hence its hfile count will not be added into the metric.");
951           }
952         }
953 
954         if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
955           distinctRowKeys++;
956         }
957         lastCell = cells.get(i);
958       }
959       currentNbHFiles += totalHFileEntries;
960       return distinctRowKeys + totalHFileEntries;
961     }
962 
963     /**
964      * Do the shipping logic
965      * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
966      * written to when this method was called
967      */
968     protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
969       int sleepMultiplier = 0;
970       if (entries.isEmpty()) {
971         LOG.warn("Was given 0 edits to ship");
972         return;
973       }
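        // Retry loop: keep attempting to ship this same batch until the endpoint reports success
        // (the break below) or the worker is asked to stop.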
974       while (isWorkerActive()) {
975         try {
976           if (throttler.isEnabled()) {
977             long sleepTicks = throttler.getNextSleepInterval(currentSize);
978             if (sleepTicks > 0) {
979               try {
980                 if (LOG.isTraceEnabled()) {
981                   LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
982                 }
983                 Thread.sleep(sleepTicks);
984               } catch (InterruptedException e) {
985                 LOG.debug("Interrupted while sleeping for throttling control");
986                 Thread.currentThread().interrupt();
987                 // current thread might be interrupted to terminate
988                 // directly go back to the while() to confirm this
989                 continue;
990               }
991               // reset throttler's cycle start tick when sleep for throttling occurs
992               throttler.resetStartTick();
993             }
994           }
995           // create replicateContext here, so the entries can be GC'd upon return from this call
996           // stack
997           ReplicationEndpoint.ReplicateContext replicateContext =
998               new ReplicationEndpoint.ReplicateContext();
999           replicateContext.setEntries(entries).setSize(currentSize);
1000           replicateContext.setWalGroupId(walGroupId);
1001 
1002           long startTimeNs = System.nanoTime();
1003           // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
1004           boolean replicated = replicationEndpoint.replicate(replicateContext);
1005           long endTimeNs = System.nanoTime();
1006 
1007           if (!replicated) {
1008             continue;
1009           } else {
1010             sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
1011           }
1012 
1013           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
1014             //Clean up hfile references
1015             int size = entries.size();
1016             for (int i = 0; i < size; i++) {
1017               cleanUpHFileRefs(entries.get(i).getEdit());
1018             }
1019             //Log and clean up WAL logs
1020             manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
1021               this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
1022               currentWALisBeingWrittenTo);
1023             this.lastLoggedPosition = this.repLogReader.getPosition();
1024           }
1025           if (throttler.isEnabled()) {
1026             throttler.addPushSize(currentSize);
1027           }
1028           totalReplicatedEdits.addAndGet(entries.size());
1029           totalReplicatedOperations.addAndGet(currentNbOperations);
1030           // FIXME check relationship between wal group and overall
1031           metrics.shipBatch(currentNbOperations, currentSize / 1024, currentNbHFiles);
1032           metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
1033             walGroupId);
1034           if (LOG.isTraceEnabled()) {
1035             LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
1036                 + totalReplicatedOperations + " operations in "
1037                 + ((endTimeNs - startTimeNs) / 1000000) + " ms");
1038           }
1039           break;
1040         } catch (Exception ex) {
1041           LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
1042               + org.apache.hadoop.util.StringUtils.stringifyException(ex));
1043           if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
1044             sleepMultiplier++;
1045           }
1046         }
1047       }
1048     }
1049 
1050     /**
1051      * If the queue isn't empty, switch to the next one. Else, if this is a recovered queue, it
1052      * means we're done! Else we'll just continue to try reading the log file.
1053      * @return true if we're done with the current file, false if we should continue trying to read
1054      *         from it
1055      */
1056     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
1057         justification = "Yeah, this is how it works")
1058     protected boolean processEndOfFile() {
1059       if (this.queue.size() != 0) {
1060         if (LOG.isTraceEnabled()) {
1061           String filesize = "N/A";
1062           try {
1063             FileStatus stat = fs.getFileStatus(this.currentPath);
1064             filesize = stat.getLen() + "";
1065           } catch (IOException ex) {
1066           }
1067           LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
1068               + ", and the length of the file is " + filesize);
1069         }
1070         this.currentPath = null;
1071         this.repLogReader.finishCurrentFile();
1072         this.reader = null;
1073         return true;
1074       } else if (this.replicationQueueInfo.isQueueRecovered()) {
1075         LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
1076             + peerClusterZnode);
1077         workerRunning = false;
1078         return true;
1079       }
1080       return false;
1081     }
1082 
1083     public void startup() {
1084       String n = Thread.currentThread().getName();
1085       Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
1086         @Override
1087         public void uncaughtException(final Thread t, final Throwable e) {
1088           LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
1089               + getCurrentPath(), e);
1090         }
1091       };
1092       Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
1093           + peerClusterZnode, handler);
1094       workerThreads.put(walGroupId, this);
1095     }
1096 
1097     public Path getCurrentPath() {
1098       return this.currentPath;
1099     }
1100 
1101     public long getCurrentPosition() {
1102       return this.repLogReader.getPosition();
1103     }
1104 
1105     private boolean isWorkerActive() {
1106       return !stopper.isStopped() && workerRunning && !isInterrupted();
1107     }
1108 
1109     private void terminate(String reason, Exception cause) {
1110       if (cause == null) {
1111         LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
1112 
1113       } else {
1114         LOG.error("Closing worker for wal group " + this.walGroupId
1115             + " because an error occurred: " + reason, cause);
1116       }
1117       this.interrupt();
1118       Threads.shutdown(this, sleepForRetries);
1119       LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
1120     }
1121 
1122     public void setWorkerRunning(boolean workerRunning) {
1123       this.workerRunning = workerRunning;
1124     }
1125   }
1126 }