001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.hbase.HConstants;
022import org.apache.hadoop.hbase.util.ReflectionUtils;
023import org.apache.yetus.audience.InterfaceAudience;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027/**
028 * Factory to create an {@link RpcRetryingCaller}
029 */
030@InterfaceAudience.Private
031public class RpcRetryingCallerFactory {
032
033  /** Configuration key for a custom {@link RpcRetryingCaller} */
034  public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
035  private static final Logger LOG = LoggerFactory.getLogger(RpcRetryingCallerFactory.class);
036  protected final Configuration conf;
037  private final long pause;
038  private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
039  private final int retries;
040  private final int rpcTimeout;
041  private final RetryingCallerInterceptor interceptor;
042  private final int startLogErrorsCnt;
043  /* These below data members are UNUSED!!!*/
044  private final boolean enableBackPressure;
045  private ServerStatisticTracker stats;
046
047  public RpcRetryingCallerFactory(Configuration conf) {
048    this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
049  }
050
051  public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) {
052    this.conf = conf;
053    pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
054        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
055    long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
056    if (configuredPauseForCQTBE < pause) {
057      LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
058          + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
059          + ", will use " + pause + " instead.");
060      this.pauseForCQTBE = pause;
061    } else {
062      this.pauseForCQTBE = configuredPauseForCQTBE;
063    }
064    retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
065        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
066    startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
067        AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
068    this.interceptor = interceptor;
069    enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
070        HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
071    rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
072  }
073
074  /**
075   * Set the tracker that should be used for tracking statistics about the server
076   */
077  public void setStatisticTracker(ServerStatisticTracker statisticTracker) {
078    this.stats = statisticTracker;
079  }
080
081  /**
082   * Create a new RetryingCaller with specific rpc timeout.
083   */
084  public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
085    // We store the values in the factory instance. This way, constructing new objects
086    //  is cheap as it does not require parsing a complex structure.
087    RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<>(pause, pauseForCQTBE, retries,
088        interceptor, startLogErrorsCnt, rpcTimeout);
089    return caller;
090  }
091
092  /**
093   * Create a new RetryingCaller with configured rpc timeout.
094   */
095  public <T> RpcRetryingCaller<T> newCaller() {
096    // We store the values in the factory instance. This way, constructing new objects
097    //  is cheap as it does not require parsing a complex structure.
098    RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<>(pause, pauseForCQTBE, retries,
099        interceptor, startLogErrorsCnt, rpcTimeout);
100    return caller;
101  }
102
103  public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
104    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
105  }
106
107  public static RpcRetryingCallerFactory instantiate(Configuration configuration,
108      ServerStatisticTracker stats) {
109    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);
110  }
111
112  public static RpcRetryingCallerFactory instantiate(Configuration configuration,
113      RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
114    String clazzName = RpcRetryingCallerFactory.class.getName();
115    String rpcCallerFactoryClazz =
116        configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
117    RpcRetryingCallerFactory factory;
118    if (rpcCallerFactoryClazz.equals(clazzName)) {
119      factory = new RpcRetryingCallerFactory(configuration, interceptor);
120    } else {
121      factory = ReflectionUtils.instantiateWithCustomCtor(
122          rpcCallerFactoryClazz, new Class[] { Configuration.class },
123          new Object[] { configuration });
124    }
125
126    // setting for backwards compat with existing caller factories, rather than in the ctor
127    factory.setStatisticTracker(stats);
128    return factory;
129  }
130}