001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.concurrent.PriorityBlockingQueue;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
030import org.apache.hadoop.hbase.util.Threads;
031import org.apache.hadoop.hbase.wal.WAL.Entry;
032import org.apache.hadoop.hbase.wal.WALEdit;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
039
040/**
041 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
042 * ReplicationSourceWALReaderThread
043 */
044@InterfaceAudience.Private
045public class ReplicationSourceShipper extends Thread {
046  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);
047
048  // Hold the state of a replication worker thread
049  public enum WorkerState {
050    RUNNING,
051    STOPPED,
052    FINISHED,  // The worker is done processing a recovered queue
053  }
054
055  private final Configuration conf;
056  protected final String walGroupId;
057  protected final PriorityBlockingQueue<Path> queue;
058  private final ReplicationSourceInterface source;
059
060  // Last position in the log that we sent to ZooKeeper
061  // It will be accessed by the stats thread so make it volatile
062  private volatile long currentPosition = -1;
063  // Path of the current log
064  private Path currentPath;
065  // Current state of the worker thread
066  private volatile WorkerState state;
067  protected ReplicationSourceWALReader entryReader;
068
069  // How long should we sleep for each retry
070  protected final long sleepForRetries;
071  // Maximum number of retries before taking bold actions
072  protected final int maxRetriesMultiplier;
073  private final int DEFAULT_TIMEOUT = 20000;
074  private final int getEntriesTimeout;
075
076  public ReplicationSourceShipper(Configuration conf, String walGroupId,
077      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
078    this.conf = conf;
079    this.walGroupId = walGroupId;
080    this.queue = queue;
081    this.source = source;
082    this.sleepForRetries =
083        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
084    this.maxRetriesMultiplier =
085        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
086    this.getEntriesTimeout =
087        this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
088  }
089
090  @Override
091  public final void run() {
092    setWorkerState(WorkerState.RUNNING);
093    // Loop until we close down
094    while (isActive()) {
095      // Sleep until replication is enabled again
096      if (!source.isPeerEnabled()) {
097        // The peer enabled check is in memory, not expensive, so do not need to increase the
098        // sleep interval as it may cause a long lag when we enable the peer.
099        sleepForRetries("Replication is disabled", 1);
100        continue;
101      }
102      try {
103        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
104        if (entryBatch == null) {
105          // since there is no logs need to replicate, we refresh the ageOfLastShippedOp
106          source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
107            walGroupId);
108          continue;
109        }
110        // the NO_MORE_DATA instance has no path so do not call shipEdits
111        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
112          noMoreData();
113        } else {
114          shipEdits(entryBatch);
115        }
116      } catch (InterruptedException e) {
117        LOG.trace("Interrupted while waiting for next replication entry batch", e);
118        Thread.currentThread().interrupt();
119      }
120    }
121    // If the worker exits run loop without finishing its task, mark it as stopped.
122    if (!isFinished()) {
123      setWorkerState(WorkerState.STOPPED);
124    } else {
125      postFinish();
126    }
127  }
128
129  // To be implemented by recovered shipper
130  protected void noMoreData() {
131  }
132
133  // To be implemented by recovered shipper
134  protected void postFinish() {
135  }
136
137  /**
138   * get batchEntry size excludes bulk load file sizes.
139   * Uses ReplicationSourceWALReader's static method.
140   */
141  private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
142    int totalSize = 0;
143    for(Entry entry : entryBatch.getWalEntries()) {
144      totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry);
145    }
146    return  totalSize;
147  }
148
149  /**
150   * Do the shipping logic
151   */
152  private void shipEdits(WALEntryBatch entryBatch) {
153    List<Entry> entries = entryBatch.getWalEntries();
154    int sleepMultiplier = 0;
155    if (entries.isEmpty()) {
156      if (updateLogPosition(entryBatch)) {
157        // if there was nothing to ship and it's not an error
158        // set "ageOfLastShippedOp" to <now> to indicate that we're current
159        source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
160          walGroupId);
161      }
162      return;
163    }
164    int currentSize = (int) entryBatch.getHeapSize();
165    int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
166    while (isActive()) {
167      try {
168        try {
169          source.tryThrottle(currentSize);
170        } catch (InterruptedException e) {
171          LOG.debug("Interrupted while sleeping for throttling control");
172          Thread.currentThread().interrupt();
173          // current thread might be interrupted to terminate
174          // directly go back to while() for confirm this
175          continue;
176        }
177
178        // create replicateContext here, so the entries can be GC'd upon return from this call
179        // stack
180        ReplicationEndpoint.ReplicateContext replicateContext =
181            new ReplicationEndpoint.ReplicateContext();
182        replicateContext.setEntries(entries).setSize(currentSize);
183        replicateContext.setWalGroupId(walGroupId);
184
185        long startTimeNs = System.nanoTime();
186        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
187        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
188        long endTimeNs = System.nanoTime();
189
190        if (!replicated) {
191          continue;
192        } else {
193          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
194        }
195        // Clean up hfile references
196        for (Entry entry : entries) {
197          cleanUpHFileRefs(entry.getEdit());
198
199          TableName tableName = entry.getKey().getTableName();
200          source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
201              tableName.getNameAsString());
202        }
203        // Log and clean up WAL logs
204        updateLogPosition(entryBatch);
205
206        //offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
207        //this sizeExcludeBulkLoad has to use same calculation that when calling
208        //acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain
209        //same variable: totalBufferUsed
210        source.postShipEdits(entries, sizeExcludeBulkLoad);
211        // FIXME check relationship between wal group and overall
212        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
213          entryBatch.getNbHFiles());
214        source.getSourceMetrics().setAgeOfLastShippedOp(
215          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
216        if (LOG.isTraceEnabled()) {
217          LOG.trace("Replicated {} entries or {} operations in {} ms",
218              entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
219        }
220        break;
221      } catch (Exception ex) {
222        LOG.warn("{} threw unknown exception:",
223          source.getReplicationEndpoint().getClass().getName(), ex);
224        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
225          sleepMultiplier++;
226        }
227      }
228    }
229  }
230
231  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
232    String peerId = source.getPeerId();
233    if (peerId.contains("-")) {
234      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
235      // A peerId will not have "-" in its name, see HBASE-11394
236      peerId = peerId.split("-")[0];
237    }
238    List<Cell> cells = edit.getCells();
239    int totalCells = cells.size();
240    for (int i = 0; i < totalCells; i++) {
241      Cell cell = cells.get(i);
242      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
243        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
244        List<StoreDescriptor> stores = bld.getStoresList();
245        int totalStores = stores.size();
246        for (int j = 0; j < totalStores; j++) {
247          List<String> storeFileList = stores.get(j).getStoreFileList();
248          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
249          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
250        }
251      }
252    }
253  }
254
255  private boolean updateLogPosition(WALEntryBatch batch) {
256    boolean updated = false;
257    // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
258    // record on zk, so let's call it. The last wal position maybe zero if end of file is true and
259    // there is no entry in the batch. It is OK because that the queue storage will ignore the zero
260    // position and the file will be removed soon in cleanOldLogs.
261    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
262      batch.getLastWalPosition() != currentPosition) {
263      source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(),
264        source.isRecovered(), batch);
265      updated = true;
266    }
267    // if end of file is true, then we can just skip to the next file in queue.
268    // the only exception is for recovered queue, if we reach the end of the queue, then there will
269    // no more files so here the currentPath may be null.
270    if (batch.isEndOfFile()) {
271      currentPath = entryReader.getCurrentPath();
272      currentPosition = 0L;
273    } else {
274      currentPath = batch.getLastWalPath();
275      currentPosition = batch.getLastWalPosition();
276    }
277    return updated;
278  }
279
280  public void startup(UncaughtExceptionHandler handler) {
281    String name = Thread.currentThread().getName();
282    Threads.setDaemonThreadRunning(this,
283      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
284  }
285
286  Path getCurrentPath() {
287    return entryReader.getCurrentPath();
288  }
289
290  long getCurrentPosition() {
291    return currentPosition;
292  }
293
294  void setWALReader(ReplicationSourceWALReader entryReader) {
295    this.entryReader = entryReader;
296  }
297
298  long getStartPosition() {
299    return 0;
300  }
301
302  private boolean isActive() {
303    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
304  }
305
306  protected final void setWorkerState(WorkerState state) {
307    this.state = state;
308  }
309
310  void stopWorker() {
311    setWorkerState(WorkerState.STOPPED);
312  }
313
314  public boolean isFinished() {
315    return state == WorkerState.FINISHED;
316  }
317
318  /**
319   * Do the sleeping logic
320   * @param msg Why we sleep
321   * @param sleepMultiplier by how many times the default sleeping time is augmented
322   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
323   */
324  public boolean sleepForRetries(String msg, int sleepMultiplier) {
325    try {
326      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
327      Thread.sleep(this.sleepForRetries * sleepMultiplier);
328    } catch (InterruptedException e) {
329      LOG.debug("Interrupted while sleeping between retries");
330      Thread.currentThread().interrupt();
331    }
332    return sleepMultiplier < maxRetriesMultiplier;
333  }
334}