001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.EOFException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.PriorityBlockingQueue;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicLong;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.replication.WALEntryFilter;
036import org.apache.hadoop.hbase.util.Pair;
037import org.apache.hadoop.hbase.util.Threads;
038import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
039import org.apache.hadoop.hbase.wal.WAL.Entry;
040import org.apache.hadoop.hbase.wal.WALEdit;
041import org.apache.hadoop.hbase.wal.WALKey;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.apache.yetus.audience.InterfaceStability;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
048
049import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
052
053/**
054 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
055 * onto a queue
056 */
057@InterfaceAudience.Private
058@InterfaceStability.Evolving
059class ReplicationSourceWALReader extends Thread {
060  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
061
062  private final ReplicationSourceLogQueue logQueue;
063  private final FileSystem fs;
064  private final Configuration conf;
065  private final WALEntryFilter filter;
066  private final ReplicationSource source;
067
068  @InterfaceAudience.Private
069  final BlockingQueue<WALEntryBatch> entryBatchQueue;
070  // max (heap) size of each batch - multiply by number of batches in queue to get total
071  private final long replicationBatchSizeCapacity;
072  // max count of each batch - multiply by number of batches in queue to get total
073  private final int replicationBatchCountCapacity;
074  // position in the WAL to start reading at
075  private long currentPosition;
076  private final long sleepForRetries;
077  private final int maxRetriesMultiplier;
078  private final boolean eofAutoRecovery;
079
080  // Indicates whether this particular worker is running
081  private boolean isReaderRunning = true;
082
083  private AtomicLong totalBufferUsed;
084  private long totalBufferQuota;
085  private final String walGroupId;
086
087  /**
088   * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
089   * entries, and puts them on a batch queue.
090   * @param fs            the files system to use
091   * @param conf          configuration to use
092   * @param logQueue      The WAL queue to read off of
093   * @param startPosition position in the first WAL to start reading from
094   * @param filter        The filter to use while reading
095   * @param source        replication source
096   */
097  public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
098    ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
099    ReplicationSource source, String walGroupId) {
100    this.logQueue = logQueue;
101    this.currentPosition = startPosition;
102    this.fs = fs;
103    this.conf = conf;
104    this.filter = filter;
105    this.source = source;
106    this.replicationBatchSizeCapacity =
107      this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
108    this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
109    // memory used will be batchSizeCapacity * (nb.batches + 1)
110    // the +1 is for the current thread reading before placing onto the queue
111    int batchCount = conf.getInt("replication.source.nb.batches", 1);
112    this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
113    this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
114    // 1 second
115    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
116    // 5 minutes @ 1 sec per
117    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
118    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
119    this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
120    this.walGroupId = walGroupId;
121    LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : "
122      + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
123      + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
124      + ", replicationBatchQueueCapacity=" + batchCount);
125  }
126
127  @Override
128  public void run() {
129    int sleepMultiplier = 1;
130    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
131      WALEntryBatch batch = null;
132      try (WALEntryStream entryStream =
133        new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
134          source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId)) {
135        while (isReaderRunning()) { // loop here to keep reusing stream while we can
136          batch = null;
137          if (!source.isPeerEnabled()) {
138            Threads.sleep(sleepForRetries);
139            continue;
140          }
141          if (!checkQuota()) {
142            continue;
143          }
144          batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
145          if (batch == null) {
146            // got no entries and didn't advance position in WAL
147            handleEmptyWALEntryBatch();
148            entryStream.reset(); // reuse stream
149            continue;
150          }
151          // if we have already switched a file, skip reading and put it directly to the ship queue
152          if (!batch.isEndOfFile()) {
153            readWALEntries(entryStream, batch);
154            currentPosition = entryStream.getPosition();
155          }
156          // need to propagate the batch even it has no entries since it may carry the last
157          // sequence id information for serial replication.
158          LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
159          entryBatchQueue.put(batch);
160          sleepMultiplier = 1;
161        }
162      } catch (WALEntryFilterRetryableException | IOException e) { // stream related
163        if (!handleEofException(e, batch)) {
164          LOG.warn("Failed to read stream of replication entries", e);
165          if (sleepMultiplier < maxRetriesMultiplier) {
166            sleepMultiplier++;
167          }
168          Threads.sleep(sleepForRetries * sleepMultiplier);
169        }
170      } catch (InterruptedException e) {
171        LOG.trace("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue");
172        Thread.currentThread().interrupt();
173      }
174    }
175  }
176
177  // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return.
178  protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
179    WALEdit edit = entry.getEdit();
180    if (edit == null || edit.isEmpty()) {
181      LOG.trace("Edit null or empty for entry {} ", entry);
182      return false;
183    }
184    LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}",
185      entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
186    updateReplicationMarkerEdit(entry, batch.getLastWalPosition());
187    long entrySize = getEntrySizeIncludeBulkLoad(entry);
188    long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
189    batch.addEntry(entry, entrySize);
190    updateBatchStats(batch, entry, entrySize);
191    boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
192
193    // Stop if too many entries or too big
194    return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
195      || batch.getNbEntries() >= replicationBatchCountCapacity;
196  }
197
198  protected static final boolean switched(WALEntryStream entryStream, Path path) {
199    Path newPath = entryStream.getCurrentPath();
200    return newPath == null || !path.getName().equals(newPath.getName());
201  }
202
203  // We need to get the WALEntryBatch from the caller so we can add entries in there
204  // This is required in case there is any exception in while reading entries
205  // we do not want to loss the existing entries in the batch
206  protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
207    throws IOException, InterruptedException {
208    Path currentPath = entryStream.getCurrentPath();
209    for (;;) {
210      Entry entry = entryStream.next();
211      batch.setLastWalPosition(entryStream.getPosition());
212      entry = filterEntry(entry);
213      if (entry != null) {
214        if (addEntryToBatch(batch, entry)) {
215          break;
216        }
217      }
218      boolean hasNext = entryStream.hasNext();
219      // always return if we have switched to a new file
220      if (switched(entryStream, currentPath)) {
221        batch.setEndOfFile(true);
222        break;
223      }
224      if (!hasNext) {
225        break;
226      }
227    }
228  }
229
230  private void handleEmptyWALEntryBatch() throws InterruptedException {
231    LOG.trace("Didn't read any new entries from WAL");
232    if (logQueue.getQueue(walGroupId).isEmpty()) {
233      // we're done with current queue, either this is a recovered queue, or it is the special group
234      // for a sync replication peer and the peer has been transited to DA or S state.
235      LOG.debug("Stopping the replication source wal reader");
236      setReaderRunning(false);
237      // shuts down shipper thread immediately
238      entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
239    } else {
240      Thread.sleep(sleepForRetries);
241    }
242  }
243
244  private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream entryStream)
245    throws IOException {
246    Path currentPath = entryStream.getCurrentPath();
247    if (!entryStream.hasNext()) {
248      // check whether we have switched a file
249      if (currentPath != null && switched(entryStream, currentPath)) {
250        return WALEntryBatch.endOfFile(currentPath);
251      } else {
252        return null;
253      }
254    }
255    if (currentPath != null) {
256      if (switched(entryStream, currentPath)) {
257        return WALEntryBatch.endOfFile(currentPath);
258      }
259    }
260    return createBatch(entryStream);
261  }
262
263  /**
264   * This is to handle the EOFException from the WAL entry stream. EOFException should be handled
265   * carefully because there are chances of data loss because of never replicating the data. Thus we
266   * should always try to ship existing batch of entries here. If there was only one log in the
267   * queue before EOF, we ship the empty batch here and since reader is still active, in the next
268   * iteration of reader we will stop the reader.
269   * <p/>
270   * If there was more than one log in the queue before EOF, we ship the existing batch and reset
271   * the wal patch and position to the log with EOF, so shipper can remove logs from replication
272   * queue
273   * @return true only the IOE can be handled
274   */
275  private boolean handleEofException(Exception e, WALEntryBatch batch) {
276    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
277    // Dump the log even if logQueue size is 1 if the source is from recovered Source
278    // since we don't add current log to recovered source queue so it is safe to remove.
279    if (
280      (e instanceof EOFException || e.getCause() instanceof EOFException)
281        && (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery
282    ) {
283      Path path = queue.peek();
284      try {
285        if (!fs.exists(path)) {
286          // There is a chance that wal has moved to oldWALs directory, so look there also.
287          path = AbstractFSWALProvider.findArchivedLog(path, conf);
288          // path can be null if unable to locate in archiveDir.
289        }
290        if (path != null && fs.getFileStatus(path).getLen() == 0) {
291          LOG.warn("Forcing removal of 0 length log in queue: {}", path);
292          logQueue.remove(walGroupId);
293          currentPosition = 0;
294          if (batch != null) {
295            // After we removed the WAL from the queue, we should try shipping the existing batch of
296            // entries
297            addBatchToShippingQueue(batch);
298          }
299          return true;
300        }
301      } catch (IOException ioe) {
302        LOG.warn("Couldn't get file length information about log " + path, ioe);
303      } catch (InterruptedException ie) {
304        LOG.trace("Interrupted while adding WAL batch to ship queue");
305        Thread.currentThread().interrupt();
306      }
307    }
308    return false;
309  }
310
311  /**
312   * Update the batch try to ship and return true if shipped
313   * @param batch Batch of entries to ship
314   * @throws InterruptedException throws interrupted exception
315   */
316  private void addBatchToShippingQueue(WALEntryBatch batch) throws InterruptedException {
317    // need to propagate the batch even it has no entries since it may carry the last
318    // sequence id information for serial replication.
319    LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
320    entryBatchQueue.put(batch);
321  }
322
323  public Path getCurrentPath() {
324    // if we've read some WAL entries, get the Path we read from
325    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
326    if (batchQueueHead != null) {
327      return batchQueueHead.getLastWalPath();
328    }
329    // otherwise, we must be currently reading from the head of the log queue
330    return logQueue.getQueue(walGroupId).peek();
331  }
332
333  // returns false if we've already exceeded the global quota
334  private boolean checkQuota() {
335    // try not to go over total quota
336    if (totalBufferUsed.get() > totalBufferQuota) {
337      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
338        this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
339      Threads.sleep(sleepForRetries);
340      return false;
341    }
342    return true;
343  }
344
345  private WALEntryBatch createBatch(WALEntryStream entryStream) {
346    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
347  }
348
349  protected final Entry filterEntry(Entry entry) {
350    // Always replicate if this edit is Replication Marker edit.
351    if (entry != null && WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
352      return entry;
353    }
354    Entry filtered = filter.filter(entry);
355    if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
356      LOG.trace("Filtered entry for replication: {}", entry);
357      source.getSourceMetrics().incrLogEditsFiltered();
358    }
359    return filtered;
360  }
361
362  /**
363   * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
364   * batch to become available
365   * @return A batch of entries, along with the position in the log after reading the batch
366   * @throws InterruptedException if interrupted while waiting
367   */
368  public WALEntryBatch take() throws InterruptedException {
369    return entryBatchQueue.take();
370  }
371
372  public WALEntryBatch poll(long timeout) throws InterruptedException {
373    return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
374  }
375
376  private long getEntrySizeIncludeBulkLoad(Entry entry) {
377    WALEdit edit = entry.getEdit();
378    return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
379  }
380
381  public static long getEntrySizeExcludeBulkLoad(Entry entry) {
382    WALEdit edit = entry.getEdit();
383    WALKey key = entry.getKey();
384    return edit.heapSize() + key.estimatedSerializedSizeOf();
385  }
386
387  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
388    WALEdit edit = entry.getEdit();
389    batch.incrementHeapSize(entrySize);
390    Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
391    batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
392    batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
393  }
394
395  /**
396   * Count the number of different row keys in the given edit because of mini-batching. We assume
397   * that there's at least one Cell in the WALEdit.
398   * @param edit edit to count row keys from
399   * @return number of different row keys and HFiles
400   */
401  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
402    List<Cell> cells = edit.getCells();
403    int distinctRowKeys = 1;
404    int totalHFileEntries = 0;
405    Cell lastCell = cells.get(0);
406
407    int totalCells = edit.size();
408    for (int i = 0; i < totalCells; i++) {
409      // Count HFiles to be replicated
410      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
411        try {
412          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
413          List<StoreDescriptor> stores = bld.getStoresList();
414          int totalStores = stores.size();
415          for (int j = 0; j < totalStores; j++) {
416            totalHFileEntries += stores.get(j).getStoreFileList().size();
417          }
418        } catch (IOException e) {
419          LOG.error("Failed to deserialize bulk load entry from wal edit. "
420            + "Then its hfiles count will not be added into metric.", e);
421        }
422      }
423
424      if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
425        distinctRowKeys++;
426      }
427      lastCell = cells.get(i);
428    }
429
430    Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
431    return result;
432  }
433
434  /**
435   * Calculate the total size of all the store files
436   * @param edit edit to count row keys from
437   * @return the total size of the store files
438   */
439  private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
440    List<Cell> cells = edit.getCells();
441    int totalStoreFilesSize = 0;
442
443    int totalCells = edit.size();
444    for (int i = 0; i < totalCells; i++) {
445      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
446        try {
447          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
448          List<StoreDescriptor> stores = bld.getStoresList();
449          int totalStores = stores.size();
450          for (int j = 0; j < totalStores; j++) {
451            totalStoreFilesSize =
452              (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes());
453          }
454        } catch (IOException e) {
455          LOG.error("Failed to deserialize bulk load entry from wal edit. "
456            + "Size of HFiles part of cell will not be considered in replication "
457            + "request size calculation.", e);
458        }
459      }
460    }
461    return totalStoreFilesSize;
462  }
463
464  /*
465   * Create @ReplicationMarkerDescriptor with region_server_name, wal_name and offset and set to
466   * cell's value.
467   */
468  private void updateReplicationMarkerEdit(Entry entry, long offset) {
469    WALEdit edit = entry.getEdit();
470    // Return early if it is not ReplicationMarker edit.
471    if (!WALEdit.isReplicationMarkerEdit(edit)) {
472      return;
473    }
474    List<Cell> cells = edit.getCells();
475    Preconditions.checkArgument(cells.size() == 1, "ReplicationMarker should have only 1 cell");
476    Cell cell = cells.get(0);
477    // Create a descriptor with region_server_name, wal_name and offset
478    WALProtos.ReplicationMarkerDescriptor.Builder builder =
479      WALProtos.ReplicationMarkerDescriptor.newBuilder();
480    builder.setRegionServerName(this.source.getServer().getServerName().getHostname());
481    builder.setWalName(getCurrentPath().getName());
482    builder.setOffset(offset);
483    WALProtos.ReplicationMarkerDescriptor descriptor = builder.build();
484
485    // Create a new KeyValue
486    KeyValue kv = new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell),
487      CellUtil.cloneQualifier(cell), cell.getTimestamp(), descriptor.toByteArray());
488    ArrayList<Cell> newCells = new ArrayList<>();
489    newCells.add(kv);
490    // Update edit with new cell.
491    edit.setCells(newCells);
492  }
493
494  /**
495   * @param size delta size for grown buffer
496   * @return true if we should clear buffer and push all
497   */
498  private boolean acquireBufferQuota(long size) {
499    long newBufferUsed = totalBufferUsed.addAndGet(size);
500    // Record the new buffer usage
501    this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
502    return newBufferUsed >= totalBufferQuota;
503  }
504
505  /** Returns whether the reader thread is running */
506  public boolean isReaderRunning() {
507    return isReaderRunning && !isInterrupted();
508  }
509
510  /**
511   * @param readerRunning the readerRunning to set
512   */
513  public void setReaderRunning(boolean readerRunning) {
514    this.isReaderRunning = readerRunning;
515  }
516}