1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.hbase.CellScanner;
41 import org.apache.hadoop.hbase.HBaseConfiguration;
42 import org.apache.hadoop.hbase.HBaseIOException;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.HRegionInfo;
45 import org.apache.hadoop.hbase.HRegionLocation;
46 import org.apache.hadoop.hbase.HTableDescriptor;
47 import org.apache.hadoop.hbase.RegionLocations;
48 import org.apache.hadoop.hbase.TableName;
49 import org.apache.hadoop.hbase.TableNotFoundException;
50 import org.apache.hadoop.hbase.TableDescriptors;
51 import org.apache.hadoop.hbase.client.ConnectionFactory;
52 import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
53 import org.apache.hadoop.hbase.client.ClusterConnection;
54 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
55 import org.apache.hadoop.hbase.client.RetryingCallable;
56 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
57 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
58 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
59 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
60 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
61 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
62 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
63 import org.apache.hadoop.hbase.wal.WALKey;
64 import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
65 import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
66 import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
67 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
68 import org.apache.hadoop.hbase.wal.WAL.Entry;
69 import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
70 import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
71 import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
72 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
73 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
74 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
75 import org.apache.hadoop.hbase.replication.WALEntryFilter;
76 import org.apache.hadoop.hbase.util.Bytes;
77 import org.apache.hadoop.hbase.util.Pair;
78 import org.apache.hadoop.hbase.util.Threads;
79 import org.apache.hadoop.util.StringUtils;
80
81 import com.google.common.cache.Cache;
82 import com.google.common.cache.CacheBuilder;
83 import com.google.common.collect.Lists;
84 import com.google.protobuf.ServiceException;
85
86
87
88
89
90 @InterfaceAudience.Private
91 public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
92
93 private static final Log LOG = LogFactory.getLog(RegionReplicaReplicationEndpoint.class);
94
95
96 private static String CLIENT_RETRIES_NUMBER
97 = "hbase.region.replica.replication.client.retries.number";
98
99 private Configuration conf;
100 private ClusterConnection connection;
101 private TableDescriptors tableDescriptors;
102
103
104 private PipelineController controller;
105 private RegionReplicaOutputSink outputSink;
106 private EntryBuffers entryBuffers;
107
108
109 private int numWriterThreads;
110
111 private int operationTimeout;
112
113 private ExecutorService pool;
114
115
116
117
118
119 private static class SkipReplayedEditsFilter extends BaseWALEntryFilter {
120 @Override
121 public Entry filter(Entry entry) {
122
123 if (entry.getKey().getOrigLogSeqNum() > 0) {
124 return null;
125 }
126 return entry;
127 }
128 }
129
130 @Override
131 public WALEntryFilter getWALEntryfilter() {
132 WALEntryFilter superFilter = super.getWALEntryfilter();
133 WALEntryFilter skipReplayedEditsFilter = getSkipReplayedEditsFilter();
134
135 if (superFilter == null) {
136 return skipReplayedEditsFilter;
137 }
138
139 if (skipReplayedEditsFilter == null) {
140 return superFilter;
141 }
142
143 ArrayList<WALEntryFilter> filters = Lists.newArrayList();
144 filters.add(superFilter);
145 filters.add(skipReplayedEditsFilter);
146 return new ChainWALEntryFilter(filters);
147 }
148
149 protected WALEntryFilter getSkipReplayedEditsFilter() {
150 return new SkipReplayedEditsFilter();
151 }
152
153 @Override
154 public void init(Context context) throws IOException {
155 super.init(context);
156
157 this.conf = HBaseConfiguration.create(context.getConfiguration());
158 this.tableDescriptors = context.getTableDescriptors();
159
160
161
162
163 int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
164 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
165 if (defaultNumRetries > 10) {
166 int mult = conf.getInt("hbase.client.serverside.retries.multiplier", 10);
167 defaultNumRetries = defaultNumRetries / mult;
168 }
169
170 conf.setInt("hbase.client.serverside.retries.multiplier", 1);
171 int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
172 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);
173
174 this.numWriterThreads = this.conf.getInt(
175 "hbase.region.replica.replication.writer.threads", 3);
176 controller = new PipelineController();
177 entryBuffers = new EntryBuffers(controller,
178 this.conf.getInt("hbase.region.replica.replication.buffersize",
179 128*1024*1024));
180
181
182 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
183 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
184 }
185
186 @Override
187 protected void doStart() {
188 try {
189 connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
190 this.pool = getDefaultThreadPool(conf);
191 outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers,
192 connection, pool, numWriterThreads, operationTimeout);
193 outputSink.startWriterThreads();
194 super.doStart();
195 } catch (IOException ex) {
196 LOG.warn("Received exception while creating connection :" + ex);
197 notifyFailed(ex);
198 }
199 }
200
201 @Override
202 protected void doStop() {
203 if (outputSink != null) {
204 try {
205 outputSink.finishWritingAndClose();
206 } catch (IOException ex) {
207 LOG.warn("Got exception while trying to close OutputSink");
208 LOG.warn(ex);
209 }
210 }
211 if (this.pool != null) {
212 this.pool.shutdownNow();
213 try {
214
215 boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS);
216 if (!shutdown) {
217 LOG.warn("Failed to shutdown the thread pool after 10 seconds");
218 }
219 } catch (InterruptedException e) {
220 LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e);
221 }
222 }
223 if (connection != null) {
224 try {
225 connection.close();
226 } catch (IOException ex) {
227 LOG.warn("Got exception closing connection :" + ex);
228 }
229 }
230 super.doStop();
231 }
232
233
234
235
236
237 private ExecutorService getDefaultThreadPool(Configuration conf) {
238 int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
239 int coreThreads = conf.getInt("hbase.region.replica.replication.threads.core", 16);
240 if (maxThreads == 0) {
241 maxThreads = Runtime.getRuntime().availableProcessors() * 8;
242 }
243 if (coreThreads == 0) {
244 coreThreads = Runtime.getRuntime().availableProcessors() * 8;
245 }
246 long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
247 LinkedBlockingQueue<Runnable> workQueue =
248 new LinkedBlockingQueue<Runnable>(maxThreads *
249 conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
250 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
251 ThreadPoolExecutor tpe = new ThreadPoolExecutor(
252 coreThreads,
253 maxThreads,
254 keepAliveTime,
255 TimeUnit.SECONDS,
256 workQueue,
257 Threads.newDaemonThreadFactory(this.getClass().getSimpleName() + "-rpc-shared-"));
258 tpe.allowCoreThreadTimeOut(true);
259 return tpe;
260 }
261
262 @Override
263 public boolean replicate(ReplicateContext replicateContext) {
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287 while (this.isRunning()) {
288 try {
289 for (Entry entry: replicateContext.getEntries()) {
290 entryBuffers.appendEntry(entry);
291 }
292 outputSink.flush();
293 ctx.getMetrics().incrLogEditsFiltered(
294 outputSink.getSkippedEditsCounter().getAndSet(0));
295 return true;
296 } catch (InterruptedException e) {
297 Thread.currentThread().interrupt();
298 return false;
299 } catch (IOException e) {
300 LOG.warn("Received IOException while trying to replicate"
301 + StringUtils.stringifyException(e));
302 }
303 }
304
305 return false;
306 }
307
308 @Override
309 public boolean canReplicateToSameCluster() {
310 return true;
311 }
312
313 @Override
314 protected WALEntryFilter getScopeWALEntryFilter() {
315
316 return null;
317 }
318
319 static class RegionReplicaOutputSink extends OutputSink {
320 private final RegionReplicaSinkWriter sinkWriter;
321 private final TableDescriptors tableDescriptors;
322 private final Cache<TableName, Boolean> memstoreReplicationEnabled;
323
324 public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors,
325 EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
326 int numWriters, int operationTimeout) {
327 super(controller, entryBuffers, numWriters);
328 this.sinkWriter =
329 new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors);
330 this.tableDescriptors = tableDescriptors;
331
332
333
334
335
336 int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration()
337 .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
338 this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
339 .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
340 .initialCapacity(10)
341 .maximumSize(1000)
342 .build();
343 }
344
345 @Override
346 public void append(RegionEntryBuffer buffer) throws IOException {
347 List<Entry> entries = buffer.getEntryBuffer();
348
349 if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
350 return;
351 }
352
353
354
355 if (!requiresReplication(buffer.getTableName(), entries)) {
356 return;
357 }
358
359 sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
360 entries.get(0).getEdit().getCells().get(0).getRow(), entries);
361 }
362
363 @Override
364 public boolean flush() throws IOException {
365
366
367 entryBuffers.waitUntilDrained();
368 return super.flush();
369 }
370
371 @Override
372 public boolean keepRegionEvent(Entry entry) {
373 return true;
374 }
375
376 @Override
377 public List<Path> finishWritingAndClose() throws IOException {
378 finishWriting(true);
379 return null;
380 }
381
382 @Override
383 public Map<byte[], Long> getOutputCounts() {
384 return null;
385 }
386
387 @Override
388 public int getNumberOfRecoveredRegions() {
389 return 0;
390 }
391
392 AtomicLong getSkippedEditsCounter() {
393 return skippedEdits;
394 }
395
396
397
398
399
400
401 private boolean requiresReplication(final TableName tableName, final List<Entry> entries)
402 throws IOException {
403
404 if (tableDescriptors == null) return true;
405
406 Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName);
407 if (requiresReplication == null) {
408
409
410 HTableDescriptor htd = tableDescriptors.get(tableName);
411 requiresReplication = htd == null || htd.hasRegionMemstoreReplication();
412 memstoreReplicationEnabled.put(tableName, requiresReplication);
413 }
414
415
416
417 if (!requiresReplication) {
418 int skipEdits = 0;
419 java.util.Iterator<Entry> it = entries.iterator();
420 while (it.hasNext()) {
421 Entry entry = it.next();
422 if (entry.getEdit().isMetaEdit()) {
423 requiresReplication = true;
424 } else {
425 it.remove();
426 skipEdits++;
427 }
428 }
429 skippedEdits.addAndGet(skipEdits);
430 }
431 return requiresReplication;
432 }
433 }
434
435 static class RegionReplicaSinkWriter extends SinkWriter {
436 RegionReplicaOutputSink sink;
437 ClusterConnection connection;
438 RpcControllerFactory rpcControllerFactory;
439 RpcRetryingCallerFactory rpcRetryingCallerFactory;
440 int operationTimeout;
441 ExecutorService pool;
442 Cache<TableName, Boolean> disabledAndDroppedTables;
443 TableDescriptors tableDescriptors;
444
445 public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
446 ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) {
447 this.sink = sink;
448 this.connection = connection;
449 this.operationTimeout = operationTimeout;
450 this.rpcRetryingCallerFactory
451 = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
452 this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
453 this.pool = pool;
454 this.tableDescriptors = tableDescriptors;
455
456 int nonExistentTableCacheExpiryMs = connection.getConfiguration()
457 .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
458
459
460
461 disabledAndDroppedTables = CacheBuilder.newBuilder()
462 .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS)
463 .initialCapacity(10)
464 .maximumSize(1000)
465 .build();
466 }
467
468 public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
469 List<Entry> entries) throws IOException {
470
471 if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
472 if (LOG.isTraceEnabled()) {
473 LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
474 + " is cached as a disabled or dropped table");
475 for (Entry entry : entries) {
476 LOG.trace("Skipping : " + entry);
477 }
478 }
479 sink.getSkippedEditsCounter().addAndGet(entries.size());
480 return;
481 }
482
483
484
485
486 RegionLocations locations = null;
487 boolean useCache = true;
488 while (true) {
489
490 try {
491 locations = RegionReplicaReplayCallable
492 .getRegionLocations(connection, tableName, row, useCache, 0);
493
494 if (locations == null) {
495 throw new HBaseIOException("Cannot locate locations for "
496 + tableName + ", row:" + Bytes.toStringBinary(row));
497 }
498 } catch (TableNotFoundException e) {
499 if (LOG.isTraceEnabled()) {
500 LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
501 + " is dropped. Adding table to cache.");
502 for (Entry entry : entries) {
503 LOG.trace("Skipping : " + entry);
504 }
505 }
506 disabledAndDroppedTables.put(tableName, Boolean.TRUE);
507
508 sink.getSkippedEditsCounter().addAndGet(entries.size());
509 return;
510 }
511
512
513
514 HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
515 if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
516 encodedRegionName)) {
517 if (useCache) {
518 useCache = false;
519 continue;
520 }
521 if (LOG.isTraceEnabled()) {
522 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
523 + " because located region " + primaryLocation.getRegionInfo().getEncodedName()
524 + " is different than the original region " + Bytes.toStringBinary(encodedRegionName)
525 + " from WALEdit");
526 for (Entry entry : entries) {
527 LOG.trace("Skipping : " + entry);
528 }
529 }
530 sink.getSkippedEditsCounter().addAndGet(entries.size());
531 return;
532 }
533 break;
534 }
535
536 if (locations.size() == 1) {
537 return;
538 }
539
540 ArrayList<Future<ReplicateWALEntryResponse>> tasks
541 = new ArrayList<Future<ReplicateWALEntryResponse>>(locations.size() - 1);
542
543
544
545 for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
546 HRegionLocation location = locations.getRegionLocation(replicaId);
547 if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
548 HRegionInfo regionInfo = location == null
549 ? RegionReplicaUtil.getRegionInfoForReplica(
550 locations.getDefaultRegionLocation().getRegionInfo(), replicaId)
551 : location.getRegionInfo();
552 RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
553 rpcControllerFactory, tableName, location, regionInfo, row, entries,
554 sink.getSkippedEditsCounter());
555 Future<ReplicateWALEntryResponse> task = pool.submit(
556 new RetryingRpcCallable<ReplicateWALEntryResponse>(rpcRetryingCallerFactory,
557 callable, operationTimeout));
558 tasks.add(task);
559 }
560 }
561
562 boolean tasksCancelled = false;
563 for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
564 try {
565 tasks.get(replicaId).get();
566 } catch (InterruptedException e) {
567 throw new InterruptedIOException(e.getMessage());
568 } catch (ExecutionException e) {
569 Throwable cause = e.getCause();
570 boolean canBeSkipped = false;
571 if (cause instanceof IOException) {
572
573
574
575
576
577
578 if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
579 disabledAndDroppedTables.put(tableName, Boolean.TRUE);
580 canBeSkipped = true;
581 } else if (tableDescriptors != null) {
582 HTableDescriptor tableDescriptor = tableDescriptors.get(tableName);
583 if (tableDescriptor != null
584
585 && tableDescriptor.getRegionReplication() <= (replicaId + 1)) {
586 canBeSkipped = true;
587 }
588 }
589 if (canBeSkipped) {
590 if (LOG.isTraceEnabled()) {
591 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
592 + " because received exception for dropped or disabled table",
593 cause);
594 for (Entry entry : entries) {
595 LOG.trace("Skipping : " + entry);
596 }
597 }
598
599 if (!tasksCancelled) {
600 sink.getSkippedEditsCounter().addAndGet(entries.size());
601 tasksCancelled = true;
602 }
603 continue;
604 }
605
606 throw (IOException)cause;
607 }
608
609 throw new IOException(cause);
610 }
611 }
612 }
613 }
614
615 static class RetryingRpcCallable<V> implements Callable<V> {
616 RpcRetryingCallerFactory factory;
617 RetryingCallable<V> callable;
618 int timeout;
619 public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable,
620 int timeout) {
621 this.factory = factory;
622 this.callable = callable;
623 this.timeout = timeout;
624 }
625 @Override
626 public V call() throws Exception {
627 return factory.<V>newCaller().callWithRetries(callable, timeout);
628 }
629 }
630
631
632
633
634
635 static class RegionReplicaReplayCallable
636 extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {
637
638 private final List<Entry> entries;
639 private final byte[] initialEncodedRegionName;
640 private final AtomicLong skippedEntries;
641
642 public RegionReplicaReplayCallable(ClusterConnection connection,
643 RpcControllerFactory rpcControllerFactory, TableName tableName,
644 HRegionLocation location, HRegionInfo regionInfo, byte[] row,List<Entry> entries,
645 AtomicLong skippedEntries) {
646 super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId());
647 this.entries = entries;
648 this.skippedEntries = skippedEntries;
649 this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
650 }
651
652 @Override
653 public ReplicateWALEntryResponse call(int timeout) throws IOException {
654 return replayToServer(this.entries, timeout);
655 }
656
657 private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
658 throws IOException {
659
660
661
662 boolean skip = false;
663
664 if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
665 initialEncodedRegionName)) {
666 skip = true;
667 }
668 if (!entries.isEmpty() && !skip) {
669 Entry[] entriesArray = new Entry[entries.size()];
670 entriesArray = entries.toArray(entriesArray);
671
672
673 Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
674 ReplicationProtbufUtil.buildReplicateWALEntryRequest(
675 entriesArray, location.getRegionInfo().getEncodedNameAsBytes());
676 try {
677 PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
678 controller.setCallTimeout(timeout);
679 controller.setPriority(tableName);
680 return stub.replay(controller, p.getFirst());
681 } catch (ServiceException se) {
682 throw ProtobufUtil.getRemoteException(se);
683 }
684 }
685
686 if (skip) {
687 if (LOG.isTraceEnabled()) {
688 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
689 + " because located region " + location.getRegionInfo().getEncodedName()
690 + " is different than the original region "
691 + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
692 for (Entry entry : entries) {
693 LOG.trace("Skipping : " + entry);
694 }
695 }
696 skippedEntries.addAndGet(entries.size());
697 }
698 return ReplicateWALEntryResponse.newBuilder().build();
699 }
700 }
701 }