1   /*
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import java.io.EOFException;
23  import java.io.FileNotFoundException;
24  import java.io.IOException;
25  import java.net.ConnectException;
26  import java.net.SocketTimeoutException;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.Comparator;
30  import java.util.HashSet;
31  import java.util.List;
32  import java.util.Random;
33  import java.util.Set;
34  import java.util.UUID;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.PriorityBlockingQueue;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FileStatus;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.Path;
46  import org.apache.hadoop.hbase.HConstants;
47  import org.apache.hadoop.hbase.KeyValue;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.Stoppable;
50  import org.apache.hadoop.hbase.TableNotFoundException;
51  import org.apache.hadoop.hbase.client.HConnection;
52  import org.apache.hadoop.hbase.client.HConnectionManager;
53  import org.apache.hadoop.hbase.ipc.HRegionInterface;
54  import org.apache.hadoop.hbase.regionserver.wal.HLog;
55  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
56  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
57  import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.util.Threads;
60  import org.apache.hadoop.ipc.RemoteException;
61  import org.apache.zookeeper.KeeperException;
62  
63  /**
64   * Class that handles the source of a replication stream.
65   * Currently does not handle more than 1 slave cluster.
66   * For each slave cluster it selects a random number of peers
67   * using a replication ratio. For example, if the replication ratio is 0.1
68   * and the slave cluster has 100 region servers, 10 will be selected.
69   * <p/>
70   * A stream is considered down when we cannot contact a region server on the
71   * peer cluster for more than 55 seconds by default.
72   * <p/>
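     * <p>
     * A rough lifecycle sketch, for illustration only (the variable names are
     * placeholders and, in practice, the ReplicationSourceManager drives these calls):
     * <pre>
     *   ReplicationSource src = new ReplicationSource();
     *   src.init(conf, fs, manager, stopper, replicating, peerClusterZnode);
     *   src.enqueueLog(currentWalPath);  // also called on every log roll
     *   src.startup();                   // starts this thread: read, filter, ship edits
     *   ...
     *   src.terminate("region server is stopping");
     * </pre>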
73   *
74   */
75  public class ReplicationSource extends Thread
76      implements ReplicationSourceInterface {
77  
78    private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
79    // Queue of logs to process
80    private PriorityBlockingQueue<Path> queue;
81    private HConnection conn;
82    // Helper class for zookeeper
83    private ReplicationZookeeper zkHelper;
84    private Configuration conf;
85    // ratio of region servers to choose from a slave cluster
86    private float ratio;
87    private Random random;
88    // should we replicate or not?
89    private AtomicBoolean replicating;
90    // id of the peer cluster this source replicates to
91    private String peerId;
92    // The manager of all sources to which we ping back our progress
93    private ReplicationSourceManager manager;
94    // Should we stop everything?
95    private Stoppable stopper;
96    // List of chosen sinks (region servers)
97    private List<ServerName> currentPeers;
98    // How long should we sleep for each retry
99    private long sleepForRetries;
100   // Max size in bytes of entriesArray
101   private long replicationQueueSizeCapacity;
102   // Max number of entries in entriesArray
103   private int replicationQueueNbCapacity;
104   // Our reader for the current log
105   private HLog.Reader reader;
106   // Last position in the log that we sent to ZooKeeper
107   private long lastLoggedPosition = -1;
108   // Path of the current log
109   private volatile Path currentPath;
110   private FileSystem fs;
111   // id of this cluster
112   private UUID clusterId;
113   // id of the other cluster
114   private UUID peerClusterId;
115   // total number of edits we replicated
116   private long totalReplicatedEdits = 0;
117   // The znode we currently play with
118   private String peerClusterZnode;
119   // Indicates if this queue is recovered (and will be deleted when depleted)
120   private boolean queueRecovered;
121   // List of all the dead region servers that had this queue (if recovered)
122   private List<String> deadRegionServers = new ArrayList<String>();
123   // Maximum number of retries before taking bold actions
124   private int maxRetriesMultiplier;
125   // Socket timeouts require even bolder actions since we don't want to DDOS
126   private int socketTimeoutMultiplier;
127   // Current number of operations (Put/Delete) that we need to replicate
128   private int currentNbOperations = 0;
129   // Current size of data we need to replicate
130   private int currentSize = 0;
131   // Indicates if this particular source is running
132   private volatile boolean running = true;
133   // Metrics for this source
134   private ReplicationSourceMetrics metrics;
135   // Handle on the log reader helper
136   private ReplicationHLogReaderManager repLogReader;
137 
138 
139   /**
140    * Instantiation method used by region servers
141    *
142    * @param conf configuration to use
143    * @param fs file system to use
144    * @param manager replication manager to ping to
145    * @param stopper the stoppable object checked to know when this source should stop
146    * @param replicating the atomic boolean that starts/stops replication
147    * @param peerClusterZnode the name of our znode
148    * @throws IOException
149    */
150   public void init(final Configuration conf,
151                    final FileSystem fs,
152                    final ReplicationSourceManager manager,
153                    final Stoppable stopper,
154                    final AtomicBoolean replicating,
155                    final String peerClusterZnode)
156       throws IOException {
157     this.stopper = stopper;
158     this.conf = conf;
159     this.replicationQueueSizeCapacity =
160         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
161     this.replicationQueueNbCapacity =
162         this.conf.getInt("replication.source.nb.capacity", 25000);
163     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
164     this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
165         maxRetriesMultiplier * maxRetriesMultiplier);
166     this.queue =
167         new PriorityBlockingQueue<Path>(
168             conf.getInt("hbase.regionserver.maxlogs", 32),
169             new LogsComparator());
170     this.conn = HConnectionManager.getConnection(conf);
171     this.zkHelper = manager.getRepZkWrapper();
172     this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
173     this.currentPeers = new ArrayList<ServerName>();
174     this.random = new Random();
175     this.replicating = replicating;
176     this.manager = manager;
177     this.sleepForRetries =
178         this.conf.getLong("replication.source.sleepforretries", 1000);
179     this.fs = fs;
180     this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
181     this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
182     try {
183       this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
184     } catch (KeeperException ke) {
185       throw new IOException("Could not read cluster id", ke);
186     }
187 
188     // Finally look if this is a recovered queue
189     this.checkIfQueueRecovered(peerClusterZnode);
190   }
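  // Note on the defaults read above (for quick reference, not new behavior): a shipment
  // batch is capped at 64 MB ("replication.source.size.capacity") or 25,000 entries
  // ("replication.source.nb.capacity"), whichever is hit first, and each retry sleep starts
  // at 1 second ("replication.source.sleepforretries") and is multiplied by up to
  // "replication.source.maxretriesmultiplier" (10), i.e. it grows to at most 10 seconds.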
191 
192   // The passed znode will be either the id of the peer cluster or, for a
193   // recovered queue, that id plus its history in the form of id-servername-*
194   //
195   // package access for testing
196   void checkIfQueueRecovered(String peerClusterZnode) {
197     String[] parts = peerClusterZnode.split("-", 2);
198     this.queueRecovered = parts.length != 1;
199     this.peerId = this.queueRecovered ?
200         parts[0] : peerClusterZnode;
201     this.peerClusterZnode = peerClusterZnode;
202 
203     if (parts.length < 2) {
204       // not queue recovered situation
205       return;
206     }
207 
208     // extract dead servers
209     extractDeadServersFromZNodeString(parts[1], this.deadRegionServers);
210   }
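  // Illustrative examples (hypothetical znode names): for a normal source the znode is just
  // the peer id, e.g. "2", so parts = ["2"], queueRecovered = false and peerId = "2".
  // For a recovered queue the znode also carries the dead server(s), e.g.
  // "2-ip-10-46-221-101.ec2.internal,52170,1364333181125", so queueRecovered = true,
  // peerId = "2", and the dead servers are parsed out of parts[1] below.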
211 
212   /**
213    * for tests only
214    */
215   List<String> getDeadRegionServers() {
216     return Collections.unmodifiableList(this.deadRegionServers);
217   }
218 
219   /**
220    * Parse dead server names from the znode string. A server name can contain "-", such as
221    * "ip-10-46-221-101.ec2.internal", so we need to skip some "-" during parsing, e.g. for the
222    * following case: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-...
223    */
224   private static void
225       extractDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
226 
227     if (deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return;
228 
229     // valid server name delimiter "-" has to be after "," in a server name
230     int seenCommaCnt = 0;
231     int startIndex = 0;
232     int len = deadServerListStr.length();
233 
234     for (int i = 0; i < len; i++) {
235       switch (deadServerListStr.charAt(i)) {
236       case ',':
237         seenCommaCnt += 1;
238         break;
239       case '-':
240         if (seenCommaCnt >= 2) {
241           if (i > startIndex) {
242             result.add(deadServerListStr.substring(startIndex, i));
243             startIndex = i + 1;
244           }
245           seenCommaCnt = 0;
246         }
247         break;
248       default:
249         break;
250       }
251     }
252 
253     // add tail
254     if (startIndex < len - 1) {
255       result.add(deadServerListStr.substring(startIndex, len));
256     }
257 
258     LOG.debug("Found dead servers:" + result);
259   }
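  // Illustrative example (hypothetical server names): given
  //   "ip-10-46-221-101.ec2.internal,52170,1364333181125-ip-10-46-221-102.ec2.internal,52170,1364333181126"
  // the hyphens inside each hostname are ignored because they appear before two commas have
  // been seen; only the "-" following ",52170,1364333181125" splits, yielding the two full
  // server names in the result list.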
260 
261   /**
262    * Select a number of peers at random using the ratio. Minimum 1.
263    */
264   private void chooseSinks() {
265     this.currentPeers.clear();
266     List<ServerName> addresses = this.zkHelper.getSlavesAddresses(peerId);
267     Set<ServerName> setOfAddr = new HashSet<ServerName>();
268     int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
269     LOG.info("Getting " + nbPeers +
270         " rs from peer cluster # " + peerId);
271     for (int i = 0; i < nbPeers; i++) {
272       ServerName sn;
273       // Make sure we get one address that we don't already have
274       do {
275         sn = addresses.get(this.random.nextInt(addresses.size()));
276       } while (setOfAddr.contains(sn));
277       LOG.info("Choosing peer " + sn);
278       setOfAddr.add(sn);
279     }
280     this.currentPeers.addAll(setOfAddr);
281   }
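  // For example (using the default values, shown only as illustration): with a ratio of 0.1
  // and 30 region servers in the slave cluster, Math.ceil(30 * 0.1) = 3 distinct sinks are
  // kept; the do/while above retries until an address not already chosen is found.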
282 
283   @Override
284   public void enqueueLog(Path log) {
285     this.queue.put(log);
286     this.metrics.sizeOfLogQueue.set(queue.size());
287   }
288 
289   @Override
290   public void run() {
291     connectToPeers();
292     int sleepMultiplier = 1;
293     // delay this until we are in an asynchronous thread
294     while (this.isActive() && this.peerClusterId == null) {
295       this.peerClusterId = zkHelper.getPeerUUID(this.peerId);
296       if (this.isActive() && this.peerClusterId == null) {
297         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
298           sleepMultiplier++;
299         }
300       }
301     }
302     // We were stopped while looping to connect to sinks, just abort
303     if (!this.isActive()) {
304       return;
305     }
306     // resetting to 1 to reuse later
307     sleepMultiplier = 1;
308 
309     // In a rare case, the zookeeper setting may be messed up. That leads to an incorrect
310     // peerClusterId value, i.e. the same value as the source clusterId
311     if (clusterId.equals(peerClusterId)) {
312       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
313           + peerClusterId);
314     }
315     LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
316 
317     // If this is recovered, the queue is already full and the first log
318     // normally has a position (unless the RS failed between 2 logs)
319     if (this.queueRecovered) {
320       try {
321         this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
322             this.peerClusterZnode, this.queue.peek().getName()));
323       } catch (KeeperException e) {
324         this.terminate("Couldn't get the position of this recovered queue " +
325             peerClusterZnode, e);
326       }
327     }
328     // Loop until we close down
329     while (isActive()) {
330       // Sleep until replication is enabled again
331       if (!isPeerEnabled()) {
332         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
333           sleepMultiplier++;
334         }
335         continue;
336       }
337       Path oldPath = getCurrentPath(); //note that in the current scenario,
338                                        //oldPath will be null when a log roll
339                                        //happens.
340       // Get a new path
341       boolean hasCurrentPath = getNextPath();
342       if (getCurrentPath() != null && oldPath == null) {
343         sleepMultiplier = 1; //reset the sleepMultiplier on a path change
344       }
345       if (!hasCurrentPath) {
346         if (sleepForRetries("No log to process", sleepMultiplier)) {
347           sleepMultiplier++;
348         }
349         continue;
350       }
351       boolean currentWALisBeingWrittenTo = false;
352       //For WAL files we own (rather than recovered), take a snapshot of whether the
353       //current WAL file (this.currentPath) is in use (for writing) NOW!
354       //Since the new WAL paths are enqueued only after the prev WAL file
355       //is 'closed', presence of an element in the queue means that
356       //the previous WAL file was closed, else the file is in use (currentPath)
357       //We take the snapshot now so that we are protected against races
358       //where a new file gets enqueued while the current file is being processed
359       //(and where we just finished reading the current file).
360       if (!this.queueRecovered && queue.size() == 0) {
361         currentWALisBeingWrittenTo = true;
362       }
363       // Open a reader on it
364       if (!openReader(sleepMultiplier)) {
365         // Reset the sleep multiplier, else it'd be reused for the next file
366         sleepMultiplier = 1;
367         continue;
368       }
369 
370       // If openReader returned true but the reader is still null, sleep and retry
371       if (this.reader == null) {
372         if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
373           sleepMultiplier++;
374         }
375         continue;
376       }
377 
378       boolean gotIOE = false;
379       currentNbOperations = 0;
380       List<HLog.Entry> entries = new ArrayList<HLog.Entry>(1);
381       currentSize = 0;
382       try {
383         if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
384           continue;
385         }
386       } catch (IOException ioe) {
387         LOG.warn(peerClusterZnode + " Got: ", ioe);
388         gotIOE = true;
389         if (ioe.getCause() instanceof EOFException) {
390 
391           boolean considerDumping = false;
392           if (this.queueRecovered) {
393             try {
394               FileStatus stat = this.fs.getFileStatus(this.currentPath);
395               if (stat.getLen() == 0) {
396                 LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
397               }
398               considerDumping = true;
399             } catch (IOException e) {
400               LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
401             }
402           }
403 
404           if (considerDumping &&
405               sleepMultiplier == this.maxRetriesMultiplier &&
406               processEndOfFile()) {
407             continue;
408           }
409         }
410       } finally {
411         try {
412           this.reader = null;
413           this.repLogReader.closeReader();
414         } catch (IOException e) {
415           gotIOE = true;
416           LOG.warn("Unable to finalize the tailing of a file", e);
417         }
418       }
419 
420       // If we didn't get anything to replicate, or if we hit a IOE,
421       // wait a bit and retry.
422       // But if we need to stop, don't bother sleeping
423       if (this.isActive() && (gotIOE || entries.isEmpty())) {
424         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
425           this.manager.logPositionAndCleanOldLogs(this.currentPath,
426               this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo);
427           this.lastLoggedPosition = this.repLogReader.getPosition();
428         }
429         if (!gotIOE) {
430           // if there was nothing to ship and it's not an error
431           // set "ageOfLastShippedOp" to <now> to indicate that we're current
432           this.metrics.setAgeOfLastShippedOp(System.currentTimeMillis());
433         }
434         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
435           sleepMultiplier++;
436         }
437         continue;
438       }
439       sleepMultiplier = 1;
440       shipEdits(currentWALisBeingWrittenTo, entries);
441     }
442     if (this.conn != null) {
443       try {
444         this.conn.close();
445       } catch (IOException e) {
446         LOG.debug("Attempt to close connection failed", e);
447       }
448     }
449     metrics.stopReportMetrics();
450     LOG.debug("Source exiting " + peerId);
451   }
452 
453   /**
454    * Read all the entries from the current log file and retain those
455    * that need to be replicated. Else, process the end of the current file.
456    * @param currentWALisBeingWrittenTo is the current WAL being written to
457    * @param entries resulting entries to be replicated
458    * @return true if we got nothing and went to the next file, false if we got
459    * entries
460    * @throws IOException
461    */
462   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries)
463       throws IOException{
464     long seenEntries = 0;
465     this.repLogReader.seek();
466     HLog.Entry entry =
467         this.repLogReader.readNextAndSetPosition();
468     while (entry != null) {
469       WALEdit edit = entry.getEdit();
470       this.metrics.logEditsReadRate.inc(1);
471       seenEntries++;
472       // Remove all KVs that should not be replicated
473       HLogKey logKey = entry.getKey();
474       List<UUID> consumedClusterIds = edit.getClusterIds();
475       // This cluster id has been added to resolve the scenario of A -> B -> A where A has an
476       // old point release and B has a new point release containing the fix for HBASE-7709.
477       // A change on cluster A would replicate to cluster B in an infinite loop
478       // if we don't add the original cluster id to the set.
479       consumedClusterIds.add(logKey.getClusterId());
480       // only replicate the entry if it has not already been replicated to this peer
481       if (!consumedClusterIds.contains(peerClusterId)) {
482         removeNonReplicableEdits(edit);
483         // Don't replicate catalog entries, WALEdits that contain nothing
484         // to replicate, or anything while replication is currently disabled
485         if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
486             Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
487             edit.size() != 0 && replicating.get()) {
488           // Only set the clusterId if it is a local key.
489           // This ensures that the originator sets the cluster id
490           // and all replicas retain the initial cluster id.
491           // This is the *only* place where a cluster id other than the default is set.
492           if (HConstants.DEFAULT_CLUSTER_ID == logKey.getClusterId()) {
493             logKey.setClusterId(this.clusterId);
494           } else if (logKey.getClusterId() != this.clusterId) {
495             edit.addClusterId(clusterId);
496           }
497           currentNbOperations += countDistinctRowKeys(edit);
498           entries.add(entry);
499           currentSize += entry.getEdit().heapSize();
500         } else {
501           this.metrics.logEditsFilteredRate.inc(1);
502         }
503       }
504       // Stop if too many entries or too big
505       if (currentSize >= this.replicationQueueSizeCapacity ||
506           entries.size() >= this.replicationQueueNbCapacity) {
507         break;
508       }
509       try {
510         entry = this.repLogReader.readNextAndSetPosition();
511       } catch (IOException ie) {
512         LOG.debug("Break on IOE: " + ie.getMessage());
513         break;
514       }
515     }
516     LOG.debug("currentNbOperations:" + currentNbOperations +
517         " and seenEntries:" + seenEntries +
518         " and size: " + this.currentSize);
519     if (currentWALisBeingWrittenTo) {
520       return false;
521     }
522     // If we didn't get anything and the queue has an object, it means we
523     // hit the end of the file for sure
524     return seenEntries == 0 && processEndOfFile();
525   }
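  // Reading note (restating the logic above, no new behavior): when the WAL being read is still
  // being written to, this method returns false even if nothing was read, so the caller keeps the
  // same file; only a fully read, closed file (seenEntries == 0) is handed to processEndOfFile()
  // to switch to the next one.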
526 
527   private void connectToPeers() {
528     int sleepMultiplier = 1;
529 
530     // Connect to peer cluster first, unless we have to stop
531     while (this.isActive() && this.currentPeers.size() == 0) {
532 
533       chooseSinks();
534       if (this.isActive() && this.currentPeers.size() == 0) {
535         if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
536           sleepMultiplier++;
537         }
538       }
539     }
540   }
541 
542   /**
543    * Poll for the next path
544    * @return true if a path was obtained, false if not
545    */
546   protected boolean getNextPath() {
547     try {
548       if (this.currentPath == null) {
549         this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
550         this.metrics.sizeOfLogQueue.set(queue.size());
551         if (this.currentPath != null) {
552           this.manager.cleanOldLogs(this.currentPath.getName(),
553               this.peerId,
554               this.queueRecovered);
555         }
556       }
557     } catch (InterruptedException e) {
558       LOG.warn("Interrupted while reading edits", e);
559     }
560     return this.currentPath != null;
561   }
562 
563   /**
564    * Open a reader on the current path
565    *
566    * @param sleepMultiplier the multiplier applied to the default sleep time between retries
567    * @return true if we should continue with that file, false if we are over with it
568    */
569   protected boolean openReader(int sleepMultiplier) {
570     try {
571       LOG.debug("Opening log for replication " + this.currentPath.getName() +
572           " at " + this.repLogReader.getPosition());
573       try {
574         this.reader = repLogReader.openReader(this.currentPath);
575       } catch (FileNotFoundException fnfe) {
576         if (this.queueRecovered) {
577           // We didn't find the log in the archive directory, look if it still
578           // exists in the dead RS folder (there could be a chain of failures
579           // to look at)
580           LOG.info("NB dead servers : " + deadRegionServers.size());
581           for (String curDeadServerName : deadRegionServers) {
582             Path deadRsDirectory =
583                 new Path(manager.getLogDir().getParent(), curDeadServerName);
584             Path[] locs = new Path[] {
585                 new Path(deadRsDirectory, currentPath.getName()),
586                 new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT),
587                                           currentPath.getName()),
588             };
589             for (Path possibleLogLocation : locs) {
590               LOG.info("Possible location " + possibleLogLocation.toUri().toString());
591               if (this.manager.getFs().exists(possibleLogLocation)) {
592                 // We found the right new location
593                 LOG.info("Log " + this.currentPath + " still exists at " +
594                     possibleLogLocation);
595                 // Breaking here will make us sleep since reader is null
596                 return true;
597               }
598             }
599           }
600           // In the case of disaster/recovery, the HMaster may have been shut down or crashed
601           // before data was moved from .logs to .oldlogs. Loop over the .logs folders and check whether a match exists
602           if (stopper instanceof ReplicationSyncUp.DummyServer) {
603             FileStatus[] rss = fs.listStatus(manager.getLogDir());
604             for (FileStatus rs : rss) {
605               Path p = rs.getPath();
606               FileStatus[] logs = fs.listStatus(p);
607               for (FileStatus log : logs) {
608                 p = new Path(p, log.getPath().getName());
609                 if (p.getName().equals(currentPath.getName())) {
610                   currentPath = p;
611                   LOG.info("Log " + this.currentPath + " exists under " + manager.getLogDir());
612                   // Open the log at the new location
613                   this.openReader(sleepMultiplier);
614                   return true;
615                 }
616               }
617             }
618           }
619 
620           // TODO What happens if the log was missing from every single location?
621           // Although we need to check a couple of times as the log could have
622           // been moved by the master between the checks
623           // It can also happen if a recovered queue wasn't properly cleaned,
624           // such that the znode pointing to a log exists but the log was
625           // deleted a long time ago.
626           // For the moment, we'll throw the IO and processEndOfFile
627           throw new IOException("File from recovered queue is " +
628               "nowhere to be found", fnfe);
629         } else {
630           // If the log was archived, continue reading from there
631           Path archivedLogLocation =
632               new Path(manager.getOldLogDir(), currentPath.getName());
633           if (this.manager.getFs().exists(archivedLogLocation)) {
634             currentPath = archivedLogLocation;
635             LOG.info("Log " + this.currentPath + " was moved to " +
636                 archivedLogLocation);
637             // Open the log at the new location
638             this.openReader(sleepMultiplier);
639 
640           }
641           // TODO What happens if the log is missing in both places?
642         }
643       }
644     } catch (IOException ioe) {
645       if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
646       LOG.warn(peerClusterZnode + " Got: ", ioe);
647       this.reader = null;
648       if (ioe.getCause() instanceof NullPointerException) {
649         // Workaround for race condition in HDFS-4380
650         // which throws a NPE if we open a file before any data node has the most recent block
651         // Just sleep and retry.  Will require re-reading compressed HLogs for compressionContext.
652         LOG.warn("Got NPE opening reader, will retry.");
653       } else if (sleepMultiplier == this.maxRetriesMultiplier) {
654         // TODO Need a better way to determine if a file is really gone but
655         // TODO without scanning all logs dir  
656         LOG.warn("Waited too long for this file, considering dumping");
657         return !processEndOfFile();
658       }
659     }
660     return true;
661   }
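  // Recovery summary (restating the logic above, no new behavior): if the log is missing and this
  // is a recovered queue, we look under each dead RS directory (including its .splitting variant),
  // then, when run from ReplicationSyncUp's DummyServer, under every .logs folder; otherwise we
  // fall back to the archived-log directory. If the file still can't be found after enough
  // retries, processEndOfFile() gives up on it.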
662 
663   /*
664    * Checks whether the current log file is empty, and that this is not a recovered queue. This
665    * handles the scenario where, in an idle cluster, there is no entry in the current log and we
666    * keep trying to read the log file and get an EOFException. In case of a recovered queue the
667    * last log file may be empty, and we don't want to retry that.
668    */
669   private boolean isCurrentLogEmpty() {
670     return (this.repLogReader.getPosition() == 0 && !queueRecovered && queue.size() == 0);
671   }
672 
673   /**
674    * Do the sleeping logic
675    * @param msg Why we sleep
676    * @param sleepMultiplier the multiplier applied to the default sleep time between retries
677    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
678    */
679   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
680     try {
681       LOG.debug(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
682       Thread.sleep(this.sleepForRetries * sleepMultiplier);
683     } catch (InterruptedException e) {
684       LOG.debug("Interrupted while sleeping between retries");
685     }
686     return sleepMultiplier < maxRetriesMultiplier;
687   }
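  // Back-of-the-envelope (using the defaults, as an illustration): with sleepForRetries = 1000 ms
  // and maxRetriesMultiplier = 10, a caller that keeps incrementing the multiplier sleeps
  // 1s + 2s + ... + 10s = 55s in total before giving up, which is where the "55 seconds"
  // in the class comment comes from.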
688 
689   /**
690    * We only want KVs that are scoped other than local
691    * @param edit the WALEdit to filter; KVs that should not be replicated are removed from it
692    */
693   protected void removeNonReplicableEdits(WALEdit edit) {
694     // for backward compatibility WALEdit returns a List
695     ArrayList<KeyValue> kvs = (ArrayList<KeyValue>)edit.getKeyValues();
696     int size = edit.size();
697     for (int i = size-1; i >= 0; i--) {
698       KeyValue kv = kvs.get(i);
699       // The scope will be null or empty if
700       // there's nothing to replicate in that WALEdit
701       if (!edit.hasKeyInScope(kv.getFamily())) {
702         kvs.remove(i);
703       }
704     }
705     if (edit.size() < size/2) {
706       kvs.trimToSize();
707     }
708   }
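  // Note: the loop above walks the list backwards so that removals don't shift the indexes of
  // entries not yet visited, and trimToSize() is only worth calling when more than half of the
  // KVs were dropped.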
709 
710   /**
711    * Count the number of different row keys in the given edit because of
712    * mini-batching. We assume that there's at least one KV in the WALEdit.
713    * @param edit edit to count row keys from
714    * @return number of different row keys
715    */
716   private int countDistinctRowKeys(WALEdit edit) {
717     List<KeyValue> kvs = edit.getKeyValues();
718     int distinctRowKeys = 1;
719     KeyValue lastKV = kvs.get(0);
720     for (int i = 0; i < edit.size(); i++) {
721       if (!kvs.get(i).matchingRow(lastKV)) {
722         distinctRowKeys++;
723       }
          lastKV = kvs.get(i); // advance so only a change of row increments the counter
724     }
725     return distinctRowKeys;
726   }
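  // For example (rows shown symbolically): an edit whose KVs touch rows r1, r1, r2 in that order
  // yields 2 distinct row keys, since consecutive KVs for the same row don't increment the counter.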
727 
728   /**
729    * Do the shipping logic
730    * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) written to when this method was called
731    * @param entries the entries to ship
732    */
733   protected void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries) {
734     int sleepMultiplier = 1;
735     if (entries.isEmpty()) {
736       LOG.warn("Was given 0 edits to ship");
737       return;
738     }
739     while (this.isActive()) {
740       if (!isPeerEnabled()) {
741         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
742           sleepMultiplier++;
743         }
744         continue;
745       }
746       try {
747         HRegionInterface rrs = getRS();
748         LOG.debug("Replicating " + entries.size() + ", " + this.currentSize + " bytes");
749         // can't avoid the copy here, the replicateLogEntries RPC requires an HLog.Entry[]
750         rrs.replicateLogEntries(entries.toArray(new HLog.Entry[entries.size()]));
751         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
752           this.manager.logPositionAndCleanOldLogs(this.currentPath,
753               this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo);
754           this.lastLoggedPosition = this.repLogReader.getPosition();
755         }
756         this.totalReplicatedEdits += entries.size();
757         this.metrics.shippedBatchesRate.inc(1);
758         this.metrics.shippedKBRate.inc(this.currentSize/1024);
759         this.metrics.shippedOpsRate.inc(this.currentNbOperations);
760         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
761         LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
762         break;
763 
764       } catch (IOException ioe) {
765         // Didn't ship anything, but must still age the last time we did
766         this.metrics.refreshAgeOfLastShippedOp();
767         if (ioe instanceof RemoteException) {
768           ioe = ((RemoteException) ioe).unwrapRemoteException();
769           LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
770           if (ioe instanceof TableNotFoundException) {
771             if (sleepForRetries("A table is missing in the peer cluster. "
772                 + "Replication cannot proceed without losing data.", sleepMultiplier)) {
773               sleepMultiplier++;
774             }
775           }
776         } else {
777           if (ioe instanceof SocketTimeoutException) {
778             // This exception means we waited for more than 60s and nothing
779             // happened; the cluster is alive, and calling it again right away,
780             // even just to test, only makes things worse.
781             sleepForRetries("Encountered a SocketTimeoutException. The " +
782               "call to the remote cluster timed out, which is usually " +
783               "caused by a machine failure or a massive slowdown",
784               this.socketTimeoutMultiplier);
785           } else if (ioe instanceof ConnectException) {
786             LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
787             chooseSinks();
788           } else {
789             LOG.warn("Can't replicate because of a local or network error: ", ioe);
790           }
791         }
792 
793         try {
794           boolean down;
795           // Spin while the slave is down and we're not asked to shutdown/close
796           do {
797             down = isSlaveDown();
798             if (down) {
799               if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
800                 sleepMultiplier++;
801               } else {
802                 chooseSinks();
803               }
804             }
805           } while (this.isActive() && down );
806         } catch (InterruptedException e) {
807           LOG.debug("Interrupted while trying to contact the peer cluster");
808         }
809       }
810     }
811   }
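  // Failure handling summary (restating the logic above): a missing table on the peer only causes
  // a backoff, a SocketTimeoutException sleeps for the much longer socketTimeoutMultiplier to
  // avoid hammering a struggling cluster, a ConnectException triggers re-picking the sinks, and in
  // every case we then spin on isSlaveDown() until the peer answers or we are asked to stop.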
812 
813   /**
814    * check whether the peer is enabled or not
815    *
816    * @return true if the peer is enabled, otherwise false
817    */
818   protected boolean isPeerEnabled() {
819     return this.replicating.get() && this.zkHelper.getPeerEnabled(peerId);
820   }
821 
822   /**
823    * If the queue isn't empty, switch to the next one
824    * Else if this is a recovered queue, it means we're done!
825    * Else we'll just continue to try reading the log file
826    * @return true if we're done with the current file, false if we should
827    * continue trying to read from it
828    */
829   protected boolean processEndOfFile() {
830     if (this.queue.size() != 0) {
831       this.currentPath = null;
832       this.repLogReader.finishCurrentFile();
833       this.reader = null;
834       return true;
835     } else if (this.queueRecovered) {
836       this.manager.closeRecoveredQueue(this);
837       LOG.info("Finished recovering the queue");
838       this.running = false;
839       return true;
840     }
841     return false;
842   }
843 
844   public void startup() {
845     String n = Thread.currentThread().getName();
846     Thread.UncaughtExceptionHandler handler =
847         new Thread.UncaughtExceptionHandler() {
848           public void uncaughtException(final Thread t, final Throwable e) {
849             LOG.error("Unexpected exception in ReplicationSource," +
850               " currentPath=" + currentPath, e);
851           }
852         };
853     Threads.setDaemonThreadRunning(
854         this, n + ".replicationSource," + peerClusterZnode, handler);
855   }
856 
857   public void terminate(String reason) {
858     terminate(reason, null);
859   }
860 
861   public void terminate(String reason, Exception cause) {
862     if (cause == null) {
863       LOG.info("Closing source "
864           + this.peerClusterZnode + " because: " + reason);
865 
866     } else {
867       LOG.error("Closing source " + this.peerClusterZnode
868           + " because an error occurred: " + reason, cause);
869     }
870     this.running = false;
871     // Only wait for the thread to die if it's not us
872     if (!Thread.currentThread().equals(this)) {
873       Threads.shutdown(this, this.sleepForRetries);
874     }
875   }
876 
877   /**
878    * Get a new region server at random from this peer cluster
879    * @return a connection to a randomly chosen region server of the peer cluster
880    * @throws IOException
881    */
882   private HRegionInterface getRS() throws IOException {
883     if (this.currentPeers.size() == 0) {
884       throw new IOException(this.peerClusterZnode + " has 0 region servers");
885     }
886     ServerName address =
887         currentPeers.get(random.nextInt(this.currentPeers.size()));
888     return this.conn.getHRegionConnection(address.getHostname(), address.getPort());
889   }
890 
891   /**
892    * Check if the slave is down by trying to establish a connection
893    * @return true if down, false if up
894    * @throws InterruptedException
895    */
896   public boolean isSlaveDown() throws InterruptedException {
897     final CountDownLatch latch = new CountDownLatch(1);
898     Thread pingThread = new Thread() {
899       public void run() {
900         try {
901           HRegionInterface rrs = getRS();
902           // Dummy call; it only succeeds (letting us count down) if the slave is reachable
903           rrs.getHServerInfo();
904           latch.countDown();
905         } catch (IOException ex) {
906           if (ex instanceof RemoteException) {
907             ex = ((RemoteException) ex).unwrapRemoteException();
908           }
909           LOG.info("Slave cluster looks down: " + ex.getMessage());
910         }
911       }
912     };
913     pingThread.start();
914     // await returns true if countDown happened before the timeout
915     boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
916     pingThread.interrupt();
917     return down;
918   }
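  // How this works: the ping thread counts the latch down only if the dummy RPC succeeds; if the
  // latch hasn't been counted down within sleepForRetries milliseconds, the slave is assumed to
  // be down and the ping thread is interrupted.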
919 
920   public String getPeerClusterZnode() {
921     return this.peerClusterZnode;
922   }
923 
924   public String getPeerClusterId() {
925     return this.peerId;
926   }
927 
928   public Path getCurrentPath() {
929     return this.currentPath;
930   }
931 
932   private boolean isActive() {
933     return !this.stopper.isStopped() && this.running;
934   }
935 
936   /**
937    * Comparator used to compare logs together based on their start time
938    */
939   public static class LogsComparator implements Comparator<Path> {
940 
941     @Override
942     public int compare(Path o1, Path o2) {
943       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
944     }
945 
946     @Override
947     public boolean equals(Object o) {
948       return true;
949     }
950 
951     /**
952      * Split a path to get the start time.
953      * For example: 10.20.20.171%3A60020.1277499063250
954      * @param p path to split
955      * @return start time
956      */
957     private long getTS(Path p) {
958       String[] parts = p.getName().split("\\.");
959       return Long.parseLong(parts[parts.length-1]);
960     }
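        // For example, "10.20.20.171%3A60020.1277499063250" splits on "." into
        // {"10", "20", "20", "171%3A60020", "1277499063250"}, and the last token,
        // 1277499063250, is the log's start timestamp.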
961   }
962 }