/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
 * {@link ReplicationSourceWALReader}.
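 * <p>
 * A rough sketch of how a shipper is typically wired up by its owning replication source
 * (variable names here are illustrative, not part of the API):
 * <pre>{@code
 * ReplicationSourceShipper shipper =
 *     new ReplicationSourceShipper(conf, walGroupId, queue, source);
 * shipper.setWALReader(walReader); // the ReplicationSourceWALReader feeding this wal group
 * shipper.startup(uncaughtExceptionHandler);
 * }</pre>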
 */
@InterfaceAudience.Private
public class ReplicationSourceShipper extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);

  // Hold the state of a replication worker thread
  public enum WorkerState {
    RUNNING,
    STOPPED,
    FINISHED,  // The worker is done processing a recovered queue
  }

  private final Configuration conf;
  protected final String walGroupId;
  protected final PriorityBlockingQueue<Path> queue;
  private final ReplicationSourceInterface source;

  // Last position in the log that we sent to ZooKeeper
  // It will be accessed by the stats thread so make it volatile
  private volatile long currentPosition = -1;
  // Path of the current log
  private Path currentPath;
  // Current state of the worker thread
  private volatile WorkerState state;
  protected ReplicationSourceWALReader entryReader;

  // How long should we sleep for each retry
  protected final long sleepForRetries;
  // Maximum number of retries before taking bold actions
  protected final int maxRetriesMultiplier;
  private static final int DEFAULT_TIMEOUT = 20000;
  private final int getEntriesTimeout;
  private final int shipEditsTimeout;

  public ReplicationSourceShipper(Configuration conf, String walGroupId,
      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
    this.conf = conf;
    this.walGroupId = walGroupId;
    this.queue = queue;
    this.source = source;
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.getEntriesTimeout =
        this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
  }

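  /**
   * Main loop: poll batches of WAL entries from the {@link ReplicationSourceWALReader} and ship
   * them through the replication endpoint until this worker is stopped or finishes its queue.
   */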
  @Override
  public final void run() {
    setWorkerState(WorkerState.RUNNING);
    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!source.isPeerEnabled()) {
        // The peer-enabled check is in-memory and cheap, so there is no need to increase the
        // sleep interval, which could cause a long lag once the peer is re-enabled.
        sleepForRetries("Replication is disabled", 1);
        continue;
      }
      try {
        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
        LOG.debug("Shipper from source {} got entry batch from reader: {}",
            source.getQueueId(), entryBatch);
        if (entryBatch == null) {
          continue;
        }
        // The NO_MORE_DATA instance has no path, so do not call shipEdits
        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
          noMoreData();
        } else {
          shipEdits(entryBatch);
        }
      } catch (InterruptedException | ReplicationRuntimeException e) {
        // We were either interrupted or the reader hit an unrecoverable error; restore the
        // interrupt flag so isActive() becomes false and the loop exits.
        LOG.warn("Interrupted while waiting for next replication entry batch", e);
        Thread.currentThread().interrupt();
      }
    }
    // If the worker exits the run loop without finishing its task, mark it as stopped.
    if (!isFinished()) {
      setWorkerState(WorkerState.STOPPED);
    } else {
      postFinish();
    }
  }

  // To be implemented by recovered shipper
  protected void noMoreData() {
  }

  // To be implemented by recovered shipper
  protected void postFinish() {
  }

  /**
   * Get the total size of the entries in the batch, excluding bulk load file sizes, using
   * ReplicationSourceWALReader's static helper.
   */
  private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
    int totalSize = 0;
    for (Entry entry : entryBatch.getWalEntries()) {
      totalSize += ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(entry);
    }
    return totalSize;
  }

  /**
   * Do the shipping logic: ship one batch of entries to the replication endpoint, retrying until
   * the batch is acknowledged or this worker is no longer active.
   */
  private void shipEdits(WALEntryBatch entryBatch) {
    List<Entry> entries = entryBatch.getWalEntries();
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      updateLogPosition(entryBatch);
      return;
    }
    int currentSize = (int) entryBatch.getHeapSize();
    int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
    source.getSourceMetrics().setTimeStampNextToReplicate(entries.get(entries.size() - 1)
        .getKey().getWriteTime());
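    // Keep trying to ship this batch until it is acknowledged or this worker is stopped; the
    // per-attempt endpoint timeout is adapted from sleepMultiplier, which grows as retries fail.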
    while (isActive()) {
      try {
        try {
          source.tryThrottle(currentSize);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while sleeping for throttling control");
          Thread.currentThread().interrupt();
          // The current thread may have been interrupted so that it terminates;
          // go back to the while() condition to confirm this.
          continue;
        }
        // create replicateContext here, so the entries can be GC'd upon return from this call
        // stack
        ReplicationEndpoint.ReplicateContext replicateContext =
            new ReplicationEndpoint.ReplicateContext();
        replicateContext.setEntries(entries).setSize(currentSize);
        replicateContext.setWalGroupId(walGroupId);
        replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));

        long startTimeNs = System.nanoTime();
        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
        long endTimeNs = System.nanoTime();

        if (!replicated) {
          continue;
        } else {
          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
        }
        // Clean up hfile references
        for (Entry entry : entries) {
          cleanUpHFileRefs(entry.getEdit());
          LOG.trace("Shipped entry {}", entry);
        }
        // Record the new log position and clean up old WAL files
        updateLogPosition(entryBatch);

        // Offset totalBufferUsed by deducting the shipped batch size (excluding bulk load size).
        // This sizeExcludeBulkLoad has to use the same calculation as acquireBufferQuota() in
        // ReplicationSourceWALReader because they maintain the same variable: totalBufferUsed.
        source.postShipEdits(entries, sizeExcludeBulkLoad);
        // FIXME check relationship between wal group and overall
        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
          entryBatch.getNbHFiles());
        source.getSourceMetrics().setAgeOfLastShippedOp(
          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());

        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicated {} entries or {} operations in {} ms",
              entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
        }
        break;
      } catch (Exception ex) {
        LOG.warn("{} threw unknown exception:",
          source.getReplicationEndpoint().getClass().getName(), ex);
        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

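  /**
   * Drop the HFile references recorded for any bulk load events in the given edit, now that the
   * edit has been shipped to the peer.
   */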
  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
    String peerId = source.getPeerId();
    if (peerId.contains("-")) {
      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
      // A peerId will not have "-" in its name, see HBASE-11394
      peerId = peerId.split("-")[0];
    }
    List<Cell> cells = edit.getCells();
    int totalCells = cells.size();
    for (int i = 0; i < totalCells; i++) {
      Cell cell = cells.get(i);
      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        List<StoreDescriptor> stores = bld.getStoresList();
        int totalStores = stores.size();
        for (int j = 0; j < totalStores; j++) {
          List<String> storeFileList = stores.get(j).getStoreFileList();
          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
        }
      }
    }
  }

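  /**
   * Persist the new log position in the replication queue storage when the batch advances the
   * position or reaches the end of a file, and clean up WALs that are no longer needed.
   * @return whether the position was actually updated
   */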
  private boolean updateLogPosition(WALEntryBatch batch) {
    boolean updated = false;
    // If end of file is true, then the logPositionAndCleanOldLogs method will remove the file
    // record on zk, so let's call it. The last wal position may be zero if end of file is true and
    // there is no entry in the batch. It is OK because the queue storage will ignore the zero
    // position and the file will be removed soon in cleanOldLogs.
    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
      batch.getLastWalPosition() != currentPosition) {
      source.logPositionAndCleanOldLogs(batch);
      updated = true;
    }
    // If end of file is true, then we can just skip to the next file in the queue.
    // The only exception is for a recovered queue: if we reach the end of the queue, there will be
    // no more files, so the currentPath may be null here.
    if (batch.isEndOfFile()) {
      currentPath = entryReader.getCurrentPath();
      currentPosition = 0L;
    } else {
      currentPath = batch.getLastWalPath();
      currentPosition = batch.getLastWalPosition();
    }
    return updated;
  }

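  /**
   * Start this shipper as a daemon thread, named after the wal group and queue it serves, with
   * the given uncaught exception handler installed.
   */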
  public void startup(UncaughtExceptionHandler handler) {
    String name = Thread.currentThread().getName();
    Threads.setDaemonThreadRunning(this,
      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
      handler::uncaughtException);
  }

  Path getCurrentPath() {
    return entryReader.getCurrentPath();
  }

  long getCurrentPosition() {
    return currentPosition;
  }

  void setWALReader(ReplicationSourceWALReader entryReader) {
    this.entryReader = entryReader;
  }

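  // Position to start reading the current WAL from; subclasses (e.g. the recovered shipper) may
  // override this.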
  long getStartPosition() {
    return 0;
  }

  protected boolean isActive() {
    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
  }

  protected final void setWorkerState(WorkerState state) {
    this.state = state;
  }

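  /**
   * Ask this worker to stop: the run loop observes the state change through isActive() and exits;
   * the thread is not interrupted.
   */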
  void stopWorker() {
    setWorkerState(WorkerState.STOPPED);
  }

  public boolean isFinished() {
    return state == WorkerState.FINISHED;
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  public boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }
}