/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
 * {@link ReplicationSourceWALReader}.
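 * <p/>
 * A rough sketch of the lifecycle (simplified; in practice the shipper is created and driven by
 * {@link ReplicationSource}):
 *
 * <pre>
 * ReplicationSourceShipper shipper =
 *   new ReplicationSourceShipper(conf, walGroupId, source, walReader);
 * shipper.startup(handler); // ships batches until stopWorker() is called or the queue is drained
 * </pre>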
 */
@InterfaceAudience.Private
public class ReplicationSourceShipper extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);

  // Hold the state of a replication worker thread
  public enum WorkerState {
    RUNNING,
    STOPPED,
    FINISHED, // The worker is done processing a queue
  }

  private final Configuration conf;
  final String walGroupId;
  private final ReplicationSource source;

  // Last position in the log that we sent to ZooKeeper
  // It will be accessed by the stats thread so make it volatile
  private volatile long currentPosition = -1;
  // Path of the current log
  private Path currentPath;
  // Current state of the worker thread
  private volatile WorkerState state;
  final ReplicationSourceWALReader entryReader;

  // How long should we sleep for each retry
  private final long sleepForRetries;
  // Maximum number of retries before taking bold actions
  private final int maxRetriesMultiplier;
  private final int DEFAULT_TIMEOUT = 20000;
  private final int getEntriesTimeout;
  private final int shipEditsTimeout;

  public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
    ReplicationSourceWALReader walReader) {
    this.conf = conf;
    this.walGroupId = walGroupId;
    this.source = source;
    this.entryReader = walReader;
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    // 20 seconds
    this.getEntriesTimeout =
      this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT);
    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
      HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
  }

  @Override
  public final void run() {
    setWorkerState(WorkerState.RUNNING);
    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!source.isPeerEnabled()) {
        // The peer enabled check is in memory and not expensive, so we do not need to increase
        // the sleep interval, which could cause a long lag when the peer is re-enabled.
        sleepForRetries("Replication is disabled", sleepForRetries, 1, maxRetriesMultiplier);
        continue;
      }
      try {
        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
        LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(),
          entryBatch);
        if (entryBatch == null) {
          continue;
        }
        // the NO_MORE_DATA instance has no path so do not call shipEdits
        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
          noMoreData();
        } else {
          shipEdits(entryBatch);
        }
      } catch (InterruptedException | ReplicationRuntimeException e) {
        // We were interrupted (or the reader hit an unrecoverable error) and need to quit.
        LOG.warn("Interrupted while waiting for next replication entry batch", e);
        Thread.currentThread().interrupt();
      }
    }
    // If the worker exits the run loop without finishing its task, mark it as stopped.
    if (!isFinished()) {
      setWorkerState(WorkerState.STOPPED);
    } else {
      source.removeWorker(this);
      postFinish();
    }
  }

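  /**
   * Called when the reader hands back the {@link WALEntryBatch#NO_MORE_DATA} marker, meaning
   * there is nothing more to ship from this queue; bumps the recovery metric for recovered queues
   * and marks this worker as finished.
   */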
  private void noMoreData() {
    if (source.isRecovered()) {
      LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId,
        source.getQueueId());
      source.getSourceMetrics().incrCompletedRecoveryQueue();
    } else {
      LOG.debug("Finished queue for group {} of peer {}", walGroupId, source.getQueueId());
    }
    setWorkerState(WorkerState.FINISHED);
  }

  // To be implemented by recovered shipper
  protected void postFinish() {
  }

  /**
   * Do the shipping logic: push the batch to the replication endpoint, retrying with backoff
   * until it is acknowledged or this worker is stopped.
   */
  private void shipEdits(WALEntryBatch entryBatch) {
    List<Entry> entries = entryBatch.getWalEntries();
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      updateLogPosition(entryBatch);
      return;
    }
    int currentSize = (int) entryBatch.getHeapSize();
    source.getSourceMetrics()
      .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
    while (isActive()) {
      try {
        try {
          source.tryThrottle(currentSize);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while sleeping for throttling control");
          Thread.currentThread().interrupt();
          // The current thread may have been interrupted so that it terminates;
          // go back to the while() condition to confirm this.
          continue;
        }
        // create replicateContext here, so the entries can be GC'd upon return from this call
        // stack
        ReplicationEndpoint.ReplicateContext replicateContext =
          new ReplicationEndpoint.ReplicateContext();
        replicateContext.setEntries(entries).setSize(currentSize);
        replicateContext.setWalGroupId(walGroupId);
        replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));

        long startTimeNs = System.nanoTime();
        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
        long endTimeNs = System.nanoTime();

        if (!replicated) {
          continue;
        } else {
          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
        }
        // Clean up hfile references
        for (Entry entry : entries) {
          cleanUpHFileRefs(entry.getEdit());
          LOG.trace("Shipped entry {}", entry);
        }
        // Log the new position and clean up old WAL files
        updateLogPosition(entryBatch);

        // Offset totalBufferUsed by deducting the shipped batch's buffer size (which excludes the
        // bulk load size). This must use the same calculation as acquireBufferQuota() in
        // ReplicationSourceWALReader, because both adjust the same counter: totalBufferUsed.
        source.postShipEdits(entries, entryBatch.getUsedBufferSize());
        // FIXME check relationship between wal group and overall
        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
          entryBatch.getNbHFiles());
        source.getSourceMetrics().setAgeOfLastShippedOp(
          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());

        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicated {} entries or {} operations in {} ms", entries.size(),
            entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
        }
        break;
      } catch (Exception ex) {
        source.getSourceMetrics().incrementFailedBatches();
        LOG.warn("{} threw unknown exception:",
          source.getReplicationEndpoint().getClass().getName(), ex);
        if (
          sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier,
            maxRetriesMultiplier)
        ) {
          sleepMultiplier++;
        }
      }
    }
  }

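  /**
   * After an edit has been shipped, remove the hfile references recorded for any bulk load
   * markers it contains, so the queued hfiles can eventually be cleaned up.
   */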
  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
    String peerId = source.getPeerId();
    if (peerId.contains("-")) {
      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
      // A peerId will not have "-" in its name, see HBASE-11394
      peerId = peerId.split("-")[0];
    }
    List<Cell> cells = edit.getCells();
    int totalCells = cells.size();
    for (int i = 0; i < totalCells; i++) {
      Cell cell = cells.get(i);
      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        List<StoreDescriptor> stores = bld.getStoresList();
        int totalStores = stores.size();
        for (int j = 0; j < totalStores; j++) {
          List<String> storeFileList = stores.get(j).getStoreFileList();
          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
        }
      }
    }
  }

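  /**
   * Persist the replication offset for this wal group when the batch moved us to a new file or a
   * new position within the current file (or hit the end of a file).
   * @return true if the position was recorded in the replication queue storage
   */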
  private boolean updateLogPosition(WALEntryBatch batch) {
    boolean updated = false;
    // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
    // record on zk, so let's call it. The last wal position may be zero if end of file is true and
    // there is no entry in the batch. It is OK because the queue storage will ignore the zero
    // position and the file will be removed soon in cleanOldLogs.
    if (
      batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
        || batch.getLastWalPosition() != currentPosition
    ) {
      source.logPositionAndCleanOldLogs(batch);
      updated = true;
    }
    // if end of file is true, then we can just skip to the next file in the queue.
    // the only exception is the recovered queue: if we reach the end of the queue, there will be
    // no more files, so currentPath may be null here.
    if (batch.isEndOfFile()) {
      currentPath = entryReader.getCurrentPath();
      currentPosition = 0L;
    } else {
      currentPath = batch.getLastWalPath();
      currentPosition = batch.getLastWalPosition();
    }
    return updated;
  }

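  /**
   * Start this shipper as a daemon thread, named after the wal group and queue id, using the
   * given handler for uncaught exceptions.
   */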
  public void startup(UncaughtExceptionHandler handler) {
    String name = Thread.currentThread().getName();
    Threads.setDaemonThreadRunning(this,
      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
      handler::uncaughtException);
  }

  Path getCurrentPath() {
    return entryReader.getCurrentPath();
  }

  long getCurrentPosition() {
    return currentPosition;
  }

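  /**
   * @return whether this worker should keep running: the source is still active, the worker state
   *         is RUNNING and this thread has not been interrupted
   */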
  protected boolean isActive() {
    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
  }

  protected final void setWorkerState(WorkerState state) {
    this.state = state;
  }

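  /** Ask this worker to stop; the run loop exits once it observes the state change. */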
  void stopWorker() {
    setWorkerState(WorkerState.STOPPED);
  }

  public boolean isFinished() {
    return state == WorkerState.FINISHED;
  }

  /**
   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUsed</code>, in case
   * there were unprocessed entries batched by the reader to the shipper, but the shipper didn't
   * manage to ship those because the replication source is being terminated. In that case, it
   * iterates through the batched entries and decreases the pending entries size from
   * <code>ReplicationSourceManager.totalBufferUsed</code>.
   * <p/>
   * <b>NOTES</b> 1) This method should only be called upon replication source termination. It
   * blocks waiting for both the shipper and reader threads to terminate, to make sure there are
   * no race conditions when updating <code>ReplicationSourceManager.totalBufferUsed</code>. 2) It
   * <b>does not</b> attempt to terminate the reader and shipper threads. Their
   * interruption/termination <b>must</b> have been triggered prior to calling this method.
   */
  void clearWALEntryBatch() {
    long timeout = EnvironmentEdgeManager.currentTime() + this.shipEditsTimeout;
    while (this.isAlive() || this.entryReader.isAlive()) {
      try {
        if (EnvironmentEdgeManager.currentTime() >= timeout) {
          LOG.warn(
            "{} shipper clearWALEntryBatch method timed out while waiting for the reader/shipper "
              + "threads to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}",
            this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
          return;
        } else {
          // Wait for both the shipper and reader threads to stop
          Thread.sleep(this.sleepForRetries);
        }
      } catch (InterruptedException e) {
        LOG.warn("{} Interrupted while waiting for {} to stop on clearWALEntryBatch. "
          + "Not cleaning buffer usage", this.source.getPeerId(), this.getName(), e);
        return;
      }
    }
    long totalReleasedBytes = 0;
    while (true) {
      WALEntryBatch batch = entryReader.entryBatchQueue.poll();
      if (batch == null) {
        break;
      }
      totalReleasedBytes += source.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
        totalReleasedBytes);
    }
  }
}