/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Caller that goes to a replica if the primary region does not answer within a configurable
 * timeout. If the timeout is reached, it calls all the secondary replicas and returns the first
 * answer. If the answer comes from one of the secondary replicas, it will be marked as stale.
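 * <p>
 * A rough usage sketch (the variable names below are illustrative only, not part of this class):
 * </p>
 *
 * <pre>
 * // Construct the caller with an existing connection, thread pool and timeouts; call()
 * // then runs the (possibly hedged) read and returns the first Result that comes back.
 * RpcRetryingCallerWithReadReplicas caller = new RpcRetryingCallerWithReadReplicas(
 *   rpcControllerFactory, tableName, clusterConnection, get, pool, retries, operationTimeout,
 *   rpcTimeout, timeBeforeReplicas);
 * Result result = caller.call(operationTimeout);
 * </pre>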
 */
@InterfaceAudience.Private
public class RpcRetryingCallerWithReadReplicas {
  private static final Logger LOG =
    LoggerFactory.getLogger(RpcRetryingCallerWithReadReplicas.class);

  protected final ExecutorService pool;
  protected final ClusterConnection cConnection;
  protected final Configuration conf;
  protected final Get get;
  protected final TableName tableName;
  protected final int timeBeforeReplicas;
  private final int operationTimeout;
  private final int rpcTimeout;
  private final int retries;
  private final RpcControllerFactory rpcControllerFactory;
  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;

  public RpcRetryingCallerWithReadReplicas(RpcControllerFactory rpcControllerFactory,
    TableName tableName, ClusterConnection cConnection, final Get get, ExecutorService pool,
    int retries, int operationTimeout, int rpcTimeout, int timeBeforeReplicas) {
    this.rpcControllerFactory = rpcControllerFactory;
    this.tableName = tableName;
    this.cConnection = cConnection;
    this.conf = cConnection.getConfiguration();
    this.get = get;
    this.pool = pool;
    this.retries = retries;
    this.operationTimeout = operationTimeout;
    this.rpcTimeout = rpcTimeout;
    this.timeBeforeReplicas = timeBeforeReplicas;
    this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
  }

  /**
   * A RegionServerCallable that takes into account the replicas, i.e.:
   * - the call can be on any replica
   * - we need to stop retrying when the call is completed
   * - we can be interrupted
   */
  class ReplicaRegionServerCallable extends CancellableRegionServerCallable<Result> {
    final int id;

    public ReplicaRegionServerCallable(int id, HRegionLocation location) {
      super(RpcRetryingCallerWithReadReplicas.this.cConnection,
        RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(),
        rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker(),
        PRIORITY_UNSET);
      this.id = id;
      this.location = location;
    }

    /**
     * Two responsibilities:
     * - if the call is already completed (by another replica), stop the retries
     * - set the location to the right region, depending on the replica
     */
    @Override
    // TODO: Very like the super class implementation. Can we shrink this down?
    public void prepare(final boolean reload) throws IOException {
      if (getRpcController().isCanceled()) return;
      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }
      if (reload || location == null) {
        RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow());
        location = id < rl.size() ? rl.getRegionLocation(id) : null;
      }

      if (location == null || location.getServerName() == null) {
        // With this exception, there will be a retry. The location can be null for a replica
        // when the table is created or after a split.
        throw new HBaseIOException("There is no location for replica id #" + id);
      }

      setStubByServiceName(this.location.getServerName());
    }

    @Override
    // TODO: Very like the super class implementation. Can we shrink this down?
    protected Result rpcCall() throws Exception {
      if (getRpcController().isCanceled()) return null;
      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }
      byte[] reg = location.getRegionInfo().getRegionName();
      ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get);
      HBaseRpcController hrc = (HBaseRpcController) getRpcController();
      hrc.reset();
      hrc.setCallTimeout(rpcTimeout);
      hrc.setPriority(tableName);
      ClientProtos.GetResponse response = getStub().get(hrc, request);
      if (response == null) {
        return null;
      }
      return ProtobufUtil.toResult(response.getResult(), hrc.cellScanner());
    }
  }

  /**
   * <p>
   * Algo:
   * - we put the query into the execution pool
   * - after the configured delay (timeBeforeReplicas, in microseconds), if we don't have a
   *   result, we add the queries for the secondary replicas
   * - we take the first answer
   * - when done, we cancel what's left. Cancelling means:
   *   - removing from the pool if the actual call was not started
   *   - interrupting the call if it has started
   * Client side, we need to take into account that:
   * - a call is not executed immediately after being put into the pool
   * - a call is a thread; let's not multiply the number of threads by the number of replicas
   * Server side, if we can cancel when it's still in the handler pool, it's much better, as a
   * call can take some i/o.
   * </p>
   * Globally, the number of retries, timeout and so on still applies, but it's per replica, not
   * global. We continue until all retries are done, or all timeouts are exceeded.
   */
  public Result call(int operationTimeout)
    throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
    boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);

    RegionLocations rl = null;
    boolean skipPrimary = false;
    try {
      rl = getRegionLocations(true,
        (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID),
        cConnection, tableName, get.getRow());
    } catch (RetriesExhaustedException | DoNotRetryIOException e) {
      // When no specific replica id is specified, we just need to load all replicas.
      if (isTargetReplicaSpecified) {
        throw e;
      } else {
        // We cannot get the primary replica location; it is possible that the region
        // server hosting meta is down, so proceed to try the cached replicas.
        if (cConnection instanceof ConnectionImplementation) {
          rl = ((ConnectionImplementation) cConnection).getCachedLocation(tableName, get.getRow());
          if (rl == null) {
            // No cached locations
            throw e;
          }

          // Primary replica location is not known, skip primary replica
          skipPrimary = true;
        } else {
          // For completeness
          throw e;
        }
      }
    }

    final ResultBoundedCompletionService<Result> cs =
      new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size());
    int startIndex = 0;
    int endIndex = rl.size();

    if (isTargetReplicaSpecified) {
      addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
      endIndex = 1;
    } else {
      if (!skipPrimary) {
        addCallsForReplica(cs, rl, 0, 0);
        try {
          // wait for the timeout to see whether the primary responds back
          Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
          if (f != null) {
            return f.get(); // great we got a response
          }
          if (cConnection.getConnectionMetrics() != null) {
            cConnection.getConnectionMetrics().incrHedgedReadOps();
          }
        } catch (ExecutionException e) {
          // We ignore the ExecutionException and continue with the secondary replicas
          if (LOG.isDebugEnabled()) {
            LOG.debug("Primary replica returns " + e.getCause());
          }

          // Skip the result from the primary as we know that there is something wrong
          startIndex = 1;
        } catch (CancellationException e) {
          throw new InterruptedIOException();
        } catch (InterruptedException e) {
          throw new InterruptedIOException();
        }
      } else {
        // Since the primary replica is skipped, the endIndex needs to be adjusted accordingly
        endIndex--;
      }

      // submit calls for all of the secondaries at once
      addCallsForReplica(cs, rl, 1, rl.size() - 1);
    }
    try {
      ResultBoundedCompletionService<Result>.QueueingFuture<Result> f =
        cs.pollForFirstSuccessfullyCompletedTask(operationTimeout, TimeUnit.MILLISECONDS,
          startIndex, endIndex);
      if (f == null) {
        throw new RetriesExhaustedException(
          "Timed out after " + operationTimeout + "ms. Get is sent to replicas with startIndex: "
            + startIndex + ", endIndex: " + endIndex + ", Locations: " + rl);
      }
      if (
        cConnection.getConnectionMetrics() != null && !isTargetReplicaSpecified && !skipPrimary
          && f.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID
      ) {
        cConnection.getConnectionMetrics().incrHedgedReadWin();
      }
      return f.get();
    } catch (ExecutionException e) {
      throwEnrichedException(e, retries);
    } catch (CancellationException e) {
      throw new InterruptedIOException();
    } catch (InterruptedException e) {
      throw new InterruptedIOException();
    } finally {
      // We get here because we were interrupted or because one or more of the
      // calls succeeded or failed. In all cases, we stop all our tasks.
      cs.cancelAll();
    }

    LOG.error("Impossible? Arrived at an unreachable line..."); // unreachable
    return null; // unreachable
  }

  /**
   * Extract the real exception from the ExecutionException, and throw what makes more sense.
   */
  static void throwEnrichedException(ExecutionException e, int retries)
    throws RetriesExhaustedException, DoNotRetryIOException {
    Throwable t = e.getCause();
    assert t != null; // That's what ExecutionException is about: holding an exception
    t.printStackTrace();

    if (t instanceof RetriesExhaustedException) {
      throw (RetriesExhaustedException) t;
    }

    if (t instanceof DoNotRetryIOException) {
      throw (DoNotRetryIOException) t;
    }

    RetriesExhaustedException.ThrowableWithExtraContext qt =
      new RetriesExhaustedException.ThrowableWithExtraContext(t,
        EnvironmentEdgeManager.currentTime(), null);

    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
      Collections.singletonList(qt);

    throw new RetriesExhaustedException(retries, exceptions);
  }

  /**
   * Creates the calls and submits them.
   * @param cs  the completion service to use for submitting
   * @param rl  the region locations
   * @param min the id of the first replica, inclusive
   * @param max the id of the last replica, inclusive
   */
  private void addCallsForReplica(ResultBoundedCompletionService<Result> cs, RegionLocations rl,
    int min, int max) {
    for (int id = min; id <= max; id++) {
      HRegionLocation hrl = rl.getRegionLocation(id);
      ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
      cs.submit(callOnReplica, rpcTimeout, operationTimeout, id);
    }
  }

  static RegionLocations getRegionLocations(boolean useCache, int replicaId,
    ClusterConnection cConnection, TableName tableName, byte[] row)
    throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {

    RegionLocations rl;
    try {
      if (useCache) {
        rl = cConnection.locateRegion(tableName, row, true, true, replicaId);
      } else {
        rl = cConnection.relocateRegion(tableName, row, replicaId);
      }
    } catch (DoNotRetryIOException | InterruptedIOException | RetriesExhaustedException e) {
      throw e;
    } catch (IOException e) {
      throw new RetriesExhaustedException("Cannot get the location for replica " + replicaId
        + " of region for " + Bytes.toStringBinary(row) + " in " + tableName, e);
    }
    if (rl == null) {
      throw new RetriesExhaustedException("Cannot get the location for replica " + replicaId
        + " of region for " + Bytes.toStringBinary(row) + " in " + tableName);
    }

    return rl;
  }
}