1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import java.io.IOException;
22 import java.lang.reflect.InvocationHandler;
23 import java.lang.reflect.InvocationTargetException;
24 import java.lang.reflect.Method;
25 import java.lang.reflect.Proxy;
26 import java.net.InetSocketAddress;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.concurrent.ConcurrentHashMap;
30
31 import javax.net.SocketFactory;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.classification.InterfaceAudience;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.hbase.HRegionInfo;
38 import org.apache.hadoop.hbase.client.Operation;
39 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
40 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
41 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
42 import org.apache.hadoop.hbase.regionserver.HRegionServer;
43 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
44 import org.apache.hadoop.hbase.security.User;
45 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
46 import org.apache.hadoop.ipc.RemoteException;
47 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.codehaus.jackson.map.ObjectMapper;
50
51 import com.google.protobuf.Message;
52 import com.google.protobuf.ServiceException;
53
54
55
56 @InterfaceAudience.Private
57 class ProtobufRpcEngine implements RpcEngine {
58 private static final Log LOG =
59 LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcEngine");
60 protected final static ClientCache CLIENTS = new ClientCache();
61 @Override
62 public VersionedProtocol getProxy(
63 Class<? extends VersionedProtocol> protocol, long clientVersion,
64 InetSocketAddress addr, User ticket, Configuration conf,
65 SocketFactory factory, int rpcTimeout) throws IOException {
66 final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
67 rpcTimeout);
68 return (VersionedProtocol)Proxy.newProxyInstance(
69 protocol.getClassLoader(), new Class[]{protocol}, invoker);
70 }
71
72 @Override
73 public void stopProxy(VersionedProtocol proxy) {
74 if (proxy!=null) {
75 ((Invoker)Proxy.getInvocationHandler(proxy)).close();
76 }
77 }
78
79 @Override
80 public Server getServer(Class<? extends VersionedProtocol> protocol,
81 Object instance, Class<?>[] ifaces, String bindAddress, int port,
82 int numHandlers, int metaHandlerCount, boolean verbose,
83 Configuration conf, int highPriorityLevel) throws IOException {
84 return new Server(instance, ifaces, conf, bindAddress, port, numHandlers,
85 metaHandlerCount, verbose, highPriorityLevel);
86 }
87
88 static class Invoker implements InvocationHandler {
89 private static final Map<String, Message> returnTypes =
90 new ConcurrentHashMap<String, Message>();
91 private Class<? extends VersionedProtocol> protocol;
92 private InetSocketAddress address;
93 private User ticket;
94 private HBaseClient client;
95 private boolean isClosed = false;
96 final private int rpcTimeout;
97 private final long clientProtocolVersion;
98
99 public Invoker(Class<? extends VersionedProtocol> protocol,
100 InetSocketAddress addr, User ticket, Configuration conf,
101 SocketFactory factory, int rpcTimeout) throws IOException {
102 this.protocol = protocol;
103 this.address = addr;
104 this.ticket = ticket;
105 this.client = CLIENTS.getClient(conf, factory);
106 this.rpcTimeout = rpcTimeout;
107 Long version = Invocation.PROTOCOL_VERSION.get(protocol);
108 if (version != null) {
109 this.clientProtocolVersion = version;
110 } else {
111 try {
112 this.clientProtocolVersion = HBaseRPC.getProtocolVersion(protocol);
113 } catch (NoSuchFieldException e) {
114 throw new RuntimeException("Exception encountered during " +
115 protocol, e);
116 } catch (IllegalAccessException e) {
117 throw new RuntimeException("Exception encountered during " +
118 protocol, e);
119 }
120 }
121 }
122
123 private RpcRequestBody constructRpcRequest(Method method,
124 Object[] params) throws ServiceException {
125 RpcRequestBody rpcRequest;
126 RpcRequestBody.Builder builder = RpcRequestBody.newBuilder();
127 builder.setMethodName(method.getName());
128 Message param;
129 int length = params.length;
130 if (length == 2) {
131
132
133 param = (Message)params[1];
134 } else if (length == 1) {
135 param = (Message)params[0];
136 } else {
137 throw new ServiceException("Too many parameters for request. Method: ["
138 + method.getName() + "]" + ", Expected: 2, Actual: "
139 + params.length);
140 }
141 builder.setRequestClassName(param.getClass().getName());
142 builder.setRequest(param.toByteString());
143 builder.setClientProtocolVersion(clientProtocolVersion);
144 rpcRequest = builder.build();
145 return rpcRequest;
146 }
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 @Override
167 public Object invoke(Object proxy, Method method, Object[] args)
168 throws ServiceException {
169 long startTime = 0;
170 if (LOG.isDebugEnabled()) {
171 startTime = System.currentTimeMillis();
172 }
173
174 RpcRequestBody rpcRequest = constructRpcRequest(method, args);
175 Message val = null;
176 try {
177 val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout);
178
179 if (LOG.isDebugEnabled()) {
180 long callTime = System.currentTimeMillis() - startTime;
181 if (LOG.isTraceEnabled()) LOG.trace("Call: " + method.getName() + " " + callTime);
182 }
183 return val;
184 } catch (Throwable e) {
185 if (e instanceof RemoteException) {
186 Throwable cause = ((RemoteException)e).unwrapRemoteException();
187 throw new ServiceException(cause);
188 }
189 throw new ServiceException(e);
190 }
191 }
192
193 synchronized protected void close() {
194 if (!isClosed) {
195 isClosed = true;
196 CLIENTS.stopClient(client);
197 }
198 }
199
200 static Message getReturnProtoType(Method method) throws Exception {
201 if (returnTypes.containsKey(method.getName())) {
202 return returnTypes.get(method.getName());
203 }
204
205 Class<?> returnType = method.getReturnType();
206 Method newInstMethod = returnType.getMethod("getDefaultInstance");
207 newInstMethod.setAccessible(true);
208 Message protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
209 returnTypes.put(method.getName(), protoType);
210 return protoType;
211 }
212 }
213
214 public static class Server extends HBaseServer {
215 boolean verbose;
216 Object instance;
217 Class<?> implementation;
218 private static final String WARN_RESPONSE_TIME =
219 "hbase.ipc.warn.response.time";
220 private static final String WARN_RESPONSE_SIZE =
221 "hbase.ipc.warn.response.size";
222
223
224 private static final int DEFAULT_WARN_RESPONSE_TIME = 10000;
225 private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
226
227
228 private static final String ABOVE_ONE_SEC_METRIC = ".aboveOneSec.";
229
230 private final int warnResponseTime;
231 private final int warnResponseSize;
232
233 private static String classNameBase(String className) {
234 String[] names = className.split("\\.", -1);
235 if (names == null || names.length == 0) {
236 return className;
237 }
238 return names[names.length-1];
239 }
240
/**
 * Creates a server that dispatches incoming calls to {@code instance}.
 *
 * <p>The simple class name of {@code instance} is passed to the superclass
 * constructor. The slow/large response warning thresholds are read from
 * {@code conf}, with built-in defaults.
 *
 * @param instance          object that implements the served protocols
 * @param ifaces            not used by this constructor
 * @param conf              configuration for the superclass and thresholds
 * @param bindAddress       forwarded to the superclass
 * @param port              forwarded to the superclass
 * @param numHandlers       forwarded to the superclass
 * @param metaHandlerCount  forwarded to the superclass
 * @param verbose           whether calls and results are logged at INFO
 * @param highPriorityLevel forwarded to the superclass
 * @throws IOException if the superclass constructor fails
 */
public Server(Object instance, final Class<?>[] ifaces,
    Configuration conf, String bindAddress, int port,
    int numHandlers, int metaHandlerCount, boolean verbose,
    int highPriorityLevel)
    throws IOException {
  super(bindAddress, port, numHandlers, metaHandlerCount,
      conf, classNameBase(instance.getClass().getName()),
      highPriorityLevel);
  // Each field is assigned once; the duplicate assignments were removed.
  this.instance = instance;
  this.implementation = instance.getClass();
  this.verbose = verbose;

  this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME,
      DEFAULT_WARN_RESPONSE_TIME);
  this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
      DEFAULT_WARN_RESPONSE_SIZE);
}
261 private static final Map<String, Message> methodArg =
262 new ConcurrentHashMap<String, Message>();
263 private static final Map<String, Method> methodInstances =
264 new ConcurrentHashMap<String, Method>();
265
266 private AuthenticationTokenSecretManager createSecretManager(){
267 if (!isSecurityEnabled ||
268 !(instance instanceof org.apache.hadoop.hbase.Server)) {
269 return null;
270 }
271 org.apache.hadoop.hbase.Server server =
272 (org.apache.hadoop.hbase.Server)instance;
273 Configuration conf = server.getConfiguration();
274 long keyUpdateInterval =
275 conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
276 long maxAge =
277 conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
278 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
279 server.getServerName().toString(), keyUpdateInterval, maxAge);
280 }
281
282 @Override
283 public void startThreads() {
284 AuthenticationTokenSecretManager mgr = createSecretManager();
285 if (mgr != null) {
286 setSecretManager(mgr);
287 mgr.start();
288 }
289 this.authManager = new ServiceAuthorizationManager();
290 HBasePolicyProvider.init(conf, authManager);
291
292
293 super.startThreads();
294 }
295
296 @Override
297
298
299
300
301
302 public Message call(Class<? extends VersionedProtocol> protocol,
303 RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status)
304 throws IOException {
305 try {
306 String methodName = rpcRequest.getMethodName();
307 Method method = getMethod(protocol, methodName);
308 if (method == null) {
309 throw new HBaseRPC.UnknownProtocolException("Method " + methodName +
310 " doesn't exist in protocol " + protocol.getName());
311 }
312
313
314
315
316
317
318
319
320
321
322
323
324
325 long clientVersion = rpcRequest.getClientProtocolVersion();
326
327 if (verbose) {
328 LOG.info("Call: protocol name=" + protocol.getName() +
329 ", method=" + methodName);
330 }
331
332 status.setRPC(rpcRequest.getMethodName(),
333 new Object[]{rpcRequest.getRequest()}, receiveTime);
334 status.setRPCPacket(rpcRequest);
335 status.resume("Servicing call");
336
337 Message protoType = getMethodArgType(method);
338 Message param = protoType.newBuilderForType()
339 .mergeFrom(rpcRequest.getRequest()).build();
340 Message result;
341 Object impl = null;
342 if (protocol.isAssignableFrom(this.implementation)) {
343 impl = this.instance;
344 } else {
345 throw new HBaseRPC.UnknownProtocolException(protocol);
346 }
347
348 long startTime = System.currentTimeMillis();
349 if (method.getParameterTypes().length == 2) {
350
351
352 result = (Message)method.invoke(impl, null, param);
353 } else if (method.getParameterTypes().length == 1) {
354
355 result = (Message)method.invoke(impl, param);
356 } else {
357 throw new ServiceException("Too many parameters for method: ["
358 + method.getName() + "]" + ", allowed (at most): 2, Actual: "
359 + method.getParameterTypes().length);
360 }
361 int processingTime = (int) (System.currentTimeMillis() - startTime);
362 int qTime = (int) (startTime-receiveTime);
363 if (TRACELOG.isDebugEnabled()) {
364 TRACELOG.debug("Call #" + CurCall.get().id +
365 "; served=" + protocol.getSimpleName() + "#" + method.getName() +
366 ", queueTime=" + qTime +
367 ", processingTime=" + processingTime +
368 ", request=" + param.toString() +
369 " response=" + result.toString());
370 }
371 metrics.dequeuedCall(qTime);
372 metrics.processedCall(processingTime);
373
374 if (verbose) {
375 log("Return: "+result, LOG);
376 }
377 long responseSize = result.getSerializedSize();
378
379
380 boolean tooSlow = (processingTime > warnResponseTime
381 && warnResponseTime > -1);
382 boolean tooLarge = (responseSize > warnResponseSize
383 && warnResponseSize > -1);
384 if (tooSlow || tooLarge) {
385
386
387 StringBuilder buffer = new StringBuilder(256);
388 buffer.append(methodName);
389 buffer.append("(");
390 buffer.append(param.getClass().getName());
391 buffer.append(")");
392 buffer.append(", client version="+clientVersion);
393 logResponse(new Object[]{rpcRequest.getRequest()},
394 methodName, buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
395 status.getClient(), startTime, processingTime, qTime,
396 responseSize);
397 }
398 return result;
399 } catch (InvocationTargetException e) {
400 Throwable target = e.getTargetException();
401 if (target instanceof IOException) {
402 throw (IOException)target;
403 }
404 if (target instanceof ServiceException) {
405 throw ProtobufUtil.getRemoteException((ServiceException)target);
406 }
407 IOException ioe = new IOException(target.toString());
408 ioe.setStackTrace(target.getStackTrace());
409 throw ioe;
410 } catch (Throwable e) {
411 if (!(e instanceof IOException)) {
412 LOG.error("Unexpected throwable object ", e);
413 }
414 IOException ioe = new IOException(e.toString());
415 ioe.setStackTrace(e.getStackTrace());
416 throw ioe;
417 }
418 }
419
420 static Method getMethod(Class<? extends VersionedProtocol> protocol,
421 String methodName) {
422 Method method = methodInstances.get(methodName);
423 if (method != null) {
424 return method;
425 }
426 Method[] methods = protocol.getMethods();
427 for (Method m : methods) {
428 if (m.getName().equals(methodName)) {
429 m.setAccessible(true);
430 methodInstances.put(methodName, m);
431 return m;
432 }
433 }
434 return null;
435 }
436
437 static Message getMethodArgType(Method method) throws Exception {
438 Message protoType = methodArg.get(method.getName());
439 if (protoType != null) {
440 return protoType;
441 }
442
443 Class<?>[] args = method.getParameterTypes();
444 Class<?> arg;
445 if (args.length == 2) {
446
447
448 arg = args[1];
449 } else if (args.length == 1) {
450 arg = args[0];
451 } else {
452
453 return null;
454 }
455
456 Method newInstMethod = arg.getMethod("getDefaultInstance");
457 newInstMethod.setAccessible(true);
458 protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
459 methodArg.put(method.getName(), protoType);
460 return protoType;
461 }
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476 void logResponse(Object[] params, String methodName, String call, String tag,
477 String clientAddress, long startTime, int processingTime, int qTime,
478 long responseSize)
479 throws IOException {
480
481 ObjectMapper mapper = new ObjectMapper();
482
483 Map<String, Object> responseInfo = new HashMap<String, Object>();
484 responseInfo.put("starttimems", startTime);
485 responseInfo.put("processingtimems", processingTime);
486 responseInfo.put("queuetimems", qTime);
487 responseInfo.put("responsesize", responseSize);
488 responseInfo.put("client", clientAddress);
489 responseInfo.put("class", instance.getClass().getSimpleName());
490 responseInfo.put("method", methodName);
491 if (params.length == 2 && instance instanceof HRegionServer &&
492 params[0] instanceof byte[] &&
493 params[1] instanceof Operation) {
494
495
496 byte [] tableName =
497 HRegionInfo.parseRegionName((byte[]) params[0])[0];
498 responseInfo.put("table", Bytes.toStringBinary(tableName));
499
500 responseInfo.putAll(((Operation) params[1]).toMap());
501
502 LOG.warn("(operation" + tag + "): " +
503 mapper.writeValueAsString(responseInfo));
504 } else if (params.length == 1 && instance instanceof HRegionServer &&
505 params[0] instanceof Operation) {
506
507 responseInfo.putAll(((Operation) params[0]).toMap());
508
509 LOG.warn("(operation" + tag + "): " +
510 mapper.writeValueAsString(responseInfo));
511 } else {
512
513
514 responseInfo.put("call", call);
515 LOG.warn("(response" + tag + "): " +
516 mapper.writeValueAsString(responseInfo));
517 }
518 }
519 protected static void log(String value, Log LOG) {
520 String v = value;
521 if (v != null && v.length() > 55)
522 v = v.substring(0, 55)+"...";
523 LOG.info(v);
524 }
525 }
526 }