/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
 * {@link ReplicationSourceWALReader}.
 */
@InterfaceAudience.Private
public class ReplicationSourceShipper extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);

  // Holds the state of a replication worker thread
  public enum WorkerState {
    RUNNING,
    STOPPED,
    FINISHED, // The worker is done processing a recovered queue
  }
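
  // Lifecycle as driven by run() below: the worker enters RUNNING at startup, moves to
  // STOPPED when stopWorker() is called or the run loop exits before finishing its queue,
  // and reaches FINISHED only for recovered queues, once a recovered-queue shipper has
  // drained all of its WALs (see the noMoreData()/postFinish() hooks).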

  private final Configuration conf;
  protected final String walGroupId;
  protected final ReplicationSourceLogQueue logQueue;
  private final ReplicationSource source;

  // Last position in the log that we sent to ZooKeeper
  // It will be accessed by the stats thread so make it volatile
  private volatile long currentPosition = -1;
  // Path of the current log
  private Path currentPath;
  // Current state of the worker thread
  private volatile WorkerState state;
  protected ReplicationSourceWALReader entryReader;

  // How long we should sleep for each retry
  protected final long sleepForRetries;
  // Maximum number of retries before taking bold actions
  protected final int maxRetriesMultiplier;
  private static final int DEFAULT_TIMEOUT = 20000;
  private final int getEntriesTimeout;
  private final int shipEditsTimeout;

  public ReplicationSourceShipper(Configuration conf, String walGroupId,
    ReplicationSourceLogQueue logQueue, ReplicationSource source) {
    this.conf = conf;
    this.walGroupId = walGroupId;
    this.logQueue = logQueue;
    this.source = source;
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    // 20 seconds
    this.getEntriesTimeout =
      this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT);
    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
      HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
  }
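
  // For reference, the knobs read above are ordinary HBase configuration properties. A
  // minimal hbase-site.xml sketch (values shown are the defaults wired in the constructor,
  // not a tuning recommendation):
  //
  //   <property>
  //     <name>replication.source.sleepforretries</name>
  //     <value>1000</value> <!-- base retry sleep, in ms -->
  //   </property>
  //   <property>
  //     <name>replication.source.maxretriesmultiplier</name>
  //     <value>300</value> <!-- cap on the retry backoff multiplier -->
  //   </property>
  //   <property>
  //     <name>replication.source.getEntries.timeout</name>
  //     <value>20000</value> <!-- ms to wait on the reader's queue, in ms -->
  //   </property>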

  @Override
  public final void run() {
    setWorkerState(WorkerState.RUNNING);
    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!source.isPeerEnabled()) {
        // The peer enabled check is in memory and not expensive, so we do not need to increase
        // the sleep interval, as that could cause a long lag when we re-enable the peer.
        sleepForRetries("Replication is disabled", 1);
        continue;
      }
      try {
        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
        LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(),
          entryBatch);
        if (entryBatch == null) {
          continue;
        }
        // the NO_MORE_DATA instance has no path, so do not call shipEdits
        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
          noMoreData();
        } else {
          shipEdits(entryBatch);
        }
      } catch (InterruptedException | ReplicationRuntimeException e) {
        // We were interrupted and need to quit.
        LOG.warn("Interrupted while waiting for next replication entry batch", e);
        Thread.currentThread().interrupt();
      }
    }
    // If the worker exits the run loop without finishing its task, mark it as stopped.
    if (!isFinished()) {
      setWorkerState(WorkerState.STOPPED);
    } else {
      source.workerThreads.remove(this.walGroupId);
      postFinish();
    }
  }
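
  // How a ReplicationSource is expected to wire this worker up, sketched from the API in
  // this class (the surrounding reader construction is an assumption):
  //
  //   ReplicationSourceShipper shipper =
  //     new ReplicationSourceShipper(conf, walGroupId, logQueue, source);
  //   shipper.setWALReader(walReader); // the reader that fills entryBatchQueue
  //   shipper.startup((t, e) -> LOG.error("Uncaught exception in " + t, e));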

  // To be implemented by recovered shipper
  protected void noMoreData() {
  }

  // To be implemented by recovered shipper
  protected void postFinish() {
  }
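
  // A recovered-queue shipper overrides the two hooks above (in HBase this is
  // RecoveredReplicationSourceShipper). A minimal hypothetical sketch, not the actual
  // implementation:
  //
  //   @Override
  //   protected void noMoreData() {
  //     // the recovered queue is drained; let run() take the FINISHED branch
  //     setWorkerState(WorkerState.FINISHED);
  //   }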

  /**
   * Get the batch entry size, excluding bulk load file sizes. Uses ReplicationSourceWALReader's
   * static method.
   */
  private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
    int totalSize = 0;
    for (Entry entry : entryBatch.getWalEntries()) {
      totalSize += ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(entry);
    }
    return totalSize;
  }

  /**
   * Do the shipping logic
   */
  private void shipEdits(WALEntryBatch entryBatch) {
    List<Entry> entries = entryBatch.getWalEntries();
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      updateLogPosition(entryBatch);
      return;
    }
    int currentSize = (int) entryBatch.getHeapSize();
    int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
    source.getSourceMetrics()
      .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
    while (isActive()) {
      try {
        try {
          source.tryThrottle(currentSize);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while sleeping for throttling control");
          Thread.currentThread().interrupt();
          // the current thread may have been interrupted so that it terminates;
          // go back to the while() condition to confirm this
          continue;
        }
        // create replicateContext here, so the entries can be GC'd upon return from this call
        // stack
        ReplicationEndpoint.ReplicateContext replicateContext =
          new ReplicationEndpoint.ReplicateContext();
        replicateContext.setEntries(entries).setSize(currentSize);
        replicateContext.setWalGroupId(walGroupId);
        replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));

        long startTimeNs = System.nanoTime();
        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
        long endTimeNs = System.nanoTime();

        if (!replicated) {
          continue;
        } else {
          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
        }
        // Clean up hfile references
        for (Entry entry : entries) {
          cleanUpHFileRefs(entry.getEdit());
          LOG.trace("shipped entry {}", entry);
        }
        // Log and clean up WAL logs
        updateLogPosition(entryBatch);

        // Offsets totalBufferUsed by deducting the shipped batch size (which excludes the bulk
        // load size). This sizeExcludeBulkLoad has to use the same calculation as the one used
        // when calling acquireBufferQuota() in ReplicationSourceWALReader, because both maintain
        // the same variable: totalBufferUsed.
        source.postShipEdits(entries, sizeExcludeBulkLoad);
        // FIXME check relationship between wal group and overall
        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
          entryBatch.getNbHFiles());
        source.getSourceMetrics().setAgeOfLastShippedOp(
          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());

        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicated {} entries or {} operations in {} ms", entries.size(),
            entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
        }
        break;
      } catch (Exception ex) {
        source.getSourceMetrics().incrementFailedBatches();
        LOG.warn("{} threw unknown exception:",
          source.getReplicationEndpoint().getClass().getName(), ex);
        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }
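
  // On the timeout passed to the endpoint above: getAdaptiveTimeout() grows the base
  // shipEditsTimeout as sleepMultiplier rises (HBase's implementation in ReplicationUtils
  // scales it by a backoff table), so a struggling peer gets progressively more time per
  // replicate() attempt before it is timed out.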

  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
    String peerId = source.getPeerId();
    if (peerId.contains("-")) {
      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
      // A peerId will not have "-" in its name, see HBASE-11394
      peerId = peerId.split("-")[0];
    }
    List<Cell> cells = edit.getCells();
    int totalCells = cells.size();
    for (int i = 0; i < totalCells; i++) {
      Cell cell = cells.get(i);
      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        List<StoreDescriptor> stores = bld.getStoresList();
        int totalStores = stores.size();
        for (int j = 0; j < totalStores; j++) {
          List<String> storeFileList = stores.get(j).getStoreFileList();
          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
        }
      }
    }
  }
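
  // Illustration of the peer id parsing above (the server name is hypothetical): a
  // recovered queue id such as "2-rs1.example.org,16020,1546300800000" splits on "-" to
  // recover the original peer id "2".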

  private boolean updateLogPosition(WALEntryBatch batch) {
    boolean updated = false;
    // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
    // record on zk, so let's call it. The last wal position may be zero if end of file is true
    // and there is no entry in the batch. That is OK because the queue storage will ignore the
    // zero position and the file will be removed soon in cleanOldLogs.
    if (
      batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
        || batch.getLastWalPosition() != currentPosition
    ) {
      source.logPositionAndCleanOldLogs(batch);
      updated = true;
    }
    // if end of file is true, then we can just skip to the next file in the queue.
    // the only exception is the recovered queue: if we reach the end of that queue, there will
    // be no more files, so the currentPath here may be null.
    if (batch.isEndOfFile()) {
      currentPath = entryReader.getCurrentPath();
      currentPosition = 0L;
    } else {
      currentPath = batch.getLastWalPath();
      currentPosition = batch.getLastWalPosition();
    }
    return updated;
  }

  public void startup(UncaughtExceptionHandler handler) {
    String name = Thread.currentThread().getName();
    Threads.setDaemonThreadRunning(this,
      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
      handler::uncaughtException);
  }

  Path getCurrentPath() {
    return entryReader.getCurrentPath();
  }

  long getCurrentPosition() {
    return currentPosition;
  }

  void setWALReader(ReplicationSourceWALReader entryReader) {
    this.entryReader = entryReader;
  }

  long getStartPosition() {
    return 0;
  }

  protected boolean isActive() {
    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
  }

  protected final void setWorkerState(WorkerState state) {
    this.state = state;
  }

  void stopWorker() {
    setWorkerState(WorkerState.STOPPED);
  }

  public boolean isFinished() {
    return state == WorkerState.FINISHED;
  }

  /**
   * Do the sleeping logic
   * @param msg             Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  public boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }
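
  // Worked example with the defaults above (sleepForRetries = 1000 ms,
  // maxRetriesMultiplier = 300): shipEdits() starts with sleepMultiplier = 0 and
  // increments it after each failed attempt, so the n-th retry sleeps roughly n - 1
  // seconds (0 s, 1 s, 2 s, ...). Once the multiplier reaches 300 this method returns
  // false, the caller stops incrementing, and each further retry waits the 5 minute
  // ceiling (300 * 1 s).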

  /**
   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUsed</code>, in case
   * there were unprocessed entries batched by the reader to the shipper, but the shipper didn't
   * manage to ship those because the replication source is being terminated. In that case, it
   * iterates through the batched entries and decreases the pending entries size from
   * <code>ReplicationSourceManager.totalBufferUsed</code>.
   * <p/>
   * <b>NOTES</b> 1) This method should only be called upon replication source termination. It
   * blocks waiting for both shipper and reader threads to terminate, to make sure there are no
   * race conditions when updating <code>ReplicationSourceManager.totalBufferUsed</code>. 2) It
   * <b>does not</b> attempt to terminate the reader and shipper threads. Their
   * interruption/termination <b>must</b> have been triggered prior to calling this method.
   */
  void clearWALEntryBatch() {
    long timeout = EnvironmentEdgeManager.currentTime() + this.shipEditsTimeout;
    while (this.isAlive() || this.entryReader.isAlive()) {
      try {
        if (EnvironmentEdgeManager.currentTime() >= timeout) {
          LOG.warn(
            "{} Shipper clearWALEntryBatch method timed out while waiting for the reader/shipper "
              + "thread to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}",
            this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
          return;
        } else {
          // Wait for both the shipper and reader threads to stop
          Thread.sleep(this.sleepForRetries);
        }
      } catch (InterruptedException e) {
        LOG.warn("{} Interrupted while waiting for {} to stop on clearWALEntryBatch. "
          + "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e);
        return;
      }
    }
    LongAccumulator totalToDecrement = new LongAccumulator(Long::sum, 0);
    entryReader.entryBatchQueue.forEach(w -> {
      entryReader.entryBatchQueue.remove(w);
      w.getWalEntries().forEach(e -> {
        long entrySizeExcludeBulkLoad = ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(e);
        totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
      });
    });
    if (LOG.isTraceEnabled()) {
      LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
        totalToDecrement.longValue());
    }
    long newBufferUsed =
      source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
    source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
  }
}