1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Comparator;
26  import java.util.List;
27  import java.util.UUID;
28  import java.util.concurrent.PriorityBlockingQueue;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.commons.lang.StringUtils;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.HBaseConfiguration;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.Stoppable;
44  import org.apache.hadoop.hbase.regionserver.wal.HLog;
45  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
46  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
47  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
48  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
49  import org.apache.hadoop.hbase.replication.ReplicationException;
50  import org.apache.hadoop.hbase.replication.ReplicationPeers;
51  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
52  import org.apache.hadoop.hbase.replication.ReplicationQueues;
53  import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
54  import org.apache.hadoop.hbase.replication.WALEntryFilter;
55  import org.apache.hadoop.hbase.util.Threads;
56  
57  import com.google.common.collect.Lists;
58  import com.google.common.util.concurrent.ListenableFuture;
59  import com.google.common.util.concurrent.Service;
60  
61  /**
62   * Class that handles the source of a replication stream.
63   * Currently does not handle more than one slave cluster.
64   * For each slave cluster it selects a random number of peers
65   * using a replication ratio. For example, if the replication ratio is 0.1
66   * and the slave cluster has 100 region servers, 10 will be selected.
67   * <p/>
68   * A stream is considered down when we cannot contact a region server on the
69   * peer cluster for more than 55 seconds by default.
70   * <p/>
71   *
72   */
73  @InterfaceAudience.Private
74  public class ReplicationSource extends Thread
75      implements ReplicationSourceInterface {
76  
77    public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
78    // Queue of logs to process
79    private PriorityBlockingQueue<Path> queue;
80    private ReplicationQueues replicationQueues;
81    private ReplicationPeers replicationPeers;
82  
83    private Configuration conf;
84    private ReplicationQueueInfo replicationQueueInfo;
85    // id of the peer cluster this source replicates to
86    private String peerId;
87    // The manager of all sources to which we ping back our progress
88    private ReplicationSourceManager manager;
89    // Should we stop everything?
90    private Stoppable stopper;
91    // How long should we sleep for each retry
92    private long sleepForRetries;
93    // Max size in bytes of entriesArray
94    private long replicationQueueSizeCapacity;
95    // Max number of entries in entriesArray
96    private int replicationQueueNbCapacity;
97    // Our reader for the current log
98    private HLog.Reader reader;
99    // Last position in the log that we sent to ZooKeeper
100   private long lastLoggedPosition = -1;
101   // Path of the current log
102   private volatile Path currentPath;
103   private FileSystem fs;
104   // id of this cluster
105   private UUID clusterId;
106   // id of the other cluster
107   private UUID peerClusterId;
108   // total number of edits we replicated
109   private long totalReplicatedEdits = 0;
110   // total number of operations (Put/Delete) we replicated
111   private long totalReplicatedOperations = 0;
112   // The znode we currently play with
113   private String peerClusterZnode;
114   // Maximum number of retries before taking drastic action such as abandoning the current file
115   private int maxRetriesMultiplier;
116   // Current number of operations (Put/Delete) that we need to replicate
117   private int currentNbOperations = 0;
118   // Current size of data we need to replicate
119   private int currentSize = 0;
120   // Indicates if this particular source is running
121   private volatile boolean running = true;
122   // Metrics for this source
123   private MetricsSource metrics;
124   // Handle on the log reader helper
125   private ReplicationHLogReaderManager repLogReader;
126   //WARN threshold for the number of queued logs, defaults to 2
127   private int logQueueWarnThreshold;
128   // ReplicationEndpoint which will handle the actual replication
129   private ReplicationEndpoint replicationEndpoint;
130   // A filter (or a chain of filters) for the WAL entries.
131   private WALEntryFilter walEntryFilter;
132   // Context for ReplicationEndpoint#replicate()
133   private ReplicationEndpoint.ReplicateContext replicateContext;
134   // throttler
135   private ReplicationThrottler throttler;
136 
137   /**
138    * Instantiation method used by region servers
139    *
140    * @param conf configuration to use
141    * @param fs file system to use
142    * @param manager the replication manager we report our progress to
       * @param replicationQueues interface to this server's queues of WALs to replicate
       * @param replicationPeers interface to the replication peers and their state
143    * @param stopper the stoppable (usually the region server) used to check if we should stop
144    * @param peerClusterZnode the name of our znode
145    * @param clusterId unique UUID for the cluster
146    * @param replicationEndpoint the replication endpoint implementation
147    * @param metrics metrics for replication source
148    * @throws IOException
149    */
150   @Override
151   public void init(final Configuration conf, final FileSystem fs,
152       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
153       final ReplicationPeers replicationPeers, final Stoppable stopper,
154       final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
155       final MetricsSource metrics)
156           throws IOException {
157     this.stopper = stopper;
158     this.conf = HBaseConfiguration.create(conf);
159     decorateConf();
160     this.replicationQueueSizeCapacity =
161         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
162     this.replicationQueueNbCapacity =
163         this.conf.getInt("replication.source.nb.capacity", 25000);
164     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
165     this.queue =
166         new PriorityBlockingQueue<Path>(
167             this.conf.getInt("hbase.regionserver.maxlogs", 32),
168             new LogsComparator());
169     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
170     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
171     this.replicationQueues = replicationQueues;
172     this.replicationPeers = replicationPeers;
173     this.manager = manager;
174     this.sleepForRetries =
175         this.conf.getLong("replication.source.sleepforretries", 1000);
176     this.fs = fs;
177     this.metrics = metrics;
178     this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
179     this.clusterId = clusterId;
180 
181     this.peerClusterZnode = peerClusterZnode;
182     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
183     // ReplicationQueueInfo parses the peerId out of the znode for us
184     this.peerId = this.replicationQueueInfo.getPeerId();
185     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
186     this.replicationEndpoint = replicationEndpoint;
187 
188     this.replicateContext = new ReplicationEndpoint.ReplicateContext();
189   }
190 
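  /**
   * Copy the replication codec setting (HConstants.REPLICATION_CODEC_CONF_KEY), when present,
   * into the generic RPC codec key so that RPCs made on behalf of this source use a codec
   * suited to replication (e.g. one that also serializes cell tags).
   */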
191   private void decorateConf() {
192     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
193     if (StringUtils.isNotEmpty(replicationCodec)) {
194       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
195     }
196   }
197 
198   @Override
199   public void enqueueLog(Path log) {
200     this.queue.put(log);
201     int queueSize = queue.size();
202     this.metrics.setSizeOfLogQueue(queueSize);
203     // This will log a warning for each new log that gets created above the warn threshold
204     if (queueSize > this.logQueueWarnThreshold) {
205       LOG.warn("Queue size: " + queueSize +
206         " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
207     }
208   }
209 
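  /**
   * Tear down this source: clear its metrics and, if the replication endpoint is starting or
   * running, stop it and wait for it to stop.
   */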
210   private void uninitialize() {
211     LOG.debug("Source exiting " + this.peerId);
212     metrics.clear();
213     if (replicationEndpoint.state() == Service.State.STARTING
214         || replicationEndpoint.state() == Service.State.RUNNING) {
215       replicationEndpoint.stopAndWait();
216     }
217   }
218 
219   @Override
220   public void run() {
221     // We were stopped while looping to connect to sinks, just abort
222     if (!this.isActive()) {
223       uninitialize();
224       return;
225     }
226 
227     try {
228       // start the endpoint, connect to the cluster
229       Service.State state = replicationEndpoint.start().get();
230       if (state != Service.State.RUNNING) {
231         LOG.warn("ReplicationEndpoint was not started. Exiting");
232         uninitialize();
233         return;
234       }
235     } catch (Exception ex) {
236       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
237       throw new RuntimeException(ex);
238     }
239 
240     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
241     ArrayList<WALEntryFilter> filters = Lists.newArrayList(
242       (WALEntryFilter)new SystemTableWALEntryFilter());
243     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
244     if (filterFromEndpoint != null) {
245       filters.add(filterFromEndpoint);
246     }
247     this.walEntryFilter = new ChainWALEntryFilter(filters);
248 
249     int sleepMultiplier = 1;
250     // delay this until we are in an asynchronous thread
251     while (this.isActive() && this.peerClusterId == null) {
252       this.peerClusterId = replicationEndpoint.getPeerUUID();
253       if (this.isActive() && this.peerClusterId == null) {
254         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
255           sleepMultiplier++;
256         }
257       }
258     }
259     // We were stopped while looping to contact peer's zk ensemble, just abort
260     if (!this.isActive()) {
261       uninitialize();
262       return;
263     }
264 
265     // resetting to 1 to reuse later
266     sleepMultiplier = 1;
267 
268     // In rare case, zookeeper setting may be messed up. That leads to the incorrect
269     // peerClusterId value, which is the same as the source clusterId
270     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
271       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
272           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
273           + replicationEndpoint.getClass().getName(), null, false);
274     }
275     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
276 
277     // If this is a recovered queue, it is already full and the first log
278     // normally has a position (unless the RS failed between 2 logs)
279     if (this.replicationQueueInfo.isQueueRecovered()) {
280       try {
281         this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
282           this.queue.peek().getName()));
283         if (LOG.isTraceEnabled()) {
284           LOG.trace("Recovered queue started with log " + this.queue.peek() +
285               " at position " + this.repLogReader.getPosition());
286         }
287       } catch (ReplicationException e) {
288         this.terminate("Couldn't get the position of this recovered queue " +
289             this.peerClusterZnode, e);
290       }
291     }
292     // Loop until we close down
293     while (isActive()) {
294       // Sleep until replication is enabled again
295       if (!isPeerEnabled()) {
296         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
297           sleepMultiplier++;
298         }
299         continue;
300       }
301       Path oldPath = getCurrentPath(); //note that in the current scenario,
302                                        //oldPath will be null when a log roll
303                                        //happens.
304       // Get a new path
305       boolean hasCurrentPath = getNextPath();
306       if (getCurrentPath() != null && oldPath == null) {
307         sleepMultiplier = 1; //reset the sleepMultiplier on a path change
308       }
309       if (!hasCurrentPath) {
310         if (sleepForRetries("No log to process", sleepMultiplier)) {
311           sleepMultiplier++;
312         }
313         continue;
314       }
315       boolean currentWALisBeingWrittenTo = false;
316       //For WAL files we own (rather than recovered), take a snapshot of whether the
317       //current WAL file (this.currentPath) is in use (for writing) NOW!
318       //Since the new WAL paths are enqueued only after the prev WAL file
319       //is 'closed', presence of an element in the queue means that
320       //the previous WAL file was closed, else the file is in use (currentPath)
321       //We take the snapshot now so that we are protected against races
322       //where a new file gets enqueued while the current file is being processed
323       //(and where we just finished reading the current file).
324       if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
325         currentWALisBeingWrittenTo = true;
326       }
327       // Open a reader on it
328       if (!openReader(sleepMultiplier)) {
329         // Reset the sleep multiplier, else it'd be reused for the next file
330         sleepMultiplier = 1;
331         continue;
332       }
333 
334       // If we got a null reader but didn't continue, then sleep and continue
335       if (this.reader == null) {
336         if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
337           sleepMultiplier++;
338         }
339         continue;
340       }
341 
342       boolean gotIOE = false;
343       currentNbOperations = 0;
344       List<HLog.Entry> entries = new ArrayList<HLog.Entry>(1);
345       currentSize = 0;
346       try {
347         if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
348           continue;
349         }
350       } catch (IOException ioe) {
351         LOG.warn(this.peerClusterZnode + " Got: ", ioe);
352         gotIOE = true;
353         if (ioe.getCause() instanceof EOFException) {
354 
355           boolean considerDumping = false;
356           if (this.replicationQueueInfo.isQueueRecovered()) {
357             try {
358               FileStatus stat = this.fs.getFileStatus(this.currentPath);
359               if (stat.getLen() == 0) {
360                 LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
361               }
362               considerDumping = true;
363             } catch (IOException e) {
364               LOG.warn(this.peerClusterZnode + " Got exception while getting file size: ", e);
365             }
366           }
367 
368           if (considerDumping &&
369               sleepMultiplier == this.maxRetriesMultiplier &&
370               processEndOfFile()) {
371             continue;
372           }
373         }
374       } finally {
375         try {
376           this.reader = null;
377           this.repLogReader.closeReader();
378         } catch (IOException e) {
379           gotIOE = true;
380           LOG.warn("Unable to finalize the tailing of a file", e);
381         }
382       }
383 
384       // If we didn't get anything to replicate, or if we hit a IOE,
385       // wait a bit and retry.
386       // But if we need to stop, don't bother sleeping
387       if (this.isActive() && (gotIOE || entries.isEmpty())) {
388         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
389           this.manager.logPositionAndCleanOldLogs(this.currentPath,
390               this.peerClusterZnode, this.repLogReader.getPosition(),
391               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
392           this.lastLoggedPosition = this.repLogReader.getPosition();
393         }
394         // Reset the sleep multiplier if nothing has actually gone wrong
395         if (!gotIOE) {
396           sleepMultiplier = 1;
397           // if there was nothing to ship and it's not an error
398           // set "ageOfLastShippedOp" to <now> to indicate that we're current
399           this.metrics.setAgeOfLastShippedOp(System.currentTimeMillis());
400         }
401         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
402           sleepMultiplier++;
403         }
404         continue;
405       }
406       sleepMultiplier = 1;
407       shipEdits(currentWALisBeingWrittenTo, entries);
408     }
409     uninitialize();
410   }
411 
412   /**
413    * Read all the entries from the current log file and retain those
414    * that need to be replicated; otherwise, process the end of the current file.
415    * @param currentWALisBeingWrittenTo is the current WAL being written to
416    * @param entries resulting entries to be replicated
417    * @return true if we got nothing and went to the next file, false if we got
418    * entries
419    * @throws IOException
420    */
421   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
422       List<HLog.Entry> entries) throws IOException{
423     long seenEntries = 0;
424     if (LOG.isTraceEnabled()) {
425       LOG.trace("Seeking in " + this.currentPath + " at position "
426           + this.repLogReader.getPosition());
427     }
428     this.repLogReader.seek();
429     long positionBeforeRead = this.repLogReader.getPosition();
430     HLog.Entry entry =
431         this.repLogReader.readNextAndSetPosition();
432     while (entry != null) {
433       this.metrics.incrLogEditsRead();
434       seenEntries++;
435 
436       // don't replicate if the log entries have already been consumed by the cluster
437       if (replicationEndpoint.canReplicateToSameCluster()
438           || !entry.getKey().getClusterIds().contains(peerClusterId)) {
439         // Remove all KVs that should not be replicated
440         entry = walEntryFilter.filter(entry);
441         WALEdit edit = null;
442         HLogKey logKey = null;
443         if (entry != null) {
444           edit = entry.getEdit();
445           logKey = entry.getKey();
446         }
447 
448         if (edit != null && edit.size() != 0) {
449           //Mark that the current cluster has the change
450           logKey.addClusterId(clusterId);
451           currentNbOperations += countDistinctRowKeys(edit);
452           entries.add(entry);
453           currentSize += entry.getEdit().heapSize();
454         } else {
455           this.metrics.incrLogEditsFiltered();
456         }
457       }
458       // Stop if too many entries or too big
459       if (currentSize >= this.replicationQueueSizeCapacity ||
460           entries.size() >= this.replicationQueueNbCapacity) {
461         break;
462       }
463       try {
464         entry = this.repLogReader.readNextAndSetPosition();
465       } catch (IOException ie) {
466         LOG.debug("Break on IOE: " + ie.getMessage());
467         break;
468       }
469     }
470     metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
471     if (currentWALisBeingWrittenTo) {
472       return false;
473     }
474     // If we didn't get anything and the queue has an object, it means we
475     // hit the end of the file for sure
476     return seenEntries == 0 && processEndOfFile();
477   }
478 
479   /**
480    * Poll for the next path
481    * @return true if a path was obtained, false if not
482    */
483   protected boolean getNextPath() {
484     try {
485       if (this.currentPath == null) {
486         this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
487         this.metrics.setSizeOfLogQueue(queue.size());
488         if (this.currentPath != null) {
489           this.manager.cleanOldLogs(this.currentPath.getName(),
490               this.peerId,
491               this.replicationQueueInfo.isQueueRecovered());
492           if (LOG.isTraceEnabled()) {
493             LOG.trace("New log: " + this.currentPath);
494           }
495         }
496       }
497     } catch (InterruptedException e) {
498       LOG.warn("Interrupted while reading edits", e);
499     }
500     return this.currentPath != null;
501   }
502 
503   /**
504    * Open a reader on the current path
505    *
506    * @param sleepMultiplier by how many times the default sleeping time is augmented
507    * @return true if we should continue with that file, false if we are done with it
508    */
509   protected boolean openReader(int sleepMultiplier) {
510     try {
511       try {
512         if (LOG.isTraceEnabled()) {
513           LOG.trace("Opening log " + this.currentPath);
514         }
515         this.reader = repLogReader.openReader(this.currentPath);
516       } catch (FileNotFoundException fnfe) {
517         if (this.replicationQueueInfo.isQueueRecovered()) {
518           // We didn't find the log in the archive directory, look if it still
519           // exists in the dead RS folder (there could be a chain of failures
520           // to look at)
521           List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
522           LOG.info("NB dead servers : " + deadRegionServers.size());
523           for (String curDeadServerName : deadRegionServers) {
524             Path deadRsDirectory =
525                 new Path(manager.getLogDir().getParent(), curDeadServerName);
526             Path[] locs = new Path[] {
527                 new Path(deadRsDirectory, currentPath.getName()),
528                 new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT),
529                                           currentPath.getName()),
530             };
531             for (Path possibleLogLocation : locs) {
532               LOG.info("Possible location " + possibleLogLocation.toUri().toString());
533               if (this.manager.getFs().exists(possibleLogLocation)) {
534                 // We found the right new location
535                 LOG.info("Log " + this.currentPath + " still exists at " +
536                     possibleLogLocation);
537                 // Breaking here will make us sleep since reader is null
538                 return true;
539               }
540             }
541           }
542           // In the case of disaster/recovery, the HMaster may have been shut down or crashed
543           // before flushing data from .logs to .oldlogs. Loop through the .logs folders and check whether a match exists
544           if (stopper instanceof ReplicationSyncUp.DummyServer) {
545             FileStatus[] rss = fs.listStatus(manager.getLogDir());
546             for (FileStatus rs : rss) {
547               Path p = rs.getPath();
548               FileStatus[] logs = fs.listStatus(p);
549               for (FileStatus log : logs) {
550                 p = new Path(p, log.getPath().getName());
551                 if (p.getName().equals(currentPath.getName())) {
552                   currentPath = p;
553                   LOG.info("Log " + this.currentPath + " exists under " + manager.getLogDir());
554                   // Open the log at the new location
555                   this.openReader(sleepMultiplier);
556                   return true;
557                 }
558               }
559             }
560           }
561 
562           // TODO What happens if the log was missing from every single location?
563           // Although we need to check a couple of times as the log could have
564           // been moved by the master between the checks
565           // It can also happen if a recovered queue wasn't properly cleaned,
566           // such that the znode pointing to a log exists but the log was
567           // deleted a long time ago.
568           // For the moment, we'll throw the IO and processEndOfFile
569           throw new IOException("File from recovered queue is " +
570               "nowhere to be found", fnfe);
571         } else {
572           // If the log was archived, continue reading from there
573           Path archivedLogLocation =
574               new Path(manager.getOldLogDir(), currentPath.getName());
575           if (this.manager.getFs().exists(archivedLogLocation)) {
576             currentPath = archivedLogLocation;
577             LOG.info("Log " + this.currentPath + " was moved to " +
578                 archivedLogLocation);
579             // Open the log at the new location
580             this.openReader(sleepMultiplier);
581 
582           }
583           // TODO What happens if the log is missing in both places?
584         }
585       }
586     } catch (IOException ioe) {
587       if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
588       LOG.warn(this.peerClusterZnode + " Got: ", ioe);
589       this.reader = null;
590       if (ioe.getCause() instanceof NullPointerException) {
591         // Workaround for race condition in HDFS-4380
592         // which throws a NPE if we open a file before any data node has the most recent block
593         // Just sleep and retry. Will require re-reading compressed HLogs for compressionContext.
594         LOG.warn("Got NPE opening reader, will retry.");
595       } else if (sleepMultiplier == this.maxRetriesMultiplier) {
596         // TODO Need a better way to determine if a file is really gone but
597         // TODO without scanning all logs dir
598         LOG.warn("Waited too long for this file, considering dumping");
599         return !processEndOfFile();
600       }
601     }
602     return true;
603   }
604 
605   /*
606    * Checks whether the current log file is empty and this is not a recovered queue. This handles
607    * the scenario where, in an idle cluster, there is no entry in the current log and we keep
608    * trying to read the log file and getting EOFException. In the case of a recovered queue the
609    * last log file may be empty, and we don't want to retry that.
610    */
611   private boolean isCurrentLogEmpty() {
612     return (this.repLogReader.getPosition() == 0 &&
613         !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
614   }
615 
616   /**
617    * Do the sleeping logic
618    * @param msg Why we sleep
619    * @param sleepMultiplier by how many times the default sleeping time is augmented
620    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
621    */
622   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
623     try {
624       if (LOG.isTraceEnabled()) {
625         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
626       }
627       Thread.sleep(this.sleepForRetries * sleepMultiplier);
628     } catch (InterruptedException e) {
629       LOG.debug("Interrupted while sleeping between retries");
630       Thread.currentThread().interrupt();
631     }
632     return sleepMultiplier < maxRetriesMultiplier;
633   }
634 
635   /**
636    * Count the number of different row keys in the given edit, which approximates the number of
637    * mini-batched operations it carries. We assume that there's at least one Cell in the WALEdit.
638    * @param edit edit to count row keys from
639    * @return number of different row keys, e.g. 2 for an edit with cells on rows [r1, r1, r2]
640    */
641   private int countDistinctRowKeys(WALEdit edit) {
642     List<Cell> cells = edit.getCells();
643     int distinctRowKeys = 1;
644     Cell lastCell = cells.get(0);
645     for (int i = 0; i < edit.size(); i++) {
646       if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
647         distinctRowKeys++;
648       }
          lastCell = cells.get(i); // advance the reference cell so only changes between consecutive rows are counted
649     }
650     return distinctRowKeys;
651   }
652 
653   /**
654    * Do the shipping logic
655    * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
656    * written to when this method was called
657    */
658   protected void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries) {
659     int sleepMultiplier = 1;
660     if (entries.isEmpty()) {
661       LOG.warn("Was given 0 edits to ship");
662       return;
663     }
664     while (this.isActive()) {
665       try {
666         if (this.throttler.isEnabled()) {
667           long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
668           if (sleepTicks > 0) {
669             try {
670               if (LOG.isTraceEnabled()) {
671                 LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
672               }
673               Thread.sleep(sleepTicks);
674             } catch (InterruptedException e) {
675               LOG.debug("Interrupted while sleeping for throttling control");
676               Thread.currentThread().interrupt();
677               // current thread might have been interrupted to terminate;
678               // go directly back to the while() loop to check for that
679               continue;
680             }
681             // reset throttler's cycle start tick when sleep for throttling occurs
682             this.throttler.resetStartTick();
683           }
684         }
685         replicateContext.setEntries(entries).setSize(currentSize);
686 
687         // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
688         boolean replicated = replicationEndpoint.replicate(replicateContext);
689 
690         if (!replicated) {
691           continue;
692         }
693 
694         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
695           this.manager.logPositionAndCleanOldLogs(this.currentPath,
696               this.peerClusterZnode, this.repLogReader.getPosition(),
697               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
698           this.lastLoggedPosition = this.repLogReader.getPosition();
699         }
700         if (this.throttler.isEnabled()) {
701           this.throttler.addPushSize(currentSize);
702         }
703         this.totalReplicatedEdits += entries.size();
704         this.totalReplicatedOperations += currentNbOperations;
705         this.metrics.shipBatch(this.currentNbOperations, this.currentSize/1024);
706         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
707         if (LOG.isTraceEnabled()) {
708           LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
709               + this.totalReplicatedOperations + " operations");
710         }
711         break;
712       } catch (Exception ex) {
713         LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" +
714             org.apache.hadoop.util.StringUtils.stringifyException(ex));
715         if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
716           sleepMultiplier++;
717         }
718       }
719     }
720   }
721 
722   /**
723    * Check whether the peer is enabled or not
724    *
725    * @return true if the peer is enabled, otherwise false
726    */
727   protected boolean isPeerEnabled() {
728     return this.replicationPeers.getStatusOfPeer(this.peerId);
729   }
730 
731   /**
732    * If the queue isn't empty, switch to the next one
733    * Else if this is a recovered queue, it means we're done!
734    * Else we'll just continue to try reading the log file
735    * @return true if we're done with the current file, false if we should
736    * continue trying to read from it
737    */
738   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
739       justification="Yeah, this is how it works")
740   protected boolean processEndOfFile() {
741     if (this.queue.size() != 0) {
742       if (LOG.isTraceEnabled()) {
743         String filesize = "N/A";
744         try {
745           FileStatus stat = this.fs.getFileStatus(this.currentPath);
746           filesize = stat.getLen()+"";
747         } catch (IOException ex) {}
748         LOG.trace("Reached the end of a log, stats: " + getStats() +
749             ", and the length of the file is " + filesize);
750       }
751       this.currentPath = null;
752       this.repLogReader.finishCurrentFile();
753       this.reader = null;
754       return true;
755     } else if (this.replicationQueueInfo.isQueueRecovered()) {
756       this.manager.closeRecoveredQueue(this);
757       LOG.info("Finished recovering the queue with the following stats " + getStats());
758       this.running = false;
759       return true;
760     }
761     return false;
762   }
763 
764   @Override
765   public void startup() {
766     String n = Thread.currentThread().getName();
767     Thread.UncaughtExceptionHandler handler =
768         new Thread.UncaughtExceptionHandler() {
769           @Override
770           public void uncaughtException(final Thread t, final Throwable e) {
771             LOG.error("Unexpected exception in ReplicationSource," +
772               " currentPath=" + currentPath, e);
773           }
774         };
775     Threads.setDaemonThreadRunning(
776         this, n + ".replicationSource," +
777         this.peerClusterZnode, handler);
778   }
779 
780   @Override
781   public void terminate(String reason) {
782     terminate(reason, null);
783   }
784 
785   @Override
786   public void terminate(String reason, Exception cause) {
787     terminate(reason, cause, true);
788   }
789 
790   public void terminate(String reason, Exception cause, boolean join) {
791     if (cause == null) {
792       LOG.info("Closing source "
793           + this.peerClusterZnode + " because: " + reason);
794 
795     } else {
796       LOG.error("Closing source " + this.peerClusterZnode
797           + " because an error occurred: " + reason, cause);
798     }
799     this.running = false;
800     this.interrupt();
801     ListenableFuture<Service.State> future = null;
802     if (this.replicationEndpoint != null) {
803       future = this.replicationEndpoint.stop();
804     }
805     if (join) {
806       Threads.shutdown(this, this.sleepForRetries);
807       if (future != null) {
808         try {
809           future.get();
810         } catch (Exception e) {
811           LOG.warn("Got exception:" + e);
812         }
813       }
814     }
815   }
816 
817   @Override
818   public String getPeerClusterZnode() {
819     return this.peerClusterZnode;
820   }
821 
822   @Override
823   public String getPeerClusterId() {
824     return this.peerId;
825   }
826 
827   @Override
828   public Path getCurrentPath() {
829     return this.currentPath;
830   }
831 
832   private boolean isActive() {
833     return !this.stopper.isStopped() && this.running && !isInterrupted();
834   }
835 
836   /**
837    * Comparator used to compare logs together based on their start time
838    */
839   public static class LogsComparator implements Comparator<Path> {
840 
841     @Override
842     public int compare(Path o1, Path o2) {
843       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
844     }
845 
846     /**
847      * Split a path to get the start time
848      * For example: 10.20.20.171%3A60020.1277499063250
849      * @param p path to split
850      * @return start time
851      */
852     private long getTS(Path p) {
853       String[] parts = p.getName().split("\\.");
854       return Long.parseLong(parts[parts.length-1]);
855     }
856   }
857 
858   @Override
859   public String getStats() {
860     long position = this.repLogReader.getPosition();
861     return "Total replicated edits: " + totalReplicatedEdits +
862       ", currently replicating from: " + this.currentPath +
863       " at position: " + position;
864   }
865 }
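
For reference, the endpoint callbacks above are driven in a fixed order: run() waits for start() to reach Service.State.RUNNING, polls getPeerUUID() until it is non-null, builds the filter chain from getWALEntryfilter(), and shipEdits() calls replicate() until it returns true. Below is a minimal sketch of a custom endpoint as this source would drive it, assuming the BaseReplicationEndpoint helper in org.apache.hadoop.hbase.replication; the no-op endpoint itself is purely illustrative and not part of this class.

import java.util.UUID;

import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;

// Illustrative sketch only; a real implementation would ship entries to a sink cluster or system.
public class NoOpReplicationEndpoint extends BaseReplicationEndpoint {

  // Placeholder peer id; real endpoints obtain it from the peer cluster.
  private final UUID peerUUID = UUID.randomUUID();

  @Override
  protected void doStart() {
    notifyStarted();   // ReplicationSource.run() waits for Service.State.RUNNING
  }

  @Override
  protected void doStop() {
    notifyStopped();   // reached via stop()/stopAndWait() from terminate()/uninitialize()
  }

  @Override
  public UUID getPeerUUID() {
    return peerUUID;   // polled in run() until non-null
  }

  @Override
  public boolean replicate(ReplicateContext context) {
    // shipEdits() blocks here; returning false (or throwing) makes the source retry the same batch.
    for (HLog.Entry entry : context.getEntries()) {
      // A real endpoint would persist or forward the entry here.
    }
    return true;
  }
}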