View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import java.io.IOException;
22  import java.lang.reflect.InvocationHandler;
23  import java.lang.reflect.InvocationTargetException;
24  import java.lang.reflect.Method;
25  import java.lang.reflect.Proxy;
26  import java.net.InetSocketAddress;
27  import java.util.HashMap;
28  import java.util.Map;
29  import java.util.concurrent.ConcurrentHashMap;
30  
31  import javax.net.SocketFactory;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.classification.InterfaceAudience;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.HRegionInfo;
38  import org.apache.hadoop.hbase.client.Operation;
39  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
40  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
41  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
42  import org.apache.hadoop.hbase.regionserver.HRegionServer;
43  import org.apache.hadoop.hbase.security.HBasePolicyProvider;
44  import org.apache.hadoop.hbase.security.User;
45  import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
46  import org.apache.hadoop.ipc.RemoteException;
47  import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.codehaus.jackson.map.ObjectMapper;
50  
51  import com.google.protobuf.Message;
52  import com.google.protobuf.ServiceException;
53  /**
54   * The {@link RpcEngine} implementation for ProtoBuf-based RPCs.
55   */
56  @InterfaceAudience.Private
57  class ProtobufRpcEngine implements RpcEngine {
58    private static final Log LOG =
59        LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcEngine");
60    protected final static ClientCache CLIENTS = new ClientCache();
61    @Override
62    public VersionedProtocol getProxy(
63        Class<? extends VersionedProtocol> protocol, long clientVersion,
64        InetSocketAddress addr, User ticket, Configuration conf,
65        SocketFactory factory, int rpcTimeout) throws IOException {
66      final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
67          rpcTimeout);
68      return (VersionedProtocol)Proxy.newProxyInstance(
69          protocol.getClassLoader(), new Class[]{protocol}, invoker);
70    }
71  
72    @Override
73    public void stopProxy(VersionedProtocol proxy) {
74      if (proxy!=null) {
75        ((Invoker)Proxy.getInvocationHandler(proxy)).close();
76      }
77    }
78  
79    @Override
80    public Server getServer(Class<? extends VersionedProtocol> protocol,
81        Object instance, Class<?>[] ifaces, String bindAddress, int port,
82        int numHandlers, int metaHandlerCount, boolean verbose,
83        Configuration conf, int highPriorityLevel) throws IOException {
84      return new Server(instance, ifaces, conf, bindAddress, port, numHandlers,
85          metaHandlerCount, verbose, highPriorityLevel);
86    }
87  
88    static class Invoker implements InvocationHandler {
89      private static final Map<String, Message> returnTypes =
90          new ConcurrentHashMap<String, Message>();
91      private Class<? extends VersionedProtocol> protocol;
92      private InetSocketAddress address;
93      private User ticket;
94      private HBaseClient client;
95      private boolean isClosed = false;
96      final private int rpcTimeout;
97      private final long clientProtocolVersion;
98  
99      public Invoker(Class<? extends VersionedProtocol> protocol,
100         InetSocketAddress addr, User ticket, Configuration conf,
101         SocketFactory factory, int rpcTimeout) throws IOException {
102       this.protocol = protocol;
103       this.address = addr;
104       this.ticket = ticket;
105       this.client = CLIENTS.getClient(conf, factory);
106       this.rpcTimeout = rpcTimeout;
107       Long version = Invocation.PROTOCOL_VERSION.get(protocol);
108       if (version != null) {
109         this.clientProtocolVersion = version;
110       } else {
111         try {
112           this.clientProtocolVersion = HBaseRPC.getProtocolVersion(protocol);
113         } catch (NoSuchFieldException e) {
114           throw new RuntimeException("Exception encountered during " +
115                                       protocol, e);
116         } catch (IllegalAccessException e) {
117           throw new RuntimeException("Exception encountered during " +
118                                       protocol, e);
119         }
120       }
121     }
122 
123     private RpcRequestBody constructRpcRequest(Method method,
124         Object[] params) throws ServiceException {
125       RpcRequestBody rpcRequest;
126       RpcRequestBody.Builder builder = RpcRequestBody.newBuilder();
127       builder.setMethodName(method.getName());
128       Message param;
129       int length = params.length;
130       if (length == 2) {
131         // RpcController + Message in the method args
132         // (generated code from RPC bits in .proto files have RpcController)
133         param = (Message)params[1];
134       } else if (length == 1) { // Message
135         param = (Message)params[0];
136       } else {
137         throw new ServiceException("Too many parameters for request. Method: ["
138             + method.getName() + "]" + ", Expected: 2, Actual: "
139             + params.length);
140       }
141       builder.setRequestClassName(param.getClass().getName());
142       builder.setRequest(param.toByteString());
143       builder.setClientProtocolVersion(clientProtocolVersion);
144       rpcRequest = builder.build();
145       return rpcRequest;
146     }
147 
148     /**
149      * This is the client side invoker of RPC method. It only throws
150      * ServiceException, since the invocation proxy expects only
151      * ServiceException to be thrown by the method in case protobuf service.
152      *
153      * ServiceException has the following causes:
154      * <ol>
155      * <li>Exceptions encountered on the client side in this method are
156      * set as cause in ServiceException as is.</li>
157      * <li>Exceptions from the server are wrapped in RemoteException and are
158      * set as cause in ServiceException</li>
159      * </ol>
160      *
161      * Note that the client calling protobuf RPC methods, must handle
162      * ServiceException by getting the cause from the ServiceException. If the
163      * cause is RemoteException, then unwrap it to get the exception thrown by
164      * the server.
165      */
166     @Override
167     public Object invoke(Object proxy, Method method, Object[] args)
168         throws ServiceException {
169       long startTime = 0;
170       if (LOG.isDebugEnabled()) {
171         startTime = System.currentTimeMillis();
172       }
173 
174       RpcRequestBody rpcRequest = constructRpcRequest(method, args);
175       Message val = null;
176       try {
177         val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout);
178 
179         if (LOG.isDebugEnabled()) {
180           long callTime = System.currentTimeMillis() - startTime;
181           if (LOG.isTraceEnabled()) LOG.trace("Call: " + method.getName() + " " + callTime);
182         }
183         return val;
184       } catch (Throwable e) {
185         if (e instanceof RemoteException) {
186           Throwable cause = ((RemoteException)e).unwrapRemoteException();
187           throw new ServiceException(cause);
188         }
189         throw new ServiceException(e);
190       }
191     }
192 
193     synchronized protected void close() {
194       if (!isClosed) {
195         isClosed = true;
196         CLIENTS.stopClient(client);
197       }
198     }
199 
200    static Message getReturnProtoType(Method method) throws Exception {
201       if (returnTypes.containsKey(method.getName())) {
202         return returnTypes.get(method.getName());
203       }
204 
205       Class<?> returnType = method.getReturnType();
206       Method newInstMethod = returnType.getMethod("getDefaultInstance");
207       newInstMethod.setAccessible(true);
208       Message protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
209       returnTypes.put(method.getName(), protoType);
210       return protoType;
211     }
212   }
213 
214   public static class Server extends HBaseServer {
215     boolean verbose;
216     Object instance;
217     Class<?> implementation;
218     private static final String WARN_RESPONSE_TIME =
219         "hbase.ipc.warn.response.time";
220     private static final String WARN_RESPONSE_SIZE =
221         "hbase.ipc.warn.response.size";
222 
223     /** Default value for above params */
224     private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
225     private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
226 
227     /** Names for suffixed metrics */
228     private static final String ABOVE_ONE_SEC_METRIC = ".aboveOneSec.";
229 
230     private final int warnResponseTime;
231     private final int warnResponseSize;
232 
233     private static String classNameBase(String className) {
234       String[] names = className.split("\\.", -1);
235       if (names == null || names.length == 0) {
236         return className;
237       }
238       return names[names.length-1];
239     }
240 
241     public Server(Object instance, final Class<?>[] ifaces,
242         Configuration conf, String bindAddress,  int port,
243         int numHandlers, int metaHandlerCount, boolean verbose,
244         int highPriorityLevel)
245         throws IOException {
246       super(bindAddress, port, numHandlers, metaHandlerCount,
247           conf, classNameBase(instance.getClass().getName()),
248           highPriorityLevel);
249       this.instance = instance;
250       this.implementation = instance.getClass();
251       this.verbose = verbose;
252 
253       this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME,
254           DEFAULT_WARN_RESPONSE_TIME);
255       this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
256           DEFAULT_WARN_RESPONSE_SIZE);
257       this.verbose = verbose;
258       this.instance = instance;
259       this.implementation = instance.getClass();
260     }
261     private static final Map<String, Message> methodArg =
262         new ConcurrentHashMap<String, Message>();
263     private static final Map<String, Method> methodInstances =
264         new ConcurrentHashMap<String, Method>();
265 
266     private AuthenticationTokenSecretManager createSecretManager(){
267       if (!isSecurityEnabled ||
268           !(instance instanceof org.apache.hadoop.hbase.Server)) {
269         return null;
270       }
271       org.apache.hadoop.hbase.Server server =
272           (org.apache.hadoop.hbase.Server)instance;
273       Configuration conf = server.getConfiguration();
274       long keyUpdateInterval =
275           conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
276       long maxAge =
277           conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
278       return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
279           server.getServerName().toString(), keyUpdateInterval, maxAge);
280     }
281 
282     @Override
283     public void startThreads() {
284       AuthenticationTokenSecretManager mgr = createSecretManager();
285       if (mgr != null) {
286         setSecretManager(mgr);
287         mgr.start();
288       }
289       this.authManager = new ServiceAuthorizationManager();
290       HBasePolicyProvider.init(conf, authManager);
291 
292       // continue with base startup
293       super.startThreads();
294     }
295 
296     @Override
297     /**
298      * This is a server side method, which is invoked over RPC. On success
299      * the return response has protobuf response payload. On failure, the
300      * exception name and the stack trace are returned in the protobuf response.
301      */
302     public Message call(Class<? extends VersionedProtocol> protocol,
303         RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status)
304         throws IOException {
305       try {
306         String methodName = rpcRequest.getMethodName();
307         Method method = getMethod(protocol, methodName);
308         if (method == null) {
309           throw new HBaseRPC.UnknownProtocolException("Method " + methodName +
310               " doesn't exist in protocol " + protocol.getName());
311         }
312 
313         /**
314          * RPCs for a particular interface (ie protocol) are done using a
315          * IPC connection that is setup using rpcProxy.
316          * The rpcProxy's has a declared protocol name that is
317          * sent form client to server at connection time.
318          */
319         //TODO: use the clientVersion to do protocol compatibility checks, and
320         //this could be used here to handle complex use cases like deciding
321         //which implementation of the protocol should be used to service the
322         //current request, etc. Ideally, we shouldn't land up in a situation
323         //where we need to support such a use case.
324         //For now the clientVersion field is simply ignored
325         long clientVersion = rpcRequest.getClientProtocolVersion();
326 
327         if (verbose) {
328           LOG.info("Call: protocol name=" + protocol.getName() +
329               ", method=" + methodName);
330         }
331 
332         status.setRPC(rpcRequest.getMethodName(),
333             new Object[]{rpcRequest.getRequest()}, receiveTime);
334         status.setRPCPacket(rpcRequest);
335         status.resume("Servicing call");
336         //get an instance of the method arg type
337         Message protoType = getMethodArgType(method);
338         Message param = protoType.newBuilderForType()
339             .mergeFrom(rpcRequest.getRequest()).build();
340         Message result;
341         Object impl = null;
342         if (protocol.isAssignableFrom(this.implementation)) {
343           impl = this.instance;
344         } else {
345           throw new HBaseRPC.UnknownProtocolException(protocol);
346         }
347 
348         long startTime = System.currentTimeMillis();
349         if (method.getParameterTypes().length == 2) {
350           // RpcController + Message in the method args
351           // (generated code from RPC bits in .proto files have RpcController)
352           result = (Message)method.invoke(impl, null, param);
353         } else if (method.getParameterTypes().length == 1) {
354           // Message (hand written code usually has only a single argument)
355           result = (Message)method.invoke(impl, param);
356         } else {
357           throw new ServiceException("Too many parameters for method: ["
358               + method.getName() + "]" + ", allowed (at most): 2, Actual: "
359               + method.getParameterTypes().length);
360         }
361         int processingTime = (int) (System.currentTimeMillis() - startTime);
362         int qTime = (int) (startTime-receiveTime);
363         if (TRACELOG.isDebugEnabled()) {
364           TRACELOG.debug("Call #" + CurCall.get().id +
365               "; served=" + protocol.getSimpleName() + "#" + method.getName() +
366               ", queueTime=" + qTime +
367               ", processingTime=" + processingTime +
368               ", request=" + param.toString() +
369               " response=" + result.toString());
370         }
371         metrics.dequeuedCall(qTime);
372         metrics.processedCall(processingTime);
373 
374         if (verbose) {
375           log("Return: "+result, LOG);
376         }
377         long responseSize = result.getSerializedSize();
378         // log any RPC responses that are slower than the configured warn
379         // response time or larger than configured warning size
380         boolean tooSlow = (processingTime > warnResponseTime
381             && warnResponseTime > -1);
382         boolean tooLarge = (responseSize > warnResponseSize
383             && warnResponseSize > -1);
384         if (tooSlow || tooLarge) {
385           // when tagging, we let TooLarge trump TooSmall to keep output simple
386           // note that large responses will often also be slow.
387           StringBuilder buffer = new StringBuilder(256);
388           buffer.append(methodName);
389           buffer.append("(");
390           buffer.append(param.getClass().getName());
391           buffer.append(")");
392           buffer.append(", client version="+clientVersion);
393           logResponse(new Object[]{rpcRequest.getRequest()},
394               methodName, buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
395               status.getClient(), startTime, processingTime, qTime,
396               responseSize);
397         }
398         return result;
399       } catch (InvocationTargetException e) {
400         Throwable target = e.getTargetException();
401         if (target instanceof IOException) {
402           throw (IOException)target;
403         }
404         if (target instanceof ServiceException) {
405           throw ProtobufUtil.getRemoteException((ServiceException)target);
406         }
407         IOException ioe = new IOException(target.toString());
408         ioe.setStackTrace(target.getStackTrace());
409         throw ioe;
410       } catch (Throwable e) {
411         if (!(e instanceof IOException)) {
412           LOG.error("Unexpected throwable object ", e);
413         }
414         IOException ioe = new IOException(e.toString());
415         ioe.setStackTrace(e.getStackTrace());
416         throw ioe;
417       }
418     }
419 
420     static Method getMethod(Class<? extends VersionedProtocol> protocol,
421         String methodName) {
422       Method method = methodInstances.get(methodName);
423       if (method != null) {
424         return method;
425       }
426       Method[] methods = protocol.getMethods();
427       for (Method m : methods) {
428         if (m.getName().equals(methodName)) {
429           m.setAccessible(true);
430           methodInstances.put(methodName, m);
431           return m;
432         }
433       }
434       return null;
435     }
436 
437     static Message getMethodArgType(Method method) throws Exception {
438       Message protoType = methodArg.get(method.getName());
439       if (protoType != null) {
440         return protoType;
441       }
442 
443       Class<?>[] args = method.getParameterTypes();
444       Class<?> arg;
445       if (args.length == 2) {
446         // RpcController + Message in the method args
447         // (generated code from RPC bits in .proto files have RpcController)
448         arg = args[1];
449       } else if (args.length == 1) {
450         arg = args[0];
451       } else {
452         //unexpected
453         return null;
454       }
455       //in the protobuf methods, args[1] is the only significant argument
456       Method newInstMethod = arg.getMethod("getDefaultInstance");
457       newInstMethod.setAccessible(true);
458       protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
459       methodArg.put(method.getName(), protoType);
460       return protoType;
461     }
462     /**
463      * Logs an RPC response to the LOG file, producing valid JSON objects for
464      * client Operations.
465      * @param params The parameters received in the call.
466      * @param methodName The name of the method invoked
467      * @param call The string representation of the call
468      * @param tag  The tag that will be used to indicate this event in the log.
469      * @param clientAddress   The address of the client who made this call.
470      * @param startTime       The time that the call was initiated, in ms.
471      * @param processingTime  The duration that the call took to run, in ms.
472      * @param qTime           The duration that the call spent on the queue
473      *                        prior to being initiated, in ms.
474      * @param responseSize    The size in bytes of the response buffer.
475      */
476      void logResponse(Object[] params, String methodName, String call, String tag,
477          String clientAddress, long startTime, int processingTime, int qTime,
478          long responseSize)
479       throws IOException {
480       // for JSON encoding
481       ObjectMapper mapper = new ObjectMapper();
482       // base information that is reported regardless of type of call
483       Map<String, Object> responseInfo = new HashMap<String, Object>();
484       responseInfo.put("starttimems", startTime);
485       responseInfo.put("processingtimems", processingTime);
486       responseInfo.put("queuetimems", qTime);
487       responseInfo.put("responsesize", responseSize);
488       responseInfo.put("client", clientAddress);
489       responseInfo.put("class", instance.getClass().getSimpleName());
490       responseInfo.put("method", methodName);
491       if (params.length == 2 && instance instanceof HRegionServer &&
492           params[0] instanceof byte[] &&
493           params[1] instanceof Operation) {
494         // if the slow process is a query, we want to log its table as well
495         // as its own fingerprint
496         byte [] tableName =
497           HRegionInfo.parseRegionName((byte[]) params[0])[0];
498         responseInfo.put("table", Bytes.toStringBinary(tableName));
499         // annotate the response map with operation details
500         responseInfo.putAll(((Operation) params[1]).toMap());
501         // report to the log file
502         LOG.warn("(operation" + tag + "): " +
503             mapper.writeValueAsString(responseInfo));
504       } else if (params.length == 1 && instance instanceof HRegionServer &&
505           params[0] instanceof Operation) {
506         // annotate the response map with operation details
507         responseInfo.putAll(((Operation) params[0]).toMap());
508         // report to the log file
509         LOG.warn("(operation" + tag + "): " +
510             mapper.writeValueAsString(responseInfo));
511       } else {
512         // can't get JSON details, so just report call.toString() along with
513         // a more generic tag.
514         responseInfo.put("call", call);
515         LOG.warn("(response" + tag + "): " +
516             mapper.writeValueAsString(responseInfo));
517       }
518     }
519     protected static void log(String value, Log LOG) {
520       String v = value;
521       if (v != null && v.length() > 55)
522         v = v.substring(0, 55)+"...";
523       LOG.info(v);
524     }
525   }
526 }