1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.hbase.CellScanner;
41 import org.apache.hadoop.hbase.HBaseConfiguration;
42 import org.apache.hadoop.hbase.HBaseIOException;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.HRegionInfo;
45 import org.apache.hadoop.hbase.HRegionLocation;
46 import org.apache.hadoop.hbase.HTableDescriptor;
47 import org.apache.hadoop.hbase.RegionLocations;
48 import org.apache.hadoop.hbase.TableName;
49 import org.apache.hadoop.hbase.TableNotFoundException;
50 import org.apache.hadoop.hbase.TableDescriptors;
51 import org.apache.hadoop.hbase.client.ConnectionFactory;
52 import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
53 import org.apache.hadoop.hbase.client.ClusterConnection;
54 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
55 import org.apache.hadoop.hbase.client.RetryingCallable;
56 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
57 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
58 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
59 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
60 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
61 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
62 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
63 import org.apache.hadoop.hbase.wal.WALKey;
64 import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
65 import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
66 import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
67 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
68 import org.apache.hadoop.hbase.wal.WAL.Entry;
69 import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
70 import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
71 import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
72 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
73 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
74 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
75 import org.apache.hadoop.hbase.replication.WALEntryFilter;
76 import org.apache.hadoop.hbase.util.Bytes;
77 import org.apache.hadoop.hbase.util.Pair;
78 import org.apache.hadoop.hbase.util.Threads;
79 import org.apache.hadoop.util.StringUtils;
80
81 import com.google.common.cache.Cache;
82 import com.google.common.cache.CacheBuilder;
83 import com.google.common.collect.Lists;
84 import com.google.protobuf.ServiceException;
85
86
87
88
89
90 @InterfaceAudience.Private
91 public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
92
93 private static final Log LOG = LogFactory.getLog(RegionReplicaReplicationEndpoint.class);
94
95
96 private static String CLIENT_RETRIES_NUMBER
97 = "hbase.region.replica.replication.client.retries.number";
98
99 private Configuration conf;
100 private ClusterConnection connection;
101 private TableDescriptors tableDescriptors;
102
103
104 private PipelineController controller;
105 private RegionReplicaOutputSink outputSink;
106 private EntryBuffers entryBuffers;
107
108
109 private int numWriterThreads;
110
111 private int operationTimeout;
112
113 private ExecutorService pool;
114
115
116
117
118
119 private class SkipReplayedEditsFilter extends BaseWALEntryFilter {
120 @Override
121 public Entry filter(Entry entry) {
122
123 if (entry.getKey().getOrigLogSeqNum() > 0) {
124 return null;
125 }
126 return entry;
127 }
128 }
129
130 @Override
131 public WALEntryFilter getWALEntryfilter() {
132 WALEntryFilter superFilter = super.getWALEntryfilter();
133 WALEntryFilter skipReplayedEditsFilter = getSkipReplayedEditsFilter();
134
135 if (superFilter == null) {
136 return skipReplayedEditsFilter;
137 }
138
139 if (skipReplayedEditsFilter == null) {
140 return superFilter;
141 }
142
143 ArrayList<WALEntryFilter> filters = Lists.newArrayList();
144 filters.add(superFilter);
145 filters.add(skipReplayedEditsFilter);
146 return new ChainWALEntryFilter(filters);
147 }
148
149 protected WALEntryFilter getSkipReplayedEditsFilter() {
150 return new SkipReplayedEditsFilter();
151 }
152
153 @Override
154 public void init(Context context) throws IOException {
155 super.init(context);
156
157 this.conf = HBaseConfiguration.create(context.getConfiguration());
158 this.tableDescriptors = context.getTableDescriptors();
159
160
161
162
163 int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
164 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
165 if (defaultNumRetries > 10) {
166 int mult = conf.getInt("hbase.client.serverside.retries.multiplier", 10);
167 defaultNumRetries = defaultNumRetries / mult;
168 }
169
170 conf.setInt("hbase.client.serverside.retries.multiplier", 1);
171 int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
172 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);
173
174 this.numWriterThreads = this.conf.getInt(
175 "hbase.region.replica.replication.writer.threads", 3);
176 controller = new PipelineController();
177 entryBuffers = new EntryBuffers(controller,
178 this.conf.getInt("hbase.region.replica.replication.buffersize",
179 128*1024*1024));
180
181
182 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
183 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
184 }
185
186 @Override
187 protected void doStart() {
188 try {
189 connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
190 this.pool = getDefaultThreadPool(conf);
191 outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers,
192 connection, pool, numWriterThreads, operationTimeout);
193 outputSink.startWriterThreads();
194 super.doStart();
195 } catch (IOException ex) {
196 LOG.warn("Received exception while creating connection :" + ex);
197 notifyFailed(ex);
198 }
199 }
200
201 @Override
202 protected void doStop() {
203 if (outputSink != null) {
204 try {
205 outputSink.finishWritingAndClose();
206 } catch (IOException ex) {
207 LOG.warn("Got exception while trying to close OutputSink");
208 LOG.warn(ex);
209 }
210 }
211 if (this.pool != null) {
212 this.pool.shutdownNow();
213 try {
214
215 boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS);
216 if (!shutdown) {
217 LOG.warn("Failed to shutdown the thread pool after 10 seconds");
218 }
219 } catch (InterruptedException e) {
220 LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e);
221 }
222 }
223 if (connection != null) {
224 try {
225 connection.close();
226 } catch (IOException ex) {
227 LOG.warn("Got exception closing connection :" + ex);
228 }
229 }
230 super.doStop();
231 }
232
233
234
235
236
237 private ExecutorService getDefaultThreadPool(Configuration conf) {
238 int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
239 int coreThreads = conf.getInt("hbase.region.replica.replication.threads.core", 16);
240 if (maxThreads == 0) {
241 maxThreads = Runtime.getRuntime().availableProcessors() * 8;
242 }
243 if (coreThreads == 0) {
244 coreThreads = Runtime.getRuntime().availableProcessors() * 8;
245 }
246 long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
247 LinkedBlockingQueue<Runnable> workQueue =
248 new LinkedBlockingQueue<Runnable>(maxThreads *
249 conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
250 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
251 ThreadPoolExecutor tpe = new ThreadPoolExecutor(
252 coreThreads,
253 maxThreads,
254 keepAliveTime,
255 TimeUnit.SECONDS,
256 workQueue,
257 Threads.newDaemonThreadFactory(this.getClass().getSimpleName() + "-rpc-shared-"));
258 tpe.allowCoreThreadTimeOut(true);
259 return tpe;
260 }
261
262 @Override
263 public boolean replicate(ReplicateContext replicateContext) {
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287 while (this.isRunning()) {
288 try {
289 for (Entry entry: replicateContext.getEntries()) {
290 entryBuffers.appendEntry(entry);
291 }
292 outputSink.flush();
293 ctx.getMetrics().incrLogEditsFiltered(
294 outputSink.getSkippedEditsCounter().getAndSet(0));
295 return true;
296 } catch (InterruptedException e) {
297 Thread.currentThread().interrupt();
298 return false;
299 } catch (IOException e) {
300 LOG.warn("Received IOException while trying to replicate"
301 + StringUtils.stringifyException(e));
302 }
303 }
304
305 return false;
306 }
307
308 @Override
309 public boolean canReplicateToSameCluster() {
310 return true;
311 }
312
313 @Override
314 protected WALEntryFilter getScopeWALEntryFilter() {
315
316 return null;
317 }
318
319 static class RegionReplicaOutputSink extends OutputSink {
320 private final RegionReplicaSinkWriter sinkWriter;
321 private final TableDescriptors tableDescriptors;
322 private final Cache<TableName, Boolean> memstoreReplicationEnabled;
323
324 public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors,
325 EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
326 int numWriters, int operationTimeout) {
327 super(controller, entryBuffers, numWriters);
328 this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout);
329 this.tableDescriptors = tableDescriptors;
330
331
332
333
334
335 int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration()
336 .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
337 this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
338 .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
339 .initialCapacity(10)
340 .maximumSize(1000)
341 .build();
342 }
343
344 @Override
345 public void append(RegionEntryBuffer buffer) throws IOException {
346 List<Entry> entries = buffer.getEntryBuffer();
347
348 if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
349 return;
350 }
351
352
353
354 if (!requiresReplication(buffer.getTableName(), entries)) {
355 return;
356 }
357
358 sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
359 entries.get(0).getEdit().getCells().get(0).getRow(), entries);
360 }
361
362 @Override
363 public boolean flush() throws IOException {
364
365
366 entryBuffers.waitUntilDrained();
367 return super.flush();
368 }
369
370 @Override
371 public List<Path> finishWritingAndClose() throws IOException {
372 finishWriting(true);
373 return null;
374 }
375
376 @Override
377 public Map<byte[], Long> getOutputCounts() {
378 return null;
379 }
380
381 @Override
382 public int getNumberOfRecoveredRegions() {
383 return 0;
384 }
385
386 AtomicLong getSkippedEditsCounter() {
387 return skippedEdits;
388 }
389
390
391
392
393
394
395 private boolean requiresReplication(final TableName tableName, final List<Entry> entries)
396 throws IOException {
397
398 if (tableDescriptors == null) return true;
399
400 Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName);
401 if (requiresReplication == null) {
402
403
404 HTableDescriptor htd = tableDescriptors.get(tableName);
405 requiresReplication = htd == null || htd.hasRegionMemstoreReplication();
406 memstoreReplicationEnabled.put(tableName, requiresReplication);
407 }
408
409
410
411 if (!requiresReplication) {
412 int skipEdits = 0;
413 java.util.Iterator<Entry> it = entries.iterator();
414 while (it.hasNext()) {
415 Entry entry = it.next();
416 if (entry.getEdit().isMetaEdit()) {
417 requiresReplication = true;
418 } else {
419 it.remove();
420 skipEdits++;
421 }
422 }
423 skippedEdits.addAndGet(skipEdits);
424 }
425 return requiresReplication;
426 }
427 }
428
  /**
   * Writes one region's batch of WAL entries out to every secondary replica of
   * that region via replay RPCs submitted to the shared thread pool. Keeps a
   * short-lived cache of tables seen to be dropped or disabled so that edits
   * for them are skipped quickly instead of repeatedly failing RPCs.
   */
  static class RegionReplicaSinkWriter extends SinkWriter {
    RegionReplicaOutputSink sink;
    ClusterConnection connection;
    RpcControllerFactory rpcControllerFactory;
    RpcRetryingCallerFactory rpcRetryingCallerFactory;
    int operationTimeout;
    ExecutorService pool;
    // Tables recently observed dropped or disabled; entries for them are
    // counted as skipped without attempting RPCs until the entry expires.
    Cache<TableName, Boolean> disabledAndDroppedTables;

    public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
        ExecutorService pool, int operationTimeout) {
      this.sink = sink;
      this.connection = connection;
      this.operationTimeout = operationTimeout;
      this.rpcRetryingCallerFactory
        = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
      this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
      this.pool = pool;

      int nonExistentTableCacheExpiryMs = connection.getConfiguration()
        .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
      // A short expiry bounds how long edits keep being dropped if the table
      // is re-created or re-enabled after being cached here.
      disabledAndDroppedTables = CacheBuilder.newBuilder()
        .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS)
        .initialCapacity(10)
        .maximumSize(1000)
        .build();
    }

    /**
     * Replays {@code entries} (all from the region identified by
     * {@code encodedRegionName}) to every non-default replica of the region
     * containing {@code row}. Entries are silently counted as skipped when the
     * table is dropped/disabled or when the region has moved since the edits
     * were written.
     */
    public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
        List<Entry> entries) throws IOException {

      // Fast path: table recently seen dropped/disabled — count and return.
      if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
            + " is cached as a disabled or dropped table");
          for (Entry entry : entries) {
            LOG.trace("Skipping : " + entry);
          }
        }
        sink.getSkippedEditsCounter().addAndGet(entries.size());
        return;
      }

      // Locate all replicas of the region containing `row`. The first attempt
      // uses the location cache; if the cached primary does not match the
      // region the edits came from, retry once bypassing the cache before
      // concluding the region has moved (split/merge) and skipping the batch.
      RegionLocations locations = null;
      boolean useCache = true;
      while (true) {

        try {
          locations = RegionReplicaReplayCallable
            .getRegionLocations(connection, tableName, row, useCache, 0);

          if (locations == null) {
            throw new HBaseIOException("Cannot locate locations for "
              + tableName + ", row:" + Bytes.toStringBinary(row));
          }
        } catch (TableNotFoundException e) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
              + " is dropped. Adding table to cache.");
            for (Entry entry : entries) {
              LOG.trace("Skipping : " + entry);
            }
          }
          // Remember the dropped table so later batches take the fast path.
          disabledAndDroppedTables.put(tableName, Boolean.TRUE);

          sink.getSkippedEditsCounter().addAndGet(entries.size());
          return;
        }

        // The edits may predate a split/merge; if the located primary region
        // is not the region the WAL edits belong to, they must not be replayed
        // to the replicas of a different region.
        HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
        if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
            encodedRegionName)) {
          if (useCache) {
            // Maybe the cache is stale: retry with a fresh meta lookup.
            useCache = false;
            continue;
          }
          if (LOG.isTraceEnabled()) {
            LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
              + " because located region " + primaryLocation.getRegionInfo().getEncodedName()
              + " is different than the original region " + Bytes.toStringBinary(encodedRegionName)
              + " from WALEdit");
            for (Entry entry : entries) {
              LOG.trace("Skipping : " + entry);
            }
          }
          sink.getSkippedEditsCounter().addAndGet(entries.size());
          return;
        }
        break;
      }

      // Only the primary location exists: there are no replicas to replay to.
      if (locations.size() == 1) {
        return;
      }

      ArrayList<Future<ReplicateWALEntryResponse>> tasks
        = new ArrayList<Future<ReplicateWALEntryResponse>>(locations.size() - 1);

      // Submit one replay task per non-default replica. A replica whose
      // location is unknown still gets a task, with a synthesized HRegionInfo,
      // so the callable can resolve it during its own retries.
      for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
        HRegionLocation location = locations.getRegionLocation(replicaId);
        if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
          HRegionInfo regionInfo = location == null
            ? RegionReplicaUtil.getRegionInfoForReplica(
              locations.getDefaultRegionLocation().getRegionInfo(), replicaId)
            : location.getRegionInfo();
          RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
            rpcControllerFactory, tableName, location, regionInfo, row, entries,
            sink.getSkippedEditsCounter());
          Future<ReplicateWALEntryResponse> task = pool.submit(
            new RetryingRpcCallable<ReplicateWALEntryResponse>(rpcRetryingCallerFactory,
              callable, operationTimeout));
          tasks.add(task);
        }
      }

      // Wait for all replicas. Failures caused by the table being dropped or
      // disabled are tolerated (edits counted as skipped once); any other
      // failure is rethrown so the batch is retried by the caller.
      boolean tasksCancelled = false;
      for (Future<ReplicateWALEntryResponse> task : tasks) {
        try {
          task.get();
        } catch (InterruptedException e) {
          throw new InterruptedIOException(e.getMessage());
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          if (cause instanceof IOException) {
            // NOTE(review): despite its name, `tasksCancelled` does not cancel
            // the remaining futures — it only guards against double-counting
            // the skipped edits; presumably outstanding tasks are left to run.
            if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
                  + " because received exception for dropped or disabled table", cause);
                for (Entry entry : entries) {
                  LOG.trace("Skipping : " + entry);
                }
              }
              disabledAndDroppedTables.put(tableName, Boolean.TRUE);
              if (!tasksCancelled) {
                sink.getSkippedEditsCounter().addAndGet(entries.size());
                tasksCancelled = true;
              }
              continue;
            }
            // Propagate the original IOException type to the caller.
            throw (IOException)cause;
          }
          // Non-IO failure from the task: wrap and rethrow.
          throw new IOException(cause);
        }
      }
    }
  }
