1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Comparator;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.UUID;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.PriorityBlockingQueue;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicInteger;
35  import java.util.concurrent.atomic.AtomicLong;
36  
37  import org.apache.commons.lang.StringUtils;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FileStatus;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.CellUtil;
47  import org.apache.hadoop.hbase.HBaseConfiguration;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.Stoppable;
50  import org.apache.hadoop.hbase.regionserver.RSRpcServices;
51  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
52  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
53  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
54  import org.apache.hadoop.hbase.replication.ReplicationException;
55  import org.apache.hadoop.hbase.replication.ReplicationPeers;
56  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
57  import org.apache.hadoop.hbase.replication.ReplicationQueues;
58  import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
59  import org.apache.hadoop.hbase.replication.WALEntryFilter;
60  import org.apache.hadoop.hbase.util.CancelableProgressable;
61  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
62  import org.apache.hadoop.hbase.util.FSUtils;
63  import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
64  import org.apache.hadoop.hbase.util.Threads;
65  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
66  import org.apache.hadoop.hbase.wal.WAL;
67  import org.apache.hadoop.hbase.wal.WALKey;
68  
69  import com.google.common.collect.Lists;
70  import com.google.common.util.concurrent.ListenableFuture;
71  import com.google.common.util.concurrent.Service;
72  
73  /**
74   * Class that handles the source of a replication stream.
75   * Currently it does not handle more than one slave.
76   * For each slave cluster it selects a random subset of peers
77   * using a replication ratio. For example, if the replication ratio is 0.1
78   * and the slave cluster has 100 region servers, 10 will be selected.
79   * <p>
80   * A stream is considered down when we cannot contact a region server on the
81   * peer cluster for more than 55 seconds by default.
82   * </p>
83   *
84   */
85  @InterfaceAudience.Private
86  public class ReplicationSource extends Thread
87      implements ReplicationSourceInterface {
88  
89    private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
90    // Queues of logs to process, entry in format of walGroupId->queue,
91  // each represents the queue for one wal group
92    private Map<String, PriorityBlockingQueue<Path>> queues =
93        new HashMap<String, PriorityBlockingQueue<Path>>();
94    // per group queue size, keep no more than this number of logs in each wal group
95    private int queueSizePerGroup;
96    private ReplicationQueues replicationQueues;
97    private ReplicationPeers replicationPeers;
98  
99    private Configuration conf;
100   private ReplicationQueueInfo replicationQueueInfo;
101   // id of the peer cluster this source replicates to
102   private String peerId;
103   // The manager of all sources to which we ping back our progress
104   private ReplicationSourceManager manager;
105   // Should we stop everything?
106   private Stoppable stopper;
107   // How long should we sleep for each retry
108   private long sleepForRetries;
109   // Max size in bytes of a single batch of entries to ship
110   private long replicationQueueSizeCapacity;
111   // Max number of entries in a single batch to ship
112   private int replicationQueueNbCapacity;
113   private FileSystem fs;
114   // id of this cluster
115   private UUID clusterId;
116   // id of the other cluster
117   private UUID peerClusterId;
118   // total number of edits we replicated
119   private AtomicLong totalReplicatedEdits = new AtomicLong(0);
120   // total number of operations (Put/Delete) we replicated
121   private AtomicLong totalReplicatedOperations = new AtomicLong(0);
122   // The znode of the replication queue we are currently working with
123   private String peerClusterZnode;
124   // Maximum number of retries before taking bold actions
125   private int maxRetriesMultiplier;
126   // Indicates if this particular source is running
127   private volatile boolean sourceRunning = false;
128   // Metrics for this source
129   private MetricsSource metrics;
130   //WARN threshold for the number of queued logs, defaults to 2
131   private int logQueueWarnThreshold;
132   // ReplicationEndpoint which will handle the actual replication
133   private ReplicationEndpoint replicationEndpoint;
134   // A filter (or a chain of filters) for the WAL entries.
135   private WALEntryFilter walEntryFilter;
136   // throttler
137   private ReplicationThrottler throttler;
138   private AtomicInteger logQueueSize = new AtomicInteger(0);
139   private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
140       new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
141   // Hold the state of a replication worker thread
142   public enum WorkerState {
143     RUNNING,
144     STOPPED,
145     FINISHED  // The worker is done processing a recovered queue
146   }
147 
148   /**
149    * Initialization method used by region servers
150    *
151    * @param conf configuration to use
152    * @param fs file system to use
153    * @param manager replication manager to ping to
    * @param replicationQueues interface to the replication queues, used to read and store WAL positions
    * @param replicationPeers interface to the replication peers, used to check whether the peer is enabled
154    * @param stopper the stoppable used to stop the region server
155    * @param peerClusterZnode the name of our znode
156    * @param clusterId unique UUID for the cluster
157    * @param replicationEndpoint the replication endpoint implementation
158    * @param metrics metrics for replication source
159    * @throws IOException
160    */
161   @Override
162   public void init(final Configuration conf, final FileSystem fs,
163       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
164       final ReplicationPeers replicationPeers, final Stoppable stopper,
165       final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
166       final MetricsSource metrics)
167           throws IOException {
168     this.stopper = stopper;
169     this.conf = HBaseConfiguration.create(conf);
170     decorateConf();
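    // Note: the conf was copied above so that decorateConf() can install the replication codec
    // (HConstants.REPLICATION_CODEC_CONF_KEY) as the RPC codec for this source only, without
    // changing the region server's shared configuration.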
171     this.replicationQueueSizeCapacity =
172         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
173     this.replicationQueueNbCapacity =
174         this.conf.getInt("replication.source.nb.capacity", 25000);
175     this.sleepForRetries =
176         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
177     this.maxRetriesMultiplier =
178         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
179     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
180     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
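    // The configured per-peer bandwidth is in bytes per second; dividing by 10 gives the budget
    // the throttler applies per cycle (assumption: ReplicationThrottler meters pushes over 100 ms
    // cycles). A bandwidth of 0 leaves throttling disabled.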
181     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
182     this.replicationQueues = replicationQueues;
183     this.replicationPeers = replicationPeers;
184     this.manager = manager;
185     this.fs = fs;
186     this.metrics = metrics;
187     this.clusterId = clusterId;
188 
189     this.peerClusterZnode = peerClusterZnode;
190     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
191     // ReplicationQueueInfo parses the peerId out of the znode for us
192     this.peerId = this.replicationQueueInfo.getPeerId();
193     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
194     this.replicationEndpoint = replicationEndpoint;
195   }
196 
197   private void decorateConf() {
198     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
199     if (StringUtils.isNotEmpty(replicationCodec)) {
200       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
201     }
202   }
203 
204   @Override
205   public void enqueueLog(Path log) {
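    // WALs are grouped by the prefix of their file name (the name minus its timestamp suffix);
    // each group gets its own queue here and, once the source is running, its own worker thread.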
206     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
207     PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
208     if (queue == null) {
209       queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
210       queues.put(logPrefix, queue);
211       if (this.sourceRunning) {
212         // new wal group observed after source startup, start a new worker thread to track it
213         // notice: it's possible that a log is enqueued after this.sourceRunning is set but before
214         // the worker thread is launched, so it's necessary to check workerThreads before starting the worker
215         final ReplicationSourceWorkerThread worker =
216             new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
217         ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
218         if (extant != null) {
219           LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
220         } else {
221           LOG.debug("Starting up worker for wal group " + logPrefix);
222           worker.startup();
223         }
224       }
225     }
226     queue.put(log);
227     int queueSize = logQueueSize.incrementAndGet();
228     this.metrics.setSizeOfLogQueue(queueSize);
229     // This will log a warning for each new log that gets created above the warn threshold
230     if (queue.size() > this.logQueueWarnThreshold) {
231       LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
232           + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
233     }
234   }
235 
236   private void uninitialize() {
237     LOG.debug("Source exiting " + this.peerId);
238     metrics.clear();
239     if (replicationEndpoint.state() == Service.State.STARTING
240         || replicationEndpoint.state() == Service.State.RUNNING) {
241       replicationEndpoint.stopAndWait();
242     }
243   }
244 
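  // Lifecycle of the source thread: start the ReplicationEndpoint, build the WAL entry filter
  // chain, wait until the peer cluster UUID can be resolved, refuse to replicate to ourselves,
  // then start one ReplicationSourceWorkerThread per known WAL group.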
245   @Override
246   public void run() {
247     // mark we are running now
248     this.sourceRunning = true;
249     try {
250       // start the endpoint, connect to the cluster
251       Service.State state = replicationEndpoint.start().get();
252       if (state != Service.State.RUNNING) {
253         LOG.warn("ReplicationEndpoint was not started. Exiting");
254         uninitialize();
255         return;
256       }
257     } catch (Exception ex) {
258       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
259       throw new RuntimeException(ex);
260     }
261 
262     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
263     ArrayList<WALEntryFilter> filters = Lists.newArrayList(
264       (WALEntryFilter)new SystemTableWALEntryFilter());
265     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
266     if (filterFromEndpoint != null) {
267       filters.add(filterFromEndpoint);
268     }
269     this.walEntryFilter = new ChainWALEntryFilter(filters);
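    // The chain applies the filters in order and skips any entry that a filter reduces to null;
    // SystemTableWALEntryFilter is always included so edits to system tables are never shipped.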
270 
271     int sleepMultiplier = 1;
272     // delay this until we are in an asynchronous thread
273     while (this.isSourceActive() && this.peerClusterId == null) {
274       this.peerClusterId = replicationEndpoint.getPeerUUID();
275       if (this.isSourceActive() && this.peerClusterId == null) {
276         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
277           sleepMultiplier++;
278         }
279       }
280     }
281 
282     // In rare cases, the ZooKeeper setting may be misconfigured. That can lead to an incorrect
283     // peerClusterId value, i.e. one that is the same as the source clusterId
284     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
285       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
286           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
287           + replicationEndpoint.getClass().getName(), null, false);
288       this.manager.closeQueue(this);
289       return;
290     }
291     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
292     // start workers
293     for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
294       String walGroupId = entry.getKey();
295       PriorityBlockingQueue<Path> queue = entry.getValue();
296       final ReplicationSourceWorkerThread worker =
297           new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
298       ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
299       if (extant != null) {
300         LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
301       } else {
302         LOG.debug("Starting up worker for wal group " + walGroupId);
303         worker.startup();
304       }
305     }
306   }
307 
308   /**
309    * Do the sleeping logic
310    * @param msg Why we sleep
311    * @param sleepMultiplier by how many times the default sleeping time is augmented
312    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
313    */
314   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
315     try {
316       if (LOG.isTraceEnabled()) {
317         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
318       }
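      // Linear backoff: with the defaults configured in init() (replication.source.sleepforretries
      // = 1000 ms, replication.source.maxretriesmultiplier = 300) the sleep grows by one second
      // per retry, up to roughly 5 minutes.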
319       Thread.sleep(this.sleepForRetries * sleepMultiplier);
320     } catch (InterruptedException e) {
321       LOG.debug("Interrupted while sleeping between retries");
322       Thread.currentThread().interrupt();
323     }
324     return sleepMultiplier < maxRetriesMultiplier;
325   }
326 
327   /**
328    * check whether the peer is enabled or not
329    *
330    * @return true if the peer is enabled, otherwise false
331    */
332   protected boolean isPeerEnabled() {
333     return this.replicationPeers.getStatusOfPeer(this.peerId);
334   }
335 
336   @Override
337   public void startup() {
338     String n = Thread.currentThread().getName();
339     Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
340       @Override
341       public void uncaughtException(final Thread t, final Throwable e) {
342         LOG.error("Unexpected exception in ReplicationSource", e);
343       }
344     };
345     Threads
346         .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
347   }
348 
349   @Override
350   public void terminate(String reason) {
351     terminate(reason, null);
352   }
353 
354   @Override
355   public void terminate(String reason, Exception cause) {
356     terminate(reason, cause, true);
357   }
358 
359   public void terminate(String reason, Exception cause, boolean join) {
360     if (cause == null) {
361       LOG.info("Closing source "
362           + this.peerClusterZnode + " because: " + reason);
363 
364     } else {
365       LOG.error("Closing source " + this.peerClusterZnode
366           + " because an error occurred: " + reason, cause);
367     }
368     this.sourceRunning = false;
369     Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
370     for (ReplicationSourceWorkerThread worker : workers) {
371       worker.setWorkerState(WorkerState.STOPPED);
372       worker.interrupt();
373     }
374     ListenableFuture<Service.State> future = null;
375     if (this.replicationEndpoint != null) {
376       future = this.replicationEndpoint.stop();
377     }
378     if (join) {
379       for (ReplicationSourceWorkerThread worker : workers) {
380         Threads.shutdown(worker, this.sleepForRetries);
381         LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
382       }
383       if (future != null) {
384         try {
385           future.get(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
386         } catch (Exception e) {
387           LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
388               + this.peerClusterZnode,
389             e);
390         }
391       }
392     }
393   }
394 
395   @Override
396   public String getPeerClusterZnode() {
397     return this.peerClusterZnode;
398   }
399 
400   @Override
401   public String getPeerClusterId() {
402     return this.peerId;
403   }
404 
405   @Override
406   public Path getCurrentPath() {
407     // only for testing
408     for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
409       if (worker.getCurrentPath() != null) return worker.getCurrentPath();
410     }
411     return null;
412   }
413 
414   private boolean isSourceActive() {
415     return !this.stopper.isStopped() && this.sourceRunning;
416   }
417 
418   /**
419    * Comparator used to order WALs based on their start time
420    */
421   public static class LogsComparator implements Comparator<Path> {
422 
423     @Override
424     public int compare(Path o1, Path o2) {
425       return Long.compare(getTS(o1), getTS(o2));
426     }
427 
428     /**
429      * Split a path to get the start time (the numeric suffix after the last dot).
430      * For example: 10.20.20.171%3A60020.1277499063250
431      * @param p path to split
432      * @return start time
433      */
434     private static long getTS(Path p) {
435       int tsIndex = p.getName().lastIndexOf('.') + 1;
436       return Long.parseLong(p.getName().substring(tsIndex));
437     }
438   }
439 
440   @Override
441   public String getStats() {
442     StringBuilder sb = new StringBuilder();
443     sb.append("Total replicated edits: ").append(totalReplicatedEdits)
444         .append(", current progress: \n");
445     for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
446       String walGroupId = entry.getKey();
447       ReplicationSourceWorkerThread worker = entry.getValue();
448       long position = worker.getCurrentPosition();
449       Path currentPath = worker.getCurrentPath();
450       sb.append("walGroup [").append(walGroupId).append("]: ");
451       if (currentPath != null) {
452         sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
453             .append(position).append("\n");
454       } else {
455         sb.append("no replication ongoing, waiting for new log");
456       }
457     }
458     return sb.toString();
459   }
460 
461   /**
462    * Get Replication Source Metrics
463    * @return sourceMetrics
464    */
465   public MetricsSource getSourceMetrics() {
466     return this.metrics;
467   }
468 
469   public class ReplicationSourceWorkerThread extends Thread {
470     private ReplicationSource source;
471     private String walGroupId;
472     private PriorityBlockingQueue<Path> queue;
473     private ReplicationQueueInfo replicationQueueInfo;
474     // Our reader for the current log. open/close handled by repLogReader
475     private WAL.Reader reader;
476     // Last position in the log that we sent to ZooKeeper
477     private long lastLoggedPosition = -1;
478     // Path of the current log
479     private volatile Path currentPath;
480     // Handle on the log reader helper
481     private ReplicationWALReaderManager repLogReader;
482     // Current number of operations (Put/Delete) that we need to replicate
483     private int currentNbOperations = 0;
484     // Current size of data we need to replicate
485     private int currentSize = 0;
486     // Current state of the worker thread
487     private WorkerState state;
488 
489     public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
490         ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
491       this.walGroupId = walGroupId;
492       this.queue = queue;
493       this.replicationQueueInfo = replicationQueueInfo;
494       this.repLogReader = new ReplicationWALReaderManager(fs, conf);
495       this.source = source;
496     }
497 
498     @Override
499     public void run() {
500       setWorkerState(WorkerState.RUNNING);
501       // If this is a recovered queue, it is already fully populated and the first log
502       // normally has a stored position (unless the RS failed between 2 logs)
503       if (this.replicationQueueInfo.isQueueRecovered()) {
504         try {
505           this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
506             this.queue.peek().getName()));
507           if (LOG.isTraceEnabled()) {
508             LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
509                 + this.repLogReader.getPosition());
510           }
511         } catch (ReplicationException e) {
512           terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
513         }
514       }
515       int sleepMultiplier = 1;
516       // Loop until we close down
517       while (isWorkerActive()) {
518         // Sleep until replication is enabled again
519         if (!isPeerEnabled()) {
520           if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
521             sleepMultiplier++;
522           }
523           continue;
524         }
525         Path oldPath = getCurrentPath(); //note that in the current scenario,
526                                          //oldPath will be null when a log roll
527                                          //happens.
528         // Get a new path
529         boolean hasCurrentPath = getNextPath();
530         if (getCurrentPath() != null && oldPath == null) {
531           sleepMultiplier = 1; //reset the sleepMultiplier on a path change
532         }
533         if (!hasCurrentPath) {
534           if (sleepForRetries("No log to process", sleepMultiplier)) {
535             sleepMultiplier++;
536           }
537           continue;
538         }
539         boolean currentWALisBeingWrittenTo = false;
540         //For WAL files we own (rather than recovered), take a snapshot of whether the
541         //current WAL file (this.currentPath) is in use (for writing) NOW!
542         //Since the new WAL paths are enqueued only after the prev WAL file
543         //is 'closed', presence of an element in the queue means that
544         //the previous WAL file was closed, else the file is in use (currentPath)
545         //We take the snapshot now so that we are protected against races
546         //where a new file gets enqueued while the current file is being processed
547         //(and where we just finished reading the current file).
548         if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
549           currentWALisBeingWrittenTo = true;
550         }
551         // Open a reader on it
552         if (!openReader(sleepMultiplier)) {
553           // Reset the sleep multiplier, else it'd be reused for the next file
554           sleepMultiplier = 1;
555           continue;
556         }
557 
558         // If we got a null reader but didn't continue, then sleep and continue
559         if (this.reader == null) {
560           if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
561             sleepMultiplier++;
562           }
563           continue;
564         }
565 
566         boolean gotIOE = false;
567         currentNbOperations = 0;
568         List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
569         currentSize = 0;
570         try {
571           if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
572             continue;
573           }
574         } catch (IOException ioe) {
575           LOG.warn(peerClusterZnode + " Got: ", ioe);
576           gotIOE = true;
577           if (ioe.getCause() instanceof EOFException) {
578 
579             boolean considerDumping = false;
580             if (this.replicationQueueInfo.isQueueRecovered()) {
581               try {
582                 FileStatus stat = fs.getFileStatus(this.currentPath);
583                 if (stat.getLen() == 0) {
584                   LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
585                 }
586                 considerDumping = true;
587               } catch (IOException e) {
588                 LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
589               }
590             }
591 
592             if (considerDumping &&
593                 sleepMultiplier == maxRetriesMultiplier &&
594                 processEndOfFile(false)) {
595               continue;
596             }
597           }
598         } finally {
599           try {
600             this.reader = null;
601             this.repLogReader.closeReader();
602           } catch (IOException e) {
603             gotIOE = true;
604             LOG.warn("Unable to finalize the tailing of a file", e);
605           }
606         }
607 
608         // If we didn't get anything to replicate, or if we hit an IOE,
609         // wait a bit and retry.
610         // But if we need to stop, don't bother sleeping
611         if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
612           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
613             manager.logPositionAndCleanOldLogs(this.currentPath,
614                 peerClusterZnode, this.repLogReader.getPosition(),
615                 this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
616             this.lastLoggedPosition = this.repLogReader.getPosition();
617           }
618           // Reset the sleep multiplier if nothing has actually gone wrong
619           if (!gotIOE) {
620             sleepMultiplier = 1;
621             // if there was nothing to ship and it's not an error
622             // set "ageOfLastShippedOp" to <now> to indicate that we're current
623             metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
624           }
625           if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
626             sleepMultiplier++;
627           }
628           continue;
629         }
630         sleepMultiplier = 1;
631         shipEdits(currentWALisBeingWrittenTo, entries);
632       }
633       if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
634         // synchronize to make sure that exactly one (the last) worker thread cleans the queue
635         synchronized (workerThreads) {
636           Threads.sleep(100); // wait a short while for other worker threads to fully exit
637           boolean allOtherTaskDone = true;
638           for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
639             if (!worker.equals(this) && worker.getWorkerState() != WorkerState.FINISHED) {
640               allOtherTaskDone = false;
641               break;
642             }
643           }
644           if (allOtherTaskDone) {
645             manager.closeRecoveredQueue(this.source);
646             LOG.info("Finished recovering queue " + peerClusterZnode
647                 + " with the following stats: " + getStats());
648           }
649         }
650       }
651       // If the worker exits the run loop without finishing its task, mark it as stopped.
652       if (state != WorkerState.FINISHED) {
653         setWorkerState(WorkerState.STOPPED);
654       }
655     }
656 
657     /**
658      * Read all the entries from the current log file and retain those that need to be
659      * replicated; otherwise, process the end of the current file.
660      * @param currentWALisBeingWrittenTo is the current WAL being written to
661      * @param entries resulting entries to be replicated
662      * @return true if we got nothing and went to the next file, false if we got entries
663      * @throws IOException
664      */
665     protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
666         List<WAL.Entry> entries) throws IOException {
667       long seenEntries = 0;
668       if (LOG.isTraceEnabled()) {
669         LOG.trace("Seeking in " + this.currentPath + " at position "
670             + this.repLogReader.getPosition());
671       }
672       this.repLogReader.seek();
673       long positionBeforeRead = this.repLogReader.getPosition();
674       WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
675       while (entry != null) {
676         metrics.incrLogEditsRead();
677         seenEntries++;
678 
679         // don't replicate if the log entries have already been consumed by the cluster
680         if (replicationEndpoint.canReplicateToSameCluster()
681             || !entry.getKey().getClusterIds().contains(peerClusterId)) {
682           // Remove all KVs that should not be replicated
683           entry = walEntryFilter.filter(entry);
684           WALEdit edit = null;
685           WALKey logKey = null;
686           if (entry != null) {
687             edit = entry.getEdit();
688             logKey = entry.getKey();
689           }
690 
691           if (edit != null && edit.size() != 0) {
692             // Mark that the current cluster has the change
693             logKey.addClusterId(clusterId);
694             currentNbOperations += countDistinctRowKeys(edit);
695             entries.add(entry);
696             currentSize += entry.getEdit().heapSize();
697           } else {
698             metrics.incrLogEditsFiltered();
699           }
700         }
701         // Stop if too many entries or too big
702         // FIXME check the relationship between single wal group and overall
703         if (currentSize >= replicationQueueSizeCapacity
704             || entries.size() >= replicationQueueNbCapacity) {
705           break;
706         }
707         try {
708           entry = this.repLogReader.readNextAndSetPosition();
709         } catch (IOException ie) {
710           LOG.debug("Break on IOE: " + ie.getMessage());
711           break;
712         }
713       }
714       metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
715       if (currentWALisBeingWrittenTo) {
716         return false;
717       }
718       // If we didn't get anything and the queue has an object, it means we
719       // hit the end of the file for sure
720       return seenEntries == 0 && processEndOfFile(false);
721     }
722 
723     /**
724      * Poll for the next path
725      * @return true if a path was obtained, false if not
726      */
727     protected boolean getNextPath() {
728       try {
729         if (this.currentPath == null) {
730           this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
731           int queueSize = logQueueSize.decrementAndGet();
732           metrics.setSizeOfLogQueue(queueSize);
733           if (this.currentPath != null) {
734             // For recovered queue: must use peerClusterZnode since peerId is a parsed value
735             manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
736               this.replicationQueueInfo.isQueueRecovered());
737             if (LOG.isTraceEnabled()) {
738               LOG.trace("New log: " + this.currentPath);
739             }
740           }
741         }
742       } catch (InterruptedException e) {
743         LOG.warn("Interrupted while reading edits", e);
744       }
745       return this.currentPath != null;
746     }
747 
748     /**
749      * Open a reader on the current path
750      *
751      * @param sleepMultiplier by how many times the default sleeping time is augmented
752      * @return true if we should continue with that file, false if we are done with it
753      */
754     protected boolean openReader(int sleepMultiplier) {
755       try {
756         try {
757           if (LOG.isTraceEnabled()) {
758             LOG.trace("Opening log " + this.currentPath);
759           }
760           this.reader = repLogReader.openReader(this.currentPath);
761         } catch (FileNotFoundException fnfe) {
762           if (this.replicationQueueInfo.isQueueRecovered()) {
763             // We didn't find the log in the archive directory, look if it still
764             // exists in the dead RS folder (there could be a chain of failures
765             // to look at)
766             List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
767             LOG.info("NB dead servers : " + deadRegionServers.size());
768             final Path rootDir = FSUtils.getRootDir(conf);
769             for (String curDeadServerName : deadRegionServers) {
770               final Path deadRsDirectory = new Path(rootDir,
771                   DefaultWALProvider.getWALDirectoryName(curDeadServerName));
772               Path[] locs = new Path[] {
773                   new Path(deadRsDirectory, currentPath.getName()),
774                   new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
775                                             currentPath.getName()),
776               };
777               for (Path possibleLogLocation : locs) {
778                 LOG.info("Possible location " + possibleLogLocation.toUri().toString());
779                 if (manager.getFs().exists(possibleLogLocation)) {
780                   // We found the right new location
781                   LOG.info("Log " + this.currentPath + " still exists at " +
782                       possibleLogLocation);
783                   // When running ReplicationSyncUp tool, we should replicate the data from WAL
784                   // which is moved to WAL splitting directory also.
785                   if (stopper instanceof ReplicationSyncUp.DummyServer) {
786                     // Open the log at this location
787                     this.currentPath = possibleLogLocation;
788                     this.openReader(sleepMultiplier);
789                   }
790                   return true;
791                 }
792               }
793             }
794             // In the case of disaster recovery, the HMaster may have been shut down or crashed
795             // before WALs were moved from .logs to .oldlogs. Loop through the .logs folders and check whether a match exists
796             if (stopper instanceof ReplicationSyncUp.DummyServer) {
797               // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
798               //      area rather than to the wal area for a particular region server.
799               FileStatus[] rss = fs.listStatus(manager.getLogDir());
800               for (FileStatus rs : rss) {
801                 Path p = rs.getPath();
802                 FileStatus[] logs = fs.listStatus(p);
803                 for (FileStatus log : logs) {
804                   p = new Path(p, log.getPath().getName());
805                   if (p.getName().equals(currentPath.getName())) {
806                     currentPath = p;
807                     LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
808                     // Open the log at the new location
809                     this.openReader(sleepMultiplier);
810                     return true;
811                   }
812                 }
813               }
814             }
815 
816             // TODO What happens if the log was missing from every single location?
817             // Although we need to check a couple of times as the log could have
818             // been moved by the master between the checks
819             // It can also happen if a recovered queue wasn't properly cleaned,
820             // such that the znode pointing to a log exists but the log was
821             // deleted a long time ago.
822             // For the moment, we'll throw the IO and processEndOfFile
823             throw new IOException("File from recovered queue is " +
824                 "nowhere to be found", fnfe);
825           } else {
826             // If the log was archived, continue reading from there
827             Path archivedLogLocation =
828                 new Path(manager.getOldLogDir(), currentPath.getName());
829             if (manager.getFs().exists(archivedLogLocation)) {
830               currentPath = archivedLogLocation;
831               LOG.info("Log " + this.currentPath + " was moved to " +
832                   archivedLogLocation);
833               // Open the log at the new location
834               this.openReader(sleepMultiplier);
835 
836             }
837             // TODO What happens if the log is missing in both places?
838           }
839         }
840       } catch (LeaseNotRecoveredException lnre) {
841         // HBASE-15019 the WAL was not closed due to some hiccup.
842         LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
843         recoverLease(conf, currentPath);
844         this.reader = null;
845       } catch (IOException ioe) {
846         if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
847         LOG.warn(peerClusterZnode + " Got: ", ioe);
848         this.reader = null;
849         if (ioe.getCause() instanceof NullPointerException) {
850           // Workaround for race condition in HDFS-4380
851           // which throws a NPE if we open a file before any data node has the most recent block
852           // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
853           LOG.warn("Got NPE opening reader, will retry.");
854         } else if (sleepMultiplier >= maxRetriesMultiplier
855             && conf.getBoolean("replication.source.eof.autorecovery", false)) {
856           // TODO Need a better way to determine if a file is really gone but
857           // TODO without scanning the whole logs directory
858           LOG.warn("Waited too long for this file, considering dumping");
859           return !processEndOfFile(true);
860         }
861       }
862       return true;
863     }
864 
865     private void recoverLease(final Configuration conf, final Path path) {
866       try {
867         final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
868         FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
869         fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
870           @Override
871           public boolean progress() {
872             LOG.debug("recover WAL lease: " + path);
873             return isWorkerActive();
874           }
875         });
876       } catch (IOException e) {
877         LOG.warn("unable to recover lease for WAL: " + path, e);
878       }
879     }
880 
881     /*
882      * Checks whether the current log file is empty and this is not a recovered queue. This
883      * handles the scenario where, in an idle cluster, there is no entry in the current log and we
884      * keep trying to read the log file and getting an EOFException. In the case of a recovered
885      * queue the last log file may be empty, and we don't want to retry that.
886      */
887     private boolean isCurrentLogEmpty() {
888       return (this.repLogReader.getPosition() == 0 &&
889           !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
890     }
891 
892     /**
893      * Count the number of different row keys in the given edit (a single edit may carry several
894      * rows because of mini-batching). We assume that there is at least one Cell in the WALEdit.
895      * @param edit edit to count row keys from
896      * @return number of different row keys
897      */
898     private int countDistinctRowKeys(WALEdit edit) {
899       List<Cell> cells = edit.getCells();
900       int distinctRowKeys = 1;
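      // e.g. cells for rows [r1, r1, r2, r2, r3] yield 3 distinct row keys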
901       Cell lastCell = cells.get(0);
902       for (int i = 0; i < edit.size(); i++) {
903         if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
904           distinctRowKeys++;
905         }
        // advance lastCell so that row-key transitions are counted, not just differences
        // from the first cell's row
        lastCell = cells.get(i);
906       }
907       return distinctRowKeys;
908     }
909 
910     /**
911      * Do the shipping logic
912      * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
913      * written to when this method was called
     * @param entries the WAL entries to ship to the peer cluster
914      */
915     protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
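      // Ships one prepared batch: keep retrying through the endpoint until the batch is
      // acknowledged (or the worker is stopped), then record the new WAL position so that
      // fully-shipped logs can be cleaned up.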
916       int sleepMultiplier = 0;
917       if (entries.isEmpty()) {
918         LOG.warn("Was given 0 edits to ship");
919         return;
920       }
921       while (isWorkerActive()) {
922         try {
923           if (throttler.isEnabled()) {
924             long sleepTicks = throttler.getNextSleepInterval(currentSize);
925             if (sleepTicks > 0) {
926               try {
927                 if (LOG.isTraceEnabled()) {
928                   LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
929                 }
930                 Thread.sleep(sleepTicks);
931               } catch (InterruptedException e) {
932                 LOG.debug("Interrupted while sleeping for throttling control");
933                 Thread.currentThread().interrupt();
934                 // current thread might be interrupted to terminate
935                 // directly go back to while() for confirm this
936                 continue;
937               }
938               // reset throttler's cycle start tick when sleep for throttling occurs
939               throttler.resetStartTick();
940             }
941           }
942           // create replicateContext here, so the entries can be GC'd upon return from this call
943           // stack
944           ReplicationEndpoint.ReplicateContext replicateContext =
945               new ReplicationEndpoint.ReplicateContext();
946           replicateContext.setEntries(entries).setSize(currentSize);
947           replicateContext.setWalGroupId(walGroupId);
948 
949           long startTimeNs = System.nanoTime();
950           // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
951           boolean replicated = replicationEndpoint.replicate(replicateContext);
952           long endTimeNs = System.nanoTime();
953 
954           if (!replicated) {
955             continue;
956           } else {
957             sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
958           }
959 
960           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
961             manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
962               this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
963               currentWALisBeingWrittenTo);
964             this.lastLoggedPosition = this.repLogReader.getPosition();
965           }
966           if (throttler.isEnabled()) {
967             throttler.addPushSize(currentSize);
968           }
969           totalReplicatedEdits.addAndGet(entries.size());
970           totalReplicatedOperations.addAndGet(currentNbOperations);
971           // FIXME check relationship between wal group and overall
972           metrics.shipBatch(currentNbOperations, currentSize / 1024);
973           metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
974             walGroupId);
975           if (LOG.isTraceEnabled()) {
976             LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
977                 + totalReplicatedOperations + " operations in "
978                 + ((endTimeNs - startTimeNs) / 1000000) + " ms");
979           }
980           break;
981         } catch (Exception ex) {
982           LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
983               + org.apache.hadoop.util.StringUtils.stringifyException(ex));
984           if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
985             sleepMultiplier++;
986           }
987         }
988       }
989     }
990 
991     /**
992      * If the queue isn't empty, switch to the next one. Else, if this is a recovered queue, it
993      * means we're done! Otherwise we'll just continue trying to read the log file.
994      * @return true if we're done with the current file, false if we should continue trying to read
995      *         from it
996      */
997     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
998         justification = "Yeah, this is how it works")
999     protected boolean processEndOfFile(boolean dumpOnlyIfZeroLength) {
1000       // We presume this means the file we're reading is closed.
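      // (For a live source a new WAL is enqueued only after the previous one has been rolled and
      // closed; for a recovered queue every file belongs to a dead server, so it is closed too.)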
1001       if (this.queue.size() != 0) {
1002         // -1 means the wal wasn't closed cleanly.
1003         final long trailerSize = this.repLogReader.currentTrailerSize();
1004         final long currentPosition = this.repLogReader.getPosition();
1005         FileStatus stat = null;
1006         try {
1007           stat = fs.getFileStatus(this.currentPath);
1008         } catch (IOException exception) {
1009           LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " + (trailerSize < 0 ? "was not" : "was") + " closed cleanly"
1010               + ", stats: " + getStats());
1011           metrics.incrUnknownFileLengthForClosedWAL();
1012         }
1013         if (stat != null) {
1014           if (trailerSize < 0) {
1015             if (currentPosition < stat.getLen()) {
1016               final long skippedBytes = stat.getLen() - currentPosition;
1017               LOG.info("Reached the end of WAL file '" + currentPath + "'. It was not closed cleanly, so we did not parse " + skippedBytes + " bytes of data.");
1018               metrics.incrUncleanlyClosedWALs();
1019               metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
1020             }
1021           } else if (currentPosition + trailerSize < stat.getLen()){
1022             LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition + ", which is too far away from reported file length " + stat.getLen() +
1023                 ". Restarting WAL reading (see HBASE-15983 for details). stats: " + getStats());
1024             repLogReader.setPosition(0);
1025             metrics.incrRestartedWALReading();
1026             metrics.incrRepeatedFileBytes(currentPosition);
1027             return false;
1028           }
1029         }
1030         if (LOG.isTraceEnabled()) {
1031           LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
1032               + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
1033         }
        // stat may be null if the length lookup above failed; without a confirmed zero length,
        // do not dump the file
1034         if (dumpOnlyIfZeroLength && (stat == null || stat.getLen() != 0)) {
1035           return false;
1036         }
1037         this.currentPath = null;
1038         this.repLogReader.finishCurrentFile();
1039         this.reader = null;
1040         metrics.incrCompletedWAL();
1041         return true;
1042       } else if (this.replicationQueueInfo.isQueueRecovered()) {
1043         LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
1044             + peerClusterZnode);
1045         metrics.incrCompletedRecoveryQueue();
1046         setWorkerState(WorkerState.FINISHED);
1047         return true;
1048       }
1049       return false;
1050     }
1051 
1052     public void startup() {
1053       String n = Thread.currentThread().getName();
1054       Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
1055         @Override
1056         public void uncaughtException(final Thread t, final Throwable e) {
1057           RSRpcServices.exitIfOOME(e);
1058           LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
1059               + getCurrentPath(), e);
1060           stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
1061         }
1062       };
1063       Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
1064           + peerClusterZnode, handler);
1065       workerThreads.put(walGroupId, this);
1066     }
1067 
1068     public Path getCurrentPath() {
1069       return this.currentPath;
1070     }
1071 
1072     public long getCurrentPosition() {
1073       return this.repLogReader.getPosition();
1074     }
1075 
1076     private boolean isWorkerActive() {
1077       return !stopper.isStopped() && state == WorkerState.RUNNING && !isInterrupted();
1078     }
1079 
1080     private void terminate(String reason, Exception cause) {
1081       if (cause == null) {
1082         LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
1083 
1084       } else {
1085         LOG.error("Closing worker for wal group " + this.walGroupId
1086             + " because an error occurred: " + reason, cause);
1087       }
1088       setWorkerState(WorkerState.STOPPED);
1089       this.interrupt();
1090       Threads.shutdown(this, sleepForRetries);
1091       LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
1092     }
1093 
1094     /**
1095      * Set the worker state
1096      * @param state the new state of the worker
1097      */
1098     public void setWorkerState(WorkerState state) {
1099       this.state = state;
1100     }
1101 
1102     /**
1103      * Get the current state of this worker.
1104      * @return WorkerState
1105      */
1106     public WorkerState getWorkerState() {
1107       return state;
1108     }
1109   }
1110 }
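
// A rough usage sketch, for illustration only -- in practice ReplicationSourceManager constructs,
// initializes and drives sources, and the variables below are merely placeholders for the init()
// parameters documented above:
//
//   ReplicationSourceInterface source = new ReplicationSource();
//   source.init(conf, fs, manager, replicationQueues, replicationPeers, stopper,
//       peerClusterZnode, clusterId, replicationEndpoint, metrics);
//   source.startup();            // runs the source as a daemon thread (see run())
//   source.enqueueLog(walPath);  // one queue and worker per WAL group prefix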