001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.EOFException; 021import java.io.IOException; 022import java.util.List; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.PriorityBlockingQueue; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicLong; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.CellUtil; 033import org.apache.hadoop.hbase.replication.WALEntryFilter; 034import org.apache.hadoop.hbase.util.Pair; 035import org.apache.hadoop.hbase.util.Threads; 036import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 037import org.apache.hadoop.hbase.wal.WAL.Entry; 038import org.apache.hadoop.hbase.wal.WALEdit; 039import org.apache.hadoop.hbase.wal.WALKey; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.apache.yetus.audience.InterfaceStability; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 047 048/** 049 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches 050 * onto a queue 051 */ 052@InterfaceAudience.Private 053@InterfaceStability.Evolving 054class ReplicationSourceWALReader extends Thread { 055 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class); 056 057 private final ReplicationSourceLogQueue logQueue; 058 private final FileSystem fs; 059 private final Configuration conf; 060 private final WALEntryFilter filter; 061 private final ReplicationSource source; 062 063 @InterfaceAudience.Private 064 final BlockingQueue<WALEntryBatch> entryBatchQueue; 065 // max (heap) size of each batch - multiply by number of batches in queue to get total 066 private final long replicationBatchSizeCapacity; 067 // max count of each batch - multiply by number of batches in queue to get total 068 private final int replicationBatchCountCapacity; 069 // position in the WAL to start reading at 070 private long currentPosition; 071 private final long sleepForRetries; 072 private final int maxRetriesMultiplier; 073 private final boolean eofAutoRecovery; 074 075 // Indicates whether this particular worker is running 076 private boolean isReaderRunning = true; 077 078 private AtomicLong totalBufferUsed; 079 private long totalBufferQuota; 080 private final String walGroupId; 081 082 /** 083 * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the 084 * entries, and puts them on a batch queue. 085 * @param fs the files system to use 086 * @param conf configuration to use 087 * @param logQueue The WAL queue to read off of 088 * @param startPosition position in the first WAL to start reading from 089 * @param filter The filter to use while reading 090 * @param source replication source 091 */ 092 public ReplicationSourceWALReader(FileSystem fs, Configuration conf, 093 ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter, 094 ReplicationSource source, String walGroupId) { 095 this.logQueue = logQueue; 096 this.currentPosition = startPosition; 097 this.fs = fs; 098 this.conf = conf; 099 this.filter = filter; 100 this.source = source; 101 this.replicationBatchSizeCapacity = 102 this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64); 103 this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); 104 // memory used will be batchSizeCapacity * (nb.batches + 1) 105 // the +1 is for the current thread reading before placing onto the queue 106 int batchCount = conf.getInt("replication.source.nb.batches", 1); 107 this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed(); 108 this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit(); 109 // 1 second 110 this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); 111 // 5 minutes @ 1 sec per 112 this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); 113 this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); 114 this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); 115 this.walGroupId = walGroupId; 116 LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : " 117 + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity 118 + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity 119 + ", replicationBatchQueueCapacity=" + batchCount); 120 } 121 122 @Override 123 public void run() { 124 int sleepMultiplier = 1; 125 while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream 126 WALEntryBatch batch = null; 127 try (WALEntryStream entryStream = 128 new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(), 129 source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId)) { 130 while (isReaderRunning()) { // loop here to keep reusing stream while we can 131 batch = null; 132 if (!source.isPeerEnabled()) { 133 Threads.sleep(sleepForRetries); 134 continue; 135 } 136 if (!checkQuota()) { 137 continue; 138 } 139 batch = tryAdvanceStreamAndCreateWALBatch(entryStream); 140 if (batch == null) { 141 // got no entries and didn't advance position in WAL 142 handleEmptyWALEntryBatch(); 143 entryStream.reset(); // reuse stream 144 continue; 145 } 146 // if we have already switched a file, skip reading and put it directly to the ship queue 147 if (!batch.isEndOfFile()) { 148 readWALEntries(entryStream, batch); 149 currentPosition = entryStream.getPosition(); 150 } 151 // need to propagate the batch even it has no entries since it may carry the last 152 // sequence id information for serial replication. 153 LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); 154 entryBatchQueue.put(batch); 155 sleepMultiplier = 1; 156 } 157 } catch (WALEntryFilterRetryableException | IOException e) { // stream related 158 if (!handleEofException(e, batch)) { 159 LOG.warn("Failed to read stream of replication entries", e); 160 if (sleepMultiplier < maxRetriesMultiplier) { 161 sleepMultiplier++; 162 } 163 Threads.sleep(sleepForRetries * sleepMultiplier); 164 } 165 } catch (InterruptedException e) { 166 LOG.trace("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue"); 167 Thread.currentThread().interrupt(); 168 } 169 } 170 } 171 172 // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return. 173 protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) { 174 WALEdit edit = entry.getEdit(); 175 if (edit == null || edit.isEmpty()) { 176 LOG.trace("Edit null or empty for entry {} ", entry); 177 return false; 178 } 179 LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}", 180 entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId()); 181 long entrySize = getEntrySizeIncludeBulkLoad(entry); 182 long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); 183 batch.addEntry(entry, entrySize); 184 updateBatchStats(batch, entry, entrySize); 185 boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad); 186 187 // Stop if too many entries or too big 188 return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity 189 || batch.getNbEntries() >= replicationBatchCountCapacity; 190 } 191 192 protected static final boolean switched(WALEntryStream entryStream, Path path) { 193 Path newPath = entryStream.getCurrentPath(); 194 return newPath == null || !path.getName().equals(newPath.getName()); 195 } 196 197 // We need to get the WALEntryBatch from the caller so we can add entries in there 198 // This is required in case there is any exception in while reading entries 199 // we do not want to loss the existing entries in the batch 200 protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch) 201 throws IOException, InterruptedException { 202 Path currentPath = entryStream.getCurrentPath(); 203 for (;;) { 204 Entry entry = entryStream.next(); 205 batch.setLastWalPosition(entryStream.getPosition()); 206 entry = filterEntry(entry); 207 if (entry != null) { 208 if (addEntryToBatch(batch, entry)) { 209 break; 210 } 211 } 212 boolean hasNext = entryStream.hasNext(); 213 // always return if we have switched to a new file 214 if (switched(entryStream, currentPath)) { 215 batch.setEndOfFile(true); 216 break; 217 } 218 if (!hasNext) { 219 break; 220 } 221 } 222 } 223 224 private void handleEmptyWALEntryBatch() throws InterruptedException { 225 LOG.trace("Didn't read any new entries from WAL"); 226 if (logQueue.getQueue(walGroupId).isEmpty()) { 227 // we're done with current queue, either this is a recovered queue, or it is the special group 228 // for a sync replication peer and the peer has been transited to DA or S state. 229 LOG.debug("Stopping the replication source wal reader"); 230 setReaderRunning(false); 231 // shuts down shipper thread immediately 232 entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA); 233 } else { 234 Thread.sleep(sleepForRetries); 235 } 236 } 237 238 private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream entryStream) 239 throws IOException { 240 Path currentPath = entryStream.getCurrentPath(); 241 if (!entryStream.hasNext()) { 242 // check whether we have switched a file 243 if (currentPath != null && switched(entryStream, currentPath)) { 244 return WALEntryBatch.endOfFile(currentPath); 245 } else { 246 return null; 247 } 248 } 249 if (currentPath != null) { 250 if (switched(entryStream, currentPath)) { 251 return WALEntryBatch.endOfFile(currentPath); 252 } 253 } 254 return createBatch(entryStream); 255 } 256 257 /** 258 * This is to handle the EOFException from the WAL entry stream. EOFException should be handled 259 * carefully because there are chances of data loss because of never replicating the data. Thus we 260 * should always try to ship existing batch of entries here. If there was only one log in the 261 * queue before EOF, we ship the empty batch here and since reader is still active, in the next 262 * iteration of reader we will stop the reader. 263 * <p/> 264 * If there was more than one log in the queue before EOF, we ship the existing batch and reset 265 * the wal patch and position to the log with EOF, so shipper can remove logs from replication 266 * queue 267 * @return true only the IOE can be handled 268 */ 269 private boolean handleEofException(Exception e, WALEntryBatch batch) { 270 PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId); 271 // Dump the log even if logQueue size is 1 if the source is from recovered Source 272 // since we don't add current log to recovered source queue so it is safe to remove. 273 if ( 274 (e instanceof EOFException || e.getCause() instanceof EOFException) 275 && (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery 276 ) { 277 Path path = queue.peek(); 278 try { 279 if (!fs.exists(path)) { 280 // There is a chance that wal has moved to oldWALs directory, so look there also. 281 path = AbstractFSWALProvider.findArchivedLog(path, conf); 282 // path can be null if unable to locate in archiveDir. 283 } 284 if (path != null && fs.getFileStatus(path).getLen() == 0) { 285 LOG.warn("Forcing removal of 0 length log in queue: {}", path); 286 logQueue.remove(walGroupId); 287 currentPosition = 0; 288 if (batch != null) { 289 // After we removed the WAL from the queue, we should try shipping the existing batch of 290 // entries 291 addBatchToShippingQueue(batch); 292 } 293 return true; 294 } 295 } catch (IOException ioe) { 296 LOG.warn("Couldn't get file length information about log " + path, ioe); 297 } catch (InterruptedException ie) { 298 LOG.trace("Interrupted while adding WAL batch to ship queue"); 299 Thread.currentThread().interrupt(); 300 } 301 } 302 return false; 303 } 304 305 /** 306 * Update the batch try to ship and return true if shipped 307 * @param batch Batch of entries to ship 308 * @throws InterruptedException throws interrupted exception 309 */ 310 private void addBatchToShippingQueue(WALEntryBatch batch) throws InterruptedException { 311 // need to propagate the batch even it has no entries since it may carry the last 312 // sequence id information for serial replication. 313 LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); 314 entryBatchQueue.put(batch); 315 } 316 317 public Path getCurrentPath() { 318 // if we've read some WAL entries, get the Path we read from 319 WALEntryBatch batchQueueHead = entryBatchQueue.peek(); 320 if (batchQueueHead != null) { 321 return batchQueueHead.getLastWalPath(); 322 } 323 // otherwise, we must be currently reading from the head of the log queue 324 return logQueue.getQueue(walGroupId).peek(); 325 } 326 327 // returns false if we've already exceeded the global quota 328 private boolean checkQuota() { 329 // try not to go over total quota 330 if (totalBufferUsed.get() > totalBufferQuota) { 331 LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B", 332 this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota); 333 Threads.sleep(sleepForRetries); 334 return false; 335 } 336 return true; 337 } 338 339 private WALEntryBatch createBatch(WALEntryStream entryStream) { 340 return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); 341 } 342 343 protected final Entry filterEntry(Entry entry) { 344 Entry filtered = filter.filter(entry); 345 if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) { 346 LOG.trace("Filtered entry for replication: {}", entry); 347 source.getSourceMetrics().incrLogEditsFiltered(); 348 } 349 return filtered; 350 } 351 352 /** 353 * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a 354 * batch to become available 355 * @return A batch of entries, along with the position in the log after reading the batch 356 * @throws InterruptedException if interrupted while waiting 357 */ 358 public WALEntryBatch take() throws InterruptedException { 359 return entryBatchQueue.take(); 360 } 361 362 public WALEntryBatch poll(long timeout) throws InterruptedException { 363 return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS); 364 } 365 366 private long getEntrySizeIncludeBulkLoad(Entry entry) { 367 WALEdit edit = entry.getEdit(); 368 return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit); 369 } 370 371 public static long getEntrySizeExcludeBulkLoad(Entry entry) { 372 WALEdit edit = entry.getEdit(); 373 WALKey key = entry.getKey(); 374 return edit.heapSize() + key.estimatedSerializedSizeOf(); 375 } 376 377 private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) { 378 WALEdit edit = entry.getEdit(); 379 batch.incrementHeapSize(entrySize); 380 Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit); 381 batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst()); 382 batch.incrementNbHFiles(nbRowsAndHFiles.getSecond()); 383 } 384 385 /** 386 * Count the number of different row keys in the given edit because of mini-batching. We assume 387 * that there's at least one Cell in the WALEdit. 388 * @param edit edit to count row keys from 389 * @return number of different row keys and HFiles 390 */ 391 private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) { 392 List<Cell> cells = edit.getCells(); 393 int distinctRowKeys = 1; 394 int totalHFileEntries = 0; 395 Cell lastCell = cells.get(0); 396 397 int totalCells = edit.size(); 398 for (int i = 0; i < totalCells; i++) { 399 // Count HFiles to be replicated 400 if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { 401 try { 402 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); 403 List<StoreDescriptor> stores = bld.getStoresList(); 404 int totalStores = stores.size(); 405 for (int j = 0; j < totalStores; j++) { 406 totalHFileEntries += stores.get(j).getStoreFileList().size(); 407 } 408 } catch (IOException e) { 409 LOG.error("Failed to deserialize bulk load entry from wal edit. " 410 + "Then its hfiles count will not be added into metric.", e); 411 } 412 } 413 414 if (!CellUtil.matchingRows(cells.get(i), lastCell)) { 415 distinctRowKeys++; 416 } 417 lastCell = cells.get(i); 418 } 419 420 Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries); 421 return result; 422 } 423 424 /** 425 * Calculate the total size of all the store files 426 * @param edit edit to count row keys from 427 * @return the total size of the store files 428 */ 429 private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { 430 List<Cell> cells = edit.getCells(); 431 int totalStoreFilesSize = 0; 432 433 int totalCells = edit.size(); 434 for (int i = 0; i < totalCells; i++) { 435 if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { 436 try { 437 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); 438 List<StoreDescriptor> stores = bld.getStoresList(); 439 int totalStores = stores.size(); 440 for (int j = 0; j < totalStores; j++) { 441 totalStoreFilesSize = 442 (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes()); 443 } 444 } catch (IOException e) { 445 LOG.error("Failed to deserialize bulk load entry from wal edit. " 446 + "Size of HFiles part of cell will not be considered in replication " 447 + "request size calculation.", e); 448 } 449 } 450 } 451 return totalStoreFilesSize; 452 } 453 454 /** 455 * @param size delta size for grown buffer 456 * @return true if we should clear buffer and push all 457 */ 458 private boolean acquireBufferQuota(long size) { 459 long newBufferUsed = totalBufferUsed.addAndGet(size); 460 // Record the new buffer usage 461 this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); 462 return newBufferUsed >= totalBufferQuota; 463 } 464 465 /** Returns whether the reader thread is running */ 466 public boolean isReaderRunning() { 467 return isReaderRunning && !isInterrupted(); 468 } 469 470 /** 471 * @param readerRunning the readerRunning to set 472 */ 473 public void setReaderRunning(boolean readerRunning) { 474 this.isReaderRunning = readerRunning; 475 } 476}