001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellUtil;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.replication.WALEntryFilter;
033import org.apache.hadoop.hbase.util.Pair;
034import org.apache.hadoop.hbase.util.Threads;
035import org.apache.hadoop.hbase.wal.WAL.Entry;
036import org.apache.hadoop.hbase.wal.WALEdit;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.apache.yetus.audience.InterfaceStability;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
043
044import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
047
048/**
049 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
050 * onto a queue
051 */
052@InterfaceAudience.Private
053@InterfaceStability.Evolving
054class ReplicationSourceWALReader extends Thread {
055  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
056
057  private final ReplicationSourceLogQueue logQueue;
058  private final FileSystem fs;
059  private final Configuration conf;
060  private final WALEntryFilter filter;
061  private final ReplicationSource source;
062
063  @InterfaceAudience.Private
064  final BlockingQueue<WALEntryBatch> entryBatchQueue;
065  // max (heap) size of each batch - multiply by number of batches in queue to get total
066  private final long replicationBatchSizeCapacity;
067  // max count of each batch - multiply by number of batches in queue to get total
068  private final int replicationBatchCountCapacity;
069  // position in the WAL to start reading at
070  private long currentPosition;
071  private final long sleepForRetries;
072  private final int maxRetriesMultiplier;
073
074  // Indicates whether this particular worker is running
075  private boolean isReaderRunning = true;
076  private final String walGroupId;
077
078  /**
079   * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
080   * entries, and puts them on a batch queue.
081   * @param fs            the files system to use
082   * @param conf          configuration to use
083   * @param logQueue      The WAL queue to read off of
084   * @param startPosition position in the first WAL to start reading from
085   * @param filter        The filter to use while reading
086   * @param source        replication source
087   */
088  public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
089    ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
090    ReplicationSource source, String walGroupId) {
091    this.logQueue = logQueue;
092    this.currentPosition = startPosition;
093    this.fs = fs;
094    this.conf = conf;
095    this.filter = filter;
096    this.source = source;
097    this.replicationBatchSizeCapacity =
098      this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
099    this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
100    // memory used will be batchSizeCapacity * (nb.batches + 1)
101    // the +1 is for the current thread reading before placing onto the queue
102    int batchCount = conf.getInt("replication.source.nb.batches", 1);
103    // 1 second
104    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
105    // 5 minutes @ 1 sec per
106    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
107    this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
108    this.walGroupId = walGroupId;
109    LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : "
110      + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
111      + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
112      + ", replicationBatchQueueCapacity=" + batchCount);
113  }
114
115  private void replicationDone() throws InterruptedException {
116    // we're done with current queue, either this is a recovered queue, or it is the special
117    // group for a sync replication peer and the peer has been transited to DA or S state.
118    LOG.debug("Stopping the replication source wal reader");
119    setReaderRunning(false);
120    // shuts down shipper thread immediately
121    entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
122  }
123
124  protected final int sleep(int sleepMultiplier) {
125    if (sleepMultiplier < maxRetriesMultiplier) {
126      sleepMultiplier++;
127    }
128    Threads.sleep(sleepForRetries * sleepMultiplier);
129    return sleepMultiplier;
130  }
131
132  @Override
133  public void run() {
134    int sleepMultiplier = 1;
135    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
136      try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
137        source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) {
138        while (isReaderRunning()) { // loop here to keep reusing stream while we can
139          if (!source.isPeerEnabled()) {
140            Threads.sleep(sleepForRetries);
141            continue;
142          }
143          if (!checkBufferQuota()) {
144            continue;
145          }
146          Path currentPath = entryStream.getCurrentPath();
147          WALEntryStream.HasNext hasNext = entryStream.hasNext();
148          if (hasNext == WALEntryStream.HasNext.NO) {
149            replicationDone();
150            return;
151          }
152          // first, check if we have switched a file, if so, we need to manually add an EOF entry
153          // batch to the queue
154          if (currentPath != null && switched(entryStream, currentPath)) {
155            entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath));
156            continue;
157          }
158          if (hasNext == WALEntryStream.HasNext.RETRY) {
159            // sleep and retry
160            sleepMultiplier = sleep(sleepMultiplier);
161            continue;
162          }
163          if (hasNext == WALEntryStream.HasNext.RETRY_IMMEDIATELY) {
164            // retry immediately, this usually means we have switched a file
165            continue;
166          }
167          // below are all for hasNext == YES
168          WALEntryBatch batch = createBatch(entryStream);
169          boolean successAddToQueue = false;
170          try {
171            readWALEntries(entryStream, batch);
172            currentPosition = entryStream.getPosition();
173            // need to propagate the batch even it has no entries since it may carry the last
174            // sequence id information for serial replication.
175            LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
176            entryBatchQueue.put(batch);
177            successAddToQueue = true;
178            sleepMultiplier = 1;
179          } finally {
180            if (!successAddToQueue) {
181              // batch is not put to ReplicationSourceWALReader#entryBatchQueue,so we should
182              // decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which
183              // acquired in ReplicationSourceWALReader.acquireBufferQuota.
184              this.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
185            }
186          }
187        }
188      } catch (WALEntryFilterRetryableException e) {
189        // here we have to recreate the WALEntryStream, as when filtering, we have already called
190        // next to get the WAL entry and advanced the WALEntryStream, at WALEntryStream layer, it
191        // just considers everything is fine,that's why the catch block is not in the inner block
192        LOG.warn("Failed to filter WAL entries and the filter let us retry later", e);
193        sleepMultiplier = sleep(sleepMultiplier);
194      } catch (InterruptedException e) {
195        // this usually means we want to quit
196        LOG.warn("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue",
197          e);
198        Thread.currentThread().interrupt();
199      }
200    }
201  }
202
203  // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return.
204  protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
205    WALEdit edit = entry.getEdit();
206    if (edit == null || edit.isEmpty()) {
207      LOG.trace("Edit null or empty for entry {} ", entry);
208      return false;
209    }
210    LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}",
211      entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
212    updateReplicationMarkerEdit(entry, batch.getLastWalPosition());
213    long entrySize = getEntrySizeIncludeBulkLoad(entry);
214    batch.addEntry(entry, entrySize);
215    updateBatchStats(batch, entry, entrySize);
216    boolean totalBufferTooLarge = this.getSourceManager().acquireWALEntryBufferQuota(batch, entry);
217
218    // Stop if too many entries or too big
219    return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
220      || batch.getNbEntries() >= replicationBatchCountCapacity;
221  }
222
223  protected static final boolean switched(WALEntryStream entryStream, Path path) {
224    Path newPath = entryStream.getCurrentPath();
225    return newPath == null || !path.getName().equals(newPath.getName());
226  }
227
228  // We need to get the WALEntryBatch from the caller so we can add entries in there
229  // This is required in case there is any exception in while reading entries
230  // we do not want to loss the existing entries in the batch
231  protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
232    throws InterruptedException {
233    Path currentPath = entryStream.getCurrentPath();
234    for (;;) {
235      Entry entry = entryStream.next();
236      batch.setLastWalPosition(entryStream.getPosition());
237      entry = filterEntry(entry);
238      if (entry != null) {
239        if (addEntryToBatch(batch, entry)) {
240          break;
241        }
242      }
243      WALEntryStream.HasNext hasNext = entryStream.hasNext();
244      // always return if we have switched to a new file
245      if (switched(entryStream, currentPath)) {
246        batch.setEndOfFile(true);
247        break;
248      }
249      if (hasNext != WALEntryStream.HasNext.YES) {
250        // For hasNext other than YES, it is OK to just retry.
251        // As for RETRY and RETRY_IMMEDIATELY, the correct action is to retry, and for NO, it will
252        // return NO again when you call the method next time, so it is OK to just return here and
253        // let the loop in the upper layer to call hasNext again.
254        break;
255      }
256    }
257  }
258
259  public Path getCurrentPath() {
260    // if we've read some WAL entries, get the Path we read from
261    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
262    if (batchQueueHead != null) {
263      return batchQueueHead.getLastWalPath();
264    }
265    // otherwise, we must be currently reading from the head of the log queue
266    return logQueue.getQueue(walGroupId).peek();
267  }
268
269  // returns false if we've already exceeded the global quota
270  private boolean checkBufferQuota() {
271    // try not to go over total quota
272    if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
273      Threads.sleep(sleepForRetries);
274      return false;
275    }
276    return true;
277  }
278
279  private WALEntryBatch createBatch(WALEntryStream entryStream) {
280    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
281  }
282
283  protected final Entry filterEntry(Entry entry) {
284    // Always replicate if this edit is Replication Marker edit.
285    if (entry != null && WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
286      return entry;
287    }
288    Entry filtered = filter.filter(entry);
289    if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
290      LOG.trace("Filtered entry for replication: {}", entry);
291      source.getSourceMetrics().incrLogEditsFiltered();
292    }
293    return filtered;
294  }
295
296  /**
297   * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
298   * batch to become available
299   * @return A batch of entries, along with the position in the log after reading the batch
300   * @throws InterruptedException if interrupted while waiting
301   */
302  public WALEntryBatch take() throws InterruptedException {
303    return entryBatchQueue.take();
304  }
305
306  public WALEntryBatch poll(long timeout) throws InterruptedException {
307    return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
308  }
309
310  private long getEntrySizeIncludeBulkLoad(Entry entry) {
311    WALEdit edit = entry.getEdit();
312    return WALEntryBatch.getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
313  }
314
315  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
316    WALEdit edit = entry.getEdit();
317    batch.incrementHeapSize(entrySize);
318    Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
319    batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
320    batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
321  }
322
323  /**
324   * Count the number of different row keys in the given edit because of mini-batching. We assume
325   * that there's at least one Cell in the WALEdit.
326   * @param edit edit to count row keys from
327   * @return number of different row keys and HFiles
328   */
329  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
330    List<Cell> cells = edit.getCells();
331    int distinctRowKeys = 1;
332    int totalHFileEntries = 0;
333    Cell lastCell = cells.get(0);
334
335    int totalCells = edit.size();
336    for (int i = 0; i < totalCells; i++) {
337      // Count HFiles to be replicated
338      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
339        try {
340          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
341          List<StoreDescriptor> stores = bld.getStoresList();
342          int totalStores = stores.size();
343          for (int j = 0; j < totalStores; j++) {
344            totalHFileEntries += stores.get(j).getStoreFileList().size();
345          }
346        } catch (IOException e) {
347          LOG.error("Failed to deserialize bulk load entry from wal edit. "
348            + "Then its hfiles count will not be added into metric.", e);
349        }
350      }
351
352      if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
353        distinctRowKeys++;
354      }
355      lastCell = cells.get(i);
356    }
357
358    Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
359    return result;
360  }
361
362  /**
363   * Calculate the total size of all the store files
364   * @param edit edit to count row keys from
365   * @return the total size of the store files
366   */
367  private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
368    List<Cell> cells = edit.getCells();
369    int totalStoreFilesSize = 0;
370
371    int totalCells = edit.size();
372    for (int i = 0; i < totalCells; i++) {
373      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
374        try {
375          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
376          List<StoreDescriptor> stores = bld.getStoresList();
377          int totalStores = stores.size();
378          for (int j = 0; j < totalStores; j++) {
379            totalStoreFilesSize =
380              (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes());
381          }
382        } catch (IOException e) {
383          LOG.error("Failed to deserialize bulk load entry from wal edit. "
384            + "Size of HFiles part of cell will not be considered in replication "
385            + "request size calculation.", e);
386        }
387      }
388    }
389    return totalStoreFilesSize;
390  }
391
392  /*
393   * Create @ReplicationMarkerDescriptor with region_server_name, wal_name and offset and set to
394   * cell's value.
395   */
396  private void updateReplicationMarkerEdit(Entry entry, long offset) {
397    WALEdit edit = entry.getEdit();
398    // Return early if it is not ReplicationMarker edit.
399    if (!WALEdit.isReplicationMarkerEdit(edit)) {
400      return;
401    }
402    List<Cell> cells = edit.getCells();
403    Preconditions.checkArgument(cells.size() == 1, "ReplicationMarker should have only 1 cell");
404    Cell cell = cells.get(0);
405    // Create a descriptor with region_server_name, wal_name and offset
406    WALProtos.ReplicationMarkerDescriptor.Builder builder =
407      WALProtos.ReplicationMarkerDescriptor.newBuilder();
408    builder.setRegionServerName(this.source.getServer().getServerName().getHostname());
409    builder.setWalName(getCurrentPath().getName());
410    builder.setOffset(offset);
411    WALProtos.ReplicationMarkerDescriptor descriptor = builder.build();
412
413    // Create a new KeyValue
414    KeyValue kv = new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell),
415      CellUtil.cloneQualifier(cell), cell.getTimestamp(), descriptor.toByteArray());
416    ArrayList<Cell> newCells = new ArrayList<>();
417    newCells.add(kv);
418    // Update edit with new cell.
419    edit.setCells(newCells);
420  }
421
422  /** Returns whether the reader thread is running */
423  public boolean isReaderRunning() {
424    return isReaderRunning && !isInterrupted();
425  }
426
427  /**
428   * @param readerRunning the readerRunning to set
429   */
430  public void setReaderRunning(boolean readerRunning) {
431    this.isReaderRunning = readerRunning;
432  }
433
434  private ReplicationSourceManager getSourceManager() {
435    return this.source.getSourceManager();
436  }
437}