/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
 * onto a queue.
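 * <p>
 * A rough usage sketch of how a consumer is expected to drain this reader (the loop condition
 * below is illustrative, not part of this class):
 * <pre>
 * reader.start(); // this class is a Thread; run() keeps filling an internal batch queue
 * while (consumerIsActive) {
 *   WALEntryBatch batch = reader.take(); // blocks until a batch is available
 *   // ship the batch to the peer cluster, then advance the source's WAL position
 * }
 * </pre>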
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class ReplicationSourceWALReader extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);

  private final PriorityBlockingQueue<Path> logQueue;
  private final FileSystem fs;
  private final Configuration conf;
  private final WALEntryFilter filter;
  private final ReplicationSource source;

  private final BlockingQueue<WALEntryBatch> entryBatchQueue;
  // max (heap) size of each batch - multiply by number of batches in queue to get total
  private final long replicationBatchSizeCapacity;
  // max count of each batch - multiply by number of batches in queue to get total
  private final int replicationBatchCountCapacity;
  // position in the WAL to start reading at
  private long currentPosition;
  private final long sleepForRetries;
  private final int maxRetriesMultiplier;
  private final boolean eofAutoRecovery;

  // Indicates whether this particular worker is running
  private boolean isReaderRunning = true;

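  // Shared accounting of memory held by read-but-not-yet-shipped entries across replication
  // sources; compared against the global quota in checkQuota() and acquireBufferQuota().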
  private AtomicLong totalBufferUsed;
  private long totalBufferQuota;

  /**
   * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches
   * the entries, and puts them on a batch queue.
   * @param fs the file system to use
   * @param conf configuration to use
   * @param logQueue The WAL queue to read off of
   * @param startPosition position in the first WAL to start reading from
   * @param filter The filter to use while reading
   * @param source replication source
   */
  public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
      ReplicationSource source) {
    this.logQueue = logQueue;
    this.currentPosition = startPosition;
    this.fs = fs;
    this.conf = conf;
    this.filter = filter;
    this.source = source;
    this.replicationBatchSizeCapacity =
        this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
    this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
    // memory used will be batchSizeCapacity * (nb.batches + 1)
    // the +1 is for the current thread reading before placing onto the queue
    int batchCount = conf.getInt("replication.source.nb.batches", 1);
    this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
    this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
    this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
    LOG.info("peerClusterZnode=" + source.getQueueId()
        + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
        + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
        + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
        + ", replicationBatchQueueCapacity=" + batchCount);
  }


  @Override
  public void run() {
    int sleepMultiplier = 1;
    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
      try (WALEntryStream entryStream =
          new WALEntryStream(logQueue, conf, currentPosition,
              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
              source.getSourceMetrics())) {
        while (isReaderRunning()) { // loop here to keep reusing stream while we can
          if (!source.isPeerEnabled()) {
            Threads.sleep(sleepForRetries);
            continue;
          }
          if (!checkQuota()) {
            continue;
          }
          WALEntryBatch batch = readWALEntries(entryStream);
          currentPosition = entryStream.getPosition();
          if (batch != null) {
            // need to propagate the batch even if it has no entries since it may carry the last
            // sequence id information for serial replication.
            LOG.trace("Read {} WAL entries eligible for replication", batch.getNbEntries());
            entryBatchQueue.put(batch);
            sleepMultiplier = 1;
          } else { // got no entries and didn't advance position in WAL
            handleEmptyWALEntryBatch(entryStream.getCurrentPath());
            entryStream.reset(); // reuse stream
          }
        }
      } catch (IOException e) { // stream related
        if (sleepMultiplier < maxRetriesMultiplier) {
          LOG.debug("Failed to read stream of replication entries: " + e);
          sleepMultiplier++;
        } else {
          LOG.error("Failed to read stream of replication entries", e);
          handleEofException(e);
        }
        Threads.sleep(sleepForRetries * sleepMultiplier);
      } catch (InterruptedException e) {
        LOG.trace("Interrupted while sleeping between WAL reads");
        Thread.currentThread().interrupt();
      }
    }
  }

  // Returns true if we reach the size limit for the batch, i.e., we need to finish the batch and
  // return it.
  protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
    WALEdit edit = entry.getEdit();
    if (edit == null || edit.isEmpty()) {
      return false;
    }
    long entrySize = getEntrySizeIncludeBulkLoad(entry);
    long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
    batch.addEntry(entry);
    updateBatchStats(batch, entry, entrySize);
    boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);

    // Stop if too many entries or too big
    return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
      batch.getNbEntries() >= replicationBatchCountCapacity;
  }

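  // Returns true if the stream is no longer positioned on the given WAL file, i.e. it has moved
  // on to a different file or currently has no file at all.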
  protected static final boolean switched(WALEntryStream entryStream, Path path) {
    Path newPath = entryStream.getCurrentPath();
    return newPath == null || !path.getName().equals(newPath.getName());
  }

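  /**
   * Reads entries from the stream into a batch until the batch is full, the stream switches to a
   * new WAL file, or there is nothing more to read. Returns an end-of-file marker batch when a
   * file switch is detected with nothing left to read, or null when there is neither a new entry
   * nor a file switch.
   */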
  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
      throws IOException, InterruptedException {
    Path currentPath = entryStream.getCurrentPath();
    if (!entryStream.hasNext()) {
      // check whether we have switched a file
      if (currentPath != null && switched(entryStream, currentPath)) {
        return WALEntryBatch.endOfFile(currentPath);
      } else {
        return null;
      }
    }
    if (currentPath != null) {
      if (switched(entryStream, currentPath)) {
        return WALEntryBatch.endOfFile(currentPath);
      }
    } else {
      // when reading from the entry stream for the first time we will enter here
      currentPath = entryStream.getCurrentPath();
    }
    WALEntryBatch batch = createBatch(entryStream);
    for (;;) {
      Entry entry = entryStream.next();
      batch.setLastWalPosition(entryStream.getPosition());
      entry = filterEntry(entry);
      if (entry != null) {
        if (addEntryToBatch(batch, entry)) {
          break;
        }
      }
      boolean hasNext = entryStream.hasNext();
      // always return if we have switched to a new file
      if (switched(entryStream, currentPath)) {
        batch.setEndOfFile(true);
        break;
      }
      if (!hasNext) {
        break;
      }
    }
    return batch;
  }

  private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
    LOG.trace("Didn't read any new entries from WAL");
    if (source.isRecovered()) {
      // we're done with queue recovery, shut ourself down
      setReaderRunning(false);
      // shuts down shipper thread immediately
      entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
    } else {
      Thread.sleep(sleepForRetries);
    }
  }

  // if we get an EOF due to a zero-length log, and there are other logs in queue
  // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
  // enabled, then dump the log
  private void handleEofException(IOException e) {
    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
      logQueue.size() > 1 && this.eofAutoRecovery) {
      try {
        if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
          LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
          logQueue.remove();
          currentPosition = 0;
        }
      } catch (IOException ioe) {
        LOG.warn("Couldn't get file length information about log " + logQueue.peek());
      }
    }
  }

  public Path getCurrentPath() {
    // if we've read some WAL entries, get the Path we read from
    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
    if (batchQueueHead != null) {
      return batchQueueHead.getLastWalPath();
    }
    // otherwise, we must be currently reading from the head of the log queue
    return logQueue.peek();
  }

  // Returns false if we've already exceeded the global quota
  private boolean checkQuota() {
    // try not to go over total quota
    if (totalBufferUsed.get() > totalBufferQuota) {
      Threads.sleep(sleepForRetries);
      return false;
    }
    return true;
  }

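  // Creates an empty batch tied to the WAL file the stream is currently positioned on.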
  protected final WALEntryBatch createBatch(WALEntryStream entryStream) {
    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
  }

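  // Applies the configured WALEntryFilter to the entry, bumping the filtered-edits metric when
  // the filter drops the entry; returns the filtered entry, or null if it was filtered out.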
  protected final Entry filterEntry(Entry entry) {
    Entry filtered = filter.filter(entry);
    if (entry != null && filtered == null) {
      source.getSourceMetrics().incrLogEditsFiltered();
    }
    return filtered;
  }

  /**
   * Retrieves the next batch of WAL entries from the queue, blocking until one becomes available.
   * @return A batch of entries, along with the position in the log after reading the batch
   * @throws InterruptedException if interrupted while waiting
   */
  public WALEntryBatch take() throws InterruptedException {
    return entryBatchQueue.take();
  }

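  /**
   * Retrieves the next batch of WAL entries from the queue, waiting up to the given number of
   * milliseconds for one to become available.
   * @param timeout maximum time to wait, in milliseconds
   * @return a batch of entries, or null if the wait timed out
   * @throws InterruptedException if interrupted while waiting
   */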
  public WALEntryBatch poll(long timeout) throws InterruptedException {
    return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
  }

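  // Estimated size of an entry: edit heap size plus serialized key size, plus the on-disk size of
  // any store files referenced by bulk load markers in the edit.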
  private long getEntrySizeIncludeBulkLoad(Entry entry) {
    WALEdit edit = entry.getEdit();
    WALKey key = entry.getKey();
    return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
        key.estimatedSerializedSizeOf();
  }

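  // Estimated size of an entry excluding bulk-loaded store files: edit heap size plus serialized
  // key size only.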
  public static long getEntrySizeExcludeBulkLoad(Entry entry) {
    WALEdit edit = entry.getEdit();
    WALKey key = entry.getKey();
    return edit.heapSize() + key.estimatedSerializedSizeOf();
  }

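  // Accounts an entry against the batch: adds its estimated size and the number of distinct row
  // keys and bulk-loaded hfiles it contains.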
  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
    WALEdit edit = entry.getEdit();
    batch.incrementHeapSize(entrySize);
    Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
    batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
    batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
  }

  /**
   * Count the number of different row keys in the given edit because of mini-batching. We assume
   * that there's at least one Cell in the WALEdit.
   * @param edit edit to count row keys from
   * @return number of different row keys and HFiles
   */
  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
    List<Cell> cells = edit.getCells();
    int distinctRowKeys = 1;
    int totalHFileEntries = 0;
    Cell lastCell = cells.get(0);

    int totalCells = edit.size();
    for (int i = 0; i < totalCells; i++) {
      // Count HFiles to be replicated
      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
        try {
          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
          List<StoreDescriptor> stores = bld.getStoresList();
          int totalStores = stores.size();
          for (int j = 0; j < totalStores; j++) {
            totalHFileEntries += stores.get(j).getStoreFileList().size();
          }
        } catch (IOException e) {
          LOG.error("Failed to deserialize bulk load entry from wal edit. "
              + "Its hfile count will not be added to the metric.");
        }
      }

      if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
        distinctRowKeys++;
      }
      lastCell = cells.get(i);
    }

    Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
    return result;
  }

  /**
   * Calculate the total size of all the store files referenced by bulk load markers in the edit.
   * @param edit edit whose bulk-loaded store files should be sized
   * @return the total size of the store files
   */
  private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
    List<Cell> cells = edit.getCells();
    int totalStoreFilesSize = 0;

    int totalCells = edit.size();
    for (int i = 0; i < totalCells; i++) {
      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
        try {
          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
          List<StoreDescriptor> stores = bld.getStoresList();
          int totalStores = stores.size();
          for (int j = 0; j < totalStores; j++) {
            totalStoreFilesSize =
                (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes());
          }
        } catch (IOException e) {
          LOG.error("Failed to deserialize bulk load entry from wal edit. "
              + "Size of HFiles part of cell will not be considered in replication "
              + "request size calculation.",
            e);
        }
      }
    }
    return totalStoreFilesSize;
  }

  /**
   * Attempts to grow the shared buffer usage by the given amount.
   * @param size the amount by which the shared buffer usage grows
   * @return true if the total buffer quota has been reached, i.e. the current batch should be
   *         finished and pushed out
   */
  private boolean acquireBufferQuota(long size) {
    return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
  }

  /**
   * @return whether the reader thread is running
   */
  public boolean isReaderRunning() {
    return isReaderRunning && !isInterrupted();
  }

  /**
   * @param readerRunning the readerRunning to set
   */
  public void setReaderRunning(boolean readerRunning) {
    this.isReaderRunning = readerRunning;
  }
}