001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; 022 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Map; 026import java.util.Optional; 027import java.util.OptionalLong; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.TimeUnit; 030import java.util.function.Consumer; 031import java.util.function.Supplier; 032import org.apache.hadoop.hbase.DoNotRetryIOException; 033import org.apache.hadoop.hbase.HBaseServerException; 034import org.apache.hadoop.hbase.NotServingRegionException; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.TableNotEnabledException; 037import org.apache.hadoop.hbase.TableNotFoundException; 038import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager; 039import org.apache.hadoop.hbase.exceptions.ScannerResetException; 040import org.apache.hadoop.hbase.ipc.HBaseRpcController; 041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 042import org.apache.hadoop.hbase.util.FutureUtils; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hbase.thirdparty.io.netty.util.Timer; 048 049@InterfaceAudience.Private 050public abstract class AsyncRpcRetryingCaller<T> { 051 052 private static final Logger LOG = LoggerFactory.getLogger(AsyncRpcRetryingCaller.class); 053 054 private final Timer retryTimer; 055 056 private final int priority; 057 058 private final long startNs; 059 060 private int tries = 1; 061 062 private final int maxAttempts; 063 064 private final int startLogErrorsCnt; 065 066 private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions; 067 068 private final long rpcTimeoutNs; 069 070 protected final long operationTimeoutNs; 071 072 protected final AsyncConnectionImpl conn; 073 074 protected final CompletableFuture<T> future; 075 076 protected final HBaseRpcController controller; 077 078 private final HBaseServerExceptionPauseManager pauseManager; 079 080 public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, 081 long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, 082 long rpcTimeoutNs, int startLogErrorsCnt, Map<String, byte[]> requestAttributes) { 083 this.retryTimer = retryTimer; 084 this.conn = conn; 085 this.priority = priority; 086 this.maxAttempts = maxAttempts; 087 this.operationTimeoutNs = operationTimeoutNs; 088 this.rpcTimeoutNs = rpcTimeoutNs; 089 this.startLogErrorsCnt = startLogErrorsCnt; 090 this.future = new CompletableFuture<>(); 091 this.controller = conn.rpcControllerFactory.newController(); 092 this.controller.setRequestAttributes(requestAttributes); 093 this.exceptions = new ArrayList<>(); 094 this.startNs = System.nanoTime(); 095 this.pauseManager = 096 new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs); 097 } 098 099 private long elapsedMs() { 100 return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); 101 } 102 103 protected final long remainingTimeNs() { 104 return pauseManager.remainingTimeNs(startNs); 105 } 106 107 protected final void completeExceptionally() { 108 future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions)); 109 } 110 111 protected final void resetCallTimeout() { 112 long callTimeoutNs; 113 if (operationTimeoutNs > 0) { 114 callTimeoutNs = remainingTimeNs(); 115 if (callTimeoutNs <= 0) { 116 completeExceptionally(); 117 return; 118 } 119 callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs); 120 } else { 121 callTimeoutNs = rpcTimeoutNs; 122 } 123 resetController(controller, callTimeoutNs, priority, getTableName().orElse(null)); 124 } 125 126 private void tryScheduleRetry(Throwable error) { 127 OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs); 128 if (!maybePauseNsToUse.isPresent()) { 129 completeExceptionally(); 130 return; 131 } 132 long delayNs = maybePauseNsToUse.getAsLong(); 133 tries++; 134 if (HBaseServerException.isServerOverloaded(error)) { 135 Optional<MetricsConnection> metrics = conn.getConnectionMetrics(); 136 metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS)); 137 } 138 retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS); 139 } 140 141 protected Optional<TableName> getTableName() { 142 return Optional.empty(); 143 } 144 145 // Sub classes can override this method to change the error type, to control the retry logic. 146 // For example, during rolling upgrading, if we call this newly added method, we will get a 147 // UnsupportedOperationException(wrapped by a DNRIOE), and sometimes we may want to fallback to 148 // use the old method first, so the sub class could change the exception type to something not a 149 // DNRIOE, so we will schedule a retry, and the next time the sub class could use old method to 150 // make the rpc call. 151 protected Throwable preProcessError(Throwable error) { 152 return error; 153 } 154 155 protected final void onError(Throwable t, Supplier<String> errMsg, 156 Consumer<Throwable> updateCachedLocation) { 157 if (future.isDone()) { 158 // Give up if the future is already done, this is possible if user has already canceled the 159 // future. And for timeline consistent read, we will also cancel some requests if we have 160 // already get one of the responses. 161 LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled()); 162 return; 163 } 164 Throwable error = preProcessError(translateException(t)); 165 // We use this retrying caller to open a scanner, as it is idempotent, but we may throw 166 // ScannerResetException, which is a DoNotRetryIOException when opening a scanner as now we will 167 // also fetch data when opening a scanner. The intention here is that if we hit a 168 // ScannerResetException when scanning then we should try to open a new scanner, instead of 169 // retrying on the old one, so it is declared as a DoNotRetryIOException. But here we are 170 // exactly trying to open a new scanner, so we should retry on ScannerResetException. 171 if (error instanceof DoNotRetryIOException && !(error instanceof ScannerResetException)) { 172 future.completeExceptionally(error); 173 return; 174 } 175 if (tries > startLogErrorsCnt) { 176 LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts 177 + ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) 178 + " ms, time elapsed = " + elapsedMs() + " ms", error); 179 } 180 updateCachedLocation.accept(error); 181 RetriesExhaustedException.ThrowableWithExtraContext qt = 182 new RetriesExhaustedException.ThrowableWithExtraContext(error, 183 EnvironmentEdgeManager.currentTime(), ""); 184 exceptions.add(qt); 185 if (tries >= maxAttempts) { 186 completeExceptionally(); 187 return; 188 } 189 // check whether the table has been disabled, notice that the check will introduce a request to 190 // meta, so here we only check for disabled for some specific exception types. 191 if (error instanceof NotServingRegionException || error instanceof RegionOfflineException) { 192 Optional<TableName> tableName = getTableName(); 193 if (tableName.isPresent()) { 194 FutureUtils.addListener(conn.getAdmin().isTableDisabled(tableName.get()), (disabled, e) -> { 195 if (e != null) { 196 if (e instanceof TableNotFoundException) { 197 future.completeExceptionally(e); 198 } else { 199 // failed to test whether the table is disabled, not a big deal, continue retrying 200 tryScheduleRetry(error); 201 } 202 return; 203 } 204 if (disabled) { 205 future.completeExceptionally(new TableNotEnabledException(tableName.get())); 206 } else { 207 tryScheduleRetry(error); 208 } 209 }); 210 } else { 211 tryScheduleRetry(error); 212 } 213 } else { 214 tryScheduleRetry(error); 215 } 216 } 217 218 protected abstract void doCall(); 219 220 CompletableFuture<T> call() { 221 doCall(); 222 return future; 223 } 224}