/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
/**
 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
 * onto a queue.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReplicationSourceWALReader extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);

  private final PriorityBlockingQueue<Path> logQueue;
  private final FileSystem fs;
  private final Configuration conf;
  private final WALEntryFilter filter;
  private final ReplicationSource source;

  protected final BlockingQueue<WALEntryBatch> entryBatchQueue;
  // max (heap) size of each batch - multiply by number of batches in queue to get total
  private final long replicationBatchSizeCapacity;
  // max count of each batch - multiply by number of batches in queue to get total
  protected final int replicationBatchCountCapacity;
  // position in the WAL to start reading at
  private long currentPosition;
  private final long sleepForRetries;
  private final int maxRetriesMultiplier;
  private final boolean eofAutoRecovery;

  // Indicates whether this particular worker is running
  private boolean isReaderRunning = true;

  private AtomicLong totalBufferUsed;
  private long totalBufferQuota;
  /**
   * Creates a reader worker for a given WAL queue. Reads WAL entries off the queue, batches the
   * entries, and puts them on a batch queue.
   * @param fs the file system to use
   * @param conf configuration to use
   * @param logQueue The WAL queue to read off of
   * @param startPosition position in the first WAL to start reading from
   * @param filter The filter to use while reading
   * @param source replication source
   */
  public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
      ReplicationSource source) {
    this.logQueue = logQueue;
    this.currentPosition = startPosition;
    this.fs = fs;
    this.conf = conf;
    this.filter = filter;
    this.source = source;
    this.replicationBatchSizeCapacity =
        this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
    this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
    // memory used will be batchSizeCapacity * (nb.batches + 1)
    // the +1 is for the current thread reading before placing onto the queue
    int batchCount = conf.getInt("replication.source.nb.batches", 1);
    this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
    this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
    this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
    LOG.info("peerClusterZnode=" + source.getPeerClusterZnode()
        + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
        + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
        + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
        + ", replicationBatchQueueCapacity=" + batchCount);
  }
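
  // A worked example of the memory bound noted above, assuming the defaults: with
  // replicationBatchSizeCapacity = 64 MiB and replication.source.nb.batches = 1, this reader
  // may hold roughly batchSizeCapacity * (nb.batches + 1) = 64 MiB * 2 = 128 MiB of entries,
  // i.e. one batch queued in entryBatchQueue plus the batch currently being assembled.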

  @Override
  public void run() {
    int sleepMultiplier = 1;
    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
      try (WALEntryStream entryStream =
          new WALEntryStream(logQueue, conf, currentPosition,
              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
              source.getSourceMetrics())) {
        while (isReaderRunning()) { // loop here to keep reusing stream while we can
          if (!checkQuota()) {
            continue;
          }
          WALEntryBatch batch = readWALEntries(entryStream);
          if (batch != null && batch.getNbEntries() > 0) {
            if (LOG.isTraceEnabled()) {
              LOG.trace(String.format("Read %s WAL entries eligible for replication",
                batch.getNbEntries()));
            }
            entryBatchQueue.put(batch);
            sleepMultiplier = 1;
          } else { // got no entries and didn't advance position in WAL
            handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath());
          }
          currentPosition = entryStream.getPosition();
          entryStream.reset(); // reuse stream
        }
      } catch (IOException e) { // stream related
        if (sleepMultiplier < maxRetriesMultiplier) {
          LOG.debug("Failed to read stream of replication entries: " + e);
          sleepMultiplier++;
        } else {
          LOG.error("Failed to read stream of replication entries", e);
          handleEofException(e);
        }
        Threads.sleep(sleepForRetries * sleepMultiplier);
      } catch (InterruptedException e) {
        LOG.trace("Interrupted while sleeping between WAL reads");
        Thread.currentThread().interrupt();
      }
    }
  }
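
  // Backoff example for the IOException path above, assuming the defaults: with
  // sleepForRetries = 1000 ms and maxRetriesMultiplier = 300, consecutive stream failures
  // sleep 1 s, 2 s, 3 s, ... capping at 300 s (5 minutes) between retries.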

  private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException {
    WALEntryBatch batch = null;
    while (entryStream.hasNext()) {
      if (batch == null) {
        batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
      }
      Entry entry = entryStream.next();
      entry = filterEntry(entry);
      if (entry != null) {
        WALEdit edit = entry.getEdit();
        if (edit != null && !edit.isEmpty()) {
          long entrySize = getEntrySize(entry);
          batch.addEntry(entry);
          updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
          boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
          // Stop if too many entries or too big
          if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
              || batch.getNbEntries() >= replicationBatchCountCapacity) {
            break;
          }
        }
      }
    }
    return batch;
  }

  protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
      throws InterruptedException {
    LOG.trace("Didn't read any new entries from WAL");
    Thread.sleep(sleepForRetries);
  }

  // If we get an EOF due to a zero-length log, there are other logs in the queue
  // (highly likely we've closed the current log), we've hit the max retries, and autorecovery
  // is enabled, then dump the log.
  private void handleEofException(IOException e) {
    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
      logQueue.size() > 1 && this.eofAutoRecovery) {
      try {
        if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
          LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
          logQueue.remove();
          currentPosition = 0;
        }
      } catch (IOException ioe) {
        LOG.warn("Couldn't get file length information about log " + logQueue.peek(), ioe);
      }
    }
  }
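
  // A configuration sketch for the recovery path above: autorecovery is off by default, so a
  // deployment that wants zero-length logs dumped would set, e.g. in hbase-site.xml:
  //
  //   <property>
  //     <name>replication.source.eof.autorecovery</name>
  //     <value>true</value>
  //   </property>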

  public Path getCurrentPath() {
    // if we've read some WAL entries, get the Path we read from
    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
    if (batchQueueHead != null) {
      return batchQueueHead.lastWalPath;
    }
    // otherwise, we must be currently reading from the head of the log queue
    return logQueue.peek();
  }

  // Returns false if we've already exceeded the global quota
  private boolean checkQuota() {
    // try not to go over total quota
    if (totalBufferUsed.get() > totalBufferQuota) {
      Threads.sleep(sleepForRetries);
      return false;
    }
    return true;
  }

  private Entry filterEntry(Entry entry) {
    Entry filtered = filter.filter(entry);
    if (entry != null && filtered == null) {
      source.getSourceMetrics().incrLogEditsFiltered();
    }
    return filtered;
  }

  /**
   * Retrieves the next batch of WAL entries from the queue, blocking until one becomes
   * available.
   * @return a batch of entries, along with the position in the log after reading the batch
   * @throws InterruptedException if interrupted while waiting
   */
  public WALEntryBatch take() throws InterruptedException {
    return entryBatchQueue.take();
  }
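
  // A minimal consumer sketch (hypothetical caller; ship(...) and recordPosition(...) are
  // placeholders, not real APIs). take() blocks until the reader has queued a batch:
  //
  //   while (reader.isReaderRunning()) {
  //     WALEntryBatch batch = reader.take();
  //     ship(batch.getWalEntries());
  //     recordPosition(batch.getLastWalPath(), batch.getLastWalPosition());
  //   }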

  private long getEntrySize(Entry entry) {
    WALEdit edit = entry.getEdit();
    return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
  }

  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition,
      long entrySize) {
    WALEdit edit = entry.getEdit();
    if (edit != null && !edit.isEmpty()) {
      batch.incrementHeapSize(entrySize);
      Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
      batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
      batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
    }
    batch.lastWalPosition = entryPosition;
  }

  /**
   * Counts the number of different row keys in the given edit (an edit may span several rows
   * because of mini-batching), as well as the number of HFiles referenced by bulk load entries.
   * Assumes there is at least one Cell in the WALEdit.
   * @param edit edit to count row keys from
   * @return number of different row keys and HFiles
   */
  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
    List<Cell> cells = edit.getCells();
    int distinctRowKeys = 1;
    int totalHFileEntries = 0;
    Cell lastCell = cells.get(0);

    int totalCells = edit.size();
    for (int i = 0; i < totalCells; i++) {
      // Count HFiles to be replicated
      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
        try {
          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
          List<StoreDescriptor> stores = bld.getStoresList();
          int totalStores = stores.size();
          for (int j = 0; j < totalStores; j++) {
            totalHFileEntries += stores.get(j).getStoreFileList().size();
          }
        } catch (IOException e) {
          LOG.error("Failed to deserialize bulk load entry from wal edit; "
              + "its hfiles count will not be added to the metric.", e);
        }
      }

      if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
        distinctRowKeys++;
      }
      lastCell = cells.get(i);
    }

    Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
    return result;
  }
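
  // Worked example for the row-key count above: cells with row keys [r1, r1, r2] yield 2
  // distinct rows. Each cell is compared only to its immediate predecessor, so cells for the
  // same row are assumed to be adjacent; [r1, r2, r1] would count 3.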

  /**
   * Calculates the total size of all the store files referenced by bulk load entries in the
   * given edit.
   * @param edit edit to calculate store file sizes from
   * @return the total size of the store files, in bytes
   */
  private long calculateTotalSizeOfStoreFiles(WALEdit edit) {
    List<Cell> cells = edit.getCells();
    // Accumulate in a long so large bulk loads don't overflow an int
    long totalStoreFilesSize = 0;

    int totalCells = edit.size();
    for (int i = 0; i < totalCells; i++) {
      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
        try {
          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
          List<StoreDescriptor> stores = bld.getStoresList();
          int totalStores = stores.size();
          for (int j = 0; j < totalStores; j++) {
            totalStoreFilesSize += stores.get(j).getStoreFileSizeBytes();
          }
        } catch (IOException e) {
          LOG.error("Failed to deserialize bulk load entry from wal edit. "
              + "Size of HFiles part of cell will not be considered in replication "
              + "request size calculation.",
            e);
        }
      }
    }
    return totalStoreFilesSize;
  }

  /**
   * Reserves {@code size} bytes against the global buffer quota.
   * @param size delta size for grown buffer
   * @return true if the total buffer quota has been reached, i.e. the caller should finish the
   *         current batch and push it out
   */
  private boolean acquireBufferQuota(long size) {
    return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
  }
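
  // Example (illustrative numbers): if totalBufferQuota is 256 MiB and 250 MiB is already
  // reserved across sources, reserving a 10 MiB entry still claims the space but returns true,
  // telling readWALEntries to close out the current batch.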

  /**
   * @return whether the reader thread is running
   */
  public boolean isReaderRunning() {
    return isReaderRunning && !isInterrupted();
  }

  /**
   * @param readerRunning the readerRunning to set
   */
  public void setReaderRunning(boolean readerRunning) {
    this.isReaderRunning = readerRunning;
  }

  /**
   * Holds a batch of WAL entries to replicate, along with some statistics.
   */
  static class WALEntryBatch {
    private List<Entry> walEntries;
    // last WAL that was read
    private Path lastWalPath;
    // position in WAL of last entry in this batch
    private long lastWalPosition = 0;
    // number of distinct row keys in this batch
    private int nbRowKeys = 0;
    // number of HFiles
    private int nbHFiles = 0;
    // heap size of data we need to replicate
    private long heapSize = 0;

    /**
     * @param maxNbEntries the maximum number of entries this batch will hold
     * @param lastWalPath Path of the WAL the last entry in this batch was read from
     */
    WALEntryBatch(int maxNbEntries, Path lastWalPath) {
      this.walEntries = new ArrayList<>(maxNbEntries);
      this.lastWalPath = lastWalPath;
    }

    public void addEntry(Entry entry) {
      walEntries.add(entry);
    }

    /**
     * @return the WAL Entries.
     */
    public List<Entry> getWalEntries() {
      return walEntries;
    }

    /**
     * @return the path of the last WAL that was read.
     */
    public Path getLastWalPath() {
      return lastWalPath;
    }

    /**
     * @return the position in the last WAL that was read.
     */
    public long getLastWalPosition() {
      return lastWalPosition;
    }

    public int getNbEntries() {
      return walEntries.size();
    }

    /**
     * @return the number of distinct row keys in this batch
     */
    public int getNbRowKeys() {
      return nbRowKeys;
    }

    /**
     * @return the number of HFiles in this batch
     */
    public int getNbHFiles() {
      return nbHFiles;
    }

    /**
     * @return total number of operations in this batch
     */
    public int getNbOperations() {
      return getNbRowKeys() + getNbHFiles();
    }

    /**
     * @return the heap size of this batch
     */
    public long getHeapSize() {
      return heapSize;
    }

    private void incrementNbRowKeys(int increment) {
      nbRowKeys += increment;
    }

    private void incrementNbHFiles(int increment) {
      nbHFiles += increment;
    }

    private void incrementHeapSize(long increment) {
      heapSize += increment;
    }
  }
}