001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.client; 021 022import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; 026 027import java.util.ArrayList; 028import java.util.List; 029import java.util.Optional; 030import java.util.concurrent.CompletableFuture; 031import java.util.concurrent.TimeUnit; 032import java.util.function.Consumer; 033import java.util.function.Supplier; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.NotServingRegionException; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.TableNotEnabledException; 038import org.apache.hadoop.hbase.TableNotFoundException; 039import org.apache.hadoop.hbase.ipc.HBaseRpcController; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.hadoop.hbase.util.FutureUtils; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.io.netty.util.Timer; 047 048@InterfaceAudience.Private 049public abstract class AsyncRpcRetryingCaller<T> { 050 051 private static final Logger LOG = LoggerFactory.getLogger(AsyncRpcRetryingCaller.class); 052 053 private final Timer retryTimer; 054 055 private final long startNs; 056 057 private final long pauseNs; 058 059 private int tries = 1; 060 061 private final int maxAttempts; 062 063 private final int startLogErrorsCnt; 064 065 private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions; 066 067 private final long rpcTimeoutNs; 068 069 protected final long operationTimeoutNs; 070 071 protected final AsyncConnectionImpl conn; 072 073 protected final CompletableFuture<T> future; 074 075 protected final HBaseRpcController controller; 076 077 public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs, 078 int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { 079 this.retryTimer = retryTimer; 080 this.conn = conn; 081 this.pauseNs = pauseNs; 082 this.maxAttempts = maxAttempts; 083 this.operationTimeoutNs = operationTimeoutNs; 084 this.rpcTimeoutNs = rpcTimeoutNs; 085 this.startLogErrorsCnt = startLogErrorsCnt; 086 this.future = new CompletableFuture<>(); 087 this.controller = conn.rpcControllerFactory.newController(); 088 this.exceptions = new ArrayList<>(); 089 this.startNs = System.nanoTime(); 090 } 091 092 private long elapsedMs() { 093 return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); 094 } 095 096 protected final long remainingTimeNs() { 097 return operationTimeoutNs - (System.nanoTime() - startNs); 098 } 099 100 protected final void completeExceptionally() { 101 future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions)); 102 } 103 104 protected final void resetCallTimeout() { 105 long callTimeoutNs; 106 if (operationTimeoutNs > 0) { 107 callTimeoutNs = remainingTimeNs(); 108 if (callTimeoutNs <= 0) { 109 completeExceptionally(); 110 return; 111 } 112 callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs); 113 } else { 114 callTimeoutNs = rpcTimeoutNs; 115 } 116 resetController(controller, callTimeoutNs); 117 } 118 119 private void tryScheduleRetry(Throwable error, Consumer<Throwable> updateCachedLocation) { 120 long delayNs; 121 if (operationTimeoutNs > 0) { 122 long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; 123 if (maxDelayNs <= 0) { 124 completeExceptionally(); 125 return; 126 } 127 delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); 128 } else { 129 delayNs = getPauseTime(pauseNs, tries - 1); 130 } 131 tries++; 132 retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS); 133 } 134 135 protected Optional<TableName> getTableName() { 136 return Optional.empty(); 137 } 138 139 protected final void onError(Throwable t, Supplier<String> errMsg, 140 Consumer<Throwable> updateCachedLocation) { 141 if (future.isDone()) { 142 // Give up if the future is already done, this is possible if user has already canceled the 143 // future. And for timeline consistent read, we will also cancel some requests if we have 144 // already get one of the responses. 145 LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled()); 146 return; 147 } 148 Throwable error = translateException(t); 149 if (error instanceof DoNotRetryIOException) { 150 future.completeExceptionally(error); 151 return; 152 } 153 if (tries > startLogErrorsCnt) { 154 LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts + 155 ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + 156 " ms, time elapsed = " + elapsedMs() + " ms", error); 157 } 158 updateCachedLocation.accept(error); 159 RetriesExhaustedException.ThrowableWithExtraContext qt = 160 new RetriesExhaustedException.ThrowableWithExtraContext(error, 161 EnvironmentEdgeManager.currentTime(), ""); 162 exceptions.add(qt); 163 if (tries >= maxAttempts) { 164 completeExceptionally(); 165 return; 166 } 167 // check whether the table has been disabled, notice that the check will introduce a request to 168 // meta, so here we only check for disabled for some specific exception types. 169 if (error instanceof NotServingRegionException || error instanceof RegionOfflineException) { 170 Optional<TableName> tableName = getTableName(); 171 if (tableName.isPresent()) { 172 FutureUtils.addListener(conn.getAdmin().isTableDisabled(tableName.get()), (disabled, e) -> { 173 if (e != null) { 174 if (e instanceof TableNotFoundException) { 175 future.completeExceptionally(e); 176 } else { 177 // failed to test whether the table is disabled, not a big deal, continue retrying 178 tryScheduleRetry(error, updateCachedLocation); 179 } 180 return; 181 } 182 if (disabled) { 183 future.completeExceptionally(new TableNotEnabledException(tableName.get())); 184 } else { 185 tryScheduleRetry(error, updateCachedLocation); 186 } 187 }); 188 } 189 } else { 190 tryScheduleRetry(error, updateCachedLocation); 191 } 192 } 193 194 protected abstract void doCall(); 195 196 CompletableFuture<T> call() { 197 doCall(); 198 return future; 199 } 200}