1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.ipc;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.classification.InterfaceAudience;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.DoNotRetryIOException;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
29 import org.apache.hadoop.hbase.security.User;
30 import org.apache.hadoop.net.NetUtils;
31 import org.apache.hadoop.util.ReflectionUtils;
32
33 import javax.net.SocketFactory;
34 import java.io.IOException;
35 import java.lang.reflect.Field;
36 import java.lang.reflect.Proxy;
37 import java.net.ConnectException;
38 import java.net.InetSocketAddress;
39 import java.net.SocketTimeoutException;
40 import java.util.HashMap;
41 import java.util.Map;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 @InterfaceAudience.Private
58 public class HBaseRPC {
59
60
61
62 protected static final Log LOG =
63 LogFactory.getLog("org.apache.hadoop.ipc.HBaseRPC");
64
65 private HBaseRPC() {
66 super();
67 }
68
69
70
71
72
73
74 public static final String RPC_ENGINE_PROP = "hbase.rpc.engine";
75
76
77 private static final Map<Class,RpcEngine> PROTOCOL_ENGINES
78 = new HashMap<Class,RpcEngine>();
79
80
81 private static final Map<Class,RpcEngine> PROXY_ENGINES
82 = new HashMap<Class,RpcEngine>();
83
84
85 private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
86 @Override
87 protected Integer initialValue() {
88 return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
89 }
90 };
91
92 static long getProtocolVersion(Class<? extends VersionedProtocol> protocol)
93 throws NoSuchFieldException, IllegalAccessException {
94 Field versionField = protocol.getField("VERSION");
95 versionField.setAccessible(true);
96 return versionField.getLong(protocol);
97 }
98
99
100 static void setProtocolEngine(Configuration conf,
101 Class protocol, Class engine) {
102 conf.setClass(RPC_ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class);
103 }
104
105
106 private static synchronized RpcEngine getProtocolEngine(Class protocol,
107 Configuration conf) {
108 RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
109 if (engine == null) {
110
111 Class<?> defaultEngine =
112 conf.getClass(RPC_ENGINE_PROP, ProtobufRpcEngine.class);
113
114
115 Class<?> impl = conf.getClass(RPC_ENGINE_PROP+"."+protocol.getName(),
116 defaultEngine);
117 LOG.debug("Using "+impl.getName()+" for "+protocol.getName());
118 engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
119 if (protocol.isInterface())
120 PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(),
121 protocol),
122 engine);
123 PROTOCOL_ENGINES.put(protocol, engine);
124 }
125 return engine;
126 }
127
128
129 private static synchronized RpcEngine getProxyEngine(Object proxy) {
130 return PROXY_ENGINES.get(proxy.getClass());
131 }
132
133
134
135
136 public static class VersionMismatch extends IOException {
137 private static final long serialVersionUID = 0;
138 private String interfaceName;
139 private long clientVersion;
140 private long serverVersion;
141
142
143
144
145
146
147
148 public VersionMismatch(String interfaceName, long clientVersion,
149 long serverVersion) {
150 super("Protocol " + interfaceName + " version mismatch. (client = " +
151 clientVersion + ", server = " + serverVersion + ")");
152 this.interfaceName = interfaceName;
153 this.clientVersion = clientVersion;
154 this.serverVersion = serverVersion;
155 }
156
157
158
159
160
161
162 public String getInterfaceName() {
163 return interfaceName;
164 }
165
166
167
168
169 public long getClientVersion() {
170 return clientVersion;
171 }
172
173
174
175
176 public long getServerVersion() {
177 return serverVersion;
178 }
179 }
180
181
182
183
184 public static class UnknownProtocolException extends DoNotRetryIOException {
185 private Class<?> protocol;
186
187 public UnknownProtocolException(String mesg) {
188
189 super(mesg);
190 }
191
192 public UnknownProtocolException(Class<?> protocol) {
193 this(protocol, "Server is not handling protocol "+protocol.getName());
194 }
195
196 public UnknownProtocolException(Class<?> protocol, String mesg) {
197 super(mesg);
198 this.protocol = protocol;
199 }
200
201 public Class getProtocol() {
202 return protocol;
203 }
204 }
205
206
207
208
209
210
211
212
213
214
215
216
217 @SuppressWarnings("unchecked")
218 public static VersionedProtocol waitForProxy(Class protocol,
219 long clientVersion,
220 InetSocketAddress addr,
221 Configuration conf,
222 int maxAttempts,
223 int rpcTimeout,
224 long timeout
225 ) throws IOException {
226
227 long startTime = System.currentTimeMillis();
228 IOException ioe;
229 int reconnectAttempts = 0;
230 while (true) {
231 try {
232 return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
233 } catch(SocketTimeoutException te) {
234 LOG.info("Problem connecting to server: " + addr);
235 ioe = te;
236 } catch (IOException ioex) {
237
238 ConnectException ce = null;
239 if (ioex instanceof ConnectException) {
240 ce = (ConnectException) ioex;
241 ioe = ce;
242 } else if (ioex.getCause() != null
243 && ioex.getCause() instanceof ConnectException) {
244 ce = (ConnectException) ioex.getCause();
245 ioe = ce;
246 } else if (ioex.getMessage().toLowerCase()
247 .contains("connection refused")) {
248 ce = new ConnectException(ioex.getMessage());
249 ioe = ce;
250 } else {
251
252 ioe = ioex;
253 }
254 if (ce != null) {
255 handleConnectionException(++reconnectAttempts, maxAttempts, protocol,
256 addr, ce);
257 }
258 }
259
260 if (System.currentTimeMillis() - timeout >= startTime) {
261 throw ioe;
262 }
263
264
265 try {
266 Thread.sleep(1000);
267 } catch (InterruptedException ie) {
268
269 }
270 }
271 }
272
273
274
275
276
277
278
279
280
281 private static void handleConnectionException(int retries, int maxAttmpts,
282 Class<?> protocol, InetSocketAddress addr, ConnectException ce)
283 throws RetriesExhaustedException {
284 if (maxAttmpts >= 0 && retries >= maxAttmpts) {
285 LOG.info("Server at " + addr + " could not be reached after "
286 + maxAttmpts + " tries, giving up.");
287 throw new RetriesExhaustedException("Failed setting up proxy " + protocol
288 + " to " + addr.toString() + " after attempts=" + maxAttmpts, ce);
289 }
290 }
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305 public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
306 long clientVersion, InetSocketAddress addr, Configuration conf,
307 SocketFactory factory, int rpcTimeout) throws IOException {
308 return getProxy(protocol, clientVersion, addr,
309 User.getCurrent(), conf, factory, rpcTimeout);
310 }
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326 public static VersionedProtocol getProxy(
327 Class<? extends VersionedProtocol> protocol,
328 long clientVersion, InetSocketAddress addr, User ticket,
329 Configuration conf, SocketFactory factory, int rpcTimeout)
330 throws IOException {
331 RpcEngine engine = getProtocolEngine(protocol,conf);
332 VersionedProtocol proxy = engine
333 .getProxy(protocol, clientVersion, addr, ticket, conf, factory,
334 Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
335 return proxy;
336 }
337
338
339
340
341
342
343
344
345
346
347
348
349 public static VersionedProtocol getProxy(
350 Class<? extends VersionedProtocol> protocol,
351 long clientVersion, InetSocketAddress addr, Configuration conf,
352 int rpcTimeout)
353 throws IOException {
354
355 return getProxy(protocol, clientVersion, addr, conf, NetUtils
356 .getDefaultSocketFactory(conf), rpcTimeout);
357 }
358
359
360
361
362
363 public static void stopProxy(VersionedProtocol proxy) {
364 if (proxy!=null) {
365 getProxyEngine(proxy).stopProxy(proxy);
366 }
367 }
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382 public static RpcServer getServer(final Object instance,
383 final Class<?>[] ifaces,
384 final String bindAddress, final int port,
385 final int numHandlers,
386 int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel)
387 throws IOException {
388 return getServer(instance.getClass(), instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel);
389 }
390
391
392 public static RpcServer getServer(Class protocol,
393 final Object instance,
394 final Class<?>[] ifaces, String bindAddress,
395 int port,
396 final int numHandlers,
397 int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel)
398 throws IOException {
399 return getProtocolEngine(protocol, conf)
400 .getServer(protocol, instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel);
401 }
402
403 public static void setRpcTimeout(int rpcTimeout) {
404 HBaseRPC.rpcTimeout.set(rpcTimeout);
405 }
406
407 public static int getRpcTimeout() {
408 return HBaseRPC.rpcTimeout.get();
409 }
410
411 public static void resetRpcTimeout() {
412 HBaseRPC.rpcTimeout.remove();
413 }
414 }