1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import org.apache.hadoop.hbase.classification.InterfaceAudience;
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.hbase.HConstants;
23 import org.apache.hadoop.hbase.util.ReflectionUtils;
24
25
26
27
28 @InterfaceAudience.Private
29 public class RpcRetryingCallerFactory {
30
31
32 public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
33 protected final Configuration conf;
34 private final long pause;
35 private final int retries;
36 private final int rpcTimeout;
37 private final RetryingCallerInterceptor interceptor;
38 private final int startLogErrorsCnt;
39 private final boolean enableBackPressure;
40 private ServerStatisticTracker stats;
41
42 public RpcRetryingCallerFactory(Configuration conf) {
43 this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
44 }
45
46 public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) {
47 this.conf = conf;
48 pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
49 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
50 retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
51 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
52 startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
53 AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
54 this.interceptor = interceptor;
55 enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
56 HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
57 rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
58 }
59
60
61
62
63 public void setStatisticTracker(ServerStatisticTracker statisticTracker) {
64 this.stats = statisticTracker;
65 }
66
67
68
69
70 public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
71
72
73 RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, retries, interceptor,
74 startLogErrorsCnt, rpcTimeout);
75
76
77 if (enableBackPressure && this.stats != null) {
78 caller = new StatsTrackingRpcRetryingCaller<T>(pause, retries, interceptor,
79 startLogErrorsCnt, stats);
80 }
81
82 return caller;
83 }
84
85
86
87
88 public <T> RpcRetryingCaller<T> newCaller() {
89
90
91 RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, retries, interceptor,
92 startLogErrorsCnt, rpcTimeout);
93
94
95 if (enableBackPressure && this.stats != null) {
96 caller = new StatsTrackingRpcRetryingCaller<T>(pause, retries, interceptor,
97 startLogErrorsCnt, stats);
98 }
99
100 return caller;
101 }
102
103 public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
104 return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
105 }
106
107 public static RpcRetryingCallerFactory instantiate(Configuration configuration,
108 ServerStatisticTracker stats) {
109 return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);
110 }
111
112 public static RpcRetryingCallerFactory instantiate(Configuration configuration,
113 RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
114 String clazzName = RpcRetryingCallerFactory.class.getName();
115 String rpcCallerFactoryClazz =
116 configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
117 RpcRetryingCallerFactory factory;
118 if (rpcCallerFactoryClazz.equals(clazzName)) {
119 factory = new RpcRetryingCallerFactory(configuration, interceptor);
120 } else {
121 factory = ReflectionUtils.instantiateWithCustomCtor(
122 rpcCallerFactoryClazz, new Class[] { Configuration.class },
123 new Object[] { configuration });
124 }
125
126
127 factory.setStatisticTracker(stats);
128 return factory;
129 }
130 }