001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.io.EOFException;
022import java.io.IOException;
023import java.util.List;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.PriorityBlockingQueue;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicLong;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.replication.WALEntryFilter;
035import org.apache.hadoop.hbase.util.Pair;
036import org.apache.hadoop.hbase.util.Threads;
037import org.apache.hadoop.hbase.wal.WAL.Entry;
038import org.apache.hadoop.hbase.wal.WALEdit;
039import org.apache.hadoop.hbase.wal.WALKey;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.apache.yetus.audience.InterfaceStability;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
047
048/**
049 * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
050 * onto a queue
051 */
052@InterfaceAudience.Private
053@InterfaceStability.Evolving
054class ReplicationSourceWALReader extends Thread {
055  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
056
057  private final PriorityBlockingQueue<Path> logQueue;
058  private final FileSystem fs;
059  private final Configuration conf;
060  private final WALEntryFilter filter;
061  private final ReplicationSource source;
062
063  private final BlockingQueue<WALEntryBatch> entryBatchQueue;
064  // max (heap) size of each batch - multiply by number of batches in queue to get total
065  private final long replicationBatchSizeCapacity;
066  // max count of each batch - multiply by number of batches in queue to get total
067  private final int replicationBatchCountCapacity;
068  // position in the WAL to start reading at
069  private long currentPosition;
070  private final long sleepForRetries;
071  private final int maxRetriesMultiplier;
072  private final boolean eofAutoRecovery;
073
074  //Indicates whether this particular worker is running
075  private boolean isReaderRunning = true;
076
077  private AtomicLong totalBufferUsed;
078  private long totalBufferQuota;
079
080  /**
081   * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
082   * entries, and puts them on a batch queue.
083   * @param fs the files system to use
084   * @param conf configuration to use
085   * @param logQueue The WAL queue to read off of
086   * @param startPosition position in the first WAL to start reading from
087   * @param filter The filter to use while reading
088   * @param source replication source
089   */
090  public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
091      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
092      ReplicationSource source) {
093    this.logQueue = logQueue;
094    this.currentPosition = startPosition;
095    this.fs = fs;
096    this.conf = conf;
097    this.filter = filter;
098    this.source = source;
099    this.replicationBatchSizeCapacity =
100        this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
101    this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
102    // memory used will be batchSizeCapacity * (nb.batches + 1)
103    // the +1 is for the current thread reading before placing onto the queue
104    int batchCount = conf.getInt("replication.source.nb.batches", 1);
105    this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
106    this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
107    this.sleepForRetries =
108        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
109    this.maxRetriesMultiplier =
110        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
111    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
112    this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
113    LOG.info("peerClusterZnode=" + source.getQueueId()
114        + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
115        + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
116        + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
117        + ", replicationBatchQueueCapacity=" + batchCount);
118  }
119
120  @Override
121  public void run() {
122    int sleepMultiplier = 1;
123    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
124      try (WALEntryStream entryStream =
125          new WALEntryStream(logQueue, conf, currentPosition,
126              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
127              source.getSourceMetrics())) {
128        while (isReaderRunning()) { // loop here to keep reusing stream while we can
129          if (!source.isPeerEnabled()) {
130            Threads.sleep(sleepForRetries);
131            continue;
132          }
133          if (!checkQuota()) {
134            continue;
135          }
136          WALEntryBatch batch = readWALEntries(entryStream);
137          currentPosition = entryStream.getPosition();
138          if (batch != null) {
139            // need to propagate the batch even it has no entries since it may carry the last
140            // sequence id information for serial replication.
141            LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
142            entryBatchQueue.put(batch);
143            sleepMultiplier = 1;
144          } else { // got no entries and didn't advance position in WAL
145            handleEmptyWALEntryBatch(entryStream.getCurrentPath());
146            entryStream.reset(); // reuse stream
147          }
148        }
149      } catch (IOException e) { // stream related
150        if (sleepMultiplier < maxRetriesMultiplier) {
151          LOG.debug("Failed to read stream of replication entries: " + e);
152          sleepMultiplier++;
153        } else {
154          LOG.error("Failed to read stream of replication entries", e);
155          handleEofException(e);
156        }
157        Threads.sleep(sleepForRetries * sleepMultiplier);
158      } catch (InterruptedException e) {
159        LOG.trace("Interrupted while sleeping between WAL reads");
160        Thread.currentThread().interrupt();
161      }
162    }
163  }
164
165  // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return.
166  protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
167    WALEdit edit = entry.getEdit();
168    if (edit == null || edit.isEmpty()) {
169      LOG.debug("Edit null or empty for entry {} ", entry);
170      return false;
171    }
172    LOG.debug("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}",
173        entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
174    long entrySize = getEntrySizeIncludeBulkLoad(entry);
175    long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
176    batch.addEntry(entry, entrySize);
177    updateBatchStats(batch, entry, entrySize);
178    boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
179
180    // Stop if too many entries or too big
181    return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
182      batch.getNbEntries() >= replicationBatchCountCapacity;
183  }
184
185  protected static final boolean switched(WALEntryStream entryStream, Path path) {
186    Path newPath = entryStream.getCurrentPath();
187    return newPath == null || !path.getName().equals(newPath.getName());
188  }
189
190  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
191      throws IOException, InterruptedException {
192    Path currentPath = entryStream.getCurrentPath();
193    if (!entryStream.hasNext()) {
194      // check whether we have switched a file
195      if (currentPath != null && switched(entryStream, currentPath)) {
196        return WALEntryBatch.endOfFile(currentPath);
197      } else {
198        return null;
199      }
200    }
201    if (currentPath != null) {
202      if (switched(entryStream, currentPath)) {
203        return WALEntryBatch.endOfFile(currentPath);
204      }
205    } else {
206      // when reading from the entry stream first time we will enter here
207      currentPath = entryStream.getCurrentPath();
208    }
209    WALEntryBatch batch = createBatch(entryStream);
210    for (;;) {
211      Entry entry = entryStream.next();
212      batch.setLastWalPosition(entryStream.getPosition());
213      entry = filterEntry(entry);
214      if (entry != null) {
215        if (addEntryToBatch(batch, entry)) {
216          break;
217        }
218      }
219      boolean hasNext = entryStream.hasNext();
220      // always return if we have switched to a new file
221      if (switched(entryStream, currentPath)) {
222        batch.setEndOfFile(true);
223        break;
224      }
225      if (!hasNext) {
226        break;
227      }
228    }
229    return batch;
230  }
231
232  private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
233    LOG.trace("Didn't read any new entries from WAL");
234    if (source.isRecovered()) {
235      // we're done with queue recovery, shut ourself down
236      setReaderRunning(false);
237      // shuts down shipper thread immediately
238      entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
239    } else {
240      Thread.sleep(sleepForRetries);
241    }
242  }
243
244  // if we get an EOF due to a zero-length log, and there are other logs in queue
245  // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
246  // enabled, then dump the log
247  private void handleEofException(IOException e) {
248    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
249      logQueue.size() > 1 && this.eofAutoRecovery) {
250      try {
251        if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
252          LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
253          logQueue.remove();
254          currentPosition = 0;
255        }
256      } catch (IOException ioe) {
257        LOG.warn("Couldn't get file length information about log " + logQueue.peek());
258      }
259    }
260  }
261
262  public Path getCurrentPath() {
263    // if we've read some WAL entries, get the Path we read from
264    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
265    if (batchQueueHead != null) {
266      return batchQueueHead.getLastWalPath();
267    }
268    // otherwise, we must be currently reading from the head of the log queue
269    return logQueue.peek();
270  }
271
272  //returns false if we've already exceeded the global quota
273  private boolean checkQuota() {
274    // try not to go over total quota
275    if (totalBufferUsed.get() > totalBufferQuota) {
276      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
277          this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
278      Threads.sleep(sleepForRetries);
279      return false;
280    }
281    return true;
282  }
283
284  protected final WALEntryBatch createBatch(WALEntryStream entryStream) {
285    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
286  }
287
288  protected final Entry filterEntry(Entry entry) {
289    Entry filtered = filter.filter(entry);
290    if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
291      LOG.debug("Filtered entry for replication: {}", entry);
292      source.getSourceMetrics().incrLogEditsFiltered();
293    }
294    return filtered;
295  }
296
297  /**
298   * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
299   * batch to become available
300   * @return A batch of entries, along with the position in the log after reading the batch
301   * @throws InterruptedException if interrupted while waiting
302   */
303  public WALEntryBatch take() throws InterruptedException {
304    return entryBatchQueue.take();
305  }
306
307  public WALEntryBatch poll(long timeout) throws InterruptedException {
308    return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
309  }
310
311  private long getEntrySizeIncludeBulkLoad(Entry entry) {
312    WALEdit edit = entry.getEdit();
313    return  getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
314  }
315
316  public static long getEntrySizeExcludeBulkLoad(Entry entry) {
317    WALEdit edit = entry.getEdit();
318    WALKey key = entry.getKey();
319    return edit.heapSize() + key.estimatedSerializedSizeOf();
320  }
321
322
323  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
324    WALEdit edit = entry.getEdit();
325    batch.incrementHeapSize(entrySize);
326    Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
327    batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
328    batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
329  }
330
331  /**
332   * Count the number of different row keys in the given edit because of mini-batching. We assume
333   * that there's at least one Cell in the WALEdit.
334   * @param edit edit to count row keys from
335   * @return number of different row keys and HFiles
336   */
337  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
338    List<Cell> cells = edit.getCells();
339    int distinctRowKeys = 1;
340    int totalHFileEntries = 0;
341    Cell lastCell = cells.get(0);
342
343    int totalCells = edit.size();
344    for (int i = 0; i < totalCells; i++) {
345      // Count HFiles to be replicated
346      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
347        try {
348          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
349          List<StoreDescriptor> stores = bld.getStoresList();
350          int totalStores = stores.size();
351          for (int j = 0; j < totalStores; j++) {
352            totalHFileEntries += stores.get(j).getStoreFileList().size();
353          }
354        } catch (IOException e) {
355          LOG.error("Failed to deserialize bulk load entry from wal edit. "
356              + "Then its hfiles count will not be added into metric.");
357        }
358      }
359
360      if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
361        distinctRowKeys++;
362      }
363      lastCell = cells.get(i);
364    }
365
366    Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
367    return result;
368  }
369
370  /**
371   * Calculate the total size of all the store files
372   * @param edit edit to count row keys from
373   * @return the total size of the store files
374   */
375  private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
376    List<Cell> cells = edit.getCells();
377    int totalStoreFilesSize = 0;
378
379    int totalCells = edit.size();
380    for (int i = 0; i < totalCells; i++) {
381      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
382        try {
383          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
384          List<StoreDescriptor> stores = bld.getStoresList();
385          int totalStores = stores.size();
386          for (int j = 0; j < totalStores; j++) {
387            totalStoreFilesSize =
388                (int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes());
389          }
390        } catch (IOException e) {
391          LOG.error("Failed to deserialize bulk load entry from wal edit. "
392              + "Size of HFiles part of cell will not be considered in replication "
393              + "request size calculation.",
394            e);
395        }
396      }
397    }
398    return totalStoreFilesSize;
399  }
400
401  /**
402   * @param size delta size for grown buffer
403   * @return true if we should clear buffer and push all
404   */
405  private boolean acquireBufferQuota(long size) {
406    long newBufferUsed = totalBufferUsed.addAndGet(size);
407    // Record the new buffer usage
408    this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
409    return newBufferUsed >= totalBufferQuota;
410  }
411
412  /**
413   * @return whether the reader thread is running
414   */
415  public boolean isReaderRunning() {
416    return isReaderRunning && !isInterrupted();
417  }
418
419  /**
420   * @param readerRunning the readerRunning to set
421   */
422  public void setReaderRunning(boolean readerRunning) {
423    this.isReaderRunning = readerRunning;
424  }
425}