1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.net.ConnectException;
25  import java.net.SocketTimeoutException;
26  import java.util.ArrayList;
27  import java.util.Comparator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableMap;
31  import java.util.UUID;
32  import java.util.concurrent.PriorityBlockingQueue;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.commons.lang.StringUtils;
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.classification.InterfaceAudience;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.fs.FileStatus;
41  import org.apache.hadoop.fs.FileSystem;
42  import org.apache.hadoop.fs.Path;
43  import org.apache.hadoop.hbase.CellUtil;
44  import org.apache.hadoop.hbase.HBaseConfiguration;
45  import org.apache.hadoop.hbase.HConstants;
46  import org.apache.hadoop.hbase.KeyValue;
47  import org.apache.hadoop.hbase.Stoppable;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.TableNotFoundException;
50  import org.apache.hadoop.hbase.client.HConnection;
51  import org.apache.hadoop.hbase.client.HConnectionManager;
52  import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
53  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
54  import org.apache.hadoop.hbase.regionserver.wal.HLog;
55  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
56  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
57  import org.apache.hadoop.hbase.replication.ReplicationException;
58  import org.apache.hadoop.hbase.replication.ReplicationPeers;
59  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
60  import org.apache.hadoop.hbase.replication.ReplicationQueues;
61  import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
62  import org.apache.hadoop.hbase.util.Bytes;
63  import org.apache.hadoop.hbase.util.Threads;
64  import org.apache.hadoop.ipc.RemoteException;
65  
66  /**
67   * Class that handles the source of a replication stream.
68   * Currently does not handle more than one slave cluster.
69   * For each slave cluster it selects a random number of peers
70   * using a replication ratio. For example, if the replication ratio is 0.1
71   * and the slave cluster has 100 region servers, 10 will be selected.
72   * <p/>
73   * A stream is considered down when we cannot contact a region server on the
74   * peer cluster for more than 55 seconds by default (the sum of the default retry
75   * sleeps: 1 second * (1 + 2 + ... + 10), with maxRetriesMultiplier = 10).
76   * <p/>
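 * A minimal usage sketch (hypothetical wiring; in practice the
 * ReplicationSourceManager performs these calls):
 * <pre>
 *   ReplicationSource src = new ReplicationSource();
 *   src.init(conf, fs, manager, replicationQueues, replicationPeers,
 *       stopper, peerClusterZnode, clusterId);
 *   src.startup();                  // spawns the replication thread
 *   src.enqueueLog(newWalPath);     // called on every WAL roll
 *   src.terminate("shutting down"); // stops the thread when finished
 * </pre>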
77   */
78  @InterfaceAudience.Private
79  public class ReplicationSource extends Thread
80      implements ReplicationSourceInterface {
81  
82    public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
83    // Queue of logs to process
84    private PriorityBlockingQueue<Path> queue;
85    private HConnection conn;
86    private ReplicationQueues replicationQueues;
87    private ReplicationPeers replicationPeers;
88    private Configuration conf;
89    private ReplicationQueueInfo replicationQueueInfo;
90    // id of the peer cluster this source replicates to
91    private String peerId;
92    // The manager of all sources, to which we report back our progress
93    private ReplicationSourceManager manager;
94    // Should we stop everything?
95    private Stoppable stopper;
96    // How long should we sleep for each retry
97    private long sleepForRetries;
98    // Max size in bytes of a single batch of WAL entries to ship
99    private long replicationQueueSizeCapacity;
100   // Max number of WAL entries in a single batch
101   private int replicationQueueNbCapacity;
102   // Our reader for the current log
103   private HLog.Reader reader;
104   // Last position in the log that we sent to ZooKeeper
105   private long lastLoggedPosition = -1;
106   // Path of the current log
107   private volatile Path currentPath;
108   private FileSystem fs;
109   // id of this cluster
110   private UUID clusterId;
111   // id of the other cluster
112   private UUID peerClusterId;
113   // total number of edits we replicated
114   private long totalReplicatedEdits = 0;
115   // total number of operations (Puts/Deletes) we replicated
116   private long totalReplicatedOperations = 0;
117   // The znode we currently play with
118   private String peerClusterZnode;
119   // Maximum number of retries before taking bold actions
120   private int maxRetriesMultiplier;
121   // Socket timeouts require even bolder actions since we don't want to DDOS
122   private int socketTimeoutMultiplier;
123   // Current number of operations (Put/Delete) that we need to replicate
124   private int currentNbOperations = 0;
125   // Current size of data we need to replicate
126   private int currentSize = 0;
127   // Indicates if this particular source is running
128   private volatile boolean running = true;
129   // Metrics for this source
130   private MetricsSource metrics;
131   // Handle on the log reader helper
132   private ReplicationHLogReaderManager repLogReader;
133   // Handles connecting to peer region servers
134   private ReplicationSinkManager replicationSinkMgr;
135   // WARN threshold for the number of queued logs, defaults to 2
136   private int logQueueWarnThreshold;
137   // Per-peer bandwidth throttler
138   private ReplicationThrottler throttler;
139 
140   /**
141    * Instantiation method used by region servers
142    *
143    * @param conf configuration to use
144    * @param fs file system to use
145    * @param manager replication manager to ping to
146    * @param stopper     the stoppable used to check whether this source should stop
147    * @param peerClusterZnode the name of our znode
148    * @throws IOException
149    */
150   public void init(final Configuration conf, final FileSystem fs,
151       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
152       final ReplicationPeers replicationPeers, final Stoppable stopper,
153       final String peerClusterZnode, final UUID clusterId) throws IOException {
154     this.stopper = stopper;
155     this.conf = HBaseConfiguration.create(conf);
156     decorateConf();
157     this.replicationQueueSizeCapacity =
158         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
159     this.replicationQueueNbCapacity =
160         this.conf.getInt("replication.source.nb.capacity", 25000);
161     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
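    // defaults to maxRetriesMultiplier squared so that a socket timeout backs off far longer than an ordinary retry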
162     this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
163         maxRetriesMultiplier * maxRetriesMultiplier);
164     this.queue =
165         new PriorityBlockingQueue<Path>(
166             this.conf.getInt("hbase.regionserver.maxlogs", 32),
167             new LogsComparator());
168     // TODO: This connection should be made replication specific, so that replication
169     // specific settings, such as the compression or codec to use when passing Cells,
170     // can be applied to it.
171     this.conn = HConnectionManager.getConnection(this.conf);
172     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
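    // the configured bandwidth is bytes per second (0 disables throttling); the throttler
    // budgets usage per cycle (roughly 100 ms), hence the division by 10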
173     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
174     this.replicationQueues = replicationQueues;
175     this.replicationPeers = replicationPeers;
176     this.manager = manager;
177     this.sleepForRetries =
178         this.conf.getLong("replication.source.sleepforretries", 1000);
179     this.fs = fs;
180     this.metrics = new MetricsSource(peerClusterZnode);
181     this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
182     this.clusterId = clusterId;
183 
184     this.peerClusterZnode = peerClusterZnode;
185     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
186     // ReplicationQueueInfo parses the peerId out of the znode for us
187     this.peerId = this.replicationQueueInfo.getPeerId();
188     this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf);
189     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
190   }
191 
192   private void decorateConf() {
193     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
194     if (StringUtils.isNotEmpty(replicationCodec)) {
195       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
196     }
197   }
198 
199   @Override
200   public void enqueueLog(Path log) {
201     this.queue.put(log);
202     int queueSize = queue.size();
203     this.metrics.setSizeOfLogQueue(queueSize);
204     // This will log a warning for each new log that gets created above the warn threshold
205     if (queueSize > this.logQueueWarnThreshold) {
206       LOG.warn("Queue size: " + queueSize +
207         " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
208     }
209   }
210 
211   private void uninitialize() {
212     if (this.conn != null) {
213       try {
214         this.conn.close();
215       } catch (IOException e) {
216         LOG.debug("Attempt to close connection failed", e);
217       }
218     }
219     LOG.debug("Source exiting " + this.peerId);
220     metrics.clear();
221   }
222 
223   @Override
224   public void run() {
225     connectToPeers();
226     // We were stopped while looping to connect to sinks, just abort
227     if (!this.isActive()) {
228       uninitialize();
229       return;
230     }
231 
232     int sleepMultiplier = 1;
233     // delay this until we are in an asynchronous thread
234     while (this.isActive() && this.peerClusterId == null) {
235       this.peerClusterId = replicationPeers.getPeerUUID(this.peerId);
236       if (this.isActive() && this.peerClusterId == null) {
237         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
238           sleepMultiplier++;
239         }
240       }
241     }
242     // We were stopped while looping to contact peer's zk ensemble, just abort
243     if (!this.isActive()) {
244       uninitialize();
245       return;
246     }
247 
248     // resetting to 1 to reuse later
249     sleepMultiplier = 1;
250 
251     // In rare cases, the ZooKeeper setting may be wrong. That leads to an incorrect
252     // peerClusterId value, one that is the same as the source clusterId
253     if (clusterId.equals(peerClusterId)) {
254       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
255           + peerClusterId);
256     }
257     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
258 
259     // If this queue is recovered, it is already fully populated and the first log
260     // normally has a stored position (unless the RS failed between two logs)
261     if (this.replicationQueueInfo.isQueueRecovered()) {
262       try {
263         this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
264           this.queue.peek().getName()));
265         if (LOG.isTraceEnabled()) {
266           LOG.trace("Recovered queue started with log " + this.queue.peek() +
267               " at position " + this.repLogReader.getPosition());
268         }
269       } catch (ReplicationException e) {
270         this.terminate("Couldn't get the position of this recovered queue " +
271             this.peerClusterZnode, e);
272       }
273     }
274     // Loop until we close down
275     while (isActive()) {
276       // Sleep until replication is enabled again
277       if (!isPeerEnabled()) {
278         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
279           sleepMultiplier++;
280         }
281         continue;
282       }
283       Path oldPath = getCurrentPath(); //note that in the current scenario,
284                                        //oldPath will be null when a log roll
285                                        //happens.
286       // Get a new path
287       boolean hasCurrentPath = getNextPath();
288       if (getCurrentPath() != null && oldPath == null) {
289         sleepMultiplier = 1; //reset the sleepMultiplier on a path change
290       }
291       if (!hasCurrentPath) {
292         if (sleepForRetries("No log to process", sleepMultiplier)) {
293           sleepMultiplier++;
294         }
295         continue;
296       }
297       boolean currentWALisBeingWrittenTo = false;
298       //For WAL files we own (rather than recovered), take a snapshot of whether the
299       //current WAL file (this.currentPath) is in use (for writing) NOW!
300       //Since the new WAL paths are enqueued only after the prev WAL file
301       //is 'closed', presence of an element in the queue means that
302       //the previous WAL file was closed, else the file is in use (currentPath)
303       //We take the snapshot now so that we are protected against races
304       //where a new file gets enqueued while the current file is being processed
305       //(and where we just finished reading the current file).
306       if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
307         currentWALisBeingWrittenTo = true;
308       }
309       // Open a reader on it
310       if (!openReader(sleepMultiplier)) {
311         // Reset the sleep multiplier, else it'd be reused for the next file
312         sleepMultiplier = 1;
313         continue;
314       }
315 
316       // If openReader() returned true but did not produce a reader, sleep and retry
317       if (this.reader == null) {
318         if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
319           sleepMultiplier++;
320         }
321         continue;
322       }
323 
324       boolean gotIOE = false;
325       currentNbOperations = 0;
326       List<HLog.Entry> entries = new ArrayList<HLog.Entry>(1);
327       currentSize = 0;
328       try {
329         if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
330           continue;
331         }
332       } catch (IOException ioe) {
333         LOG.warn(this.peerClusterZnode + " Got: ", ioe);
334         gotIOE = true;
335         if (ioe.getCause() instanceof EOFException) {
336 
337           boolean considerDumping = false;
338           if (this.replicationQueueInfo.isQueueRecovered()) {
339             try {
340               FileStatus stat = this.fs.getFileStatus(this.currentPath);
341               if (stat.getLen() == 0) {
342                 LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
343               }
344               considerDumping = true;
345             } catch (IOException e) {
346               LOG.warn(this.peerClusterZnode + " Got exception while getting file size: ", e);
347             }
348           }
349 
350           if (considerDumping &&
351               sleepMultiplier == this.maxRetriesMultiplier &&
352               processEndOfFile()) {
353             continue;
354           }
355         }
356       } finally {
357         try {
358           this.reader = null;
359           this.repLogReader.closeReader();
360         } catch (IOException e) {
361           gotIOE = true;
362           LOG.warn("Unable to finalize the tailing of a file", e);
363         }
364       }
365 
366       // If we didn't get anything to replicate, or if we hit an IOE,
367       // wait a bit and retry.
368       // But if we need to stop, don't bother sleeping
369       if (this.isActive() && (gotIOE || entries.isEmpty())) {
370         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
371           this.manager.logPositionAndCleanOldLogs(this.currentPath,
372               this.peerClusterZnode, this.repLogReader.getPosition(),
373               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
374           this.lastLoggedPosition = this.repLogReader.getPosition();
375         }
376         // Reset the sleep multiplier if nothing has actually gone wrong
377         if (!gotIOE) {
378           sleepMultiplier = 1;
379         }
380         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
381           sleepMultiplier++;
382         }
383         continue;
384       }
385       sleepMultiplier = 1;
386       shipEdits(currentWALisBeingWrittenTo, entries);
387     }
388     uninitialize();
389   }
390 
391   /**
392    * Read all the entries from the current log file and retain those
393    * that need to be replicated. Otherwise, process the end of the current file.
394    * @param currentWALisBeingWrittenTo is the current WAL being written to
395    * @param entries resulting entries to be replicated
396    * @return true if we got nothing and went to the next file, false if we got
397    * entries
398    * @throws IOException
399    */
400   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries)
401       throws IOException {
402     long seenEntries = 0;
403     if (LOG.isTraceEnabled()) {
404       LOG.trace("Seeking in " + this.currentPath + " at position "
405           + this.repLogReader.getPosition());
406     }
407     this.repLogReader.seek();
408     long positionBeforeRead = this.repLogReader.getPosition();
409     HLog.Entry entry =
410         this.repLogReader.readNextAndSetPosition();
411     while (entry != null) {
412       WALEdit edit = entry.getEdit();
413       this.metrics.incrLogEditsRead();
414       seenEntries++;
415       // Remove all KVs that should not be replicated
416       HLogKey logKey = entry.getKey();
417       // don't replicate if the log entries have already been consumed by the peer cluster
418       if (!logKey.getClusterIds().contains(peerClusterId)) {
419         removeNonReplicableEdits(entry);
420         // Don't replicate catalog entries, and skip WALEdits that no longer
421         // contain anything to replicate after filtering
422         if (!logKey.getTablename().equals(TableName.META_TABLE_NAME) &&
423             edit.size() != 0) {
424           //Mark that the current cluster has the change
425           logKey.addClusterId(clusterId);
426           currentNbOperations += countDistinctRowKeys(edit);
427           entries.add(entry);
428           currentSize += entry.getEdit().heapSize();
429         } else {
430           this.metrics.incrLogEditsFiltered();
431         }
432       }
433       // Stop if too many entries or too big
434       if (currentSize >= this.replicationQueueSizeCapacity ||
435           entries.size() >= this.replicationQueueNbCapacity) {
436         break;
437       }
438       try {
439         entry = this.repLogReader.readNextAndSetPosition();
440       } catch (IOException ie) {
441         LOG.debug("Break on IOE: " + ie.getMessage());
442         break;
443       }
444     }
445     metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
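    // if the WAL is still being written to, reaching its current end does not mean
    // the file is done, so do not treat this as end-of-file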
446     if (currentWALisBeingWrittenTo) {
447       return false;
448     }
449     // If we didn't get anything and the queue has an object, it means we
450     // hit the end of the file for sure
451     return seenEntries == 0 && processEndOfFile();
452   }
453 
454   private void connectToPeers() {
455     int sleepMultiplier = 1;
456 
457     // Connect to peer cluster first, unless we have to stop
458     while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
459       replicationSinkMgr.chooseSinks();
460       if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
461         if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
462           sleepMultiplier++;
463         }
464       }
465     }
466   }
467 
468   /**
469    * Poll for the next path
470    * @return true if a path was obtained, false if not
471    */
472   protected boolean getNextPath() {
473     try {
474       if (this.currentPath == null) {
475         this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
476         this.metrics.setSizeOfLogQueue(queue.size());
477         if (this.currentPath != null) {
478           this.manager.cleanOldLogs(this.currentPath.getName(),
479               this.peerId,
480               this.replicationQueueInfo.isQueueRecovered());
481           if (LOG.isTraceEnabled()) {
482             LOG.trace("New log: " + this.currentPath);
483           }
484         }
485       }
486     } catch (InterruptedException e) {
487       LOG.warn("Interrupted while reading edits", e);
488     }
489     return this.currentPath != null;
490   }
491 
492   /**
493    * Open a reader on the current path
494    *
495    * @param sleepMultiplier the multiplier applied to the default sleep time
496    * @return true if we should continue with that file, false if we are done with it
497    */
498   protected boolean openReader(int sleepMultiplier) {
499     try {
500       try {
501         if (LOG.isTraceEnabled()) {
502           LOG.trace("Opening log " + this.currentPath);
503         }
504         this.reader = repLogReader.openReader(this.currentPath);
505       } catch (FileNotFoundException fnfe) {
506         if (this.replicationQueueInfo.isQueueRecovered()) {
507           // We didn't find the log in the archive directory; check whether it still
508           // exists in a dead RS folder (there could be a chain of failures
509           // to look through)
510           List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
511           LOG.info("NB dead servers : " + deadRegionServers.size());
512           for (String curDeadServerName : deadRegionServers) {
513             Path deadRsDirectory =
514                 new Path(manager.getLogDir().getParent(), curDeadServerName);
515             Path[] locs = new Path[] {
516                 new Path(deadRsDirectory, currentPath.getName()),
517                 new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT),
518                                           currentPath.getName()),
519             };
520             for (Path possibleLogLocation : locs) {
521               LOG.info("Possible location " + possibleLogLocation.toUri().toString());
522               if (this.manager.getFs().exists(possibleLogLocation)) {
523                 // We found the right new location
524                 LOG.info("Log " + this.currentPath + " still exists at " +
525                     possibleLogLocation);
526                 // Breaking here will make us sleep since reader is null
527                 return true;
528               }
529             }
530           }
531           // In a disaster/recovery case, the HMaster may have shut down or crashed before the
532           // data was flushed from .logs to .oldlogs. Loop over the .logs folders and check for a match
533           if (stopper instanceof ReplicationSyncUp.DummyServer) {
534             FileStatus[] rss = fs.listStatus(manager.getLogDir());
535             for (FileStatus rs : rss) {
536               Path p = rs.getPath();
537               FileStatus[] logs = fs.listStatus(p);
538               for (FileStatus log : logs) {
539                 p = new Path(p, log.getPath().getName());
540                 if (p.getName().equals(currentPath.getName())) {
541                   currentPath = p;
542                   LOG.info("Log " + this.currentPath + " exists under " + manager.getLogDir());
543                   // Open the log at the new location
544                   this.openReader(sleepMultiplier);
545                   return true;
546                 }
547               }
548             }
549           }
550 
551           // TODO What happens if the log was missing from every single location?
552           // Although we need to check a couple of times as the log could have
553           // been moved by the master between the checks
554           // It can also happen if a recovered queue wasn't properly cleaned,
555           // such that the znode pointing to a log exists but the log was
556           // deleted a long time ago.
557           // For the moment, we'll throw the IOException and let processEndOfFile handle it
558           throw new IOException("File from recovered queue is " +
559               "nowhere to be found", fnfe);
560         } else {
561           // If the log was archived, continue reading from there
562           Path archivedLogLocation =
563               new Path(manager.getOldLogDir(), currentPath.getName());
564           if (this.manager.getFs().exists(archivedLogLocation)) {
565             currentPath = archivedLogLocation;
566             LOG.info("Log " + this.currentPath + " was moved to " +
567                 archivedLogLocation);
568             // Open the log at the new location
569             this.openReader(sleepMultiplier);
570 
571           }
572           // TODO What happens if the log is missing from both places?
573         }
574       }
575     } catch (IOException ioe) {
576       if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
577       LOG.warn(this.peerClusterZnode + " Got: ", ioe);
578       this.reader = null;
579       if (ioe.getCause() instanceof NullPointerException) {
580         // Workaround for race condition in HDFS-4380
581         // which throws an NPE if we open a file before any data node has the most recent block
582         // Just sleep and retry. Will require re-reading compressed HLogs for compressionContext.
583         LOG.warn("Got NPE opening reader, will retry.");
584       } else if (sleepMultiplier == this.maxRetriesMultiplier) {
585         // TODO Need a better way to determine if a file is really gone but
586         // TODO without scanning all logs dir
587         LOG.warn("Waited too long for this file, considering dumping");
588         return !processEndOfFile();
589       }
590     }
591     return true;
592   }
593 
594   /*
595    * Checks whether the current log file is empty and this is not a recovered queue. This handles
596    * the scenario where, in an idle cluster, there is no entry in the current log and we keep
597    * trying to read the log file and getting an EOFException. In the case of a recovered queue the
598    * last log file may be empty, and we don't want to retry in that case.
599    */
600   private boolean isCurrentLogEmpty() {
601     return (this.repLogReader.getPosition() == 0 &&
602         !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
603   }
604 
605   /**
606    * Do the sleeping logic
607    * @param msg Why we sleep
608    * @param sleepMultiplier the multiplier applied to the default sleep time
609    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
610    */
611   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
612     try {
613       if (LOG.isTraceEnabled()) {
614         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
615       }
616       Thread.sleep(this.sleepForRetries * sleepMultiplier);
617     } catch (InterruptedException e) {
618       LOG.debug("Interrupted while sleeping between retries");
619       Thread.currentThread().interrupt();
620     }
621     return sleepMultiplier < maxRetriesMultiplier;
622   }
623 
624   /**
625    * Filter out the KVs that should not be replicated; we only want KVs that are scoped other than local
626    * @param entry The entry to check for replication
627    */
628   protected void removeNonReplicableEdits(HLog.Entry entry) {
629     String tabName = entry.getKey().getTablename().getNameAsString();
630     ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
631     Map<String, List<String>> tableCFs = null;
632     try {
633       tableCFs = this.replicationPeers.getTableCFs(peerId);
634     } catch (IllegalArgumentException e) {
635       LOG.error("Should not happen: can't get tableCFs for peer " + peerId +
636           ", falling back to tableCFs==null (replicate all tables) as if it were not configured");
637     }
638     int size = kvs.size();
639 
640     // clear kvs (to prevent replicating them) if the logKey's table isn't in this peer's
641     // replicable table list (a null tableCFs map means all tables are replicable)
642     if (tableCFs != null && !tableCFs.containsKey(tabName)) {
643       kvs.clear();
644     } else {
645       NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
646       List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
647       for (int i = size - 1; i >= 0; i--) {
648         KeyValue kv = kvs.get(i);
649         // The scope will be null or empty if
650         // there's nothing to replicate in that WALEdit
651         // ignore (remove) the kv if its CF isn't in the replicable CF list
652         // (a null cfs list means all CFs of this table are replicable)
653         if (scopes == null || !scopes.containsKey(kv.getFamily()) ||
654             (cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) {
655           kvs.remove(i);
656         }
657       }
658     }
659 
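    // if more than half of the KVs were removed, shrink the backing array to reclaim memory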
660     if (kvs.size() < size/2) {
661       kvs.trimToSize();
662     }
663   }
664 
665   /**
666    * Count the number of different row keys in the given edit because of
667    * mini-batching. We assume that there's at least one KV in the WALEdit.
668    * @param edit edit to count row keys from
669    * @return number of different row keys
670    */
671   private int countDistinctRowKeys(WALEdit edit) {
672     List<KeyValue> kvs = edit.getKeyValues();
673     int distinctRowKeys = 1;
674     // compare each KV with the previous one so that every row transition is counted
675     for (int i = 1; i < edit.size(); i++) {
676       if (!CellUtil.matchingRow(kvs.get(i), kvs.get(i - 1))) {
677         distinctRowKeys++;
678       }
679     }
680     return distinctRowKeys;
681   }
682 
683   /**
684    * Do the shipping logic
685    * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) written to when this method was called
686    * @param entries the WAL entries to ship to the peer cluster
687    */
688   protected void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries) {
689     int sleepMultiplier = 1;
690     if (entries.isEmpty()) {
691       LOG.warn("Was given 0 edits to ship");
692       return;
693     }
694     while (this.isActive()) {
695       if (!isPeerEnabled()) {
696         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
697           sleepMultiplier++;
698         }
699         continue;
700       }
701       SinkPeer sinkPeer = null;
702       try {
703         if (this.throttler.isEnabled()) {
704           long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
705           if (sleepTicks > 0) {
706             try {
707               if (LOG.isTraceEnabled()) {
708                 LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
709               }
710               Thread.sleep(sleepTicks);
711             } catch (InterruptedException e) {
712               LOG.debug("Interrupted while sleeping for throttling control");
713               Thread.currentThread().interrupt();
714               // the current thread might have been interrupted in order to terminate;
715               // go directly back to the while() condition to confirm this
716               continue;
717             }
718             // reset the throttler's cycle start tick after sleeping for throttling
719             this.throttler.resetStartTick();
720           }
721         }
722         sinkPeer = replicationSinkMgr.getReplicationSink();
723         BlockingInterface rrs = sinkPeer.getRegionServer();
724         if (LOG.isTraceEnabled()) {
725           LOG.trace("Replicating " + entries.size() +
726               " entries of total size " + currentSize);
727         }
728         ReplicationProtbufUtil.replicateWALEntry(rrs,
729             entries.toArray(new HLog.Entry[entries.size()]));
730         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
731           this.manager.logPositionAndCleanOldLogs(this.currentPath,
732               this.peerClusterZnode, this.repLogReader.getPosition(),
733               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
734           this.lastLoggedPosition = this.repLogReader.getPosition();
735         }
736         if (this.throttler.isEnabled()) {
737           this.throttler.addPushSize(currentSize);
738         }
739         this.totalReplicatedEdits += entries.size();
740         this.totalReplicatedOperations += currentNbOperations;
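        // the second argument to shipBatch is the shipped size in KB, hence the division by 1024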
741         this.metrics.shipBatch(this.currentNbOperations, this.currentSize/1024);
742         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
743         if (LOG.isTraceEnabled()) {
744           LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
745               + this.totalReplicatedOperations + " operations");
746         }
747         break;
748 
749       } catch (IOException ioe) {
750         // Didn't ship anything, but we must still refresh the age of the last shipped op
751         this.metrics.refreshAgeOfLastShippedOp();
752         if (ioe instanceof RemoteException) {
753           ioe = ((RemoteException) ioe).unwrapRemoteException();
754           LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
755           if (ioe instanceof TableNotFoundException) {
756             if (sleepForRetries("A table is missing in the peer cluster. "
757                 + "Replication cannot proceed without losing data.", sleepMultiplier)) {
758               sleepMultiplier++;
759             }
760             // the current thread might have been interrupted in order to terminate;
761             // go directly back to the while() condition to confirm this
762             if (isInterrupted()) {
763               continue;
764             }
765           }
766         } else {
767           if (ioe instanceof SocketTimeoutException) {
768             // This exception means we waited for more than 60s and nothing
769             // happened; the cluster is alive, and calling it again right away,
770             // even just as a test, only makes things worse.
771             sleepForRetries("Encountered a SocketTimeoutException. The " +
772               "call to the remote cluster timed out, which is usually " +
773               "caused by a machine failure or a massive slowdown",
774               this.socketTimeoutMultiplier);
775             // the current thread might have been interrupted in order to terminate;
776             // go directly back to the while() condition to confirm this
777             if (isInterrupted()) {
778               continue;
779             }
780           } else if (ioe instanceof ConnectException) {
781             LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
782             replicationSinkMgr.chooseSinks();
783           } else {
784             LOG.warn("Can't replicate because of a local or network error: ", ioe);
785           }
786         }
787 
788         if (sinkPeer != null) {
789           replicationSinkMgr.reportBadSink(sinkPeer);
790         }
791         if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
792           sleepMultiplier++;
793         }
794       }
795     }
796   }
797 
798   /**
799    * check whether the peer is enabled or not
800    *
801    * @return true if the peer is enabled, otherwise false
802    */
803   protected boolean isPeerEnabled() {
804     return this.replicationPeers.getStatusOfConnectedPeer(this.peerId);
805   }
806 
807   /**
808    * If the queue isn't empty, switch to the next log file.
809    * Else, if this is a recovered queue, it means we're done.
810    * Otherwise we'll just continue trying to read the current log file.
811    * @return true if we're done with the current file, false if we should
812    * continue trying to read from it
813    */
814   protected boolean processEndOfFile() {
815     if (this.queue.size() != 0) {
816       if (LOG.isTraceEnabled()) {
817         String filesize = "N/A";
818         try {
819           FileStatus stat = this.fs.getFileStatus(this.currentPath);
820           filesize = stat.getLen()+"";
821         } catch (IOException ex) {}
822         LOG.trace("Reached the end of a log, stats: " + getStats() +
823             ", and the length of the file is " + filesize);
824       }
825       this.currentPath = null;
826       this.repLogReader.finishCurrentFile();
827       this.reader = null;
828       return true;
829     } else if (this.replicationQueueInfo.isQueueRecovered()) {
830       this.manager.closeRecoveredQueue(this);
831       LOG.info("Finished recovering the queue with the following stats " + getStats());
832       this.running = false;
833       return true;
834     }
835     return false;
836   }
837 
838   public void startup() {
839     String n = Thread.currentThread().getName();
840     Thread.UncaughtExceptionHandler handler =
841         new Thread.UncaughtExceptionHandler() {
842           public void uncaughtException(final Thread t, final Throwable e) {
843             LOG.error("Unexpected exception in ReplicationSource," +
844               " currentPath=" + currentPath, e);
845           }
846         };
847     Threads.setDaemonThreadRunning(
848         this, n + ".replicationSource," +
849         this.peerClusterZnode, handler);
850   }
851 
852   public void terminate(String reason) {
853     terminate(reason, null);
854   }
855 
856   public void terminate(String reason, Exception cause) {
857     if (cause == null) {
858       LOG.info("Closing source "
859           + this.peerClusterZnode + " because: " + reason);
860 
861     } else {
862       LOG.error("Closing source " + this.peerClusterZnode
863           + " because an error occurred: " + reason, cause);
864     }
865     this.running = false;
866     this.interrupt();
867     Threads.shutdown(this, this.sleepForRetries * this.maxRetriesMultiplier);
868   }
869 
870   public String getPeerClusterZnode() {
871     return this.peerClusterZnode;
872   }
873 
874   public String getPeerClusterId() {
875     return this.peerId;
876   }
877 
878   public Path getCurrentPath() {
879     return this.currentPath;
880   }
881 
882   private boolean isActive() {
883     return !this.stopper.isStopped() && this.running && !isInterrupted();
884   }
885 
886   /**
887   * Comparator used to order logs based on their start time (the timestamp suffix of the file name)
888    */
889   public static class LogsComparator implements Comparator<Path> {
890 
891     @Override
892     public int compare(Path o1, Path o2) {
893       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
894     }
895 
896     /**
897      * Split a path to get the start time
898      * For example: 10.20.20.171%3A60020.1277499063250
899      * @param p path to split
900      * @return start time
901      */
902     private long getTS(Path p) {
903       String[] parts = p.getName().split("\\.");
904       return Long.parseLong(parts[parts.length-1]);
905     }
906   }
907 
908   @Override
909   public String getStats() {
910     long position = this.repLogReader.getPosition();
911     return "Total replicated edits: " + totalReplicatedEdits +
912       ", currently replicating from: " + this.currentPath +
913       " at position: " + position;
914   }
915 }