001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout; 021import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries; 022 023import java.io.IOException; 024import java.util.List; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.CellUtil; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 032import org.apache.hadoop.hbase.util.Threads; 033import org.apache.hadoop.hbase.wal.WAL.Entry; 034import org.apache.hadoop.hbase.wal.WALEdit; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 041 042/** 043 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by 044 * ReplicationSourceWALReaderThread 045 */ 046@InterfaceAudience.Private 047public class ReplicationSourceShipper extends Thread { 048 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class); 049 050 // Hold the state of a replication worker thread 051 public enum WorkerState { 052 RUNNING, 053 STOPPED, 054 FINISHED, // The worker is done processing a queue 055 } 056 057 private final Configuration conf; 058 final String walGroupId; 059 private final ReplicationSource source; 060 061 // Last position in the log that we sent to ZooKeeper 062 // It will be accessed by the stats thread so make it volatile 063 private volatile long currentPosition = -1; 064 // Path of the current log 065 private Path currentPath; 066 // Current state of the worker thread 067 private volatile WorkerState state; 068 final ReplicationSourceWALReader entryReader; 069 070 // How long should we sleep for each retry 071 private final long sleepForRetries; 072 // Maximum number of retries before taking bold actions 073 private final int maxRetriesMultiplier; 074 private final int DEFAULT_TIMEOUT = 20000; 075 private final int getEntriesTimeout; 076 private final int shipEditsTimeout; 077 private long accumulatedSizeSinceLastUpdate = 0L; 078 private long lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime(); 079 private final long offsetUpdateIntervalMs; 080 private final long offsetUpdateSizeThresholdBytes; 081 private WALEntryBatch lastShippedBatch; 082 083 private static final String OFFSET_UPDATE_INTERVAL_MS_KEY = 084 "hbase.replication.shipper.offset.update.interval.ms"; 085 private static final String OFFSET_UPDATE_SIZE_THRESHOLD_KEY = 086 "hbase.replication.shipper.offset.update.size.threshold"; 087 private static final long DEFAULT_OFFSET_UPDATE_INTERVAL_MS = Long.MAX_VALUE; 088 private static final long DEFAULT_OFFSET_UPDATE_SIZE_THRESHOLD = -1L; 089 090 public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source, 091 ReplicationSourceWALReader walReader) { 092 this.conf = conf; 093 this.walGroupId = walGroupId; 094 this.source = source; 095 this.entryReader = walReader; 096 // 1 second 097 this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); 098 // 5 minutes @ 1 sec per 099 this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); 100 // 20 seconds 101 this.getEntriesTimeout = 102 this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); 103 this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT, 104 HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT); 105 this.offsetUpdateIntervalMs = 106 conf.getLong(OFFSET_UPDATE_INTERVAL_MS_KEY, DEFAULT_OFFSET_UPDATE_INTERVAL_MS); 107 this.offsetUpdateSizeThresholdBytes = 108 conf.getLong(OFFSET_UPDATE_SIZE_THRESHOLD_KEY, DEFAULT_OFFSET_UPDATE_SIZE_THRESHOLD); 109 } 110 111 @Override 112 public final void run() { 113 setWorkerState(WorkerState.RUNNING); 114 LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId); 115 // Loop until we close down 116 while (isActive()) { 117 // Sleep until replication is enabled again 118 if (!source.isPeerEnabled()) { 119 // The peer enabled check is in memory, not expensive, so do not need to increase the 120 // sleep interval as it may cause a long lag when we enable the peer. 121 sleepForRetries("Replication is disabled", sleepForRetries, 1, maxRetriesMultiplier); 122 continue; 123 } 124 try { 125 // check time-based offset persistence 126 if (shouldPersistLogPosition()) { 127 persistLogPosition(); 128 } 129 130 long pollTimeout = getEntriesTimeout; 131 if (offsetUpdateIntervalMs != Long.MAX_VALUE) { 132 long elapsed = EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime; 133 long remaining = offsetUpdateIntervalMs - elapsed; 134 if (remaining > 0) { 135 pollTimeout = Math.min(getEntriesTimeout, remaining); 136 } 137 } 138 WALEntryBatch entryBatch = entryReader.poll(pollTimeout); 139 LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(), 140 entryBatch); 141 142 if (entryBatch == null) { 143 continue; 144 } 145 // the NO_MORE_DATA instance has no path so do not call shipEdits 146 if (entryBatch == WALEntryBatch.NO_MORE_DATA) { 147 noMoreData(); 148 } else { 149 shipEdits(entryBatch); 150 } 151 } catch (InterruptedException | ReplicationRuntimeException e) { 152 LOG.warn("Interrupted while waiting for next replication entry batch", e); 153 Thread.currentThread().interrupt(); 154 } catch (Exception e) { 155 // During source shutdown / peer removal we can see interrupted IOEs 156 // from replication queue updates. Do not restart in this case. 157 if (!source.isSourceActive() || isInterrupted() || !source.isPeerEnabled()) { 158 LOG.info("Ignoring persist failure during shutdown for walGroupId={}", walGroupId, e); 159 break; 160 } 161 LOG.error("Shipper {} failed to persist offset, restarting", walGroupId, e); 162 abortAndRestart(e); 163 return; 164 } 165 } 166 167 // If the worker exits run loop without finishing its task, mark it as stopped. 168 if (!isFinished()) { 169 try { 170 persistLogPosition(); 171 } catch (Exception e) { 172 LOG.error("Failed persisting final offset for walGroupId={}", walGroupId, e); 173 } 174 setWorkerState(WorkerState.STOPPED); 175 } else { 176 source.removeWorker(this); 177 postFinish(); 178 } 179 } 180 181 private void noMoreData() throws IOException { 182 // Flush any outstanding replication offset before marking the queue finished. 183 // Offset persistence may be delayed by size/time thresholds, so ensure the 184 // latest replicated position is stored before transitioning to FINISHED state. 185 persistLogPosition(); 186 187 if (source.isRecovered()) { 188 LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, 189 source.getQueueId()); 190 source.getSourceMetrics().incrCompletedRecoveryQueue(); 191 } else { 192 LOG.debug("Finished queue for group {} of peer {}", walGroupId, source.getQueueId()); 193 } 194 setWorkerState(WorkerState.FINISHED); 195 } 196 197 // To be implemented by recovered shipper 198 protected void postFinish() { 199 } 200 201 /** 202 * Do the shipping logic 203 */ 204 void shipEdits(WALEntryBatch entryBatch) throws IOException { 205 List<Entry> entries = entryBatch.getWalEntries(); 206 int sleepMultiplier = 0; 207 int currentSize = (int) entryBatch.getHeapSize(); 208 MetricsSource metrics = source.getSourceMetrics(); 209 if (metrics != null && !entries.isEmpty()) { 210 metrics.setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime()); 211 } 212 if (entries.isEmpty()) { 213 // empty batch may mean WAL boundary advancement 214 lastShippedBatch = entryBatch; 215 persistLogPosition(); 216 return; 217 } 218 while (isActive()) { 219 try { 220 try { 221 source.tryThrottle(currentSize); 222 } catch (InterruptedException e) { 223 LOG.debug("Interrupted while sleeping for throttling control"); 224 Thread.currentThread().interrupt(); 225 // current thread might be interrupted to terminate 226 // directly go back to while() for confirm this 227 continue; 228 } 229 // create replicateContext here, so the entries can be GC'd upon return from this call 230 // stack 231 ReplicationEndpoint.ReplicateContext replicateContext = 232 new ReplicationEndpoint.ReplicateContext(); 233 replicateContext.setEntries(entries).setSize(currentSize); 234 replicateContext.setWalGroupId(walGroupId); 235 replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier)); 236 237 long startTimeNs = System.nanoTime(); 238 // send the edits to the endpoint. Will block until the edits are shipped and acknowledged 239 boolean replicated = source.getReplicationEndpoint().replicate(replicateContext); 240 long endTimeNs = System.nanoTime(); 241 242 if (!replicated) { 243 continue; 244 } else { 245 sleepMultiplier = Math.max(sleepMultiplier - 1, 0); 246 } 247 // Clean up hfile references 248 for (Entry entry : entries) { 249 cleanUpHFileRefs(entry.getEdit()); 250 LOG.trace("shipped entry {}: ", entry); 251 } 252 253 // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size) 254 // this sizeExcludeBulkLoad has to use same calculation that when calling 255 // acquireBufferQuota() in ReplicationSourceWALReader because they maintain 256 // same variable: totalBufferUsed 257 source.postShipEdits(entries, entryBatch.getUsedBufferSize()); 258 // FIXME check relationship between wal group and overall 259 source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize, 260 entryBatch.getNbHFiles()); 261 source.getSourceMetrics().setAgeOfLastShippedOp( 262 entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); 263 source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize()); 264 265 if (LOG.isTraceEnabled()) { 266 LOG.debug("Replicated {} entries or {} operations in {} ms", entries.size(), 267 entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000); 268 } 269 break; 270 } catch (Exception ex) { 271 source.getSourceMetrics().incrementFailedBatches(); 272 LOG.warn("{} threw unknown exception:", 273 source.getReplicationEndpoint().getClass().getName(), ex); 274 if ( 275 sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier, 276 maxRetriesMultiplier) 277 ) { 278 sleepMultiplier++; 279 } 280 } 281 } 282 283 accumulatedSizeSinceLastUpdate += currentSize; 284 lastShippedBatch = entryBatch; 285 if (shouldPersistLogPosition()) { 286 persistLogPosition(); 287 } 288 } 289 290 private boolean shouldPersistLogPosition() { 291 if (lastShippedBatch == null) { 292 return false; 293 } 294 // Default behaviour to update offset immediately after replicate() 295 if (offsetUpdateSizeThresholdBytes == -1 && offsetUpdateIntervalMs == Long.MAX_VALUE) { 296 return true; 297 } 298 299 return (accumulatedSizeSinceLastUpdate >= offsetUpdateSizeThresholdBytes) 300 || (EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime >= offsetUpdateIntervalMs); 301 } 302 303 void persistLogPosition() throws IOException { 304 if (lastShippedBatch == null) { 305 return; 306 } 307 308 if (!source.isSourceActive() || isInterrupted() || !source.isPeerEnabled()) { 309 LOG.debug("Skip persistLogPosition for inactive/stopping source"); 310 return; 311 } 312 313 ReplicationEndpoint endpoint = source.getReplicationEndpoint(); 314 if (endpoint != null) { 315 endpoint.beforePersistingReplicationOffset(); 316 } 317 318 // Log and clean up WAL logs 319 updateLogPosition(lastShippedBatch); 320 accumulatedSizeSinceLastUpdate = 0; 321 lastShippedBatch = null; 322 lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime(); 323 } 324 325 void cleanUpHFileRefs(WALEdit edit) throws IOException { 326 String peerId = source.getPeerId(); 327 if (peerId.contains("-")) { 328 // peerClusterZnode will be in the form peerId + "-" + rsZNode. 329 // A peerId will not have "-" in its name, see HBASE-11394 330 peerId = peerId.split("-")[0]; 331 } 332 List<Cell> cells = edit.getCells(); 333 int totalCells = cells.size(); 334 for (int i = 0; i < totalCells; i++) { 335 Cell cell = cells.get(i); 336 if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { 337 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); 338 List<StoreDescriptor> stores = bld.getStoresList(); 339 int totalStores = stores.size(); 340 for (int j = 0; j < totalStores; j++) { 341 List<String> storeFileList = stores.get(j).getStoreFileList(); 342 source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList); 343 source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size()); 344 } 345 } 346 } 347 } 348 349 private boolean updateLogPosition(WALEntryBatch batch) { 350 boolean updated = false; 351 // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file 352 // record on zk, so let's call it. The last wal position maybe zero if end of file is true and 353 // there is no entry in the batch. It is OK because that the queue storage will ignore the zero 354 // position and the file will be removed soon in cleanOldLogs. 355 if ( 356 batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) 357 || batch.getLastWalPosition() != currentPosition 358 ) { 359 source.logPositionAndCleanOldLogs(batch); 360 updated = true; 361 } 362 // if end of file is true, then we can just skip to the next file in queue. 363 // the only exception is for recovered queue, if we reach the end of the queue, then there will 364 // no more files so here the currentPath may be null. 365 if (batch.isEndOfFile()) { 366 currentPath = entryReader.getCurrentPath(); 367 currentPosition = 0L; 368 } else { 369 currentPath = batch.getLastWalPath(); 370 currentPosition = batch.getLastWalPosition(); 371 } 372 return updated; 373 } 374 375 public void startup(UncaughtExceptionHandler handler) { 376 String name = Thread.currentThread().getName(); 377 Threads.setDaemonThreadRunning(this, 378 name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), 379 handler::uncaughtException); 380 } 381 382 Path getCurrentPath() { 383 return entryReader.getCurrentPath(); 384 } 385 386 long getCurrentPosition() { 387 return currentPosition; 388 } 389 390 protected boolean isActive() { 391 return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted(); 392 } 393 394 protected final void setWorkerState(WorkerState state) { 395 this.state = state; 396 } 397 398 void stopWorker() { 399 setWorkerState(WorkerState.STOPPED); 400 } 401 402 public boolean isFinished() { 403 return state == WorkerState.FINISHED; 404 } 405 406 /** 407 * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>, in case 408 * there were unprocessed entries batched by the reader to the shipper, but the shipper didn't 409 * manage to ship those because the replication source is being terminated. In that case, it 410 * iterates through the batched entries and decrease the pending entries size from 411 * <code>ReplicationSourceManager.totalBufferUser</code> 412 * <p/> 413 * <b>NOTES</b> 1) This method should only be called upon replication source termination. It 414 * blocks waiting for both shipper and reader threads termination, to make sure no race conditions 415 * when updating <code>ReplicationSourceManager.totalBufferUser</code>. 2) It <b>does not</b> 416 * attempt to terminate reader and shipper threads. Those <b>must</b> have been triggered 417 * interruption/termination prior to calling this method. 418 */ 419 void clearWALEntryBatch() { 420 long timeout = EnvironmentEdgeManager.currentTime() + this.shipEditsTimeout; 421 while (this.isAlive() || this.entryReader.isAlive()) { 422 try { 423 if (EnvironmentEdgeManager.currentTime() >= timeout) { 424 LOG.warn("Shipper clearWALEntryBatch method timed out while waiting reader/shipper " 425 + "thread to stop. Not cleaning buffer usage. PeerId: {}; Shipper alive: {}; Reader " 426 + "alive: {}", this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive()); 427 return; 428 } else { 429 // Wait both shipper and reader threads to stop 430 Thread.sleep(this.sleepForRetries); 431 } 432 } catch (InterruptedException e) { 433 LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. " 434 + "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e); 435 return; 436 } 437 } 438 long totalReleasedBytes = 0; 439 while (true) { 440 WALEntryBatch batch = entryReader.entryBatchQueue.poll(); 441 if (batch == null) { 442 break; 443 } 444 totalReleasedBytes += source.getSourceManager().releaseWALEntryBatchBufferQuota(batch); 445 } 446 if (LOG.isTraceEnabled()) { 447 LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.", 448 totalReleasedBytes); 449 } 450 } 451 452 long getSleepForRetries() { 453 return sleepForRetries; 454 } 455 456 // Restart worker so replication resumes from last persisted offset. 457 void abortAndRestart(Throwable cause) { 458 LOG.warn("Restarting shipper for walGroupId={}", walGroupId, cause); 459 if (!source.isSourceActive() || !source.isPeerEnabled() || isInterrupted()) { 460 LOG.warn("abortAndRestart SKIPPED walGroupId={}, thread={}", walGroupId, 461 Thread.currentThread().getName()); 462 return; 463 } 464 setWorkerState(WorkerState.STOPPED); 465 source.restartShipper(walGroupId, this); 466 } 467}