/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
 * {@link ReplicationSourceWALReader}.
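 * <p>
 * A minimal wiring sketch, illustrative only (in practice {@link ReplicationSource} creates and
 * wires the reader and shipper itself; {@code conf}, {@code queue}, {@code source} and
 * {@code walReader} are assumed to be in scope):
 *
 * <pre>
 * ReplicationSourceShipper shipper =
 *     new ReplicationSourceShipper(conf, walGroupId, queue, source);
 * shipper.setWALReader(walReader); // the reader that feeds WALEntryBatch instances
 * shipper.startup((t, e) -&gt; LOG.error("Uncaught exception in " + t.getName(), e));
 * </pre>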
 */
@InterfaceAudience.Private
public class ReplicationSourceShipper extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);

  // Hold the state of a replication worker thread
  public enum WorkerState {
    RUNNING,
    STOPPED,
    FINISHED, // The worker is done processing a recovered queue
  }

  private final Configuration conf;
  protected final String walGroupId;
  protected final PriorityBlockingQueue<Path> queue;
  private final ReplicationSourceInterface source;

  // Last position in the log that we sent to ZooKeeper
  // It will be accessed by the stats thread so make it volatile
  private volatile long currentPosition = -1;
  // Path of the current log
  private Path currentPath;
  // Current state of the worker thread
  private volatile WorkerState state;
  protected ReplicationSourceWALReader entryReader;

  // How long should we sleep for each retry
  protected final long sleepForRetries;
  // Maximum number of retries before taking bold actions
  protected final int maxRetriesMultiplier;
  private final int DEFAULT_TIMEOUT = 20000;
  private final int getEntriesTimeout;
  private final int shipEditsTimeout;

  public ReplicationSourceShipper(Configuration conf, String walGroupId,
      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
    this.conf = conf;
    this.walGroupId = walGroupId;
    this.queue = queue;
    this.source = source;
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.getEntriesTimeout =
        this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
  }

  @Override
  public final void run() {
    setWorkerState(WorkerState.RUNNING);
    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!source.isPeerEnabled()) {
        // The peer enabled check is in memory, not expensive, so we do not need to increase the
        // sleep interval as it may cause a long lag when we enable the peer.
        sleepForRetries("Replication is disabled", 1);
        continue;
      }
      try {
        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
        LOG.debug("Shipper from source {} got entry batch from reader: {}",
          source.getQueueId(), entryBatch);
        if (entryBatch == null) {
          continue;
        }
        // the NO_MORE_DATA instance has no path so do not call shipEdits
        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
          noMoreData();
        } else {
          shipEdits(entryBatch);
        }
      } catch (InterruptedException | ReplicationRuntimeException e) {
        // The thread was interrupted and needs to quit.
        LOG.warn("Interrupted while waiting for next replication entry batch", e);
        Thread.currentThread().interrupt();
      }
    }
    // If the worker exits the run loop without finishing its task, mark it as stopped.
    if (!isFinished()) {
      setWorkerState(WorkerState.STOPPED);
    } else {
      postFinish();
    }
  }
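
  // WALEntryBatch.NO_MORE_DATA is only emitted for a recovered queue, whose list of WALs is
  // finite; a normal source keeps polling for new entries instead. The two hooks below are
  // therefore no-ops here and are overridden by the recovered-queue shipper.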
  // To be implemented by recovered shipper
  protected void noMoreData() {
  }

  // To be implemented by recovered shipper
  protected void postFinish() {
  }

  /**
   * Get the batch entry size, excluding bulk load file sizes.
   * Uses ReplicationSourceWALReader's static method.
   */
  private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
    int totalSize = 0;
    for (Entry entry : entryBatch.getWalEntries()) {
      totalSize += ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(entry);
    }
    return totalSize;
  }

  /**
   * Do the shipping logic
   */
  private void shipEdits(WALEntryBatch entryBatch) {
    List<Entry> entries = entryBatch.getWalEntries();
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      updateLogPosition(entryBatch);
      return;
    }
    int currentSize = (int) entryBatch.getHeapSize();
    int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
    source.getSourceMetrics().setTimeStampNextToReplicate(entries.get(entries.size() - 1)
        .getKey().getWriteTime());
    while (isActive()) {
      try {
        try {
          source.tryThrottle(currentSize);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while sleeping for throttling control");
          Thread.currentThread().interrupt();
          // The current thread might have been interrupted to terminate;
          // go back to the while() condition to confirm this.
          continue;
        }
        // create replicateContext here, so the entries can be GC'd upon return from this call
        // stack
        ReplicationEndpoint.ReplicateContext replicateContext =
            new ReplicationEndpoint.ReplicateContext();
        replicateContext.setEntries(entries).setSize(currentSize);
        replicateContext.setWalGroupId(walGroupId);
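        // Grow the RPC timeout with the retry count so a slow peer gets progressively more
        // time before the call is abandoned (getAdaptiveTimeout is assumed to scale the base
        // shipEditsTimeout by the number of retries so far).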
        replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));

        long startTimeNs = System.nanoTime();
        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
        long endTimeNs = System.nanoTime();

        if (!replicated) {
          continue;
        } else {
          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
        }
        // Clean up hfile references
        for (Entry entry : entries) {
          cleanUpHFileRefs(entry.getEdit());
          LOG.trace("Shipped entry {}", entry);
        }
        // Log and clean up WAL logs
        updateLogPosition(entryBatch);

        // Offset totalBufferUsed by deducting the shipped batch size (excluding bulk load
        // size). sizeExcludeBulkLoad has to use the same calculation as acquireBufferQuota()
        // in ReplicationSourceWALReader because they maintain the same variable:
        // totalBufferUsed.
        source.postShipEdits(entries, sizeExcludeBulkLoad);
        // FIXME check relationship between wal group and overall
        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
          entryBatch.getNbHFiles());
        source.getSourceMetrics().setAgeOfLastShippedOp(
          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());

        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicated {} entries or {} operations in {} ms",
            entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
        }
        break;
      } catch (Exception ex) {
        LOG.warn("{} threw unknown exception:",
          source.getReplicationEndpoint().getClass().getName(), ex);
        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
    String peerId = source.getPeerId();
    if (peerId.contains("-")) {
      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
      // A peerId will not have "-" in its name, see HBASE-11394.
      peerId = peerId.split("-")[0];
    }
    List<Cell> cells = edit.getCells();
    int totalCells = cells.size();
    for (int i = 0; i < totalCells; i++) {
      Cell cell = cells.get(i);
      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        List<StoreDescriptor> stores = bld.getStoresList();
        int totalStores = stores.size();
        for (int j = 0; j < totalStores; j++) {
          List<String> storeFileList = stores.get(j).getStoreFileList();
          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
        }
      }
    }
  }
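
  /**
   * Persists the batch's end position via logPositionAndCleanOldLogs (which also cleans up
   * finished WALs) when the batch reached end of file or the position moved, then advances the
   * in-memory currentPath/currentPosition.
   * @return true if the position was persisted
   */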
  private boolean updateLogPosition(WALEntryBatch batch) {
    boolean updated = false;
    // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
    // record on zk, so let's call it. The last wal position may be zero if end of file is true
    // and there is no entry in the batch. It is OK because the queue storage will ignore the
    // zero position and the file will be removed soon in cleanOldLogs.
    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
      batch.getLastWalPosition() != currentPosition) {
      source.logPositionAndCleanOldLogs(batch);
      updated = true;
    }
    // if end of file is true, then we can just skip to the next file in queue.
    // the only exception is for recovered queue: if we reach the end of the queue, there will
    // be no more files, so the currentPath may be null here.
    if (batch.isEndOfFile()) {
      currentPath = entryReader.getCurrentPath();
      currentPosition = 0L;
    } else {
      currentPath = batch.getLastWalPath();
      currentPosition = batch.getLastWalPosition();
    }
    return updated;
  }

  public void startup(UncaughtExceptionHandler handler) {
    String name = Thread.currentThread().getName();
    Threads.setDaemonThreadRunning(this,
      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
      handler::uncaughtException);
  }

  Path getCurrentPath() {
    return entryReader.getCurrentPath();
  }

  long getCurrentPosition() {
    return currentPosition;
  }

  void setWALReader(ReplicationSourceWALReader entryReader) {
    this.entryReader = entryReader;
  }

  long getStartPosition() {
    return 0;
  }

  protected boolean isActive() {
    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
  }

  protected final void setWorkerState(WorkerState state) {
    this.state = state;
  }

  void stopWorker() {
    setWorkerState(WorkerState.STOPPED);
  }

  public boolean isFinished() {
    return state == WorkerState.FINISHED;
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  public boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }
}