View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.ipc;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.DoNotRetryIOException;
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.client.RetriesExhaustedException;
29  import org.apache.hadoop.hbase.security.User;
30  import org.apache.hadoop.io.Writable;
31  import org.apache.hadoop.net.NetUtils;
32  import org.apache.hadoop.util.ReflectionUtils;
33  import javax.net.SocketFactory;
34  import java.io.IOException;
35  import java.lang.reflect.Method;
36  import java.lang.reflect.Proxy;
37  import java.net.ConnectException;
38  import java.net.InetSocketAddress;
39  import java.net.SocketTimeoutException;
40  import java.util.HashMap;
41  import java.util.Map;
42  
43  /** A simple RPC mechanism.
44   *
45   * This is a local hbase copy of the hadoop RPC so we can do things like
46   * address HADOOP-414 for hbase-only and try other hbase-specific
47   * optimizations like using our own version of ObjectWritable.  Class has been
48   * renamed to avoid confusing it w/ hadoop versions.
49   * <p>
50   *
51   *
52   * A <i>protocol</i> is a Java interface.  All parameters and return types must
53   * be one of:
54   *
55   * <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
56   * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
57   * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
58   *
59   * <li>a {@link String}; or</li>
60   *
61   * <li>a {@link Writable}; or</li>
62   *
63   * <li>an array of the above types</li> </ul>
64   *
65   * All methods in the protocol should throw only IOException.  No field data of
66   * the protocol instance is transmitted.
67   */
68  public class HBaseRPC {
  // The logger is deliberately named under org.apache.hadoop.ipc rather than
  // this class's real package (org.apache.hadoop.hbase.ipc) while keeping the
  // class name.  Do this so that we don't get the logging of this class's
  // invocations when blanket-enabling DEBUG on the o.a.h.h. package.
  protected static final Log LOG =
    LogFactory.getLog("org.apache.hadoop.ipc.HBaseRPC");
74  
75    private HBaseRPC() {
76      super();
77    }                                  // no public ctor
78  
  /**
   * Configuration key for the {@link RpcEngine} implementation to load to
   * handle connection protocols.  Handlers for individual protocols can be
   * configured using {@code "hbase.rpc.engine." + protocol.class.name}.
   * <p>
   * NOTE(review): within this class only the plain key is read (by
   * {@link #getProtocolEngine(Configuration)}); confirm where, if anywhere,
   * the per-protocol form of the key is honoured.
   */
  public static final String RPC_ENGINE_PROP = "hbase.rpc.engine";
85  
86    // thread-specific RPC timeout, which may override that of RpcEngine
87    private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
88      @Override
89        protected Integer initialValue() {
90          return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
91        }
92      };
93  
94    /**
95     * Returns a new instance of the configured {@link RpcEngine} implementation.
96     */
97    public static synchronized RpcEngine getProtocolEngine(Configuration conf) {
98      // check for a configured default engine
99      Class<?> impl =
100         conf.getClass(RPC_ENGINE_PROP, WritableRpcEngine.class);
101 
102     LOG.debug("Using RpcEngine: "+impl.getName());
103     RpcEngine engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
104     return engine;
105   }
106 
107   /**
108    * A version mismatch for the RPC protocol.
109    */
110   @SuppressWarnings("serial")
111   public static class VersionMismatch extends IOException {
112     private static final long serialVersionUID = 0;
113     private String interfaceName;
114     private long clientVersion;
115     private long serverVersion;
116 
117     /**
118      * Create a version mismatch exception
119      * @param interfaceName the name of the protocol mismatch
120      * @param clientVersion the client's version of the protocol
121      * @param serverVersion the server's version of the protocol
122      */
123     public VersionMismatch(String interfaceName, long clientVersion,
124                            long serverVersion) {
125       super("Protocol " + interfaceName + " version mismatch. (client = " +
126             clientVersion + ", server = " + serverVersion + ")");
127       this.interfaceName = interfaceName;
128       this.clientVersion = clientVersion;
129       this.serverVersion = serverVersion;
130     }
131 
132     /**
133      * Get the interface name
134      * @return the java class name
135      *          (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
136      */
137     public String getInterfaceName() {
138       return interfaceName;
139     }
140 
141     /**
142      * @return the client's preferred version
143      */
144     public long getClientVersion() {
145       return clientVersion;
146     }
147 
148     /**
149      * @return the server's agreed to version.
150      */
151     public long getServerVersion() {
152       return serverVersion;
153     }
154   }
155 
156   /**
157    * An error requesting an RPC protocol that the server is not serving.
158    */
159   public static class UnknownProtocolException extends DoNotRetryIOException {
160     private Class<?> protocol;
161 
162     public UnknownProtocolException(String mesg) {
163       // required for unwrapping from a RemoteException
164       super(mesg);
165     }
166 
167     public UnknownProtocolException(Class<?> protocol) {
168       this(protocol, "Server is not handling protocol "+protocol.getName());
169     }
170 
171     public UnknownProtocolException(Class<?> protocol, String mesg) {
172       super(mesg);
173       this.protocol = protocol;
174     }
175 
176     public Class getProtocol() {
177       return protocol;
178     }
179   }
180 
181   /**
182    * @param protocol protocol interface
183    * @param clientVersion which client version we expect
184    * @param addr address of remote service
185    * @param conf configuration
186    * @param maxAttempts max attempts
187    * @param rpcTimeout timeout for each RPC
188    * @param timeout timeout in milliseconds
189    * @return proxy
190    * @throws IOException e
191    */
192   @SuppressWarnings("unchecked")
193   public static <T extends VersionedProtocol> T waitForProxy(RpcEngine rpcClient,
194                                                Class<T> protocol,
195                                                long clientVersion,
196                                                InetSocketAddress addr,
197                                                Configuration conf,
198                                                int maxAttempts,
199                                                int rpcTimeout,
200                                                long timeout
201   ) throws IOException {
202     // HBase does limited number of reconnects which is different from hadoop.
203     long startTime = System.currentTimeMillis();
204     IOException ioe;
205     int reconnectAttempts = 0;
206     while (true) {
207       try {
208         return rpcClient.getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
209       } catch(SocketTimeoutException te) {  // namenode is busy
210         LOG.info("Problem connecting to server: " + addr);
211         ioe = te;
212       } catch (IOException ioex) {
213         // We only handle the ConnectException.
214         ConnectException ce = null;
215         if (ioex instanceof ConnectException) {
216           ce = (ConnectException) ioex;
217           ioe = ce;
218         } else if (ioex.getCause() != null
219             && ioex.getCause() instanceof ConnectException) {
220           ce = (ConnectException) ioex.getCause();
221           ioe = ce;
222         } else if (ioex.getMessage().toLowerCase()
223             .contains("connection refused")) {
224           ce = new ConnectException(ioex.getMessage());
225           ioe = ce;
226         } else {
227           // This is the exception we can't handle.
228           ioe = ioex;
229         }
230         if (ce != null) {
231           handleConnectionException(++reconnectAttempts, maxAttempts, protocol,
232               addr, ce);
233         }
234       }
235       // check if timed out
236       if (System.currentTimeMillis() - timeout >= startTime) {
237         throw ioe;
238       }
239 
240       // wait for retry
241       try {
242         Thread.sleep(1000);
243       } catch (InterruptedException ie) {
244         // IGNORE
245       }
246     }
247   }
248 
249   /**
250    * @param retries current retried times.
251    * @param maxAttmpts max attempts
252    * @param protocol protocol interface
253    * @param addr address of remote service
254    * @param ce ConnectException
255    * @throws RetriesExhaustedException
256    */
257   private static void handleConnectionException(int retries, int maxAttmpts,
258       Class<?> protocol, InetSocketAddress addr, ConnectException ce)
259       throws RetriesExhaustedException {
260     if (maxAttmpts >= 0 && retries >= maxAttmpts) {
261       LOG.info("Server at " + addr + " could not be reached after "
262           + maxAttmpts + " tries, giving up.");
263       throw new RetriesExhaustedException("Failed setting up proxy " + protocol
264           + " to " + addr.toString() + " after attempts=" + maxAttmpts, ce);
265     }
266   }
267   
268   /**
269    * Expert: Make multiple, parallel calls to a set of servers.
270    *
271    * @param method method to invoke
272    * @param params array of parameters
273    * @param addrs array of addresses
274    * @param conf configuration
275    * @return values
276    * @throws IOException e
277    * @deprecated Instead of calling statically, use
278    *     {@link HBaseRPC#getProtocolEngine(org.apache.hadoop.conf.Configuration)}
279    *     to obtain an {@link RpcEngine} instance and then use
280    *     {@link RpcEngine#call(java.lang.reflect.Method, Object[][], java.net.InetSocketAddress[], Class, org.apache.hadoop.hbase.security.User, org.apache.hadoop.conf.Configuration)}
281    */
282   @Deprecated
283   public static Object[] call(Method method, Object[][] params,
284       InetSocketAddress[] addrs,
285       Class<? extends VersionedProtocol> protocol,
286       User ticket,
287       Configuration conf)
288     throws IOException, InterruptedException {
289     Object[] result = null;
290     RpcEngine engine = null;
291     try {
292       engine = getProtocolEngine(conf);
293       result = engine.call(method, params, addrs, protocol, ticket, conf);
294     } finally {
295       engine.close();
296     }
297     return result;
298   }
299 
300   /**
301    * Construct a server for a protocol implementation instance listening on a
302    * port and address.
303    *
304    * @param instance instance
305    * @param bindAddress bind address
306    * @param port port to bind to
307    * @param numHandlers number of handlers to start
308    * @param verbose verbose flag
309    * @param conf configuration
310    * @return Server
311    * @throws IOException e
312    */
313   public static RpcServer getServer(final Object instance,
314                                  final Class<?>[] ifaces,
315                                  final String bindAddress, final int port,
316                                  final int numHandlers,
317                                  int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel)
318     throws IOException {
319     return getServer(instance.getClass(), instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel);
320   }
321 
322   /** Construct a server for a protocol implementation instance. */
323   public static RpcServer getServer(Class protocol,
324                                  final Object instance,
325                                  final Class<?>[] ifaces, String bindAddress,
326                                  int port,
327                                  final int numHandlers,
328                                  int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel)
329     throws IOException {
330     return getProtocolEngine(conf)
331         .getServer(protocol, instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel);
332   }
333 
334   public static void setRpcTimeout(int rpcTimeout) {
335     HBaseRPC.rpcTimeout.set(rpcTimeout);
336   }
337 
338   public static int getRpcTimeout() {
339     return HBaseRPC.rpcTimeout.get();
340   }
341 
342   public static void resetRpcTimeout() {
343     HBaseRPC.rpcTimeout.remove();
344   }
345 
346   /**
347    * Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given
348    * default timeout.
349    */
350   public static int getRpcTimeout(int defaultTimeout) {
351     return Math.min(defaultTimeout, HBaseRPC.rpcTimeout.get());
352   }
353 }