/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
 * {@link ReplicationSourceWALReader}.
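 * <p>
 * A rough wiring sketch (simplified; variable names are placeholders and the actual setup lives
 * in the replication source implementation):
 * <pre>{@code
 * ReplicationSourceShipper shipper =
 *     new ReplicationSourceShipper(conf, walGroupId, queue, source);
 * shipper.setWALReader(walReader); // the reader produces WALEntryBatch objects
 * shipper.startup(handler);        // the shipper consumes and replicates them
 * }</pre>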
 */
@InterfaceAudience.Private
public class ReplicationSourceShipper extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);

  // Hold the state of a replication worker thread
  public enum WorkerState {
    RUNNING,
    STOPPED,
    FINISHED,  // The worker is done processing a recovered queue
  }

  private final Configuration conf;
  protected final String walGroupId;
  protected final PriorityBlockingQueue<Path> queue;
  private final ReplicationSourceInterface source;

  // Last position in the log that we sent to ZooKeeper
  // It will be accessed by the stats thread so make it volatile
  private volatile long currentPosition = -1;
  // Path of the current log
  private Path currentPath;
  // Current state of the worker thread
  private volatile WorkerState state;
  protected ReplicationSourceWALReader entryReader;

  // How long should we sleep for each retry
  protected final long sleepForRetries;
  // Maximum value the retry sleep multiplier may reach; caps how far the backoff grows
  protected final int maxRetriesMultiplier;
  private static final int DEFAULT_TIMEOUT = 20000;
  private final int getEntriesTimeout;
  private final int shipEditsTimeout;

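  /**
   * Retry and timeout behaviour is configuration driven. A minimal tuning sketch, using the keys
   * read below (values shown are the defaults applied here):
   * <pre>{@code
   * conf.setLong("replication.source.sleepforretries", 1000);    // base retry sleep in ms
   * conf.setInt("replication.source.maxretriesmultiplier", 300); // cap on the backoff multiplier
   * conf.setInt("replication.source.getEntries.timeout", 20000); // batch poll timeout in ms
   * }</pre>
   */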
  public ReplicationSourceShipper(Configuration conf, String walGroupId,
      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
    this.conf = conf;
    this.walGroupId = walGroupId;
    this.queue = queue;
    this.source = source;
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per retry
    this.getEntriesTimeout =
        this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
  }

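  /**
   * Main worker loop: poll {@link WALEntryBatch}es from the WAL reader and ship them until the
   * worker is stopped or the recovered queue is exhausted.
   */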
  @Override
  public final void run() {
    setWorkerState(WorkerState.RUNNING);
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!source.isPeerEnabled()) {
        // The peer enabled check is in memory and not expensive, so there is no need to increase
        // the sleep interval, which could cause a long lag when the peer is re-enabled.
        sleepForRetries("Replication is disabled", 1);
        continue;
      }
      try {
        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
        if (entryBatch == null) {
          // since there are no logs to replicate, refresh the ageOfLastShippedOp
          source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
            walGroupId);
          continue;
        }
        // the NO_MORE_DATA instance has no path so do not call shipEdits
        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
          noMoreData();
        } else {
          shipEdits(entryBatch);
        }
      } catch (InterruptedException e) {
        LOG.trace("Interrupted while waiting for next replication entry batch", e);
        Thread.currentThread().interrupt();
      }
    }
    // If the worker exits the run loop without finishing its task, mark it as stopped.
    if (!isFinished()) {
      setWorkerState(WorkerState.STOPPED);
    } else {
      postFinish();
    }
  }

  // To be implemented by recovered shipper
  protected void noMoreData() {
  }

  // To be implemented by recovered shipper
  protected void postFinish() {
  }

  /**
   * Get the batch entry size, excluding bulk load file sizes, using the WAL reader's
   * getEntrySizeExcludeBulkLoad calculation.
   */
  private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
    int totalSize = 0;
    for (Entry entry : entryBatch.getWalEntries()) {
      totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry);
    }
    return totalSize;
  }

  /**
   * Do the shipping logic: replicate the batch to the peer, retrying with backoff until the edits
   * are acknowledged or the worker is no longer active.
   */
  private void shipEdits(WALEntryBatch entryBatch) {
    List<Entry> entries = entryBatch.getWalEntries();
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      if (updateLogPosition(entryBatch)) {
        // if there was nothing to ship and it's not an error
        // set "ageOfLastShippedOp" to <now> to indicate that we're current
        source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
          walGroupId);
      }
      return;
    }
    int currentSize = (int) entryBatch.getHeapSize();
    int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
    while (isActive()) {
      try {
        try {
          source.tryThrottle(currentSize);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while sleeping for throttling control");
          Thread.currentThread().interrupt();
          // the current thread might have been interrupted so that it can terminate;
          // go directly back to while() to confirm this
          continue;
        }

        // create replicateContext here, so the entries can be GC'd upon return from this call
        // stack
        ReplicationEndpoint.ReplicateContext replicateContext =
            new ReplicationEndpoint.ReplicateContext();
        replicateContext.setEntries(entries).setSize(currentSize);
        replicateContext.setWalGroupId(walGroupId);
        replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));

        long startTimeNs = System.nanoTime();
        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
        long endTimeNs = System.nanoTime();

        if (!replicated) {
          continue;
        } else {
          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
        }
        // Clean up hfile references
        for (Entry entry : entries) {
          cleanUpHFileRefs(entry.getEdit());

          TableName tableName = entry.getKey().getTableName();
          source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
              tableName.getNameAsString());
        }
        // Log and clean up WAL logs
        updateLogPosition(entryBatch);

        // offset totalBufferUsed by deducting the shipped batch size (which excludes bulk load
        // size). sizeExcludeBulkLoad must use the same calculation as acquireBufferQuota() in
        // ReplicationSourceWALReader because both maintain the same variable: totalBufferUsed
        source.postShipEdits(entries, sizeExcludeBulkLoad);
        // FIXME check relationship between wal group and overall
        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
          entryBatch.getNbHFiles());
        source.getSourceMetrics().setAgeOfLastShippedOp(
          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicated {} entries or {} operations in {} ms",
              entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
        }
        break;
      } catch (Exception ex) {
        LOG.warn("{} threw unknown exception:",
          source.getReplicationEndpoint().getClass().getName(), ex);
        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

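  /**
   * For every bulk load descriptor in the given edit, remove the shipped hfile references via the
   * source manager and decrement the hfile-refs queue size metric.
   */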
  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
    String peerId = source.getPeerId();
    if (peerId.contains("-")) {
      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
      // A peerId will not have "-" in its name, see HBASE-11394
      peerId = peerId.split("-")[0];
    }
    List<Cell> cells = edit.getCells();
    int totalCells = cells.size();
    for (int i = 0; i < totalCells; i++) {
      Cell cell = cells.get(i);
      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        List<StoreDescriptor> stores = bld.getStoresList();
        int totalStores = stores.size();
        for (int j = 0; j < totalStores; j++) {
          List<String> storeFileList = stores.get(j).getStoreFileList();
          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
        }
      }
    }
  }

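  /**
   * Persist the batch's WAL position (and let the source manager clean up old logs) when it
   * differs from what was last recorded, then advance the in-memory currentPath/currentPosition.
   * @return true if the position was persisted via the source manager
   */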
  private boolean updateLogPosition(WALEntryBatch batch) {
    boolean updated = false;
    // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
    // record on zk, so let's call it. The last wal position may be zero if end of file is true and
    // there is no entry in the batch. It is OK because the queue storage will ignore the zero
    // position and the file will be removed soon in cleanOldLogs.
    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
      batch.getLastWalPosition() != currentPosition) {
      source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(),
        source.isRecovered(), batch);
      updated = true;
    }
    // if end of file is true, then we can just skip to the next file in the queue.
    // the only exception is a recovered queue: if we reach the end of the queue, there will be
    // no more files, so the currentPath may be null here.
    if (batch.isEndOfFile()) {
      currentPath = entryReader.getCurrentPath();
      currentPosition = 0L;
    } else {
      currentPath = batch.getLastWalPath();
      currentPosition = batch.getLastWalPosition();
    }
    return updated;
  }

  public void startup(UncaughtExceptionHandler handler) {
    String name = Thread.currentThread().getName();
    Threads.setDaemonThreadRunning(this,
      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
  }

  Path getCurrentPath() {
    return entryReader.getCurrentPath();
  }

  long getCurrentPosition() {
    return currentPosition;
  }

  void setWALReader(ReplicationSourceWALReader entryReader) {
    this.entryReader = entryReader;
  }

  long getStartPosition() {
    return 0;
  }

  private boolean isActive() {
    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
  }

  protected final void setWorkerState(WorkerState state) {
    this.state = state;
  }

  void stopWorker() {
    setWorkerState(WorkerState.STOPPED);
  }

  public boolean isFinished() {
    return state == WorkerState.FINISHED;
  }

  /**
   * Do the sleeping logic
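   * <p>
   * Typical usage in a retry loop (as in {@code shipEdits} in this class):
   * <pre>{@code
   * if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
   *   sleepMultiplier++;
   * }
   * }</pre>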
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  public boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }
}