/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
 * onto a queue.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class ReplicationSourceWALReader extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);

  private final PriorityBlockingQueue<Path> logQueue;
  private final FileSystem fs;
  private final Configuration conf;
  private final WALEntryFilter filter;
  private final ReplicationSource source;

  private final BlockingQueue<WALEntryBatch> entryBatchQueue;
  // max (heap) size of each batch - multiply by number of batches in queue to get total
  private final long replicationBatchSizeCapacity;
  // max count of each batch - multiply by number of batches in queue to get total
  private final int replicationBatchCountCapacity;
  // position in the WAL to start reading at
  private long currentPosition;
  private final long sleepForRetries;
  private final int maxRetriesMultiplier;
  private final boolean eofAutoRecovery;

  // Indicates whether this particular worker is running
  private boolean isReaderRunning = true;

  private AtomicLong totalBufferUsed;
  private long totalBufferQuota;
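
  /*
   * Illustrative sizing note (a rough sketch, not enforced by this class): per the constructor
   * comment below, memory used is roughly batchSizeCapacity * (nb.batches + 1). With the
   * defaults of replication.source.size.capacity = 64 MB and replication.source.nb.batches = 1,
   * a single source may hold about 64 MB * (1 + 1) = 128 MB of entry heap at once -- one batch
   * sitting in entryBatchQueue plus one being read by this thread.
   */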

  /**
   * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches
   * the entries, and puts them on a batch queue.
   * @param fs the file system to use
   * @param conf configuration to use
   * @param logQueue The WAL queue to read off of
   * @param startPosition position in the first WAL to start reading from
   * @param filter The filter to use while reading
   * @param source replication source
   */
  public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
      ReplicationSource source) {
    this.logQueue = logQueue;
    this.currentPosition = startPosition;
    this.fs = fs;
    this.conf = conf;
    this.filter = filter;
    this.source = source;
    this.replicationBatchSizeCapacity =
        this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
    this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
    // memory used will be batchSizeCapacity * (nb.batches + 1)
    // the +1 is for the current thread reading before placing onto the queue
    int batchCount = conf.getInt("replication.source.nb.batches", 1);
    this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
    this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
        HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
    this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
    LOG.info("peerClusterZnode=" + source.getQueueId()
        + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
        + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
        + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
        + ", replicationBatchQueueCapacity=" + batchCount);
  }

  @Override
  public void run() {
    int sleepMultiplier = 1;
    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
      try (WALEntryStream entryStream =
          new WALEntryStream(logQueue, conf, currentPosition,
              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
              source.getSourceMetrics())) {
        while (isReaderRunning()) { // loop here to keep reusing stream while we can
          if (!source.isPeerEnabled()) {
            Threads.sleep(sleepForRetries);
            continue;
          }
          if (!checkQuota()) {
            continue;
          }
          WALEntryBatch batch = readWALEntries(entryStream);
          currentPosition = entryStream.getPosition();
          if (batch != null) {
            // need to propagate the batch even if it has no entries, since it may carry the last
            // sequence id information for serial replication
            LOG.trace("Read {} WAL entries eligible for replication", batch.getNbEntries());
            entryBatchQueue.put(batch);
            sleepMultiplier = 1;
          } else { // got no entries and didn't advance position in WAL
            handleEmptyWALEntryBatch(entryStream.getCurrentPath());
            entryStream.reset(); // reuse stream
          }
        }
      } catch (IOException e) { // stream related
        if (sleepMultiplier < maxRetriesMultiplier) {
          LOG.debug("Failed to read stream of replication entries: " + e);
          sleepMultiplier++;
        } else {
          LOG.error("Failed to read stream of replication entries", e);
          handleEofException(e);
        }
        Threads.sleep(sleepForRetries * sleepMultiplier);
      } catch (InterruptedException e) {
        LOG.trace("Interrupted while sleeping between WAL reads");
        Thread.currentThread().interrupt();
      }
    }
  }
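
  // Consumer-side sketch (illustrative only; the consumer lives outside this class, typically
  // the shipper thread mentioned in handleEmptyWALEntryBatch). Batches produced by run() are
  // drained via the blocking accessors declared further down, e.g.:
  //   WALEntryBatch batch = reader.take();              // block until a batch is ready
  //   WALEntryBatch batch = reader.poll(1000);          // or wait at most 1000 ms, null if none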

  // Returns true if we reach the size limit for the batch, i.e., we need to finish the batch
  // and return.
  protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
    WALEdit edit = entry.getEdit();
    if (edit == null || edit.isEmpty()) {
      return false;
    }
    long entrySize = getEntrySizeIncludeBulkLoad(entry);
    long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
    batch.addEntry(entry);
    updateBatchStats(batch, entry, entrySize);
    boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);

    // Stop if too many entries or too big
    return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
        batch.getNbEntries() >= replicationBatchCountCapacity;
  }

  protected static final boolean switched(WALEntryStream entryStream, Path path) {
    Path newPath = entryStream.getCurrentPath();
    return newPath == null || !path.getName().equals(newPath.getName());
  }

  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
      throws IOException, InterruptedException {
    Path currentPath = entryStream.getCurrentPath();
    if (!entryStream.hasNext()) {
      // check whether we have switched a file
      if (currentPath != null && switched(entryStream, currentPath)) {
        return WALEntryBatch.endOfFile(currentPath);
      } else {
        return null;
      }
    }
    if (currentPath != null) {
      if (switched(entryStream, currentPath)) {
        return WALEntryBatch.endOfFile(currentPath);
      }
    } else {
      // when reading from the entry stream for the first time we will enter here
      currentPath = entryStream.getCurrentPath();
    }
    WALEntryBatch batch = createBatch(entryStream);
    for (;;) {
      Entry entry = entryStream.next();
      batch.setLastWalPosition(entryStream.getPosition());
      entry = filterEntry(entry);
      if (entry != null) {
        if (addEntryToBatch(batch, entry)) {
          break;
        }
      }
      boolean hasNext = entryStream.hasNext();
      // always return if we have switched to a new file
      if (switched(entryStream, currentPath)) {
        batch.setEndOfFile(true);
        break;
      }
      if (!hasNext) {
        break;
      }
    }
    return batch;
  }

  private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
    LOG.trace("Didn't read any new entries from WAL");
    if (source.isRecovered()) {
      // we're done with queue recovery, shut ourselves down
      setReaderRunning(false);
      // shuts down the shipper thread immediately
      entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
    } else {
      Thread.sleep(sleepForRetries);
    }
  }
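
  // Configuration knobs read by this reader, with the defaults set in the constructor
  // (summary for reference; the constructor above is authoritative):
  //   replication.source.size.capacity        = 67108864 (64 MB)  max heap size per batch
  //   replication.source.nb.capacity          = 25000             max entries per batch
  //   replication.source.nb.batches           = 1                 batches held in entryBatchQueue
  //   replication.source.sleepforretries      = 1000 ms           sleep between retries
  //   replication.source.maxretriesmultiplier = 300               max sleep multiplier
  //   replication.source.eof.autorecovery     = false             drop 0-length logs on EOF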

  // If we get an EOF due to a zero-length log, and there are other logs in the queue
  // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
  // enabled, then dump the log
  private void handleEofException(IOException e) {
    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
        logQueue.size() > 1 && this.eofAutoRecovery) {
      try {
        if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
          LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
          logQueue.remove();
          currentPosition = 0;
        }
      } catch (IOException ioe) {
        LOG.warn("Couldn't get file length information about log " + logQueue.peek(), ioe);
      }
    }
  }

  public Path getCurrentPath() {
    // if we've read some WAL entries, get the Path we read from
    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
    if (batchQueueHead != null) {
      return batchQueueHead.getLastWalPath();
    }
    // otherwise, we must be currently reading from the head of the log queue
    return logQueue.peek();
  }

  // Returns false if we've already exceeded the global quota
  private boolean checkQuota() {
    // try not to go over total quota
    if (totalBufferUsed.get() > totalBufferQuota) {
      Threads.sleep(sleepForRetries);
      return false;
    }
    return true;
  }

  protected final WALEntryBatch createBatch(WALEntryStream entryStream) {
    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
  }

  protected final Entry filterEntry(Entry entry) {
    Entry filtered = filter.filter(entry);
    if (entry != null && filtered == null) {
      source.getSourceMetrics().incrLogEditsFiltered();
    }
    return filtered;
  }

  /**
   * Retrieves the next batch of WAL entries from the queue, waiting until one becomes available.
   * @return A batch of entries, along with the position in the log after reading the batch
   * @throws InterruptedException if interrupted while waiting
   */
  public WALEntryBatch take() throws InterruptedException {
    return entryBatchQueue.take();
  }

  /**
   * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time in
   * milliseconds for a batch to become available.
   * @return A batch of entries, or null if none became available within the timeout
   * @throws InterruptedException if interrupted while waiting
   */
  public WALEntryBatch poll(long timeout) throws InterruptedException {
    return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
  }

  private long getEntrySizeIncludeBulkLoad(Entry entry) {
    WALEdit edit = entry.getEdit();
    WALKey key = entry.getKey();
    return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
        key.estimatedSerializedSizeOf();
  }

  public static long getEntrySizeExcludeBulkLoad(Entry entry) {
    WALEdit edit = entry.getEdit();
    WALKey key = entry.getKey();
    return edit.heapSize() + key.estimatedSerializedSizeOf();
  }

  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
    WALEdit edit = entry.getEdit();
    batch.incrementHeapSize(entrySize);
    Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
    batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
    batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
  }
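
  /*
   * Worked example (illustrative) for countDistinctRowKeysAndHFiles below: an edit holding
   * cells for rows [r1, r1, r2], where the r2 cell is a BULK_LOAD marker whose descriptor
   * lists one store with two store files, yields Pair(2, 2): two distinct row keys and two
   * HFiles to replicate.
   */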

  /**
   * Count the number of different row keys in the given edit; because of mini-batching, an edit
   * may contain cells for more than one row. We assume that there is at least one Cell in the
   * WALEdit.
   * @param edit edit to count row keys from
   * @return number of different row keys and HFiles
   */
  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
    List<Cell> cells = edit.getCells();
    int distinctRowKeys = 1;
    int totalHFileEntries = 0;
    Cell lastCell = cells.get(0);

    int totalCells = edit.size();
    for (int i = 0; i < totalCells; i++) {
      // Count HFiles to be replicated
      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
        try {
          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
          List<StoreDescriptor> stores = bld.getStoresList();
          int totalStores = stores.size();
          for (int j = 0; j < totalStores; j++) {
            totalHFileEntries += stores.get(j).getStoreFileList().size();
          }
        } catch (IOException e) {
          LOG.error("Failed to deserialize bulk load entry from wal edit. "
              + "Its hfiles count will not be added to the metric.", e);
        }
      }

      if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
        distinctRowKeys++;
      }
      lastCell = cells.get(i);
    }

    return new Pair<>(distinctRowKeys, totalHFileEntries);
  }

  /**
   * Calculate the total size of all the store files referenced by bulk load entries in the edit.
   * @param edit edit to sum store file sizes from
   * @return the total size of the store files
   */
  private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
    List<Cell> cells = edit.getCells();
    int totalStoreFilesSize = 0;

    int totalCells = edit.size();
    for (int i = 0; i < totalCells; i++) {
      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
        try {
          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
          List<StoreDescriptor> stores = bld.getStoresList();
          int totalStores = stores.size();
          for (int j = 0; j < totalStores; j++) {
            totalStoreFilesSize =
                (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes());
          }
        } catch (IOException e) {
          LOG.error("Failed to deserialize bulk load entry from wal edit. "
              + "Size of HFiles part of cell will not be considered in replication "
              + "request size calculation.", e);
        }
      }
    }
    return totalStoreFilesSize;
  }

  /**
   * @param size delta by which the total buffer usage grows
   * @return true if the global buffer quota is now exceeded, meaning the current batch should be
   *         finished and shipped
   */
  private boolean acquireBufferQuota(long size) {
    return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
  }

  /**
   * @return whether the reader thread is running
   */
  public boolean isReaderRunning() {
    return isReaderRunning && !isInterrupted();
  }

  /**
   * @param readerRunning the readerRunning to set
   */
  public void setReaderRunning(boolean readerRunning) {
    this.isReaderRunning = readerRunning;
  }
}