001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import java.io.EOFException; 022import java.io.IOException; 023import java.util.List; 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.PriorityBlockingQueue; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicLong; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellUtil; 034import org.apache.hadoop.hbase.replication.WALEntryFilter; 035import org.apache.hadoop.hbase.util.Pair; 036import org.apache.hadoop.hbase.util.Threads; 037import org.apache.hadoop.hbase.wal.WAL.Entry; 038import org.apache.hadoop.hbase.wal.WALEdit; 039import org.apache.hadoop.hbase.wal.WALKey; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.apache.yetus.audience.InterfaceStability; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 047 048/** 049 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches 050 * onto a queue 051 */ 052@InterfaceAudience.Private 053@InterfaceStability.Evolving 054class ReplicationSourceWALReader extends Thread { 055 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class); 056 057 private final PriorityBlockingQueue<Path> logQueue; 058 private final FileSystem fs; 059 private final Configuration conf; 060 private final WALEntryFilter filter; 061 private final ReplicationSource source; 062 063 private final BlockingQueue<WALEntryBatch> entryBatchQueue; 064 // max (heap) size of each batch - multiply by number of batches in queue to get total 065 private final long replicationBatchSizeCapacity; 066 // max count of each batch - multiply by number of batches in queue to get total 067 private final int replicationBatchCountCapacity; 068 // position in the WAL to start reading at 069 private long currentPosition; 070 private final long sleepForRetries; 071 private final int maxRetriesMultiplier; 072 private final boolean eofAutoRecovery; 073 074 //Indicates whether this particular worker is running 075 private boolean isReaderRunning = true; 076 077 private AtomicLong totalBufferUsed; 078 private long totalBufferQuota; 079 080 /** 081 * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the 082 * entries, and puts them on a batch queue. 083 * @param fs the files system to use 084 * @param conf configuration to use 085 * @param logQueue The WAL queue to read off of 086 * @param startPosition position in the first WAL to start reading from 087 * @param filter The filter to use while reading 088 * @param source replication source 089 */ 090 public ReplicationSourceWALReader(FileSystem fs, Configuration conf, 091 PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter, 092 ReplicationSource source) { 093 this.logQueue = logQueue; 094 this.currentPosition = startPosition; 095 this.fs = fs; 096 this.conf = conf; 097 this.filter = filter; 098 this.source = source; 099 this.replicationBatchSizeCapacity = 100 this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64); 101 this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); 102 // memory used will be batchSizeCapacity * (nb.batches + 1) 103 // the +1 is for the current thread reading before placing onto the queue 104 int batchCount = conf.getInt("replication.source.nb.batches", 1); 105 this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed(); 106 this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit(); 107 this.sleepForRetries = 108 this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second 109 this.maxRetriesMultiplier = 110 this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per 111 this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); 112 this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); 113 LOG.info("peerClusterZnode=" + source.getQueueId() 114 + ", ReplicationSourceWALReaderThread : " + source.getPeerId() 115 + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity 116 + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity 117 + ", replicationBatchQueueCapacity=" + batchCount); 118 } 119 120 @Override 121 public void run() { 122 int sleepMultiplier = 1; 123 while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream 124 try (WALEntryStream entryStream = 125 new WALEntryStream(logQueue, conf, currentPosition, 126 source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), 127 source.getSourceMetrics())) { 128 while (isReaderRunning()) { // loop here to keep reusing stream while we can 129 if (!source.isPeerEnabled()) { 130 Threads.sleep(sleepForRetries); 131 continue; 132 } 133 if (!checkQuota()) { 134 continue; 135 } 136 WALEntryBatch batch = readWALEntries(entryStream); 137 currentPosition = entryStream.getPosition(); 138 if (batch != null) { 139 // need to propagate the batch even it has no entries since it may carry the last 140 // sequence id information for serial replication. 141 LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); 142 entryBatchQueue.put(batch); 143 sleepMultiplier = 1; 144 } else { // got no entries and didn't advance position in WAL 145 handleEmptyWALEntryBatch(entryStream.getCurrentPath()); 146 entryStream.reset(); // reuse stream 147 } 148 } 149 } catch (IOException e) { // stream related 150 if (sleepMultiplier < maxRetriesMultiplier) { 151 LOG.debug("Failed to read stream of replication entries: " + e); 152 sleepMultiplier++; 153 } else { 154 LOG.error("Failed to read stream of replication entries", e); 155 handleEofException(e); 156 } 157 Threads.sleep(sleepForRetries * sleepMultiplier); 158 } catch (InterruptedException e) { 159 LOG.trace("Interrupted while sleeping between WAL reads"); 160 Thread.currentThread().interrupt(); 161 } 162 } 163 } 164 165 // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return. 166 protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) { 167 WALEdit edit = entry.getEdit(); 168 if (edit == null || edit.isEmpty()) { 169 LOG.debug("Edit null or empty for entry {} ", entry); 170 return false; 171 } 172 LOG.debug("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}", 173 entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId()); 174 long entrySize = getEntrySizeIncludeBulkLoad(entry); 175 long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); 176 batch.addEntry(entry, entrySize); 177 updateBatchStats(batch, entry, entrySize); 178 boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad); 179 180 // Stop if too many entries or too big 181 return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity || 182 batch.getNbEntries() >= replicationBatchCountCapacity; 183 } 184 185 protected static final boolean switched(WALEntryStream entryStream, Path path) { 186 Path newPath = entryStream.getCurrentPath(); 187 return newPath == null || !path.getName().equals(newPath.getName()); 188 } 189 190 protected WALEntryBatch readWALEntries(WALEntryStream entryStream) 191 throws IOException, InterruptedException { 192 Path currentPath = entryStream.getCurrentPath(); 193 if (!entryStream.hasNext()) { 194 // check whether we have switched a file 195 if (currentPath != null && switched(entryStream, currentPath)) { 196 return WALEntryBatch.endOfFile(currentPath); 197 } else { 198 return null; 199 } 200 } 201 if (currentPath != null) { 202 if (switched(entryStream, currentPath)) { 203 return WALEntryBatch.endOfFile(currentPath); 204 } 205 } else { 206 // when reading from the entry stream first time we will enter here 207 currentPath = entryStream.getCurrentPath(); 208 } 209 WALEntryBatch batch = createBatch(entryStream); 210 for (;;) { 211 Entry entry = entryStream.next(); 212 batch.setLastWalPosition(entryStream.getPosition()); 213 entry = filterEntry(entry); 214 if (entry != null) { 215 if (addEntryToBatch(batch, entry)) { 216 break; 217 } 218 } 219 boolean hasNext = entryStream.hasNext(); 220 // always return if we have switched to a new file 221 if (switched(entryStream, currentPath)) { 222 batch.setEndOfFile(true); 223 break; 224 } 225 if (!hasNext) { 226 break; 227 } 228 } 229 return batch; 230 } 231 232 private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { 233 LOG.trace("Didn't read any new entries from WAL"); 234 if (source.isRecovered()) { 235 // we're done with queue recovery, shut ourself down 236 setReaderRunning(false); 237 // shuts down shipper thread immediately 238 entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA); 239 } else { 240 Thread.sleep(sleepForRetries); 241 } 242 } 243 244 // if we get an EOF due to a zero-length log, and there are other logs in queue 245 // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is 246 // enabled, then dump the log 247 private void handleEofException(IOException e) { 248 if ((e instanceof EOFException || e.getCause() instanceof EOFException) && 249 logQueue.size() > 1 && this.eofAutoRecovery) { 250 try { 251 if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { 252 LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); 253 logQueue.remove(); 254 currentPosition = 0; 255 } 256 } catch (IOException ioe) { 257 LOG.warn("Couldn't get file length information about log " + logQueue.peek()); 258 } 259 } 260 } 261 262 public Path getCurrentPath() { 263 // if we've read some WAL entries, get the Path we read from 264 WALEntryBatch batchQueueHead = entryBatchQueue.peek(); 265 if (batchQueueHead != null) { 266 return batchQueueHead.getLastWalPath(); 267 } 268 // otherwise, we must be currently reading from the head of the log queue 269 return logQueue.peek(); 270 } 271 272 //returns false if we've already exceeded the global quota 273 private boolean checkQuota() { 274 // try not to go over total quota 275 if (totalBufferUsed.get() > totalBufferQuota) { 276 LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B", 277 this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota); 278 Threads.sleep(sleepForRetries); 279 return false; 280 } 281 return true; 282 } 283 284 protected final WALEntryBatch createBatch(WALEntryStream entryStream) { 285 return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); 286 } 287 288 protected final Entry filterEntry(Entry entry) { 289 Entry filtered = filter.filter(entry); 290 if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) { 291 LOG.debug("Filtered entry for replication: {}", entry); 292 source.getSourceMetrics().incrLogEditsFiltered(); 293 } 294 return filtered; 295 } 296 297 /** 298 * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a 299 * batch to become available 300 * @return A batch of entries, along with the position in the log after reading the batch 301 * @throws InterruptedException if interrupted while waiting 302 */ 303 public WALEntryBatch take() throws InterruptedException { 304 return entryBatchQueue.take(); 305 } 306 307 public WALEntryBatch poll(long timeout) throws InterruptedException { 308 return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS); 309 } 310 311 private long getEntrySizeIncludeBulkLoad(Entry entry) { 312 WALEdit edit = entry.getEdit(); 313 return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit); 314 } 315 316 public static long getEntrySizeExcludeBulkLoad(Entry entry) { 317 WALEdit edit = entry.getEdit(); 318 WALKey key = entry.getKey(); 319 return edit.heapSize() + key.estimatedSerializedSizeOf(); 320 } 321 322 323 private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) { 324 WALEdit edit = entry.getEdit(); 325 batch.incrementHeapSize(entrySize); 326 Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit); 327 batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst()); 328 batch.incrementNbHFiles(nbRowsAndHFiles.getSecond()); 329 } 330 331 /** 332 * Count the number of different row keys in the given edit because of mini-batching. We assume 333 * that there's at least one Cell in the WALEdit. 334 * @param edit edit to count row keys from 335 * @return number of different row keys and HFiles 336 */ 337 private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) { 338 List<Cell> cells = edit.getCells(); 339 int distinctRowKeys = 1; 340 int totalHFileEntries = 0; 341 Cell lastCell = cells.get(0); 342 343 int totalCells = edit.size(); 344 for (int i = 0; i < totalCells; i++) { 345 // Count HFiles to be replicated 346 if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { 347 try { 348 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); 349 List<StoreDescriptor> stores = bld.getStoresList(); 350 int totalStores = stores.size(); 351 for (int j = 0; j < totalStores; j++) { 352 totalHFileEntries += stores.get(j).getStoreFileList().size(); 353 } 354 } catch (IOException e) { 355 LOG.error("Failed to deserialize bulk load entry from wal edit. " 356 + "Then its hfiles count will not be added into metric."); 357 } 358 } 359 360 if (!CellUtil.matchingRows(cells.get(i), lastCell)) { 361 distinctRowKeys++; 362 } 363 lastCell = cells.get(i); 364 } 365 366 Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries); 367 return result; 368 } 369 370 /** 371 * Calculate the total size of all the store files 372 * @param edit edit to count row keys from 373 * @return the total size of the store files 374 */ 375 private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { 376 List<Cell> cells = edit.getCells(); 377 int totalStoreFilesSize = 0; 378 379 int totalCells = edit.size(); 380 for (int i = 0; i < totalCells; i++) { 381 if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { 382 try { 383 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); 384 List<StoreDescriptor> stores = bld.getStoresList(); 385 int totalStores = stores.size(); 386 for (int j = 0; j < totalStores; j++) { 387 totalStoreFilesSize = 388 (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes()); 389 } 390 } catch (IOException e) { 391 LOG.error("Failed to deserialize bulk load entry from wal edit. " 392 + "Size of HFiles part of cell will not be considered in replication " 393 + "request size calculation.", 394 e); 395 } 396 } 397 } 398 return totalStoreFilesSize; 399 } 400 401 /** 402 * @param size delta size for grown buffer 403 * @return true if we should clear buffer and push all 404 */ 405 private boolean acquireBufferQuota(long size) { 406 long newBufferUsed = totalBufferUsed.addAndGet(size); 407 // Record the new buffer usage 408 this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); 409 return newBufferUsed >= totalBufferQuota; 410 } 411 412 /** 413 * @return whether the reader thread is running 414 */ 415 public boolean isReaderRunning() { 416 return isReaderRunning && !isInterrupted(); 417 } 418 419 /** 420 * @param readerRunning the readerRunning to set 421 */ 422 public void setReaderRunning(boolean readerRunning) { 423 this.isReaderRunning = readerRunning; 424 } 425}