001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020
021import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
022
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.util.Collections;
026import java.util.List;
027import java.util.concurrent.CancellationException;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Future;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.HBaseIOException;
036import org.apache.hadoop.hbase.HRegionLocation;
037import org.apache.hadoop.hbase.RegionLocations;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.ipc.HBaseRpcController;
040import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
048import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
050
051/**
052 * Caller that goes to replica if the primary region does no answer within a configurable
053 * timeout. If the timeout is reached, it calls all the secondary replicas, and returns
054 * the first answer. If the answer comes from one of the secondary replica, it will
055 * be marked as stale.
056 */
057@InterfaceAudience.Private
058public class RpcRetryingCallerWithReadReplicas {
059  private static final Logger LOG =
060      LoggerFactory.getLogger(RpcRetryingCallerWithReadReplicas.class);
061
062  protected final ExecutorService pool;
063  protected final ClusterConnection cConnection;
064  protected final Configuration conf;
065  protected final Get get;
066  protected final TableName tableName;
067  protected final int timeBeforeReplicas;
068  private final int operationTimeout;
069  private final int rpcTimeout;
070  private final int retries;
071  private final RpcControllerFactory rpcControllerFactory;
072  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
073
074  public RpcRetryingCallerWithReadReplicas(
075      RpcControllerFactory rpcControllerFactory, TableName tableName,
076      ClusterConnection cConnection, final Get get,
077      ExecutorService pool, int retries, int operationTimeout, int rpcTimeout,
078      int timeBeforeReplicas) {
079    this.rpcControllerFactory = rpcControllerFactory;
080    this.tableName = tableName;
081    this.cConnection = cConnection;
082    this.conf = cConnection.getConfiguration();
083    this.get = get;
084    this.pool = pool;
085    this.retries = retries;
086    this.operationTimeout = operationTimeout;
087    this.rpcTimeout = rpcTimeout;
088    this.timeBeforeReplicas = timeBeforeReplicas;
089    this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
090  }
091
092  /**
093   * A RegionServerCallable that takes into account the replicas, i.e.
094   * - the call can be on any replica
095   * - we need to stop retrying when the call is completed
096   * - we can be interrupted
097   */
098  class ReplicaRegionServerCallable extends CancellableRegionServerCallable<Result> {
099    final int id;
100    public ReplicaRegionServerCallable(int id, HRegionLocation location) {
101      super(RpcRetryingCallerWithReadReplicas.this.cConnection,
102          RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(),
103          rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker(), PRIORITY_UNSET);
104      this.id = id;
105      this.location = location;
106    }
107
108    /**
109     * Two responsibilities
110     * - if the call is already completed (by another replica) stops the retries.
111     * - set the location to the right region, depending on the replica.
112     */
113    @Override
114    // TODO: Very like the super class implemenation. Can we shrink this down?
115    public void prepare(final boolean reload) throws IOException {
116      if (getRpcController().isCanceled()) return;
117      if (Thread.interrupted()) {
118        throw new InterruptedIOException();
119      }
120      if (reload || location == null) {
121        RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow());
122        location = id < rl.size() ? rl.getRegionLocation(id) : null;
123      }
124
125      if (location == null || location.getServerName() == null) {
126        // With this exception, there will be a retry. The location can be null for a replica
127        //  when the table is created or after a split.
128        throw new HBaseIOException("There is no location for replica id #" + id);
129      }
130
131      setStubByServiceName(this.location.getServerName());
132    }
133
134    @Override
135    // TODO: Very like the super class implemenation. Can we shrink this down?
136    protected Result rpcCall() throws Exception {
137      if (getRpcController().isCanceled()) return null;
138      if (Thread.interrupted()) {
139        throw new InterruptedIOException();
140      }
141      byte[] reg = location.getRegionInfo().getRegionName();
142      ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get);
143      HBaseRpcController hrc = (HBaseRpcController)getRpcController();
144      hrc.reset();
145      hrc.setCallTimeout(rpcTimeout);
146      hrc.setPriority(tableName);
147      ClientProtos.GetResponse response = getStub().get(hrc, request);
148      if (response == null) {
149        return null;
150      }
151      return ProtobufUtil.toResult(response.getResult(), hrc.cellScanner());
152    }
153  }
154
155  /**
156   * <p>
157   * Algo:
158   * - we put the query into the execution pool.
159   * - after x ms, if we don't have a result, we add the queries for the secondary replicas
160   * - we take the first answer
161   * - when done, we cancel what's left. Cancelling means:
162   * - removing from the pool if the actual call was not started
163   * - interrupting the call if it has started
164   * Client side, we need to take into account
165   * - a call is not executed immediately after being put into the pool
166   * - a call is a thread. Let's not multiply the number of thread by the number of replicas.
167   * Server side, if we can cancel when it's still in the handler pool, it's much better, as a call
168   * can take some i/o.
169   * </p>
170   * Globally, the number of retries, timeout and so on still applies, but it's per replica,
171   * not global. We continue until all retries are done, or all timeouts are exceeded.
172   */
173  public Result call(int operationTimeout)
174      throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
175    boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
176
177    RegionLocations rl = null;
178    boolean skipPrimary = false;
179    try {
180      rl = getRegionLocations(true,
181        (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID),
182        cConnection, tableName, get.getRow());
183    } catch (RetriesExhaustedException | DoNotRetryIOException e) {
184      // When there is no specific replica id specified. It just needs to load all replicas.
185      if (isTargetReplicaSpecified) {
186        throw e;
187      } else {
188        // We cannot get the primary replica location, it is possible that the region
189        // server hosting meta is down, it needs to proceed to try cached replicas.
190        if (cConnection instanceof ConnectionImplementation) {
191          rl = ((ConnectionImplementation)cConnection).getCachedLocation(tableName, get.getRow());
192          if (rl == null) {
193            // No cached locations
194            throw e;
195          }
196
197          // Primary replica location is not known, skip primary replica
198          skipPrimary = true;
199        } else {
200          // For completeness
201          throw e;
202        }
203      }
204    }
205
206    final ResultBoundedCompletionService<Result> cs =
207        new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size());
208    int startIndex = 0;
209    int endIndex = rl.size();
210
211    if(isTargetReplicaSpecified) {
212      addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
213      endIndex = 1;
214    } else {
215      if (!skipPrimary) {
216        addCallsForReplica(cs, rl, 0, 0);
217        try {
218          // wait for the timeout to see whether the primary responds back
219          Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
220          if (f != null) {
221            return f.get(); //great we got a response
222          }
223          if (cConnection.getConnectionMetrics() != null) {
224            cConnection.getConnectionMetrics().incrHedgedReadOps();
225          }
226        } catch (ExecutionException e) {
227          // We ignore the ExecutionException and continue with the secondary replicas
228          if (LOG.isDebugEnabled()) {
229            LOG.debug("Primary replica returns " + e.getCause());
230          }
231
232          // Skip the result from the primary as we know that there is something wrong
233          startIndex = 1;
234        } catch (CancellationException e) {
235          throw new InterruptedIOException();
236        } catch (InterruptedException e) {
237          throw new InterruptedIOException();
238        }
239      } else {
240        // Since primary replica is skipped, the endIndex needs to be adjusted accordingly
241        endIndex --;
242      }
243
244      // submit call for the all of the secondaries at once
245      addCallsForReplica(cs, rl, 1, rl.size() - 1);
246    }
247    try {
248      ResultBoundedCompletionService<Result>.QueueingFuture<Result> f =
249          cs.pollForFirstSuccessfullyCompletedTask(operationTimeout, TimeUnit.MILLISECONDS, startIndex, endIndex);
250      if (f == null) {
251        throw new RetriesExhaustedException("Timed out after " + operationTimeout +
252            "ms. Get is sent to replicas with startIndex: " + startIndex +
253            ", endIndex: " + endIndex + ", Locations: " + rl);
254      }
255      if (cConnection.getConnectionMetrics() != null && !isTargetReplicaSpecified &&
256          !skipPrimary && f.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
257        cConnection.getConnectionMetrics().incrHedgedReadWin();
258      }
259      return f.get();
260    } catch (ExecutionException e) {
261      throwEnrichedException(e, retries);
262    } catch (CancellationException e) {
263      throw new InterruptedIOException();
264    } catch (InterruptedException e) {
265      throw new InterruptedIOException();
266    } finally {
267      // We get there because we were interrupted or because one or more of the
268      // calls succeeded or failed. In all case, we stop all our tasks.
269      cs.cancelAll();
270    }
271
272    LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
273    return null; // unreachable
274  }
275
276  /**
277   * Extract the real exception from the ExecutionException, and throws what makes more
278   * sense.
279   */
280  static void throwEnrichedException(ExecutionException e, int retries)
281      throws RetriesExhaustedException, DoNotRetryIOException {
282    Throwable t = e.getCause();
283    assert t != null; // That's what ExecutionException is about: holding an exception
284    t.printStackTrace();
285
286    if (t instanceof RetriesExhaustedException) {
287      throw (RetriesExhaustedException) t;
288    }
289
290    if (t instanceof DoNotRetryIOException) {
291      throw (DoNotRetryIOException) t;
292    }
293
294    RetriesExhaustedException.ThrowableWithExtraContext qt =
295        new RetriesExhaustedException.ThrowableWithExtraContext(t,
296            EnvironmentEdgeManager.currentTime(), null);
297
298    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
299        Collections.singletonList(qt);
300
301    throw new RetriesExhaustedException(retries, exceptions);
302  }
303
304  /**
305   * Creates the calls and submit them
306   *
307   * @param cs  - the completion service to use for submitting
308   * @param rl  - the region locations
309   * @param min - the id of the first replica, inclusive
310   * @param max - the id of the last replica, inclusive.
311   */
312  private void addCallsForReplica(ResultBoundedCompletionService<Result> cs,
313                                 RegionLocations rl, int min, int max) {
314    for (int id = min; id <= max; id++) {
315      HRegionLocation hrl = rl.getRegionLocation(id);
316      ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
317      cs.submit(callOnReplica, operationTimeout, id);
318    }
319  }
320
321  static RegionLocations getRegionLocations(boolean useCache, int replicaId,
322                 ClusterConnection cConnection, TableName tableName, byte[] row)
323      throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
324
325    RegionLocations rl;
326    try {
327      if (useCache) {
328        rl = cConnection.locateRegion(tableName, row, true, true, replicaId);
329      } else {
330        rl = cConnection.relocateRegion(tableName, row, replicaId);
331      }
332    } catch (DoNotRetryIOException | InterruptedIOException | RetriesExhaustedException e) {
333      throw e;
334    } catch (IOException e) {
335      throw new RetriesExhaustedException("Cannot get the location for replica" + replicaId
336          + " of region for " + Bytes.toStringBinary(row) + " in " + tableName, e);
337    }
338    if (rl == null) {
339      throw new RetriesExhaustedException("Cannot get the location for replica" + replicaId
340          + " of region for " + Bytes.toStringBinary(row) + " in " + tableName);
341    }
342
343    return rl;
344  }
345}