/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.ipc;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.io.compress.CompressionCodec;

/**
 * Provides the basics for an RpcClient implementation, such as configuration and logging.
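 *
 * <p>A minimal configuration sketch; the keys and defaults are the ones read in the
 * constructor below, and the values shown are illustrative, not recommendations:
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * conf.setBoolean("hbase.ipc.client.tcpnodelay", true);    // disable Nagle's algorithm
 * conf.setBoolean("hbase.ipc.client.tcpkeepalive", true);  // use TCP keepalives
 * conf.setInt("hbase.ipc.client.connect.max.retries", 0);  // socket connect retries
 * </pre>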
 */
@InterfaceAudience.Private
public abstract class AbstractRpcClient implements RpcClient {
  public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);

  protected final Configuration conf;
  protected String clusterId;
  protected final SocketAddress localAddr;

  protected UserProvider userProvider;
  protected final IPCUtil ipcUtil;

  // If the connection is idle for more than this time (in ms), it may be closed at any moment.
  protected final int minIdleTimeBeforeClose;
  protected final int maxRetries; // the maximum number of retries for socket connections
  protected final long failureSleep; // time to sleep before retrying on failure
  protected final boolean tcpNoDelay; // if true, disable Nagle's algorithm
  protected final boolean tcpKeepAlive; // if true, use TCP keepalives
  protected final Codec codec;
  protected final CompressionCodec compressor;
  protected final boolean fallbackAllowed;

  protected final int connectTO;
  protected final int readTO;
  protected final int writeTO;

  /**
   * Construct an IPC client for the cluster <code>clusterId</code>.
   *
   * @param conf configuration
   * @param clusterId the cluster id
   * @param localAddr client socket bind address
   */
  public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
    this.userProvider = UserProvider.instantiate(conf);
    this.localAddr = localAddr;
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
    this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
    this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
    this.ipcUtil = new IPCUtil(conf);

    this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
    this.conf = conf;
    this.codec = getCodec();
    this.compressor = getCompressor(conf);
    this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
    this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
    this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
    this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);

    // Log the effective client configuration.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
          ", tcpKeepAlive=" + this.tcpKeepAlive +
          ", tcpNoDelay=" + this.tcpNoDelay +
          ", connectTO=" + this.connectTO +
          ", readTO=" + this.readTO +
          ", writeTO=" + this.writeTO +
          ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
          ", maxRetries=" + this.maxRetries +
          ", fallbackAllowed=" + this.fallbackAllowed +
          ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
    }
  }

  @VisibleForTesting
  public static String getDefaultCodec(final Configuration c) {
    // If "hbase.client.default.rpc.codec" is the empty string -- you can't set it to null
    // because Configuration will complain -- then there is no default codec (and we
    // protobuf-encode everything). Otherwise the default is KeyValueCodec.
    return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
  }

  /**
   * Encapsulates the ugly casting and RuntimeException conversion in a single method.
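   *
   * <p>A hedged sketch of disabling cell-block encoding entirely; both keys must be set to
   * the empty string, or the default codec below kicks in:
   * <pre>
   * conf.set(HConstants.RPC_CODEC_CONF_KEY, "");  // "hbase.client.rpc.codec"
   * conf.set(DEFAULT_CODEC_CLASS, "");            // "hbase.client.default.rpc.codec"
   * </pre>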
   * @return Codec to use on this client.
   */
  Codec getCodec() {
    // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
    // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
    String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
    if (className == null || className.length() == 0) return null;
    try {
      return (Codec) Class.forName(className).newInstance();
    } catch (Exception e) {
      throw new RuntimeException("Failed getting codec " + className, e);
    }
  }

  @Override
  public boolean hasCellBlockSupport() {
    return this.codec != null;
  }

  /**
   * Encapsulates the ugly casting and RuntimeException conversion in a single method.
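   *
   * <p>A hedged example using Hadoop's Gzip codec; any
   * {@link org.apache.hadoop.io.compress.CompressionCodec} class name works the same way:
   * <pre>
   * conf.set("hbase.client.rpc.compressor",
   *     org.apache.hadoop.io.compress.GzipCodec.class.getCanonicalName());
   * </pre>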
   * @param conf configuration
   * @return The compressor to use on this client.
   */
  private static CompressionCodec getCompressor(final Configuration conf) {
    String className = conf.get("hbase.client.rpc.compressor", null);
    if (className == null || className.isEmpty()) return null;
    try {
      return (CompressionCodec) Class.forName(className).newInstance();
    } catch (Exception e) {
      throw new RuntimeException("Failed getting compressor " + className, e);
    }
  }

  /**
   * Return the pool type specified in the configuration, which must be set to
   * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal},
   * otherwise default to the former.
   *
   * For applications with many user threads, use a small round-robin pool. For
   * applications with few user threads, you may want to try using a
   * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient}
   * instances should not exceed the operating system's hard limit on the number of
   * connections.
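   *
   * <p>A hedged configuration sketch; the value must name one of the two pool types above,
   * and the size applies only to the round-robin pool:
   * <pre>
   * conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "RoundRobin");
   * conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 5);
   * </pre>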
   *
   * @param config configuration
   * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
   *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
   */
  protected static PoolMap.PoolType getPoolType(Configuration config) {
    return PoolMap.PoolType
        .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
            PoolMap.PoolType.ThreadLocal);
  }

  /**
   * Return the pool size specified in the configuration, which is applicable only if
   * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
   *
   * @param config configuration
   * @return the maximum pool size
   */
  protected static int getPoolSize(Configuration config) {
    return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
  }

  /**
   * Make a blocking call. Throws exceptions if there are network problems or if the remote code
   * threw an exception.
   *
   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
   *               {@link UserProvider#getCurrent()} makes a new instance of User each time, so
   *               there will be a new Connection each time.
   * @return The Message response; any cell data is set on the passed controller.
   */
  Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
      Message param, Message returnType, final User ticket, final InetSocketAddress isa)
      throws ServiceException {
    if (pcrc == null) {
      pcrc = new PayloadCarryingRpcController();
    }

    long startTime = 0;
    if (LOG.isTraceEnabled()) {
      startTime = EnvironmentEdgeManager.currentTime();
    }
    Pair<Message, CellScanner> val;
    try {
      val = call(pcrc, md, param, returnType, ticket, isa);
      // Shove the results into the controller so they can be carried across the proxy/pb
      // service void.
      pcrc.setCellScanner(val.getSecond());

      if (LOG.isTraceEnabled()) {
        long callTime = EnvironmentEdgeManager.currentTime() - startTime;
        LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
      }
      return val.getFirst();
    } catch (Throwable e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Make a call, passing <code>param</code>, to the IPC server running at
   * <code>isa</code>, with the <code>ticket</code> credentials, returning the value.
   * Throws exceptions if there are network problems or if the remote code
   * threw an exception.
   *
   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
   *               {@link UserProvider#getCurrent()} makes a new instance of User each time, so
   *               there will be a new Connection each time.
   * @return A pair with the Message response and the Cell data (if any).
   * @throws InterruptedException if the call is interrupted while waiting for a response
   * @throws java.io.IOException if there is a network problem or the remote call fails
   */
  protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
      Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
      InetSocketAddress isa) throws IOException, InterruptedException;

  @Override
  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
      int defaultOperationTimeout) throws UnknownHostException {
    return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
  }

  /**
   * Takes an Exception and the address we were trying to connect to and returns an IOException
   * with the input exception as the cause. The new exception provides the stack trace of the
   * place where the exception is thrown and some extra diagnostic information. If the
   * exception is a ConnectException, SocketTimeoutException or ConnectionClosingException, a
   * new exception of the same type is returned; otherwise an IOException is returned.
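   *
   * <p>For example, a refused connection surfaces roughly as follows; the message shape comes
   * from the code below, while the host and port are illustrative:
   * <pre>
   * java.net.ConnectException: Call to host.example.com/10.0.0.1:16020 failed on
   *     connection exception: java.net.ConnectException: Connection refused
   * </pre>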
   * @param addr target address
   * @param exception the relevant exception
   * @return an exception to throw
   */
  protected IOException wrapException(InetSocketAddress addr, Exception exception) {
    if (exception instanceof ConnectException) {
      // connection refused; include the host:port in the error
      return (ConnectException) new ConnectException("Call to " + addr
          + " failed on connection exception: " + exception).initCause(exception);
    } else if (exception instanceof SocketTimeoutException) {
      return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
          + " failed because " + exception).initCause(exception);
    } else if (exception instanceof ConnectionClosingException) {
      return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
          + " failed on local exception: " + exception).initCause(exception);
    } else {
      return (IOException) new IOException("Call to " + addr + " failed on local exception: "
          + exception).initCause(exception);
    }
  }

  /**
   * Blocking RPC channel that goes via HBase RPC.
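   *
   * <p>A hedged usage sketch; the {@code ClientService} stub is illustrative, and any
   * protobuf-generated {@code BlockingInterface} can be bound to the channel the same way:
   * <pre>
   * BlockingRpcChannel channel =
   *     rpcClient.createBlockingRpcChannel(serverName, user, operationTimeout);
   * ClientProtos.ClientService.BlockingInterface stub =
   *     ClientProtos.ClientService.newBlockingStub(channel);
   * GetResponse response = stub.get(null, getRequest); // null controller gets a default timeout
   * </pre>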
   */
  @VisibleForTesting
  public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
    private final InetSocketAddress isa;
    private final AbstractRpcClient rpcClient;
    private final User ticket;
    private final int channelOperationTimeout;

    /**
     * @param channelOperationTimeout the default timeout when no timeout is given
     */
    protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
        final ServerName sn, final User ticket, int channelOperationTimeout)
        throws UnknownHostException {
      this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
      if (this.isa.isUnresolved()) {
        throw new UnknownHostException(sn.getHostname());
      }
      this.rpcClient = rpcClient;
      this.ticket = ticket;
      this.channelOperationTimeout = channelOperationTimeout;
    }

    @Override
    public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
        Message param, Message returnType) throws ServiceException {
      PayloadCarryingRpcController pcrc;
      // instanceof is false for null, so no explicit null check is needed
      if (controller instanceof PayloadCarryingRpcController) {
        pcrc = (PayloadCarryingRpcController) controller;
        if (!pcrc.hasCallTimeout()) {
          pcrc.setCallTimeout(channelOperationTimeout);
        }
      } else {
        pcrc = new PayloadCarryingRpcController();
        pcrc.setCallTimeout(channelOperationTimeout);
      }

      return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
    }
  }
}