001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.concurrent.PriorityBlockingQueue; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.hbase.Cell; 026import org.apache.hadoop.hbase.CellUtil; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 030import org.apache.hadoop.hbase.util.Threads; 031import org.apache.hadoop.hbase.wal.WAL.Entry; 032import org.apache.hadoop.hbase.wal.WALEdit; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 039 040/** 041 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by 042 * ReplicationSourceWALReaderThread 043 */ 044@InterfaceAudience.Private 045public class ReplicationSourceShipper extends Thread { 046 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class); 047 048 // Hold the state of a replication worker thread 049 public enum WorkerState { 050 RUNNING, 051 STOPPED, 052 FINISHED, // The worker is done processing a recovered queue 053 } 054 055 private final Configuration conf; 056 protected final String walGroupId; 057 protected final PriorityBlockingQueue<Path> queue; 058 private final ReplicationSourceInterface source; 059 060 // Last position in the log that we sent to ZooKeeper 061 // It will be accessed by the stats thread so make it volatile 062 private volatile long currentPosition = -1; 063 // Path of the current log 064 private Path currentPath; 065 // Current state of the worker thread 066 private volatile WorkerState state; 067 protected ReplicationSourceWALReader entryReader; 068 069 // How long should we sleep for each retry 070 protected final long sleepForRetries; 071 // Maximum number of retries before taking bold actions 072 protected final int maxRetriesMultiplier; 073 private final int DEFAULT_TIMEOUT = 20000; 074 private final int getEntriesTimeout; 075 076 public ReplicationSourceShipper(Configuration conf, String walGroupId, 077 PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) { 078 this.conf = conf; 079 this.walGroupId = walGroupId; 080 this.queue = queue; 081 this.source = source; 082 this.sleepForRetries = 083 this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second 084 this.maxRetriesMultiplier = 085 this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per 086 this.getEntriesTimeout = 087 this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds 088 } 089 090 @Override 091 public final void run() { 092 setWorkerState(WorkerState.RUNNING); 093 // Loop until we close down 094 while (isActive()) { 095 // Sleep until replication is enabled again 096 if (!source.isPeerEnabled()) { 097 // The peer enabled check is in memory, not expensive, so do not need to increase the 098 // sleep interval as it may cause a long lag when we enable the peer. 099 sleepForRetries("Replication is disabled", 1); 100 continue; 101 } 102 try { 103 WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout); 104 if (entryBatch == null) { 105 // since there is no logs need to replicate, we refresh the ageOfLastShippedOp 106 source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), 107 walGroupId); 108 continue; 109 } 110 // the NO_MORE_DATA instance has no path so do not call shipEdits 111 if (entryBatch == WALEntryBatch.NO_MORE_DATA) { 112 noMoreData(); 113 } else { 114 shipEdits(entryBatch); 115 } 116 } catch (InterruptedException e) { 117 LOG.trace("Interrupted while waiting for next replication entry batch", e); 118 Thread.currentThread().interrupt(); 119 } 120 } 121 // If the worker exits run loop without finishing its task, mark it as stopped. 122 if (!isFinished()) { 123 setWorkerState(WorkerState.STOPPED); 124 } else { 125 postFinish(); 126 } 127 } 128 129 // To be implemented by recovered shipper 130 protected void noMoreData() { 131 } 132 133 // To be implemented by recovered shipper 134 protected void postFinish() { 135 } 136 137 /** 138 * get batchEntry size excludes bulk load file sizes. 139 * Uses ReplicationSourceWALReader's static method. 140 */ 141 private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) { 142 int totalSize = 0; 143 for(Entry entry : entryBatch.getWalEntries()) { 144 totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry); 145 } 146 return totalSize; 147 } 148 149 /** 150 * Do the shipping logic 151 */ 152 private void shipEdits(WALEntryBatch entryBatch) { 153 List<Entry> entries = entryBatch.getWalEntries(); 154 int sleepMultiplier = 0; 155 if (entries.isEmpty()) { 156 if (updateLogPosition(entryBatch)) { 157 // if there was nothing to ship and it's not an error 158 // set "ageOfLastShippedOp" to <now> to indicate that we're current 159 source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), 160 walGroupId); 161 } 162 return; 163 } 164 int currentSize = (int) entryBatch.getHeapSize(); 165 int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch); 166 while (isActive()) { 167 try { 168 try { 169 source.tryThrottle(currentSize); 170 } catch (InterruptedException e) { 171 LOG.debug("Interrupted while sleeping for throttling control"); 172 Thread.currentThread().interrupt(); 173 // current thread might be interrupted to terminate 174 // directly go back to while() for confirm this 175 continue; 176 } 177 178 // create replicateContext here, so the entries can be GC'd upon return from this call 179 // stack 180 ReplicationEndpoint.ReplicateContext replicateContext = 181 new ReplicationEndpoint.ReplicateContext(); 182 replicateContext.setEntries(entries).setSize(currentSize); 183 replicateContext.setWalGroupId(walGroupId); 184 185 long startTimeNs = System.nanoTime(); 186 // send the edits to the endpoint. Will block until the edits are shipped and acknowledged 187 boolean replicated = source.getReplicationEndpoint().replicate(replicateContext); 188 long endTimeNs = System.nanoTime(); 189 190 if (!replicated) { 191 continue; 192 } else { 193 sleepMultiplier = Math.max(sleepMultiplier - 1, 0); 194 } 195 // Clean up hfile references 196 for (Entry entry : entries) { 197 cleanUpHFileRefs(entry.getEdit()); 198 199 TableName tableName = entry.getKey().getTableName(); 200 source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(), 201 tableName.getNameAsString()); 202 } 203 // Log and clean up WAL logs 204 updateLogPosition(entryBatch); 205 206 //offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size) 207 //this sizeExcludeBulkLoad has to use same calculation that when calling 208 //acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain 209 //same variable: totalBufferUsed 210 source.postShipEdits(entries, sizeExcludeBulkLoad); 211 // FIXME check relationship between wal group and overall 212 source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize, 213 entryBatch.getNbHFiles()); 214 source.getSourceMetrics().setAgeOfLastShippedOp( 215 entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); 216 if (LOG.isTraceEnabled()) { 217 LOG.trace("Replicated {} entries or {} operations in {} ms", 218 entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000); 219 } 220 break; 221 } catch (Exception ex) { 222 LOG.warn("{} threw unknown exception:", 223 source.getReplicationEndpoint().getClass().getName(), ex); 224 if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { 225 sleepMultiplier++; 226 } 227 } 228 } 229 } 230 231 private void cleanUpHFileRefs(WALEdit edit) throws IOException { 232 String peerId = source.getPeerId(); 233 if (peerId.contains("-")) { 234 // peerClusterZnode will be in the form peerId + "-" + rsZNode. 235 // A peerId will not have "-" in its name, see HBASE-11394 236 peerId = peerId.split("-")[0]; 237 } 238 List<Cell> cells = edit.getCells(); 239 int totalCells = cells.size(); 240 for (int i = 0; i < totalCells; i++) { 241 Cell cell = cells.get(i); 242 if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { 243 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); 244 List<StoreDescriptor> stores = bld.getStoresList(); 245 int totalStores = stores.size(); 246 for (int j = 0; j < totalStores; j++) { 247 List<String> storeFileList = stores.get(j).getStoreFileList(); 248 source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList); 249 source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size()); 250 } 251 } 252 } 253 } 254 255 private boolean updateLogPosition(WALEntryBatch batch) { 256 boolean updated = false; 257 // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file 258 // record on zk, so let's call it. The last wal position maybe zero if end of file is true and 259 // there is no entry in the batch. It is OK because that the queue storage will ignore the zero 260 // position and the file will be removed soon in cleanOldLogs. 261 if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) || 262 batch.getLastWalPosition() != currentPosition) { 263 source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(), 264 source.isRecovered(), batch); 265 updated = true; 266 } 267 // if end of file is true, then we can just skip to the next file in queue. 268 // the only exception is for recovered queue, if we reach the end of the queue, then there will 269 // no more files so here the currentPath may be null. 270 if (batch.isEndOfFile()) { 271 currentPath = entryReader.getCurrentPath(); 272 currentPosition = 0L; 273 } else { 274 currentPath = batch.getLastWalPath(); 275 currentPosition = batch.getLastWalPosition(); 276 } 277 return updated; 278 } 279 280 public void startup(UncaughtExceptionHandler handler) { 281 String name = Thread.currentThread().getName(); 282 Threads.setDaemonThreadRunning(this, 283 name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler); 284 } 285 286 Path getCurrentPath() { 287 return entryReader.getCurrentPath(); 288 } 289 290 long getCurrentPosition() { 291 return currentPosition; 292 } 293 294 void setWALReader(ReplicationSourceWALReader entryReader) { 295 this.entryReader = entryReader; 296 } 297 298 long getStartPosition() { 299 return 0; 300 } 301 302 private boolean isActive() { 303 return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted(); 304 } 305 306 protected final void setWorkerState(WorkerState state) { 307 this.state = state; 308 } 309 310 void stopWorker() { 311 setWorkerState(WorkerState.STOPPED); 312 } 313 314 public boolean isFinished() { 315 return state == WorkerState.FINISHED; 316 } 317 318 /** 319 * Do the sleeping logic 320 * @param msg Why we sleep 321 * @param sleepMultiplier by how many times the default sleeping time is augmented 322 * @return True if <code>sleepMultiplier</code> is < <code>maxRetriesMultiplier</code> 323 */ 324 public boolean sleepForRetries(String msg, int sleepMultiplier) { 325 try { 326 LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier); 327 Thread.sleep(this.sleepForRetries * sleepMultiplier); 328 } catch (InterruptedException e) { 329 LOG.debug("Interrupted while sleeping between retries"); 330 Thread.currentThread().interrupt(); 331 } 332 return sleepMultiplier < maxRetriesMultiplier; 333 } 334}