001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.Collections;
024import java.util.List;
025import java.util.concurrent.CancellationException;
026import java.util.concurrent.ExecutionException;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Future;
029import java.util.concurrent.TimeUnit;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.DoNotRetryIOException;
033import org.apache.hadoop.hbase.HBaseIOException;
034import org.apache.hadoop.hbase.HRegionLocation;
035import org.apache.hadoop.hbase.RegionLocations;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.apache.hadoop.hbase.ipc.HBaseRpcController;
041import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
042import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
043import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
045import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
046
047import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
048
049/**
050 * Caller that goes to replica if the primary region does no answer within a configurable
051 * timeout. If the timeout is reached, it calls all the secondary replicas, and returns
052 * the first answer. If the answer comes from one of the secondary replica, it will
053 * be marked as stale.
054 */
055@InterfaceAudience.Private
056public class RpcRetryingCallerWithReadReplicas {
057  private static final Logger LOG =
058      LoggerFactory.getLogger(RpcRetryingCallerWithReadReplicas.class);
059
060  protected final ExecutorService pool;
061  protected final ClusterConnection cConnection;
062  protected final Configuration conf;
063  protected final Get get;
064  protected final TableName tableName;
065  protected final int timeBeforeReplicas;
066  private final int operationTimeout;
067  private final int rpcTimeout;
068  private final int retries;
069  private final RpcControllerFactory rpcControllerFactory;
070  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
071
072  public RpcRetryingCallerWithReadReplicas(
073      RpcControllerFactory rpcControllerFactory, TableName tableName,
074      ClusterConnection cConnection, final Get get,
075      ExecutorService pool, int retries, int operationTimeout, int rpcTimeout,
076      int timeBeforeReplicas) {
077    this.rpcControllerFactory = rpcControllerFactory;
078    this.tableName = tableName;
079    this.cConnection = cConnection;
080    this.conf = cConnection.getConfiguration();
081    this.get = get;
082    this.pool = pool;
083    this.retries = retries;
084    this.operationTimeout = operationTimeout;
085    this.rpcTimeout = rpcTimeout;
086    this.timeBeforeReplicas = timeBeforeReplicas;
087    this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
088  }
089
090  /**
091   * A RegionServerCallable that takes into account the replicas, i.e.
092   * - the call can be on any replica
093   * - we need to stop retrying when the call is completed
094   * - we can be interrupted
095   */
096  class ReplicaRegionServerCallable extends CancellableRegionServerCallable<Result> {
097    final int id;
098    public ReplicaRegionServerCallable(int id, HRegionLocation location) {
099      super(RpcRetryingCallerWithReadReplicas.this.cConnection,
100          RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(),
101          rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker(), PRIORITY_UNSET);
102      this.id = id;
103      this.location = location;
104    }
105
106    /**
107     * Two responsibilities
108     * - if the call is already completed (by another replica) stops the retries.
109     * - set the location to the right region, depending on the replica.
110     */
111    @Override
112    // TODO: Very like the super class implemenation. Can we shrink this down?
113    public void prepare(final boolean reload) throws IOException {
114      if (getRpcController().isCanceled()) return;
115      if (Thread.interrupted()) {
116        throw new InterruptedIOException();
117      }
118      if (reload || location == null) {
119        RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow());
120        location = id < rl.size() ? rl.getRegionLocation(id) : null;
121      }
122
123      if (location == null || location.getServerName() == null) {
124        // With this exception, there will be a retry. The location can be null for a replica
125        //  when the table is created or after a split.
126        throw new HBaseIOException("There is no location for replica id #" + id);
127      }
128
129      setStubByServiceName(this.location.getServerName());
130    }
131
132    @Override
133    // TODO: Very like the super class implemenation. Can we shrink this down?
134    protected Result rpcCall() throws Exception {
135      if (getRpcController().isCanceled()) return null;
136      if (Thread.interrupted()) {
137        throw new InterruptedIOException();
138      }
139      byte[] reg = location.getRegionInfo().getRegionName();
140      ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get);
141      HBaseRpcController hrc = (HBaseRpcController)getRpcController();
142      hrc.reset();
143      hrc.setCallTimeout(rpcTimeout);
144      hrc.setPriority(tableName);
145      ClientProtos.GetResponse response = getStub().get(hrc, request);
146      if (response == null) {
147        return null;
148      }
149      return ProtobufUtil.toResult(response.getResult(), hrc.cellScanner());
150    }
151  }
152
153  /**
154   * <p>
155   * Algo:
156   * - we put the query into the execution pool.
157   * - after x ms, if we don't have a result, we add the queries for the secondary replicas
158   * - we take the first answer
159   * - when done, we cancel what's left. Cancelling means:
160   * - removing from the pool if the actual call was not started
161   * - interrupting the call if it has started
162   * Client side, we need to take into account
163   * - a call is not executed immediately after being put into the pool
164   * - a call is a thread. Let's not multiply the number of thread by the number of replicas.
165   * Server side, if we can cancel when it's still in the handler pool, it's much better, as a call
166   * can take some i/o.
167   * </p>
168   * Globally, the number of retries, timeout and so on still applies, but it's per replica,
169   * not global. We continue until all retries are done, or all timeouts are exceeded.
170   */
171  public Result call(int operationTimeout)
172      throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
173    boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
174
175    RegionLocations rl = null;
176    boolean skipPrimary = false;
177    try {
178      rl = getRegionLocations(true,
179        (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID),
180        cConnection, tableName, get.getRow());
181    } catch (RetriesExhaustedException | DoNotRetryIOException e) {
182      // When there is no specific replica id specified. It just needs to load all replicas.
183      if (isTargetReplicaSpecified) {
184        throw e;
185      } else {
186        // We cannot get the primary replica location, it is possible that the region
187        // server hosting meta is down, it needs to proceed to try cached replicas.
188        if (cConnection instanceof ConnectionImplementation) {
189          rl = ((ConnectionImplementation)cConnection).getCachedLocation(tableName, get.getRow());
190          if (rl == null) {
191            // No cached locations
192            throw e;
193          }
194
195          // Primary replica location is not known, skip primary replica
196          skipPrimary = true;
197        } else {
198          // For completeness
199          throw e;
200        }
201      }
202    }
203
204    final ResultBoundedCompletionService<Result> cs =
205        new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size());
206    int startIndex = 0;
207    int endIndex = rl.size();
208
209    if(isTargetReplicaSpecified) {
210      addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
211      endIndex = 1;
212    } else {
213      if (!skipPrimary) {
214        addCallsForReplica(cs, rl, 0, 0);
215        try {
216          // wait for the timeout to see whether the primary responds back
217          Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
218          if (f != null) {
219            return f.get(); //great we got a response
220          }
221          if (cConnection.getConnectionMetrics() != null) {
222            cConnection.getConnectionMetrics().incrHedgedReadOps();
223          }
224        } catch (ExecutionException e) {
225          // We ignore the ExecutionException and continue with the secondary replicas
226          if (LOG.isDebugEnabled()) {
227            LOG.debug("Primary replica returns " + e.getCause());
228          }
229
230          // Skip the result from the primary as we know that there is something wrong
231          startIndex = 1;
232        } catch (CancellationException e) {
233          throw new InterruptedIOException();
234        } catch (InterruptedException e) {
235          throw new InterruptedIOException();
236        }
237      } else {
238        // Since primary replica is skipped, the endIndex needs to be adjusted accordingly
239        endIndex --;
240      }
241
242      // submit call for the all of the secondaries at once
243      addCallsForReplica(cs, rl, 1, rl.size() - 1);
244    }
245    try {
246      ResultBoundedCompletionService<Result>.QueueingFuture<Result> f =
247          cs.pollForFirstSuccessfullyCompletedTask(operationTimeout, TimeUnit.MILLISECONDS, startIndex, endIndex);
248      if (f == null) {
249        throw new RetriesExhaustedException("Timed out after " + operationTimeout +
250            "ms. Get is sent to replicas with startIndex: " + startIndex +
251            ", endIndex: " + endIndex + ", Locations: " + rl);
252      }
253      if (cConnection.getConnectionMetrics() != null && !isTargetReplicaSpecified &&
254          !skipPrimary && f.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
255        cConnection.getConnectionMetrics().incrHedgedReadWin();
256      }
257      return f.get();
258    } catch (ExecutionException e) {
259      throwEnrichedException(e, retries);
260    } catch (CancellationException e) {
261      throw new InterruptedIOException();
262    } catch (InterruptedException e) {
263      throw new InterruptedIOException();
264    } finally {
265      // We get there because we were interrupted or because one or more of the
266      // calls succeeded or failed. In all case, we stop all our tasks.
267      cs.cancelAll();
268    }
269
270    LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
271    return null; // unreachable
272  }
273
274  /**
275   * Extract the real exception from the ExecutionException, and throws what makes more
276   * sense.
277   */
278  static void throwEnrichedException(ExecutionException e, int retries)
279      throws RetriesExhaustedException, DoNotRetryIOException {
280    Throwable t = e.getCause();
281    assert t != null; // That's what ExecutionException is about: holding an exception
282    t.printStackTrace();
283
284    if (t instanceof RetriesExhaustedException) {
285      throw (RetriesExhaustedException) t;
286    }
287
288    if (t instanceof DoNotRetryIOException) {
289      throw (DoNotRetryIOException) t;
290    }
291
292    RetriesExhaustedException.ThrowableWithExtraContext qt =
293        new RetriesExhaustedException.ThrowableWithExtraContext(t,
294            EnvironmentEdgeManager.currentTime(), null);
295
296    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
297        Collections.singletonList(qt);
298
299    throw new RetriesExhaustedException(retries, exceptions);
300  }
301
302  /**
303   * Creates the calls and submit them
304   *
305   * @param cs  - the completion service to use for submitting
306   * @param rl  - the region locations
307   * @param min - the id of the first replica, inclusive
308   * @param max - the id of the last replica, inclusive.
309   */
310  private void addCallsForReplica(ResultBoundedCompletionService<Result> cs,
311                                 RegionLocations rl, int min, int max) {
312    for (int id = min; id <= max; id++) {
313      HRegionLocation hrl = rl.getRegionLocation(id);
314      ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
315      cs.submit(callOnReplica, operationTimeout, id);
316    }
317  }
318
319  static RegionLocations getRegionLocations(boolean useCache, int replicaId,
320                 ClusterConnection cConnection, TableName tableName, byte[] row)
321      throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
322
323    RegionLocations rl;
324    try {
325      if (useCache) {
326        rl = cConnection.locateRegion(tableName, row, true, true, replicaId);
327      } else {
328        rl = cConnection.relocateRegion(tableName, row, replicaId);
329      }
330    } catch (DoNotRetryIOException | InterruptedIOException | RetriesExhaustedException e) {
331      throw e;
332    } catch (IOException e) {
333      throw new RetriesExhaustedException("Can't get the location for replica " + replicaId, e);
334    }
335    if (rl == null) {
336      throw new RetriesExhaustedException("Can't get the location for replica " + replicaId);
337    }
338
339    return rl;
340  }
341}