/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
 * implementation for replicating to another HBase cluster.
 * For the slave cluster it selects a random number of peers
 * using a replication ratio. For example, if replication ratio = 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
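 * <p>
 * Entries are grouped into batches (one batch per region for serial replication, otherwise
 * hashed by region into a bounded number of batches) and the batches are shipped to the
 * selected sinks in parallel.
 * </p>
 * <p>
 * A minimal sketch of pointing a peer at this endpoint; the Admin and ReplicationPeerConfig
 * calls are assumed from the standard HBase client API and are shown for illustration only:
 * <pre>
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *     Admin admin = conn.getAdmin()) {
 *   admin.addReplicationPeer("peer1", ReplicationPeerConfig.newBuilder()
 *     .setClusterKey("zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase")
 *     .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName())
 *     .build());
 * }
 * </pre>
 * </p>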
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
  private static final Logger LOG =
      LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);

  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;

  private ClusterConnection conn;
  private Configuration localConf;
  private Configuration conf;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Amount of time for shutdown to wait for all tasks to complete
  private long maxTerminationWait;
  // Size limit for replication RPCs, in bytes
  private int replicationRpcLimit;
  // Metrics for this source
  private MetricsSource metrics;
  // Handles connecting to peer region servers
  private ReplicationSinkManager replicationSinkMgr;
  private boolean peersSelected = false;
  private String replicationClusterId = "";
  private ThreadPoolExecutor exec;
  private int maxThreads;
  private Path baseNamespaceDir;
  private Path hfileArchiveDir;
  private boolean replicationBulkLoadDataEnabled;
  private Abortable abortable;
  private boolean dropOnDeletedTables;
  private boolean isSerial = false;

  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
    decorateConf();
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
        maxRetriesMultiplier);
    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
    // tasks to terminate when doStop() is called.
    long maxTerminationWaitMultiplier = this.conf.getLong(
        "replication.source.maxterminationmultiplier",
        DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
    this.maxTerminationWait = maxTerminationWaitMultiplier *
        this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    // TODO: This connection is replication specific, or we should make it so, and add
    // replication-specific settings such as the compression or codec to use when passing Cells.
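    // Connection to the peer cluster, shared by all shipping threads; if it is ever found
    // closed, replicate() replaces it via reconnectToPeerCluster().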
    this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.metrics = context.getMetrics();
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
    // per sink thread pool
    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
        HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
    this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
    this.abortable = ctx.getAbortable();
    // Set the size limit for replication RPCs to 95% of the max request size.
    // We could do with less slop if we have an accurate estimate of encoded size. Being
    // conservative for now.
    this.replicationRpcLimit = (int)(0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE,
        RpcServer.DEFAULT_MAX_REQUEST_SIZE));
    this.dropOnDeletedTables =
        this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);

    this.replicationBulkLoadDataEnabled =
        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
            HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    if (this.replicationBulkLoadDataEnabled) {
      replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
    }
    // Construct base namespace directory and hfile archive directory path
    Path rootDir = FSUtils.getRootDir(conf);
    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
    baseNamespaceDir = new Path(rootDir, baseNSDir);
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
    isSerial = context.getPeerConfig().isSerial();
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  private void connectToPeers() {
    getRegionServers();

    int sleepMultiplier = 1;

    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
      replicationSinkMgr.chooseSinks();
      if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}",
            logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  private int getEstimatedEntrySize(Entry e) {
    long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf();
    return (int) size;
  }

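  /**
   * Divide the entries into roughly one list per 100 entries, capped by {@code maxThreads} and by
   * the number of sinks, hashing on the encoded region name so that a region's edits land in the
   * same slot. When a slot would grow past {@code replicationRpcLimit} it is moved to the tail of
   * the result and restarted, so that batches stay within the RPC size limit, except when a
   * single entry is itself larger than the limit.
   */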
  private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
    int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
    List<List<Entry>> entryLists =
        Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
    int[] sizes = new int[n];
    for (Entry e : entries) {
      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
      int entrySize = getEstimatedEntrySize(e);
      // If this batch has at least one entry and is over sized, move it to the tail of the list
      // and initialize entryLists[index] to be an empty list.
      if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) {
        entryLists.add(entryLists.get(index));
        entryLists.set(index, new ArrayList<>());
        sizes[index] = 0;
      }
      entryLists.get(index).add(e);
      sizes[index] += entrySize;
    }
    return entryLists;
  }

  private List<List<Entry>> createSerialBatches(final List<Entry> entries) {
    Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Entry e : entries) {
      regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>())
          .add(e);
    }
    return new ArrayList<>(regionEntries.values());
  }

  /**
   * Divide the entries into multiple batches, so that we can replicate each batch in a thread
   * pool concurrently. Note that, for serial replication, we need to make sure that entries from
   * the same region are replicated serially, so entries from the same region form one batch, and
   * that batch is further divided into several batches by replicationRpcLimit in
   * serialReplicateRegionEntries().
   */
  private List<List<Entry>> createBatches(final List<Entry> entries) {
    if (isSerial) {
      return createSerialBatches(entries);
    } else {
      return createParallelBatches(entries);
    }
  }

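  /**
   * Extract the table name from the message of a remote {@link TableNotFoundException}.
   * @return the parsed {@link TableName}, or null if no legal fully qualified table name can be
   *         extracted from the message.
   */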
  private TableName parseTable(String msg) {
    // ... TableNotFoundException: '<table>'\n...
    Pattern p = Pattern.compile("TableNotFoundException: '([\\S]*)'");
    Matcher m = p.matcher(msg);
    if (m.find()) {
      String table = m.group(1);
      try {
        // double check that table is a valid table name
        TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table)));
        return TableName.valueOf(table);
      } catch (IllegalArgumentException ignore) {
      }
    }
    return null;
  }

  // Filter a set of batches by TableName
  private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
    return oldEntryList
        .stream().map(entries -> entries.stream()
            .filter(e -> !e.getKey().getTableName().equals(table)).collect(Collectors.toList()))
        .collect(Collectors.toList());
  }

  private void reconnectToPeerCluster() {
    ClusterConnection connection = null;
    try {
      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
    } catch (IOException ioe) {
      LOG.warn("{} Failed to create connection for peer cluster", logPeerId(), ioe);
    }
    if (connection != null) {
      this.conn = connection;
    }
  }

  private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
      List<List<Entry>> batches) throws IOException {
    int futures = 0;
    for (int i = 0; i < batches.size(); i++) {
      List<Entry> entries = batches.get(i);
      if (!entries.isEmpty()) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
              replicateContext.getSize());
        }
        // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
        pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
        futures++;
      }
    }

    IOException iox = null;
    long lastWriteTime = 0;
    for (int i = 0; i < futures; i++) {
      try {
        // wait for all futures, remove successful parts
        // (only the remaining parts will be retried)
        Future<Integer> f = pool.take();
        int index = f.get();
        List<Entry> batch = batches.get(index);
        batches.set(index, Collections.emptyList()); // remove successful batch
        // Find the most recent write time in the batch
        long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
        if (writeTime > lastWriteTime) {
          lastWriteTime = writeTime;
        }
      } catch (InterruptedException ie) {
        iox = new IOException(ie);
      } catch (ExecutionException ee) {
        // cause must be an IOException
        iox = (IOException) ee.getCause();
      }
    }
    if (iox != null) {
      // if we had any exceptions, try again
      throw iox;
    }
    return lastWriteTime;
  }

  /**
   * Do the shipping logic
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
    String walGroupId = replicateContext.getWalGroupId();
    int sleepMultiplier = 1;

    if (!peersSelected && this.isRunning()) {
      connectToPeers();
      peersSelected = true;
    }

    int numSinks = replicationSinkMgr.getNumSinks();
    if (numSinks == 0) {
      LOG.warn("{} No replication sinks found, returning without replicating. "
          + "The source should retry with the same set of edits.", logPeerId());
      return false;
    }

    List<List<Entry>> batches = createBatches(replicateContext.getEntries());
    while (this.isRunning() && !exec.isShutdown()) {
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      if (this.conn == null || this.conn.isClosed()) {
        reconnectToPeerCluster();
      }
      try {
        long lastWriteTime;

        // replicate the batches to sink side.
        lastWriteTime = parallelReplicate(pool, replicateContext, batches);

        // update metrics
        if (lastWriteTime > 0) {
          this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
        }
        return true;
      } catch (IOException ioe) {
        // Didn't ship anything, but must still age the last time we did
        this.metrics.refreshAgeOfLastShippedOp(walGroupId);
        if (ioe instanceof RemoteException) {
          ioe = ((RemoteException) ioe).unwrapRemoteException();
          LOG.warn("{} Can't replicate because of an error on the remote cluster: ",
              logPeerId(), ioe);
          if (ioe instanceof TableNotFoundException) {
            if (dropOnDeletedTables) {
              // this is a bit fragile, but cannot change how TNFE is serialized
              // at least check whether the table name is legal
              TableName table = parseTable(ioe.getMessage());
              if (table != null) {
                try (Connection localConn =
                    ConnectionFactory.createConnection(ctx.getLocalConfiguration())) {
                  if (!localConn.getAdmin().tableExists(table)) {
                    // Would potentially be better to retry in one of the outer loops
                    // and add a table filter there; but that would break the encapsulation,
                    // so we're doing the filtering here.
                    LOG.info("{} Missing table detected at sink, local table also does not "
                        + "exist, filtering edits for '{}'", logPeerId(), table);
                    batches = filterBatches(batches, table);
                    continue;
                  }
                } catch (IOException iox) {
                  LOG.warn("{} Exception checking for local table: ", logPeerId(), iox);
                }
              }
            }
            // fall through and sleep below
          } else {
            LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
                ioe);
            replicationSinkMgr.chooseSinks();
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means we waited for more than 60s and nothing happened; the cluster
            // is alive, so calling it again right away, even as a test, just makes things worse.
            sleepForRetries("Encountered a SocketTimeoutException. Since the "
                + "call to the remote cluster timed out, which is usually "
                + "caused by a machine failure or a massive slowdown",
                this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
            LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
            replicationSinkMgr.chooseSinks();
          } else {
            LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
          }
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    return false; // in case we exited before replicating
  }

  protected boolean isPeerEnabled() {
    return ctx.getReplicationPeer().isPeerEnabled();
  }

  @Override
  protected void doStop() {
    disconnect(); // don't call super.doStop()
    if (this.conn != null) {
      try {
        this.conn.close();
        this.conn = null;
      } catch (IOException e) {
        LOG.warn("{} Failed to close the connection", logPeerId());
      }
    }
    // Allow currently running replication tasks to finish
    exec.shutdown();
    try {
      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Ignored; whether the pool actually terminated is checked below.
    }
    // Abort if the tasks did not terminate in time
    if (!exec.isTerminated()) {
      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " +
          "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " +
          "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
      abortable.abort(errMsg, new IOException(errMsg));
    }
    notifyStopped();
  }

  @VisibleForTesting
  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
      throws IOException {
    SinkPeer sinkPeer = null;
    try {
      int entriesHashCode = System.identityHashCode(entries);
      if (LOG.isTraceEnabled()) {
        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
            logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
      }
      sinkPeer = replicationSinkMgr.getReplicationSink();
      BlockingInterface rrs = sinkPeer.getRegionServer();
      try {
        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
            replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout);
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
        }
      } catch (IOException e) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
        }
        throw e;
      }
      replicationSinkMgr.reportSinkSuccess(sinkPeer);
    } catch (IOException ioe) {
      if (sinkPeer != null) {
        replicationSinkMgr.reportBadSink(sinkPeer);
      }
      throw ioe;
    }
    return batchIndex;
  }

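  /**
   * Replicate the entries of a single region serially: the entries are split into sub-batches no
   * larger than {@code replicationRpcLimit} and shipped one after another, preserving the order
   * of edits within the region.
   * @return {@code batchIndex}, echoed back so the caller can identify the completed batch.
   */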
  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
      throws IOException {
    int batchSize = 0, index = 0;
    List<Entry> batch = new ArrayList<>();
    for (Entry entry : entries) {
      int entrySize = getEstimatedEntrySize(entry);
      if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
        replicateEntries(batch, index++, timeout);
        batch.clear();
        batchSize = 0;
      }
      batch.add(entry);
      batchSize += entrySize;
    }
    if (batchSize > 0) {
      replicateEntries(batch, index, timeout);
    }
    return batchIndex;
  }

  @VisibleForTesting
  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
    return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
        : () -> replicateEntries(entries, batchIndex, timeout);
  }

  private String logPeerId() {
    return "[Source for peer " + this.ctx.getPeerId() + "]:";
  }

}