001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.EOFException;
021import java.io.IOException;
022import java.util.List;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.PriorityBlockingQueue;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellUtil;
033import org.apache.hadoop.hbase.replication.WALEntryFilter;
034import org.apache.hadoop.hbase.util.Pair;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
037import org.apache.hadoop.hbase.wal.WAL.Entry;
038import org.apache.hadoop.hbase.wal.WALEdit;
039import org.apache.hadoop.hbase.wal.WALKey;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.apache.yetus.audience.InterfaceStability;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
047
048/**
049 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
050 * onto a queue
051 */
052@InterfaceAudience.Private
053@InterfaceStability.Evolving
054class ReplicationSourceWALReader extends Thread {
055  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
056
057  private final ReplicationSourceLogQueue logQueue;
058  private final FileSystem fs;
059  private final Configuration conf;
060  private final WALEntryFilter filter;
061  private final ReplicationSource source;
062
063  @InterfaceAudience.Private
064  final BlockingQueue<WALEntryBatch> entryBatchQueue;
065  // max (heap) size of each batch - multiply by number of batches in queue to get total
066  private final long replicationBatchSizeCapacity;
067  // max count of each batch - multiply by number of batches in queue to get total
068  private final int replicationBatchCountCapacity;
069  // position in the WAL to start reading at
070  private long currentPosition;
071  private final long sleepForRetries;
072  private final int maxRetriesMultiplier;
073  private final boolean eofAutoRecovery;
074
075  // Indicates whether this particular worker is running
076  private boolean isReaderRunning = true;
077
078  private AtomicLong totalBufferUsed;
079  private long totalBufferQuota;
080  private final String walGroupId;
081
082  /**
083   * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
084   * entries, and puts them on a batch queue.
085   * @param fs            the files system to use
086   * @param conf          configuration to use
087   * @param logQueue      The WAL queue to read off of
088   * @param startPosition position in the first WAL to start reading from
089   * @param filter        The filter to use while reading
090   * @param source        replication source
091   */
092  public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
093    ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
094    ReplicationSource source, String walGroupId) {
095    this.logQueue = logQueue;
096    this.currentPosition = startPosition;
097    this.fs = fs;
098    this.conf = conf;
099    this.filter = filter;
100    this.source = source;
101    this.replicationBatchSizeCapacity =
102      this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
103    this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
104    // memory used will be batchSizeCapacity * (nb.batches + 1)
105    // the +1 is for the current thread reading before placing onto the queue
106    int batchCount = conf.getInt("replication.source.nb.batches", 1);
107    this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
108    this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
109    // 1 second
110    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
111    // 5 minutes @ 1 sec per
112    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
113    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
114    this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
115    this.walGroupId = walGroupId;
116    LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : "
117      + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
118      + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
119      + ", replicationBatchQueueCapacity=" + batchCount);
120  }
121
122  @Override
123  public void run() {
124    int sleepMultiplier = 1;
125    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
126      WALEntryBatch batch = null;
127      try (WALEntryStream entryStream =
128        new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
129          source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId)) {
130        while (isReaderRunning()) { // loop here to keep reusing stream while we can
131          batch = null;
132          if (!source.isPeerEnabled()) {
133            Threads.sleep(sleepForRetries);
134            continue;
135          }
136          if (!checkQuota()) {
137            continue;
138          }
139          batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
140          if (batch == null) {
141            // got no entries and didn't advance position in WAL
142            handleEmptyWALEntryBatch();
143            entryStream.reset(); // reuse stream
144            continue;
145          }
146          // if we have already switched a file, skip reading and put it directly to the ship queue
147          if (!batch.isEndOfFile()) {
148            readWALEntries(entryStream, batch);
149            currentPosition = entryStream.getPosition();
150          }
151          // need to propagate the batch even it has no entries since it may carry the last
152          // sequence id information for serial replication.
153          LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
154          entryBatchQueue.put(batch);
155          sleepMultiplier = 1;
156        }
157      } catch (WALEntryFilterRetryableException | IOException e) { // stream related
158        if (!handleEofException(e, batch)) {
159          LOG.warn("Failed to read stream of replication entries", e);
160          if (sleepMultiplier < maxRetriesMultiplier) {
161            sleepMultiplier++;
162          }
163          Threads.sleep(sleepForRetries * sleepMultiplier);
164        }
165      } catch (InterruptedException e) {
166        LOG.trace("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue");
167        Thread.currentThread().interrupt();
168      }
169    }
170  }
171
172  // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return.
173  protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
174    WALEdit edit = entry.getEdit();
175    if (edit == null || edit.isEmpty()) {
176      LOG.trace("Edit null or empty for entry {} ", entry);
177      return false;
178    }
179    LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}",
180      entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
181    long entrySize = getEntrySizeIncludeBulkLoad(entry);
182    long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
183    batch.addEntry(entry, entrySize);
184    updateBatchStats(batch, entry, entrySize);
185    boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
186
187    // Stop if too many entries or too big
188    return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
189      || batch.getNbEntries() >= replicationBatchCountCapacity;
190  }
191
192  protected static final boolean switched(WALEntryStream entryStream, Path path) {
193    Path newPath = entryStream.getCurrentPath();
194    return newPath == null || !path.getName().equals(newPath.getName());
195  }
196
197  // We need to get the WALEntryBatch from the caller so we can add entries in there
198  // This is required in case there is any exception in while reading entries
199  // we do not want to loss the existing entries in the batch
200  protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
201    throws IOException, InterruptedException {
202    Path currentPath = entryStream.getCurrentPath();
203    for (;;) {
204      Entry entry = entryStream.next();
205      batch.setLastWalPosition(entryStream.getPosition());
206      entry = filterEntry(entry);
207      if (entry != null) {
208        if (addEntryToBatch(batch, entry)) {
209          break;
210        }
211      }
212      boolean hasNext = entryStream.hasNext();
213      // always return if we have switched to a new file
214      if (switched(entryStream, currentPath)) {
215        batch.setEndOfFile(true);
216        break;
217      }
218      if (!hasNext) {
219        break;
220      }
221    }
222  }
223
224  private void handleEmptyWALEntryBatch() throws InterruptedException {
225    LOG.trace("Didn't read any new entries from WAL");
226    if (logQueue.getQueue(walGroupId).isEmpty()) {
227      // we're done with current queue, either this is a recovered queue, or it is the special group
228      // for a sync replication peer and the peer has been transited to DA or S state.
229      LOG.debug("Stopping the replication source wal reader");
230      setReaderRunning(false);
231      // shuts down shipper thread immediately
232      entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
233    } else {
234      Thread.sleep(sleepForRetries);
235    }
236  }
237
238  private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream entryStream)
239    throws IOException {
240    Path currentPath = entryStream.getCurrentPath();
241    if (!entryStream.hasNext()) {
242      // check whether we have switched a file
243      if (currentPath != null && switched(entryStream, currentPath)) {
244        return WALEntryBatch.endOfFile(currentPath);
245      } else {
246        return null;
247      }
248    }
249    if (currentPath != null) {
250      if (switched(entryStream, currentPath)) {
251        return WALEntryBatch.endOfFile(currentPath);
252      }
253    }
254    return createBatch(entryStream);
255  }
256
257  /**
258   * This is to handle the EOFException from the WAL entry stream. EOFException should be handled
259   * carefully because there are chances of data loss because of never replicating the data. Thus we
260   * should always try to ship existing batch of entries here. If there was only one log in the
261   * queue before EOF, we ship the empty batch here and since reader is still active, in the next
262   * iteration of reader we will stop the reader.
263   * <p/>
264   * If there was more than one log in the queue before EOF, we ship the existing batch and reset
265   * the wal patch and position to the log with EOF, so shipper can remove logs from replication
266   * queue
267   * @return true only the IOE can be handled
268   */
269  private boolean handleEofException(Exception e, WALEntryBatch batch) {
270    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
271    // Dump the log even if logQueue size is 1 if the source is from recovered Source
272    // since we don't add current log to recovered source queue so it is safe to remove.
273    if (
274      (e instanceof EOFException || e.getCause() instanceof EOFException)
275        && (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery
276    ) {
277      Path path = queue.peek();
278      try {
279        if (!fs.exists(path)) {
280          // There is a chance that wal has moved to oldWALs directory, so look there also.
281          path = AbstractFSWALProvider.findArchivedLog(path, conf);
282          // path can be null if unable to locate in archiveDir.
283        }
284        if (path != null && fs.getFileStatus(path).getLen() == 0) {
285          LOG.warn("Forcing removal of 0 length log in queue: {}", path);
286          logQueue.remove(walGroupId);
287          currentPosition = 0;
288          if (batch != null) {
289            // After we removed the WAL from the queue, we should try shipping the existing batch of
290            // entries
291            addBatchToShippingQueue(batch);
292          }
293          return true;
294        }
295      } catch (IOException ioe) {
296        LOG.warn("Couldn't get file length information about log " + path, ioe);
297      } catch (InterruptedException ie) {
298        LOG.trace("Interrupted while adding WAL batch to ship queue");
299        Thread.currentThread().interrupt();
300      }
301    }
302    return false;
303  }
304
305  /**
306   * Update the batch try to ship and return true if shipped
307   * @param batch Batch of entries to ship
308   * @throws InterruptedException throws interrupted exception
309   */
310  private void addBatchToShippingQueue(WALEntryBatch batch) throws InterruptedException {
311    // need to propagate the batch even it has no entries since it may carry the last
312    // sequence id information for serial replication.
313    LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
314    entryBatchQueue.put(batch);
315  }
316
317  public Path getCurrentPath() {
318    // if we've read some WAL entries, get the Path we read from
319    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
320    if (batchQueueHead != null) {
321      return batchQueueHead.getLastWalPath();
322    }
323    // otherwise, we must be currently reading from the head of the log queue
324    return logQueue.getQueue(walGroupId).peek();
325  }
326
327  // returns false if we've already exceeded the global quota
328  private boolean checkQuota() {
329    // try not to go over total quota
330    if (totalBufferUsed.get() > totalBufferQuota) {
331      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
332        this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
333      Threads.sleep(sleepForRetries);
334      return false;
335    }
336    return true;
337  }
338
339  private WALEntryBatch createBatch(WALEntryStream entryStream) {
340    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
341  }
342
343  protected final Entry filterEntry(Entry entry) {
344    Entry filtered = filter.filter(entry);
345    if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
346      LOG.trace("Filtered entry for replication: {}", entry);
347      source.getSourceMetrics().incrLogEditsFiltered();
348    }
349    return filtered;
350  }
351
352  /**
353   * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
354   * batch to become available
355   * @return A batch of entries, along with the position in the log after reading the batch
356   * @throws InterruptedException if interrupted while waiting
357   */
358  public WALEntryBatch take() throws InterruptedException {
359    return entryBatchQueue.take();
360  }
361
362  public WALEntryBatch poll(long timeout) throws InterruptedException {
363    return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
364  }
365
366  private long getEntrySizeIncludeBulkLoad(Entry entry) {
367    WALEdit edit = entry.getEdit();
368    return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
369  }
370
371  public static long getEntrySizeExcludeBulkLoad(Entry entry) {
372    WALEdit edit = entry.getEdit();
373    WALKey key = entry.getKey();
374    return edit.heapSize() + key.estimatedSerializedSizeOf();
375  }
376
377  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
378    WALEdit edit = entry.getEdit();
379    batch.incrementHeapSize(entrySize);
380    Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
381    batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
382    batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
383  }
384
385  /**
386   * Count the number of different row keys in the given edit because of mini-batching. We assume
387   * that there's at least one Cell in the WALEdit.
388   * @param edit edit to count row keys from
389   * @return number of different row keys and HFiles
390   */
391  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
392    List<Cell> cells = edit.getCells();
393    int distinctRowKeys = 1;
394    int totalHFileEntries = 0;
395    Cell lastCell = cells.get(0);
396
397    int totalCells = edit.size();
398    for (int i = 0; i < totalCells; i++) {
399      // Count HFiles to be replicated
400      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
401        try {
402          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
403          List<StoreDescriptor> stores = bld.getStoresList();
404          int totalStores = stores.size();
405          for (int j = 0; j < totalStores; j++) {
406            totalHFileEntries += stores.get(j).getStoreFileList().size();
407          }
408        } catch (IOException e) {
409          LOG.error("Failed to deserialize bulk load entry from wal edit. "
410            + "Then its hfiles count will not be added into metric.", e);
411        }
412      }
413
414      if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
415        distinctRowKeys++;
416      }
417      lastCell = cells.get(i);
418    }
419
420    Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
421    return result;
422  }
423
424  /**
425   * Calculate the total size of all the store files
426   * @param edit edit to count row keys from
427   * @return the total size of the store files
428   */
429  private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
430    List<Cell> cells = edit.getCells();
431    int totalStoreFilesSize = 0;
432
433    int totalCells = edit.size();
434    for (int i = 0; i < totalCells; i++) {
435      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
436        try {
437          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
438          List<StoreDescriptor> stores = bld.getStoresList();
439          int totalStores = stores.size();
440          for (int j = 0; j < totalStores; j++) {
441            totalStoreFilesSize =
442              (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes());
443          }
444        } catch (IOException e) {
445          LOG.error("Failed to deserialize bulk load entry from wal edit. "
446            + "Size of HFiles part of cell will not be considered in replication "
447            + "request size calculation.", e);
448        }
449      }
450    }
451    return totalStoreFilesSize;
452  }
453
454  /**
455   * @param size delta size for grown buffer
456   * @return true if we should clear buffer and push all
457   */
458  private boolean acquireBufferQuota(long size) {
459    long newBufferUsed = totalBufferUsed.addAndGet(size);
460    // Record the new buffer usage
461    this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
462    return newBufferUsed >= totalBufferQuota;
463  }
464
465  /** Returns whether the reader thread is running */
466  public boolean isReaderRunning() {
467    return isReaderRunning && !isInterrupted();
468  }
469
470  /**
471   * @param readerRunning the readerRunning to set
472   */
473  public void setReaderRunning(boolean readerRunning) {
474    this.isReaderRunning = readerRunning;
475  }
476}