/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
 * onto a queue.
 * <p>
 * The reader runs on its own thread (this class extends {@link Thread}); consumers obtain batches
 * through {@link #take()}. The running flag may be flipped from any thread via
 * {@link #setReaderRunning(boolean)}.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReplicationSourceWALReader extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);

  private final PriorityBlockingQueue<Path> logQueue;
  private final FileSystem fs;
  private final Configuration conf;
  private final WALEntryFilter filter;
  private final ReplicationSource source;

  protected final BlockingQueue<WALEntryBatch> entryBatchQueue;
  // max (heap) size of each batch - multiply by number of batches in queue to get total
  private final long replicationBatchSizeCapacity;
  // max count of each batch - multiply by number of batches in queue to get total
  protected final int replicationBatchCountCapacity;
  // position in the WAL to start reading at
  private long currentPosition;
  private final long sleepForRetries;
  private final int maxRetriesMultiplier;
  private final boolean eofAutoRecovery;

  // Indicates whether this particular worker is running. Written through setReaderRunning() by
  // threads other than the reader itself, so it must be volatile for the reader loop to observe
  // a stop request.
  private volatile boolean isReaderRunning = true;

  private final AtomicLong totalBufferUsed;
  private final long totalBufferQuota;

  /**
   * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
   * entries, and puts them on a batch queue.
   * @param fs the file system to use
   * @param conf configuration to use
   * @param logQueue The WAL queue to read off of
   * @param startPosition position in the first WAL to start reading from
   * @param filter The filter to use while reading
   * @param source replication source
   */
  public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
      ReplicationSource source) {
    this.logQueue = logQueue;
    this.currentPosition = startPosition;
    this.fs = fs;
    this.conf = conf;
    this.filter = filter;
    this.source = source;
    this.replicationBatchSizeCapacity =
        this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
    this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
    // memory used will be batchSizeCapacity * (nb.batches + 1)
    // the +1 is for the current thread reading before placing onto the queue
    int batchCount = conf.getInt("replication.source.nb.batches", 1);
    this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
    this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
    this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
    LOG.info("peerClusterZnode=" + source.getPeerClusterZnode()
        + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
        + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
        + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
        + ", replicationBatchQueueCapacity=" + batchCount);
  }

  /**
   * Main reader loop: opens a {@link WALEntryStream} at the current position, repeatedly reads
   * batches from it and enqueues the non-empty ones, and re-opens the stream after an
   * {@link IOException}, sleeping between retries.
   */
  @Override
  public void run() {
    int sleepMultiplier = 1;
    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
      try (WALEntryStream entryStream =
          new WALEntryStream(logQueue, conf, currentPosition,
              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
              source.getSourceMetrics())) {
        while (isReaderRunning()) { // loop here to keep reusing stream while we can
          if (!checkQuota()) {
            continue;
          }
          WALEntryBatch batch = readWALEntries(entryStream);
          if (batch != null && batch.getNbEntries() > 0) {
            LOG.trace("Read {} WAL entries eligible for replication", batch.getNbEntries());
            entryBatchQueue.put(batch);
            sleepMultiplier = 1;
          } else { // got no entries and didn't advance position in WAL
            handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath());
          }
          currentPosition = entryStream.getPosition();
          entryStream.reset(); // reuse stream
        }
      } catch (IOException e) { // stream related
        if (sleepMultiplier < maxRetriesMultiplier) {
          LOG.debug("Failed to read stream of replication entries: " + e);
          sleepMultiplier++;
        } else {
          LOG.error("Failed to read stream of replication entries", e);
          handleEofException(e);
        }
        Threads.sleep(sleepForRetries * sleepMultiplier);
      } catch (InterruptedException e) {
        LOG.trace("Interrupted while sleeping between WAL reads");
        // restore the flag so that isReaderRunning() sees the interrupt and the loop exits
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Reads entries from the stream into a single batch until the stream has no more entries, or
   * the batch reaches its size/count capacity, or the global buffer quota is reached.
   * @param entryStream stream to read from
   * @return the batch that was read, or {@code null} if the stream had no entries at all
   * @throws IOException if reading from the stream fails
   */
  private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException {
    WALEntryBatch batch = null;
    while (entryStream.hasNext()) {
      if (batch == null) {
        batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
      }
      Entry entry = entryStream.next();
      entry = filterEntry(entry);
      if (entry != null) {
        WALEdit edit = entry.getEdit();
        if (edit != null && !edit.isEmpty()) {
          long entrySize = getEntrySize(entry);
          batch.addEntry(entry);
          updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
          boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
          // Stop if too many entries or too big
          if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
              || batch.getNbEntries() >= replicationBatchCountCapacity) {
            break;
          }
        }
      }
    }
    return batch;
  }

  /**
   * Hook invoked when a read produced no entries; this implementation just sleeps for the
   * configured retry interval.
   * @param batch the (possibly {@code null}) empty batch that was read
   * @param currentPath path of the WAL that was being read
   * @throws InterruptedException if interrupted while sleeping
   */
  protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
      throws InterruptedException {
    LOG.trace("Didn't read any new entries from WAL");
    Thread.sleep(sleepForRetries);
  }

  // if we get an EOF due to a zero-length log, and there are other logs in queue
  // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
  // enabled, then dump the log
  private void handleEofException(IOException e) {
    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
        logQueue.size() > 1 && this.eofAutoRecovery) {
      try {
        if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
          LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
          logQueue.remove();
          currentPosition = 0;
        }
      } catch (IOException ioe) {
        // keep the cause so the reason the length lookup failed is not lost
        LOG.warn("Couldn't get file length information about log " + logQueue.peek(), ioe);
      }
    }
  }

  /**
   * @return the WAL path of the batch at the head of the batch queue if there is one, otherwise
   *         the head of the log queue
   */
  public Path getCurrentPath() {
    // if we've read some WAL entries, get the Path we read from
    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
    if (batchQueueHead != null) {
      return batchQueueHead.lastWalPath;
    }
    // otherwise, we must be currently reading from the head of the log queue
    return logQueue.peek();
  }

  // returns false (after sleeping for the retry interval) if we've already exceeded the global
  // quota
  private boolean checkQuota() {
    // try not to go over total quota
    if (totalBufferUsed.get() > totalBufferQuota) {
      Threads.sleep(sleepForRetries);
      return false;
    }
    return true;
  }

  /**
   * Applies the configured filter to the entry and bumps the "edits filtered" metric when a
   * non-null entry is filtered out completely.
   * @param entry entry to filter
   * @return the filtered entry, or {@code null} if the filter dropped it
   */
  private Entry filterEntry(Entry entry) {
    Entry filtered = filter.filter(entry);
    if (entry != null && filtered == null) {
      source.getSourceMetrics().incrLogEditsFiltered();
    }
    return filtered;
  }

  /**
   * Retrieves the next batch of WAL entries from the queue, blocking until a batch becomes
   * available.
   * @return A batch of entries, along with the position in the log after reading the batch
   * @throws InterruptedException if interrupted while waiting
   */
  public WALEntryBatch take() throws InterruptedException {
    return entryBatchQueue.take();
  }

  /**
   * @param entry entry to measure
   * @return heap size of the entry's edit plus the size of any store files referenced by bulk
   *         load cells in it
   */
  private long getEntrySize(Entry entry) {
    WALEdit edit = entry.getEdit();
    return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
  }

  /**
   * Folds one entry into the batch statistics (heap size, row key count, HFile count) and records
   * the WAL position reached after the entry.
   */
  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition,
      long entrySize) {
    WALEdit edit = entry.getEdit();
    if (edit != null && !edit.isEmpty()) {
      batch.incrementHeapSize(entrySize);
      Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
      batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
      batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
    }
    batch.lastWalPosition = entryPosition;
  }

  /**
   * Count the number of different row keys in the given edit because of mini-batching. We assume
   * that there's at least one Cell in the WALEdit.
   * @param edit edit to count row keys from
   * @return number of different row keys and HFiles
   */
  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
    List<Cell> cells = edit.getCells();
    int distinctRowKeys = 1;
    int totalHFileEntries = 0;
    Cell lastCell = cells.get(0);

    int totalCells = edit.size();
    for (int i = 0; i < totalCells; i++) {
      Cell cell = cells.get(i);
      // Count HFiles to be replicated
      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        try {
          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
          for (StoreDescriptor store : bld.getStoresList()) {
            totalHFileEntries += store.getStoreFileList().size();
          }
        } catch (IOException e) {
          LOG.error("Failed to deserialize bulk load entry from wal edit. "
              + "Then its hfiles count will not be added into metric.", e);
        }
      }

      // a change of row between consecutive cells counts as a new row key
      if (!CellUtil.matchingRows(cell, lastCell)) {
        distinctRowKeys++;
      }
      lastCell = cell;
    }

    return new Pair<>(distinctRowKeys, totalHFileEntries);
  }

  /**
   * Calculate the total size of all the store files referenced by bulk load cells in the edit.
   * The sum is kept as a {@code long}: store file sizes are 64-bit and their total can exceed
   * {@link Integer#MAX_VALUE}.
   * @param edit edit to inspect for bulk load cells
   * @return the total size of the store files, in bytes
   */
  private long calculateTotalSizeOfStoreFiles(WALEdit edit) {
    List<Cell> cells = edit.getCells();
    long totalStoreFilesSize = 0;

    int totalCells = edit.size();
    for (int i = 0; i < totalCells; i++) {
      Cell cell = cells.get(i);
      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        try {
          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
          for (StoreDescriptor store : bld.getStoresList()) {
            totalStoreFilesSize += store.getStoreFileSizeBytes();
          }
        } catch (IOException e) {
          LOG.error("Failed to deserialize bulk load entry from wal edit. "
              + "Size of HFiles part of cell will not be considered in replication "
              + "request size calculation.",
            e);
        }
      }
    }
    return totalStoreFilesSize;
  }

  /**
   * @param size delta size for grown buffer
   * @return true if we should clear buffer and push all
   */
  private boolean acquireBufferQuota(long size) {
    return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
  }

  /**
   * @return whether the reader thread is running
   */
  public boolean isReaderRunning() {
    return isReaderRunning && !isInterrupted();
  }

  /**
   * @param readerRunning the readerRunning to set
   */
  public void setReaderRunning(boolean readerRunning) {
    this.isReaderRunning = readerRunning;
  }

  /**
   * Holds a batch of WAL entries to replicate, along with some statistics
   *
   */
  static class WALEntryBatch {
    private final List<Entry> walEntries;
    // last WAL that was read
    private Path lastWalPath;
    // position in WAL of last entry in this batch
    private long lastWalPosition = 0;
    // number of distinct row keys in this batch
    private int nbRowKeys = 0;
    // number of HFiles
    private int nbHFiles = 0;
    // heap size of data we need to replicate
    private long heapSize = 0;

    /**
     * @param maxNbEntries initial capacity of the list that holds the entries of this batch
     * @param lastWalPath Path of the WAL the last entry in this batch was read from
     */
    WALEntryBatch(int maxNbEntries, Path lastWalPath) {
      this.walEntries = new ArrayList<>(maxNbEntries);
      this.lastWalPath = lastWalPath;
    }

    /**
     * Appends an entry to this batch.
     * @param entry the entry to add
     */
    public void addEntry(Entry entry) {
      walEntries.add(entry);
    }

    /**
     * @return the WAL Entries.
     */
    public List<Entry> getWalEntries() {
      return walEntries;
    }

    /**
     * @return the path of the last WAL that was read.
     */
    public Path getLastWalPath() {
      return lastWalPath;
    }

    /**
     * @return the position in the last WAL that was read.
     */
    public long getLastWalPosition() {
      return lastWalPosition;
    }

    /**
     * @return the number of entries in this batch
     */
    public int getNbEntries() {
      return walEntries.size();
    }

    /**
     * @return the number of distinct row keys in this batch
     */
    public int getNbRowKeys() {
      return nbRowKeys;
    }

    /**
     * @return the number of HFiles in this batch
     */
    public int getNbHFiles() {
      return nbHFiles;
    }

    /**
     * @return total number of operations in this batch
     */
    public int getNbOperations() {
      return getNbRowKeys() + getNbHFiles();
    }

    /**
     * @return the heap size of this batch
     */
    public long getHeapSize() {
      return heapSize;
    }

    private void incrementNbRowKeys(int increment) {
      nbRowKeys += increment;
    }

    private void incrementNbHFiles(int increment) {
      nbHFiles += increment;
    }

    private void incrementHeapSize(long increment) {
      heapSize += increment;
    }
  }
}