001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.client;
021
022import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.lang.reflect.UndeclaredThrowableException;
027import java.net.SocketTimeoutException;
028import java.util.ArrayList;
029import java.util.List;
030import java.util.concurrent.atomic.AtomicBoolean;
031
032import org.apache.hadoop.hbase.CallQueueTooBigException;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.hadoop.hbase.util.ExceptionUtil;
037import org.apache.hadoop.ipc.RemoteException;
038import org.apache.hadoop.util.StringUtils;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
043
044/**
045 * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client
046 * threadlocal outstanding timeouts as so we don't persist too much.
047 * Dynamic rather than static so can set the generic appropriately.
048 *
049 * This object has a state. It should not be used by in parallel by different threads.
050 * Reusing it is possible however, even between multiple threads. However, the user will
051 *  have to manage the synchronization on its side: there is no synchronization inside the class.
052 */
053@InterfaceAudience.Private
054public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
055  // LOG is being used in TestMultiRowRangeFilter, hence leaving it public
056  public static final Logger LOG = LoggerFactory.getLogger(RpcRetryingCallerImpl.class);
057
058  /** How many retries are allowed before we start to log */
059  private final int startLogErrorsCnt;
060
061  private final long pause;
062  private final long pauseForCQTBE;
063  private final int maxAttempts;// how many times to try
064  private final int rpcTimeout;// timeout for each rpc request
065  private final AtomicBoolean cancelled = new AtomicBoolean(false);
066  private final RetryingCallerInterceptor interceptor;
067  private final RetryingCallerInterceptorContext context;
068  private final RetryingTimeTracker tracker;
069
070  public RpcRetryingCallerImpl(long pause, long pauseForCQTBE, int retries, int startLogErrorsCnt) {
071    this(pause, pauseForCQTBE, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR,
072        startLogErrorsCnt, 0);
073  }
074
075  public RpcRetryingCallerImpl(long pause, long pauseForCQTBE, int retries,
076      RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
077    this.pause = pause;
078    this.pauseForCQTBE = pauseForCQTBE;
079    this.maxAttempts = retries2Attempts(retries);
080    this.interceptor = interceptor;
081    context = interceptor.createEmptyContext();
082    this.startLogErrorsCnt = startLogErrorsCnt;
083    this.tracker = new RetryingTimeTracker();
084    this.rpcTimeout = rpcTimeout;
085  }
086
087  @Override
088  public void cancel(){
089    cancelled.set(true);
090    synchronized (cancelled){
091      cancelled.notifyAll();
092    }
093  }
094
095  @Override
096  public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
097  throws IOException, RuntimeException {
098    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions = new ArrayList<>();
099    tracker.start();
100    context.clear();
101    for (int tries = 0;; tries++) {
102      long expectedSleep;
103      try {
104        // bad cache entries are cleared in the call to RetryingCallable#throwable() in catch block
105        callable.prepare(tries != 0);
106        interceptor.intercept(context.prepare(callable, tries));
107        return callable.call(getTimeout(callTimeout));
108      } catch (PreemptiveFastFailException e) {
109        throw e;
110      } catch (Throwable t) {
111        ExceptionUtil.rethrowIfInterrupt(t);
112        Throwable cause = t.getCause();
113        if (cause instanceof DoNotRetryIOException) {
114          // Fail fast
115          throw (DoNotRetryIOException) cause;
116        }
117        // translateException throws exception when should not retry: i.e. when request is bad.
118        interceptor.handleFailure(context, t);
119        t = translateException(t);
120
121        if (tries > startLogErrorsCnt) {
122          if (LOG.isInfoEnabled()) {
123            StringBuilder builder = new StringBuilder("Call exception, tries=").append(tries)
124                .append(", retries=").append(maxAttempts).append(", started=")
125                .append((EnvironmentEdgeManager.currentTime() - tracker.getStartTime()))
126                .append(" ms ago, ").append("cancelled=").append(cancelled.get())
127                .append(", msg=").append(t.getMessage())
128                .append(", details=").append(callable.getExceptionMessageAdditionalDetail())
129                .append(", see https://s.apache.org/timeout");
130            if (LOG.isDebugEnabled()) {
131              builder.append(", exception=").append(StringUtils.stringifyException(t));
132              LOG.debug(builder.toString());
133            } else {
134              LOG.info(builder.toString());
135            }
136          }
137        }
138
139        callable.throwable(t, maxAttempts != 1);
140        RetriesExhaustedException.ThrowableWithExtraContext qt =
141            new RetriesExhaustedException.ThrowableWithExtraContext(t,
142                EnvironmentEdgeManager.currentTime(), toString());
143        exceptions.add(qt);
144        if (tries >= maxAttempts - 1) {
145          throw new RetriesExhaustedException(tries, exceptions);
146        }
147        // If the server is dead, we need to wait a little before retrying, to give
148        // a chance to the regions to be moved
149        // get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be
150        // special when encountering CallQueueTooBigException, see #HBASE-17114
151        long pauseBase = (t instanceof CallQueueTooBigException) ? pauseForCQTBE : pause;
152        expectedSleep = callable.sleep(pauseBase, tries);
153
154        // If, after the planned sleep, there won't be enough time left, we stop now.
155        long duration = singleCallDuration(expectedSleep);
156        if (duration > callTimeout) {
157          String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +
158              ": " +  t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail();
159          throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
160        }
161      } finally {
162        interceptor.updateFailureInfo(context);
163      }
164      try {
165        if (expectedSleep > 0) {
166          synchronized (cancelled) {
167            if (cancelled.get()) return null;
168            cancelled.wait(expectedSleep);
169          }
170        }
171        if (cancelled.get()) return null;
172      } catch (InterruptedException e) {
173        throw new InterruptedIOException("Interrupted after " + tries
174            + " tries while maxAttempts=" + maxAttempts);
175      }
176    }
177  }
178
179  /**
180   * @return Calculate how long a single call took
181   */
182  private long singleCallDuration(final long expectedSleep) {
183    return (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + expectedSleep;
184  }
185
186  @Override
187  public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
188  throws IOException, RuntimeException {
189    // The code of this method should be shared with withRetries.
190    try {
191      callable.prepare(false);
192      return callable.call(callTimeout);
193    } catch (Throwable t) {
194      Throwable t2 = translateException(t);
195      ExceptionUtil.rethrowIfInterrupt(t2);
196      // It would be nice to clear the location cache here.
197      if (t2 instanceof IOException) {
198        throw (IOException)t2;
199      } else {
200        throw new RuntimeException(t2);
201      }
202    }
203  }
204
205  /**
206   * Get the good or the remote exception if any, throws the DoNotRetryIOException.
207   * @param t the throwable to analyze
208   * @return the translated exception, if it's not a DoNotRetryIOException
209   * @throws DoNotRetryIOException - if we find it, we throw it instead of translating.
210   */
211  static Throwable translateException(Throwable t) throws DoNotRetryIOException {
212    if (t instanceof UndeclaredThrowableException) {
213      if (t.getCause() != null) {
214        t = t.getCause();
215      }
216    }
217    if (t instanceof RemoteException) {
218      t = ((RemoteException)t).unwrapRemoteException();
219    }
220    if (t instanceof LinkageError) {
221      throw new DoNotRetryIOException(t);
222    }
223    if (t instanceof ServiceException) {
224      ServiceException se = (ServiceException)t;
225      Throwable cause = se.getCause();
226      if (cause != null && cause instanceof DoNotRetryIOException) {
227        throw (DoNotRetryIOException)cause;
228      }
229      // Don't let ServiceException out; its rpc specific.
230      t = cause;
231      // t could be a RemoteException so go around again.
232      translateException(t);
233    } else if (t instanceof DoNotRetryIOException) {
234      throw (DoNotRetryIOException)t;
235    }
236    return t;
237  }
238
239  private int getTimeout(int callTimeout){
240    int timeout = tracker.getRemainingTime(callTimeout);
241    if (timeout <= 0 || rpcTimeout > 0 && rpcTimeout < timeout){
242      timeout = rpcTimeout;
243    }
244    return timeout;
245  }
246
247  @Override
248  public String toString() {
249    return "RpcRetryingCaller{" + "globalStartTime=" + tracker.getStartTime() +
250        ", pause=" + pause + ", maxAttempts=" + maxAttempts + '}';
251  }
252}