1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.protobuf.BlockingRpcChannel;
23 import com.google.protobuf.Descriptors;
24 import com.google.protobuf.Message;
25 import com.google.protobuf.RpcController;
26 import com.google.protobuf.ServiceException;
27
28 import java.io.IOException;
29 import java.net.ConnectException;
30 import java.net.InetSocketAddress;
31 import java.net.SocketAddress;
32 import java.net.SocketTimeoutException;
33 import java.net.UnknownHostException;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.CellScanner;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.ServerName;
41 import org.apache.hadoop.hbase.classification.InterfaceAudience;
42 import org.apache.hadoop.hbase.codec.Codec;
43 import org.apache.hadoop.hbase.codec.KeyValueCodec;
44 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
45 import org.apache.hadoop.hbase.security.User;
46 import org.apache.hadoop.hbase.security.UserProvider;
47 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48 import org.apache.hadoop.hbase.util.Pair;
49 import org.apache.hadoop.hbase.util.PoolMap;
50 import org.apache.hadoop.io.compress.CompressionCodec;
51
52
53
54
55 @InterfaceAudience.Private
56 public abstract class AbstractRpcClient implements RpcClient {
// Logger shared by this class (created for AbstractRpcClient.class).
57 public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
58
// Configuration handed to the constructor; every setting below is read from it.
59 protected final Configuration conf;
// Cluster id passed to the constructor, or HConstants.CLUSTER_ID_DEFAULT when that argument is null.
60 protected String clusterId;
// Address passed to the constructor and logged as "bind address"; may be null.
61 protected final SocketAddress localAddr;
62
// Obtained from UserProvider.instantiate(conf) in the constructor.
63 protected UserProvider userProvider;
// Built as new IPCUtil(conf) in the constructor.
64 protected final IPCUtil ipcUtil;
65
// Value of the IDLE_TIME setting, default 120000 (presumably milliseconds -- TODO confirm).
66 protected final int minIdleTimeBeforeClose;
67
// "hbase.ipc.client.connect.max.retries", default 0.
68 protected final int maxRetries;
// HConstants.HBASE_CLIENT_PAUSE, default HConstants.DEFAULT_HBASE_CLIENT_PAUSE.
69 protected final long failureSleep;
// "hbase.ipc.client.tcpnodelay", default true.
70 protected final boolean tcpNoDelay;
// "hbase.ipc.client.tcpkeepalive", default true.
71 protected final boolean tcpKeepAlive;
// Result of getCodec(); null when no codec class name is configured (see hasCellBlockSupport()).
72 protected final Codec codec;
// Result of getCompressor(conf); null when "hbase.client.rpc.compressor" is unset or empty.
73 protected final CompressionCodec compressor;
// IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, default IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT.
74 protected final boolean fallbackAllowed;
75
// SOCKET_TIMEOUT_CONNECT, default DEFAULT_SOCKET_TIMEOUT_CONNECT (units not shown here -- TODO confirm).
76 protected final int connectTO;
// SOCKET_TIMEOUT_READ, default DEFAULT_SOCKET_TIMEOUT_READ (units not shown here -- TODO confirm).
77 protected final int readTO;
// SOCKET_TIMEOUT_WRITE, default DEFAULT_SOCKET_TIMEOUT_WRITE (units not shown here -- TODO confirm).
78 protected final int writeTO;
79
80
81
82
83
84
85
86
/**
 * Builds the shared client state by reading every tunable from {@code conf}.
 *
 * @param conf configuration the settings are read from
 * @param clusterId id of the cluster to talk to; when null,
 *          {@link HConstants#CLUSTER_ID_DEFAULT} is used instead
 * @param localAddr address reported as the "bind address"; may be null
 */
public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
  // Plain argument captures first. getCodec() below reads this.conf, so it must be set
  // before the codec is resolved.
  this.conf = conf;
  this.localAddr = localAddr;
  if (clusterId == null) {
    this.clusterId = HConstants.CLUSTER_ID_DEFAULT;
  } else {
    this.clusterId = clusterId;
  }

  // Collaborators and settings derived from the configuration.
  this.userProvider = UserProvider.instantiate(conf);
  this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
  this.failureSleep =
      conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
  this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
  this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
  this.ipcUtil = new IPCUtil(conf);
  this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000);

  // Wire format: cell codec and optional compression.
  this.codec = getCodec();
  this.compressor = getCompressor(conf);

  this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
      IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);

  // Socket timeouts.
  this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
  this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
  this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);

  if (!LOG.isDebugEnabled()) {
    return;
  }
  // Dump the effective settings once so a misconfiguration is visible in the debug log.
  StringBuilder settings = new StringBuilder();
  settings.append("Codec=").append(this.codec);
  settings.append(", compressor=").append(this.compressor);
  settings.append(", tcpKeepAlive=").append(this.tcpKeepAlive);
  settings.append(", tcpNoDelay=").append(this.tcpNoDelay);
  settings.append(", connectTO=").append(this.connectTO);
  settings.append(", readTO=").append(this.readTO);
  settings.append(", writeTO=").append(this.writeTO);
  settings.append(", minIdleTimeBeforeClose=").append(this.minIdleTimeBeforeClose);
  settings.append(", maxRetries=").append(this.maxRetries);
  settings.append(", fallbackAllowed=").append(this.fallbackAllowed);
  settings.append(", bind address=");
  if (this.localAddr == null) {
    settings.append("null");
  } else {
    settings.append(this.localAddr);
  }
  LOG.debug(settings.toString());
}
122
123 @VisibleForTesting
124 public static String getDefaultCodec(final Configuration c) {
125
126
127
128 return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
129 }
130
131
132
133
134
135 Codec getCodec() {
136
137
138 String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
139 if (className == null || className.length() == 0) return null;
140 try {
141 return (Codec)Class.forName(className).newInstance();
142 } catch (Exception e) {
143 throw new RuntimeException("Failed getting codec " + className, e);
144 }
145 }
146
147 @Override
148 public boolean hasCellBlockSupport() {
149 return this.codec != null;
150 }
151
152
153
154
155
156
157 private static CompressionCodec getCompressor(final Configuration conf) {
158 String className = conf.get("hbase.client.rpc.compressor", null);
159 if (className == null || className.isEmpty()) return null;
160 try {
161 return (CompressionCodec)Class.forName(className).newInstance();
162 } catch (Exception e) {
163 throw new RuntimeException("Failed getting compressor " + className, e);
164 }
165 }
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183 protected static PoolMap.PoolType getPoolType(Configuration config) {
184 return PoolMap.PoolType
185 .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
186 PoolMap.PoolType.ThreadLocal);
187 }
188
189
190
191
192
193
194
195
196 protected static int getPoolSize(Configuration config) {
197 return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
198 }
199
200
201
202
203
204
205
206
207
208
209
210 Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
211 Message param, Message returnType, final User ticket, final InetSocketAddress isa)
212 throws ServiceException {
213 if (pcrc == null) {
214 pcrc = new PayloadCarryingRpcController();
215 }
216
217 long startTime = 0;
218 if (LOG.isTraceEnabled()) {
219 startTime = EnvironmentEdgeManager.currentTime();
220 }
221 Pair<Message, CellScanner> val;
222 try {
223 val = call(pcrc, md, param, returnType, ticket, isa);
224
225 pcrc.setCellScanner(val.getSecond());
226
227 if (LOG.isTraceEnabled()) {
228 long callTime = EnvironmentEdgeManager.currentTime() - startTime;
229 LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
230 }
231 return val.getFirst();
232 } catch (Throwable e) {
233 throw new ServiceException(e);
234 }
235 }
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
/**
 * Performs a single RPC against the server at {@code isa}; implemented by concrete clients.
 *
 * <p>{@code callBlockingMethod} in this class is the visible caller: it never passes a null
 * controller, stores the second element of the returned pair on the controller as its cell
 * scanner, and hands the first element back as the response. It dereferences the returned
 * pair without a null check.
 *
 * @param pcrc controller for the call
 * @param md descriptor of the method to invoke
 * @param param request message
 * @param returnType presumably the prototype of the expected response -- TODO confirm
 * @param ticket presumably the user the call is made on behalf of -- TODO confirm
 * @param isa address of the server to call
 * @return the response message paired with a cell scanner
 * @throws IOException declared by this contract; the conditions are implementation-specific
 * @throws InterruptedException declared by this contract; the conditions are
 *           implementation-specific
 */
