001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.client; 021 022import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; 026 027import java.util.ArrayList; 028import java.util.List; 029import java.util.Optional; 030import java.util.concurrent.CompletableFuture; 031import java.util.concurrent.TimeUnit; 032import java.util.function.Consumer; 033import java.util.function.Supplier; 034import org.apache.hadoop.hbase.CallQueueTooBigException; 035import org.apache.hadoop.hbase.DoNotRetryIOException; 036import org.apache.hadoop.hbase.NotServingRegionException; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.TableNotEnabledException; 039import org.apache.hadoop.hbase.TableNotFoundException; 040import org.apache.hadoop.hbase.exceptions.ScannerResetException; 041import org.apache.hadoop.hbase.ipc.HBaseRpcController; 042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 043import org.apache.hadoop.hbase.util.FutureUtils; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import org.apache.hbase.thirdparty.io.netty.util.Timer; 049 050@InterfaceAudience.Private 051public abstract class AsyncRpcRetryingCaller<T> { 052 053 private static final Logger LOG = LoggerFactory.getLogger(AsyncRpcRetryingCaller.class); 054 055 private final Timer retryTimer; 056 057 private final int priority; 058 059 private final long startNs; 060 061 private final long pauseNs; 062 063 private final long pauseForCQTBENs; 064 065 private int tries = 1; 066 067 private final int maxAttempts; 068 069 private final int startLogErrorsCnt; 070 071 private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions; 072 073 private final long rpcTimeoutNs; 074 075 protected final long operationTimeoutNs; 076 077 protected final AsyncConnectionImpl conn; 078 079 protected final CompletableFuture<T> future; 080 081 protected final HBaseRpcController controller; 082 083 public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, 084 long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs, 085 long rpcTimeoutNs, int startLogErrorsCnt) { 086 this.retryTimer = retryTimer; 087 this.conn = conn; 088 this.priority = priority; 089 this.pauseNs = pauseNs; 090 this.pauseForCQTBENs = pauseForCQTBENs; 091 this.maxAttempts = maxAttempts; 092 this.operationTimeoutNs = operationTimeoutNs; 093 this.rpcTimeoutNs = rpcTimeoutNs; 094 this.startLogErrorsCnt = startLogErrorsCnt; 095 this.future = new CompletableFuture<>(); 096 this.controller = conn.rpcControllerFactory.newController(); 097 this.controller.setPriority(priority); 098 this.exceptions = new ArrayList<>(); 099 this.startNs = System.nanoTime(); 100 } 101 102 private long elapsedMs() { 103 return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); 104 } 105 106 protected final long remainingTimeNs() { 107 return operationTimeoutNs - (System.nanoTime() - startNs); 108 } 109 110 protected final void completeExceptionally() { 111 future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions)); 112 } 113 114 protected final void resetCallTimeout() { 115 long callTimeoutNs; 116 if (operationTimeoutNs > 0) { 117 callTimeoutNs = remainingTimeNs(); 118 if (callTimeoutNs <= 0) { 119 completeExceptionally(); 120 return; 121 } 122 callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs); 123 } else { 124 callTimeoutNs = rpcTimeoutNs; 125 } 126 resetController(controller, callTimeoutNs, priority); 127 } 128 129 private void tryScheduleRetry(Throwable error) { 130 long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs; 131 long delayNs; 132 if (operationTimeoutNs > 0) { 133 long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; 134 if (maxDelayNs <= 0) { 135 completeExceptionally(); 136 return; 137 } 138 delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1)); 139 } else { 140 delayNs = getPauseTime(pauseNsToUse, tries - 1); 141 } 142 tries++; 143 retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS); 144 } 145 146 protected Optional<TableName> getTableName() { 147 return Optional.empty(); 148 } 149 150 protected final void onError(Throwable t, Supplier<String> errMsg, 151 Consumer<Throwable> updateCachedLocation) { 152 if (future.isDone()) { 153 // Give up if the future is already done, this is possible if user has already canceled the 154 // future. And for timeline consistent read, we will also cancel some requests if we have 155 // already get one of the responses. 156 LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled()); 157 return; 158 } 159 Throwable error = translateException(t); 160 // We use this retrying caller to open a scanner, as it is idempotent, but we may throw 161 // ScannerResetException, which is a DoNotRetryIOException when opening a scanner as now we will 162 // also fetch data when opening a scanner. The intention here is that if we hit a 163 // ScannerResetException when scanning then we should try to open a new scanner, instead of 164 // retrying on the old one, so it is declared as a DoNotRetryIOException. But here we are 165 // exactly trying to open a new scanner, so we should retry on ScannerResetException. 166 if (error instanceof DoNotRetryIOException && !(error instanceof ScannerResetException)) { 167 future.completeExceptionally(error); 168 return; 169 } 170 if (tries > startLogErrorsCnt) { 171 LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts + 172 ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + 173 " ms, time elapsed = " + elapsedMs() + " ms", error); 174 } 175 updateCachedLocation.accept(error); 176 RetriesExhaustedException.ThrowableWithExtraContext qt = 177 new RetriesExhaustedException.ThrowableWithExtraContext(error, 178 EnvironmentEdgeManager.currentTime(), ""); 179 exceptions.add(qt); 180 if (tries >= maxAttempts) { 181 completeExceptionally(); 182 return; 183 } 184 // check whether the table has been disabled, notice that the check will introduce a request to 185 // meta, so here we only check for disabled for some specific exception types. 186 if (error instanceof NotServingRegionException || error instanceof RegionOfflineException) { 187 Optional<TableName> tableName = getTableName(); 188 if (tableName.isPresent()) { 189 FutureUtils.addListener(conn.getAdmin().isTableDisabled(tableName.get()), (disabled, e) -> { 190 if (e != null) { 191 if (e instanceof TableNotFoundException) { 192 future.completeExceptionally(e); 193 } else { 194 // failed to test whether the table is disabled, not a big deal, continue retrying 195 tryScheduleRetry(error); 196 } 197 return; 198 } 199 if (disabled) { 200 future.completeExceptionally(new TableNotEnabledException(tableName.get())); 201 } else { 202 tryScheduleRetry(error); 203 } 204 }); 205 } else { 206 tryScheduleRetry(error); 207 } 208 } else { 209 tryScheduleRetry(error); 210 } 211 } 212 213 protected abstract void doCall(); 214 215 CompletableFuture<T> call() { 216 doCall(); 217 return future; 218 } 219}