/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than 1 slave cluster.
 * For each slave cluster it selects a random number of peers
 * using a replication ratio. For example, if replication ratio = 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class ReplicationSource extends Thread implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // Queues of logs to process, entry in format of walGroupId->queue,
  // each representing a queue for one wal group
  private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationQueues replicationQueues;
  private ReplicationPeers replicationPeers;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // id of the other cluster
  private UUID peerClusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The znode we currently play with
  protected String peerClusterZnode;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  private volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // WARN threshold for the number of queued logs, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private ReplicationEndpoint replicationEndpoint;
  // A filter (or a chain of filters) for the WAL entries.
  protected WALEntryFilter walEntryFilter;
  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
      new ConcurrentHashMap<>();

  private AtomicLong totalBufferUsed;

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;
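
  // A rough usage sketch (illustrative only; in practice the ReplicationSourceManager is the
  // component that creates, initializes and starts sources):
  //
  //   ReplicationSourceInterface source = new ReplicationSource();
  //   source.init(conf, fs, manager, replicationQueues, replicationPeers, server,
  //       peerClusterZnode, clusterId, replicationEndpoint, walFileLengthProvider, metrics);
  //   source.startup();           // spawns the source thread, which starts one shipper per wal group
  //   source.enqueueLog(walPath); // WALs are handed to the source as they are rolled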

  /**
   * Instantiation method used by region servers
   *
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to ping to
   * @param server the server for this region server
   * @param peerClusterZnode the name of our znode
   * @param clusterId unique UUID for the cluster
   * @param replicationEndpoint the replication endpoint implementation
   * @param metrics metrics for replication source
   * @throws IOException
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
      String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.peerClusterZnode = peerClusterZnode;
    this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
    this.replicationEndpoint = replicationEndpoint;

    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.totalBufferUsed = manager.getTotalBufferUsed();
    this.walFileLengthProvider = walFileLengthProvider;
    LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
        + ", currentBandwidth=" + this.currentBandwidth);
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  @Override
  public void enqueueLog(Path log) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
    if (queue == null) {
      queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
      queues.put(logPrefix, queue);
      if (this.sourceRunning) {
        // new wal group observed after source startup, start a new worker thread to track it
        // notice: it's possible that a log is enqueued after this.sourceRunning is set but before
        // the worker thread is launched, so it's necessary to check workerThreads before starting
        // the worker
        tryStartNewShipper(logPrefix, queue);
      }
    }
    queue.put(log);
    this.metrics.incrSizeOfLogQueue();
    // This will log a warning for each new log that gets created above the warn threshold
    int queueSize = queue.size();
    if (queueSize > this.logQueueWarnThreshold) {
      LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
          + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
    }
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    String peerId = peerClusterZnode;
    if (peerId.contains("-")) {
      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
      // A peerId will not have "-" in its name, see HBASE-11394
      peerId = peerClusterZnode.split("-")[0];
    }
    Map<TableName, List<String>> tableCFMap =
        replicationPeers.getConnectedPeer(peerId).getTableCFs();
    if (tableCFMap != null) {
      List<String> tableCfs = tableCFMap.get(tableName);
      if (tableCFMap.containsKey(tableName)
          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
        this.replicationQueues.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
            + Bytes.toString(family) + " to peer id " + peerId);
      }
    } else {
      // user has not explicitly defined any table cfs for replication, which means replicate all
      // the data
      this.replicationQueues.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    }
  }
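
  // Main loop of the source thread: start the replication endpoint, resolve the peer cluster
  // UUID (retrying while the source is active), refuse to replicate a cluster to itself unless
  // the endpoint allows it, build the WAL entry filter chain, and start one shipper per wal group.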
  @Override
  public void run() {
    // mark we are running now
    this.sourceRunning = true;
    try {
      // start the endpoint, connect to the cluster
      this.replicationEndpoint.start();
      this.replicationEndpoint.awaitRunning(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
    } catch (Exception ex) {
      LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
      uninitialize();
      throw new RuntimeException(ex);
    }

    int sleepMultiplier = 1;
    // delay this until we are in an asynchronous thread
    while (this.isSourceActive() && this.peerClusterId == null) {
      this.peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && this.peerClusterId == null) {
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }

    // In a rare case, the zookeeper setting may be messed up. That leads to an incorrect
    // peerClusterId value, which is the same as the source clusterId
    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
      this.manager.closeQueue(this);
      return;
    }
    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);

    initializeWALEntryFilter();
    // start workers
    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
      String walGroupId = entry.getKey();
      PriorityBlockingQueue<Path> queue = entry.getValue();
      tryStartNewShipper(walGroupId, queue);
    }
  }

  private void initializeWALEntryFilter() {
    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
      (WALEntryFilter) new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

  protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
    final ReplicationSourceShipper worker = new ReplicationSourceShipper(conf,
        walGroupId, queue, this);
    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
    if (extant != null) {
      LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
    } else {
      LOG.debug("Starting up worker for wal group " + walGroupId);
      worker.startup(getUncaughtExceptionHandler());
      worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue,
        worker.getStartPosition()));
      workerThreads.put(walGroupId, worker);
    }
  }

  protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
      PriorityBlockingQueue<Path> queue, long startPosition) {
    ReplicationSourceWALReader walReader =
        new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
    return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
      threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode,
      getUncaughtExceptionHandler());
  }
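
  // Uncaught-exception handler shared with the shipper and wal-reader threads: aborts the
  // process on OutOfMemoryError and stops the region server on any other unexpected exception.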
  public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
    return new Thread.UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(final Thread t, final Throwable e) {
        RSRpcServices.exitIfOOME(e);
        LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
        server.stop("Unexpected exception in " + t.getName());
      }
    };
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
        }
        Thread.sleep(sleepTicks);
        // reset throttler's cycle start tick when sleep for throttling occurs
        throttler.resetStartTick();
      }
    }
  }

  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource : " + peerId
          + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
    }
  }

  private long getCurrentBandwidth() {
    ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId);
    long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
    // user can set peer bandwidth to 0 to use default bandwidth
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }
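
  // Cleans up when the source thread cannot start its endpoint: clears metrics and stops the
  // endpoint if it is still starting or running, waiting up to waitOnEndpointSeconds for it to
  // terminate.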
  private void uninitialize() {
    LOG.debug("Source exiting " + this.peerId);
    metrics.clear();
    if (this.replicationEndpoint.isRunning() || this.replicationEndpoint.isStarting()) {
      this.replicationEndpoint.stop();
      try {
        this.replicationEndpoint.awaitTerminated(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
      } catch (TimeoutException e) {
        LOG.warn("Failed termination after " + this.waitOnEndpointSeconds + " seconds.");
      }
    }
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * check whether the peer is enabled or not
   *
   * @return true if the peer is enabled, otherwise false
   */
  @Override
  public boolean isPeerEnabled() {
    return this.replicationPeers.getStatusOfPeer(this.peerId);
  }

  @Override
  public void startup() {
    String n = Thread.currentThread().getName();
    Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(final Thread t, final Throwable e) {
        LOG.error("Unexpected exception in ReplicationSource", e);
      }
    };
    Threads
        .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  public void terminate(String reason, Exception cause, boolean join) {
    if (cause == null) {
      LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason);
    } else {
      LOG.error("Closing source " + this.peerClusterZnode
          + " because an error occurred: " + reason, cause);
    }
    this.sourceRunning = false;
    Collection<ReplicationSourceShipper> workers = workerThreads.values();
    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      worker.entryReader.interrupt();
      worker.interrupt();
    }
    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }
    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint
              .awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
              + this.peerClusterZnode, te);
        }
      }
    }
  }

  @Override
  public String getPeerClusterZnode() {
    return this.peerClusterZnode;
  }

  @Override
  public String getPeerId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  /**
   * Comparator used to compare logs together based on their start time
   */
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.compare(getTS(o1), getTS(o2));
    }

    /**
     * Split a path to get the start time
     * For example: 10.20.20.171%3A60020.1277499063250
     * @param p path to split
     * @return start time
     */
    private static long getTS(Path p) {
      int tsIndex = p.getName().lastIndexOf('.') + 1;
      return Long.parseLong(p.getName().substring(tsIndex));
    }
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
        .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
            .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  public void postShipEdits(List<Entry> entries, int batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    totalBufferUsed.addAndGet(-batchSize);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }
}