/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
/**
 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
 * ReplicationSourceWALReader.
 */
@InterfaceAudience.Private
public class ReplicationSourceShipper extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);

  // Holds the state of a replication worker thread
  public enum WorkerState {
    RUNNING,
    STOPPED,
    FINISHED, // The worker is done processing a recovered queue
  }

  private final Configuration conf;
  protected final String walGroupId;
  protected final ReplicationSourceLogQueue logQueue;
  private final ReplicationSource source;

  // Last position in the log that we sent to ZooKeeper.
  // It will be accessed by the stats thread, so make it volatile.
  private volatile long currentPosition = -1;
  // Path of the current log
  private Path currentPath;
  // Current state of the worker thread
  private volatile WorkerState state;
  protected ReplicationSourceWALReader entryReader;

  // How long we should sleep for each retry
  protected final long sleepForRetries;
  // Maximum number of retries before taking bold actions
  protected final int maxRetriesMultiplier;
  private final int DEFAULT_TIMEOUT = 20000;
  private final int getEntriesTimeout;
  private final int shipEditsTimeout;

  public ReplicationSourceShipper(Configuration conf, String walGroupId,
    ReplicationSourceLogQueue logQueue, ReplicationSource source) {
    this.conf = conf;
    this.walGroupId = walGroupId;
    this.logQueue = logQueue;
    this.source = source;
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    // 20 seconds
    this.getEntriesTimeout =
      this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT);
    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
      HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
  }
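  // For illustration only: the retry/timeout knobs read above can be overridden in
  // hbase-site.xml. The values below simply restate the defaults used in this constructor and
  // are not tuning advice. (The ship-edits timeout is read via
  // HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT and can be overridden the same way.)
  //
  //   <property>
  //     <name>replication.source.sleepforretries</name>
  //     <value>1000</value>   <!-- base sleep in ms per retry step -->
  //   </property>
  //   <property>
  //     <name>replication.source.maxretriesmultiplier</name>
  //     <value>300</value>    <!-- cap on the retry multiplier -->
  //   </property>
  //   <property>
  //     <name>replication.source.getEntries.timeout</name>
  //     <value>20000</value>  <!-- ms to wait for a batch from the WAL reader -->
  //   </property>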
  @Override
  public final void run() {
    setWorkerState(WorkerState.RUNNING);
    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!source.isPeerEnabled()) {
        // The peer enabled check is in memory, not expensive, so there is no need to increase
        // the sleep interval; doing so could cause a long lag when the peer is re-enabled.
        sleepForRetries("Replication is disabled", 1);
        continue;
      }
      try {
        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
        LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(),
          entryBatch);
        if (entryBatch == null) {
          continue;
        }
        // the NO_MORE_DATA instance has no path, so do not call shipEdits
        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
          noMoreData();
        } else {
          shipEdits(entryBatch);
        }
      } catch (InterruptedException | ReplicationRuntimeException e) {
        // The thread was interrupted, or the source is terminating, and needs to quit.
        LOG.warn("Interrupted while waiting for next replication entry batch", e);
        Thread.currentThread().interrupt();
      }
    }
    // If the worker exits the run loop without finishing its task, mark it as stopped.
    if (!isFinished()) {
      setWorkerState(WorkerState.STOPPED);
    } else {
      source.workerThreads.remove(this.walGroupId);
      postFinish();
    }
  }

  // To be implemented by the recovered shipper
  protected void noMoreData() {
  }

  // To be implemented by the recovered shipper
  protected void postFinish() {
  }

  /**
   * Get the batch entry size, excluding bulk load file sizes. Uses ReplicationSourceWALReader's
   * static method.
   */
  private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
    int totalSize = 0;
    for (Entry entry : entryBatch.getWalEntries()) {
      totalSize += ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(entry);
    }
    return totalSize;
  }

  /**
   * Do the shipping logic
   */
  private void shipEdits(WALEntryBatch entryBatch) {
    List<Entry> entries = entryBatch.getWalEntries();
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      updateLogPosition(entryBatch);
      return;
    }
    int currentSize = (int) entryBatch.getHeapSize();
    int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
    source.getSourceMetrics()
      .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
    while (isActive()) {
      try {
        try {
          source.tryThrottle(currentSize);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while sleeping for throttling control");
          Thread.currentThread().interrupt();
          // The current thread may have been interrupted to terminate;
          // go back to the while() condition to confirm this.
          continue;
        }
        // Create replicateContext here, so the entries can be GC'd upon return from this call
        // stack.
        ReplicationEndpoint.ReplicateContext replicateContext =
          new ReplicationEndpoint.ReplicateContext();
        replicateContext.setEntries(entries).setSize(currentSize);
        replicateContext.setWalGroupId(walGroupId);
        replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));

        long startTimeNs = System.nanoTime();
        // Send the edits to the endpoint. Will block until the edits are shipped and
        // acknowledged.
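        // Note: a false return from replicate() means the endpoint did not replicate this
        // batch, and the loop immediately retries the same batch. A successful ship lets the
        // retry multiplier decay, while an exception from the endpoint triggers sleepForRetries
        // with an increasing multiplier.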
        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
        long endTimeNs = System.nanoTime();

        if (!replicated) {
          continue;
        } else {
          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
        }
        // Clean up hfile references
        for (Entry entry : entries) {
          cleanUpHFileRefs(entry.getEdit());
          LOG.trace("Shipped entry {}", entry);
        }
        // Log and clean up WAL logs
        updateLogPosition(entryBatch);

        // Offset totalBufferUsed by deducting the shipped batch size (excluding bulk load size).
        // This sizeExcludeBulkLoad has to use the same calculation as acquireBufferQuota() in
        // ReplicationSourceWALReader, because both maintain the same variable: totalBufferUsed.
        source.postShipEdits(entries, sizeExcludeBulkLoad);
        // FIXME check relationship between wal group and overall
        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
          entryBatch.getNbHFiles());
        source.getSourceMetrics().setAgeOfLastShippedOp(
          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());

        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicated {} entries or {} operations in {} ms", entries.size(),
            entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
        }
        break;
      } catch (Exception ex) {
        source.getSourceMetrics().incrementFailedBatches();
        LOG.warn("{} threw unknown exception:",
          source.getReplicationEndpoint().getClass().getName(), ex);
        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }
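  // Bulk load note (background, not asserted by this class): store files referenced by a
  // BULK_LOAD marker edit are tracked as "hfile refs" by the replication queue storage so that
  // they are not cleaned up on the source cluster before peers have shipped them. Once this
  // shipper has replicated the entry for its peer, the refs for that peer can be dropped,
  // which is what cleanUpHFileRefs() below does.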
  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
    String peerId = source.getPeerId();
    if (peerId.contains("-")) {
      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
      // A peerId will not have "-" in its name, see HBASE-11394
      peerId = peerId.split("-")[0];
    }
    List<Cell> cells = edit.getCells();
    int totalCells = cells.size();
    for (int i = 0; i < totalCells; i++) {
      Cell cell = cells.get(i);
      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        List<StoreDescriptor> stores = bld.getStoresList();
        int totalStores = stores.size();
        for (int j = 0; j < totalStores; j++) {
          List<String> storeFileList = stores.get(j).getStoreFileList();
          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
        }
      }
    }
  }

  private boolean updateLogPosition(WALEntryBatch batch) {
    boolean updated = false;
    // If end of file is true, then the logPositionAndCleanOldLogs method will remove the file
    // record on zk, so let's call it. The last wal position may be zero if end of file is true
    // and there is no entry in the batch. That is OK because the queue storage will ignore the
    // zero position and the file will be removed soon in cleanOldLogs.
    if (
      batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
        || batch.getLastWalPosition() != currentPosition
    ) {
      source.logPositionAndCleanOldLogs(batch);
      updated = true;
    }
    // If end of file is true, then we can just skip to the next file in the queue.
    // The only exception is a recovered queue: if we reach the end of the queue there will be
    // no more files, so the currentPath may be null here.
    if (batch.isEndOfFile()) {
      currentPath = entryReader.getCurrentPath();
      currentPosition = 0L;
    } else {
      currentPath = batch.getLastWalPath();
      currentPosition = batch.getLastWalPosition();
    }
    return updated;
  }

  public void startup(UncaughtExceptionHandler handler) {
    String name = Thread.currentThread().getName();
    Threads.setDaemonThreadRunning(this,
      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
      handler::uncaughtException);
  }

  Path getCurrentPath() {
    return entryReader.getCurrentPath();
  }

  long getCurrentPosition() {
    return currentPosition;
  }

  void setWALReader(ReplicationSourceWALReader entryReader) {
    this.entryReader = entryReader;
  }

  long getStartPosition() {
    return 0;
  }

  protected boolean isActive() {
    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
  }

  protected final void setWorkerState(WorkerState state) {
    this.state = state;
  }

  void stopWorker() {
    setWorkerState(WorkerState.STOPPED);
  }

  public boolean isFinished() {
    return state == WorkerState.FINISHED;
  }

  /**
   * Do the sleeping logic
   * @param msg             Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  public boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUsed</code>, in case
   * there were unprocessed entries batched by the reader to the shipper, but the shipper didn't
   * manage to ship those because the replication source is being terminated. In that case, it
   * iterates through the batched entries and decreases the pending entries size from
   * <code>ReplicationSourceManager.totalBufferUsed</code>.
   * <p/>
   * <b>NOTES</b> 1) This method should only be called upon replication source termination. It
   * blocks waiting for both the shipper and reader threads to terminate, to make sure there are
   * no race conditions when updating <code>ReplicationSourceManager.totalBufferUsed</code>. 2) It
   * <b>does not</b> attempt to terminate the reader and shipper threads. Their
   * interruption/termination <b>must</b> have been triggered prior to calling this method.
   */
  void clearWALEntryBatch() {
    long timeout = EnvironmentEdgeManager.currentTime() + this.shipEditsTimeout;
    while (this.isAlive() || this.entryReader.isAlive()) {
      try {
        if (EnvironmentEdgeManager.currentTime() >= timeout) {
          LOG.warn(
            "{} Shipper clearWALEntryBatch method timed out whilst waiting for the "
              + "reader/shipper threads to stop. Not cleaning buffer usage. "
              + "Shipper alive: {}; Reader alive: {}",
            this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
          return;
        } else {
          // Wait for both the shipper and reader threads to stop
          Thread.sleep(this.sleepForRetries);
        }
      } catch (InterruptedException e) {
        LOG.warn("{} Interrupted while waiting for {} to stop on clearWALEntryBatch. "
          + "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e);
        return;
      }
    }
    LongAccumulator totalToDecrement = new LongAccumulator((a, b) -> a + b, 0);
    entryReader.entryBatchQueue.forEach(w -> {
      entryReader.entryBatchQueue.remove(w);
      w.getWalEntries().forEach(e -> {
        long entrySizeExcludeBulkLoad = ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(e);
        totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
      });
    });
    if (LOG.isTraceEnabled()) {
      LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
        totalToDecrement.longValue());
    }
    long newBufferUsed =
      source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
    source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
  }
}