001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
021import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
022
023import java.io.IOException;
024import java.util.List;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CellUtil;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
032import org.apache.hadoop.hbase.util.Threads;
033import org.apache.hadoop.hbase.wal.WAL.Entry;
034import org.apache.hadoop.hbase.wal.WALEdit;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
041
042/**
043 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
044 * ReplicationSourceWALReaderThread
045 */
046@InterfaceAudience.Private
047public class ReplicationSourceShipper extends Thread {
048  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);
049
050  // Hold the state of a replication worker thread
051  public enum WorkerState {
052    RUNNING,
053    STOPPED,
054    FINISHED, // The worker is done processing a queue
055  }
056
057  private final Configuration conf;
058  final String walGroupId;
059  private final ReplicationSource source;
060
061  // Last position in the log that we sent to ZooKeeper
062  // It will be accessed by the stats thread so make it volatile
063  private volatile long currentPosition = -1;
064  // Path of the current log
065  private Path currentPath;
066  // Current state of the worker thread
067  private volatile WorkerState state;
068  final ReplicationSourceWALReader entryReader;
069
070  // How long should we sleep for each retry
071  private final long sleepForRetries;
072  // Maximum number of retries before taking bold actions
073  private final int maxRetriesMultiplier;
074  private final int DEFAULT_TIMEOUT = 20000;
075  private final int getEntriesTimeout;
076  private final int shipEditsTimeout;
077  private long accumulatedSizeSinceLastUpdate = 0L;
078  private long lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime();
079  private final long offsetUpdateIntervalMs;
080  private final long offsetUpdateSizeThresholdBytes;
081  private WALEntryBatch lastShippedBatch;
082
083  private static final String OFFSET_UPDATE_INTERVAL_MS_KEY =
084    "hbase.replication.shipper.offset.update.interval.ms";
085  private static final String OFFSET_UPDATE_SIZE_THRESHOLD_KEY =
086    "hbase.replication.shipper.offset.update.size.threshold";
087  private static final long DEFAULT_OFFSET_UPDATE_INTERVAL_MS = Long.MAX_VALUE;
088  private static final long DEFAULT_OFFSET_UPDATE_SIZE_THRESHOLD = -1L;
089
090  public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
091    ReplicationSourceWALReader walReader) {
092    this.conf = conf;
093    this.walGroupId = walGroupId;
094    this.source = source;
095    this.entryReader = walReader;
096    // 1 second
097    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
098    // 5 minutes @ 1 sec per
099    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
100    // 20 seconds
101    this.getEntriesTimeout =
102      this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT);
103    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
104      HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
105    this.offsetUpdateIntervalMs =
106      conf.getLong(OFFSET_UPDATE_INTERVAL_MS_KEY, DEFAULT_OFFSET_UPDATE_INTERVAL_MS);
107    this.offsetUpdateSizeThresholdBytes =
108      conf.getLong(OFFSET_UPDATE_SIZE_THRESHOLD_KEY, DEFAULT_OFFSET_UPDATE_SIZE_THRESHOLD);
109  }
110
111  @Override
112  public final void run() {
113    setWorkerState(WorkerState.RUNNING);
114    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
115    // Loop until we close down
116    while (isActive()) {
117      // Sleep until replication is enabled again
118      if (!source.isPeerEnabled()) {
119        // The peer enabled check is in memory, not expensive, so do not need to increase the
120        // sleep interval as it may cause a long lag when we enable the peer.
121        sleepForRetries("Replication is disabled", sleepForRetries, 1, maxRetriesMultiplier);
122        continue;
123      }
124      try {
125        // check time-based offset persistence
126        if (shouldPersistLogPosition()) {
127          persistLogPosition();
128        }
129
130        long pollTimeout = getEntriesTimeout;
131        if (offsetUpdateIntervalMs != Long.MAX_VALUE) {
132          long elapsed = EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime;
133          long remaining = offsetUpdateIntervalMs - elapsed;
134          if (remaining > 0) {
135            pollTimeout = Math.min(getEntriesTimeout, remaining);
136          }
137        }
138        WALEntryBatch entryBatch = entryReader.poll(pollTimeout);
139        LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(),
140          entryBatch);
141
142        if (entryBatch == null) {
143          continue;
144        }
145        // the NO_MORE_DATA instance has no path so do not call shipEdits
146        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
147          noMoreData();
148        } else {
149          shipEdits(entryBatch);
150        }
151      } catch (InterruptedException | ReplicationRuntimeException e) {
152        LOG.warn("Interrupted while waiting for next replication entry batch", e);
153        Thread.currentThread().interrupt();
154      } catch (Exception e) {
155        // During source shutdown / peer removal we can see interrupted IOEs
156        // from replication queue updates. Do not restart in this case.
157        if (!source.isSourceActive() || isInterrupted() || !source.isPeerEnabled()) {
158          LOG.info("Ignoring persist failure during shutdown for walGroupId={}", walGroupId, e);
159          break;
160        }
161        LOG.error("Shipper {} failed to persist offset, restarting", walGroupId, e);
162        abortAndRestart(e);
163        return;
164      }
165    }
166
167    // If the worker exits run loop without finishing its task, mark it as stopped.
168    if (!isFinished()) {
169      try {
170        persistLogPosition();
171      } catch (Exception e) {
172        LOG.error("Failed persisting final offset for walGroupId={}", walGroupId, e);
173      }
174      setWorkerState(WorkerState.STOPPED);
175    } else {
176      source.removeWorker(this);
177      postFinish();
178    }
179  }
180
181  private void noMoreData() throws IOException {
182    // Flush any outstanding replication offset before marking the queue finished.
183    // Offset persistence may be delayed by size/time thresholds, so ensure the
184    // latest replicated position is stored before transitioning to FINISHED state.
185    persistLogPosition();
186
187    if (source.isRecovered()) {
188      LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId,
189        source.getQueueId());
190      source.getSourceMetrics().incrCompletedRecoveryQueue();
191    } else {
192      LOG.debug("Finished queue for group {} of peer {}", walGroupId, source.getQueueId());
193    }
194    setWorkerState(WorkerState.FINISHED);
195  }
196
197  // To be implemented by recovered shipper
198  protected void postFinish() {
199  }
200
201  /**
202   * Do the shipping logic
203   */
204  void shipEdits(WALEntryBatch entryBatch) throws IOException {
205    List<Entry> entries = entryBatch.getWalEntries();
206    int sleepMultiplier = 0;
207    int currentSize = (int) entryBatch.getHeapSize();
208    MetricsSource metrics = source.getSourceMetrics();
209    if (metrics != null && !entries.isEmpty()) {
210      metrics.setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
211    }
212    if (entries.isEmpty()) {
213      // empty batch may mean WAL boundary advancement
214      lastShippedBatch = entryBatch;
215      persistLogPosition();
216      return;
217    }
218    while (isActive()) {
219      try {
220        try {
221          source.tryThrottle(currentSize);
222        } catch (InterruptedException e) {
223          LOG.debug("Interrupted while sleeping for throttling control");
224          Thread.currentThread().interrupt();
225          // current thread might be interrupted to terminate
226          // directly go back to while() for confirm this
227          continue;
228        }
229        // create replicateContext here, so the entries can be GC'd upon return from this call
230        // stack
231        ReplicationEndpoint.ReplicateContext replicateContext =
232          new ReplicationEndpoint.ReplicateContext();
233        replicateContext.setEntries(entries).setSize(currentSize);
234        replicateContext.setWalGroupId(walGroupId);
235        replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));
236
237        long startTimeNs = System.nanoTime();
238        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
239        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
240        long endTimeNs = System.nanoTime();
241
242        if (!replicated) {
243          continue;
244        } else {
245          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
246        }
247        // Clean up hfile references
248        for (Entry entry : entries) {
249          cleanUpHFileRefs(entry.getEdit());
250          LOG.trace("shipped entry {}: ", entry);
251        }
252
253        // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
254        // this sizeExcludeBulkLoad has to use same calculation that when calling
255        // acquireBufferQuota() in ReplicationSourceWALReader because they maintain
256        // same variable: totalBufferUsed
257        source.postShipEdits(entries, entryBatch.getUsedBufferSize());
258        // FIXME check relationship between wal group and overall
259        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
260          entryBatch.getNbHFiles());
261        source.getSourceMetrics().setAgeOfLastShippedOp(
262          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
263        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());
264
265        if (LOG.isTraceEnabled()) {
266          LOG.debug("Replicated {} entries or {} operations in {} ms", entries.size(),
267            entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
268        }
269        break;
270      } catch (Exception ex) {
271        source.getSourceMetrics().incrementFailedBatches();
272        LOG.warn("{} threw unknown exception:",
273          source.getReplicationEndpoint().getClass().getName(), ex);
274        if (
275          sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier,
276            maxRetriesMultiplier)
277        ) {
278          sleepMultiplier++;
279        }
280      }
281    }
282
283    accumulatedSizeSinceLastUpdate += currentSize;
284    lastShippedBatch = entryBatch;
285    if (shouldPersistLogPosition()) {
286      persistLogPosition();
287    }
288  }
289
290  private boolean shouldPersistLogPosition() {
291    if (lastShippedBatch == null) {
292      return false;
293    }
294    // Default behaviour to update offset immediately after replicate()
295    if (offsetUpdateSizeThresholdBytes == -1 && offsetUpdateIntervalMs == Long.MAX_VALUE) {
296      return true;
297    }
298
299    return (accumulatedSizeSinceLastUpdate >= offsetUpdateSizeThresholdBytes)
300      || (EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime >= offsetUpdateIntervalMs);
301  }
302
303  void persistLogPosition() throws IOException {
304    if (lastShippedBatch == null) {
305      return;
306    }
307
308    if (!source.isSourceActive() || isInterrupted() || !source.isPeerEnabled()) {
309      LOG.debug("Skip persistLogPosition for inactive/stopping source");
310      return;
311    }
312
313    ReplicationEndpoint endpoint = source.getReplicationEndpoint();
314    if (endpoint != null) {
315      endpoint.beforePersistingReplicationOffset();
316    }
317
318    // Log and clean up WAL logs
319    updateLogPosition(lastShippedBatch);
320    accumulatedSizeSinceLastUpdate = 0;
321    lastShippedBatch = null;
322    lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime();
323  }
324
325  void cleanUpHFileRefs(WALEdit edit) throws IOException {
326    String peerId = source.getPeerId();
327    if (peerId.contains("-")) {
328      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
329      // A peerId will not have "-" in its name, see HBASE-11394
330      peerId = peerId.split("-")[0];
331    }
332    List<Cell> cells = edit.getCells();
333    int totalCells = cells.size();
334    for (int i = 0; i < totalCells; i++) {
335      Cell cell = cells.get(i);
336      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
337        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
338        List<StoreDescriptor> stores = bld.getStoresList();
339        int totalStores = stores.size();
340        for (int j = 0; j < totalStores; j++) {
341          List<String> storeFileList = stores.get(j).getStoreFileList();
342          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
343          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
344        }
345      }
346    }
347  }
348
349  private boolean updateLogPosition(WALEntryBatch batch) {
350    boolean updated = false;
351    // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
352    // record on zk, so let's call it. The last wal position maybe zero if end of file is true and
353    // there is no entry in the batch. It is OK because that the queue storage will ignore the zero
354    // position and the file will be removed soon in cleanOldLogs.
355    if (
356      batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
357        || batch.getLastWalPosition() != currentPosition
358    ) {
359      source.logPositionAndCleanOldLogs(batch);
360      updated = true;
361    }
362    // if end of file is true, then we can just skip to the next file in queue.
363    // the only exception is for recovered queue, if we reach the end of the queue, then there will
364    // no more files so here the currentPath may be null.
365    if (batch.isEndOfFile()) {
366      currentPath = entryReader.getCurrentPath();
367      currentPosition = 0L;
368    } else {
369      currentPath = batch.getLastWalPath();
370      currentPosition = batch.getLastWalPosition();
371    }
372    return updated;
373  }
374
375  public void startup(UncaughtExceptionHandler handler) {
376    String name = Thread.currentThread().getName();
377    Threads.setDaemonThreadRunning(this,
378      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
379      handler::uncaughtException);
380  }
381
382  Path getCurrentPath() {
383    return entryReader.getCurrentPath();
384  }
385
386  long getCurrentPosition() {
387    return currentPosition;
388  }
389
390  protected boolean isActive() {
391    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
392  }
393
394  protected final void setWorkerState(WorkerState state) {
395    this.state = state;
396  }
397
398  void stopWorker() {
399    setWorkerState(WorkerState.STOPPED);
400  }
401
402  public boolean isFinished() {
403    return state == WorkerState.FINISHED;
404  }
405
406  /**
407   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>, in case
408   * there were unprocessed entries batched by the reader to the shipper, but the shipper didn't
409   * manage to ship those because the replication source is being terminated. In that case, it
410   * iterates through the batched entries and decrease the pending entries size from
411   * <code>ReplicationSourceManager.totalBufferUser</code>
412   * <p/>
413   * <b>NOTES</b> 1) This method should only be called upon replication source termination. It
414   * blocks waiting for both shipper and reader threads termination, to make sure no race conditions
415   * when updating <code>ReplicationSourceManager.totalBufferUser</code>. 2) It <b>does not</b>
416   * attempt to terminate reader and shipper threads. Those <b>must</b> have been triggered
417   * interruption/termination prior to calling this method.
418   */
419  void clearWALEntryBatch() {
420    long timeout = EnvironmentEdgeManager.currentTime() + this.shipEditsTimeout;
421    while (this.isAlive() || this.entryReader.isAlive()) {
422      try {
423        if (EnvironmentEdgeManager.currentTime() >= timeout) {
424          LOG.warn("Shipper clearWALEntryBatch method timed out while waiting reader/shipper "
425            + "thread to stop. Not cleaning buffer usage. PeerId: {}; Shipper alive: {}; Reader "
426            + "alive: {}", this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
427          return;
428        } else {
429          // Wait both shipper and reader threads to stop
430          Thread.sleep(this.sleepForRetries);
431        }
432      } catch (InterruptedException e) {
433        LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. "
434          + "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e);
435        return;
436      }
437    }
438    long totalReleasedBytes = 0;
439    while (true) {
440      WALEntryBatch batch = entryReader.entryBatchQueue.poll();
441      if (batch == null) {
442        break;
443      }
444      totalReleasedBytes += source.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
445    }
446    if (LOG.isTraceEnabled()) {
447      LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
448        totalReleasedBytes);
449    }
450  }
451
452  long getSleepForRetries() {
453    return sleepForRetries;
454  }
455
456  // Restart worker so replication resumes from last persisted offset.
457  void abortAndRestart(Throwable cause) {
458    LOG.warn("Restarting shipper for walGroupId={}", walGroupId, cause);
459    if (!source.isSourceActive() || !source.isPeerEnabled() || isInterrupted()) {
460      LOG.warn("abortAndRestart SKIPPED walGroupId={}, thread={}", walGroupId,
461        Thread.currentThread().getName());
462      return;
463    }
464    setWorkerState(WorkerState.STOPPED);
465    source.restartShipper(walGroupId, this);
466  }
467}