001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
021import java.io.IOException;
022import java.util.List;
023import java.util.concurrent.PriorityBlockingQueue;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
031import org.apache.hadoop.hbase.util.Threads;
032import org.apache.hadoop.hbase.wal.WAL.Entry;
033import org.apache.hadoop.hbase.wal.WALEdit;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
040
041/**
042 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
043 * ReplicationSourceWALReaderThread
044 */
045@InterfaceAudience.Private
046public class ReplicationSourceShipper extends Thread {
047  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);
048
049  // Hold the state of a replication worker thread
050  public enum WorkerState {
051    RUNNING,
052    STOPPED,
053    FINISHED,  // The worker is done processing a recovered queue
054  }
055
056  private final Configuration conf;
057  protected final String walGroupId;
058  protected final PriorityBlockingQueue<Path> queue;
059  private final ReplicationSourceInterface source;
060
061  // Last position in the log that we sent to ZooKeeper
062  // It will be accessed by the stats thread so make it volatile
063  private volatile long currentPosition = -1;
064  // Path of the current log
065  private Path currentPath;
066  // Current state of the worker thread
067  private volatile WorkerState state;
068  protected ReplicationSourceWALReader entryReader;
069
070  // How long should we sleep for each retry
071  protected final long sleepForRetries;
072  // Maximum number of retries before taking bold actions
073  protected final int maxRetriesMultiplier;
074  private final int DEFAULT_TIMEOUT = 20000;
075  private final int getEntriesTimeout;
076  private final int shipEditsTimeout;
077
078  public ReplicationSourceShipper(Configuration conf, String walGroupId,
079      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
080    this.conf = conf;
081    this.walGroupId = walGroupId;
082    this.queue = queue;
083    this.source = source;
084    this.sleepForRetries =
085        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
086    this.maxRetriesMultiplier =
087        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
088    this.getEntriesTimeout =
089        this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
090    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
091        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
092  }
093
094  @Override
095  public final void run() {
096    setWorkerState(WorkerState.RUNNING);
097    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
098    // Loop until we close down
099    while (isActive()) {
100      // Sleep until replication is enabled again
101      if (!source.isPeerEnabled()) {
102        // The peer enabled check is in memory, not expensive, so do not need to increase the
103        // sleep interval as it may cause a long lag when we enable the peer.
104        sleepForRetries("Replication is disabled", 1);
105        continue;
106      }
107      try {
108        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
109        LOG.debug("Shipper from source {} got entry batch from reader: {}",
110            source.getQueueId(), entryBatch);
111        if (entryBatch == null) {
112          continue;
113        }
114        // the NO_MORE_DATA instance has no path so do not call shipEdits
115        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
116          noMoreData();
117        } else {
118          shipEdits(entryBatch);
119        }
120      } catch (InterruptedException | ReplicationRuntimeException e) {
121        // It is interrupted and needs to quit.
122        LOG.warn("Interrupted while waiting for next replication entry batch", e);
123        Thread.currentThread().interrupt();
124      }
125    }
126    // If the worker exits run loop without finishing its task, mark it as stopped.
127    if (!isFinished()) {
128      setWorkerState(WorkerState.STOPPED);
129    } else {
130      postFinish();
131    }
132  }
133
134  // To be implemented by recovered shipper
135  protected void noMoreData() {
136  }
137
138  // To be implemented by recovered shipper
139  protected void postFinish() {
140  }
141
142  /**
143   * get batchEntry size excludes bulk load file sizes.
144   * Uses ReplicationSourceWALReader's static method.
145   */
146  private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
147    int totalSize = 0;
148    for(Entry entry : entryBatch.getWalEntries()) {
149      totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry);
150    }
151    return  totalSize;
152  }
153
154  /**
155   * Do the shipping logic
156   */
157  private void shipEdits(WALEntryBatch entryBatch) {
158    List<Entry> entries = entryBatch.getWalEntries();
159    int sleepMultiplier = 0;
160    if (entries.isEmpty()) {
161      updateLogPosition(entryBatch);
162      return;
163    }
164    int currentSize = (int) entryBatch.getHeapSize();
165    int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
166    source.getSourceMetrics().setTimeStampNextToReplicate(entries.get(entries.size() - 1)
167        .getKey().getWriteTime());
168    while (isActive()) {
169      try {
170        try {
171          source.tryThrottle(currentSize);
172        } catch (InterruptedException e) {
173          LOG.debug("Interrupted while sleeping for throttling control");
174          Thread.currentThread().interrupt();
175          // current thread might be interrupted to terminate
176          // directly go back to while() for confirm this
177          continue;
178        }
179        // create replicateContext here, so the entries can be GC'd upon return from this call
180        // stack
181        ReplicationEndpoint.ReplicateContext replicateContext =
182            new ReplicationEndpoint.ReplicateContext();
183        replicateContext.setEntries(entries).setSize(currentSize);
184        replicateContext.setWalGroupId(walGroupId);
185        replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));
186
187        long startTimeNs = System.nanoTime();
188        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
189        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
190        long endTimeNs = System.nanoTime();
191
192        if (!replicated) {
193          continue;
194        } else {
195          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
196        }
197        // Clean up hfile references
198        for (Entry entry : entries) {
199          cleanUpHFileRefs(entry.getEdit());
200          LOG.trace("shipped entry {}: ", entry);
201          TableName tableName = entry.getKey().getTableName();
202          source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
203              tableName.getNameAsString());
204        }
205        // Log and clean up WAL logs
206        updateLogPosition(entryBatch);
207
208        //offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
209        //this sizeExcludeBulkLoad has to use same calculation that when calling
210        //acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain
211        //same variable: totalBufferUsed
212        source.postShipEdits(entries, sizeExcludeBulkLoad);
213        // FIXME check relationship between wal group and overall
214        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
215          entryBatch.getNbHFiles());
216        source.getSourceMetrics().setAgeOfLastShippedOp(
217          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
218        if (LOG.isTraceEnabled()) {
219          LOG.debug("Replicated {} entries or {} operations in {} ms",
220              entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
221        }
222        break;
223      } catch (Exception ex) {
224        LOG.warn("{} threw unknown exception:",
225          source.getReplicationEndpoint().getClass().getName(), ex);
226        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
227          sleepMultiplier++;
228        }
229      }
230    }
231  }
232
233  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
234    String peerId = source.getPeerId();
235    if (peerId.contains("-")) {
236      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
237      // A peerId will not have "-" in its name, see HBASE-11394
238      peerId = peerId.split("-")[0];
239    }
240    List<Cell> cells = edit.getCells();
241    int totalCells = cells.size();
242    for (int i = 0; i < totalCells; i++) {
243      Cell cell = cells.get(i);
244      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
245        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
246        List<StoreDescriptor> stores = bld.getStoresList();
247        int totalStores = stores.size();
248        for (int j = 0; j < totalStores; j++) {
249          List<String> storeFileList = stores.get(j).getStoreFileList();
250          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
251          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
252        }
253      }
254    }
255  }
256
257  private boolean updateLogPosition(WALEntryBatch batch) {
258    boolean updated = false;
259    // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
260    // record on zk, so let's call it. The last wal position maybe zero if end of file is true and
261    // there is no entry in the batch. It is OK because that the queue storage will ignore the zero
262    // position and the file will be removed soon in cleanOldLogs.
263    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
264      batch.getLastWalPosition() != currentPosition) {
265      source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(),
266        source.isRecovered(), batch);
267      updated = true;
268    }
269    // if end of file is true, then we can just skip to the next file in queue.
270    // the only exception is for recovered queue, if we reach the end of the queue, then there will
271    // no more files so here the currentPath may be null.
272    if (batch.isEndOfFile()) {
273      currentPath = entryReader.getCurrentPath();
274      currentPosition = 0L;
275    } else {
276      currentPath = batch.getLastWalPath();
277      currentPosition = batch.getLastWalPosition();
278    }
279    return updated;
280  }
281
282  public void startup(UncaughtExceptionHandler handler) {
283    String name = Thread.currentThread().getName();
284    Threads.setDaemonThreadRunning(this,
285      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
286  }
287
288  Path getCurrentPath() {
289    return entryReader.getCurrentPath();
290  }
291
292  long getCurrentPosition() {
293    return currentPosition;
294  }
295
296  void setWALReader(ReplicationSourceWALReader entryReader) {
297    this.entryReader = entryReader;
298  }
299
300  long getStartPosition() {
301    return 0;
302  }
303
304  protected boolean isActive() {
305    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
306  }
307
308  protected final void setWorkerState(WorkerState state) {
309    this.state = state;
310  }
311
312  void stopWorker() {
313    setWorkerState(WorkerState.STOPPED);
314  }
315
316  public boolean isFinished() {
317    return state == WorkerState.FINISHED;
318  }
319
320  /**
321   * Do the sleeping logic
322   * @param msg Why we sleep
323   * @param sleepMultiplier by how many times the default sleeping time is augmented
324   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
325   */
326  public boolean sleepForRetries(String msg, int sleepMultiplier) {
327    try {
328      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
329      Thread.sleep(this.sleepForRetries * sleepMultiplier);
330    } catch (InterruptedException e) {
331      LOG.debug("Interrupted while sleeping between retries");
332      Thread.currentThread().interrupt();
333    }
334    return sleepMultiplier < maxRetriesMultiplier;
335  }
336}