/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.ipc.RemoteException;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
 * implementation for replicating to another HBase cluster.
 * For the slave cluster it selects a random number of peers
 * using a replication ratio. For example, if replication ratio = 0.1
 * and slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
  private static final Logger LOG =
      LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);

  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;

  private ClusterConnection conn;
  private Configuration localConf;
  private Configuration conf;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Amount of time for shutdown to wait for all tasks to complete
  private long maxTerminationWait;
  // Size limit for replication RPCs, in bytes
  private int replicationRpcLimit;
  // Metrics for this source
  private MetricsSource metrics;
  // Handles connecting to peer region servers
  private ReplicationSinkManager replicationSinkMgr;
  private boolean peersSelected = false;
  private String replicationClusterId = "";
  private ThreadPoolExecutor exec;
  private int maxThreads;
  private Path baseNamespaceDir;
  private Path hfileArchiveDir;
  private boolean replicationBulkLoadDataEnabled;
  private Abortable abortable;
  private boolean dropOnDeletedTables;

  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
    decorateConf();
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
        maxRetriesMultiplier);
    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
    // tasks to terminate when doStop() is called.
    long maxTerminationWaitMultiplier = this.conf.getLong(
        "replication.source.maxterminationmultiplier",
        DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
    this.maxTerminationWait = maxTerminationWaitMultiplier *
        this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    // TODO: This connection is replication specific or we should make it particular to
    // replication and make replication specific settings such as compression or codec to use
    // passing Cells.
    this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.metrics = context.getMetrics();
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
    // per sink thread pool
    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
        HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
    this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>());
    this.exec.allowCoreThreadTimeOut(true);
    this.abortable = ctx.getAbortable();
    // Set the size limit for replication RPCs to 95% of the max request size.
    // We could do with less slop if we have an accurate estimate of encoded size. Being
    // conservative for now.
    this.replicationRpcLimit = (int)(0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE,
        RpcServer.DEFAULT_MAX_REQUEST_SIZE));
    this.dropOnDeletedTables =
        this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);

    this.replicationBulkLoadDataEnabled =
        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
            HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    if (this.replicationBulkLoadDataEnabled) {
      replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
    }
    // Construct base namespace directory and hfile archive directory path
    Path rootDir = FSUtils.getRootDir(conf);
    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
    baseNamespaceDir = new Path(rootDir, baseNSDir);
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  private void connectToPeers() {
    getRegionServers();

    int sleepMultiplier = 1;

    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
      replicationSinkMgr.chooseSinks();
      if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  private List<List<Entry>> createBatches(final List<Entry> entries) {
    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
    // Maintains the current batch for a given partition index
    Map<Integer, List<Entry>> entryMap = new HashMap<>(n);
    List<List<Entry>> entryLists = new ArrayList<>();
    int[] sizes = new int[n];

    for (int i = 0; i < n; i++) {
      entryMap.put(i, new ArrayList<Entry>(entries.size()/n+1));
    }

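    // Edits are partitioned by a hash of their encoded region name, so edits for the same
    // region accumulate in the same partition. A partition's current batch is cut and a new
    // one started once adding the next entry would push it past the replication RPC size
    // limit; a batch always keeps at least one entry, even if that entry alone is oversized.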
    for (Entry e: entries) {
      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n);
      int entrySize = (int)e.getKey().estimatedSerializedSizeOf() +
          (int)e.getEdit().estimatedSerializedSizeOf();
      // If this batch is oversized, add it to final list and initialize a new empty batch
      if (sizes[index] > 0 /* must include at least one entry */ &&
          sizes[index] + entrySize > replicationRpcLimit) {
        entryLists.add(entryMap.get(index));
        entryMap.put(index, new ArrayList<Entry>());
        sizes[index] = 0;
      }
      entryMap.get(index).add(e);
      sizes[index] += entrySize;
    }

    entryLists.addAll(entryMap.values());
    return entryLists;
  }

  private TableName parseTable(String msg) {
    // ... TableNotFoundException: '<table>'/n...
    Pattern p = Pattern.compile("TableNotFoundException: \\'([\\S]*)\\'");
    Matcher m = p.matcher(msg);
    if (m.find()) {
      String table = m.group(1);
      try {
        // double check that table is a valid table name
        TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table)));
        return TableName.valueOf(table);
      } catch (IllegalArgumentException ignore) {
      }
    }
    return null;
  }

  // Filter a set of batches by TableName
  private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
    List<List<Entry>> entryLists = new ArrayList<>();
    for (List<Entry> entries : oldEntryList) {
      ArrayList<Entry> thisList = new ArrayList<Entry>(entries.size());
      entryLists.add(thisList);
      for (Entry e : entries) {
        if (!e.getKey().getTableName().equals(table)) {
          thisList.add(e);
        }
      }
    }
    return entryLists;
  }

  private void reconnectToPeerCluster() {
    ClusterConnection connection = null;
    try {
      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
    } catch (IOException ioe) {
      LOG.warn("Failed to create connection for peer cluster", ioe);
    }
    if (connection != null) {
      this.conn = connection;
    }
  }

  /**
   * Do the shipping logic
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
    List<List<Entry>> batches;
    String walGroupId = replicateContext.getWalGroupId();
    int sleepMultiplier = 1;

    if (!peersSelected && this.isRunning()) {
      connectToPeers();
      peersSelected = true;
    }

    int numSinks = replicationSinkMgr.getNumSinks();
    if (numSinks == 0) {
      LOG.warn("No replication sinks found, returning without replicating. The source should retry"
          + " with the same set of edits.");
      return false;
    }

    batches = createBatches(replicateContext.getEntries());

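    // Keep resubmitting until every batch has shipped or this endpoint stops: batches that
    // succeed are cleared from the list below, so each pass only retries the ones that failed.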
    while (this.isRunning() && !exec.isShutdown()) {
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      if (this.conn == null || this.conn.isClosed()) {
        reconnectToPeerCluster();
      }
      try {
        int futures = 0;
        for (int i=0; i<batches.size(); i++) {
          List<Entry> entries = batches.get(i);
          if (!entries.isEmpty()) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("Submitting " + entries.size() +
                  " entries of total size " + replicateContext.getSize());
            }
            // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
            pool.submit(createReplicator(entries, i));
            futures++;
          }
        }
        IOException iox = null;

        long lastWriteTime = 0;
        for (int i=0; i<futures; i++) {
          try {
            // wait for all futures, remove successful parts
            // (only the remaining parts will be retried)
            Future<Integer> f = pool.take();
            int index = f.get().intValue();
            List<Entry> batch = batches.get(index);
            batches.set(index, Collections.<Entry>emptyList()); // remove successful batch
            // Find the most recent write time in the batch
            long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
            if (writeTime > lastWriteTime) {
              lastWriteTime = writeTime;
            }
          } catch (InterruptedException ie) {
            iox = new IOException(ie);
          } catch (ExecutionException ee) {
            // cause must be an IOException
            iox = (IOException)ee.getCause();
          }
        }
        if (iox != null) {
          // if we had any exceptions, try again
          throw iox;
        }
        // update metrics
        if (lastWriteTime > 0) {
          this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
        }
        return true;

      } catch (IOException ioe) {
        // Didn't ship anything, but must still age the last time we did
        this.metrics.refreshAgeOfLastShippedOp(walGroupId);
        if (ioe instanceof RemoteException) {
          ioe = ((RemoteException) ioe).unwrapRemoteException();
          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
          if (ioe instanceof TableNotFoundException) {
            if (dropOnDeletedTables) {
              // this is a bit fragile, but cannot change how TNFE is serialized
              // at least check whether the table name is legal
              TableName table = parseTable(ioe.getMessage());
              if (table != null) {
                try (Connection localConn =
                    ConnectionFactory.createConnection(ctx.getLocalConfiguration())) {
                  if (!localConn.getAdmin().tableExists(table)) {
                    // Would potentially be better to retry in one of the outer loops
                    // and add a table filter there; but that would break the encapsulation,
                    // so we're doing the filtering here.
                    LOG.info("Missing table detected at sink, local table also does not exist, " +
                        "filtering edits for '" + table + "'");
                    batches = filterBatches(batches, table);
                    continue;
                  }
                } catch (IOException iox) {
                  LOG.warn("Exception checking for local table: ", iox);
                }
              }
            }
            // fall through and sleep below
          } else {
            LOG.warn("Peer encountered RemoteException, rechecking all sinks: ", ioe);
            replicationSinkMgr.chooseSinks();
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means we waited for more than 60s and nothing
            // happened, the cluster is alive and calling it right away
            // even for a test just makes things worse.
            sleepForRetries("Encountered a SocketTimeoutException. Since the " +
                "call to the remote cluster timed out, which is usually " +
                "caused by a machine failure or a massive slowdown",
                this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
            LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
            replicationSinkMgr.chooseSinks();
          } else {
            LOG.warn("Can't replicate because of a local or network error: ", ioe);
          }
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    return false; // in case we exited before replicating
  }

  protected boolean isPeerEnabled() {
    return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
  }

  @Override
  protected void doStop() {
    disconnect(); // don't call super.doStop()
    if (this.conn != null) {
      try {
        this.conn.close();
        this.conn = null;
      } catch (IOException e) {
        LOG.warn("Failed to close the connection");
      }
    }
    // Allow currently running replication tasks to finish
    exec.shutdown();
    try {
      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
    }
    // Abort if the tasks did not terminate in time
    if (!exec.isTerminated()) {
      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " +
          "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " +
          "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
      abortable.abort(errMsg, new IOException(errMsg));
    }
    notifyStopped();
  }

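  // Factory for Replicator tasks; protected and @VisibleForTesting so tests can override it
  // and supply a custom Replicator.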
  @VisibleForTesting
  protected Replicator createReplicator(List<Entry> entries, int ordinal) {
    return new Replicator(entries, ordinal);
  }

  @VisibleForTesting
  protected class Replicator implements Callable<Integer> {
    private List<Entry> entries;
    private int ordinal;

    public Replicator(List<Entry> entries, int ordinal) {
      this.entries = entries;
      this.ordinal = ordinal;
    }

    protected void replicateEntries(BlockingInterface rrs, final List<Entry> batch,
        String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
        throws IOException {
      if (LOG.isTraceEnabled()) {
        long size = 0;
        for (Entry e: entries) {
          size += e.getKey().estimatedSerializedSizeOf();
          size += e.getEdit().estimatedSerializedSizeOf();
        }
        LOG.trace("Replicating batch " + System.identityHashCode(entries) + " of " +
            entries.size() + " entries with total size " + size + " bytes to " +
            replicationClusterId);
      }
      try {
        ReplicationProtbufUtil.replicateWALEntry(rrs, batch.toArray(new Entry[batch.size()]),
            replicationClusterId, baseNamespaceDir, hfileArchiveDir);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Completed replicating batch " + System.identityHashCode(entries));
        }
      } catch (IOException e) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Failed replicating batch " + System.identityHashCode(entries), e);
        }
        throw e;
      }
    }

    @Override
    public Integer call() throws IOException {
      SinkPeer sinkPeer = null;
      try {
        sinkPeer = replicationSinkMgr.getReplicationSink();
        BlockingInterface rrs = sinkPeer.getRegionServer();
        replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir);
        replicationSinkMgr.reportSinkSuccess(sinkPeer);
        return ordinal;
      } catch (IOException ioe) {
        if (sinkPeer != null) {
          replicationSinkMgr.reportBadSink(sinkPeer);
        }
        throw ioe;
      }
    }
  }
}