1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.protobuf.BlockingRpcChannel;
23 import com.google.protobuf.Descriptors;
24 import com.google.protobuf.Message;
25 import com.google.protobuf.RpcController;
26 import com.google.protobuf.ServiceException;
27
28 import java.io.IOException;
29 import java.net.ConnectException;
30 import java.net.InetSocketAddress;
31 import java.net.SocketAddress;
32 import java.net.SocketTimeoutException;
33 import java.net.UnknownHostException;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.CellScanner;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.ServerName;
41 import org.apache.hadoop.hbase.classification.InterfaceAudience;
42 import org.apache.hadoop.hbase.client.MetricsConnection;
43 import org.apache.hadoop.hbase.codec.Codec;
44 import org.apache.hadoop.hbase.codec.KeyValueCodec;
45 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
46 import org.apache.hadoop.hbase.security.User;
47 import org.apache.hadoop.hbase.security.UserProvider;
48 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49 import org.apache.hadoop.hbase.util.Pair;
50 import org.apache.hadoop.hbase.util.PoolMap;
51 import org.apache.hadoop.io.compress.CompressionCodec;
52
53
54
55
56 @InterfaceAudience.Private
57 public abstract class AbstractRpcClient implements RpcClient {
58
59 public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
60
61 protected final Configuration conf;
62 protected String clusterId;
63 protected final SocketAddress localAddr;
64 protected final MetricsConnection metrics;
65
66 protected UserProvider userProvider;
67 protected final IPCUtil ipcUtil;
68
69 protected final int minIdleTimeBeforeClose;
70
71 protected final int maxRetries;
72 protected final long failureSleep;
73 protected final boolean tcpNoDelay;
74 protected final boolean tcpKeepAlive;
75 protected final Codec codec;
76 protected final CompressionCodec compressor;
77 protected final boolean fallbackAllowed;
78
79 protected final int connectTO;
80 protected final int readTO;
81 protected final int writeTO;
82
83
84
85
86
87
88
89
90
91 public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
92 MetricsConnection metrics) {
93 this.userProvider = UserProvider.instantiate(conf);
94 this.localAddr = localAddr;
95 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
96 this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
97 this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
98 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
99 this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
100 this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
101 this.ipcUtil = new IPCUtil(conf);
102
103 this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000);
104 this.conf = conf;
105 this.codec = getCodec();
106 this.compressor = getCompressor(conf);
107 this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
108 IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
109 this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
110 this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
111 this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
112 this.metrics = metrics;
113
114
115 if (LOG.isDebugEnabled()) {
116 LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
117 ", tcpKeepAlive=" + this.tcpKeepAlive +
118 ", tcpNoDelay=" + this.tcpNoDelay +
119 ", connectTO=" + this.connectTO +
120 ", readTO=" + this.readTO +
121 ", writeTO=" + this.writeTO +
122 ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
123 ", maxRetries=" + this.maxRetries +
124 ", fallbackAllowed=" + this.fallbackAllowed +
125 ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
126 }
127 }
128
129 @VisibleForTesting
130 public static String getDefaultCodec(final Configuration c) {
131
132
133
134 return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
135 }
136
137
138
139
140
141 Codec getCodec() {
142
143
144 String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
145 if (className == null || className.length() == 0) return null;
146 try {
147 return (Codec)Class.forName(className).newInstance();
148 } catch (Exception e) {
149 throw new RuntimeException("Failed getting codec " + className, e);
150 }
151 }
152
153 @Override
154 public boolean hasCellBlockSupport() {
155 return this.codec != null;
156 }
157
158
159
160
161
162
163 private static CompressionCodec getCompressor(final Configuration conf) {
164 String className = conf.get("hbase.client.rpc.compressor", null);
165 if (className == null || className.isEmpty()) return null;
166 try {
167 return (CompressionCodec)Class.forName(className).newInstance();
168 } catch (Exception e) {
169 throw new RuntimeException("Failed getting compressor " + className, e);
170 }
171 }
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189 protected static PoolMap.PoolType getPoolType(Configuration config) {
190 return PoolMap.PoolType
191 .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
192 PoolMap.PoolType.ThreadLocal);
193 }
194
195
196
197
198
199
200
201
202 protected static int getPoolSize(Configuration config) {
203 return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
204 }
205
206
207
208
209
210
211
212
213
214
215
216 Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
217 Message param, Message returnType, final User ticket, final InetSocketAddress isa)
218 throws ServiceException {
219 if (pcrc == null) {
220 pcrc = new PayloadCarryingRpcController();
221 }
222
223 Pair<Message, CellScanner> val;
224 try {
225 final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
226 cs.setStartTime(EnvironmentEdgeManager.currentTime());
227 val = call(pcrc, md, param, returnType, ticket, isa, cs);
228
229 pcrc.setCellScanner(val.getSecond());
230
231 cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
232 if (metrics != null) {
233 metrics.updateRpc(md, param, cs);
234 }
235 if (LOG.isTraceEnabled()) {
236 LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
237 }
238 return val.getFirst();
239 } catch (Throwable e) {
240 throw new ServiceException(e);
241 }
242 }
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259 protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
260 Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
261 InetSocketAddress isa, MetricsConnection.CallStats callStats)
262 throws IOException, InterruptedException;
263
264 @Override
265 public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
266 int defaultOperationTimeout) throws UnknownHostException {
267 return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
268 }
269
270
271
272
273
274
275
276
277
278
279
280 protected IOException wrapException(InetSocketAddress addr, Exception exception) {
281 if (exception instanceof ConnectException) {
282
283 return (ConnectException) new ConnectException("Call to " + addr
284 + " failed on connection exception: " + exception).initCause(exception);
285 } else if (exception instanceof SocketTimeoutException) {
286 return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
287 + " failed because " + exception).initCause(exception);
288 } else if (exception instanceof ConnectionClosingException) {
289 return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
290 + " failed on local exception: " + exception).initCause(exception);
291 } else {
292 return (IOException) new IOException("Call to " + addr + " failed on local exception: "
293 + exception).initCause(exception);
294 }
295 }
296
297
298
299
300 @VisibleForTesting
301 public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
302 private final InetSocketAddress isa;
303 private final AbstractRpcClient rpcClient;
304 private final User ticket;
305 private final int channelOperationTimeout;
306
307
308
309
310 protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
311 final ServerName sn, final User ticket, int channelOperationTimeout)
312 throws UnknownHostException {
313 this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
314 if (this.isa.isUnresolved()) {
315 throw new UnknownHostException(sn.getHostname());
316 }
317 this.rpcClient = rpcClient;
318 this.ticket = ticket;
319 this.channelOperationTimeout = channelOperationTimeout;
320 }
321
322 @Override
323 public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
324 Message param, Message returnType) throws ServiceException {
325 PayloadCarryingRpcController pcrc;
326 if (controller != null && controller instanceof PayloadCarryingRpcController) {
327 pcrc = (PayloadCarryingRpcController) controller;
328 if (!pcrc.hasCallTimeout()) {
329 pcrc.setCallTimeout(channelOperationTimeout);
330 }
331 } else {
332 pcrc = new PayloadCarryingRpcController();
333 pcrc.setCallTimeout(channelOperationTimeout);
334 }
335
336 return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
337 }
338 }
339 }