View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.ipc;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.classification.InterfaceAudience;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.DoNotRetryIOException;
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.client.RetriesExhaustedException;
29  import org.apache.hadoop.hbase.security.User;
30  import org.apache.hadoop.net.NetUtils;
31  import org.apache.hadoop.util.ReflectionUtils;
32  
33  import javax.net.SocketFactory;
34  import java.io.IOException;
35  import java.lang.reflect.Field;
36  import java.lang.reflect.Proxy;
37  import java.net.ConnectException;
38  import java.net.InetSocketAddress;
39  import java.net.SocketTimeoutException;
40  import java.util.HashMap;
41  import java.util.Map;
42  
43  /** A simple RPC mechanism.
44   *
45   * This is a local hbase copy of the hadoop RPC so we can do things like
46   * address HADOOP-414 for hbase-only and try other hbase-specific
47   * optimizations.  Class has been renamed to avoid confusing it w/ hadoop
48   * versions.
49   * <p>
50   *
51   *
52   * A <i>protocol</i> is a Java interface.  All parameters and return types must
53   * be Protobuf objects.
54   * All methods in the protocol should throw only IOException.  No field data of
55   * the protocol instance is transmitted.
56   */
57  @InterfaceAudience.Private
58  public class HBaseRPC {
59    // Leave this out in the hadoop ipc package but keep class name.  Do this
60    // so that we dont' get the logging of this class's invocations by doing our
61    // blanket enabling DEBUG on the o.a.h.h. package.
62    protected static final Log LOG =
63      LogFactory.getLog("org.apache.hadoop.ipc.HBaseRPC");
64  
65    private HBaseRPC() {
66      super();
67    }                                  // no public ctor
68  
69    /**
70     * Configuration key for the {@link RpcEngine} implementation to load to
71     * handle connection protocols.  Handlers for individual protocols can be
72     * configured using {@code "hbase.rpc.engine." + protocol.class.name}.
73     */
74    public static final String RPC_ENGINE_PROP = "hbase.rpc.engine";
75  
76    // cache of RpcEngines by protocol
77    private static final Map<Class,RpcEngine> PROTOCOL_ENGINES
78      = new HashMap<Class,RpcEngine>();
79  
80    // track what RpcEngine is used by a proxy class, for stopProxy()
81    private static final Map<Class,RpcEngine> PROXY_ENGINES
82      = new HashMap<Class,RpcEngine>();
83  
84    // thread-specific RPC timeout, which may override that of RpcEngine
85    private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
86      @Override
87        protected Integer initialValue() {
88          return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
89        }
90      };
91  
92    static long getProtocolVersion(Class<? extends VersionedProtocol> protocol)
93        throws NoSuchFieldException, IllegalAccessException {
94      Field versionField = protocol.getField("VERSION");
95      versionField.setAccessible(true);
96      return versionField.getLong(protocol);
97    }
98  
99    // set a protocol to use a non-default RpcEngine
100   static void setProtocolEngine(Configuration conf,
101                                 Class protocol, Class engine) {
102     conf.setClass(RPC_ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class);
103   }
104 
105   // return the RpcEngine configured to handle a protocol
106   private static synchronized RpcEngine getProtocolEngine(Class protocol,
107                                                           Configuration conf) {
108     RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
109     if (engine == null) {
110       // check for a configured default engine
111       Class<?> defaultEngine =
112           conf.getClass(RPC_ENGINE_PROP, ProtobufRpcEngine.class);
113 
114       // check for a per interface override
115       Class<?> impl = conf.getClass(RPC_ENGINE_PROP+"."+protocol.getName(),
116                                     defaultEngine);
117       LOG.debug("Using "+impl.getName()+" for "+protocol.getName());
118       engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
119       if (protocol.isInterface())
120         PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(),
121                                               protocol),
122                           engine);
123       PROTOCOL_ENGINES.put(protocol, engine);
124     }
125     return engine;
126   }
127 
128   // return the RpcEngine that handles a proxy object
129   private static synchronized RpcEngine getProxyEngine(Object proxy) {
130     return PROXY_ENGINES.get(proxy.getClass());
131   }
132 
133   /**
134    * A version mismatch for the RPC protocol.
135    */
136   public static class VersionMismatch extends IOException {
137     private static final long serialVersionUID = 0;
138     private String interfaceName;
139     private long clientVersion;
140     private long serverVersion;
141 
142     /**
143      * Create a version mismatch exception
144      * @param interfaceName the name of the protocol mismatch
145      * @param clientVersion the client's version of the protocol
146      * @param serverVersion the server's version of the protocol
147      */
148     public VersionMismatch(String interfaceName, long clientVersion,
149                            long serverVersion) {
150       super("Protocol " + interfaceName + " version mismatch. (client = " +
151             clientVersion + ", server = " + serverVersion + ")");
152       this.interfaceName = interfaceName;
153       this.clientVersion = clientVersion;
154       this.serverVersion = serverVersion;
155     }
156 
157     /**
158      * Get the interface name
159      * @return the java class name
160      *          (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
161      */
162     public String getInterfaceName() {
163       return interfaceName;
164     }
165 
166     /**
167      * @return the client's preferred version
168      */
169     public long getClientVersion() {
170       return clientVersion;
171     }
172 
173     /**
174      * @return the server's agreed to version.
175      */
176     public long getServerVersion() {
177       return serverVersion;
178     }
179   }
180 
181   /**
182    * An error requesting an RPC protocol that the server is not serving.
183    */
184   public static class UnknownProtocolException extends DoNotRetryIOException {
185     private Class<?> protocol;
186 
187     public UnknownProtocolException(String mesg) {
188       // required for unwrapping from a RemoteException
189       super(mesg);
190     }
191 
192     public UnknownProtocolException(Class<?> protocol) {
193       this(protocol, "Server is not handling protocol "+protocol.getName());
194     }
195 
196     public UnknownProtocolException(Class<?> protocol, String mesg) {
197       super(mesg);
198       this.protocol = protocol;
199     }
200 
201     public Class getProtocol() {
202       return protocol;
203     }
204   }
205 
206   /**
207    * @param protocol protocol interface
208    * @param clientVersion which client version we expect
209    * @param addr address of remote service
210    * @param conf configuration
211    * @param maxAttempts max attempts
212    * @param rpcTimeout timeout for each RPC
213    * @param timeout timeout in milliseconds
214    * @return proxy
215    * @throws IOException e
216    */
217   @SuppressWarnings("unchecked")
218   public static VersionedProtocol waitForProxy(Class protocol,
219                                                long clientVersion,
220                                                InetSocketAddress addr,
221                                                Configuration conf,
222                                                int maxAttempts,
223                                                int rpcTimeout,
224                                                long timeout
225                                                ) throws IOException {
226     // HBase does limited number of reconnects which is different from hadoop.
227     long startTime = System.currentTimeMillis();
228     IOException ioe;
229     int reconnectAttempts = 0;
230     while (true) {
231       try {
232         return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
233       } catch(SocketTimeoutException te) {  // namenode is busy
234         LOG.info("Problem connecting to server: " + addr);
235         ioe = te;
236       } catch (IOException ioex) {
237         // We only handle the ConnectException.
238         ConnectException ce = null;
239         if (ioex instanceof ConnectException) {
240           ce = (ConnectException) ioex;
241           ioe = ce;
242         } else if (ioex.getCause() != null
243             && ioex.getCause() instanceof ConnectException) {
244           ce = (ConnectException) ioex.getCause();
245           ioe = ce;
246         } else if (ioex.getMessage().toLowerCase()
247             .contains("connection refused")) {
248           ce = new ConnectException(ioex.getMessage());
249           ioe = ce;
250         } else {
251           // This is the exception we can't handle.
252           ioe = ioex;
253         }
254         if (ce != null) {
255           handleConnectionException(++reconnectAttempts, maxAttempts, protocol,
256               addr, ce);
257         }
258       }
259       // check if timed out
260       if (System.currentTimeMillis() - timeout >= startTime) {
261         throw ioe;
262       }
263 
264       // wait for retry
265       try {
266         Thread.sleep(1000);
267       } catch (InterruptedException ie) {
268         // IGNORE
269       }
270     }
271   }
272 
273   /**
274    * @param retries current retried times.
275    * @param maxAttmpts max attempts
276    * @param protocol protocol interface
277    * @param addr address of remote service
278    * @param ce ConnectException
279    * @throws RetriesExhaustedException
280    */
281   private static void handleConnectionException(int retries, int maxAttmpts,
282       Class<?> protocol, InetSocketAddress addr, ConnectException ce)
283       throws RetriesExhaustedException {
284     if (maxAttmpts >= 0 && retries >= maxAttmpts) {
285       LOG.info("Server at " + addr + " could not be reached after "
286           + maxAttmpts + " tries, giving up.");
287       throw new RetriesExhaustedException("Failed setting up proxy " + protocol
288           + " to " + addr.toString() + " after attempts=" + maxAttmpts, ce);
289     }
290   }
291   
292   /**
293    * Construct a client-side proxy object that implements the named protocol,
294    * talking to a server at the named address.
295    *
296    * @param protocol interface
297    * @param clientVersion version we are expecting
298    * @param addr remote address
299    * @param conf configuration
300    * @param factory socket factory
301    * @param rpcTimeout timeout for each RPC
302    * @return proxy
303    * @throws IOException e
304    */
305   public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
306       long clientVersion, InetSocketAddress addr, Configuration conf,
307       SocketFactory factory, int rpcTimeout) throws IOException {
308     return getProxy(protocol, clientVersion, addr,
309         User.getCurrent(), conf, factory, rpcTimeout);
310   }
311 
312   /**
313    * Construct a client-side proxy object that implements the named protocol,
314    * talking to a server at the named address.
315    *
316    * @param protocol interface
317    * @param clientVersion version we are expecting
318    * @param addr remote address
319    * @param ticket ticket
320    * @param conf configuration
321    * @param factory socket factory
322    * @param rpcTimeout timeout for each RPC
323    * @return proxy
324    * @throws IOException e
325    */
326   public static VersionedProtocol getProxy(
327       Class<? extends VersionedProtocol> protocol,
328       long clientVersion, InetSocketAddress addr, User ticket,
329       Configuration conf, SocketFactory factory, int rpcTimeout)
330   throws IOException {
331     RpcEngine engine = getProtocolEngine(protocol,conf);
332     VersionedProtocol proxy = engine
333             .getProxy(protocol, clientVersion, addr, ticket, conf, factory,
334                 Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
335     return proxy;
336   }
337 
338   /**
339    * Construct a client-side proxy object with the default SocketFactory
340    *
341    * @param protocol interface
342    * @param clientVersion version we are expecting
343    * @param addr remote address
344    * @param conf configuration
345    * @param rpcTimeout timeout for each RPC
346    * @return a proxy instance
347    * @throws IOException e
348    */
349   public static VersionedProtocol getProxy(
350       Class<? extends VersionedProtocol> protocol,
351       long clientVersion, InetSocketAddress addr, Configuration conf,
352       int rpcTimeout)
353       throws IOException {
354 
355     return getProxy(protocol, clientVersion, addr, conf, NetUtils
356         .getDefaultSocketFactory(conf), rpcTimeout);
357   }
358 
359   /**
360    * Stop this proxy and release its invoker's resource
361    * @param proxy the proxy to be stopped
362    */
363   public static void stopProxy(VersionedProtocol proxy) {
364     if (proxy!=null) {
365       getProxyEngine(proxy).stopProxy(proxy);
366     }
367   }
368 
369   /**
370    * Construct a server for a protocol implementation instance listening on a
371    * port and address.
372    *
373    * @param instance instance
374    * @param bindAddress bind address
375    * @param port port to bind to
376    * @param numHandlers number of handlers to start
377    * @param verbose verbose flag
378    * @param conf configuration
379    * @return Server
380    * @throws IOException e
381    */
382   public static RpcServer getServer(final Object instance,
383                                  final Class<?>[] ifaces,
384                                  final String bindAddress, final int port,
385                                  final int numHandlers,
386                                  int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel)
387     throws IOException {
388     return getServer(instance.getClass(), instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel);
389   }
390 
391   /** Construct a server for a protocol implementation instance. */
392   public static RpcServer getServer(Class protocol,
393                                  final Object instance,
394                                  final Class<?>[] ifaces, String bindAddress,
395                                  int port,
396                                  final int numHandlers,
397                                  int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel)
398     throws IOException {
399     return getProtocolEngine(protocol, conf)
400         .getServer(protocol, instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel);
401   }
402 
403   public static void setRpcTimeout(int rpcTimeout) {
404     HBaseRPC.rpcTimeout.set(rpcTimeout);
405   }
406 
407   public static int getRpcTimeout() {
408     return HBaseRPC.rpcTimeout.get();
409   }
410 
411   public static void resetRpcTimeout() {
412     HBaseRPC.rpcTimeout.remove();
413   }
414 }