001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import java.io.IOException; 022import java.util.List; 023import java.util.concurrent.PriorityBlockingQueue; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellUtil; 028import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 029import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.apache.hadoop.hbase.util.Threads; 032import org.apache.hadoop.hbase.wal.WAL.Entry; 033import org.apache.hadoop.hbase.wal.WALEdit; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 040 041/** 042 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by 043 * ReplicationSourceWALReaderThread 044 */ 045@InterfaceAudience.Private 046public class ReplicationSourceShipper extends Thread { 047 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class); 048 049 // Hold the state of a replication worker thread 050 public enum WorkerState { 051 RUNNING, 052 STOPPED, 053 FINISHED, // The worker is done processing a recovered queue 054 } 055 056 protected final Configuration conf; 057 protected final String walGroupId; 058 protected final PriorityBlockingQueue<Path> queue; 059 protected final ReplicationSourceInterface source; 060 061 // Last position in the log that we sent to ZooKeeper 062 protected long lastLoggedPosition = -1; 063 // Path of the current log 064 protected volatile Path currentPath; 065 // Current state of the worker thread 066 private WorkerState state; 067 protected ReplicationSourceWALReader entryReader; 068 069 // How long should we sleep for each retry 070 protected final long sleepForRetries; 071 // Maximum number of retries before taking bold actions 072 protected final int maxRetriesMultiplier; 073 074 public ReplicationSourceShipper(Configuration conf, String walGroupId, 075 PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) { 076 this.conf = conf; 077 this.walGroupId = walGroupId; 078 this.queue = queue; 079 this.source = source; 080 this.sleepForRetries = 081 this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second 082 this.maxRetriesMultiplier = 083 this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per 084 } 085 086 @Override 087 public void run() { 088 setWorkerState(WorkerState.RUNNING); 089 // Loop until we close down 090 while (isActive()) { 091 int sleepMultiplier = 1; 092 // Sleep until replication is enabled again 093 if (!source.isPeerEnabled()) { 094 if (sleepForRetries("Replication is disabled", sleepMultiplier)) { 095 sleepMultiplier++; 096 } 097 continue; 098 } 099 100 while (entryReader == null) { 101 if (sleepForRetries("Replication WAL entry reader thread not initialized", 102 sleepMultiplier)) { 103 sleepMultiplier++; 104 } 105 } 106 107 try { 108 WALEntryBatch entryBatch = entryReader.take(); 109 shipEdits(entryBatch); 110 } catch (InterruptedException e) { 111 LOG.trace("Interrupted while waiting for next replication entry batch", e); 112 Thread.currentThread().interrupt(); 113 } 114 } 115 // If the worker exits run loop without finishing its task, mark it as stopped. 116 if (state != WorkerState.FINISHED) { 117 setWorkerState(WorkerState.STOPPED); 118 } 119 } 120 121 /** 122 * Do the shipping logic 123 */ 124 protected void shipEdits(WALEntryBatch entryBatch) { 125 List<Entry> entries = entryBatch.getWalEntries(); 126 long lastReadPosition = entryBatch.getLastWalPosition(); 127 currentPath = entryBatch.getLastWalPath(); 128 int sleepMultiplier = 0; 129 if (entries.isEmpty()) { 130 if (lastLoggedPosition != lastReadPosition) { 131 updateLogPosition(lastReadPosition); 132 // if there was nothing to ship and it's not an error 133 // set "ageOfLastShippedOp" to <now> to indicate that we're current 134 source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), 135 walGroupId); 136 } 137 return; 138 } 139 int currentSize = (int) entryBatch.getHeapSize(); 140 while (isActive()) { 141 try { 142 try { 143 source.tryThrottle(currentSize); 144 } catch (InterruptedException e) { 145 LOG.debug("Interrupted while sleeping for throttling control"); 146 Thread.currentThread().interrupt(); 147 // current thread might be interrupted to terminate 148 // directly go back to while() for confirm this 149 continue; 150 } 151 152 // create replicateContext here, so the entries can be GC'd upon return from this call 153 // stack 154 ReplicationEndpoint.ReplicateContext replicateContext = 155 new ReplicationEndpoint.ReplicateContext(); 156 replicateContext.setEntries(entries).setSize(currentSize); 157 replicateContext.setWalGroupId(walGroupId); 158 159 long startTimeNs = System.nanoTime(); 160 // send the edits to the endpoint. Will block until the edits are shipped and acknowledged 161 boolean replicated = source.getReplicationEndpoint().replicate(replicateContext); 162 long endTimeNs = System.nanoTime(); 163 164 if (!replicated) { 165 continue; 166 } else { 167 sleepMultiplier = Math.max(sleepMultiplier - 1, 0); 168 } 169 170 if (this.lastLoggedPosition != lastReadPosition) { 171 //Clean up hfile references 172 int size = entries.size(); 173 for (int i = 0; i < size; i++) { 174 cleanUpHFileRefs(entries.get(i).getEdit()); 175 } 176 //Log and clean up WAL logs 177 updateLogPosition(lastReadPosition); 178 } 179 180 source.postShipEdits(entries, currentSize); 181 // FIXME check relationship between wal group and overall 182 source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize, 183 entryBatch.getNbHFiles()); 184 source.getSourceMetrics().setAgeOfLastShippedOp( 185 entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); 186 if (LOG.isTraceEnabled()) { 187 LOG.trace("Replicated " + entries.size() + " entries or " + entryBatch.getNbOperations() 188 + " operations in " + ((endTimeNs - startTimeNs) / 1000000) + " ms"); 189 } 190 break; 191 } catch (Exception ex) { 192 LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:" 193 + org.apache.hadoop.util.StringUtils.stringifyException(ex)); 194 if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { 195 sleepMultiplier++; 196 } 197 } 198 } 199 } 200 201 private void cleanUpHFileRefs(WALEdit edit) throws IOException { 202 String peerId = source.getPeerId(); 203 if (peerId.contains("-")) { 204 // peerClusterZnode will be in the form peerId + "-" + rsZNode. 205 // A peerId will not have "-" in its name, see HBASE-11394 206 peerId = peerId.split("-")[0]; 207 } 208 List<Cell> cells = edit.getCells(); 209 int totalCells = cells.size(); 210 for (int i = 0; i < totalCells; i++) { 211 Cell cell = cells.get(i); 212 if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { 213 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); 214 List<StoreDescriptor> stores = bld.getStoresList(); 215 int totalStores = stores.size(); 216 for (int j = 0; j < totalStores; j++) { 217 List<String> storeFileList = stores.get(j).getStoreFileList(); 218 source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList); 219 source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size()); 220 } 221 } 222 } 223 } 224 225 protected void updateLogPosition(long lastReadPosition) { 226 source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(), 227 lastReadPosition, false, false); 228 lastLoggedPosition = lastReadPosition; 229 } 230 231 public void startup(UncaughtExceptionHandler handler) { 232 String name = Thread.currentThread().getName(); 233 Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + "," 234 + source.getPeerClusterZnode(), handler); 235 } 236 237 public PriorityBlockingQueue<Path> getLogQueue() { 238 return this.queue; 239 } 240 241 public Path getCurrentPath() { 242 return this.entryReader.getCurrentPath(); 243 } 244 245 public long getCurrentPosition() { 246 return this.lastLoggedPosition; 247 } 248 249 public void setWALReader(ReplicationSourceWALReader entryReader) { 250 this.entryReader = entryReader; 251 } 252 253 public long getStartPosition() { 254 return 0; 255 } 256 257 protected boolean isActive() { 258 return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted(); 259 } 260 261 public void setWorkerState(WorkerState state) { 262 this.state = state; 263 } 264 265 public WorkerState getWorkerState() { 266 return state; 267 } 268 269 public void stopWorker() { 270 setWorkerState(WorkerState.STOPPED); 271 } 272 273 public boolean isFinished() { 274 return state == WorkerState.FINISHED; 275 } 276 277 /** 278 * Do the sleeping logic 279 * @param msg Why we sleep 280 * @param sleepMultiplier by how many times the default sleeping time is augmented 281 * @return True if <code>sleepMultiplier</code> is < <code>maxRetriesMultiplier</code> 282 */ 283 public boolean sleepForRetries(String msg, int sleepMultiplier) { 284 try { 285 if (LOG.isTraceEnabled()) { 286 LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier); 287 } 288 Thread.sleep(this.sleepForRetries * sleepMultiplier); 289 } catch (InterruptedException e) { 290 LOG.debug("Interrupted while sleeping between retries"); 291 Thread.currentThread().interrupt(); 292 } 293 return sleepMultiplier < maxRetriesMultiplier; 294 } 295}