252 protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
253 Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
254 InetSocketAddress isa) throws IOException, InterruptedException;
255
256 @Override
257 public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
258 int defaultOperationTimeout) throws UnknownHostException {
259 return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
260 }
261
262
263
264
265
266
267
268
269
270
271
272 protected IOException wrapException(InetSocketAddress addr, Exception exception) {
273 if (exception instanceof ConnectException) {
274
275 return (ConnectException) new ConnectException("Call to " + addr
276 + " failed on connection exception: " + exception).initCause(exception);
277 } else if (exception instanceof SocketTimeoutException) {
278 return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
279 + " failed because " + exception).initCause(exception);
280 } else if (exception instanceof ConnectionClosingException) {
281 return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
282 + " failed on local exception: " + exception).initCause(exception);
283 } else {
284 return (IOException) new IOException("Call to " + addr + " failed on local exception: "
285 + exception).initCause(exception);
286 }
287 }
288
289
290
291
292 @VisibleForTesting
293 public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
294 private final InetSocketAddress isa;
295 private final AbstractRpcClient rpcClient;
296 private final User ticket;
297 private final int channelOperationTimeout;
298
299
300
301
302 protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
303 final ServerName sn, final User ticket, int channelOperationTimeout)
304 throws UnknownHostException {
305 this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
306 if (this.isa.isUnresolved()) {
307 throw new UnknownHostException(sn.getHostname());
308 }
309 this.rpcClient = rpcClient;
310 this.ticket = ticket;
311 this.channelOperationTimeout = channelOperationTimeout;
312 }
313
314 @Override
315 public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
316 Message param, Message returnType) throws ServiceException {
317 PayloadCarryingRpcController pcrc;
318 if (controller != null && controller instanceof PayloadCarryingRpcController) {
319 pcrc = (PayloadCarryingRpcController) controller;
320 if (!pcrc.hasCallTimeout()) {
321 pcrc.setCallTimeout(channelOperationTimeout);
322 }
323 } else {
324 pcrc = new PayloadCarryingRpcController();
325 pcrc.setCallTimeout(channelOperationTimeout);
326 }
327
328 return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
329 }
330 }
331 }