/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * This thread polls batches of WAL entries from a {@link ReplicationSourceWALReader} and ships
 * them to the replication endpoint of its source. The reader must be supplied through
 * {@link #setWALReader(ReplicationSourceWALReader)} before the thread is started.
 */
@InterfaceAudience.Private
public class ReplicationSourceShipper extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);

  /** Holds the state of a replication worker thread. */
  public enum WorkerState {
    RUNNING,
    STOPPED,
    FINISHED,  // The worker is done processing a recovered queue
  }

  // Default for "replication.source.getEntries.timeout", in the unit expected by
  // ReplicationSourceWALReader#poll (20 seconds according to the original author's note).
  private static final int DEFAULT_TIMEOUT = 20000;

  private final Configuration conf;
  protected final String walGroupId;
  protected final PriorityBlockingQueue<Path> queue;
  private final ReplicationSourceInterface source;

  // Last position in the log that we sent to ZooKeeper
  // It will be accessed by the stats thread so make it volatile
  private volatile long currentPosition = -1;
  // Path of the current log
  private Path currentPath;
  // Current state of the worker thread; null until run() marks it RUNNING
  private volatile WorkerState state;
  protected ReplicationSourceWALReader entryReader;

  // How long should we sleep for each retry
  protected final long sleepForRetries;
  // Maximum number of retries before taking bold actions
  protected final int maxRetriesMultiplier;
  // Timeout handed to entryReader.poll() on every iteration of the run loop
  private final int getEntriesTimeout;
  // Base timeout for a single replicate() call; scaled by getAdaptiveTimeout() on retries
  private final int shipEditsTimeout;

  /**
   * Creates a shipper and reads its retry/timeout settings from the configuration.
   * @param conf configuration to read the retry and timeout settings from
   * @param walGroupId id of the WAL group this shipper is responsible for
   * @param queue queue of WAL paths belonging to the WAL group
   * @param source the replication source this shipper works for
   */
  public ReplicationSourceShipper(Configuration conf, String walGroupId,
      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
    this.conf = conf;
    this.walGroupId = walGroupId;
    this.queue = queue;
    this.source = source;
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.getEntriesTimeout =
        this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
  }

  /**
   * Main loop: while the shipper is active, polls the reader for the next batch and either ships
   * it or, for the {@link WALEntryBatch#NO_MORE_DATA} marker, invokes {@link #noMoreData()}.
   * When the loop exits the worker is marked STOPPED unless it already reached FINISHED, in which
   * case {@link #postFinish()} is called.
   */
  @Override
  public final void run() {
    setWorkerState(WorkerState.RUNNING);
    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!source.isPeerEnabled()) {
        // The peer enabled check is in memory, not expensive, so do not need to increase the
        // sleep interval as it may cause a long lag when we enable the peer.
        sleepForRetries("Replication is disabled", 1);
        continue;
      }
      try {
        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
        LOG.debug("Shipper from source {} got entry batch from reader: {}",
          source.getQueueId(), entryBatch);
        if (entryBatch == null) {
          // Nothing arrived within the timeout; re-check isActive() and poll again.
          continue;
        }
        // the NO_MORE_DATA instance has no path so do not call shipEdits
        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
          noMoreData();
        } else {
          shipEdits(entryBatch);
        }
      } catch (InterruptedException | ReplicationRuntimeException e) {
        // It is interrupted and needs to quit. Setting the interrupt flag makes isActive()
        // return false, which terminates the loop for both exception types.
        LOG.warn("Interrupted while waiting for next replication entry batch", e);
        Thread.currentThread().interrupt();
      }
    }
    // If the worker exits run loop without finishing its task, mark it as stopped.
    if (!isFinished()) {
      setWorkerState(WorkerState.STOPPED);
    } else {
      postFinish();
    }
  }

  /** Hook called when the reader signals NO_MORE_DATA. To be implemented by recovered shipper. */
  protected void noMoreData() {
  }

  /** Hook called after the run loop exits in FINISHED state. To be implemented by recovered shipper. */
  protected void postFinish() {
  }

  /**
   * Sums the size of every entry in the batch, excluding bulk load file sizes, as reported by
   * the reader's {@code getEntrySizeExcludeBulkLoad}.
   * @param entryBatch the batch whose entries are measured
   * @return the accumulated size, narrowed to an int
   */
  private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
    int totalSize = 0;
    for (Entry entry : entryBatch.getWalEntries()) {
      totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry);
    }
    return totalSize;
  }

  /**
   * Do the shipping logic: throttle, hand the entries to the replication endpoint, and on
   * success clean up hfile references, persist the log position and update metrics. Retries
   * until the batch is replicated or the shipper is no longer active.
   * @param entryBatch the batch to ship; an empty batch only updates the log position
   */
  private void shipEdits(WALEntryBatch entryBatch) {
    List<Entry> entries = entryBatch.getWalEntries();
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      updateLogPosition(entryBatch);
      return;
    }
    int currentSize = (int) entryBatch.getHeapSize();
    int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
    source.getSourceMetrics().setTimeStampNextToReplicate(entries.get(entries.size() - 1)
        .getKey().getWriteTime());
    while (isActive()) {
      try {
        try {
          source.tryThrottle(currentSize);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while sleeping for throttling control");
          Thread.currentThread().interrupt();
          // current thread might be interrupted to terminate
          // directly go back to while() for confirm this
          continue;
        }
        // create replicateContext here, so the entries can be GC'd upon return from this call
        // stack
        ReplicationEndpoint.ReplicateContext replicateContext =
            new ReplicationEndpoint.ReplicateContext();
        replicateContext.setEntries(entries).setSize(currentSize);
        replicateContext.setWalGroupId(walGroupId);
        replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));

        long startTimeNs = System.nanoTime();
        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
        long endTimeNs = System.nanoTime();

        if (!replicated) {
          // Endpoint reported failure without throwing: retry the same batch.
          continue;
        }
        sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
        // Clean up hfile references
        for (Entry entry : entries) {
          cleanUpHFileRefs(entry.getEdit());
          LOG.trace("shipped entry {}: ", entry);
          TableName tableName = entry.getKey().getTableName();
          source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
              tableName.getNameAsString());
        }
        // Log and clean up WAL logs
        updateLogPosition(entryBatch);

        // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
        // this sizeExcludeBulkLoad has to use same calculation that when calling
        // acquireBufferQuota() in ReplicationSourceWALReader because they maintain
        // same variable: totalBufferUsed
        source.postShipEdits(entries, sizeExcludeBulkLoad);
        // FIXME check relationship between wal group and overall
        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
          entryBatch.getNbHFiles());
        source.getSourceMetrics().setAgeOfLastShippedOp(
          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
        // Guard level matches the emission level so the message is visible at DEBUG.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Replicated {} entries or {} operations in {} ms",
              entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
        }
        break;
      } catch (Exception ex) {
        LOG.warn("{} threw unknown exception:",
          source.getReplicationEndpoint().getClass().getName(), ex);
        // Back off longer on each consecutive failure until maxRetriesMultiplier is reached.
        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * For every bulk load marker cell in the edit, asks the source manager to clean up the hfile
   * references of each store and decrements the hfile-refs queue size metric accordingly.
   * @param edit the WAL edit whose cells are inspected
   * @throws IOException declared for the bulk load descriptor parsing / cleanup calls
   */
  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
    String peerId = source.getPeerId();
    if (peerId.contains("-")) {
      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
      // A peerId will not have "-" in its name, see HBASE-11394
      peerId = peerId.split("-")[0];
    }
    List<Cell> cells = edit.getCells();
    int totalCells = cells.size();
    for (int i = 0; i < totalCells; i++) {
      Cell cell = cells.get(i);
      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        List<StoreDescriptor> stores = bld.getStoresList();
        int totalStores = stores.size();
        for (int j = 0; j < totalStores; j++) {
          List<String> storeFileList = stores.get(j).getStoreFileList();
          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
        }
      }
    }
  }

  /**
   * Persists the position of the given batch through the source manager when it differs from
   * the locally tracked one (or the batch ends a file), then updates the local path/position.
   * @param batch the batch that was just processed
   * @return true if the source manager was asked to log the position
   */
  private boolean updateLogPosition(WALEntryBatch batch) {
    boolean updated = false;
    // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
    // record on zk, so let's call it. The last wal position maybe zero if end of file is true and
    // there is no entry in the batch. It is OK because that the queue storage will ignore the zero
    // position and the file will be removed soon in cleanOldLogs.
    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
      batch.getLastWalPosition() != currentPosition) {
      source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(),
        source.isRecovered(), batch);
      updated = true;
    }
    // if end of file is true, then we can just skip to the next file in queue.
    // the only exception is for recovered queue, if we reach the end of the queue, then there will
    // no more files so here the currentPath may be null.
    if (batch.isEndOfFile()) {
      currentPath = entryReader.getCurrentPath();
      currentPosition = 0L;
    } else {
      currentPath = batch.getLastWalPath();
      currentPosition = batch.getLastWalPosition();
    }
    return updated;
  }

  /**
   * Names this thread after the calling thread, the WAL group and the queue id, and hands it to
   * {@link Threads#setDaemonThreadRunning} together with the given handler.
   * @param handler handler for exceptions that escape {@link #run()}
   */
  public void startup(UncaughtExceptionHandler handler) {
    String name = Thread.currentThread().getName();
    Threads.setDaemonThreadRunning(this,
      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
  }

  /** @return the path currently reported by the WAL reader */
  Path getCurrentPath() {
    return entryReader.getCurrentPath();
  }

  /** @return the last position recorded by this shipper, or -1 if none was recorded yet */
  long getCurrentPosition() {
    return currentPosition;
  }

  /**
   * Sets the reader this shipper polls batches from.
   * @param entryReader the WAL reader to use
   */
  void setWALReader(ReplicationSourceWALReader entryReader) {
    this.entryReader = entryReader;
  }

  /** @return the start position; always 0 in this class */
  long getStartPosition() {
    return 0;
  }

  /**
   * @return true while the source is active, this worker is RUNNING and this thread has not been
   *         interrupted
   */
  protected boolean isActive() {
    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
  }

  /**
   * Sets the state of this worker.
   * @param state the new state
   */
  protected final void setWorkerState(WorkerState state) {
    this.state = state;
  }

  /** Marks this worker as STOPPED so that the run loop exits on its next activity check. */
  void stopWorker() {
    setWorkerState(WorkerState.STOPPED);
  }

  /** @return true if this worker reached the FINISHED state */
  public boolean isFinished() {
    return state == WorkerState.FINISHED;
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  public boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      // Restore the flag so isActive() observes the interruption.
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }
}