593
594 static class RetryingRpcCallable<V> implements Callable<V> {
595 RpcRetryingCallerFactory factory;
596 RetryingCallable<V> callable;
597 int timeout;
598 public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable,
599 int timeout) {
600 this.factory = factory;
601 this.callable = callable;
602 this.timeout = timeout;
603 }
604 @Override
605 public V call() throws Exception {
606 return factory.<V>newCaller().callWithRetries(callable, timeout);
607 }
608 }
609
610
611
612
613
614 static class RegionReplicaReplayCallable
615 extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {
616
617 private final List<Entry> entries;
618 private final byte[] initialEncodedRegionName;
619 private final AtomicLong skippedEntries;
620
621 public RegionReplicaReplayCallable(ClusterConnection connection,
622 RpcControllerFactory rpcControllerFactory, TableName tableName,
623 HRegionLocation location, HRegionInfo regionInfo, byte[] row,List<Entry> entries,
624 AtomicLong skippedEntries) {
625 super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId());
626 this.entries = entries;
627 this.skippedEntries = skippedEntries;
628 this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
629 }
630
631 @Override
632 public ReplicateWALEntryResponse call(int timeout) throws IOException {
633 return replayToServer(this.entries, timeout);
634 }
635
636 private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
637 throws IOException {
638
639
640
641 boolean skip = false;
642
643 if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
644 initialEncodedRegionName)) {
645 skip = true;
646 }
647 if (!entries.isEmpty() && !skip) {
648 Entry[] entriesArray = new Entry[entries.size()];
649 entriesArray = entries.toArray(entriesArray);
650
651
652 Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
653 ReplicationProtbufUtil.buildReplicateWALEntryRequest(
654 entriesArray, location.getRegionInfo().getEncodedNameAsBytes());
655 try {
656 PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
657 controller.setCallTimeout(timeout);
658 controller.setPriority(tableName);
659 return stub.replay(controller, p.getFirst());
660 } catch (ServiceException se) {
661 throw ProtobufUtil.getRemoteException(se);
662 }
663 }
664
665 if (skip) {
666 if (LOG.isTraceEnabled()) {
667 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
668 + " because located region " + location.getRegionInfo().getEncodedName()
669 + " is different than the original region "
670 + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
671 for (Entry entry : entries) {
672 LOG.trace("Skipping : " + entry);
673 }
674 }
675 skippedEntries.addAndGet(entries.size());
676 }
677 return ReplicateWALEntryResponse.newBuilder().build();
678 }
679 }
680 }