001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import java.io.EOFException; 022import java.io.IOException; 023import java.util.List; 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.PriorityBlockingQueue; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicLong; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellUtil; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.replication.WALEntryFilter; 036import org.apache.hadoop.hbase.util.Pair; 037import org.apache.hadoop.hbase.util.Threads; 038import org.apache.hadoop.hbase.wal.WAL.Entry; 039import org.apache.hadoop.hbase.wal.WALEdit; 040import org.apache.hadoop.hbase.wal.WALKey; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.apache.yetus.audience.InterfaceStability; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 048 049/** 050 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches 051 * onto a queue 052 */ 053@InterfaceAudience.Private 054@InterfaceStability.Evolving 055class ReplicationSourceWALReader extends Thread { 056 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class); 057 058 private final PriorityBlockingQueue<Path> logQueue; 059 private final FileSystem fs; 060 private final Configuration conf; 061 private final WALEntryFilter filter; 062 private final ReplicationSource source; 063 064 private final BlockingQueue<WALEntryBatch> entryBatchQueue; 065 // max (heap) size of each batch - multiply by number of batches in queue to get total 066 private final long replicationBatchSizeCapacity; 067 // max count of each batch - multiply by number of batches in queue to get total 068 private final int replicationBatchCountCapacity; 069 // position in the WAL to start reading at 070 private long currentPosition; 071 private final long sleepForRetries; 072 private final int maxRetriesMultiplier; 073 private final boolean eofAutoRecovery; 074 075 //Indicates whether this particular worker is running 076 private boolean isReaderRunning = true; 077 078 private AtomicLong totalBufferUsed; 079 private long totalBufferQuota; 080 081 /** 082 * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the 083 * entries, and puts them on a batch queue. 084 * @param fs the files system to use 085 * @param conf configuration to use 086 * @param logQueue The WAL queue to read off of 087 * @param startPosition position in the first WAL to start reading from 088 * @param filter The filter to use while reading 089 * @param source replication source 090 */ 091 public ReplicationSourceWALReader(FileSystem fs, Configuration conf, 092 PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter, 093 ReplicationSource source) { 094 this.logQueue = logQueue; 095 this.currentPosition = startPosition; 096 this.fs = fs; 097 this.conf = conf; 098 this.filter = filter; 099 this.source = source; 100 this.replicationBatchSizeCapacity = 101 this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64); 102 this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); 103 // memory used will be batchSizeCapacity * (nb.batches + 1) 104 // the +1 is for the current thread reading before placing onto the queue 105 int batchCount = conf.getInt("replication.source.nb.batches", 1); 106 this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed(); 107 this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 108 HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); 109 this.sleepForRetries = 110 this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second 111 this.maxRetriesMultiplier = 112 this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per 113 this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); 114 this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); 115 LOG.info("peerClusterZnode=" + source.getQueueId() 116 + ", ReplicationSourceWALReaderThread : " + source.getPeerId() 117 + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity 118 + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity 119 + ", replicationBatchQueueCapacity=" + batchCount); 120 } 121 122 @Override 123 public void run() { 124 int sleepMultiplier = 1; 125 while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream 126 try (WALEntryStream entryStream = 127 new WALEntryStream(logQueue, conf, currentPosition, 128 source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), 129 source.getSourceMetrics())) { 130 while (isReaderRunning()) { // loop here to keep reusing stream while we can 131 if (!source.isPeerEnabled()) { 132 Threads.sleep(sleepForRetries); 133 continue; 134 } 135 if (!checkQuota()) { 136 continue; 137 } 138 WALEntryBatch batch = readWALEntries(entryStream); 139 currentPosition = entryStream.getPosition(); 140 if (batch != null) { 141 // need to propagate the batch even it has no entries since it may carry the last 142 // sequence id information for serial replication. 143 LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); 144 entryBatchQueue.put(batch); 145 sleepMultiplier = 1; 146 } else { // got no entries and didn't advance position in WAL 147 handleEmptyWALEntryBatch(entryStream.getCurrentPath()); 148 entryStream.reset(); // reuse stream 149 } 150 } 151 } catch (IOException e) { // stream related 152 if (sleepMultiplier < maxRetriesMultiplier) { 153 LOG.debug("Failed to read stream of replication entries: " + e); 154 sleepMultiplier++; 155 } else { 156 LOG.error("Failed to read stream of replication entries", e); 157 handleEofException(e); 158 } 159 Threads.sleep(sleepForRetries * sleepMultiplier); 160 } catch (InterruptedException e) { 161 LOG.trace("Interrupted while sleeping between WAL reads"); 162 Thread.currentThread().interrupt(); 163 } 164 } 165 } 166 167 // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return. 168 protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) { 169 WALEdit edit = entry.getEdit(); 170 if (edit == null || edit.isEmpty()) { 171 LOG.debug("Edit null or empty for entry {} ", entry); 172 return false; 173 } 174 LOG.debug("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}", 175 entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId()); 176 long entrySize = getEntrySizeIncludeBulkLoad(entry); 177 long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); 178 batch.addEntry(entry); 179 updateBatchStats(batch, entry, entrySize); 180 boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad); 181 182 // Stop if too many entries or too big 183 return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity || 184 batch.getNbEntries() >= replicationBatchCountCapacity; 185 } 186 187 protected static final boolean switched(WALEntryStream entryStream, Path path) { 188 Path newPath = entryStream.getCurrentPath(); 189 return newPath == null || !path.getName().equals(newPath.getName()); 190 } 191 192 protected WALEntryBatch readWALEntries(WALEntryStream entryStream) 193 throws IOException, InterruptedException { 194 Path currentPath = entryStream.getCurrentPath(); 195 if (!entryStream.hasNext()) { 196 // check whether we have switched a file 197 if (currentPath != null && switched(entryStream, currentPath)) { 198 return WALEntryBatch.endOfFile(currentPath); 199 } else { 200 return null; 201 } 202 } 203 if (currentPath != null) { 204 if (switched(entryStream, currentPath)) { 205 return WALEntryBatch.endOfFile(currentPath); 206 } 207 } else { 208 // when reading from the entry stream first time we will enter here 209 currentPath = entryStream.getCurrentPath(); 210 } 211 WALEntryBatch batch = createBatch(entryStream); 212 for (;;) { 213 Entry entry = entryStream.next(); 214 batch.setLastWalPosition(entryStream.getPosition()); 215 entry = filterEntry(entry); 216 if (entry != null) { 217 if (addEntryToBatch(batch, entry)) { 218 break; 219 } 220 } 221 boolean hasNext = entryStream.hasNext(); 222 // always return if we have switched to a new file 223 if (switched(entryStream, currentPath)) { 224 batch.setEndOfFile(true); 225 break; 226 } 227 if (!hasNext) { 228 break; 229 } 230 } 231 return batch; 232 } 233 234 private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { 235 LOG.trace("Didn't read any new entries from WAL"); 236 if (source.isRecovered()) { 237 // we're done with queue recovery, shut ourself down 238 setReaderRunning(false); 239 // shuts down shipper thread immediately 240 entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA); 241 } else { 242 Thread.sleep(sleepForRetries); 243 } 244 } 245 246 // if we get an EOF due to a zero-length log, and there are other logs in queue 247 // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is 248 // enabled, then dump the log 249 private void handleEofException(IOException e) { 250 if ((e instanceof EOFException || e.getCause() instanceof EOFException) && 251 logQueue.size() > 1 && this.eofAutoRecovery) { 252 try { 253 if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { 254 LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); 255 logQueue.remove(); 256 currentPosition = 0; 257 } 258 } catch (IOException ioe) { 259 LOG.warn("Couldn't get file length information about log " + logQueue.peek()); 260 } 261 } 262 } 263 264 public Path getCurrentPath() { 265 // if we've read some WAL entries, get the Path we read from 266 WALEntryBatch batchQueueHead = entryBatchQueue.peek(); 267 if (batchQueueHead != null) { 268 return batchQueueHead.getLastWalPath(); 269 } 270 // otherwise, we must be currently reading from the head of the log queue 271 return logQueue.peek(); 272 } 273 274 //returns false if we've already exceeded the global quota 275 private boolean checkQuota() { 276 // try not to go over total quota 277 if (totalBufferUsed.get() > totalBufferQuota) { 278 Threads.sleep(sleepForRetries); 279 return false; 280 } 281 return true; 282 } 283 284 protected final WALEntryBatch createBatch(WALEntryStream entryStream) { 285 return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); 286 } 287 288 protected final Entry filterEntry(Entry entry) { 289 Entry filtered = filter.filter(entry); 290 if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) { 291 LOG.debug("Filtered entry for replication: {}", entry); 292 source.getSourceMetrics().incrLogEditsFiltered(); 293 } 294 return filtered; 295 } 296 297 /** 298 * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a 299 * batch to become available 300 * @return A batch of entries, along with the position in the log after reading the batch 301 * @throws InterruptedException if interrupted while waiting 302 */ 303 public WALEntryBatch take() throws InterruptedException { 304 return entryBatchQueue.take(); 305 } 306 307 public WALEntryBatch poll(long timeout) throws InterruptedException { 308 return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS); 309 } 310 311 private long getEntrySizeIncludeBulkLoad(Entry entry) { 312 WALEdit edit = entry.getEdit(); 313 WALKey key = entry.getKey(); 314 return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) + 315 key.estimatedSerializedSizeOf(); 316 } 317 318 public static long getEntrySizeExcludeBulkLoad(Entry entry) { 319 WALEdit edit = entry.getEdit(); 320 WALKey key = entry.getKey(); 321 return edit.heapSize() + key.estimatedSerializedSizeOf(); 322 } 323 324 325 private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) { 326 WALEdit edit = entry.getEdit(); 327 batch.incrementHeapSize(entrySize); 328 Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit); 329 batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst()); 330 batch.incrementNbHFiles(nbRowsAndHFiles.getSecond()); 331 } 332 333 /** 334 * Count the number of different row keys in the given edit because of mini-batching. We assume 335 * that there's at least one Cell in the WALEdit. 336 * @param edit edit to count row keys from 337 * @return number of different row keys and HFiles 338 */ 339 private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) { 340 List<Cell> cells = edit.getCells(); 341 int distinctRowKeys = 1; 342 int totalHFileEntries = 0; 343 Cell lastCell = cells.get(0); 344 345 int totalCells = edit.size(); 346 for (int i = 0; i < totalCells; i++) { 347 // Count HFiles to be replicated 348 if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { 349 try { 350 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); 351 List<StoreDescriptor> stores = bld.getStoresList(); 352 int totalStores = stores.size(); 353 for (int j = 0; j < totalStores; j++) { 354 totalHFileEntries += stores.get(j).getStoreFileList().size(); 355 } 356 } catch (IOException e) { 357 LOG.error("Failed to deserialize bulk load entry from wal edit. " 358 + "Then its hfiles count will not be added into metric."); 359 } 360 } 361 362 if (!CellUtil.matchingRows(cells.get(i), lastCell)) { 363 distinctRowKeys++; 364 } 365 lastCell = cells.get(i); 366 } 367 368 Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries); 369 return result; 370 } 371 372 /** 373 * Calculate the total size of all the store files 374 * @param edit edit to count row keys from 375 * @return the total size of the store files 376 */ 377 private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { 378 List<Cell> cells = edit.getCells(); 379 int totalStoreFilesSize = 0; 380 381 int totalCells = edit.size(); 382 for (int i = 0; i < totalCells; i++) { 383 if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { 384 try { 385 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); 386 List<StoreDescriptor> stores = bld.getStoresList(); 387 int totalStores = stores.size(); 388 for (int j = 0; j < totalStores; j++) { 389 totalStoreFilesSize = 390 (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes()); 391 } 392 } catch (IOException e) { 393 LOG.error("Failed to deserialize bulk load entry from wal edit. " 394 + "Size of HFiles part of cell will not be considered in replication " 395 + "request size calculation.", 396 e); 397 } 398 } 399 } 400 return totalStoreFilesSize; 401 } 402 403 /** 404 * @param size delta size for grown buffer 405 * @return true if we should clear buffer and push all 406 */ 407 private boolean acquireBufferQuota(long size) { 408 return totalBufferUsed.addAndGet(size) >= totalBufferQuota; 409 } 410 411 /** 412 * @return whether the reader thread is running 413 */ 414 public boolean isReaderRunning() { 415 return isReaderRunning && !isInterrupted(); 416 } 417 418 /** 419 * @param readerRunning the readerRunning to set 420 */ 421 public void setReaderRunning(boolean readerRunning) { 422 this.isReaderRunning = readerRunning; 423 } 424}