001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import org.apache.hadoop.conf.Configuration; 021import org.apache.hadoop.hbase.HConstants; 022import org.apache.hadoop.hbase.util.ReflectionUtils; 023import org.apache.yetus.audience.InterfaceAudience; 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026 027/** 028 * Factory to create an {@link RpcRetryingCaller} 029 */ 030@InterfaceAudience.Private 031public class RpcRetryingCallerFactory { 032 033 /** Configuration key for a custom {@link RpcRetryingCaller} */ 034 public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class"; 035 private static final Logger LOG = LoggerFactory.getLogger(RpcRetryingCallerFactory.class); 036 protected final Configuration conf; 037 private final long pause; 038 private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified 039 private final int retries; 040 private final int rpcTimeout; 041 private final RetryingCallerInterceptor interceptor; 042 private final int startLogErrorsCnt; 043 /* These below data members are UNUSED!!!*/ 044 private final boolean enableBackPressure; 045 private ServerStatisticTracker stats; 046 047 public RpcRetryingCallerFactory(Configuration conf) { 048 this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); 049 } 050 051 public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) { 052 this.conf = conf; 053 pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 054 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 055 long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); 056 if (configuredPauseForCQTBE < pause) { 057 LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " 058 + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE 059 + ", will use " + pause + " instead."); 060 this.pauseForCQTBE = pause; 061 } else { 062 this.pauseForCQTBE = configuredPauseForCQTBE; 063 } 064 retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 065 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 066 startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, 067 AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); 068 this.interceptor = interceptor; 069 enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, 070 HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); 071 rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 072 } 073 074 /** 075 * Set the tracker that should be used for tracking statistics about the server 076 */ 077 public void setStatisticTracker(ServerStatisticTracker statisticTracker) { 078 this.stats = statisticTracker; 079 } 080 081 /** 082 * Create a new RetryingCaller with specific rpc timeout. 083 */ 084 public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) { 085 // We store the values in the factory instance. This way, constructing new objects 086 // is cheap as it does not require parsing a complex structure. 087 RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<>(pause, pauseForCQTBE, retries, 088 interceptor, startLogErrorsCnt, rpcTimeout); 089 return caller; 090 } 091 092 /** 093 * Create a new RetryingCaller with configured rpc timeout. 094 */ 095 public <T> RpcRetryingCaller<T> newCaller() { 096 // We store the values in the factory instance. This way, constructing new objects 097 // is cheap as it does not require parsing a complex structure. 098 RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<>(pause, pauseForCQTBE, retries, 099 interceptor, startLogErrorsCnt, rpcTimeout); 100 return caller; 101 } 102 103 public static RpcRetryingCallerFactory instantiate(Configuration configuration) { 104 return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null); 105 } 106 107 public static RpcRetryingCallerFactory instantiate(Configuration configuration, 108 ServerStatisticTracker stats) { 109 return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats); 110 } 111 112 public static RpcRetryingCallerFactory instantiate(Configuration configuration, 113 RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) { 114 String clazzName = RpcRetryingCallerFactory.class.getName(); 115 String rpcCallerFactoryClazz = 116 configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); 117 RpcRetryingCallerFactory factory; 118 if (rpcCallerFactoryClazz.equals(clazzName)) { 119 factory = new RpcRetryingCallerFactory(configuration, interceptor); 120 } else { 121 factory = ReflectionUtils.instantiateWithCustomCtor( 122 rpcCallerFactoryClazz, new Class[] { Configuration.class }, 123 new Object[] { configuration }); 124 } 125 126 // setting for backwards compat with existing caller factories, rather than in the ctor 127 factory.setStatisticTracker(stats); 128 return factory; 129 } 130}