/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * This thread reads entries from a queue and ships them.
 * Entries are placed onto the queue by ReplicationSourceWALReader.
 */
@InterfaceAudience.Private
public class ReplicationSourceShipper extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);

  // Hold the state of a replication worker thread
  public enum WorkerState {
    RUNNING,
    STOPPED,
    FINISHED, // The worker is done processing a queue
  }

  private final Configuration conf;
  final String walGroupId;
  private final ReplicationSource source;

  // Last position in the log that we sent to ZooKeeper
  // It will be accessed by the stats thread so make it volatile
  private volatile long currentPosition = -1;
  // Path of the current log
  private Path currentPath;
  // Current state of the worker thread
  private volatile WorkerState state;
  final ReplicationSourceWALReader entryReader;

  // How long should we sleep for each retry
  private final long sleepForRetries;
  // Maximum number of retries before taking bold actions
  private final int maxRetriesMultiplier;
  private final int DEFAULT_TIMEOUT = 20000;
  private final int getEntriesTimeout;
  private final int shipEditsTimeout;

  public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
    ReplicationSourceWALReader walReader) {
    this.conf = conf;
    this.walGroupId = walGroupId;
    this.source = source;
    this.entryReader = walReader;
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    // 20 seconds
    this.getEntriesTimeout =
      this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT);
    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
      HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
  }

  @Override
  public final void run() {
    setWorkerState(WorkerState.RUNNING);
    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!source.isPeerEnabled()) {
        // The peer enabled check is in memory and not expensive, so there is no need to increase
        // the sleep interval, as that could cause a long lag when the peer is re-enabled.
        sleepForRetries("Replication is disabled", sleepForRetries, 1, maxRetriesMultiplier);
        continue;
      }
      try {
        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
        LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(),
          entryBatch);
        if (entryBatch == null) {
          continue;
        }
        // the NO_MORE_DATA instance has no path so do not call shipEdits
        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
          noMoreData();
        } else {
          shipEdits(entryBatch);
        }
      } catch (InterruptedException | ReplicationRuntimeException e) {
        // It is interrupted and needs to quit.
        LOG.warn("Interrupted while waiting for next replication entry batch", e);
        Thread.currentThread().interrupt();
      }
    }
    // If the worker exits the run loop without finishing its task, mark it as stopped.
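    // A worker that reached FINISHED has fully drained its queue; detach it from the source and
    // run the post-finish hook (used by recovered queues).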
    if (!isFinished()) {
      setWorkerState(WorkerState.STOPPED);
    } else {
      source.removeWorker(this);
      postFinish();
    }
  }

  private void noMoreData() {
    if (source.isRecovered()) {
      LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId,
        source.getQueueId());
      source.getSourceMetrics().incrCompletedRecoveryQueue();
    } else {
      LOG.debug("Finished queue for group {} of peer {}", walGroupId, source.getQueueId());
    }
    setWorkerState(WorkerState.FINISHED);
  }

  // To be implemented by recovered shipper
  protected void postFinish() {
  }

  /**
   * Do the shipping logic
   */
  private void shipEdits(WALEntryBatch entryBatch) {
    List<Entry> entries = entryBatch.getWalEntries();
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      updateLogPosition(entryBatch);
      return;
    }
    int currentSize = (int) entryBatch.getHeapSize();
    source.getSourceMetrics()
      .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
    while (isActive()) {
      try {
        try {
          source.tryThrottle(currentSize);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while sleeping for throttling control");
          Thread.currentThread().interrupt();
          // the current thread might have been interrupted to terminate, so go straight back to
          // the while() condition to confirm this
          continue;
        }
        // create replicateContext here, so the entries can be GC'd upon return from this call
        // stack
        ReplicationEndpoint.ReplicateContext replicateContext =
          new ReplicationEndpoint.ReplicateContext();
        replicateContext.setEntries(entries).setSize(currentSize);
        replicateContext.setWalGroupId(walGroupId);
        replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));

        long startTimeNs = System.nanoTime();
        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
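        // replicate() returning false means the batch could not be shipped; loop and retry the
        // same batch (the backoff below is only applied when an exception is thrown)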
        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
        long endTimeNs = System.nanoTime();

        if (!replicated) {
          continue;
        } else {
          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
        }
        // Clean up hfile references
        for (Entry entry : entries) {
          cleanUpHFileRefs(entry.getEdit());
          LOG.trace("shipped entry {}: ", entry);
        }
        // Log and clean up WAL logs
        updateLogPosition(entryBatch);

        // offsets totalBufferUsed by deducting the shipped batch size (which excludes the bulk
        // load size); this size must be computed the same way as in acquireBufferQuota() in
        // ReplicationSourceWALReader, because both adjust the same variable: totalBufferUsed
        source.postShipEdits(entries, entryBatch.getUsedBufferSize());
        // FIXME check relationship between wal group and overall
        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
          entryBatch.getNbHFiles());
        source.getSourceMetrics().setAgeOfLastShippedOp(
          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());

        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicated {} entries or {} operations in {} ms", entries.size(),
            entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
        }
        break;
      } catch (Exception ex) {
        source.getSourceMetrics().incrementFailedBatches();
        LOG.warn("{} threw unknown exception:",
          source.getReplicationEndpoint().getClass().getName(), ex);
        if (
          sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier,
            maxRetriesMultiplier)
        ) {
          sleepMultiplier++;
        }
      }
    }
  }

  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
    String peerId = source.getPeerId();
    if (peerId.contains("-")) {
      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
      // A peerId will not have "-" in its name, see HBASE-11394
      peerId = peerId.split("-")[0];
    }
    List<Cell> cells = edit.getCells();
    int totalCells = cells.size();
    for (int i = 0; i < totalCells; i++) {
      Cell cell = cells.get(i);
      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        List<StoreDescriptor> stores = bld.getStoresList();
        int totalStores = stores.size();
        for (int j = 0; j < totalStores; j++) {
          List<String> storeFileList = stores.get(j).getStoreFileList();
          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
        }
      }
    }
  }

  private boolean updateLogPosition(WALEntryBatch batch) {
    boolean updated = false;
    // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
    // record on zk, so let's call it. The last wal position may be zero if end of file is true
    // and there is no entry in the batch. That is OK because the queue storage will ignore the
    // zero position and the file will be removed soon in cleanOldLogs.
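    // Skip the update when we are still on the same file at the same position and not at end of
    // file.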
    if (
      batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
        || batch.getLastWalPosition() != currentPosition
    ) {
      source.logPositionAndCleanOldLogs(batch);
      updated = true;
    }
    // if end of file is true, then we can just skip to the next file in queue.
    // the only exception is the recovered queue: if we reach the end of the queue, there will be
    // no more files, so the currentPath here may be null.
    if (batch.isEndOfFile()) {
      currentPath = entryReader.getCurrentPath();
      currentPosition = 0L;
    } else {
      currentPath = batch.getLastWalPath();
      currentPosition = batch.getLastWalPosition();
    }
    return updated;
  }

  public void startup(UncaughtExceptionHandler handler) {
    String name = Thread.currentThread().getName();
    Threads.setDaemonThreadRunning(this,
      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
      handler::uncaughtException);
  }

  Path getCurrentPath() {
    return entryReader.getCurrentPath();
  }

  long getCurrentPosition() {
    return currentPosition;
  }

  protected boolean isActive() {
    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
  }

  protected final void setWorkerState(WorkerState state) {
    this.state = state;
  }

  void stopWorker() {
    setWorkerState(WorkerState.STOPPED);
  }

  public boolean isFinished() {
    return state == WorkerState.FINISHED;
  }

  /**
   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUsed</code>, in case
   * there were unprocessed entries batched by the reader to the shipper, but the shipper didn't
   * manage to ship those because the replication source is being terminated. In that case, it
   * iterates through the batched entries and decreases the pending entries size from
   * <code>ReplicationSourceManager.totalBufferUsed</code>
   * <p/>
   * <b>NOTES</b> 1) This method should only be called upon replication source termination. It
   * blocks waiting for both shipper and reader threads to terminate, to make sure there are no
   * race conditions when updating <code>ReplicationSourceManager.totalBufferUsed</code>. 2) It
   * <b>does not</b> attempt to terminate the reader and shipper threads. Their
   * interruption/termination <b>must</b> have been triggered prior to calling this method.
   */
  void clearWALEntryBatch() {
    long timeout = EnvironmentEdgeManager.currentTime() + this.shipEditsTimeout;
    while (this.isAlive() || this.entryReader.isAlive()) {
      try {
        if (EnvironmentEdgeManager.currentTime() >= timeout) {
          LOG.warn(
            "{} Shipper clearWALEntryBatch method timed out whilst waiting for the reader/shipper "
              + "thread to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}",
            this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
          return;
        } else {
          // Wait for both shipper and reader threads to stop
          Thread.sleep(this.sleepForRetries);
        }
      } catch (InterruptedException e) {
        LOG.warn("{} Interrupted while waiting for {} to stop on clearWALEntryBatch. "
          + "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e);
        return;
      }
    }
    long totalReleasedBytes = 0;
    while (true) {
      WALEntryBatch batch = entryReader.entryBatchQueue.poll();
      if (batch == null) {
        break;
      }
      totalReleasedBytes += source.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
        totalReleasedBytes);
    }
  }
}