/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
 * ReplicationSourceWALReader.
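 * <p/>
 * A minimal sketch of the wiring, for orientation only; in practice ReplicationSource creates
 * and manages its shippers internally:
 *
 * <pre>{@code
 * ReplicationSourceShipper shipper =
 *   new ReplicationSourceShipper(conf, walGroupId, logQueue, source);
 * shipper.setWALReader(walReader); // the reader that fills the queue this shipper drains
 * shipper.startup(handler);        // starts the daemon thread running the ship loop
 * // ... later, when the source terminates ...
 * shipper.stopWorker();            // asks the ship loop to exit
 * }</pre>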
 */
@InterfaceAudience.Private
public class ReplicationSourceShipper extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);

  // Hold the state of a replication worker thread
  public enum WorkerState {
    RUNNING,
    STOPPED,
    FINISHED, // The worker is done processing a queue
  }

  private final Configuration conf;
  protected final String walGroupId;
  protected final ReplicationSourceLogQueue logQueue;
  private final ReplicationSource source;

  // Last position in the log that we sent to ZooKeeper
  // It will be accessed by the stats thread so make it volatile
  private volatile long currentPosition = -1;
  // Path of the current log
  private Path currentPath;
  // Current state of the worker thread
  private volatile WorkerState state;
  protected ReplicationSourceWALReader entryReader;

  // How long should we sleep for each retry
  protected final long sleepForRetries;
  // Maximum number of retries before taking bold actions
  protected final int maxRetriesMultiplier;
  private static final int DEFAULT_TIMEOUT = 20000;
  private final int getEntriesTimeout;
  private final int shipEditsTimeout;

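  /**
   * All timing knobs below are read from the passed {@link Configuration}. As an illustration
   * only, a deployment could tune the retry behaviour programmatically before creating sources:
   *
   * <pre>{@code
   * conf.setLong("replication.source.sleepforretries", 2000); // retry sleep, default 1000 ms
   * conf.setInt("replication.source.maxretriesmultiplier", 600); // default 300
   * }</pre>
   */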
  public ReplicationSourceShipper(Configuration conf, String walGroupId,
    ReplicationSourceLogQueue logQueue, ReplicationSource source) {
    this.conf = conf;
    this.walGroupId = walGroupId;
    this.logQueue = logQueue;
    this.source = source;
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    // 20 seconds
    this.getEntriesTimeout =
      this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT);
    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
      HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
  }

  @Override
  public final void run() {
    setWorkerState(WorkerState.RUNNING);
    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!source.isPeerEnabled()) {
        // The peer enabled check is in memory and not expensive, so we do not need to increase
        // the sleep interval, as that could cause a long lag when the peer is re-enabled.
        sleepForRetries("Replication is disabled", sleepForRetries, 1, maxRetriesMultiplier);
        continue;
      }
      try {
        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
        LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(),
          entryBatch);
        if (entryBatch == null) {
          continue;
        }
        // the NO_MORE_DATA instance has no path, so do not call shipEdits
        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
          noMoreData();
        } else {
          shipEdits(entryBatch);
        }
      } catch (InterruptedException | ReplicationRuntimeException e) {
        // The thread was interrupted, or the reader hit an unrecoverable error; restore the
        // interrupt flag so the loop condition sees it and we quit.
        LOG.warn("Interrupted while waiting for next replication entry batch", e);
        Thread.currentThread().interrupt();
      }
    }
    // If the worker exits the run loop without finishing its task, mark it as stopped.
    if (!isFinished()) {
      setWorkerState(WorkerState.STOPPED);
    } else {
      source.removeWorker(this);
      postFinish();
    }
  }

  private void noMoreData() {
    if (source.isRecovered()) {
      LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId,
        source.getQueueId());
      source.getSourceMetrics().incrCompletedRecoveryQueue();
    } else {
      LOG.debug("Finished queue for group {} of peer {}", walGroupId, source.getQueueId());
    }
    setWorkerState(WorkerState.FINISHED);
  }

  // To be overridden by the recovered-queue shipper
  protected void postFinish() {
  }
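
  // A recovered-queue shipper typically overrides postFinish() to tell its source the queue has
  // been fully drained; an illustrative sketch (see RecoveredReplicationSourceShipper):
  //
  //   @Override
  //   protected void postFinish() {
  //     source.tryFinish();
  //   }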

  /**
   * Gets the total size of the entries in a batch, excluding bulk load file sizes. Uses
   * ReplicationSourceWALReader's static helper for the per-entry size.
   */
  private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
    int totalSize = 0;
    for (Entry entry : entryBatch.getWalEntries()) {
      totalSize += ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(entry);
    }
    return totalSize;
  }

  /**
   * Do the shipping logic
   */
  private void shipEdits(WALEntryBatch entryBatch) {
    List<Entry> entries = entryBatch.getWalEntries();
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      updateLogPosition(entryBatch);
      return;
    }
    int currentSize = (int) entryBatch.getHeapSize();
    int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
    source.getSourceMetrics()
      .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
    while (isActive()) {
      try {
        try {
          source.tryThrottle(currentSize);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while sleeping for throttling control");
          Thread.currentThread().interrupt();
          // The current thread may have been interrupted as a signal to terminate; go back to
          // the while() condition to check for that.
          continue;
        }
        // create replicateContext here, so the entries can be GC'd upon return from this call
        // stack
        ReplicationEndpoint.ReplicateContext replicateContext =
          new ReplicationEndpoint.ReplicateContext();
        replicateContext.setEntries(entries).setSize(currentSize);
        replicateContext.setWalGroupId(walGroupId);
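        // getAdaptiveTimeout stretches shipEditsTimeout as sleepMultiplier grows, so after
        // repeated failures the endpoint gets progressively more time per attempt.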
        replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));

        long startTimeNs = System.nanoTime();
        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
        long endTimeNs = System.nanoTime();

        if (!replicated) {
          continue;
        } else {
          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
        }
        // Clean up hfile references
        for (Entry entry : entries) {
          cleanUpHFileRefs(entry.getEdit());
          LOG.trace("Shipped entry {}", entry);
        }
        // Log and clean up WAL logs
        updateLogPosition(entryBatch);

        // Offset totalBufferUsed by deducting the shipped batch's size (excluding bulk load
        // size). This sizeExcludeBulkLoad must use the same calculation as acquireBufferQuota()
        // in ReplicationSourceWALReader, because both maintain the same variable: totalBufferUsed.
        source.postShipEdits(entries, sizeExcludeBulkLoad);
        // FIXME check relationship between wal group and overall
        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
          entryBatch.getNbHFiles());
        source.getSourceMetrics().setAgeOfLastShippedOp(
          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());

        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicated {} entries or {} operations in {} ms", entries.size(),
            entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
        }
        break;
      } catch (Exception ex) {
        source.getSourceMetrics().incrementFailedBatches();
        LOG.warn("{} threw an unknown exception:",
          source.getReplicationEndpoint().getClass().getName(), ex);
        if (
          sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier,
            maxRetriesMultiplier)
        ) {
          sleepMultiplier++;
        }
      }
    }
  }

  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
    String peerId = source.getPeerId();
    if (peerId.contains("-")) {
      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
      // A peerId will not have "-" in its name, see HBASE-11394
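      // e.g. an illustrative recovered id "2-host1,16020,1600000000000" maps back to peer "2"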
      peerId = peerId.split("-")[0];
    }
    List<Cell> cells = edit.getCells();
    int totalCells = cells.size();
    for (int i = 0; i < totalCells; i++) {
      Cell cell = cells.get(i);
      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        List<StoreDescriptor> stores = bld.getStoresList();
        int totalStores = stores.size();
        for (int j = 0; j < totalStores; j++) {
          List<String> storeFileList = stores.get(j).getStoreFileList();
          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
        }
      }
    }
  }

  private boolean updateLogPosition(WALEntryBatch batch) {
    boolean updated = false;
    // If end of file is true, the logPositionAndCleanOldLogs method will remove the file record
    // on zk, so we call it. The last wal position may be zero if end of file is true and there is
    // no entry in the batch. That is OK because the queue storage will ignore the zero position
    // and the file will be removed soon in cleanOldLogs.
    if (
      batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
        || batch.getLastWalPosition() != currentPosition
    ) {
      source.logPositionAndCleanOldLogs(batch);
      updated = true;
    }
    // If end of file is true, we can just skip to the next file in the queue.
    // The only exception is a recovered queue: if we reach the end of the queue there will be no
    // more files, so currentPath may be null here.
    if (batch.isEndOfFile()) {
      currentPath = entryReader.getCurrentPath();
      currentPosition = 0L;
    } else {
      currentPath = batch.getLastWalPath();
      currentPosition = batch.getLastWalPosition();
    }
    return updated;
  }

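  /**
   * Starts the shipper as a daemon thread. The thread is named after the current thread plus the
   * WAL group and queue id, and the supplied handler is installed for uncaught exceptions.
   */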
  public void startup(UncaughtExceptionHandler handler) {
    String name = Thread.currentThread().getName();
    Threads.setDaemonThreadRunning(this,
      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
      handler::uncaughtException);
  }

  Path getCurrentPath() {
    return entryReader.getCurrentPath();
  }

  long getCurrentPosition() {
    return currentPosition;
  }

  void setWALReader(ReplicationSourceWALReader entryReader) {
    this.entryReader = entryReader;
  }

  long getStartPosition() {
    return 0;
  }

  protected boolean isActive() {
    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
  }

  protected final void setWorkerState(WorkerState state) {
    this.state = state;
  }

  void stopWorker() {
    setWorkerState(WorkerState.STOPPED);
  }

  public boolean isFinished() {
    return state == WorkerState.FINISHED;
  }

  /**
   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUsed</code>, in case
   * there were unprocessed entries batched by the reader to the shipper, but the shipper did not
   * manage to ship them because the replication source is being terminated. In that case, it
   * iterates through the batched entries and decreases the pending entries' size from
   * <code>ReplicationSourceManager.totalBufferUsed</code>.
   * <p/>
   * <b>NOTES</b> 1) This method should only be called upon replication source termination. It
   * blocks, waiting for both shipper and reader threads to terminate, to make sure there are no
   * race conditions when updating <code>ReplicationSourceManager.totalBufferUsed</code>. 2) It
   * <b>does not</b> attempt to terminate the reader and shipper threads. Their
   * interruption/termination <b>must</b> have been triggered prior to calling this method.
   */
  void clearWALEntryBatch() {
    long timeout = EnvironmentEdgeManager.currentTime() + this.shipEditsTimeout;
    while (this.isAlive() || this.entryReader.isAlive()) {
      try {
        if (EnvironmentEdgeManager.currentTime() >= timeout) {
          LOG.warn(
            "{} Shipper clearWALEntryBatch method timed out whilst waiting for the reader/shipper "
              + "threads to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}",
            this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
          return;
        } else {
          // Wait for both the shipper and reader threads to stop
          Thread.sleep(this.sleepForRetries);
        }
      } catch (InterruptedException e) {
        LOG.warn("{} Interrupted while waiting for {} to stop on clearWALEntryBatch. "
          + "Not cleaning buffer usage.", this.source.getPeerId(), this.getName(), e);
        return;
      }
    }
    LongAccumulator totalToDecrement = new LongAccumulator(Long::sum, 0);
    entryReader.entryBatchQueue.forEach(w -> {
      entryReader.entryBatchQueue.remove(w);
      w.getWalEntries().forEach(e -> {
        long entrySizeExcludeBulkLoad = ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(e);
        totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
      });
    });
    if (LOG.isTraceEnabled()) {
      LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
        totalToDecrement.longValue());
    }
    long newBufferUsed =
      source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
    source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
  }
}