001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.concurrent.PriorityBlockingQueue;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
029import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.hadoop.hbase.util.Threads;
032import org.apache.hadoop.hbase.wal.WAL.Entry;
033import org.apache.hadoop.hbase.wal.WALEdit;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
040
041/**
042 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
043 * ReplicationSourceWALReaderThread
044 */
045@InterfaceAudience.Private
046public class ReplicationSourceShipper extends Thread {
047  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);
048
049  // Hold the state of a replication worker thread
050  public enum WorkerState {
051    RUNNING,
052    STOPPED,
053    FINISHED,  // The worker is done processing a recovered queue
054  }
055
056  protected final Configuration conf;
057  protected final String walGroupId;
058  protected final PriorityBlockingQueue<Path> queue;
059  protected final ReplicationSourceInterface source;
060
061  // Last position in the log that we sent to ZooKeeper
062  protected long lastLoggedPosition = -1;
063  // Path of the current log
064  protected volatile Path currentPath;
065  // Current state of the worker thread
066  private WorkerState state;
067  protected ReplicationSourceWALReader entryReader;
068
069  // How long should we sleep for each retry
070  protected final long sleepForRetries;
071  // Maximum number of retries before taking bold actions
072  protected final int maxRetriesMultiplier;
073
074  public ReplicationSourceShipper(Configuration conf, String walGroupId,
075      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
076    this.conf = conf;
077    this.walGroupId = walGroupId;
078    this.queue = queue;
079    this.source = source;
080    this.sleepForRetries =
081        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
082    this.maxRetriesMultiplier =
083        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
084  }
085
086  @Override
087  public void run() {
088    setWorkerState(WorkerState.RUNNING);
089    // Loop until we close down
090    while (isActive()) {
091      int sleepMultiplier = 1;
092      // Sleep until replication is enabled again
093      if (!source.isPeerEnabled()) {
094        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
095          sleepMultiplier++;
096        }
097        continue;
098      }
099
100      while (entryReader == null) {
101        if (sleepForRetries("Replication WAL entry reader thread not initialized",
102          sleepMultiplier)) {
103          sleepMultiplier++;
104        }
105      }
106
107      try {
108        WALEntryBatch entryBatch = entryReader.take();
109        shipEdits(entryBatch);
110      } catch (InterruptedException e) {
111        LOG.trace("Interrupted while waiting for next replication entry batch", e);
112        Thread.currentThread().interrupt();
113      }
114    }
115    // If the worker exits run loop without finishing its task, mark it as stopped.
116    if (state != WorkerState.FINISHED) {
117      setWorkerState(WorkerState.STOPPED);
118    }
119  }
120
121  /**
122   * Do the shipping logic
123   */
124  protected void shipEdits(WALEntryBatch entryBatch) {
125    List<Entry> entries = entryBatch.getWalEntries();
126    long lastReadPosition = entryBatch.getLastWalPosition();
127    currentPath = entryBatch.getLastWalPath();
128    int sleepMultiplier = 0;
129    if (entries.isEmpty()) {
130      if (lastLoggedPosition != lastReadPosition) {
131        updateLogPosition(lastReadPosition);
132        // if there was nothing to ship and it's not an error
133        // set "ageOfLastShippedOp" to <now> to indicate that we're current
134        source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
135          walGroupId);
136      }
137      return;
138    }
139    int currentSize = (int) entryBatch.getHeapSize();
140    while (isActive()) {
141      try {
142        try {
143          source.tryThrottle(currentSize);
144        } catch (InterruptedException e) {
145          LOG.debug("Interrupted while sleeping for throttling control");
146          Thread.currentThread().interrupt();
147          // current thread might be interrupted to terminate
148          // directly go back to while() for confirm this
149          continue;
150        }
151
152        // create replicateContext here, so the entries can be GC'd upon return from this call
153        // stack
154        ReplicationEndpoint.ReplicateContext replicateContext =
155            new ReplicationEndpoint.ReplicateContext();
156        replicateContext.setEntries(entries).setSize(currentSize);
157        replicateContext.setWalGroupId(walGroupId);
158
159        long startTimeNs = System.nanoTime();
160        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
161        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
162        long endTimeNs = System.nanoTime();
163
164        if (!replicated) {
165          continue;
166        } else {
167          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
168        }
169
170        if (this.lastLoggedPosition != lastReadPosition) {
171          //Clean up hfile references
172          int size = entries.size();
173          for (int i = 0; i < size; i++) {
174            cleanUpHFileRefs(entries.get(i).getEdit());
175          }
176          //Log and clean up WAL logs
177          updateLogPosition(lastReadPosition);
178        }
179
180        source.postShipEdits(entries, currentSize);
181        // FIXME check relationship between wal group and overall
182        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
183          entryBatch.getNbHFiles());
184        source.getSourceMetrics().setAgeOfLastShippedOp(
185          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
186        if (LOG.isTraceEnabled()) {
187          LOG.trace("Replicated " + entries.size() + " entries or " + entryBatch.getNbOperations()
188              + " operations in " + ((endTimeNs - startTimeNs) / 1000000) + " ms");
189        }
190        break;
191      } catch (Exception ex) {
192        LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:"
193            + org.apache.hadoop.util.StringUtils.stringifyException(ex));
194        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
195          sleepMultiplier++;
196        }
197      }
198    }
199  }
200
201  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
202    String peerId = source.getPeerId();
203    if (peerId.contains("-")) {
204      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
205      // A peerId will not have "-" in its name, see HBASE-11394
206      peerId = peerId.split("-")[0];
207    }
208    List<Cell> cells = edit.getCells();
209    int totalCells = cells.size();
210    for (int i = 0; i < totalCells; i++) {
211      Cell cell = cells.get(i);
212      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
213        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
214        List<StoreDescriptor> stores = bld.getStoresList();
215        int totalStores = stores.size();
216        for (int j = 0; j < totalStores; j++) {
217          List<String> storeFileList = stores.get(j).getStoreFileList();
218          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
219          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
220        }
221      }
222    }
223  }
224
225  protected void updateLogPosition(long lastReadPosition) {
226    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
227      lastReadPosition, false, false);
228    lastLoggedPosition = lastReadPosition;
229  }
230
231  public void startup(UncaughtExceptionHandler handler) {
232    String name = Thread.currentThread().getName();
233    Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
234        + source.getPeerClusterZnode(), handler);
235  }
236
237  public PriorityBlockingQueue<Path> getLogQueue() {
238    return this.queue;
239  }
240
241  public Path getCurrentPath() {
242    return this.entryReader.getCurrentPath();
243  }
244
245  public long getCurrentPosition() {
246    return this.lastLoggedPosition;
247  }
248
249  public void setWALReader(ReplicationSourceWALReader entryReader) {
250    this.entryReader = entryReader;
251  }
252
253  public long getStartPosition() {
254    return 0;
255  }
256
257  protected boolean isActive() {
258    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
259  }
260
261  public void setWorkerState(WorkerState state) {
262    this.state = state;
263  }
264
265  public WorkerState getWorkerState() {
266    return state;
267  }
268
269  public void stopWorker() {
270    setWorkerState(WorkerState.STOPPED);
271  }
272
273  public boolean isFinished() {
274    return state == WorkerState.FINISHED;
275  }
276
277  /**
278   * Do the sleeping logic
279   * @param msg Why we sleep
280   * @param sleepMultiplier by how many times the default sleeping time is augmented
281   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
282   */
283  public boolean sleepForRetries(String msg, int sleepMultiplier) {
284    try {
285      if (LOG.isTraceEnabled()) {
286        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
287      }
288      Thread.sleep(this.sleepForRetries * sleepMultiplier);
289    } catch (InterruptedException e) {
290      LOG.debug("Interrupted while sleeping between retries");
291      Thread.currentThread().interrupt();
292    }
293    return sleepMultiplier < maxRetriesMultiplier;
294  }
295}