/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
 * onto a queue, where a shipper thread consumes them (see {@link #take()} / {@link #poll(long)}).
 * <p>
 * One reader instance serves one WAL group of one replication source. The thread terminates
 * itself via {@link #replicationDone()} when the underlying {@link WALEntryStream} reports that
 * no more data will ever arrive.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class ReplicationSourceWALReader extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);

  // the queue of WAL files this reader consumes, keyed internally by walGroupId
  private final ReplicationSourceLogQueue logQueue;
  private final FileSystem fs;
  private final Configuration conf;
  // applied to every entry read; entries it rejects are dropped (counted in source metrics)
  private final WALEntryFilter filter;
  private final ReplicationSource source;

  // hand-off queue between this reader thread and the shipper thread
  @InterfaceAudience.Private
  final BlockingQueue<WALEntryBatch> entryBatchQueue;
  // max (heap) size of each batch - multiply by number of batches in queue to get total
  private final long replicationBatchSizeCapacity;
  // max count of each batch - multiply by number of batches in queue to get total
  private final int replicationBatchCountCapacity;
  // position in the WAL to start reading at; advanced as batches are produced
  private long currentPosition;
  // base sleep (ms) between retries; multiplied by the current retry multiplier
  private final long sleepForRetries;
  // cap for the retry multiplier, bounding the max backoff to
  // sleepForRetries * maxRetriesMultiplier
  private final int maxRetriesMultiplier;

  // Indicates whether this particular worker is running. Only ever set from true to false;
  // read together with Thread.isInterrupted() in isReaderRunning().
  private boolean isReaderRunning = true;
  private final String walGroupId;

  /**
   * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches
   * the entries, and puts them on a batch queue.
   * @param fs            the files system to use
   * @param conf          configuration to use
   * @param logQueue      The WAL queue to read off of
   * @param startPosition position in the first WAL to start reading from
   * @param filter        The filter to use while reading
   * @param source        replication source
   * @param walGroupId    id of the WAL group within the source that this reader serves
   */
  public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
    ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
    ReplicationSource source, String walGroupId) {
    this.logQueue = logQueue;
    this.currentPosition = startPosition;
    this.fs = fs;
    this.conf = conf;
    this.filter = filter;
    this.source = source;
    // default 64 MB per batch
    this.replicationBatchSizeCapacity =
      this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
    this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
    // memory used will be batchSizeCapacity * (nb.batches + 1)
    // the +1 is for the current thread reading before placing onto the queue
    int batchCount = conf.getInt("replication.source.nb.batches", 1);
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
    this.walGroupId = walGroupId;
    LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : "
      + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
      + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
      + ", replicationBatchQueueCapacity=" + batchCount);
  }

  /**
   * Marks this reader as finished and wakes the shipper with the terminal
   * {@link WALEntryBatch#NO_MORE_DATA} marker.
   * @throws InterruptedException if interrupted while putting the terminal batch on the queue
   */
  private void replicationDone() throws InterruptedException {
    // we're done with current queue, either this is a recovered queue, or it is the special
    // group for a sync replication peer and the peer has been transited to DA or S state.
    LOG.debug("Stopping the replication source wal reader");
    setReaderRunning(false);
    // shuts down shipper thread immediately
    entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
  }

  /**
   * Sleeps for {@code sleepForRetries * sleepMultiplier} ms, then returns the multiplier bumped
   * by one (capped at {@link #maxRetriesMultiplier}), i.e. linear backoff.
   * @param sleepMultiplier the current backoff multiplier
   * @return the multiplier to use for the next retry
   */
  protected final int sleep(int sleepMultiplier) {
    if (sleepMultiplier < maxRetriesMultiplier) {
      sleepMultiplier++;
    }
    Threads.sleep(sleepForRetries * sleepMultiplier);
    return sleepMultiplier;
  }

  /**
   * Main loop: repeatedly opens a {@link WALEntryStream} at {@link #currentPosition}, reads and
   * batches entries while the stream stays healthy, and re-creates the stream after a retryable
   * filter failure. Exits when the stream reports NO more data or the thread is interrupted.
   */
  @Override
  public void run() {
    int sleepMultiplier = 1;
    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
      try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
        source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) {
        while (isReaderRunning()) { // loop here to keep reusing stream while we can
          if (!source.isPeerEnabled()) {
            // peer disabled: idle without consuming entries
            Threads.sleep(sleepForRetries);
            continue;
          }
          if (!checkBufferQuota()) {
            // global buffer quota exceeded; checkBufferQuota already slept
            continue;
          }
          Path currentPath = entryStream.getCurrentPath();
          WALEntryStream.HasNext hasNext = entryStream.hasNext();
          if (hasNext == WALEntryStream.HasNext.NO) {
            replicationDone();
            return;
          }
          // first, check if we have switched a file, if so, we need to manually add an EOF entry
          // batch to the queue
          if (currentPath != null && switched(entryStream, currentPath)) {
            currentPosition = entryStream.getPosition();
            entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath));
            continue;
          }
          if (hasNext == WALEntryStream.HasNext.RETRY) {
            // sleep and retry
            sleepMultiplier = sleep(sleepMultiplier);
            continue;
          }
          if (hasNext == WALEntryStream.HasNext.RETRY_IMMEDIATELY) {
            // retry immediately, this usually means we have switched a file
            continue;
          }
          // below are all for hasNext == YES
          WALEntryBatch batch = createBatch(entryStream);
          boolean successAddToQueue = false;
          try {
            readWALEntries(entryStream, batch);
            currentPosition = entryStream.getPosition();
            // need to propagate the batch even if it has no entries since it may carry the last
            // sequence id information for serial replication.
            LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
            entryBatchQueue.put(batch);
            successAddToQueue = true;
            // reading succeeded, so reset the backoff
            sleepMultiplier = 1;
          } finally {
            if (!successAddToQueue) {
              // batch is not put to ReplicationSourceWALReader#entryBatchQueue, so we should
              // decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which
              // acquired in ReplicationSourceWALReader.acquireBufferQuota.
              this.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
            }
          }
        }
      } catch (WALEntryFilterRetryableException e) {
        // here we have to recreate the WALEntryStream, as when filtering, we have already called
        // next to get the WAL entry and advanced the WALEntryStream, at WALEntryStream layer, it
        // just considers everything is fine, that's why the catch block is not in the inner block
        LOG.warn("Failed to filter WAL entries and the filter let us retry later", e);
        sleepMultiplier = sleep(sleepMultiplier);
      } catch (InterruptedException e) {
        // this usually means we want to quit
        LOG.warn("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue",
          e);
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Adds {@code entry} to {@code batch}, charging its size against the batch stats and the global
   * buffer quota.
   * <p>
   * Returns true if we reach the size limit for batch, i.e, we need to finish the batch and
   * return (too many entries, batch heap size too big, or global buffer too large).
   * Entries with a null/empty edit are skipped (returns false without adding).
   */
  protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
    WALEdit edit = entry.getEdit();
    if (edit == null || edit.isEmpty()) {
      LOG.trace("Edit null or empty for entry {} ", entry);
      return false;
    }
    LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}",
      entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
    // rewrite replication marker edits so they carry wal name + offset (no-op for normal edits)
    updateReplicationMarkerEdit(entry, batch.getLastWalPosition());
    long entrySize = getEntrySizeIncludeBulkLoad(entry);
    batch.addEntry(entry, entrySize);
    updateBatchStats(batch, entry, entrySize);
    boolean totalBufferTooLarge = this.getSourceManager().acquireWALEntryBufferQuota(batch, entry);

    // Stop if too many entries or too big
    return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
      || batch.getNbEntries() >= replicationBatchCountCapacity;
  }

  /**
   * Returns true if the stream has moved on from {@code path}, i.e. its current path is null or
   * has a different file name.
   */
  protected static final boolean switched(WALEntryStream entryStream, Path path) {
    Path newPath = entryStream.getCurrentPath();
    return newPath == null || !path.getName().equals(newPath.getName());
  }

  // We need to get the WALEntryBatch from the caller so we can add entries in there
  // This is required in case there is any exception in while reading entries
  // we do not want to loss the existing entries in the batch
  protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
    throws InterruptedException {
    Path currentPath = entryStream.getCurrentPath();
    for (;;) {
      Entry entry = entryStream.next();
      // record position after each entry so the batch always reflects how far we have read
      batch.setLastWalPosition(entryStream.getPosition());
      entry = filterEntry(entry);
      if (entry != null) {
        if (addEntryToBatch(batch, entry)) {
          // batch full (size/count/global quota) - ship what we have
          break;
        }
      }
      WALEntryStream.HasNext hasNext = entryStream.hasNext();
      // always return if we have switched to a new file
      if (switched(entryStream, currentPath)) {
        batch.setEndOfFile(true);
        break;
      }
      if (hasNext != WALEntryStream.HasNext.YES) {
        // For hasNext other than YES, it is OK to just retry.
        // As for RETRY and RETRY_IMMEDIATELY, the correct action is to retry, and for NO, it will
        // return NO again when you call the method next time, so it is OK to just return here and
        // let the loop in the upper layer to call hasNext again.
        break;
      }
    }
  }

  /**
   * Returns the path of the WAL currently being read from: the path of the oldest queued batch if
   * any, otherwise the head of the WAL file queue.
   */
  public Path getCurrentPath() {
    // if we've read some WAL entries, get the Path we read from
    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
    if (batchQueueHead != null) {
      return batchQueueHead.getLastWalPath();
    }
    // otherwise, we must be currently reading from the head of the log queue
    return logQueue.getQueue(walGroupId).peek();
  }

  // returns false if we've already exceeded the global quota; sleeps before returning false so
  // callers can simply loop
  private boolean checkBufferQuota() {
    // try not to go over total quota
    if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
      Threads.sleep(sleepForRetries);
      return false;
    }
    return true;
  }

  /** Creates an empty batch anchored at the stream's current WAL file. */
  private WALEntryBatch createBatch(WALEntryStream entryStream) {
    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
  }

  /**
   * Runs {@code entry} through the configured {@link WALEntryFilter}, except that replication
   * marker edits always pass through unfiltered. Returns the (possibly rewritten) entry, or
   * the filter's result, which may be null or empty when the entry was filtered out; filtered
   * entries are counted in the source metrics.
   */
  protected final Entry filterEntry(Entry entry) {
    // Always replicate if this edit is Replication Marker edit.
    if (entry != null && WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
      return entry;
    }
    Entry filtered = filter.filter(entry);
    if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
      LOG.trace("Filtered entry for replication: {}", entry);
      source.getSourceMetrics().incrLogEditsFiltered();
    }
    return filtered;
  }

  /**
   * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
   * batch to become available
   * @return A batch of entries, along with the position in the log after reading the batch
   * @throws InterruptedException if interrupted while waiting
   */
  public WALEntryBatch take() throws InterruptedException {
    return entryBatchQueue.take();
  }

  /**
   * Retrieves the next batch of WAL entries from the queue, waiting up to {@code timeout} ms.
   * @return a batch, or null if the timeout elapsed with no batch available
   * @throws InterruptedException if interrupted while waiting
   */
  public WALEntryBatch poll(long timeout) throws InterruptedException {
    return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
  }

  /**
   * Returns the entry's size including the on-disk size of any bulk-loaded store files it
   * references, so quota accounting reflects the real replication payload.
   */
  private long getEntrySizeIncludeBulkLoad(Entry entry) {
    WALEdit edit = entry.getEdit();
    return WALEntryBatch.getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
  }

  /** Adds this entry's heap size, distinct row count and HFile count to the batch's stats. */
  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
    WALEdit edit = entry.getEdit();
    batch.incrementHeapSize(entrySize);
    Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
    batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
    batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
  }

  /**
   * Count the number of different row keys in the given edit because of mini-batching. We assume
   * that there's at least one Cell in the WALEdit.
   * Note: only adjacent cells with different rows are counted, so the "distinct" count relies on
   * cells of the same row being contiguous in the edit.
   * @param edit edit to count row keys from
   * @return number of different row keys and HFiles
   */
  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
    List<Cell> cells = edit.getCells();
    int distinctRowKeys = 1;
    int totalHFileEntries = 0;
    Cell lastCell = cells.get(0);

    int totalCells = edit.size();
    for (int i = 0; i < totalCells; i++) {
      // Count HFiles to be replicated
      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
        try {
          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
          List<StoreDescriptor> stores = bld.getStoresList();
          int totalStores = stores.size();
          for (int j = 0; j < totalStores; j++) {
            totalHFileEntries += stores.get(j).getStoreFileList().size();
          }
        } catch (IOException e) {
          // best effort: a malformed descriptor only skews the metric, it does not fail the batch
          LOG.error("Failed to deserialize bulk load entry from wal edit. "
            + "Then its hfiles count will not be added into metric.", e);
        }
      }

      if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
        distinctRowKeys++;
      }
      lastCell = cells.get(i);
    }

    Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
    return result;
  }

  /**
   * Calculate the total size of all the store files
   * @param edit edit to count row keys from
   * @return the total size of the store files
   */
  private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
    List<Cell> cells = edit.getCells();
    int totalStoreFilesSize = 0;

    int totalCells = edit.size();
    for (int i = 0; i < totalCells; i++) {
      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
        try {
          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
          List<StoreDescriptor> stores = bld.getStoresList();
          int totalStores = stores.size();
          for (int j = 0; j < totalStores; j++) {
            // NOTE(review): store file sizes are longs but accumulated into an int here; very
            // large bulk loads (>2GB total) would overflow - confirm whether this is acceptable.
            totalStoreFilesSize =
              (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes());
          }
        } catch (IOException e) {
          // best effort: without the descriptor we simply under-count the request size
          LOG.error("Failed to deserialize bulk load entry from wal edit. "
            + "Size of HFiles part of cell will not be considered in replication "
            + "request size calculation.", e);
        }
      }
    }
    return totalStoreFilesSize;
  }

  /*
   * Create @ReplicationMarkerDescriptor with region_server_name, wal_name and offset and set to
   * cell's value.
   */
  private void updateReplicationMarkerEdit(Entry entry, long offset) {
    WALEdit edit = entry.getEdit();
    // Return early if it is not ReplicationMarker edit.
    if (!WALEdit.isReplicationMarkerEdit(edit)) {
      return;
    }
    List<Cell> cells = edit.getCells();
    Preconditions.checkArgument(cells.size() == 1, "ReplicationMarker should have only 1 cell");
    Cell cell = cells.get(0);
    // Create a descriptor with region_server_name, wal_name and offset
    WALProtos.ReplicationMarkerDescriptor.Builder builder =
      WALProtos.ReplicationMarkerDescriptor.newBuilder();
    builder.setRegionServerName(this.source.getServer().getServerName().getHostname());
    builder.setWalName(getCurrentPath().getName());
    builder.setOffset(offset);
    WALProtos.ReplicationMarkerDescriptor descriptor = builder.build();

    // Create a new KeyValue
    KeyValue kv = new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell),
      CellUtil.cloneQualifier(cell), cell.getTimestamp(), descriptor.toByteArray());
    ArrayList<ExtendedCell> newCells = new ArrayList<>();
    newCells.add(kv);
    // Update edit with new cell.
    WALEditInternalHelper.setExtendedCells(edit, newCells);
  }

  /** Returns whether the reader thread is running */
  public boolean isReaderRunning() {
    return isReaderRunning && !isInterrupted();
  }

  /**
   * @param readerRunning the readerRunning to set
   */
  public void setReaderRunning(boolean readerRunning) {
    this.isReaderRunning = readerRunning;
  }

  private ReplicationSourceManager getSourceManager() {
    return this.source.getSourceManager();
  }

  /** Returns the base sleep interval (ms) between retries; exposed for tests. */
  long getSleepForRetries() {
    return sleepForRetries;
  }
}