1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import com.google.common.collect.Lists;
22  import com.google.common.util.concurrent.ListenableFuture;
23  import com.google.common.util.concurrent.Service;
24
25  import java.io.EOFException;
26  import java.io.FileNotFoundException;
27  import java.io.IOException;
28  import java.util.ArrayList;
29  import java.util.Collection;
30  import java.util.Comparator;
31  import java.util.HashMap;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.UUID;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.PriorityBlockingQueue;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicLong;
39
40  import org.apache.commons.lang.StringUtils;
41  import org.apache.commons.logging.Log;
42  import org.apache.commons.logging.LogFactory;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.hbase.Cell;
48  import org.apache.hadoop.hbase.CellUtil;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.Stoppable;
52  import org.apache.hadoop.hbase.TableName;
53  import org.apache.hadoop.hbase.classification.InterfaceAudience;
54  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
55  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
56  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
57  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
58  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
59  import org.apache.hadoop.hbase.replication.ReplicationException;
60  import org.apache.hadoop.hbase.replication.ReplicationPeers;
61  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
62  import org.apache.hadoop.hbase.replication.ReplicationQueues;
63  import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
64  import org.apache.hadoop.hbase.replication.WALEntryFilter;
65  import org.apache.hadoop.hbase.util.Bytes;
66  import org.apache.hadoop.hbase.util.CancelableProgressable;
67  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
68  import org.apache.hadoop.hbase.util.FSUtils;
69  import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
70  import org.apache.hadoop.hbase.util.Threads;
71  import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
72  import org.apache.hadoop.hbase.wal.WAL;
73  import org.apache.hadoop.hbase.wal.WALKey;
74  
75  /**
76   * Class that handles the source of a replication stream.
77   * Currently does not handle more than 1 slave cluster.
78   * For each slave cluster it selects a random number of peers
79   * using a replication ratio. For example, if the replication ratio is 0.1
80   * and the slave cluster has 100 region servers, 10 will be selected.
81   * <p>
82   * A stream is considered down when we cannot contact a region server on the
83   * peer cluster for more than 55 seconds by default.
84   * </p>
85   *
86   */
87  @InterfaceAudience.Private
88  public class ReplicationSource extends Thread
89      implements ReplicationSourceInterface {
90  
91    private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
92    // Queues of logs to process, entry in format of walGroupId->queue,
93  // each represents a queue for one wal group
94    private Map<String, PriorityBlockingQueue<Path>> queues =
95        new HashMap<String, PriorityBlockingQueue<Path>>();
96  // per group queue size, used as the initial capacity of each wal group's queue (the queues themselves are unbounded)
97    private int queueSizePerGroup;
98    private ReplicationQueues replicationQueues;
99    private ReplicationPeers replicationPeers;
100 
101   private Configuration conf;
102   private ReplicationQueueInfo replicationQueueInfo;
103   // id of the peer cluster this source replicates to
104   private String peerId;
105   // The manager of all sources to which we ping back our progress
106   private ReplicationSourceManager manager;
107   // Should we stop everything?
108   private Stoppable stopper;
109   // How long should we sleep for each retry
110   private long sleepForRetries;
111   // Max size in bytes of entriesArray
112   private long replicationQueueSizeCapacity;
113   // Max number of entries in entriesArray
114   private int replicationQueueNbCapacity;
115   private FileSystem fs;
116   // id of this cluster
117   private UUID clusterId;
118   // id of the other cluster
119   private UUID peerClusterId;
120   // total number of edits we replicated
121   private AtomicLong totalReplicatedEdits = new AtomicLong(0);
122   // total number of operations (Put/Delete) we replicated
123   private AtomicLong totalReplicatedOperations = new AtomicLong(0);
124   // The znode of the replication queue we work against (peer id, plus dead RS info for recovered queues)
125   private String peerClusterZnode;
126   // Maximum number of retries before taking bold actions
127   private int maxRetriesMultiplier;
128   // Indicates if this particular source is running
129   private volatile boolean sourceRunning = false;
130   // Metrics for this source
131   private MetricsSource metrics;
132   //WARN threshold for the number of queued logs, defaults to 2
133   private int logQueueWarnThreshold;
134   // ReplicationEndpoint which will handle the actual replication
135   private ReplicationEndpoint replicationEndpoint;
136   // A filter (or a chain of filters) for the WAL entries.
137   private WALEntryFilter walEntryFilter;
138   // throttler
139   private ReplicationThrottler throttler;
140   private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
141       new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
142 
143   /**
144    * Instantiation method used by region servers
145    *
146    * @param conf configuration to use
147    * @param fs file system to use
148    * @param manager the replication manager we report our progress to
149    * @param stopper the stoppable (typically the region server) used to check for shutdown
150    * @param peerClusterZnode the name of our znode
151    * @param clusterId unique UUID for the cluster
152    * @param replicationEndpoint the replication endpoint implementation
153    * @param metrics metrics for replication source
154    * @throws IOException
155    */
156   @Override
157   public void init(final Configuration conf, final FileSystem fs,
158       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
159       final ReplicationPeers replicationPeers, final Stoppable stopper,
160       final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
161       final MetricsSource metrics)
162           throws IOException {
163     this.stopper = stopper;
164     this.conf = HBaseConfiguration.create(conf);
165     decorateConf();
166     this.replicationQueueSizeCapacity =
167         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
168     this.replicationQueueNbCapacity =
169         this.conf.getInt("replication.source.nb.capacity", 25000);
170     this.sleepForRetries =
171         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
172     this.maxRetriesMultiplier =
173         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
174     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
175     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
176     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
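        // Note (assumption based on the division above): the bandwidth cap is in bytes per second and
        // is split into tenths because the throttler meters traffic over sub-second cycles; a value of
        // 0 leaves throttling disabled.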
177     this.replicationQueues = replicationQueues;
178     this.replicationPeers = replicationPeers;
179     this.manager = manager;
180     this.fs = fs;
181     this.metrics = metrics;
182     this.clusterId = clusterId;
183 
184     this.peerClusterZnode = peerClusterZnode;
185     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
186     // ReplicationQueueInfo parses the peerId out of the znode for us
187     this.peerId = this.replicationQueueInfo.getPeerId();
188     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
189     this.replicationEndpoint = replicationEndpoint;
190   }
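  // Illustrative hbase-site.xml overrides for the knobs read above (example values, not recommendations):
  //   <property><name>replication.source.size.capacity</name><value>16777216</value></property>
  //   <property><name>replication.source.nb.capacity</name><value>5000</value></property>
  //   <property><name>replication.source.sleepforretries</name><value>2000</value></property>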
191 
192   private void decorateConf() {
193     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
194     if (StringUtils.isNotEmpty(replicationCodec)) {
195       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
196     }
197   }
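  // Assumption about intent: copying the replication codec into the RPC codec key makes this source's
  // connections to the sink use a codec suited for replication (e.g. one that carries cell tags).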
198 
199   @Override
200   public void enqueueLog(Path log) {
201     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
202     PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
203     if (queue == null) {
204       queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
205       queues.put(logPrefix, queue);
206       if (this.sourceRunning) {
207         // new wal group observed after source startup, start a new worker thread to track it
208         // note: it's possible that a log is enqueued after this.sourceRunning is set but before the
209         // worker thread is launched, so check workerThreads before starting a new worker
210         final ReplicationSourceWorkerThread worker =
211             new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
212         ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
213         if (extant != null) {
214           LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
215         } else {
216           LOG.debug("Starting up worker for wal group " + logPrefix);
217           worker.startup();
218         }
219       }
220     }
221     queue.put(log);
222     this.metrics.incrSizeOfLogQueue();
223     // This will log a warning for each new log that gets created above the warn threshold
224     int queueSize = queue.size();
225     if (queueSize > this.logQueueWarnThreshold) {
226       LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
227           + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
228     }
229   }
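  // For reference: judging from LogsComparator.getTS below, the wal group id (log prefix) is the WAL
  // file name minus its trailing ".<timestamp>" suffix, so successive rolls of the same WAL share a queue.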
230 
231   @Override
232   public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
233       throws ReplicationException {
234     String peerId = peerClusterZnode;
235     if (peerId.contains("-")) {
236       // peerClusterZnode will be in the form peerId + "-" + rsZNode.
237       // A peerId will not have "-" in its name, see HBASE-11394
238       peerId = peerClusterZnode.split("-")[0];
239     }
240     Map<TableName, List<String>> tableCFMap = replicationPeers.getConnectedPeer(peerId).getTableCFs();
241     if (tableCFMap != null) {
242       List<String> tableCfs = tableCFMap.get(tableName);
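      // a null CF list for a table present in the map means every column family of that table is replicated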
243       if (tableCFMap.containsKey(tableName)
244           && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
245         this.replicationQueues.addHFileRefs(peerId, files);
246         metrics.incrSizeOfHFileRefsQueue(files.size());
247       } else {
248         LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
249             + Bytes.toString(family) + " to peer id " + peerId);
250       }
251     } else {
252       // the user has not explicitly defined any table CFs for this peer, which means all the
253       // data should be replicated
254       this.replicationQueues.addHFileRefs(peerId, files);
255       metrics.incrSizeOfHFileRefsQueue(files.size());
256     }
257   }
258 
259   private void uninitialize() {
260     LOG.debug("Source exiting " + this.peerId);
261     metrics.clear();
262     if (replicationEndpoint.state() == Service.State.STARTING
263         || replicationEndpoint.state() == Service.State.RUNNING) {
264       replicationEndpoint.stopAndWait();
265     }
266   }
267 
268   @Override
269   public void run() {
270     // mark we are running now
271     this.sourceRunning = true;
272     try {
273       // start the endpoint, connect to the cluster
274       Service.State state = replicationEndpoint.start().get();
275       if (state != Service.State.RUNNING) {
276         LOG.warn("ReplicationEndpoint was not started. Exiting");
277         uninitialize();
278         return;
279       }
280     } catch (Exception ex) {
281       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
282       throw new RuntimeException(ex);
283     }
284 
285     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
286     ArrayList<WALEntryFilter> filters = Lists.newArrayList(
287       (WALEntryFilter)new SystemTableWALEntryFilter());
288     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
289     if (filterFromEndpoint != null) {
290       filters.add(filterFromEndpoint);
291     }
292     this.walEntryFilter = new ChainWALEntryFilter(filters);
293 
294     int sleepMultiplier = 1;
295     // delay this until we are in an asynchronous thread
296     while (this.isSourceActive() && this.peerClusterId == null) {
297       this.peerClusterId = replicationEndpoint.getPeerUUID();
298       if (this.isSourceActive() && this.peerClusterId == null) {
299         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
300           sleepMultiplier++;
301         }
302       }
303     }
304 
305     // In a rare case the zookeeper setting may be wrong, leading to a peerClusterId value
306     // that is the same as the source clusterId
307     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
308       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
309           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
310           + replicationEndpoint.getClass().getName(), null, false);
311     }
312     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
313     // start workers
314     for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
315       String walGroupId = entry.getKey();
316       PriorityBlockingQueue<Path> queue = entry.getValue();
317       final ReplicationSourceWorkerThread worker =
318           new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
319       ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
320       if (extant != null) {
321         LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
322       } else {
323         LOG.debug("Starting up worker for wal group " + walGroupId);
324         worker.startup();
325       }
326     }
327   }
328 
329   /**
330    * Do the sleeping logic
331    * @param msg Why we sleep
332    * @param sleepMultiplier by how many times the default sleeping time is augmented
333    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
334    */
335   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
336     try {
337       if (LOG.isTraceEnabled()) {
338         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
339       }
340       Thread.sleep(this.sleepForRetries * sleepMultiplier);
341     } catch (InterruptedException e) {
342       LOG.debug("Interrupted while sleeping between retries");
343       Thread.currentThread().interrupt();
344     }
345     return sleepMultiplier < maxRetriesMultiplier;
346   }
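  // Worked example with the defaults read in init(): sleepForRetries = 1000 ms and
  // maxRetriesMultiplier = 300, so an individual backoff sleep grows linearly from 1 second up to ~5 minutes.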
347 
348   /**
349    * check whether the peer is enabled or not
350    *
351    * @return true if the peer is enabled, otherwise false
352    */
353   protected boolean isPeerEnabled() {
354     return this.replicationPeers.getStatusOfPeer(this.peerId);
355   }
356 
357   @Override
358   public void startup() {
359     String n = Thread.currentThread().getName();
360     Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
361       @Override
362       public void uncaughtException(final Thread t, final Throwable e) {
363         LOG.error("Unexpected exception in ReplicationSource", e);
364       }
365     };
366     Threads
367         .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
368   }
369 
370   @Override
371   public void terminate(String reason) {
372     terminate(reason, null);
373   }
374 
375   @Override
376   public void terminate(String reason, Exception cause) {
377     terminate(reason, cause, true);
378   }
379 
380   public void terminate(String reason, Exception cause, boolean join) {
381     if (cause == null) {
382       LOG.info("Closing source "
383           + this.peerClusterZnode + " because: " + reason);
384 
385     } else {
386       LOG.error("Closing source " + this.peerClusterZnode
387           + " because an error occurred: " + reason, cause);
388     }
389     this.sourceRunning = false;
390     Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
391     for (ReplicationSourceWorkerThread worker : workers) {
392       worker.setWorkerRunning(false);
393       worker.interrupt();
394     }
395     ListenableFuture<Service.State> future = null;
396     if (this.replicationEndpoint != null) {
397       future = this.replicationEndpoint.stop();
398     }
399     if (join) {
400       for (ReplicationSourceWorkerThread worker : workers) {
401         Threads.shutdown(worker, this.sleepForRetries);
402         LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
403       }
404       if (future != null) {
405         try {
406           future.get();
407         } catch (Exception e) {
408           LOG.warn("Got exception:" + e);
409         }
410       }
411     }
412   }
413 
414   @Override
415   public String getPeerClusterZnode() {
416     return this.peerClusterZnode;
417   }
418 
419   @Override
420   public String getPeerClusterId() {
421     return this.peerId;
422   }
423 
424   @Override
425   public Path getCurrentPath() {
426     // only for testing
427     for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
428       if (worker.getCurrentPath() != null) return worker.getCurrentPath();
429     }
430     return null;
431   }
432 
433   private boolean isSourceActive() {
434     return !this.stopper.isStopped() && this.sourceRunning;
435   }
436 
437   /**
438    * Comparator used to order logs by their start time (the timestamp suffix in the WAL file name)
439    */
440   public static class LogsComparator implements Comparator<Path> {
441 
442     @Override
443     public int compare(Path o1, Path o2) {
444       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
445     }
446 
447     /**
448      * Split a path to get the start time.
449      * For example: 10.20.20.171%3A60020.1277499063250 yields 1277499063250.
450      * @param p path to split
451      * @return start time
452      */
453     private static long getTS(Path p) {
454       int tsIndex = p.getName().lastIndexOf('.') + 1;
455       return Long.parseLong(p.getName().substring(tsIndex));
456     }
457   }
458 
459   @Override
460   public String getStats() {
461     StringBuilder sb = new StringBuilder();
462     sb.append("Total replicated edits: ").append(totalReplicatedEdits)
463         .append(", current progress: \n");
464     for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
465       String walGroupId = entry.getKey();
466       ReplicationSourceWorkerThread worker = entry.getValue();
467       long position = worker.getCurrentPosition();
468       Path currentPath = worker.getCurrentPath();
469       sb.append("walGroup [").append(walGroupId).append("]: ");
470       if (currentPath != null) {
471         sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
472             .append(position).append("\n");
473       } else {
474         sb.append("no replication ongoing, waiting for new log");
475       }
476     }
477     return sb.toString();
478   }
479 
480   /**
481    * Get Replication Source Metrics
482    * @return sourceMetrics
483    */
484   public MetricsSource getSourceMetrics() {
485     return this.metrics;
486   }
487 
488   public class ReplicationSourceWorkerThread extends Thread {
489     ReplicationSource source;
490     String walGroupId;
491     PriorityBlockingQueue<Path> queue;
492     ReplicationQueueInfo replicationQueueInfo;
493     // Our reader for the current log. open/close handled by repLogReader
494     private WAL.Reader reader;
495     // Last position in the log that we sent to ZooKeeper
496     private long lastLoggedPosition = -1;
497     // Path of the current log
498     private volatile Path currentPath;
499     // Handle on the log reader helper
500     private ReplicationWALReaderManager repLogReader;
501     // Current number of operations (Put/Delete) that we need to replicate
502     private int currentNbOperations = 0;
503     // Current size of data we need to replicate
504     private int currentSize = 0;
505     // Indicates whether this particular worker is running
506     private boolean workerRunning = true;
507     // Current number of hfiles that we need to replicate
508     private long currentNbHFiles = 0;
509 
510     public ReplicationSourceWorkerThread(String walGroupId,
511         PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
512         ReplicationSource source) {
513       this.walGroupId = walGroupId;
514       this.queue = queue;
515       this.replicationQueueInfo = replicationQueueInfo;
516       this.repLogReader = new ReplicationWALReaderManager(fs, conf);
517       this.source = source;
518     }
519 
520     @Override
521     public void run() {
522       // If this is a recovered queue, it is already fully populated and the first log
523       // normally has a stored position (unless the RS failed between 2 logs)
524       if (this.replicationQueueInfo.isQueueRecovered()) {
525         try {
526           this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
527             this.queue.peek().getName()));
528           if (LOG.isTraceEnabled()) {
529             LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
530                 + this.repLogReader.getPosition());
531           }
532         } catch (ReplicationException e) {
533           terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
534         }
535       }
536       // Loop until we close down
537       while (isWorkerActive()) {
538         int sleepMultiplier = 1;
539         // Sleep until replication is enabled again
540         if (!isPeerEnabled()) {
541           if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
542             sleepMultiplier++;
543           }
544           continue;
545         }
546         Path oldPath = getCurrentPath(); //note that in the current scenario,
547                                          //oldPath will be null when a log roll
548                                          //happens.
549         // Get a new path
550         boolean hasCurrentPath = getNextPath();
551         if (getCurrentPath() != null && oldPath == null) {
552           sleepMultiplier = 1; //reset the sleepMultiplier on a path change
553         }
554         if (!hasCurrentPath) {
555           if (sleepForRetries("No log to process", sleepMultiplier)) {
556             sleepMultiplier++;
557           }
558           continue;
559         }
560         boolean currentWALisBeingWrittenTo = false;
561         //For WAL files we own (rather than recovered), take a snapshot of whether the
562         //current WAL file (this.currentPath) is in use (for writing) NOW!
563         //Since the new WAL paths are enqueued only after the prev WAL file
564         //is 'closed', presence of an element in the queue means that
565         //the previous WAL file was closed, else the file is in use (currentPath)
566         //We take the snapshot now so that we are protected against races
567         //where a new file gets enqueued while the current file is being processed
568         //(and where we just finished reading the current file).
569         if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
570           currentWALisBeingWrittenTo = true;
571         }
572         // Open a reader on it
573         if (!openReader(sleepMultiplier)) {
574           // Reset the sleep multiplier, else it'd be reused for the next file
575           sleepMultiplier = 1;
576           continue;
577         }
578 
579         // If we got a null reader but didn't continue, then sleep and continue
580         if (this.reader == null) {
581           if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
582             sleepMultiplier++;
583           }
584           continue;
585         }
586 
587         boolean gotIOE = false;
588         currentNbOperations = 0;
589         currentNbHFiles = 0;
590         List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
591         currentSize = 0;
592         try {
593           if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
594             continue;
595           }
596         } catch (IOException ioe) {
597           LOG.warn(peerClusterZnode + " Got: ", ioe);
598           gotIOE = true;
599           if (ioe.getCause() instanceof EOFException) {
600 
601             boolean considerDumping = false;
602             if (this.replicationQueueInfo.isQueueRecovered()) {
603               try {
604                 FileStatus stat = fs.getFileStatus(this.currentPath);
605                 if (stat.getLen() == 0) {
606                   LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
607                 }
608                 considerDumping = true;
609               } catch (IOException e) {
610                 LOG.warn(peerClusterZnode + " Got exception while getting file size: ", e);
611               }
612             }
613 
614             if (considerDumping &&
615                 sleepMultiplier == maxRetriesMultiplier &&
616                 processEndOfFile()) {
617               continue;
618             }
619           }
620         } finally {
621           try {
622             this.reader = null;
623             this.repLogReader.closeReader();
624           } catch (IOException e) {
625             gotIOE = true;
626             LOG.warn("Unable to finalize the tailing of a file", e);
627           }
628         }
629 
630         // If we didn't get anything to replicate, or if we hit a IOE,
631         // wait a bit and retry.
632         // But if we need to stop, don't bother sleeping
633         if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
634           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
635             manager.logPositionAndCleanOldLogs(this.currentPath,
636                 peerClusterZnode, this.repLogReader.getPosition(),
637                 this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
638             this.lastLoggedPosition = this.repLogReader.getPosition();
639           }
640           // Reset the sleep multiplier if nothing has actually gone wrong
641           if (!gotIOE) {
642             sleepMultiplier = 1;
643             // if there was nothing to ship and it's not an error
644             // set "ageOfLastShippedOp" to <now> to indicate that we're current
645             metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
646           }
647           if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
648             sleepMultiplier++;
649           }
650           continue;
651         }
652         sleepMultiplier = 1;
653         shipEdits(currentWALisBeingWrittenTo, entries);
654       }
655       if (replicationQueueInfo.isQueueRecovered()) {
656         // use synchronize to make sure one last thread will clean the queue
657         synchronized (workerThreads) {
658           Threads.sleep(100);// wait a short while for other worker thread to fully exit
659           boolean allOtherTaskDone = true;
660           for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
661             if (!worker.equals(this) && worker.isAlive()) {
662               allOtherTaskDone = false;
663               break;
664             }
665           }
666           if (allOtherTaskDone) {
667             manager.closeRecoveredQueue(this.source);
668             LOG.info("Finished recovering queue " + peerClusterZnode
669                 + " with the following stats: " + getStats());
670           }
671         }
672       }
673     }
674 
675     /**
676      * Read all the entries from the current log file and retain those that need to be replicated;
677      * otherwise, process the end of the current file.
678      * @param currentWALisBeingWrittenTo whether the current WAL is being written to
679      * @param entries resulting entries to be replicated
680      * @return true if we got nothing and went to the next file, false if we got entries
681      * @throws IOException
682      */
683     protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
684         List<WAL.Entry> entries) throws IOException {
685       long seenEntries = 0;
686       if (LOG.isTraceEnabled()) {
687         LOG.trace("Seeking in " + this.currentPath + " at position "
688             + this.repLogReader.getPosition());
689       }
690       this.repLogReader.seek();
691       long positionBeforeRead = this.repLogReader.getPosition();
692       WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
693       while (entry != null) {
694         metrics.incrLogEditsRead();
695         seenEntries++;
696 
697         // don't replicate if the log entries have already passed through the peer cluster
698         if (replicationEndpoint.canReplicateToSameCluster()
699             || !entry.getKey().getClusterIds().contains(peerClusterId)) {
700           // Remove all KVs that should not be replicated
701           entry = walEntryFilter.filter(entry);
702           WALEdit edit = null;
703           WALKey logKey = null;
704           if (entry != null) {
705             edit = entry.getEdit();
706             logKey = entry.getKey();
707           }
708 
709           if (edit != null && edit.size() != 0) {
710             // Mark that the current cluster has the change
711             logKey.addClusterId(clusterId);
712             currentNbOperations += countDistinctRowKeys(edit);
713             entries.add(entry);
714             currentSize += entry.getEdit().heapSize();
715             currentSize += calculateTotalSizeOfStoreFiles(edit);
716           } else {
717             metrics.incrLogEditsFiltered();
718           }
719         }
720         // Stop if too many entries or too big
721         // FIXME check the relationship between single wal group and overall
722         if (currentSize >= replicationQueueSizeCapacity
723             || entries.size() >= replicationQueueNbCapacity) {
724           break;
725         }
726         try {
727           entry = this.repLogReader.readNextAndSetPosition();
728         } catch (IOException ie) {
729           LOG.debug("Break on IOE: " + ie.getMessage());
730           break;
731         }
732       }
733       metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
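      // If the WAL is still being written to, never declare end-of-file here: more entries may still
      // show up in the current file, so just report whatever was read so far.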
734       if (currentWALisBeingWrittenTo) {
735         return false;
736       }
737       // If we didn't get anything and the queue has an object, it means we
738       // hit the end of the file for sure
739       return seenEntries == 0 && processEndOfFile();
740     }
741
742     /**
743      * Calculate the total size of all the store files
744      * @param edit edit to calculate the store file sizes from
745      * @return the total size, in bytes, of the store files referenced by bulk load entries in the edit
746      */
747     private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
748       List<Cell> cells = edit.getCells();
749       int totalStoreFilesSize = 0;
750
751       int totalCells = edit.size();
752       for (int i = 0; i < totalCells; i++) {
753         if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
754           try {
755             BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
756             List<StoreDescriptor> stores = bld.getStoresList();
757             int totalStores = stores.size();
758             for (int j = 0; j < totalStores; j++) {
759               totalStoreFilesSize += stores.get(j).getStoreFileSizeBytes();
760             }
761           } catch (IOException e) {
762             LOG.error("Failed to deserialize bulk load entry from wal edit. "
763                 + "Size of HFiles part of cell will not be considered in replication "
764                 + "request size calculation.", e);
765           }
766         }
767       }
768       return totalStoreFilesSize;
769     }
770
771     private void cleanUpHFileRefs(WALEdit edit) throws IOException {
772       String peerId = peerClusterZnode;
773       if (peerId.contains("-")) {
774         // peerClusterZnode will be in the form peerId + "-" + rsZNode.
775         // A peerId will not have "-" in its name, see HBASE-11394
776         peerId = peerClusterZnode.split("-")[0];
777       }
778       List<Cell> cells = edit.getCells();
779       int totalCells = cells.size();
780       for (int i = 0; i < totalCells; i++) {
781         Cell cell = cells.get(i);
782         if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
783           BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
784           List<StoreDescriptor> stores = bld.getStoresList();
785           int totalStores = stores.size();
786           for (int j = 0; j < totalStores; j++) {
787             List<String> storeFileList = stores.get(j).getStoreFileList();
788             manager.cleanUpHFileRefs(peerId, storeFileList);
789             metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
790           }
791         }
792       }
793     }
794
795     /**
796      * Poll for the next path
797      * @return true if a path was obtained, false if not
798      */
799     protected boolean getNextPath() {
800       try {
801         if (this.currentPath == null) {
802           this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
803           metrics.decrSizeOfLogQueue();
804           if (this.currentPath != null) {
805             // For recovered queue: must use peerClusterZnode since peerId is a parsed value
806             manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
807               this.replicationQueueInfo.isQueueRecovered());
808             if (LOG.isTraceEnabled()) {
809               LOG.trace("New log: " + this.currentPath);
810             }
811           }
812         }
813       } catch (InterruptedException e) {
814         LOG.warn("Interrupted while reading edits", e);
815       }
816       return this.currentPath != null;
817     }
818
819     /**
820      * Open a reader on the current path
821      *
822      * @param sleepMultiplier by how many times the default sleeping time is augmented
823      * @return true if we should continue with that file, false if we are over with it
824      */
825     protected boolean openReader(int sleepMultiplier) {
826       try {
827         try {
828           if (LOG.isTraceEnabled()) {
829             LOG.trace("Opening log " + this.currentPath);
830           }
831           this.reader = repLogReader.openReader(this.currentPath);
832         } catch (FileNotFoundException fnfe) {
833           if (this.replicationQueueInfo.isQueueRecovered()) {
834             // We didn't find the log in the archive directory, look if it still
835             // exists in the dead RS folder (there could be a chain of failures
836             // to look at)
837             List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
838             LOG.info("NB dead servers : " + deadRegionServers.size());
839             final Path rootDir = FSUtils.getRootDir(conf);
840             for (String curDeadServerName : deadRegionServers) {
841               final Path deadRsDirectory = new Path(rootDir,
842                 AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
843               Path[] locs = new Path[] { new Path(deadRsDirectory, currentPath.getName()),
844                 new Path(deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT),
845                   currentPath.getName()) };
846               for (Path possibleLogLocation : locs) {
847                 LOG.info("Possible location " + possibleLogLocation.toUri().toString());
848                 if (manager.getFs().exists(possibleLogLocation)) {
849                   // We found the right new location
850                   LOG.info("Log " + this.currentPath + " still exists at " +
851                       possibleLogLocation);
852                   // Breaking here will make us sleep since reader is null
853                   // TODO why don't we need to set currentPath and call openReader here?
854                   return true;
855                 }
856               }
857             }
858             // In a disaster/recovery case, the HMaster may have been shut down or crashed before moving
859             // data from .logs to .oldlogs. Loop over the .logs folders and check whether a match exists
860             if (stopper instanceof ReplicationSyncUp.DummyServer) {
861               // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
862               //      area rather than to the wal area for a particular region server.
863               FileStatus[] rss = fs.listStatus(manager.getLogDir());
864               for (FileStatus rs : rss) {
865                 Path p = rs.getPath();
866                 FileStatus[] logs = fs.listStatus(p);
867                 for (FileStatus log : logs) {
868                   p = new Path(p, log.getPath().getName());
869                   if (p.getName().equals(currentPath.getName())) {
870                     currentPath = p;
871                     LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
872                     // Open the log at the new location
873                     this.openReader(sleepMultiplier);
874                     return true;
875                   }
876                 }
877               }
878             }
879
880             // TODO What happens if the log was missing from every single location?
881             // Although we need to check a couple of times as the log could have
882             // been moved by the master between the checks
883             // It can also happen if a recovered queue wasn't properly cleaned,
884             // such that the znode pointing to a log exists but the log was
885             // deleted a long time ago.
886             // For the moment, we'll throw the IO and processEndOfFile
887             throw new IOException("File from recovered queue is " +
888                 "nowhere to be found", fnfe);
889           } else {
890             // If the log was archived, continue reading from there
891             Path archivedLogLocation =
892                 new Path(manager.getOldLogDir(), currentPath.getName());
893             if (manager.getFs().exists(archivedLogLocation)) {
894               currentPath = archivedLogLocation;
895               LOG.info("Log " + this.currentPath + " was moved to " +
896                   archivedLogLocation);
897               // Open the log at the new location
898               this.openReader(sleepMultiplier);
899
900             }
901             // TODO What happens if the log is missing in both places?
902           }
903         }
904       } catch (LeaseNotRecoveredException lnre) {
905         // HBASE-15019 the WAL was not closed due to some hiccup.
906         LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
907         recoverLease(conf, currentPath);
908         this.reader = null;
909       } catch (IOException ioe) {
910         if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
911         LOG.warn(peerClusterZnode + " Got: ", ioe);
912         this.reader = null;
913         if (ioe.getCause() instanceof NullPointerException) {
914           // Workaround for race condition in HDFS-4380
915           // which throws a NPE if we open a file before any data node has the most recent block
916           // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
917           LOG.warn("Got NPE opening reader, will retry.");
918         } else if (sleepMultiplier >= maxRetriesMultiplier) {
919           // TODO Need a better way to determine if a file is really gone but
920           // TODO without scanning all logs dir
921           LOG.warn("Waited too long for this file, considering dumping");
922           return !processEndOfFile();
923         }
924       }
925       return true;
926     }
927
928     private void recoverLease(final Configuration conf, final Path path) {
929       try {
930         final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
931         FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
932         fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
933           @Override
934           public boolean progress() {
935             LOG.debug("recover WAL lease: " + path);
936             return isWorkerActive();
937           }
938         });
939       } catch (IOException e) {
940         LOG.warn("unable to recover lease for WAL: " + path, e);
941       }
942     }
943
944     /*
945      * Checks whether the current log file is empty and the queue is not a recovered one. This
946      * handles the scenario where, in an idle cluster, there is no entry in the current log and we
947      * keep trying to read the log file and getting EOFException. In the case of a recovered queue,
948      * the last log file may be empty, and we don't want to retry that.
949      */
950     private boolean isCurrentLogEmpty() {
951       return (this.repLogReader.getPosition() == 0 &&
952           !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
953     }
954
955     /**
956      * Count the number of different row keys in the given edit (an edit may span several rows
957      * because of mini-batching). We assume that there's at least one Cell in the WALEdit.
958      * @param edit edit to count row keys from
959      * @return number of different row keys
960      */
961     private int countDistinctRowKeys(WALEdit edit) {
962       List<Cell> cells = edit.getCells();
963       int distinctRowKeys = 1;
964       int totalHFileEntries = 0;
965       Cell lastCell = cells.get(0);
966
967       int totalCells = edit.size();
968       for (int i = 0; i < totalCells; i++) {
969         // Count HFiles to be replicated
970         if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
971           try {
972             BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
973             List<StoreDescriptor> stores = bld.getStoresList();
974             int totalStores = stores.size();
975             for (int j = 0; j < totalStores; j++) {
976               totalHFileEntries += stores.get(j).getStoreFileList().size();
977             }
978           } catch (IOException e) {
979             LOG.error("Failed to deserialize bulk load entry from wal edit. "
980                 + "Then its hfiles count will not be added into metric.");
981           }
982         }
983
984         if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
985           distinctRowKeys++;
986         }
987         lastCell = cells.get(i);
988       }
989       currentNbHFiles += totalHFileEntries;
990       return distinctRowKeys + totalHFileEntries;
991     }
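    // Worked example: cells for rows {r1, r1, r2} count as 2 distinct rows, and each store file
    // referenced by a bulk load marker adds one more operation to the returned count.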
992
993     /**
994      * Do the shipping logic
995      * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
996      * written to when this method was called
997      */
998     protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
999       int sleepMultiplier = 0;
1000       if (entries.isEmpty()) {
1001         LOG.warn("Was given 0 edits to ship");
1002         return;
1003       }
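      // The loop below retries the same batch until the endpoint acknowledges it (replicate() returns
      // true) or this worker is asked to stop; the WAL position is only advanced after a successful ship.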
1004       while (isWorkerActive()) {
1005         try {
1006           if (throttler.isEnabled()) {
1007             long sleepTicks = throttler.getNextSleepInterval(currentSize);
1008             if (sleepTicks > 0) {
1009               try {
1010                 if (LOG.isTraceEnabled()) {
1011                   LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
1012                 }
1013                 Thread.sleep(sleepTicks);
1014               } catch (InterruptedException e) {
1015                 LOG.debug("Interrupted while sleeping for throttling control");
1016                 Thread.currentThread().interrupt();
1017                 // the current thread might have been interrupted to terminate;
1018                 // go straight back to the while() condition to confirm this
1019                 continue;
1020               }
1021               // reset throttler's cycle start tick when sleep for throttling occurs
1022               throttler.resetStartTick();
1023             }
1024           }
1025           // create replicateContext here, so the entries can be GC'd upon return from this call
1026           // stack
1027           ReplicationEndpoint.ReplicateContext replicateContext =
1028               new ReplicationEndpoint.ReplicateContext();
1029           replicateContext.setEntries(entries).setSize(currentSize);
1030           replicateContext.setWalGroupId(walGroupId);
1031
1032           long startTimeNs = System.nanoTime();
1033           // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
1034           boolean replicated = replicationEndpoint.replicate(replicateContext);
1035           long endTimeNs = System.nanoTime();
1036
1037           if (!replicated) {
1038             continue;
1039           } else {
1040             sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
1041           }
1042
1043           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
1044             //Clean up hfile references
1045             int size = entries.size();
1046             for (int i = 0; i < size; i++) {
1047               cleanUpHFileRefs(entries.get(i).getEdit());
1048             }
1049             //Log and clean up WAL logs
1050             manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
1051               this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
1052               currentWALisBeingWrittenTo);
1053             this.lastLoggedPosition = this.repLogReader.getPosition();
1054           }
1055           if (throttler.isEnabled()) {
1056             throttler.addPushSize(currentSize);
1057           }
1058           totalReplicatedEdits.addAndGet(entries.size());
1059           totalReplicatedOperations.addAndGet(currentNbOperations);
1060           // FIXME check relationship between wal group and overall
1061           metrics.shipBatch(currentNbOperations, currentSize, currentNbHFiles);
1062           metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
1063             walGroupId);
1064           if (LOG.isTraceEnabled()) {
1065             LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
1066                 + totalReplicatedOperations + " operations in "
1067                 + ((endTimeNs - startTimeNs) / 1000000) + " ms");
1068           }
1069           break;
1070         } catch (Exception ex) {
1071           LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
1072               + org.apache.hadoop.util.StringUtils.stringifyException(ex));
1073           if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
1074             sleepMultiplier++;
1075           }
1076         }
1077       }
1078     }
1079
1080     /**
1081      * If the queue isn't empty, switch to the next one. Else if this is a recovered queue, it means
1082      * we're done! Else we'll just continue trying to read the log file.
1083      * @return true if we're done with the current file, false if we should continue trying to read
1084      *         from it
1085      */
1086     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
1087         justification = "Yeah, this is how it works")
1088     protected boolean processEndOfFile() {
1089       if (this.queue.size() != 0) {
1090         if (LOG.isTraceEnabled()) {
1091           String filesize = "N/A";
1092           try {
1093             FileStatus stat = fs.getFileStatus(this.currentPath);
1094             filesize = stat.getLen() + "";
1095           } catch (IOException ex) {
1096           }
1097           LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
1098               + ", and the length of the file is " + filesize);
1099         }
1100         this.currentPath = null;
1101         this.repLogReader.finishCurrentFile();
1102         this.reader = null;
1103         return true;
1104       } else if (this.replicationQueueInfo.isQueueRecovered()) {
1105         LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
1106             + peerClusterZnode);
1107         workerRunning = false;
1108         return true;
1109       }
1110       return false;
1111     }
1112
1113     public void startup() {
1114       String n = Thread.currentThread().getName();
1115       Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
1116         @Override
1117         public void uncaughtException(final Thread t, final Throwable e) {
1118           LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
1119               + getCurrentPath(), e);
1120         }
1121       };
1122       Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
1123           + peerClusterZnode, handler);
1124       workerThreads.put(walGroupId, this);
1125     }
1126
1127     public Path getCurrentPath() {
1128       return this.currentPath;
1129     }
1130
1131     public long getCurrentPosition() {
1132       return this.repLogReader.getPosition();
1133     }
1134
1135     private boolean isWorkerActive() {
1136       return !stopper.isStopped() && workerRunning && !isInterrupted();
1137     }
1138
1139     private void terminate(String reason, Exception cause) {
1140       if (cause == null) {
1141         LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
1142
1143       } else {
1144         LOG.error("Closing worker for wal group " + this.walGroupId
1145             + " because an error occurred: " + reason, cause);
1146       }
1147       this.interrupt();
1148       Threads.shutdown(this, sleepForRetries);
1149       LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
1150     }
1151
1152     public void setWorkerRunning(boolean workerRunning) {
1153       this.workerRunning = workerRunning;
1154     }
1155   }
1156 }