1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import java.io.IOException;
22 import java.net.ConnectException;
23 import java.net.SocketTimeoutException;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.CompletionService;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorCompletionService;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.ThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35
36 import com.google.common.annotations.VisibleForTesting;
37 import org.apache.commons.lang.StringUtils;
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hbase.HBaseConfiguration;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.TableNotFoundException;
45 import org.apache.hadoop.hbase.client.HConnection;
46 import org.apache.hadoop.hbase.client.HConnectionManager;
47 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
48 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
49 import org.apache.hadoop.hbase.util.Bytes;
50 import org.apache.hadoop.hbase.wal.WAL.Entry;
51 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
52 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
53 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
54 import org.apache.hadoop.ipc.RemoteException;
55 import javax.security.sasl.SaslException;
56
57
58
59
60
61
62
63
64
65
66
67
68 @InterfaceAudience.Private
69 public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
70
71 private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
72 private HConnection conn;
73
74 private Configuration conf;
75
76
77 private long sleepForRetries;
78
79
80 private int maxRetriesMultiplier;
81
82 private int socketTimeoutMultiplier;
83
84 private MetricsSource metrics;
85
86 private ReplicationSinkManager replicationSinkMgr;
87 private boolean peersSelected = false;
88 private ThreadPoolExecutor exec;
89 private int maxThreads;
90
91 @Override
92 public void init(Context context) throws IOException {
93 super.init(context);
94 this.conf = HBaseConfiguration.create(ctx.getConfiguration());
95 decorateConf();
96 this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
97 this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
98 maxRetriesMultiplier);
99
100
101
102 this.conn = HConnectionManager.createConnection(this.conf);
103 this.sleepForRetries =
104 this.conf.getLong("replication.source.sleepforretries", 1000);
105 this.metrics = context.getMetrics();
106
107 this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
108
109 this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
110 HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
111
112 this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
113 new LinkedBlockingQueue<Runnable>());
114 this.exec.allowCoreThreadTimeOut(true);
115 }
116
117 private void decorateConf() {
118 String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
119 if (StringUtils.isNotEmpty(replicationCodec)) {
120 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
121 }
122 }
123
124 private void connectToPeers() {
125 getRegionServers();
126
127 int sleepMultiplier = 1;
128
129
130 while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
131 replicationSinkMgr.chooseSinks();
132 if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
133 if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
134 sleepMultiplier++;
135 }
136 }
137 }
138 }
139
140
141
142
143
144
145
146 protected boolean sleepForRetries(String msg, int sleepMultiplier) {
147 try {
148 if (LOG.isTraceEnabled()) {
149 LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
150 }
151 Thread.sleep(this.sleepForRetries * sleepMultiplier);
152 } catch (InterruptedException e) {
153 LOG.debug("Interrupted while sleeping between retries");
154 }
155 return sleepMultiplier < maxRetriesMultiplier;
156 }
157
158
159
160
161 @Override
162 public boolean replicate(ReplicateContext replicateContext) {
163 CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(this.exec);
164 List<Entry> entries = replicateContext.getEntries();
165 String walGroupId = replicateContext.getWalGroupId();
166 int sleepMultiplier = 1;
167 int numReplicated = 0;
168
169 if (!peersSelected && this.isRunning()) {
170 connectToPeers();
171 peersSelected = true;
172 }
173
174 int numSinks = replicationSinkMgr.getNumSinks();
175 if (numSinks == 0) {
176 LOG.warn("No replication sinks found, returning without replicating. The source should retry"
177 + " with the same set of edits.");
178 return false;
179 }
180
181
182
183 int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
184
185 List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
186 if (n == 1) {
187 entryLists.add(entries);
188 } else {
189 for (int i=0; i<n; i++) {
190 entryLists.add(new ArrayList<Entry>(entries.size()/n+1));
191 }
192
193 for (Entry e : entries) {
194 entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
195 }
196 }
197 while (this.isRunning()) {
198 if (!isPeerEnabled()) {
199 if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
200 sleepMultiplier++;
201 }
202 continue;
203 }
204 try {
205 if (LOG.isTraceEnabled()) {
206 LOG.trace("Replicating " + entries.size() +
207 " entries of total size " + replicateContext.getSize());
208 }
209
210 int futures = 0;
211 for (int i=0; i<entryLists.size(); i++) {
212 if (!entryLists.get(i).isEmpty()) {
213 if (LOG.isTraceEnabled()) {
214 LOG.trace("Submitting " + entryLists.get(i).size() +
215 " entries of total size " + replicateContext.getSize());
216 }
217
218 pool.submit(createReplicator(entryLists.get(i), i));
219 futures++;
220 }
221 }
222 IOException iox = null;
223
224 for (int i=0; i<futures; i++) {
225 try {
226
227
228 Future<Integer> f = pool.take();
229 int index = f.get().intValue();
230 int batchSize = entryLists.get(index).size();
231 entryLists.set(index, Collections.<Entry>emptyList());
232
233 numReplicated += batchSize;
234 } catch (InterruptedException ie) {
235 iox = new IOException(ie);
236 } catch (ExecutionException ee) {
237
238 iox = (IOException)ee.getCause();
239 }
240 }
241 if (iox != null) {
242
243 throw iox;
244 }
245 if (numReplicated != entries.size()) {
246
247 LOG.warn("The number of edits replicated is different from the number received,"
248 + " failing for now.");
249 return false;
250 }
251
252 this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
253 walGroupId);
254 return true;
255
256 } catch (IOException ioe) {
257
258 this.metrics.refreshAgeOfLastShippedOp(walGroupId);
259 if (ioe instanceof RemoteException) {
260 ioe = ((RemoteException) ioe).unwrapRemoteException();
261 LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
262 if (ioe instanceof TableNotFoundException) {
263 if (sleepForRetries("A table is missing in the peer cluster. "
264 + "Replication cannot proceed without losing data.", sleepMultiplier)) {
265 sleepMultiplier++;
266 }
267 } else {
268 LOG.warn("Peer encountered RemoteException, rechecking all sinks: ", ioe);
269 replicationSinkMgr.chooseSinks();
270 }
271 } else {
272 if (ioe instanceof SocketTimeoutException) {
273
274
275
276 sleepForRetries("Encountered a SocketTimeoutException. Since the " +
277 "call to the remote cluster timed out, which is usually " +
278 "caused by a machine failure or a massive slowdown",
279 this.socketTimeoutMultiplier);
280 } else if (ioe instanceof ConnectException) {
281 LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
282 replicationSinkMgr.chooseSinks();
283 } else {
284 LOG.warn("Can't replicate because of a local or network error: ", ioe);
285 }
286 }
287 if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
288 sleepMultiplier++;
289 }
290 }
291 }
292 return false;
293 }
294
295 protected boolean isPeerEnabled() {
296 return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
297 }
298
299 @Override
300 protected void doStop() {
301 disconnect();
302 if (this.conn != null) {
303 try {
304 this.conn.close();
305 this.conn = null;
306 } catch (IOException e) {
307 LOG.warn("Failed to close the connection");
308 }
309 }
310 exec.shutdownNow();
311 notifyStopped();
312 }
313
314
315 @Override
316 public State stopAndWait() {
317 doStop();
318 return super.stopAndWait();
319 }
320
321 @VisibleForTesting
322 protected Replicator createReplicator(List<Entry> entries, int ordinal) {
323 return new Replicator(entries, ordinal);
324 }
325
326 @VisibleForTesting
327 protected class Replicator implements Callable<Integer> {
328 private List<Entry> entries;
329 private int ordinal;
330 public Replicator(List<Entry> entries, int ordinal) {
331 this.entries = entries;
332 this.ordinal = ordinal;
333 }
334
335 @Override
336 public Integer call() throws IOException {
337 SinkPeer sinkPeer = null;
338 try {
339 sinkPeer = replicationSinkMgr.getReplicationSink();
340 BlockingInterface rrs = sinkPeer.getRegionServer();
341 ReplicationProtbufUtil.replicateWALEntry(rrs,
342 entries.toArray(new Entry[entries.size()]));
343 replicationSinkMgr.reportSinkSuccess(sinkPeer);
344 return ordinal;
345
346 } catch (IOException ioe) {
347 if (sinkPeer != null) {
348 replicationSinkMgr.reportBadSink(sinkPeer);
349 }
350 throw ioe;
351 }
352 }
353
354 }
355 }