001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.client;
021
022import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.lang.reflect.UndeclaredThrowableException;
027import java.net.SocketTimeoutException;
028import java.util.ArrayList;
029import java.util.List;
030import java.util.concurrent.atomic.AtomicBoolean;
031
032import org.apache.hadoop.hbase.CallQueueTooBigException;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.hadoop.hbase.util.ExceptionUtil;
037import org.apache.hadoop.ipc.RemoteException;
038import org.apache.hadoop.util.StringUtils;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
043
044/**
045 * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client
046 * threadlocal outstanding timeouts as so we don't persist too much.
047 * Dynamic rather than static so can set the generic appropriately.
048 *
049 * This object has a state. It should not be used by in parallel by different threads.
050 * Reusing it is possible however, even between multiple threads. However, the user will
051 *  have to manage the synchronization on its side: there is no synchronization inside the class.
052 */
053@InterfaceAudience.Private
054public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
055  // LOG is being used in TestMultiRowRangeFilter, hence leaving it public
056  public static final Logger LOG = LoggerFactory.getLogger(RpcRetryingCallerImpl.class);
057
058  /** How many retries are allowed before we start to log */
059  private final int startLogErrorsCnt;
060
061  private final long pause;
062  private final long pauseForCQTBE;
063  private final int maxAttempts;// how many times to try
064  private final int rpcTimeout;// timeout for each rpc request
065  private final AtomicBoolean cancelled = new AtomicBoolean(false);
066  private final RetryingCallerInterceptor interceptor;
067  private final RetryingCallerInterceptorContext context;
068  private final RetryingTimeTracker tracker;
069
070  public RpcRetryingCallerImpl(long pause, long pauseForCQTBE, int retries, int startLogErrorsCnt) {
071    this(pause, pauseForCQTBE, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR,
072        startLogErrorsCnt, 0);
073  }
074
075  public RpcRetryingCallerImpl(long pause, long pauseForCQTBE, int retries,
076      RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
077    this.pause = pause;
078    this.pauseForCQTBE = pauseForCQTBE;
079    this.maxAttempts = retries2Attempts(retries);
080    this.interceptor = interceptor;
081    context = interceptor.createEmptyContext();
082    this.startLogErrorsCnt = startLogErrorsCnt;
083    this.tracker = new RetryingTimeTracker();
084    this.rpcTimeout = rpcTimeout;
085  }
086
087  @Override
088  public void cancel(){
089    cancelled.set(true);
090    synchronized (cancelled){
091      cancelled.notifyAll();
092    }
093  }
094
095  @Override
096  public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
097  throws IOException, RuntimeException {
098    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions = new ArrayList<>();
099    tracker.start();
100    context.clear();
101    for (int tries = 0;; tries++) {
102      long expectedSleep;
103      try {
104        // bad cache entries are cleared in the call to RetryingCallable#throwable() in catch block
105        callable.prepare(tries != 0);
106        interceptor.intercept(context.prepare(callable, tries));
107        return callable.call(getTimeout(callTimeout));
108      } catch (PreemptiveFastFailException e) {
109        throw e;
110      } catch (Throwable t) {
111        Throwable e = t.getCause();
112        ExceptionUtil.rethrowIfInterrupt(t);
113        Throwable cause = t.getCause();
114        if (cause instanceof DoNotRetryIOException) {
115          // Fail fast
116          throw (DoNotRetryIOException) cause;
117        }
118        // translateException throws exception when should not retry: i.e. when request is bad.
119        interceptor.handleFailure(context, t);
120        t = translateException(t);
121
122        if (tries > startLogErrorsCnt) {
123          if (LOG.isInfoEnabled()) {
124            StringBuilder builder = new StringBuilder("Call exception, tries=").append(tries)
125                .append(", retries=").append(maxAttempts).append(", started=")
126                .append((EnvironmentEdgeManager.currentTime() - tracker.getStartTime()))
127                .append(" ms ago, ").append("cancelled=").append(cancelled.get())
128                .append(", msg=").append(t.getMessage())
129                .append(", details=").append(callable.getExceptionMessageAdditionalDetail())
130                .append(", see https://s.apache.org/timeout");
131            if (LOG.isDebugEnabled()) {
132              builder.append(", exception=").append(StringUtils.stringifyException(t));
133              LOG.debug(builder.toString());
134            } else {
135              LOG.info(builder.toString());
136            }
137          }
138        }
139
140        callable.throwable(t, maxAttempts != 1);
141        RetriesExhaustedException.ThrowableWithExtraContext qt =
142            new RetriesExhaustedException.ThrowableWithExtraContext(t,
143                EnvironmentEdgeManager.currentTime(), toString());
144        exceptions.add(qt);
145        if (tries >= maxAttempts - 1) {
146          throw new RetriesExhaustedException(tries, exceptions);
147        }
148        // If the server is dead, we need to wait a little before retrying, to give
149        // a chance to the regions to be moved
150        // get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be
151        // special when encountering CallQueueTooBigException, see #HBASE-17114
152        long pauseBase = (t instanceof CallQueueTooBigException) ? pauseForCQTBE : pause;
153        expectedSleep = callable.sleep(pauseBase, tries);
154
155        // If, after the planned sleep, there won't be enough time left, we stop now.
156        long duration = singleCallDuration(expectedSleep);
157        if (duration > callTimeout) {
158          String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +
159              ": " +  t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail();
160          throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
161        }
162      } finally {
163        interceptor.updateFailureInfo(context);
164      }
165      try {
166        if (expectedSleep > 0) {
167          synchronized (cancelled) {
168            if (cancelled.get()) return null;
169            cancelled.wait(expectedSleep);
170          }
171        }
172        if (cancelled.get()) return null;
173      } catch (InterruptedException e) {
174        throw new InterruptedIOException("Interrupted after " + tries
175            + " tries while maxAttempts=" + maxAttempts);
176      }
177    }
178  }
179
180  /**
181   * @return Calculate how long a single call took
182   */
183  private long singleCallDuration(final long expectedSleep) {
184    return (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + expectedSleep;
185  }
186
187  @Override
188  public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
189  throws IOException, RuntimeException {
190    // The code of this method should be shared with withRetries.
191    try {
192      callable.prepare(false);
193      return callable.call(callTimeout);
194    } catch (Throwable t) {
195      Throwable t2 = translateException(t);
196      ExceptionUtil.rethrowIfInterrupt(t2);
197      // It would be nice to clear the location cache here.
198      if (t2 instanceof IOException) {
199        throw (IOException)t2;
200      } else {
201        throw new RuntimeException(t2);
202      }
203    }
204  }
205
206  /**
207   * Get the good or the remote exception if any, throws the DoNotRetryIOException.
208   * @param t the throwable to analyze
209   * @return the translated exception, if it's not a DoNotRetryIOException
210   * @throws DoNotRetryIOException - if we find it, we throw it instead of translating.
211   */
212  static Throwable translateException(Throwable t) throws DoNotRetryIOException {
213    if (t instanceof UndeclaredThrowableException) {
214      if (t.getCause() != null) {
215        t = t.getCause();
216      }
217    }
218    if (t instanceof RemoteException) {
219      t = ((RemoteException)t).unwrapRemoteException();
220    }
221    if (t instanceof LinkageError) {
222      throw new DoNotRetryIOException(t);
223    }
224    if (t instanceof ServiceException) {
225      ServiceException se = (ServiceException)t;
226      Throwable cause = se.getCause();
227      if (cause != null && cause instanceof DoNotRetryIOException) {
228        throw (DoNotRetryIOException)cause;
229      }
230      // Don't let ServiceException out; its rpc specific.
231      t = cause;
232      // t could be a RemoteException so go around again.
233      translateException(t);
234    } else if (t instanceof DoNotRetryIOException) {
235      throw (DoNotRetryIOException)t;
236    }
237    return t;
238  }
239
240  private int getTimeout(int callTimeout){
241    int timeout = tracker.getRemainingTime(callTimeout);
242    if (timeout <= 0 || rpcTimeout > 0 && rpcTimeout < timeout){
243      timeout = rpcTimeout;
244    }
245    return timeout;
246  }
247
248  @Override
249  public String toString() {
250    return "RpcRetryingCaller{" + "globalStartTime=" + tracker.getStartTime() +
251        ", pause=" + pause + ", maxAttempts=" + maxAttempts + '}';
252  }
253}