001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout; 021import java.io.IOException; 022import java.util.List; 023import java.util.concurrent.PriorityBlockingQueue; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellUtil; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 032import org.apache.hadoop.hbase.util.Threads; 033import org.apache.hadoop.hbase.wal.WAL.Entry; 034import org.apache.hadoop.hbase.wal.WALEdit; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 041 042/** 043 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by 044 * ReplicationSourceWALReaderThread 045 */ 046@InterfaceAudience.Private 047public class ReplicationSourceShipper extends Thread { 048 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class); 049 050 // Hold the state of a replication worker thread 051 public enum WorkerState { 052 RUNNING, 053 STOPPED, 054 FINISHED, // The worker is done processing a recovered queue 055 } 056 057 private final Configuration conf; 058 protected final String walGroupId; 059 protected final PriorityBlockingQueue<Path> queue; 060 private final ReplicationSourceInterface source; 061 062 // Last position in the log that we sent to ZooKeeper 063 // It will be accessed by the stats thread so make it volatile 064 private volatile long currentPosition = -1; 065 // Path of the current log 066 private Path currentPath; 067 // Current state of the worker thread 068 private volatile WorkerState state; 069 protected ReplicationSourceWALReader entryReader; 070 071 // How long should we sleep for each retry 072 protected final long sleepForRetries; 073 // Maximum number of retries before taking bold actions 074 protected final int maxRetriesMultiplier; 075 private final int DEFAULT_TIMEOUT = 20000; 076 private final int getEntriesTimeout; 077 private final int shipEditsTimeout; 078 079 public ReplicationSourceShipper(Configuration conf, String walGroupId, 080 PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) { 081 this.conf = conf; 082 this.walGroupId = walGroupId; 083 this.queue = queue; 084 this.source = source; 085 this.sleepForRetries = 086 this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second 087 this.maxRetriesMultiplier = 088 this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per 089 this.getEntriesTimeout = 090 this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds 091 this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT, 092 HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT); 093 } 094 095 @Override 096 public final void run() { 097 setWorkerState(WorkerState.RUNNING); 098 // Loop until we close down 099 while (isActive()) { 100 // Sleep until replication is enabled again 101 if (!source.isPeerEnabled()) { 102 // The peer enabled check is in memory, not expensive, so do not need to increase the 103 // sleep interval as it may cause a long lag when we enable the peer. 104 sleepForRetries("Replication is disabled", 1); 105 continue; 106 } 107 try { 108 WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout); 109 if (entryBatch == null) { 110 // since there is no logs need to replicate, we refresh the ageOfLastShippedOp 111 source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), 112 walGroupId); 113 continue; 114 } 115 // the NO_MORE_DATA instance has no path so do not call shipEdits 116 if (entryBatch == WALEntryBatch.NO_MORE_DATA) { 117 noMoreData(); 118 } else { 119 shipEdits(entryBatch); 120 } 121 } catch (InterruptedException e) { 122 LOG.trace("Interrupted while waiting for next replication entry batch", e); 123 Thread.currentThread().interrupt(); 124 } 125 } 126 // If the worker exits run loop without finishing its task, mark it as stopped. 127 if (!isFinished()) { 128 setWorkerState(WorkerState.STOPPED); 129 } else { 130 postFinish(); 131 } 132 } 133 134 // To be implemented by recovered shipper 135 protected void noMoreData() { 136 } 137 138 // To be implemented by recovered shipper 139 protected void postFinish() { 140 } 141 142 /** 143 * get batchEntry size excludes bulk load file sizes. 144 * Uses ReplicationSourceWALReader's static method. 145 */ 146 private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) { 147 int totalSize = 0; 148 for(Entry entry : entryBatch.getWalEntries()) { 149 totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry); 150 } 151 return totalSize; 152 } 153 154 /** 155 * Do the shipping logic 156 */ 157 private void shipEdits(WALEntryBatch entryBatch) { 158 List<Entry> entries = entryBatch.getWalEntries(); 159 int sleepMultiplier = 0; 160 if (entries.isEmpty()) { 161 if (updateLogPosition(entryBatch)) { 162 // if there was nothing to ship and it's not an error 163 // set "ageOfLastShippedOp" to <now> to indicate that we're current 164 source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), 165 walGroupId); 166 } 167 return; 168 } 169 int currentSize = (int) entryBatch.getHeapSize(); 170 int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch); 171 while (isActive()) { 172 try { 173 try { 174 source.tryThrottle(currentSize); 175 } catch (InterruptedException e) { 176 LOG.debug("Interrupted while sleeping for throttling control"); 177 Thread.currentThread().interrupt(); 178 // current thread might be interrupted to terminate 179 // directly go back to while() for confirm this 180 continue; 181 } 182 183 // create replicateContext here, so the entries can be GC'd upon return from this call 184 // stack 185 ReplicationEndpoint.ReplicateContext replicateContext = 186 new ReplicationEndpoint.ReplicateContext(); 187 replicateContext.setEntries(entries).setSize(currentSize); 188 replicateContext.setWalGroupId(walGroupId); 189 replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier)); 190 191 long startTimeNs = System.nanoTime(); 192 // send the edits to the endpoint. Will block until the edits are shipped and acknowledged 193 boolean replicated = source.getReplicationEndpoint().replicate(replicateContext); 194 long endTimeNs = System.nanoTime(); 195 196 if (!replicated) { 197 continue; 198 } else { 199 sleepMultiplier = Math.max(sleepMultiplier - 1, 0); 200 } 201 // Clean up hfile references 202 for (Entry entry : entries) { 203 cleanUpHFileRefs(entry.getEdit()); 204 205 TableName tableName = entry.getKey().getTableName(); 206 source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(), 207 tableName.getNameAsString()); 208 } 209 // Log and clean up WAL logs 210 updateLogPosition(entryBatch); 211 212 //offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size) 213 //this sizeExcludeBulkLoad has to use same calculation that when calling 214 //acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain 215 //same variable: totalBufferUsed 216 source.postShipEdits(entries, sizeExcludeBulkLoad); 217 // FIXME check relationship between wal group and overall 218 source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize, 219 entryBatch.getNbHFiles()); 220 source.getSourceMetrics().setAgeOfLastShippedOp( 221 entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); 222 if (LOG.isTraceEnabled()) { 223 LOG.trace("Replicated {} entries or {} operations in {} ms", 224 entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000); 225 } 226 break; 227 } catch (Exception ex) { 228 LOG.warn("{} threw unknown exception:", 229 source.getReplicationEndpoint().getClass().getName(), ex); 230 if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { 231 sleepMultiplier++; 232 } 233 } 234 } 235 } 236 237 private void cleanUpHFileRefs(WALEdit edit) throws IOException { 238 String peerId = source.getPeerId(); 239 if (peerId.contains("-")) { 240 // peerClusterZnode will be in the form peerId + "-" + rsZNode. 241 // A peerId will not have "-" in its name, see HBASE-11394 242 peerId = peerId.split("-")[0]; 243 } 244 List<Cell> cells = edit.getCells(); 245 int totalCells = cells.size(); 246 for (int i = 0; i < totalCells; i++) { 247 Cell cell = cells.get(i); 248 if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { 249 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); 250 List<StoreDescriptor> stores = bld.getStoresList(); 251 int totalStores = stores.size(); 252 for (int j = 0; j < totalStores; j++) { 253 List<String> storeFileList = stores.get(j).getStoreFileList(); 254 source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList); 255 source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size()); 256 } 257 } 258 } 259 } 260 261 private boolean updateLogPosition(WALEntryBatch batch) { 262 boolean updated = false; 263 // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file 264 // record on zk, so let's call it. The last wal position maybe zero if end of file is true and 265 // there is no entry in the batch. It is OK because that the queue storage will ignore the zero 266 // position and the file will be removed soon in cleanOldLogs. 267 if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) || 268 batch.getLastWalPosition() != currentPosition) { 269 source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(), 270 source.isRecovered(), batch); 271 updated = true; 272 } 273 // if end of file is true, then we can just skip to the next file in queue. 274 // the only exception is for recovered queue, if we reach the end of the queue, then there will 275 // no more files so here the currentPath may be null. 276 if (batch.isEndOfFile()) { 277 currentPath = entryReader.getCurrentPath(); 278 currentPosition = 0L; 279 } else { 280 currentPath = batch.getLastWalPath(); 281 currentPosition = batch.getLastWalPosition(); 282 } 283 return updated; 284 } 285 286 public void startup(UncaughtExceptionHandler handler) { 287 String name = Thread.currentThread().getName(); 288 Threads.setDaemonThreadRunning(this, 289 name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler); 290 } 291 292 Path getCurrentPath() { 293 return entryReader.getCurrentPath(); 294 } 295 296 long getCurrentPosition() { 297 return currentPosition; 298 } 299 300 void setWALReader(ReplicationSourceWALReader entryReader) { 301 this.entryReader = entryReader; 302 } 303 304 long getStartPosition() { 305 return 0; 306 } 307 308 private boolean isActive() { 309 return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted(); 310 } 311 312 protected final void setWorkerState(WorkerState state) { 313 this.state = state; 314 } 315 316 void stopWorker() { 317 setWorkerState(WorkerState.STOPPED); 318 } 319 320 public boolean isFinished() { 321 return state == WorkerState.FINISHED; 322 } 323 324 /** 325 * Do the sleeping logic 326 * @param msg Why we sleep 327 * @param sleepMultiplier by how many times the default sleeping time is augmented 328 * @return True if <code>sleepMultiplier</code> is < <code>maxRetriesMultiplier</code> 329 */ 330 public boolean sleepForRetries(String msg, int sleepMultiplier) { 331 try { 332 LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier); 333 Thread.sleep(this.sleepForRetries * sleepMultiplier); 334 } catch (InterruptedException e) { 335 LOG.debug("Interrupted while sleeping between retries"); 336 Thread.currentThread().interrupt(); 337 } 338 return sleepMultiplier < maxRetriesMultiplier; 339 } 340}