View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.protobuf.BlockingRpcChannel;
23  import com.google.protobuf.Descriptors;
24  import com.google.protobuf.Message;
25  import com.google.protobuf.RpcController;
26  import com.google.protobuf.ServiceException;
27  
28  import java.io.IOException;
29  import java.net.ConnectException;
30  import java.net.InetSocketAddress;
31  import java.net.SocketAddress;
32  import java.net.SocketTimeoutException;
33  import java.net.UnknownHostException;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.CellScanner;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.ServerName;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.hbase.client.MetricsConnection;
43  import org.apache.hadoop.hbase.codec.Codec;
44  import org.apache.hadoop.hbase.codec.KeyValueCodec;
45  import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
46  import org.apache.hadoop.hbase.security.User;
47  import org.apache.hadoop.hbase.security.UserProvider;
48  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49  import org.apache.hadoop.hbase.util.Pair;
50  import org.apache.hadoop.hbase.util.PoolMap;
51  import org.apache.hadoop.io.compress.CompressionCodec;
52  
53  /**
54   * Provides the basics for a RpcClient implementation like configuration and Logging.
55   */
56  @InterfaceAudience.Private
57  public abstract class AbstractRpcClient implements RpcClient {
58    // Log level is being changed in tests
59    public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
60  
61    protected final Configuration conf;
62    protected String clusterId;
63    protected final SocketAddress localAddr;
64    protected final MetricsConnection metrics;
65  
66    protected UserProvider userProvider;
67    protected final IPCUtil ipcUtil;
68  
69    protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
70    // time (in ms), it will be closed at any moment.
71    protected final int maxRetries; //the max. no. of retries for socket connections
72    protected final long failureSleep; // Time to sleep before retry on failure.
73    protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
74    protected final boolean tcpKeepAlive; // if T then use keepalives
75    protected final Codec codec;
76    protected final CompressionCodec compressor;
77    protected final boolean fallbackAllowed;
78  
79    protected final int connectTO;
80    protected final int readTO;
81    protected final int writeTO;
82  
83    /**
84     * Construct an IPC client for the cluster <code>clusterId</code>
85     *
86     * @param conf configuration
87     * @param clusterId the cluster id
88     * @param localAddr client socket bind address.
89     * @param metrics the connection metrics
90     */
91    public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
92        MetricsConnection metrics) {
93      this.userProvider = UserProvider.instantiate(conf);
94      this.localAddr = localAddr;
95      this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
96      this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
97      this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
98          HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
99      this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
100     this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
101     this.ipcUtil = new IPCUtil(conf);
102 
103     this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
104     this.conf = conf;
105     this.codec = getCodec();
106     this.compressor = getCompressor(conf);
107     this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
108         IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
109     this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
110     this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
111     this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
112     this.metrics = metrics;
113 
114     // login the server principal (if using secure Hadoop)
115     if (LOG.isDebugEnabled()) {
116       LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
117           ", tcpKeepAlive=" + this.tcpKeepAlive +
118           ", tcpNoDelay=" + this.tcpNoDelay +
119           ", connectTO=" + this.connectTO +
120           ", readTO=" + this.readTO +
121           ", writeTO=" + this.writeTO +
122           ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
123           ", maxRetries=" + this.maxRetries +
124           ", fallbackAllowed=" + this.fallbackAllowed +
125           ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
126     }
127   }
128 
129   @VisibleForTesting
130   public static String getDefaultCodec(final Configuration c) {
131     // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
132     // Configuration will complain -- then no default codec (and we'll pb everything).  Else
133     // default is KeyValueCodec
134     return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
135   }
136 
137   /**
138    * Encapsulate the ugly casting and RuntimeException conversion in private method.
139    * @return Codec to use on this client.
140    */
141   Codec getCodec() {
142     // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
143     // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
144     String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
145     if (className == null || className.length() == 0) return null;
146     try {
147       return (Codec)Class.forName(className).newInstance();
148     } catch (Exception e) {
149       throw new RuntimeException("Failed getting codec " + className, e);
150     }
151   }
152 
153   @Override
154   public boolean hasCellBlockSupport() {
155     return this.codec != null;
156   }
157 
158   /**
159    * Encapsulate the ugly casting and RuntimeException conversion in private method.
160    * @param conf configuration
161    * @return The compressor to use on this client.
162    */
163   private static CompressionCodec getCompressor(final Configuration conf) {
164     String className = conf.get("hbase.client.rpc.compressor", null);
165     if (className == null || className.isEmpty()) return null;
166     try {
167         return (CompressionCodec)Class.forName(className).newInstance();
168     } catch (Exception e) {
169       throw new RuntimeException("Failed getting compressor " + className, e);
170     }
171   }
172 
173   /**
174    * Return the pool type specified in the configuration, which must be set to
175    * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
176    * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal},
177    * otherwise default to the former.
178    *
179    * For applications with many user threads, use a small round-robin pool. For
180    * applications with few user threads, you may want to try using a
181    * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient}
182    * instances should not exceed the operating system's hard limit on the number of
183    * connections.
184    *
185    * @param config configuration
186    * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
187    *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
188    */
189   protected static PoolMap.PoolType getPoolType(Configuration config) {
190     return PoolMap.PoolType
191         .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
192             PoolMap.PoolType.ThreadLocal);
193   }
194 
195   /**
196    * Return the pool size specified in the configuration, which is applicable only if
197    * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
198    *
199    * @param config configuration
200    * @return the maximum pool size
201    */
202   protected static int getPoolSize(Configuration config) {
203     return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
204   }
205 
206   /**
207    * Make a blocking call. Throws exceptions if there are network problems or if the remote code
208    * threw an exception.
209    *
210    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
211    *               {@link UserProvider#getCurrent()} makes a new instance of User each time so
212    *               will be a
213    *               new Connection each time.
214    * @return A pair with the Message response and the Cell data (if any).
215    */
216   Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
217       Message param, Message returnType, final User ticket, final InetSocketAddress isa)
218       throws ServiceException {
219     if (pcrc == null) {
220       pcrc = new PayloadCarryingRpcController();
221     }
222 
223     Pair<Message, CellScanner> val;
224     try {
225       final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
226       cs.setStartTime(EnvironmentEdgeManager.currentTime());
227       val = call(pcrc, md, param, returnType, ticket, isa, cs);
228       // Shove the results into controller so can be carried across the proxy/pb service void.
229       pcrc.setCellScanner(val.getSecond());
230 
231       cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
232       if (metrics != null) {
233         metrics.updateRpc(md, param, cs);
234       }
235       if (LOG.isTraceEnabled()) {
236         LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
237       }
238       return val.getFirst();
239     } catch (Throwable e) {
240       throw new ServiceException(e);
241     }
242   }
243 
244   /**
245    * Make a call, passing <code>param</code>, to the IPC server running at
246    * <code>address</code> which is servicing the <code>protocol</code> protocol,
247    * with the <code>ticket</code> credentials, returning the value.
248    * Throws exceptions if there are network problems or if the remote code
249    * threw an exception.
250    *
251    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
252    *               {@link UserProvider#getCurrent()} makes a new instance of User each time so
253    *               will be a
254    *               new Connection each time.
255    * @return A pair with the Message response and the Cell data (if any).
256    * @throws InterruptedException
257    * @throws java.io.IOException
258    */
259   protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
260       Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
261       InetSocketAddress isa, MetricsConnection.CallStats callStats)
262       throws IOException, InterruptedException;
263 
264   @Override
265   public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
266       int defaultOperationTimeout) throws UnknownHostException {
267     return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
268   }
269 
270   /**
271    * Takes an Exception and the address we were trying to connect to and return an IOException with
272    * the input exception as the cause. The new exception provides the stack trace of the place where
273    * the exception is thrown and some extra diagnostics information. If the exception is
274    * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
275    * an IOException.
276    * @param addr target address
277    * @param exception the relevant exception
278    * @return an exception to throw
279    */
280   protected IOException wrapException(InetSocketAddress addr, Exception exception) {
281     if (exception instanceof ConnectException) {
282       // connection refused; include the host:port in the error
283       return (ConnectException) new ConnectException("Call to " + addr
284           + " failed on connection exception: " + exception).initCause(exception);
285     } else if (exception instanceof SocketTimeoutException) {
286       return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
287           + " failed because " + exception).initCause(exception);
288     } else if (exception instanceof ConnectionClosingException) {
289       return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
290           + " failed on local exception: " + exception).initCause(exception);
291     } else {
292       return (IOException) new IOException("Call to " + addr + " failed on local exception: "
293           + exception).initCause(exception);
294     }
295   }
296 
297   /**
298    * Blocking rpc channel that goes via hbase rpc.
299    */
300   @VisibleForTesting
301   public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
302     private final InetSocketAddress isa;
303     private final AbstractRpcClient rpcClient;
304     private final User ticket;
305     private final int channelOperationTimeout;
306 
307     /**
308      * @param channelOperationTimeout - the default timeout when no timeout is given
309      */
310     protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
311         final ServerName sn, final User ticket, int channelOperationTimeout)
312         throws UnknownHostException {
313       this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
314       if (this.isa.isUnresolved()) {
315         throw new UnknownHostException(sn.getHostname());
316       }
317       this.rpcClient = rpcClient;
318       this.ticket = ticket;
319       this.channelOperationTimeout = channelOperationTimeout;
320     }
321 
322     @Override
323     public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
324         Message param, Message returnType) throws ServiceException {
325       PayloadCarryingRpcController pcrc;
326       if (controller != null && controller instanceof PayloadCarryingRpcController) {
327         pcrc = (PayloadCarryingRpcController) controller;
328         if (!pcrc.hasCallTimeout()) {
329           pcrc.setCallTimeout(channelOperationTimeout);
330         }
331       } else {
332         pcrc = new PayloadCarryingRpcController();
333         pcrc.setCallTimeout(channelOperationTimeout);
334       }
335 
336       return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
337     }
338   }
339 }