/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

/**
 * Base class for an asynchronous rpc call that is retried on failure.
 * <p/>
 * Subclasses implement {@link #doCall()} to issue one attempt and report a failed attempt through
 * {@link #onError(Throwable, Supplier, Consumer)}. This class records every failure, decides
 * whether another attempt is allowed (attempt count and operation timeout), and schedules the next
 * {@link #doCall()} on the retry timer after a pause. The outcome is delivered through
 * {@link #future}.
 * @param <T> the type of the value the {@link #future} is completed with
 */
@InterfaceAudience.Private
public abstract class AsyncRpcRetryingCaller<T> {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncRpcRetryingCaller.class);

  /** Timer on which retries of {@link #doCall()} are scheduled. */
  private final Timer retryTimer;

  /** Priority applied to {@link #controller}, both at construction and on every reset. */
  private final int priority;

  /** {@link System#nanoTime()} captured when this caller was constructed. */
  private final long startNs;

  /** Base pause handed to {@code getPauseTime} for ordinary failures. */
  private final long pauseNs;

  /** Base pause used instead of {@link #pauseNs} when the error says the server is overloaded. */
  private final long pauseNsForServerOverloaded;

  /** 1-based number of the current attempt; incremented each time a retry is scheduled. */
  private int tries = 1;

  /** Once {@link #tries} reaches this value, a further failure ends the call. */
  private final int maxAttempts;

  /** Failures are logged at WARN only while {@link #tries} is greater than this value. */
  private final int startLogErrorsCnt;

  /** Every retriable failure seen so far, reported through {@link RetriesExhaustedException}. */
  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;

  /** Upper bound for the timeout of a single rpc attempt. */
  private final long rpcTimeoutNs;

  /** Time budget for the whole operation; a value {@code <= 0} disables the budget checks. */
  protected final long operationTimeoutNs;

  protected final AsyncConnectionImpl conn;

  /** Completed by subclasses on success, or exceptionally by this class when giving up. */
  protected final CompletableFuture<T> future;

  /** The rpc controller created for this caller; re-armed by {@link #resetCallTimeout()}. */
  protected final HBaseRpcController controller;

  /**
   * @param retryTimer                 timer used to schedule retries
   * @param conn                       connection supplying the rpc controller factory and admin
   * @param priority                   priority set on the rpc controller
   * @param pauseNs                    base pause between retries, in nanoseconds
   * @param pauseNsForServerOverloaded base pause used when the server reports it is overloaded, in
   *                                   nanoseconds
   * @param maxAttempts                maximum number of attempts
   * @param operationTimeoutNs         budget for the whole operation in nanoseconds, {@code <= 0}
   *                                   for no budget
   * @param rpcTimeoutNs               timeout of a single rpc attempt, in nanoseconds
   * @param startLogErrorsCnt          number of attempts that may fail before failures are logged
   *                                   at WARN
   */
  public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
    long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
    long rpcTimeoutNs, int startLogErrorsCnt) {
    this.retryTimer = retryTimer;
    this.conn = conn;
    this.priority = priority;
    this.pauseNs = pauseNs;
    this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
    this.maxAttempts = maxAttempts;
    this.operationTimeoutNs = operationTimeoutNs;
    this.rpcTimeoutNs = rpcTimeoutNs;
    this.startLogErrorsCnt = startLogErrorsCnt;
    this.future = new CompletableFuture<>();
    this.controller = conn.rpcControllerFactory.newController();
    this.controller.setPriority(priority);
    this.exceptions = new ArrayList<>();
    // Captured last so that the operation clock starts once the caller is fully constructed.
    this.startNs = System.nanoTime();
  }

  /** Returns the time elapsed since construction, in milliseconds. */
  private long elapsedMs() {
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
  }

  /**
   * Returns what is left of the operation timeout, in nanoseconds. The value is zero or negative
   * once the budget is used up, and is not meaningful when {@link #operationTimeoutNs} is
   * {@code <= 0}.
   */
  protected final long remainingTimeNs() {
    return operationTimeoutNs - (System.nanoTime() - startNs);
  }

  /**
   * Fails {@link #future} with a {@link RetriesExhaustedException} carrying all recorded failures.
   * The reported count is {@code tries - 1}.
   */
  protected final void completeExceptionally() {
    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
  }

  /**
   * Re-arms {@link #controller} for the next attempt.
   * <p/>
   * With an operation timeout configured, the call timeout is the smaller of the remaining
   * operation time and the rpc timeout; otherwise it is the rpc timeout. If the operation budget is
   * already used up, {@link #future} is failed via {@link #completeExceptionally()} and the
   * controller is left untouched.
   */
  // NOTE(review): this method returns normally after failing the future, so a caller presumably
  // has to check future.isDone() before sending the rpc -- confirm against the subclasses.
  protected final void resetCallTimeout() {
    long callTimeoutNs;
    if (operationTimeoutNs > 0) {
      callTimeoutNs = remainingTimeNs();
      if (callTimeoutNs <= 0) {
        completeExceptionally();
        return;
      }
      callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs);
    } else {
      callTimeoutNs = rpcTimeoutNs;
    }
    resetController(controller, callTimeoutNs, priority);
  }

  /**
   * Schedules the next {@link #doCall()} on the retry timer, or fails {@link #future} when the
   * operation timeout leaves no room for another attempt.
   * @param error the failure that triggered the retry; only used to choose the base pause
   */
  private void tryScheduleRetry(Throwable error) {
    long pauseNsToUse =
      HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
    long delayNs;
    if (operationTimeoutNs > 0) {
      // Never sleep past the operation deadline; SLEEP_DELTA_NS is kept back from the budget.
      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
      if (maxDelayNs <= 0) {
        completeExceptionally();
        return;
      }
      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
    } else {
      delayNs = getPauseTime(pauseNsToUse, tries - 1);
    }
    tries++;
    retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
  }

  /**
   * Returns the table this call operates on, if any. When present, it is used by
   * {@link #onError(Throwable, Supplier, Consumer)} to check whether the table has been disabled.
   * This base implementation returns an empty {@link Optional}.
   */
  protected Optional<TableName> getTableName() {
    return Optional.empty();
  }

  /**
   * Handles a failed attempt: either fails {@link #future} or schedules a retry.
   * <p/>
   * Nothing is done if the future is already done. A {@link DoNotRetryIOException} other than
   * {@link ScannerResetException} fails the future immediately. Any other error is passed to
   * {@code updateCachedLocation}, recorded, and then retried unless the attempts are exhausted, the
   * operation timeout is used up, or the table turns out to be disabled or missing.
   * @param t                    the error of the failed attempt, translated with
   *                             {@code translateException} before it is inspected
   * @param errMsg               supplies the leading part of the WARN message; only invoked when
   *                             the failure is actually logged
   * @param updateCachedLocation invoked with the translated error for every retriable failure
   */
  protected final void onError(Throwable t, Supplier<String> errMsg,
    Consumer<Throwable> updateCachedLocation) {
    if (future.isDone()) {
      // Give up if the future is already done, this is possible if user has already canceled the
      // future. And for timeline consistent read, we will also cancel some requests if we have
      // already got one of the responses.
      LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
      return;
    }
    Throwable error = translateException(t);
    // We use this retrying caller to open a scanner, as it is idempotent, but we may throw
    // ScannerResetException, which is a DoNotRetryIOException when opening a scanner as now we will
    // also fetch data when opening a scanner. The intention here is that if we hit a
    // ScannerResetException when scanning then we should try to open a new scanner, instead of
    // retrying on the old one, so it is declared as a DoNotRetryIOException. But here we are
    // exactly trying to open a new scanner, so we should retry on ScannerResetException.
    if (error instanceof DoNotRetryIOException && !(error instanceof ScannerResetException)) {
      future.completeExceptionally(error);
      return;
    }
    if (tries > startLogErrorsCnt) {
      // The trailing Throwable is not bound to a placeholder, so it is logged as the exception.
      LOG.warn("{}, tries = {}, maxAttempts = {}, timeout = {} ms, time elapsed = {} ms",
        errMsg.get(), tries, maxAttempts, TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs),
        elapsedMs(), error);
    }
    updateCachedLocation.accept(error);
    exceptions.add(new RetriesExhaustedException.ThrowableWithExtraContext(error,
      EnvironmentEdgeManager.currentTime(), ""));
    if (tries >= maxAttempts) {
      completeExceptionally();
      return;
    }
    // check whether the table has been disabled, notice that the check will introduce a request to
    // meta, so here we only check for disabled for some specific exception types.
    boolean regionUnavailable =
      error instanceof NotServingRegionException || error instanceof RegionOfflineException;
    if (!regionUnavailable) {
      tryScheduleRetry(error);
      return;
    }
    Optional<TableName> tableName = getTableName();
    if (!tableName.isPresent()) {
      tryScheduleRetry(error);
      return;
    }
    retryUnlessTableDisabled(error, tableName.get());
  }

  /**
   * Asks the admin whether {@code tableName} is disabled and reacts to the answer: the future is
   * failed if the table is disabled or not found, otherwise a retry is scheduled.
   * @param error     the failure being handled, forwarded to {@link #tryScheduleRetry(Throwable)}
   * @param tableName the table to check
   */
  private void retryUnlessTableDisabled(Throwable error, TableName tableName) {
    FutureUtils.addListener(conn.getAdmin().isTableDisabled(tableName), (disabled, e) -> {
      if (e != null) {
        if (e instanceof TableNotFoundException) {
          future.completeExceptionally(e);
        } else {
          // failed to test whether the table is disabled, not a big deal, continue retrying
          tryScheduleRetry(error);
        }
        return;
      }
      if (disabled) {
        future.completeExceptionally(new TableNotEnabledException(tableName));
      } else {
        tryScheduleRetry(error);
      }
    });
  }

  /**
   * Issues one attempt of the rpc. Called once from {@link #call()} and again from the retry timer
   * for every scheduled retry.
   */
  protected abstract void doCall();

  /**
   * Starts the first attempt and returns the future that carries the final outcome.
   */
  CompletableFuture<T> call() {
    doCall();
    return future;
  }
}