001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020 021import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; 022 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.util.Collections; 026import java.util.List; 027import java.util.concurrent.CancellationException; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.Future; 031import java.util.concurrent.TimeUnit; 032 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.HBaseIOException; 036import org.apache.hadoop.hbase.HRegionLocation; 037import org.apache.hadoop.hbase.RegionLocations; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.ipc.HBaseRpcController; 040import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 048import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 050 051/** 052 * Caller that goes to replica if the primary region does no answer within a configurable 053 * timeout. If the timeout is reached, it calls all the secondary replicas, and returns 054 * the first answer. If the answer comes from one of the secondary replica, it will 055 * be marked as stale. 056 */ 057@InterfaceAudience.Private 058public class RpcRetryingCallerWithReadReplicas { 059 private static final Logger LOG = 060 LoggerFactory.getLogger(RpcRetryingCallerWithReadReplicas.class); 061 062 protected final ExecutorService pool; 063 protected final ClusterConnection cConnection; 064 protected final Configuration conf; 065 protected final Get get; 066 protected final TableName tableName; 067 protected final int timeBeforeReplicas; 068 private final int operationTimeout; 069 private final int rpcTimeout; 070 private final int retries; 071 private final RpcControllerFactory rpcControllerFactory; 072 private final RpcRetryingCallerFactory rpcRetryingCallerFactory; 073 074 public RpcRetryingCallerWithReadReplicas( 075 RpcControllerFactory rpcControllerFactory, TableName tableName, 076 ClusterConnection cConnection, final Get get, 077 ExecutorService pool, int retries, int operationTimeout, int rpcTimeout, 078 int timeBeforeReplicas) { 079 this.rpcControllerFactory = rpcControllerFactory; 080 this.tableName = tableName; 081 this.cConnection = cConnection; 082 this.conf = cConnection.getConfiguration(); 083 this.get = get; 084 this.pool = pool; 085 this.retries = retries; 086 this.operationTimeout = operationTimeout; 087 this.rpcTimeout = rpcTimeout; 088 this.timeBeforeReplicas = timeBeforeReplicas; 089 this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf); 090 } 091 092 /** 093 * A RegionServerCallable that takes into account the replicas, i.e. 094 * - the call can be on any replica 095 * - we need to stop retrying when the call is completed 096 * - we can be interrupted 097 */ 098 class ReplicaRegionServerCallable extends CancellableRegionServerCallable<Result> { 099 final int id; 100 public ReplicaRegionServerCallable(int id, HRegionLocation location) { 101 super(RpcRetryingCallerWithReadReplicas.this.cConnection, 102 RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(), 103 rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker(), PRIORITY_UNSET); 104 this.id = id; 105 this.location = location; 106 } 107 108 /** 109 * Two responsibilities 110 * - if the call is already completed (by another replica) stops the retries. 111 * - set the location to the right region, depending on the replica. 112 */ 113 @Override 114 // TODO: Very like the super class implemenation. Can we shrink this down? 115 public void prepare(final boolean reload) throws IOException { 116 if (getRpcController().isCanceled()) return; 117 if (Thread.interrupted()) { 118 throw new InterruptedIOException(); 119 } 120 if (reload || location == null) { 121 RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow()); 122 location = id < rl.size() ? rl.getRegionLocation(id) : null; 123 } 124 125 if (location == null || location.getServerName() == null) { 126 // With this exception, there will be a retry. The location can be null for a replica 127 // when the table is created or after a split. 128 throw new HBaseIOException("There is no location for replica id #" + id); 129 } 130 131 setStubByServiceName(this.location.getServerName()); 132 } 133 134 @Override 135 // TODO: Very like the super class implemenation. Can we shrink this down? 136 protected Result rpcCall() throws Exception { 137 if (getRpcController().isCanceled()) return null; 138 if (Thread.interrupted()) { 139 throw new InterruptedIOException(); 140 } 141 byte[] reg = location.getRegionInfo().getRegionName(); 142 ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get); 143 HBaseRpcController hrc = (HBaseRpcController)getRpcController(); 144 hrc.reset(); 145 hrc.setCallTimeout(rpcTimeout); 146 hrc.setPriority(tableName); 147 ClientProtos.GetResponse response = getStub().get(hrc, request); 148 if (response == null) { 149 return null; 150 } 151 return ProtobufUtil.toResult(response.getResult(), hrc.cellScanner()); 152 } 153 } 154 155 /** 156 * <p> 157 * Algo: 158 * - we put the query into the execution pool. 159 * - after x ms, if we don't have a result, we add the queries for the secondary replicas 160 * - we take the first answer 161 * - when done, we cancel what's left. Cancelling means: 162 * - removing from the pool if the actual call was not started 163 * - interrupting the call if it has started 164 * Client side, we need to take into account 165 * - a call is not executed immediately after being put into the pool 166 * - a call is a thread. Let's not multiply the number of thread by the number of replicas. 167 * Server side, if we can cancel when it's still in the handler pool, it's much better, as a call 168 * can take some i/o. 169 * </p> 170 * Globally, the number of retries, timeout and so on still applies, but it's per replica, 171 * not global. We continue until all retries are done, or all timeouts are exceeded. 172 */ 173 public Result call(int operationTimeout) 174 throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException { 175 boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0); 176 177 RegionLocations rl = null; 178 boolean skipPrimary = false; 179 try { 180 rl = getRegionLocations(true, 181 (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID), 182 cConnection, tableName, get.getRow()); 183 } catch (RetriesExhaustedException | DoNotRetryIOException e) { 184 // When there is no specific replica id specified. It just needs to load all replicas. 185 if (isTargetReplicaSpecified) { 186 throw e; 187 } else { 188 // We cannot get the primary replica location, it is possible that the region 189 // server hosting meta is down, it needs to proceed to try cached replicas. 190 if (cConnection instanceof ConnectionImplementation) { 191 rl = ((ConnectionImplementation)cConnection).getCachedLocation(tableName, get.getRow()); 192 if (rl == null) { 193 // No cached locations 194 throw e; 195 } 196 197 // Primary replica location is not known, skip primary replica 198 skipPrimary = true; 199 } else { 200 // For completeness 201 throw e; 202 } 203 } 204 } 205 206 final ResultBoundedCompletionService<Result> cs = 207 new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size()); 208 int startIndex = 0; 209 int endIndex = rl.size(); 210 211 if(isTargetReplicaSpecified) { 212 addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId()); 213 endIndex = 1; 214 } else { 215 if (!skipPrimary) { 216 addCallsForReplica(cs, rl, 0, 0); 217 try { 218 // wait for the timeout to see whether the primary responds back 219 Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds 220 if (f != null) { 221 return f.get(); //great we got a response 222 } 223 if (cConnection.getConnectionMetrics() != null) { 224 cConnection.getConnectionMetrics().incrHedgedReadOps(); 225 } 226 } catch (ExecutionException e) { 227 // We ignore the ExecutionException and continue with the secondary replicas 228 if (LOG.isDebugEnabled()) { 229 LOG.debug("Primary replica returns " + e.getCause()); 230 } 231 232 // Skip the result from the primary as we know that there is something wrong 233 startIndex = 1; 234 } catch (CancellationException e) { 235 throw new InterruptedIOException(); 236 } catch (InterruptedException e) { 237 throw new InterruptedIOException(); 238 } 239 } else { 240 // Since primary replica is skipped, the endIndex needs to be adjusted accordingly 241 endIndex --; 242 } 243 244 // submit call for the all of the secondaries at once 245 addCallsForReplica(cs, rl, 1, rl.size() - 1); 246 } 247 try { 248 ResultBoundedCompletionService<Result>.QueueingFuture<Result> f = 249 cs.pollForFirstSuccessfullyCompletedTask(operationTimeout, TimeUnit.MILLISECONDS, startIndex, endIndex); 250 if (f == null) { 251 throw new RetriesExhaustedException("Timed out after " + operationTimeout + 252 "ms. Get is sent to replicas with startIndex: " + startIndex + 253 ", endIndex: " + endIndex + ", Locations: " + rl); 254 } 255 if (cConnection.getConnectionMetrics() != null && !isTargetReplicaSpecified && 256 !skipPrimary && f.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) { 257 cConnection.getConnectionMetrics().incrHedgedReadWin(); 258 } 259 return f.get(); 260 } catch (ExecutionException e) { 261 throwEnrichedException(e, retries); 262 } catch (CancellationException e) { 263 throw new InterruptedIOException(); 264 } catch (InterruptedException e) { 265 throw new InterruptedIOException(); 266 } finally { 267 // We get there because we were interrupted or because one or more of the 268 // calls succeeded or failed. In all case, we stop all our tasks. 269 cs.cancelAll(); 270 } 271 272 LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable 273 return null; // unreachable 274 } 275 276 /** 277 * Extract the real exception from the ExecutionException, and throws what makes more 278 * sense. 279 */ 280 static void throwEnrichedException(ExecutionException e, int retries) 281 throws RetriesExhaustedException, DoNotRetryIOException { 282 Throwable t = e.getCause(); 283 assert t != null; // That's what ExecutionException is about: holding an exception 284 t.printStackTrace(); 285 286 if (t instanceof RetriesExhaustedException) { 287 throw (RetriesExhaustedException) t; 288 } 289 290 if (t instanceof DoNotRetryIOException) { 291 throw (DoNotRetryIOException) t; 292 } 293 294 RetriesExhaustedException.ThrowableWithExtraContext qt = 295 new RetriesExhaustedException.ThrowableWithExtraContext(t, 296 EnvironmentEdgeManager.currentTime(), null); 297 298 List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions = 299 Collections.singletonList(qt); 300 301 throw new RetriesExhaustedException(retries, exceptions); 302 } 303 304 /** 305 * Creates the calls and submit them 306 * 307 * @param cs - the completion service to use for submitting 308 * @param rl - the region locations 309 * @param min - the id of the first replica, inclusive 310 * @param max - the id of the last replica, inclusive. 311 */ 312 private void addCallsForReplica(ResultBoundedCompletionService<Result> cs, 313 RegionLocations rl, int min, int max) { 314 for (int id = min; id <= max; id++) { 315 HRegionLocation hrl = rl.getRegionLocation(id); 316 ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl); 317 cs.submit(callOnReplica, operationTimeout, id); 318 } 319 } 320 321 static RegionLocations getRegionLocations(boolean useCache, int replicaId, 322 ClusterConnection cConnection, TableName tableName, byte[] row) 323 throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException { 324 325 RegionLocations rl; 326 try { 327 if (useCache) { 328 rl = cConnection.locateRegion(tableName, row, true, true, replicaId); 329 } else { 330 rl = cConnection.relocateRegion(tableName, row, replicaId); 331 } 332 } catch (DoNotRetryIOException | InterruptedIOException | RetriesExhaustedException e) { 333 throw e; 334 } catch (IOException e) { 335 throw new RetriesExhaustedException("Cannot get the location for replica" + replicaId 336 + " of region for " + Bytes.toStringBinary(row) + " in " + tableName, e); 337 } 338 if (rl == null) { 339 throw new RetriesExhaustedException("Cannot get the location for replica" + replicaId 340 + " of region for " + Bytes.toStringBinary(row) + " in " + tableName); 341 } 342 343 return rl; 344 } 345}