001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.hbase.util.ReflectionUtils;
022import org.apache.yetus.audience.InterfaceAudience;
023
024/**
025 * Factory to create an {@link RpcRetryingCaller}
026 */
027@InterfaceAudience.Private
028public class RpcRetryingCallerFactory {
029
030  /** Configuration key for a custom {@link RpcRetryingCaller} */
031  public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
032  protected final Configuration conf;
033  private final ConnectionConfiguration connectionConf;
034  private final RetryingCallerInterceptor interceptor;
035  private final int startLogErrorsCnt;
036
037  public RpcRetryingCallerFactory(Configuration conf) {
038    this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
039  }
040
041  public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) {
042    this.conf = conf;
043    this.connectionConf = new ConnectionConfiguration(conf);
044    startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
045      AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
046    this.interceptor = interceptor;
047  }
048
049  /**
050   * Create a new RetryingCaller with specific rpc timeout.
051   */
052  public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
053    // We store the values in the factory instance. This way, constructing new objects
054    // is cheap as it does not require parsing a complex structure.
055    return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(),
056      connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(),
057      interceptor, startLogErrorsCnt, rpcTimeout);
058  }
059
060  /**
061   * Create a new RetryingCaller with configured rpc timeout.
062   */
063  public <T> RpcRetryingCaller<T> newCaller() {
064    // We store the values in the factory instance. This way, constructing new objects
065    // is cheap as it does not require parsing a complex structure.
066    return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(),
067      connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(),
068      interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout());
069  }
070
071  public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
072    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
073  }
074
075  public static RpcRetryingCallerFactory instantiate(Configuration configuration,
076    ServerStatisticTracker stats) {
077    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);
078  }
079
080  public static RpcRetryingCallerFactory instantiate(Configuration configuration,
081    RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
082    String clazzName = RpcRetryingCallerFactory.class.getName();
083    String rpcCallerFactoryClazz =
084      configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
085    RpcRetryingCallerFactory factory;
086    if (rpcCallerFactoryClazz.equals(clazzName)) {
087      factory = new RpcRetryingCallerFactory(configuration, interceptor);
088    } else {
089      factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
090        new Class[] { Configuration.class }, new Object[] { configuration });
091    }
092    return factory;
093  }
094}