/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.ipc.RemoteException;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
 * implementation for replicating to another HBase cluster.
 * It selects a random subset of the slave cluster's region servers as sinks,
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 of them will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {

  private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
  private HConnection conn;

  private Configuration conf;

  // How long should we sleep for each retry
  private long sleepForRetries;

  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Metrics for this source
  private MetricsSource metrics;
  // Handles connecting to peer region servers
  private ReplicationSinkManager replicationSinkMgr;
  private boolean peersSelected = false;
  private ThreadPoolExecutor exec;
  private int maxThreads;

  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
    decorateConf();
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
        maxRetriesMultiplier);
    // TODO: Make this connection replication-specific, so that replication-specific settings,
    // such as the compression or codec to use when passing Cells, can be applied to it.
    this.conn = HConnectionManager.createConnection(this.conf);
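    // The connection created above is owned by this endpoint and is closed in doStop().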
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.metrics = context.getMetrics();
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
    // per sink thread pool
    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
      HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);

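    // Batches are shipped to sinks in parallel on this pool; maxThreads caps how many
    // batches a single replicate() call can have in flight, and idle threads time out.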
    this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>());
    this.exec.allowCoreThreadTimeOut(true);
  }

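  /**
   * Copy the replication-specific codec, if one is configured, into the RPC codec key so
   * that the connection used for shipping edits picks it up.
   */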
  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

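  /**
   * Refresh the list of region servers on the peer cluster and keep retrying, with backoff,
   * until at least one sink has been chosen or this endpoint stops running.
   */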
  private void connectToPeers() {
    getRegionServers();

    int sleepMultiplier = 1;

    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
      replicationSinkMgr.chooseSinks();
      if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier multiplier applied to the default sleep time
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Do the shipping logic: split the entries into batches grouped by region, ship each batch
   * to a sink in parallel, and keep retrying until everything has been replicated or the
   * endpoint stops.
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(this.exec);
    List<Entry> entries = replicateContext.getEntries();
    String walGroupId = replicateContext.getWalGroupId();
    int sleepMultiplier = 1;
    int numReplicated = 0;

    if (!peersSelected && this.isRunning()) {
      connectToPeers();
      peersSelected = true;
    }

    int numSinks = replicationSinkMgr.getNumSinks();
    if (numSinks == 0) {
      LOG.warn("No replication sinks found, returning without replicating. The source should retry"
          + " with the same set of edits.");
      return false;
    }

    // minimum of: configured threads, number of 100-waledit batches,
    //  and number of current sinks
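    // e.g. with maxThreads = 10, 250 entries and 5 sinks: min(10, 250/100 + 1, 5) = 3 batches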
    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);

    List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
    if (n == 1) {
      entryLists.add(entries);
    } else {
      for (int i=0; i<n; i++) {
        entryLists.add(new ArrayList<Entry>(entries.size()/n+1));
      }
      // now group by region
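      // (hashing on the encoded region name keeps all edits for a region in the same
      // batch, so their relative order is preserved when they are shipped)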
      for (Entry e : entries) {
        entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
      }
    }
    while (this.isRunning()) {
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicating " + entries.size() +
              " entries of total size " + replicateContext.getSize());
        }

        int futures = 0;
        for (int i=0; i<entryLists.size(); i++) {
          if (!entryLists.get(i).isEmpty()) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("Submitting " + entryLists.get(i).size() +
                  " entries of total size " + replicateContext.getSize());
            }
            // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
            pool.submit(createReplicator(entryLists.get(i), i));
            futures++;
          }
        }
        IOException iox = null;

        for (int i=0; i<futures; i++) {
          try {
            // wait for all futures, remove successful parts
            // (only the remaining parts will be retried)
            Future<Integer> f = pool.take();
            int index = f.get().intValue();
            int batchSize = entryLists.get(index).size();
            entryLists.set(index, Collections.<Entry>emptyList());
            // Now, we have marked the batch as done replicating, record its size
            numReplicated += batchSize;
          } catch (InterruptedException ie) {
            iox = new IOException(ie);
          } catch (ExecutionException ee) {
            // cause must be an IOException
            iox = (IOException)ee.getCause();
          }
        }
        if (iox != null) {
          // if we had any exceptions, try again
          throw iox;
        }
        if (numReplicated != entries.size()) {
          // Something went wrong here and we don't know what, let's just fail and retry.
          LOG.warn("The number of edits replicated is different from the number received,"
              + " failing for now.");
          return false;
        }
        // update metrics
        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
          walGroupId);
        return true;

      } catch (IOException ioe) {
        // Didn't ship anything, but must still age the last time we did
        this.metrics.refreshAgeOfLastShippedOp(walGroupId);
        if (ioe instanceof RemoteException) {
          ioe = ((RemoteException) ioe).unwrapRemoteException();
          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
          if (ioe instanceof TableNotFoundException) {
            if (sleepForRetries("A table is missing in the peer cluster. "
                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
              sleepMultiplier++;
            }
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means we waited for more than 60s and nothing happened.
            // The cluster is still alive, so calling it again right away, even just as
            // a test, would only make things worse.
            sleepForRetries("Encountered a SocketTimeoutException. The call to the " +
              "remote cluster timed out, which is usually caused by a machine failure " +
              "or a massive slowdown",
              this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException) {
            LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
            replicationSinkMgr.chooseSinks();
          } else {
            LOG.warn("Can't replicate because of a local or network error: ", ioe);
          }
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    return false; // in case we exited before replicating
  }

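  /**
   * @return whether the replication peer is currently enabled; replicate() sleeps and
   *         retries while it is disabled.
   */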
  protected boolean isPeerEnabled() {
    return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
  }

  @Override
  protected void doStop() {
    disconnect(); // don't call super.doStop()
    if (this.conn != null) {
      try {
        this.conn.close();
        this.conn = null;
      } catch (IOException e) {
        LOG.warn("Failed to close the connection");
      }
    }
    exec.shutdownNow();
    notifyStopped();
  }

  // is this needed? Nobody else will call doStop() otherwise
  @Override
  public State stopAndWait() {
    doStop();
    return super.stopAndWait();
  }

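  /**
   * Factory method for the Callable that ships one batch; tests can override it to supply
   * their own Replicator. The ordinal identifies which batch is being shipped.
   */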
  @VisibleForTesting
  protected Replicator createReplicator(List<Entry> entries, int ordinal) {
    return new Replicator(entries, ordinal);
  }

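  /**
   * Ships one batch of entries to a single sink region server and reports the outcome to
   * the sink manager; returns its ordinal so the caller can mark that batch as done.
   */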
  @VisibleForTesting
  protected class Replicator implements Callable<Integer> {
    private List<Entry> entries;
    private int ordinal;
    public Replicator(List<Entry> entries, int ordinal) {
      this.entries = entries;
      this.ordinal = ordinal;
    }

    @Override
    public Integer call() throws IOException {
      SinkPeer sinkPeer = null;
      try {
        sinkPeer = replicationSinkMgr.getReplicationSink();
        BlockingInterface rrs = sinkPeer.getRegionServer();
        ReplicationProtbufUtil.replicateWALEntry(rrs,
            entries.toArray(new Entry[entries.size()]));
        replicationSinkMgr.reportSinkSuccess(sinkPeer);
        return ordinal;

      } catch (IOException ioe) {
        if (sinkPeer != null) {
          replicationSinkMgr.reportBadSink(sinkPeer);
        }
        throw ioe;
      }
    }

  }
}