001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.TimeUnit; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellUtil; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.replication.WALEntryFilter; 033import org.apache.hadoop.hbase.util.Pair; 034import org.apache.hadoop.hbase.util.Threads; 035import org.apache.hadoop.hbase.wal.WAL.Entry; 036import org.apache.hadoop.hbase.wal.WALEdit; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.apache.yetus.audience.InterfaceStability; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 043 044import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 045import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 047 048/** 049 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches 050 * onto a queue 051 */ 052@InterfaceAudience.Private 053@InterfaceStability.Evolving 054class ReplicationSourceWALReader extends Thread { 055 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class); 056 057 private final ReplicationSourceLogQueue logQueue; 058 private final FileSystem fs; 059 private final Configuration conf; 060 private final WALEntryFilter filter; 061 private final ReplicationSource source; 062 063 @InterfaceAudience.Private 064 final BlockingQueue<WALEntryBatch> entryBatchQueue; 065 // max (heap) size of each batch - multiply by number of batches in queue to get total 066 private final long replicationBatchSizeCapacity; 067 // max count of each batch - multiply by number of batches in queue to get total 068 private final int replicationBatchCountCapacity; 069 // position in the WAL to start reading at 070 private long currentPosition; 071 private final long sleepForRetries; 072 private final int maxRetriesMultiplier; 073 074 // Indicates whether this particular worker is running 075 private boolean isReaderRunning = true; 076 private final String walGroupId; 077 078 /** 079 * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the 080 * entries, and puts them on a batch queue. 081 * @param fs the files system to use 082 * @param conf configuration to use 083 * @param logQueue The WAL queue to read off of 084 * @param startPosition position in the first WAL to start reading from 085 * @param filter The filter to use while reading 086 * @param source replication source 087 */ 088 public ReplicationSourceWALReader(FileSystem fs, Configuration conf, 089 ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter, 090 ReplicationSource source, String walGroupId) { 091 this.logQueue = logQueue; 092 this.currentPosition = startPosition; 093 this.fs = fs; 094 this.conf = conf; 095 this.filter = filter; 096 this.source = source; 097 this.replicationBatchSizeCapacity = 098 this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64); 099 this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); 100 // memory used will be batchSizeCapacity * (nb.batches + 1) 101 // the +1 is for the current thread reading before placing onto the queue 102 int batchCount = conf.getInt("replication.source.nb.batches", 1); 103 // 1 second 104 this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); 105 // 5 minutes @ 1 sec per 106 this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); 107 this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); 108 this.walGroupId = walGroupId; 109 LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : " 110 + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity 111 + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity 112 + ", replicationBatchQueueCapacity=" + batchCount); 113 } 114 115 private void replicationDone() throws InterruptedException { 116 // we're done with current queue, either this is a recovered queue, or it is the special 117 // group for a sync replication peer and the peer has been transited to DA or S state. 118 LOG.debug("Stopping the replication source wal reader"); 119 setReaderRunning(false); 120 // shuts down shipper thread immediately 121 entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA); 122 } 123 124 protected final int sleep(int sleepMultiplier) { 125 if (sleepMultiplier < maxRetriesMultiplier) { 126 sleepMultiplier++; 127 } 128 Threads.sleep(sleepForRetries * sleepMultiplier); 129 return sleepMultiplier; 130 } 131 132 @Override 133 public void run() { 134 int sleepMultiplier = 1; 135 while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream 136 try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition, 137 source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) { 138 while (isReaderRunning()) { // loop here to keep reusing stream while we can 139 if (!source.isPeerEnabled()) { 140 Threads.sleep(sleepForRetries); 141 continue; 142 } 143 if (!checkBufferQuota()) { 144 continue; 145 } 146 Path currentPath = entryStream.getCurrentPath(); 147 WALEntryStream.HasNext hasNext = entryStream.hasNext(); 148 if (hasNext == WALEntryStream.HasNext.NO) { 149 replicationDone(); 150 return; 151 } 152 // first, check if we have switched a file, if so, we need to manually add an EOF entry 153 // batch to the queue 154 if (currentPath != null && switched(entryStream, currentPath)) { 155 entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath)); 156 continue; 157 } 158 if (hasNext == WALEntryStream.HasNext.RETRY) { 159 // sleep and retry 160 sleepMultiplier = sleep(sleepMultiplier); 161 continue; 162 } 163 if (hasNext == WALEntryStream.HasNext.RETRY_IMMEDIATELY) { 164 // retry immediately, this usually means we have switched a file 165 continue; 166 } 167 // below are all for hasNext == YES 168 WALEntryBatch batch = createBatch(entryStream); 169 boolean successAddToQueue = false; 170 try { 171 readWALEntries(entryStream, batch); 172 currentPosition = entryStream.getPosition(); 173 // need to propagate the batch even it has no entries since it may carry the last 174 // sequence id information for serial replication. 175 LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); 176 entryBatchQueue.put(batch); 177 successAddToQueue = true; 178 sleepMultiplier = 1; 179 } finally { 180 if (!successAddToQueue) { 181 // batch is not put to ReplicationSourceWALReader#entryBatchQueue,so we should 182 // decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which 183 // acquired in ReplicationSourceWALReader.acquireBufferQuota. 184 this.getSourceManager().releaseWALEntryBatchBufferQuota(batch); 185 } 186 } 187 } 188 } catch (WALEntryFilterRetryableException e) { 189 // here we have to recreate the WALEntryStream, as when filtering, we have already called 190 // next to get the WAL entry and advanced the WALEntryStream, at WALEntryStream layer, it 191 // just considers everything is fine,that's why the catch block is not in the inner block 192 LOG.warn("Failed to filter WAL entries and the filter let us retry later", e); 193 sleepMultiplier = sleep(sleepMultiplier); 194 } catch (InterruptedException e) { 195 // this usually means we want to quit 196 LOG.warn("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue", 197 e); 198 Thread.currentThread().interrupt(); 199 } 200 } 201 } 202 203 // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return. 204 protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) { 205 WALEdit edit = entry.getEdit(); 206 if (edit == null || edit.isEmpty()) { 207 LOG.trace("Edit null or empty for entry {} ", entry); 208 return false; 209 } 210 LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}", 211 entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId()); 212 updateReplicationMarkerEdit(entry, batch.getLastWalPosition()); 213 long entrySize = getEntrySizeIncludeBulkLoad(entry); 214 batch.addEntry(entry, entrySize); 215 updateBatchStats(batch, entry, entrySize); 216 boolean totalBufferTooLarge = this.getSourceManager().acquireWALEntryBufferQuota(batch, entry); 217 218 // Stop if too many entries or too big 219 return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity 220 || batch.getNbEntries() >= replicationBatchCountCapacity; 221 } 222 223 protected static final boolean switched(WALEntryStream entryStream, Path path) { 224 Path newPath = entryStream.getCurrentPath(); 225 return newPath == null || !path.getName().equals(newPath.getName()); 226 } 227 228 // We need to get the WALEntryBatch from the caller so we can add entries in there 229 // This is required in case there is any exception in while reading entries 230 // we do not want to loss the existing entries in the batch 231 protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch) 232 throws InterruptedException { 233 Path currentPath = entryStream.getCurrentPath(); 234 for (;;) { 235 Entry entry = entryStream.next(); 236 batch.setLastWalPosition(entryStream.getPosition()); 237 entry = filterEntry(entry); 238 if (entry != null) { 239 if (addEntryToBatch(batch, entry)) { 240 break; 241 } 242 } 243 WALEntryStream.HasNext hasNext = entryStream.hasNext(); 244 // always return if we have switched to a new file 245 if (switched(entryStream, currentPath)) { 246 batch.setEndOfFile(true); 247 break; 248 } 249 if (hasNext != WALEntryStream.HasNext.YES) { 250 // For hasNext other than YES, it is OK to just retry. 251 // As for RETRY and RETRY_IMMEDIATELY, the correct action is to retry, and for NO, it will 252 // return NO again when you call the method next time, so it is OK to just return here and 253 // let the loop in the upper layer to call hasNext again. 254 break; 255 } 256 } 257 } 258 259 public Path getCurrentPath() { 260 // if we've read some WAL entries, get the Path we read from 261 WALEntryBatch batchQueueHead = entryBatchQueue.peek(); 262 if (batchQueueHead != null) { 263 return batchQueueHead.getLastWalPath(); 264 } 265 // otherwise, we must be currently reading from the head of the log queue 266 return logQueue.getQueue(walGroupId).peek(); 267 } 268 269 // returns false if we've already exceeded the global quota 270 private boolean checkBufferQuota() { 271 // try not to go over total quota 272 if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) { 273 Threads.sleep(sleepForRetries); 274 return false; 275 } 276 return true; 277 } 278 279 private WALEntryBatch createBatch(WALEntryStream entryStream) { 280 return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); 281 } 282 283 protected final Entry filterEntry(Entry entry) { 284 // Always replicate if this edit is Replication Marker edit. 285 if (entry != null && WALEdit.isReplicationMarkerEdit(entry.getEdit())) { 286 return entry; 287 } 288 Entry filtered = filter.filter(entry); 289 if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) { 290 LOG.trace("Filtered entry for replication: {}", entry); 291 source.getSourceMetrics().incrLogEditsFiltered(); 292 } 293 return filtered; 294 } 295 296 /** 297 * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a 298 * batch to become available 299 * @return A batch of entries, along with the position in the log after reading the batch 300 * @throws InterruptedException if interrupted while waiting 301 */ 302 public WALEntryBatch take() throws InterruptedException { 303 return entryBatchQueue.take(); 304 } 305 306 public WALEntryBatch poll(long timeout) throws InterruptedException { 307 return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS); 308 } 309 310 private long getEntrySizeIncludeBulkLoad(Entry entry) { 311 WALEdit edit = entry.getEdit(); 312 return WALEntryBatch.getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit); 313 } 314 315 private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) { 316 WALEdit edit = entry.getEdit(); 317 batch.incrementHeapSize(entrySize); 318 Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit); 319 batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst()); 320 batch.incrementNbHFiles(nbRowsAndHFiles.getSecond()); 321 } 322 323 /** 324 * Count the number of different row keys in the given edit because of mini-batching. We assume 325 * that there's at least one Cell in the WALEdit. 326 * @param edit edit to count row keys from 327 * @return number of different row keys and HFiles 328 */ 329 private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) { 330 List<Cell> cells = edit.getCells(); 331 int distinctRowKeys = 1; 332 int totalHFileEntries = 0; 333 Cell lastCell = cells.get(0); 334 335 int totalCells = edit.size(); 336 for (int i = 0; i < totalCells; i++) { 337 // Count HFiles to be replicated 338 if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { 339 try { 340 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); 341 List<StoreDescriptor> stores = bld.getStoresList(); 342 int totalStores = stores.size(); 343 for (int j = 0; j < totalStores; j++) { 344 totalHFileEntries += stores.get(j).getStoreFileList().size(); 345 } 346 } catch (IOException e) { 347 LOG.error("Failed to deserialize bulk load entry from wal edit. " 348 + "Then its hfiles count will not be added into metric.", e); 349 } 350 } 351 352 if (!CellUtil.matchingRows(cells.get(i), lastCell)) { 353 distinctRowKeys++; 354 } 355 lastCell = cells.get(i); 356 } 357 358 Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries); 359 return result; 360 } 361 362 /** 363 * Calculate the total size of all the store files 364 * @param edit edit to count row keys from 365 * @return the total size of the store files 366 */ 367 private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { 368 List<Cell> cells = edit.getCells(); 369 int totalStoreFilesSize = 0; 370 371 int totalCells = edit.size(); 372 for (int i = 0; i < totalCells; i++) { 373 if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { 374 try { 375 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); 376 List<StoreDescriptor> stores = bld.getStoresList(); 377 int totalStores = stores.size(); 378 for (int j = 0; j < totalStores; j++) { 379 totalStoreFilesSize = 380 (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes()); 381 } 382 } catch (IOException e) { 383 LOG.error("Failed to deserialize bulk load entry from wal edit. " 384 + "Size of HFiles part of cell will not be considered in replication " 385 + "request size calculation.", e); 386 } 387 } 388 } 389 return totalStoreFilesSize; 390 } 391 392 /* 393 * Create @ReplicationMarkerDescriptor with region_server_name, wal_name and offset and set to 394 * cell's value. 395 */ 396 private void updateReplicationMarkerEdit(Entry entry, long offset) { 397 WALEdit edit = entry.getEdit(); 398 // Return early if it is not ReplicationMarker edit. 399 if (!WALEdit.isReplicationMarkerEdit(edit)) { 400 return; 401 } 402 List<Cell> cells = edit.getCells(); 403 Preconditions.checkArgument(cells.size() == 1, "ReplicationMarker should have only 1 cell"); 404 Cell cell = cells.get(0); 405 // Create a descriptor with region_server_name, wal_name and offset 406 WALProtos.ReplicationMarkerDescriptor.Builder builder = 407 WALProtos.ReplicationMarkerDescriptor.newBuilder(); 408 builder.setRegionServerName(this.source.getServer().getServerName().getHostname()); 409 builder.setWalName(getCurrentPath().getName()); 410 builder.setOffset(offset); 411 WALProtos.ReplicationMarkerDescriptor descriptor = builder.build(); 412 413 // Create a new KeyValue 414 KeyValue kv = new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), 415 CellUtil.cloneQualifier(cell), cell.getTimestamp(), descriptor.toByteArray()); 416 ArrayList<Cell> newCells = new ArrayList<>(); 417 newCells.add(kv); 418 // Update edit with new cell. 419 edit.setCells(newCells); 420 } 421 422 /** Returns whether the reader thread is running */ 423 public boolean isReaderRunning() { 424 return isReaderRunning && !isInterrupted(); 425 } 426 427 /** 428 * @param readerRunning the readerRunning to set 429 */ 430 public void setReaderRunning(boolean readerRunning) { 431 this.isReaderRunning = readerRunning; 432 } 433 434 private ReplicationSourceManager getSourceManager() { 435 return this.source.getSourceManager(); 436 } 437}