001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.util.Collections;
025import java.util.List;
026import java.util.concurrent.CancellationException;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Future;
030import java.util.concurrent.TimeUnit;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.DoNotRetryIOException;
033import org.apache.hadoop.hbase.HBaseIOException;
034import org.apache.hadoop.hbase.HRegionLocation;
035import org.apache.hadoop.hbase.RegionLocations;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.ipc.HBaseRpcController;
038import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
046import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
048
049/**
050 * Caller that goes to replica if the primary region does no answer within a configurable timeout.
051 * If the timeout is reached, it calls all the secondary replicas, and returns the first answer. If
052 * the answer comes from one of the secondary replica, it will be marked as stale.
053 */
054@InterfaceAudience.Private
055public class RpcRetryingCallerWithReadReplicas {
056  private static final Logger LOG =
057    LoggerFactory.getLogger(RpcRetryingCallerWithReadReplicas.class);
058
059  protected final ExecutorService pool;
060  protected final ClusterConnection cConnection;
061  protected final Configuration conf;
062  protected final Get get;
063  protected final TableName tableName;
064  protected final int timeBeforeReplicas;
065  private final int operationTimeout;
066  private final int rpcTimeout;
067  private final int retries;
068  private final RpcControllerFactory rpcControllerFactory;
069  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
070
071  public RpcRetryingCallerWithReadReplicas(RpcControllerFactory rpcControllerFactory,
072    TableName tableName, ClusterConnection cConnection, final Get get, ExecutorService pool,
073    int retries, int operationTimeout, int rpcTimeout, int timeBeforeReplicas) {
074    this.rpcControllerFactory = rpcControllerFactory;
075    this.tableName = tableName;
076    this.cConnection = cConnection;
077    this.conf = cConnection.getConfiguration();
078    this.get = get;
079    this.pool = pool;
080    this.retries = retries;
081    this.operationTimeout = operationTimeout;
082    this.rpcTimeout = rpcTimeout;
083    this.timeBeforeReplicas = timeBeforeReplicas;
084    this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
085  }
086
087  /**
088   * A RegionServerCallable that takes into account the replicas, i.e. - the call can be on any
089   * replica - we need to stop retrying when the call is completed - we can be interrupted
090   */
091  class ReplicaRegionServerCallable extends CancellableRegionServerCallable<Result> {
092    final int id;
093
094    public ReplicaRegionServerCallable(int id, HRegionLocation location) {
095      super(RpcRetryingCallerWithReadReplicas.this.cConnection,
096        RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(),
097        rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker(),
098        PRIORITY_UNSET);
099      this.id = id;
100      this.location = location;
101    }
102
103    /**
104     * Two responsibilities - if the call is already completed (by another replica) stops the
105     * retries. - set the location to the right region, depending on the replica.
106     */
107    @Override
108    // TODO: Very like the super class implemenation. Can we shrink this down?
109    public void prepare(final boolean reload) throws IOException {
110      if (getRpcController().isCanceled()) return;
111      if (Thread.interrupted()) {
112        throw new InterruptedIOException();
113      }
114      if (reload || location == null) {
115        RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow());
116        location = id < rl.size() ? rl.getRegionLocation(id) : null;
117      }
118
119      if (location == null || location.getServerName() == null) {
120        // With this exception, there will be a retry. The location can be null for a replica
121        // when the table is created or after a split.
122        throw new HBaseIOException("There is no location for replica id #" + id);
123      }
124
125      setStubByServiceName(this.location.getServerName());
126    }
127
128    @Override
129    // TODO: Very like the super class implemenation. Can we shrink this down?
130    protected Result rpcCall() throws Exception {
131      if (getRpcController().isCanceled()) return null;
132      if (Thread.interrupted()) {
133        throw new InterruptedIOException();
134      }
135      byte[] reg = location.getRegionInfo().getRegionName();
136      ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get);
137      HBaseRpcController hrc = (HBaseRpcController) getRpcController();
138      hrc.reset();
139      hrc.setCallTimeout(rpcTimeout);
140      hrc.setPriority(tableName);
141      ClientProtos.GetResponse response = getStub().get(hrc, request);
142      if (response == null) {
143        return null;
144      }
145      return ProtobufUtil.toResult(response.getResult(), hrc.cellScanner());
146    }
147  }
148
149  /**
150   * <p>
151   * Algo: - we put the query into the execution pool. - after x ms, if we don't have a result, we
152   * add the queries for the secondary replicas - we take the first answer - when done, we cancel
153   * what's left. Cancelling means: - removing from the pool if the actual call was not started -
154   * interrupting the call if it has started Client side, we need to take into account - a call is
155   * not executed immediately after being put into the pool - a call is a thread. Let's not multiply
156   * the number of thread by the number of replicas. Server side, if we can cancel when it's still
157   * in the handler pool, it's much better, as a call can take some i/o.
158   * </p>
159   * Globally, the number of retries, timeout and so on still applies, but it's per replica, not
160   * global. We continue until all retries are done, or all timeouts are exceeded.
161   */
162  public Result call(int operationTimeout)
163    throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
164    boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
165
166    RegionLocations rl = null;
167    boolean skipPrimary = false;
168    try {
169      rl = getRegionLocations(true,
170        (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID),
171        cConnection, tableName, get.getRow());
172    } catch (RetriesExhaustedException | DoNotRetryIOException e) {
173      // When there is no specific replica id specified. It just needs to load all replicas.
174      if (isTargetReplicaSpecified) {
175        throw e;
176      } else {
177        // We cannot get the primary replica location, it is possible that the region
178        // server hosting meta is down, it needs to proceed to try cached replicas.
179        if (cConnection instanceof ConnectionImplementation) {
180          rl = ((ConnectionImplementation) cConnection).getCachedLocation(tableName, get.getRow());
181          if (rl == null) {
182            // No cached locations
183            throw e;
184          }
185
186          // Primary replica location is not known, skip primary replica
187          skipPrimary = true;
188        } else {
189          // For completeness
190          throw e;
191        }
192      }
193    }
194
195    final ResultBoundedCompletionService<Result> cs =
196      new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size());
197    int startIndex = 0;
198    int endIndex = rl.size();
199
200    if (isTargetReplicaSpecified) {
201      addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
202      endIndex = 1;
203    } else {
204      if (!skipPrimary) {
205        addCallsForReplica(cs, rl, 0, 0);
206        try {
207          // wait for the timeout to see whether the primary responds back
208          Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes,
209                                                                                 // microseconds
210          if (f != null) {
211            return f.get(); // great we got a response
212          }
213          if (cConnection.getConnectionMetrics() != null) {
214            cConnection.getConnectionMetrics().incrHedgedReadOps();
215          }
216        } catch (ExecutionException e) {
217          // We ignore the ExecutionException and continue with the secondary replicas
218          if (LOG.isDebugEnabled()) {
219            LOG.debug("Primary replica returns " + e.getCause());
220          }
221
222          // Skip the result from the primary as we know that there is something wrong
223          startIndex = 1;
224        } catch (CancellationException e) {
225          throw new InterruptedIOException();
226        } catch (InterruptedException e) {
227          throw new InterruptedIOException();
228        }
229      } else {
230        // Since primary replica is skipped, the endIndex needs to be adjusted accordingly
231        endIndex--;
232      }
233
234      // submit call for the all of the secondaries at once
235      addCallsForReplica(cs, rl, 1, rl.size() - 1);
236    }
237    try {
238      ResultBoundedCompletionService<Result>.QueueingFuture<Result> f =
239        cs.pollForFirstSuccessfullyCompletedTask(operationTimeout, TimeUnit.MILLISECONDS,
240          startIndex, endIndex);
241      if (f == null) {
242        throw new RetriesExhaustedException(
243          "Timed out after " + operationTimeout + "ms. Get is sent to replicas with startIndex: "
244            + startIndex + ", endIndex: " + endIndex + ", Locations: " + rl);
245      }
246      if (
247        cConnection.getConnectionMetrics() != null && !isTargetReplicaSpecified && !skipPrimary
248          && f.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID
249      ) {
250        cConnection.getConnectionMetrics().incrHedgedReadWin();
251      }
252      return f.get();
253    } catch (ExecutionException e) {
254      throwEnrichedException(e, retries);
255    } catch (CancellationException e) {
256      throw new InterruptedIOException();
257    } catch (InterruptedException e) {
258      throw new InterruptedIOException();
259    } finally {
260      // We get there because we were interrupted or because one or more of the
261      // calls succeeded or failed. In all case, we stop all our tasks.
262      cs.cancelAll();
263    }
264
265    LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
266    return null; // unreachable
267  }
268
269  /**
270   * Extract the real exception from the ExecutionException, and throws what makes more sense.
271   */
272  static void throwEnrichedException(ExecutionException e, int retries)
273    throws RetriesExhaustedException, DoNotRetryIOException {
274    Throwable t = e.getCause();
275    assert t != null; // That's what ExecutionException is about: holding an exception
276    t.printStackTrace();
277
278    if (t instanceof RetriesExhaustedException) {
279      throw (RetriesExhaustedException) t;
280    }
281
282    if (t instanceof DoNotRetryIOException) {
283      throw (DoNotRetryIOException) t;
284    }
285
286    RetriesExhaustedException.ThrowableWithExtraContext qt =
287      new RetriesExhaustedException.ThrowableWithExtraContext(t,
288        EnvironmentEdgeManager.currentTime(), null);
289
290    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
291      Collections.singletonList(qt);
292
293    throw new RetriesExhaustedException(retries, exceptions);
294  }
295
296  /**
297   * Creates the calls and submit them
298   * @param cs  - the completion service to use for submitting
299   * @param rl  - the region locations
300   * @param min - the id of the first replica, inclusive
301   * @param max - the id of the last replica, inclusive.
302   */
303  private void addCallsForReplica(ResultBoundedCompletionService<Result> cs, RegionLocations rl,
304    int min, int max) {
305    for (int id = min; id <= max; id++) {
306      HRegionLocation hrl = rl.getRegionLocation(id);
307      ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
308      cs.submit(callOnReplica, rpcTimeout, operationTimeout, id);
309    }
310  }
311
312  static RegionLocations getRegionLocations(boolean useCache, int replicaId,
313    ClusterConnection cConnection, TableName tableName, byte[] row)
314    throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
315
316    RegionLocations rl;
317    try {
318      if (useCache) {
319        rl = cConnection.locateRegion(tableName, row, true, true, replicaId);
320      } else {
321        rl = cConnection.relocateRegion(tableName, row, replicaId);
322      }
323    } catch (DoNotRetryIOException | InterruptedIOException | RetriesExhaustedException e) {
324      throw e;
325    } catch (IOException e) {
326      throw new RetriesExhaustedException("Cannot get the location for replica" + replicaId
327        + " of region for " + Bytes.toStringBinary(row) + " in " + tableName, e);
328    }
329    if (rl == null) {
330      throw new RetriesExhaustedException("Cannot get the location for replica" + replicaId
331        + " of region for " + Bytes.toStringBinary(row) + " in " + tableName);
332    }
333
334    return rl;
335  }
336}