/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;


import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;

/**
 * Caller that goes to a replica if the primary region does not answer within a configurable
 * timeout. If the timeout is reached, it calls all the secondary replicas and returns
 * the first answer. If the answer comes from one of the secondary replicas, it will
 * be marked as stale.
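 * <p>
 * A minimal usage sketch (illustrative only; the {@code rpcControllerFactory}, {@code pool},
 * timeout and retry values are assumed to come from the surrounding client configuration and
 * an existing {@link ClusterConnection}):
 * <pre>{@code
 * RpcRetryingCallerWithReadReplicas caller = new RpcRetryingCallerWithReadReplicas(
 *     rpcControllerFactory, tableName, connection, get, pool,
 *     retries, operationTimeout, rpcTimeout, timeBeforeReplicas);
 * Result result = caller.call(operationTimeout);
 * if (result.isStale()) {
 *   // The answer came from a secondary replica.
 * }
 * }</pre>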
 */
@InterfaceAudience.Private
public class RpcRetryingCallerWithReadReplicas {
  private static final Logger LOG =
      LoggerFactory.getLogger(RpcRetryingCallerWithReadReplicas.class);

  protected final ExecutorService pool;
  protected final ClusterConnection cConnection;
  protected final Configuration conf;
  protected final Get get;
  protected final TableName tableName;
  protected final int timeBeforeReplicas;
  private final int operationTimeout;
  private final int rpcTimeout;
  private final int retries;
  private final RpcControllerFactory rpcControllerFactory;
  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;

  public RpcRetryingCallerWithReadReplicas(
      RpcControllerFactory rpcControllerFactory, TableName tableName,
      ClusterConnection cConnection, final Get get,
      ExecutorService pool, int retries, int operationTimeout, int rpcTimeout,
      int timeBeforeReplicas) {
    this.rpcControllerFactory = rpcControllerFactory;
    this.tableName = tableName;
    this.cConnection = cConnection;
    this.conf = cConnection.getConfiguration();
    this.get = get;
    this.pool = pool;
    this.retries = retries;
    this.operationTimeout = operationTimeout;
    this.rpcTimeout = rpcTimeout;
    this.timeBeforeReplicas = timeBeforeReplicas;
    this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
  }

  /**
   * A RegionServerCallable that takes into account the replicas, i.e.
   * - the call can be on any replica
   * - we need to stop retrying when the call is completed
   * - we can be interrupted
   */
  class ReplicaRegionServerCallable extends CancellableRegionServerCallable<Result> {
    final int id;
    public ReplicaRegionServerCallable(int id, HRegionLocation location) {
      super(RpcRetryingCallerWithReadReplicas.this.cConnection,
          RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(),
          rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker(), PRIORITY_UNSET);
      this.id = id;
      this.location = location;
    }

    /**
     * Two responsibilities:
     * - if the call is already completed (by another replica), stop the retries.
     * - set the location to the right region, depending on the replica.
     */
    @Override
    // TODO: Very like the super class implementation. Can we shrink this down?
    public void prepare(final boolean reload) throws IOException {
      if (getRpcController().isCanceled()) return;
      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }
      if (reload || location == null) {
        RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow());
        location = id < rl.size() ? rl.getRegionLocation(id) : null;
      }

      if (location == null || location.getServerName() == null) {
        // With this exception, there will be a retry. The location can be null for a replica
        // when the table is created or after a split.
        throw new HBaseIOException("There is no location for replica id #" + id);
      }

      setStubByServiceName(this.location.getServerName());
    }

    @Override
    // TODO: Very like the super class implementation. Can we shrink this down?
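    // Builds the GetRequest for this replica's region and issues it through the region server
    // stub; returns null when the controller was already cancelled (by another replica's
    // response) or when the server returned no response.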
    protected Result rpcCall() throws Exception {
      if (getRpcController().isCanceled()) return null;
      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }
      byte[] reg = location.getRegionInfo().getRegionName();
      ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get);
      HBaseRpcController hrc = (HBaseRpcController)getRpcController();
      hrc.reset();
      hrc.setCallTimeout(rpcTimeout);
      hrc.setPriority(tableName);
      ClientProtos.GetResponse response = getStub().get(hrc, request);
      if (response == null) {
        return null;
      }
      return ProtobufUtil.toResult(response.getResult(), hrc.cellScanner());
    }
  }

  /**
   * <p>
   * Algo:
   * - we put the query into the execution pool.
   * - after x ms, if we don't have a result, we add the queries for the secondary replicas
   * - we take the first answer
   * - when done, we cancel what's left. Cancelling means:
   *   - removing from the pool if the actual call was not started
   *   - interrupting the call if it has started
   * Client side, we need to take into account that:
   * - a call is not executed immediately after being put into the pool
   * - a call is a thread. Let's not multiply the number of threads by the number of replicas.
   * Server side, if we can cancel when it's still in the handler pool, it's much better, as a call
   * can take some i/o.
   * </p>
   * Globally, the number of retries, timeouts and so on still applies, but per replica,
   * not globally. We continue until all retries are done, or all timeouts are exceeded.
   */
  public Result call(int operationTimeout)
      throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
    boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);

    RegionLocations rl = null;
    boolean skipPrimary = false;
    try {
      rl = getRegionLocations(true,
          (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID),
          cConnection, tableName, get.getRow());
    } catch (RetriesExhaustedException | DoNotRetryIOException e) {
      // When no specific replica id is specified, we just need to load all replicas.
      if (isTargetReplicaSpecified) {
        throw e;
      } else {
        // We cannot get the primary replica location; it is possible that the region
        // server hosting meta is down, so proceed to try cached replicas.
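        // If cached locations are available, the primary is skipped and the read is served by
        // the secondary replicas only, so the returned Result will be marked as stale.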
        if (cConnection instanceof ConnectionImplementation) {
          rl = ((ConnectionImplementation)cConnection).getCachedLocation(tableName, get.getRow());
          if (rl == null) {
            // No cached locations
            throw e;
          }

          // Primary replica location is not known, skip primary replica
          skipPrimary = true;
        } else {
          // For completeness
          throw e;
        }
      }
    }

    final ResultBoundedCompletionService<Result> cs =
        new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size());
    int startIndex = 0;
    int endIndex = rl.size();

    if (isTargetReplicaSpecified) {
      addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
      endIndex = 1;
    } else {
      if (!skipPrimary) {
        addCallsForReplica(cs, rl, 0, 0);
        try {
          // wait for the timeout to see whether the primary responds back
          Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
          if (f != null) {
            return f.get(); // great, we got a response
          }
          if (cConnection.getConnectionMetrics() != null) {
            cConnection.getConnectionMetrics().incrHedgedReadOps();
          }
        } catch (ExecutionException e) {
          // We ignore the ExecutionException and continue with the secondary replicas
          if (LOG.isDebugEnabled()) {
            LOG.debug("Primary replica returns " + e.getCause());
          }

          // Skip the result from the primary as we know that there is something wrong
          startIndex = 1;
        } catch (CancellationException e) {
          throw new InterruptedIOException();
        } catch (InterruptedException e) {
          throw new InterruptedIOException();
        }
      } else {
        // Since the primary replica is skipped, the endIndex needs to be adjusted accordingly
        endIndex--;
      }

      // submit calls for all of the secondaries at once
      addCallsForReplica(cs, rl, 1, rl.size() - 1);
    }
    try {
      ResultBoundedCompletionService<Result>.QueueingFuture<Result> f =
          cs.pollForFirstSuccessfullyCompletedTask(operationTimeout, TimeUnit.MILLISECONDS,
              startIndex, endIndex);
      if (f == null) {
        throw new RetriesExhaustedException("Timed out after " + operationTimeout +
            "ms. Get is sent to replicas with startIndex: " + startIndex +
            ", endIndex: " + endIndex + ", Locations: " + rl);
      }
      if (cConnection.getConnectionMetrics() != null && !isTargetReplicaSpecified &&
          !skipPrimary && f.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
        cConnection.getConnectionMetrics().incrHedgedReadWin();
      }
      return f.get();
    } catch (ExecutionException e) {
      throwEnrichedException(e, retries);
    } catch (CancellationException e) {
      throw new InterruptedIOException();
    } catch (InterruptedException e) {
      throw new InterruptedIOException();
    } finally {
      // We get here because we were interrupted or because one or more of the
      // calls succeeded or failed. In all cases, we stop all our tasks.
      cs.cancelAll();
    }

    LOG.error("Impossible? Arrived at an unreachable line..."); // unreachable
    return null; // unreachable
  }

  /**
   * Extracts the real exception from the ExecutionException and throws what makes more
   * sense.
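   * RetriesExhaustedException and DoNotRetryIOException causes are rethrown as-is; any other
   * cause is wrapped in a new RetriesExhaustedException carrying the retry count as context.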
   */
  static void throwEnrichedException(ExecutionException e, int retries)
      throws RetriesExhaustedException, DoNotRetryIOException {
    Throwable t = e.getCause();
    assert t != null; // That's what ExecutionException is about: holding an exception
    t.printStackTrace();

    if (t instanceof RetriesExhaustedException) {
      throw (RetriesExhaustedException) t;
    }

    if (t instanceof DoNotRetryIOException) {
      throw (DoNotRetryIOException) t;
    }

    RetriesExhaustedException.ThrowableWithExtraContext qt =
        new RetriesExhaustedException.ThrowableWithExtraContext(t,
            EnvironmentEdgeManager.currentTime(), null);

    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
        Collections.singletonList(qt);

    throw new RetriesExhaustedException(retries, exceptions);
  }

  /**
   * Creates the calls and submits them.
   *
   * @param cs - the completion service to use for submitting
   * @param rl - the region locations
   * @param min - the id of the first replica, inclusive
   * @param max - the id of the last replica, inclusive
   */
  private void addCallsForReplica(ResultBoundedCompletionService<Result> cs,
      RegionLocations rl, int min, int max) {
    for (int id = min; id <= max; id++) {
      HRegionLocation hrl = rl.getRegionLocation(id);
      ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
      cs.submit(callOnReplica, operationTimeout, id);
    }
  }

  static RegionLocations getRegionLocations(boolean useCache, int replicaId,
      ClusterConnection cConnection, TableName tableName, byte[] row)
      throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {

    RegionLocations rl;
    try {
      if (useCache) {
        rl = cConnection.locateRegion(tableName, row, true, true, replicaId);
      } else {
        rl = cConnection.relocateRegion(tableName, row, replicaId);
      }
    } catch (DoNotRetryIOException | InterruptedIOException | RetriesExhaustedException e) {
      throw e;
    } catch (IOException e) {
      throw new RetriesExhaustedException("Can't get the location for replica " + replicaId, e);
    }
    if (rl == null) {
      throw new RetriesExhaustedException("Can't get the location for replica " + replicaId);
    }

    return rl;
  }
}