1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.IOException;
22  import java.net.ConnectException;
23  import java.net.SocketTimeoutException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.List;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.CompletionService;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorCompletionService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.LinkedBlockingQueue;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import com.google.common.annotations.VisibleForTesting;
37  import org.apache.commons.lang.StringUtils;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.HBaseConfiguration;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.TableNotFoundException;
45  import org.apache.hadoop.hbase.client.HConnection;
46  import org.apache.hadoop.hbase.client.HConnectionManager;
47  import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
48  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.wal.WAL.Entry;
51  import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
52  import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
53  import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
54  import org.apache.hadoop.ipc.RemoteException;
55  import javax.security.sasl.SaslException;
56  
57  /**
58   * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} 
59   * implementation for replicating to another HBase cluster.
60   * For the slave cluster, it selects a random subset of region servers
61   * using a replication ratio. For example, if the replication ratio = 0.1
62   * and the slave cluster has 100 region servers, 10 will be selected.
63   * <p>
64   * A stream is considered down when we cannot contact a region server on the
65   * peer cluster for more than 55 seconds by default.
66   * </p>
67   */
68  @InterfaceAudience.Private
69  public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
70  
71    private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
72    private HConnection conn;
73  
74    private Configuration conf;
75  
76    // How long should we sleep for each retry
77    private long sleepForRetries;
78  
79    // Maximum number of retries before taking bold actions
80    private int maxRetriesMultiplier;
81    // Socket timeouts require even bolder actions since we don't want to DDOS
82    private int socketTimeoutMultiplier;
83    // Metrics for this source
84    private MetricsSource metrics;
85    // Handles connecting to peer region servers
86    private ReplicationSinkManager replicationSinkMgr;
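  // Set to true once sinks have been chosen; selection is done lazily on the first
  // replicate() call rather than in init().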
87    private boolean peersSelected = false;
88    private ThreadPoolExecutor exec;
89    private int maxThreads;
90  
91    @Override
92    public void init(Context context) throws IOException {
93      super.init(context);
94      this.conf = HBaseConfiguration.create(ctx.getConfiguration());
95      decorateConf();
96      this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
97      this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
98          maxRetriesMultiplier);
99      // TODO: This connection is replication-specific, or we should make it so, and apply
100     // replication-specific settings such as the compression or codec to use when
101     // passing Cells.
102     this.conn = HConnectionManager.createConnection(this.conf);
103     this.sleepForRetries =
104         this.conf.getLong("replication.source.sleepforretries", 1000);
105     this.metrics = context.getMetrics();
106     // ReplicationQueueInfo parses the peerId out of the znode for us
107     this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
108     // per sink thread pool
109     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
110       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
111 
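    // Core threads may time out (enabled below), so the pool shrinks back to zero
    // threads after 60 seconds of idleness instead of keeping maxThreads alive.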
112     this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
113         new LinkedBlockingQueue<Runnable>());
114     this.exec.allowCoreThreadTimeOut(true);
115   }
116 
117   private void decorateConf() {
118     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
119     if (StringUtils.isNotEmpty(replicationCodec)) {
120       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
121     }
122   }
123 
124   private void connectToPeers() {
125     getRegionServers();
126 
127     int sleepMultiplier = 1;
128 
129     // Connect to peer cluster first, unless we have to stop
130     while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
131       replicationSinkMgr.chooseSinks();
132       if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
133         if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
134           sleepMultiplier++;
135         }
136       }
137     }
138   }
139 
140   /**
141    * Sleep between retry attempts, scaling the base sleep time by the given multiplier.
142    * @param msg Why we sleep
143    * @param sleepMultiplier multiplier applied to the base sleep time
144    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
145    */
146   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
147     try {
148       if (LOG.isTraceEnabled()) {
149         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
150       }
151       Thread.sleep(this.sleepForRetries * sleepMultiplier);
152     } catch (InterruptedException e) {
153       LOG.debug("Interrupted while sleeping between retries");
154     }
155     return sleepMultiplier < maxRetriesMultiplier;
156   }
157 
158   /**
159    * Ship the given batch of WAL entries to the peer cluster, splitting it across multiple sinks where possible.
160    */
161   @Override
162   public boolean replicate(ReplicateContext replicateContext) {
163     CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(this.exec);
164     List<Entry> entries = replicateContext.getEntries();
165     String walGroupId = replicateContext.getWalGroupId();
166     int sleepMultiplier = 1;
167     int numReplicated = 0;
168 
169     if (!peersSelected && this.isRunning()) {
170       connectToPeers();
171       peersSelected = true;
172     }
173 
174     int numSinks = replicationSinkMgr.getNumSinks();
175     if (numSinks == 0) {
176       LOG.warn("No replication sinks found, returning without replicating. The source should retry"
177           + " with the same set of edits.");
178       return false;
179     }
180 
181     // minimum of: configured threads, number of 100-waledit batches,
182     //  and number of current sinks
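    //  e.g. with maxThreads = 10, 250 entries, and 5 sinks:
    //    min(min(10, 250/100 + 1 = 3), 5) = 3 parallel batches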
183     int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
184 
185     List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
186     if (n == 1) {
187       entryLists.add(entries);
188     } else {
189       for (int i=0; i<n; i++) {
190         entryLists.add(new ArrayList<Entry>(entries.size()/n+1));
191       }
192       // now group by region
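      // (hashing on the encoded region name sends all edits of a region to the same
      //  sub-list, so per-region ordering is preserved across the parallel shipments)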
193       for (Entry e : entries) {
194         entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
195       }
196     }
197     while (this.isRunning()) {
198       if (!isPeerEnabled()) {
199         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
200           sleepMultiplier++;
201         }
202         continue;
203       }
204       try {
205         if (LOG.isTraceEnabled()) {
206           LOG.trace("Replicating " + entries.size() +
207               " entries of total size " + replicateContext.getSize());
208         }
209 
210         int futures = 0;
211         for (int i=0; i<entryLists.size(); i++) {
212           if (!entryLists.get(i).isEmpty()) {
213             if (LOG.isTraceEnabled()) {
214               LOG.trace("Submitting " + entryLists.get(i).size() +
215                   " entries of total size " + replicateContext.getSize());
216             }
217             // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
218             pool.submit(createReplicator(entryLists.get(i), i));
219             futures++;
220           }
221         }
222         IOException iox = null;
223 
224         for (int i=0; i<futures; i++) {
225           try {
226             // wait for all futures, remove successful parts
227             // (only the remaining parts will be retried)
228             Future<Integer> f = pool.take();
229             int index = f.get().intValue();
230             int batchSize =  entryLists.get(index).size();
231             entryLists.set(index, Collections.<Entry>emptyList());
232             // Now, we have marked the batch as done replicating, record its size
233             numReplicated += batchSize;
234           } catch (InterruptedException ie) {
235             iox =  new IOException(ie);
236           } catch (ExecutionException ee) {
237             // cause must be an IOException
238             iox = (IOException)ee.getCause();
239           }
240         }
241         if (iox != null) {
242           // if we had any exceptions, try again
243           throw iox;
244         }
245         if (numReplicated != entries.size()) {
246           // Something went wrong here and we don't know what, let's just fail and retry.
247           LOG.warn("The number of edits replicated is different from the number received,"
248               + " failing for now.");
249           return false;
250         }
251         // update metrics
252         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
253           walGroupId);
254         return true;
255 
256       } catch (IOException ioe) {
257         // Didn't ship anything, but must still age the last time we did
258         this.metrics.refreshAgeOfLastShippedOp(walGroupId);
259         if (ioe instanceof RemoteException) {
260           ioe = ((RemoteException) ioe).unwrapRemoteException();
261           LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
262           if (ioe instanceof TableNotFoundException) {
263             if (sleepForRetries("A table is missing in the peer cluster. "
264                 + "Replication cannot proceed without losing data.", sleepMultiplier)) {
265               sleepMultiplier++;
266             }
267           } else {
268             LOG.warn("Peer encountered RemoteException, rechecking all sinks: ", ioe);
269             replicationSinkMgr.chooseSinks();
270           }
271         } else {
272           if (ioe instanceof SocketTimeoutException) {
273         // This exception means we waited for more than 60s and nothing happened;
274         // the cluster is probably alive but slow, and calling it again right away,
275         // even just to probe it, only makes things worse.
276         sleepForRetries("Encountered a SocketTimeoutException. The " +
277           "call to the remote cluster timed out, which is usually " +
278           "caused by a machine failure or a massive slowdown",
279               this.socketTimeoutMultiplier);
280           } else if (ioe instanceof ConnectException) {
281             LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
282             replicationSinkMgr.chooseSinks();
283           } else {
284             LOG.warn("Can't replicate because of a local or network error: ", ioe);
285           }
286         }
287         if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
288           sleepMultiplier++;
289         }
290       }
291     }
292     return false; // in case we exited before replicating
293   }
294 
295   protected boolean isPeerEnabled() {
296     return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
297   }
298 
299   @Override
300   protected void doStop() {
301     disconnect(); //don't call super.doStop()
302     if (this.conn != null) {
303       try {
304         this.conn.close();
305         this.conn = null;
306       } catch (IOException e) {
307       LOG.warn("Failed to close the connection", e);
308       }
309     }
310     exec.shutdownNow();
311     notifyStopped();
312   }
313 
314   // is this needed? Nobody else will call doStop() otherwise
315   @Override
316   public State stopAndWait() {
317     doStop();
318     return super.stopAndWait();
319   }
320 
321   @VisibleForTesting
322   protected Replicator createReplicator(List<Entry> entries, int ordinal) {
323     return new Replicator(entries, ordinal);
324   }
325 
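  /**
   * Ships one sub-batch of WAL entries to a single sink region server via
   * ReplicationProtbufUtil.replicateWALEntry. Returns the sub-batch's ordinal on
   * success so the caller can mark it as replicated; on failure the sink is reported
   * bad and the IOException is rethrown, causing the remaining sub-batches to be retried.
   */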
326   @VisibleForTesting
327   protected class Replicator implements Callable<Integer> {
328     private List<Entry> entries;
329     private int ordinal;
330     public Replicator(List<Entry> entries, int ordinal) {
331       this.entries = entries;
332       this.ordinal = ordinal;
333     }
334 
335     @Override
336     public Integer call() throws IOException {
337       SinkPeer sinkPeer = null;
338       try {
339         sinkPeer = replicationSinkMgr.getReplicationSink();
340         BlockingInterface rrs = sinkPeer.getRegionServer();
341         ReplicationProtbufUtil.replicateWALEntry(rrs,
342             entries.toArray(new Entry[entries.size()]));
343         replicationSinkMgr.reportSinkSuccess(sinkPeer);
344         return ordinal;
345 
346       } catch (IOException ioe) {
347         if (sinkPeer != null) {
348           replicationSinkMgr.reportBadSink(sinkPeer);
349         }
350         throw ioe;
351       }
352     }
353 
354   }
355 }