View Javadoc

1   /*
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.ipc;
22  
23  import java.lang.reflect.Proxy;
24  import java.lang.reflect.Method;
25  import java.lang.reflect.Array;
26  import java.lang.reflect.InvocationHandler;
27  import java.lang.reflect.InvocationTargetException;
28  
29  import java.net.InetSocketAddress;
30  import java.io.*;
31  import java.util.Map;
32  import java.util.HashMap;
33  
34  import javax.net.SocketFactory;
35  
36  import org.apache.commons.logging.*;
37  
38  import org.apache.hadoop.hbase.HRegionInfo;
39  import org.apache.hadoop.hbase.client.Operation;
40  import org.apache.hadoop.hbase.client.UserProvider;
41  import org.apache.hadoop.hbase.io.HbaseObjectWritable;
42  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
43  import org.apache.hadoop.hbase.regionserver.HRegionServer;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.Objects;
46  import org.apache.hadoop.io.*;
47  import org.apache.hadoop.ipc.RPC;
48  import org.apache.hadoop.hbase.ipc.VersionedProtocol;
49  import org.apache.hadoop.hbase.security.User;
50  import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
51  import org.apache.hadoop.conf.*;
52  
53  import org.codehaus.jackson.map.ObjectMapper;
54  
55  /** An RpcEngine implementation for Writable data. */
56  class WritableRpcEngine implements RpcEngine {
57    // LOG is NOT in hbase subpackage intentionally so that the default HBase
58    // DEBUG log level does NOT emit RPC-level logging. 
59    private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine");
60  
61    private static class Invoker implements InvocationHandler {
62      private Class<? extends VersionedProtocol> protocol;
63      private InetSocketAddress address;
64      private User ticket;
65      private HBaseClient client;
66      final private int rpcTimeout;
67  
68      public Invoker(HBaseClient client,
69                     Class<? extends VersionedProtocol> protocol,
70                     InetSocketAddress address, User ticket,
71                     Configuration conf, int rpcTimeout) {
72        this.protocol = protocol;
73        this.address = address;
74        this.ticket = ticket;
75        this.client = client;
76        this.rpcTimeout = rpcTimeout;
77      }
78  
79      public Object invoke(Object proxy, Method method, Object[] args)
80          throws Throwable {
81        final boolean logDebug = LOG.isDebugEnabled();
82        long startTime = 0;
83        if (logDebug) {
84          startTime = System.currentTimeMillis();
85        }
86  
87        HbaseObjectWritable value = (HbaseObjectWritable)
88          client.call(new Invocation(method, protocol, args), address,
89                      protocol, ticket, rpcTimeout);
90        if (logDebug) {
91          // FIGURE HOW TO TURN THIS OFF!
92          long callTime = System.currentTimeMillis() - startTime;
93          LOG.debug("Call: " + method.getName() + " " + callTime);
94        }
95        return value.get();
96      }
97    }
98  
99    private Configuration conf;
100   private HBaseClient client;
101   private UserProvider userProvider;
102 
103   @Override
104   public void setConf(Configuration config) {
105     this.conf = config;
106     // check for an already created client
107     if (this.client != null) {
108       this.client.stop();
109     }
110     this.client = new HBaseClient(HbaseObjectWritable.class, conf);
111     this.userProvider = UserProvider.instantiate(config);
112   }
113 
114   @Override
115   public Configuration getConf() {
116     return conf;
117   }
118 
119   /** Construct a client-side proxy object that implements the named protocol,
120    * talking to a server at the named address. */
121   @Override
122   public <T extends VersionedProtocol> T getProxy(
123       Class<T> protocol, long clientVersion,
124       InetSocketAddress addr, Configuration conf, int rpcTimeout)
125     throws IOException {
126     if (this.client == null) {
127       throw new IOException("Client must be initialized by calling setConf(Configuration)");
128     }
129 
130     T proxy =
131           (T) Proxy.newProxyInstance(
132               protocol.getClassLoader(), new Class[] { protocol },
133               new Invoker(client, protocol, addr, userProvider.getCurrent(), conf,
134                   HBaseRPC.getRpcTimeout(rpcTimeout)));
135 
136     /*
137      * TODO: checking protocol version only needs to be done once when we setup a new
138      * HBaseClient.Connection.  Doing it every time we retrieve a proxy instance is resulting
139      * in unnecessary RPC traffic.
140      */
141     long serverVersion = ((VersionedProtocol)proxy)
142       .getProtocolVersion(protocol.getName(), clientVersion);
143     if (serverVersion != clientVersion) {
144       throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
145                                     serverVersion);
146     }
147 
148     return proxy;
149   }
150 
151 
152 
153   /** Expert: Make multiple, parallel calls to a set of servers. */
154   @Override
155   public Object[] call(Method method, Object[][] params,
156                        InetSocketAddress[] addrs,
157                        Class<? extends VersionedProtocol> protocol,
158                        User ticket, Configuration conf)
159     throws IOException, InterruptedException {
160     if (this.client == null) {
161       throw new IOException("Client must be initialized by calling setConf(Configuration)");
162     }
163 
164     Invocation[] invocations = new Invocation[params.length];
165     for (int i = 0; i < params.length; i++) {
166       invocations[i] = new Invocation(method, protocol, params[i]);
167     }
168 
169     Writable[] wrappedValues =
170         client.call(invocations, addrs, protocol, ticket);
171 
172     if (method.getReturnType() == Void.TYPE) {
173       return null;
174     }
175 
176     Object[] values =
177         (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
178     for (int i = 0; i < values.length; i++) {
179       if (wrappedValues[i] != null) {
180         values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
181       }
182     }
183 
184     return values;
185   }
186 
187   @Override
188   public void close() {
189     if (this.client != null) {
190       this.client.stop();
191     }
192   }
193 
194   /** Construct a server for a protocol implementation instance listening on a
195    * port and address. */
196   public Server getServer(Class<? extends VersionedProtocol> protocol,
197                           Object instance,
198                           Class<?>[] ifaces,
199                           String bindAddress, int port,
200                           int numHandlers,
201                           int metaHandlerCount, boolean verbose,
202                           Configuration conf, int highPriorityLevel)
203     throws IOException {
204     return new Server(instance, ifaces, conf, bindAddress, port, numHandlers,
205         metaHandlerCount, verbose, highPriorityLevel);
206   }
207 
208   /** An RPC Server. */
209   public static class Server extends HBaseServer {
210     private Object instance;
211     private Class<?> implementation;
212     private Class<?>[] ifaces;
213     private boolean verbose;
214     private boolean authorize = false;
215 
216     // for JSON encoding
217     private static ObjectMapper mapper = new ObjectMapper();
218 
219     private static final String WARN_RESPONSE_TIME =
220       "hbase.ipc.warn.response.time";
221     private static final String WARN_RESPONSE_SIZE =
222       "hbase.ipc.warn.response.size";
223 
224     /** Default value for above params */
225     private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
226     private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
227 
228     /** Names for suffixed metrics */
229     private static final String ABOVE_ONE_SEC_METRIC = ".aboveOneSec.";
230 
231     private final int warnResponseTime;
232     private final int warnResponseSize;
233 
234     private static String classNameBase(String className) {
235       String[] names = className.split("\\.", -1);
236       if (names == null || names.length == 0) {
237         return className;
238       }
239       return names[names.length-1];
240     }
241 
242     /** Construct an RPC server.
243      * @param instance the instance whose methods will be called
244      * @param conf the configuration to use
245      * @param bindAddress the address to bind on to listen for connection
246      * @param port the port to listen for connections on
247      * @param numHandlers the number of method handler threads to run
248      * @param verbose whether each call should be logged
249      * @throws IOException e
250      */
251     public Server(Object instance, final Class<?>[] ifaces,
252                   Configuration conf, String bindAddress,  int port,
253                   int numHandlers, int metaHandlerCount, boolean verbose,
254                   int highPriorityLevel) throws IOException {
255       super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount,
256           conf, classNameBase(instance.getClass().getName()),
257           highPriorityLevel);
258       this.instance = instance;
259       this.implementation = instance.getClass();
260       this.verbose = verbose;
261 
262       this.ifaces = ifaces;
263 
264       // create metrics for the advertised interfaces this server implements.
265       String [] metricSuffixes = new String [] {ABOVE_ONE_SEC_METRIC};
266       this.rpcMetrics.createMetrics(this.ifaces, false, metricSuffixes);
267 
268       this.authorize =
269         conf.getBoolean(
270             ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false);
271 
272       this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME,
273           DEFAULT_WARN_RESPONSE_TIME);
274       this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
275           DEFAULT_WARN_RESPONSE_SIZE);
276     }
277 
278     @Override
279     public Writable call(Class<? extends VersionedProtocol> protocol,
280         Writable param, long receivedTime, MonitoredRPCHandler status)
281     throws IOException {
282       try {
283         Invocation call = (Invocation)param;
284         if(call.getMethodName() == null) {
285           throw new IOException("Could not find requested method, the usual " +
286               "cause is a version mismatch between client and server.");
287         }
288         if (verbose) log("Call: " + call);
289         status.setRPC(call.getMethodName(), call.getParameters(), receivedTime);
290         status.setRPCPacket(param);
291         status.resume("Servicing call");
292 
293         Method method =
294           protocol.getMethod(call.getMethodName(),
295                                    call.getParameterClasses());
296         method.setAccessible(true);
297 
298         //Verify protocol version.
299         //Bypass the version check for VersionedProtocol
300         if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
301           long clientVersion = call.getProtocolVersion();
302           ProtocolSignature serverInfo = ((VersionedProtocol) instance)
303               .getProtocolSignature(protocol.getCanonicalName(), call
304                   .getProtocolVersion(), call.getClientMethodsHash());
305           long serverVersion = serverInfo.getVersion();
306           if (serverVersion != clientVersion) {
307             LOG.warn("Version mismatch: client version=" + clientVersion
308                 + ", server version=" + serverVersion);
309             throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
310                 serverVersion);
311           }
312         }
313         Object impl = null;
314         if (protocol.isAssignableFrom(this.implementation)) {
315           impl = this.instance;
316         }
317         else {
318           throw new HBaseRPC.UnknownProtocolException(protocol);
319         }
320 
321         long startTime = System.currentTimeMillis();
322         Object[] params = call.getParameters();
323         Object value = method.invoke(impl, params);
324         int processingTime = (int) (System.currentTimeMillis() - startTime);
325         int qTime = (int) (startTime-receivedTime);
326         if (TRACELOG.isDebugEnabled()) {
327           TRACELOG.debug("Call #" + CurCall.get().id +
328               "; Served: " + protocol.getSimpleName()+"#"+call.getMethodName() +
329               " queueTime=" + qTime +
330               " processingTime=" + processingTime +
331               " contents=" + Objects.describeQuantity(params));
332         }
333         rpcMetrics.rpcQueueTime.inc(qTime);
334         rpcMetrics.rpcProcessingTime.inc(processingTime);
335         rpcMetrics.inc(call.getMethodName(), processingTime);
336         if (verbose) log("Return: "+value);
337 
338         HbaseObjectWritable retVal =
339           new HbaseObjectWritable(method.getReturnType(), value);
340         long responseSize = retVal.getWritableSize();
341         // log any RPC responses that are slower than the configured warn
342         // response time or larger than configured warning size
343         boolean tooSlow = (processingTime > warnResponseTime
344             && warnResponseTime > -1);
345         boolean tooLarge = (responseSize > warnResponseSize
346             && warnResponseSize > -1);
347         if (tooSlow || tooLarge) {
348           // when tagging, we let TooLarge trump TooSmall to keep output simple
349           // note that large responses will often also be slow.
350           logResponse(call, (tooLarge ? "TooLarge" : "TooSlow"),
351               status.getClient(), startTime, processingTime, qTime,
352               responseSize);
353           // provides a count of log-reported slow responses
354           if (tooSlow) {
355             rpcMetrics.rpcSlowResponseTime.inc(processingTime);
356           }
357         }
358         if (processingTime > 1000) {
359           // we use a hard-coded one second period so that we can clearly
360           // indicate the time period we're warning about in the name of the 
361           // metric itself
362           rpcMetrics.inc(call.getMethodName() + ABOVE_ONE_SEC_METRIC,
363               processingTime);
364         }
365 
366         return retVal;
367       } catch (InvocationTargetException e) {
368         Throwable target = e.getTargetException();
369         if (target instanceof IOException) {
370           throw (IOException)target;
371         }
372         IOException ioe = new IOException(target.toString());
373         ioe.setStackTrace(target.getStackTrace());
374         throw ioe;
375       } catch (Throwable e) {
376         if (!(e instanceof IOException)) {
377           LOG.error("Unexpected throwable object ", e);
378         }
379         IOException ioe = new IOException(e.toString());
380         ioe.setStackTrace(e.getStackTrace());
381         throw ioe;
382       }
383     }
384 
385     /**
386      * Logs an RPC response to the LOG file, producing valid JSON objects for
387      * client Operations.
388      * @param call The call to log.
389      * @param tag  The tag that will be used to indicate this event in the log.
390      * @param clientAddress   The address of the client who made this call.
391      * @param startTime       The time that the call was initiated, in ms.
392      * @param processingTime  The duration that the call took to run, in ms.
393      * @param qTime           The duration that the call spent on the queue 
394      *                        prior to being initiated, in ms.
395      * @param responseSize    The size in bytes of the response buffer.
396      */
397     private void logResponse(Invocation call, String tag, String clientAddress,
398         long startTime, int processingTime, int qTime, long responseSize)
399       throws IOException {
400       Object params[] = call.getParameters();
401       // for JSON encoding
402       ObjectMapper mapper = new ObjectMapper();
403       // base information that is reported regardless of type of call
404       Map<String, Object> responseInfo = new HashMap<String, Object>();
405       responseInfo.put("starttimems", startTime);
406       responseInfo.put("processingtimems", processingTime);
407       responseInfo.put("queuetimems", qTime);
408       responseInfo.put("responsesize", responseSize);
409       responseInfo.put("client", clientAddress);
410       responseInfo.put("class", instance.getClass().getSimpleName());
411       responseInfo.put("method", call.getMethodName());
412       if (params.length == 2 && instance instanceof HRegionServer &&
413           params[0] instanceof byte[] &&
414           params[1] instanceof Operation) {
415         // if the slow process is a query, we want to log its table as well 
416         // as its own fingerprint
417         byte [] tableName =
418           HRegionInfo.parseRegionName((byte[]) params[0])[0];
419         responseInfo.put("table", Bytes.toStringBinary(tableName));
420         // annotate the response map with operation details
421         responseInfo.putAll(((Operation) params[1]).toMap());
422         // report to the log file
423         LOG.warn("(operation" + tag + "): " +
424             mapper.writeValueAsString(responseInfo));
425       } else if (params.length == 1 && instance instanceof HRegionServer &&
426           params[0] instanceof Operation) {
427         // annotate the response map with operation details
428         responseInfo.putAll(((Operation) params[0]).toMap());
429         // report to the log file
430         LOG.warn("(operation" + tag + "): " +
431             mapper.writeValueAsString(responseInfo));
432       } else {
433         // can't get JSON details, so just report call.toString() along with 
434         // a more generic tag.
435         responseInfo.put("call", call.toString());
436         LOG.warn("(response" + tag + "): " +
437             mapper.writeValueAsString(responseInfo));
438       }
439     }
440   }
441 
442   protected static void log(String value) {
443     String v = value;
444     if (v != null && v.length() > 55)
445       v = v.substring(0, 55)+"...";
446     LOG.info(v);
447   }
448 }