001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellUtil;
031import org.apache.hadoop.hbase.ExtendedCell;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.hadoop.hbase.replication.WALEntryFilter;
034import org.apache.hadoop.hbase.util.Pair;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.hadoop.hbase.wal.WAL.Entry;
037import org.apache.hadoop.hbase.wal.WALEdit;
038import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.apache.yetus.audience.InterfaceStability;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
049
050/**
051 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
052 * onto a queue
053 */
054@InterfaceAudience.Private
055@InterfaceStability.Evolving
056class ReplicationSourceWALReader extends Thread {
057  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
058
059  private final ReplicationSourceLogQueue logQueue;
060  private final FileSystem fs;
061  private final Configuration conf;
062  private final WALEntryFilter filter;
063  private final ReplicationSource source;
064
065  @InterfaceAudience.Private
066  final BlockingQueue<WALEntryBatch> entryBatchQueue;
067  // max (heap) size of each batch - multiply by number of batches in queue to get total
068  private final long replicationBatchSizeCapacity;
069  // max count of each batch - multiply by number of batches in queue to get total
070  private final int replicationBatchCountCapacity;
071  // position in the WAL to start reading at
072  private long currentPosition;
073  private final long sleepForRetries;
074  private final int maxRetriesMultiplier;
075
076  // Indicates whether this particular worker is running
077  private boolean isReaderRunning = true;
078  private final String walGroupId;
079
080  /**
081   * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
082   * entries, and puts them on a batch queue.
083   * @param fs            the files system to use
084   * @param conf          configuration to use
085   * @param logQueue      The WAL queue to read off of
086   * @param startPosition position in the first WAL to start reading from
087   * @param filter        The filter to use while reading
088   * @param source        replication source
089   */
090  public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
091    ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
092    ReplicationSource source, String walGroupId) {
093    this.logQueue = logQueue;
094    this.currentPosition = startPosition;
095    this.fs = fs;
096    this.conf = conf;
097    this.filter = filter;
098    this.source = source;
099    this.replicationBatchSizeCapacity =
100      this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
101    this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
102    // memory used will be batchSizeCapacity * (nb.batches + 1)
103    // the +1 is for the current thread reading before placing onto the queue
104    int batchCount = conf.getInt("replication.source.nb.batches", 1);
105    // 1 second
106    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
107    // 5 minutes @ 1 sec per
108    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
109    this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
110    this.walGroupId = walGroupId;
111    LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : "
112      + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
113      + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
114      + ", replicationBatchQueueCapacity=" + batchCount);
115  }
116
117  private void replicationDone() throws InterruptedException {
118    // we're done with current queue, either this is a recovered queue, or it is the special
119    // group for a sync replication peer and the peer has been transited to DA or S state.
120    LOG.debug("Stopping the replication source wal reader");
121    setReaderRunning(false);
122    // shuts down shipper thread immediately
123    entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
124  }
125
126  protected final int sleep(int sleepMultiplier) {
127    if (sleepMultiplier < maxRetriesMultiplier) {
128      sleepMultiplier++;
129    }
130    Threads.sleep(sleepForRetries * sleepMultiplier);
131    return sleepMultiplier;
132  }
133
134  @Override
135  public void run() {
136    int sleepMultiplier = 1;
137    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
138      try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
139        source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) {
140        while (isReaderRunning()) { // loop here to keep reusing stream while we can
141          if (!source.isPeerEnabled()) {
142            Threads.sleep(sleepForRetries);
143            continue;
144          }
145          if (!checkBufferQuota()) {
146            continue;
147          }
148          Path currentPath = entryStream.getCurrentPath();
149          WALEntryStream.HasNext hasNext = entryStream.hasNext();
150          if (hasNext == WALEntryStream.HasNext.NO) {
151            replicationDone();
152            return;
153          }
154          // first, check if we have switched a file, if so, we need to manually add an EOF entry
155          // batch to the queue
156          if (currentPath != null && switched(entryStream, currentPath)) {
157            currentPosition = entryStream.getPosition();
158            entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath));
159            continue;
160          }
161          if (hasNext == WALEntryStream.HasNext.RETRY) {
162            // sleep and retry
163            sleepMultiplier = sleep(sleepMultiplier);
164            continue;
165          }
166          if (hasNext == WALEntryStream.HasNext.RETRY_IMMEDIATELY) {
167            // retry immediately, this usually means we have switched a file
168            continue;
169          }
170          // below are all for hasNext == YES
171          WALEntryBatch batch = createBatch(entryStream);
172          boolean successAddToQueue = false;
173          try {
174            readWALEntries(entryStream, batch);
175            currentPosition = entryStream.getPosition();
176            // need to propagate the batch even it has no entries since it may carry the last
177            // sequence id information for serial replication.
178            LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
179            entryBatchQueue.put(batch);
180            successAddToQueue = true;
181            sleepMultiplier = 1;
182          } finally {
183            if (!successAddToQueue) {
184              // batch is not put to ReplicationSourceWALReader#entryBatchQueue,so we should
185              // decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which
186              // acquired in ReplicationSourceWALReader.acquireBufferQuota.
187              this.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
188            }
189          }
190        }
191      } catch (WALEntryFilterRetryableException e) {
192        // here we have to recreate the WALEntryStream, as when filtering, we have already called
193        // next to get the WAL entry and advanced the WALEntryStream, at WALEntryStream layer, it
194        // just considers everything is fine,that's why the catch block is not in the inner block
195        LOG.warn("Failed to filter WAL entries and the filter let us retry later", e);
196        sleepMultiplier = sleep(sleepMultiplier);
197      } catch (InterruptedException e) {
198        // this usually means we want to quit
199        LOG.warn("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue",
200          e);
201        Thread.currentThread().interrupt();
202      }
203    }
204  }
205
206  // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return.
207  protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
208    WALEdit edit = entry.getEdit();
209    if (edit == null || edit.isEmpty()) {
210      LOG.trace("Edit null or empty for entry {} ", entry);
211      return false;
212    }
213    LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}",
214      entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
215    updateReplicationMarkerEdit(entry, batch.getLastWalPosition());
216    long entrySize = getEntrySizeIncludeBulkLoad(entry);
217    batch.addEntry(entry, entrySize);
218    updateBatchStats(batch, entry, entrySize);
219    boolean totalBufferTooLarge = this.getSourceManager().acquireWALEntryBufferQuota(batch, entry);
220
221    // Stop if too many entries or too big
222    return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
223      || batch.getNbEntries() >= replicationBatchCountCapacity;
224  }
225
226  protected static final boolean switched(WALEntryStream entryStream, Path path) {
227    Path newPath = entryStream.getCurrentPath();
228    return newPath == null || !path.getName().equals(newPath.getName());
229  }
230
231  // We need to get the WALEntryBatch from the caller so we can add entries in there
232  // This is required in case there is any exception in while reading entries
233  // we do not want to loss the existing entries in the batch
234  protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
235    throws InterruptedException {
236    Path currentPath = entryStream.getCurrentPath();
237    for (;;) {
238      Entry entry = entryStream.next();
239      batch.setLastWalPosition(entryStream.getPosition());
240      entry = filterEntry(entry);
241      if (entry != null) {
242        if (addEntryToBatch(batch, entry)) {
243          break;
244        }
245      }
246      WALEntryStream.HasNext hasNext = entryStream.hasNext();
247      // always return if we have switched to a new file
248      if (switched(entryStream, currentPath)) {
249        batch.setEndOfFile(true);
250        break;
251      }
252      if (hasNext != WALEntryStream.HasNext.YES) {
253        // For hasNext other than YES, it is OK to just retry.
254        // As for RETRY and RETRY_IMMEDIATELY, the correct action is to retry, and for NO, it will
255        // return NO again when you call the method next time, so it is OK to just return here and
256        // let the loop in the upper layer to call hasNext again.
257        break;
258      }
259    }
260  }
261
262  public Path getCurrentPath() {
263    // if we've read some WAL entries, get the Path we read from
264    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
265    if (batchQueueHead != null) {
266      return batchQueueHead.getLastWalPath();
267    }
268    // otherwise, we must be currently reading from the head of the log queue
269    return logQueue.getQueue(walGroupId).peek();
270  }
271
272  // returns false if we've already exceeded the global quota
273  private boolean checkBufferQuota() {
274    // try not to go over total quota
275    if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
276      Threads.sleep(sleepForRetries);
277      return false;
278    }
279    return true;
280  }
281
282  private WALEntryBatch createBatch(WALEntryStream entryStream) {
283    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
284  }
285
286  protected final Entry filterEntry(Entry entry) {
287    // Always replicate if this edit is Replication Marker edit.
288    if (entry != null && WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
289      return entry;
290    }
291    Entry filtered = filter.filter(entry);
292    if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
293      LOG.trace("Filtered entry for replication: {}", entry);
294      source.getSourceMetrics().incrLogEditsFiltered();
295    }
296    return filtered;
297  }
298
299  /**
300   * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
301   * batch to become available
302   * @return A batch of entries, along with the position in the log after reading the batch
303   * @throws InterruptedException if interrupted while waiting
304   */
305  public WALEntryBatch take() throws InterruptedException {
306    return entryBatchQueue.take();
307  }
308
309  public WALEntryBatch poll(long timeout) throws InterruptedException {
310    return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
311  }
312
313  private long getEntrySizeIncludeBulkLoad(Entry entry) {
314    WALEdit edit = entry.getEdit();
315    return WALEntryBatch.getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
316  }
317
318  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
319    WALEdit edit = entry.getEdit();
320    batch.incrementHeapSize(entrySize);
321    Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
322    batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
323    batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
324  }
325
326  /**
327   * Count the number of different row keys in the given edit because of mini-batching. We assume
328   * that there's at least one Cell in the WALEdit.
329   * @param edit edit to count row keys from
330   * @return number of different row keys and HFiles
331   */
332  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
333    List<Cell> cells = edit.getCells();
334    int distinctRowKeys = 1;
335    int totalHFileEntries = 0;
336    Cell lastCell = cells.get(0);
337
338    int totalCells = edit.size();
339    for (int i = 0; i < totalCells; i++) {
340      // Count HFiles to be replicated
341      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
342        try {
343          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
344          List<StoreDescriptor> stores = bld.getStoresList();
345          int totalStores = stores.size();
346          for (int j = 0; j < totalStores; j++) {
347            totalHFileEntries += stores.get(j).getStoreFileList().size();
348          }
349        } catch (IOException e) {
350          LOG.error("Failed to deserialize bulk load entry from wal edit. "
351            + "Then its hfiles count will not be added into metric.", e);
352        }
353      }
354
355      if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
356        distinctRowKeys++;
357      }
358      lastCell = cells.get(i);
359    }
360
361    Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
362    return result;
363  }
364
365  /**
366   * Calculate the total size of all the store files
367   * @param edit edit to count row keys from
368   * @return the total size of the store files
369   */
370  private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
371    List<Cell> cells = edit.getCells();
372    int totalStoreFilesSize = 0;
373
374    int totalCells = edit.size();
375    for (int i = 0; i < totalCells; i++) {
376      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
377        try {
378          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
379          List<StoreDescriptor> stores = bld.getStoresList();
380          int totalStores = stores.size();
381          for (int j = 0; j < totalStores; j++) {
382            totalStoreFilesSize =
383              (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes());
384          }
385        } catch (IOException e) {
386          LOG.error("Failed to deserialize bulk load entry from wal edit. "
387            + "Size of HFiles part of cell will not be considered in replication "
388            + "request size calculation.", e);
389        }
390      }
391    }
392    return totalStoreFilesSize;
393  }
394
395  /*
396   * Create @ReplicationMarkerDescriptor with region_server_name, wal_name and offset and set to
397   * cell's value.
398   */
399  private void updateReplicationMarkerEdit(Entry entry, long offset) {
400    WALEdit edit = entry.getEdit();
401    // Return early if it is not ReplicationMarker edit.
402    if (!WALEdit.isReplicationMarkerEdit(edit)) {
403      return;
404    }
405    List<Cell> cells = edit.getCells();
406    Preconditions.checkArgument(cells.size() == 1, "ReplicationMarker should have only 1 cell");
407    Cell cell = cells.get(0);
408    // Create a descriptor with region_server_name, wal_name and offset
409    WALProtos.ReplicationMarkerDescriptor.Builder builder =
410      WALProtos.ReplicationMarkerDescriptor.newBuilder();
411    builder.setRegionServerName(this.source.getServer().getServerName().getHostname());
412    builder.setWalName(getCurrentPath().getName());
413    builder.setOffset(offset);
414    WALProtos.ReplicationMarkerDescriptor descriptor = builder.build();
415
416    // Create a new KeyValue
417    KeyValue kv = new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell),
418      CellUtil.cloneQualifier(cell), cell.getTimestamp(), descriptor.toByteArray());
419    ArrayList<ExtendedCell> newCells = new ArrayList<>();
420    newCells.add(kv);
421    // Update edit with new cell.
422    WALEditInternalHelper.setExtendedCells(edit, newCells);
423  }
424
425  /** Returns whether the reader thread is running */
426  public boolean isReaderRunning() {
427    return isReaderRunning && !isInterrupted();
428  }
429
430  /**
431   * @param readerRunning the readerRunning to set
432   */
433  public void setReaderRunning(boolean readerRunning) {
434    this.isReaderRunning = readerRunning;
435  }
436
437  private ReplicationSourceManager getSourceManager() {
438    return this.source.getSourceManager();
439  }
440
441  long getSleepForRetries() {
442    return sleepForRetries;
443  }
444}