View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.ipc;
21  
22  import java.io.BufferedInputStream;
23  import java.io.BufferedOutputStream;
24  import java.io.Closeable;
25  import java.io.DataInputStream;
26  import java.io.DataOutputStream;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.io.InterruptedIOException;
30  import java.io.OutputStream;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.SocketTimeoutException;
35  import java.net.UnknownHostException;
36  import java.nio.ByteBuffer;
37  import java.security.PrivilegedExceptionAction;
38  import java.util.HashMap;
39  import java.util.HashSet;
40  import java.util.Iterator;
41  import java.util.Map;
42  import java.util.Map.Entry;
43  import java.util.Random;
44  import java.util.Set;
45  import java.util.concurrent.ArrayBlockingQueue;
46  import java.util.concurrent.BlockingQueue;
47  import java.util.concurrent.ConcurrentSkipListMap;
48  import java.util.concurrent.atomic.AtomicBoolean;
49  import java.util.concurrent.atomic.AtomicInteger;
50  
51  import javax.net.SocketFactory;
52  import javax.security.sasl.SaslException;
53  
54  import org.apache.hadoop.conf.Configuration;
55  import org.apache.hadoop.hbase.CellScanner;
56  import org.apache.hadoop.hbase.DoNotRetryIOException;
57  import org.apache.hadoop.hbase.HConstants;
58  import org.apache.hadoop.hbase.ServerName;
59  import org.apache.hadoop.hbase.classification.InterfaceAudience;
60  import org.apache.hadoop.hbase.codec.Codec;
61  import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
62  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
63  import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
64  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
65  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
66  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
67  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
68  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
69  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
70  import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo;
71  import org.apache.hadoop.hbase.security.AuthMethod;
72  import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
73  import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
74  import org.apache.hadoop.hbase.security.SecurityInfo;
75  import org.apache.hadoop.hbase.security.User;
76  import org.apache.hadoop.hbase.security.UserProvider;
77  import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
78  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
79  import org.apache.hadoop.hbase.util.ExceptionUtil;
80  import org.apache.hadoop.hbase.util.Pair;
81  import org.apache.hadoop.hbase.util.PoolMap;
82  import org.apache.hadoop.io.IOUtils;
83  import org.apache.hadoop.io.Text;
84  import org.apache.hadoop.io.compress.CompressionCodec;
85  import org.apache.hadoop.ipc.RemoteException;
86  import org.apache.hadoop.net.NetUtils;
87  import org.apache.hadoop.security.SecurityUtil;
88  import org.apache.hadoop.security.UserGroupInformation;
89  import org.apache.hadoop.security.token.Token;
90  import org.apache.hadoop.security.token.TokenIdentifier;
91  import org.apache.hadoop.security.token.TokenSelector;
92  import org.apache.htrace.Span;
93  import org.apache.htrace.Trace;
94  import org.apache.htrace.TraceScope;
95  
96  import com.google.protobuf.Descriptors.MethodDescriptor;
97  import com.google.protobuf.Message;
98  import com.google.protobuf.Message.Builder;
99  import com.google.protobuf.RpcCallback;
100 
101 /**
102  * Does RPC against a cluster.  Manages connections per regionserver in the cluster.
103  * <p>See HBaseServer
104  */
105 @InterfaceAudience.Private
106 public class RpcClientImpl extends AbstractRpcClient {
protected final AtomicInteger callIdCnt = new AtomicInteger();

protected final PoolMap<ConnectionId, Connection> connections;

// True while the client runs.
protected final AtomicBoolean running = new AtomicBoolean(true);

protected final FailedServers failedServers;

// Factory used to create the sockets.
protected final SocketFactory socketFactory;

// Token selectors, keyed by the kind of token they are registered for.
protected static final Map<AuthenticationProtos.TokenIdentifier.Kind,
    TokenSelector<? extends TokenIdentifier>> tokenHandlers =
    new HashMap<AuthenticationProtos.TokenIdentifier.Kind,
      TokenSelector<? extends TokenIdentifier>>();
static {
  // Only the HBase authentication token kind is registered here.
  tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
      new AuthenticationTokenSelector());
}
125 
126   /**
127    * Creates a connection. Can be overridden by a subclass for testing.
128    * @param remoteId - the ConnectionId to use for the connection creation.
129    */
130   protected Connection createConnection(ConnectionId remoteId, final Codec codec,
131       final CompressionCodec compressor)
132   throws IOException {
133     return new Connection(remoteId, codec, compressor);
134   }
135 
136   /**
137    * see {@link RpcClientImpl.Connection.CallSender}
138    */
139   private static class CallFuture {
140     final Call call;
141     final int priority;
142     final Span span;
143 
144     // We will use this to stop the writer
145     final static CallFuture DEATH_PILL = new CallFuture(null, -1, null);
146 
147     CallFuture(Call call, int priority, Span span) {
148       this.call = call;
149       this.priority = priority;
150       this.span = span;
151     }
152   }
153 
154   /** Thread that reads responses and notifies callers.  Each connection owns a
155    * socket connected to a remote address.  Calls are multiplexed through this
156    * socket: responses may be delivered out of order. */
157   protected class Connection extends Thread {
158     private ConnectionHeader header;              // connection header
159     protected ConnectionId remoteId;
160     protected Socket socket = null;                 // connected socket
161     protected DataInputStream in;
162     protected DataOutputStream out;
163     private Object outLock = new Object();
164     private InetSocketAddress server;             // server ip:port
165     private String serverPrincipal;  // server's krb5 principal name
166     private AuthMethod authMethod; // authentication method
167     private boolean useSasl;
168     private Token<? extends TokenIdentifier> token;
169     private HBaseSaslRpcClient saslRpcClient;
170     private int reloginMaxBackoff; // max pause before relogin on sasl failure
171     private final Codec codec;
172     private final CompressionCodec compressor;
173 
174     // currently active calls
175     protected final ConcurrentSkipListMap<Integer, Call> calls =
176       new ConcurrentSkipListMap<Integer, Call>();
177 
178     protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
179     protected final CallSender callSender;
180 
181 
182     /**
183      * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt),
184      *  it gets into a java issue: an interruption during a write closes the socket/channel.
185      * A way to avoid this is to use a different thread for writing. This way, on interruptions,
186      *  we either cancel the writes or ignore the answer if the write is already done, but we
187      *  don't stop the write in the middle.
188      * This adds a thread per region server in the client, so it's kept as an option.
189      * <p>
190      * The implementation is simple: the client threads adds their call to the queue, and then
191      *  wait for an answer. The CallSender blocks on the queue, and writes the calls one
192      *  after the other. On interruption, the client cancels its call. The CallSender checks that
193      *  the call has not been canceled before writing it.
194      * </p>
195      * When the connection closes, all the calls not yet sent are dismissed. The client thread
196      *  is notified with an appropriate exception, as if the call was already sent but the answer
197      *  not yet received.
198      * </p>
199      */
200     private class CallSender extends Thread implements Closeable {
201       protected final BlockingQueue<CallFuture> callsToWrite;
202 
203 
204       public CallFuture sendCall(Call call, int priority, Span span)
205           throws InterruptedException, IOException {
206         CallFuture cts = new CallFuture(call, priority, span);
207         if (!callsToWrite.offer(cts)) {
208           throw new IOException("Can't add the call " + call.id +
209               " to the write queue. callsToWrite.size()=" + callsToWrite.size());
210         }
211         checkIsOpen(); // We check after the put, to be sure that the call we added won't stay
212                        //  in the list while the cleanup was already done.
213         return cts;
214       }
215 
216       @Override
217       public void close(){
218         assert shouldCloseConnection.get();
219         callsToWrite.offer(CallFuture.DEATH_PILL);
220         // We don't care if we can't add the death pill to the queue: the writer
221         //  won't be blocked in the 'take', as its queue is full.
222       }
223 
224       CallSender(String name, Configuration conf) {
225         int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
226         callsToWrite = new ArrayBlockingQueue<CallFuture>(queueSize);
227         setDaemon(true);
228         setName(name + " - writer");
229       }
230 
231       public void remove(CallFuture cts){
232         callsToWrite.remove(cts);
233 
234         // By removing the call from the expected call list, we make the list smaller, but
235         //  it means as well that we don't know how many calls we cancelled.
236         calls.remove(cts.call.id);
237         cts.call.callComplete();
238       }
239 
240       /**
241        * Reads the call from the queue, write them on the socket.
242        */
243       @Override
244       public void run() {
245         while (!shouldCloseConnection.get()) {
246           CallFuture cts = null;
247           try {
248             cts = callsToWrite.take();
249           } catch (InterruptedException e) {
250             markClosed(new InterruptedIOException());
251           }
252 
253           if (cts == null || cts == CallFuture.DEATH_PILL) {
254             assert shouldCloseConnection.get();
255             break;
256           }
257 
258           if (cts.call.done) {
259             continue;
260           }
261 
262           if (cts.call.checkAndSetTimeout()) {
263             continue;
264           }
265 
266           try {
267             Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
268           } catch (IOException e) {
269             if (LOG.isDebugEnabled()) {
270               LOG.debug("call write error for call #" + cts.call.id
271                 + ", message =" + e.getMessage());
272             }
273             cts.call.setException(e);
274             markClosed(e);
275           }
276         }
277 
278         cleanup();
279       }
280 
281       /**
282        * Cleans the call not yet sent when we finish.
283        */
284       private void cleanup() {
285         assert shouldCloseConnection.get();
286 
287         IOException ie = new ConnectionClosingException("Connection to " + server + " is closing.");
288         while (true) {
289           CallFuture cts = callsToWrite.poll();
290           if (cts == null) {
291             break;
292           }
293           if (cts.call != null && !cts.call.done) {
294             cts.call.setException(ie);
295           }
296         }
297       }
298     }
299 
300     Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
301     throws IOException {
302       if (remoteId.getAddress().isUnresolved()) {
303         throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
304       }
305       this.server = remoteId.getAddress();
306       this.codec = codec;
307       this.compressor = compressor;
308 
309       UserGroupInformation ticket = remoteId.getTicket().getUGI();
310       SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
311       this.useSasl = userProvider.isHBaseSecurityEnabled();
312       if (useSasl && securityInfo != null) {
313         AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
314         if (tokenKind != null) {
315           TokenSelector<? extends TokenIdentifier> tokenSelector =
316               tokenHandlers.get(tokenKind);
317           if (tokenSelector != null) {
318             token = tokenSelector.selectToken(new Text(clusterId),
319                 ticket.getTokens());
320           } else if (LOG.isDebugEnabled()) {
321             LOG.debug("No token selector found for type "+tokenKind);
322           }
323         }
324         String serverKey = securityInfo.getServerPrincipal();
325         if (serverKey == null) {
326           throw new IOException(
327               "Can't obtain server Kerberos config key from SecurityInfo");
328         }
329         serverPrincipal = SecurityUtil.getServerPrincipal(
330             conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
331         if (LOG.isDebugEnabled()) {
332           LOG.debug("RPC Server Kerberos principal name for service="
333               + remoteId.getServiceName() + " is " + serverPrincipal);
334         }
335       }
336 
337       if (!useSasl) {
338         authMethod = AuthMethod.SIMPLE;
339       } else if (token != null) {
340         authMethod = AuthMethod.DIGEST;
341       } else {
342         authMethod = AuthMethod.KERBEROS;
343       }
344 
345       if (LOG.isDebugEnabled()) {
346         LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName +
347           ", sasl=" + useSasl);
348       }
349       reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
350       this.remoteId = remoteId;
351 
352       ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
353       builder.setServiceName(remoteId.getServiceName());
354       UserInformation userInfoPB = getUserInfo(ticket);
355       if (userInfoPB != null) {
356         builder.setUserInfo(userInfoPB);
357       }
358       if (this.codec != null) {
359         builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
360       }
361       if (this.compressor != null) {
362         builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
363       }
364       builder.setVersionInfo(ProtobufUtil.getVersionInfo());
365       this.header = builder.build();
366 
367       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
368         remoteId.getAddress().toString() +
369         ((ticket==null)?" from an unknown user": (" from "
370         + ticket.getUserName())));
371       this.setDaemon(true);
372 
373       if (conf.getBoolean(SPECIFIC_WRITE_THREAD, false)) {
374         callSender = new CallSender(getName(), conf);
375         callSender.start();
376       } else {
377         callSender = null;
378       }
379     }
380 
381     private UserInformation getUserInfo(UserGroupInformation ugi) {
382       if (ugi == null || authMethod == AuthMethod.DIGEST) {
383         // Don't send user for token auth
384         return null;
385       }
386       UserInformation.Builder userInfoPB = UserInformation.newBuilder();
387       if (authMethod == AuthMethod.KERBEROS) {
388         // Send effective user for Kerberos auth
389         userInfoPB.setEffectiveUser(ugi.getUserName());
390       } else if (authMethod == AuthMethod.SIMPLE) {
391         //Send both effective user and real user for simple auth
392         userInfoPB.setEffectiveUser(ugi.getUserName());
393         if (ugi.getRealUser() != null) {
394           userInfoPB.setRealUser(ugi.getRealUser().getUserName());
395         }
396       }
397       return userInfoPB.build();
398     }
399 
400     protected synchronized void setupConnection() throws IOException {
401       short ioFailures = 0;
402       short timeoutFailures = 0;
403       while (true) {
404         try {
405           this.socket = socketFactory.createSocket();
406           this.socket.setTcpNoDelay(tcpNoDelay);
407           this.socket.setKeepAlive(tcpKeepAlive);
408           if (localAddr != null) {
409             this.socket.bind(localAddr);
410           }
411           NetUtils.connect(this.socket, remoteId.getAddress(), connectTO);
412           this.socket.setSoTimeout(readTO);
413           return;
414         } catch (SocketTimeoutException toe) {
415           /* The max number of retries is 45,
416            * which amounts to 20s*45 = 15 minutes retries.
417            */
418           handleConnectionFailure(timeoutFailures++, maxRetries, toe);
419         } catch (IOException ie) {
420           handleConnectionFailure(ioFailures++, maxRetries, ie);
421         }
422       }
423     }
424 
425     protected synchronized void closeConnection() {
426       if (socket == null) {
427         return;
428       }
429 
430       // close the current connection
431       try {
432         if (socket.getOutputStream() != null) {
433           socket.getOutputStream().close();
434         }
435       } catch (IOException ignored) {  // Can happen if the socket is already closed
436         if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
437       }
438       try {
439         if (socket.getInputStream() != null) {
440           socket.getInputStream().close();
441         }
442       } catch (IOException ignored) {  // Can happen if the socket is already closed
443         if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
444       }
445       try {
446         if (socket.getChannel() != null) {
447           socket.getChannel().close();
448         }
449       } catch (IOException ignored) {  // Can happen if the socket is already closed
450         if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
451       }
452       try {
453         socket.close();
454       } catch (IOException e) {
455         LOG.warn("Not able to close a socket", e);
456       }
457 
458       // set socket to null so that the next call to setupIOstreams
459       // can start the process of connect all over again.
460       socket = null;
461     }
462 
463     /**
464      *  Handle connection failures
465      *
466      * If the current number of retries is equal to the max number of retries,
467      * stop retrying and throw the exception; Otherwise backoff N seconds and
468      * try connecting again.
469      *
470      * This Method is only called from inside setupIOstreams(), which is
471      * synchronized. Hence the sleep is synchronized; the locks will be retained.
472      *
473      * @param curRetries current number of retries
474      * @param maxRetries max number of retries allowed
475      * @param ioe failure reason
476      * @throws IOException if max number of retries is reached
477      */
478     private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
479     throws IOException {
480       closeConnection();
481 
482       // throw the exception if the maximum number of retries is reached
483       if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
484         throw ioe;
485       }
486 
487       // otherwise back off and retry
488       try {
489         Thread.sleep(failureSleep);
490       } catch (InterruptedException ie) {
491         ExceptionUtil.rethrowIfInterrupt(ie);
492       }
493 
494       LOG.info("Retrying connect to server: " + remoteId.getAddress() +
495         " after sleeping " + failureSleep + "ms. Already tried " + curRetries +
496         " time(s).");
497     }
498 
499     /**
500      * @throws IOException if the connection is not open.
501      */
502     private void checkIsOpen() throws IOException {
503       if (shouldCloseConnection.get()) {
504         throw new ConnectionClosingException(getName() + " is closing");
505       }
506     }
507 
508     /* wait till someone signals us to start reading RPC response or
509      * it is idle too long, it is marked as to be closed,
510      * or the client is marked as not running.
511      *
512      * @return true if it is time to read a response; false otherwise.
513      */
514     protected synchronized boolean waitForWork() throws InterruptedException {
515       // beware of the concurrent access to the calls list: we can add calls, but as well
516       //  remove them.
517       long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
518 
519       while (true) {
520         if (shouldCloseConnection.get()) {
521           return false;
522         }
523 
524         if (!running.get()) {
525           markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
526           return false;
527         }
528 
529         if (!calls.isEmpty()) {
530           // shouldCloseConnection can be set to true by a parallel thread here. The caller
531           //  will need to check anyway.
532           return true;
533         }
534 
535         if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
536           // Connection is idle.
537           // We expect the number of calls to be zero here, but actually someone can
538           //  adds a call at the any moment, as there is no synchronization between this task
539           //  and adding new calls. It's not a big issue, but it will get an exception.
540           markClosed(new IOException(
541               "idle connection closed with " + calls.size() + " pending request(s)"));
542           return false;
543         }
544 
545         wait(Math.min(minIdleTimeBeforeClose, 1000));
546       }
547     }
548 
549     public InetSocketAddress getRemoteAddress() {
550       return remoteId.getAddress();
551     }
552 
553     @Override
554     public void run() {
555       if (LOG.isTraceEnabled()) {
556         LOG.trace(getName() + ": starting, connections " + connections.size());
557       }
558 
559       try {
560         while (waitForWork()) { // Wait here for work - read or close connection
561           readResponse();
562         }
563       } catch (InterruptedException t) {
564         if (LOG.isTraceEnabled()) {
565           LOG.trace(getName() + ": interrupted while waiting for call responses");
566         }
567         markClosed(ExceptionUtil.asInterrupt(t));
568       } catch (Throwable t) {
569         if (LOG.isDebugEnabled()) {
570           LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
571         }
572         markClosed(new IOException("Unexpected throwable while waiting call responses", t));
573       }
574 
575       close();
576 
577       if (LOG.isTraceEnabled()) {
578         LOG.trace(getName() + ": stopped, connections " + connections.size());
579       }
580     }
581 
582     private synchronized void disposeSasl() {
583       if (saslRpcClient != null) {
584         try {
585           saslRpcClient.dispose();
586           saslRpcClient = null;
587         } catch (IOException ioe) {
588           LOG.error("Error disposing of SASL client", ioe);
589         }
590       }
591     }
592 
593     private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
594       UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
595       UserGroupInformation currentUser =
596         UserGroupInformation.getCurrentUser();
597       UserGroupInformation realUser = currentUser.getRealUser();
598       return authMethod == AuthMethod.KERBEROS &&
599           loginUser != null &&
600           //Make sure user logged in using Kerberos either keytab or TGT
601           loginUser.hasKerberosCredentials() &&
602           // relogin only in case it is the login user (e.g. JT)
603           // or superuser (like oozie).
604           (loginUser.equals(currentUser) || loginUser.equals(realUser));
605     }
606 
607     private synchronized boolean setupSaslConnection(final InputStream in2,
608         final OutputStream out2) throws IOException {
609       saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal, fallbackAllowed,
610           conf.get("hbase.rpc.protection",
611               QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
612       return saslRpcClient.saslConnect(in2, out2);
613     }
614 
615     /**
616      * If multiple clients with the same principal try to connect
617      * to the same server at the same time, the server assumes a
618      * replay attack is in progress. This is a feature of kerberos.
619      * In order to work around this, what is done is that the client
620      * backs off randomly and tries to initiate the connection
621      * again.
622      * The other problem is to do with ticket expiry. To handle that,
623      * a relogin is attempted.
624      * <p>
625      * The retry logic is governed by the {@link #shouldAuthenticateOverKrb}
626      * method. In case when the user doesn't have valid credentials, we don't
627      * need to retry (from cache or ticket). In such cases, it is prudent to
628      * throw a runtime exception when we receive a SaslException from the
629      * underlying authentication implementation, so there is no retry from
630      * other high level (for eg, HCM or HBaseAdmin).
631      * </p>
632      */
633     private synchronized void handleSaslConnectionFailure(
634         final int currRetries,
635         final int maxRetries, final Exception ex, final Random rand,
636         final UserGroupInformation user)
637     throws IOException, InterruptedException{
638       user.doAs(new PrivilegedExceptionAction<Object>() {
639         @Override
640         public Object run() throws IOException, InterruptedException {
641           closeConnection();
642           if (shouldAuthenticateOverKrb()) {
643             if (currRetries < maxRetries) {
644               if (LOG.isDebugEnabled()) {
645                 LOG.debug("Exception encountered while connecting to " +
646                     "the server : " + ex);
647               }
648               //try re-login
649               if (UserGroupInformation.isLoginKeytabBased()) {
650                 UserGroupInformation.getLoginUser().reloginFromKeytab();
651               } else {
652                 UserGroupInformation.getLoginUser().reloginFromTicketCache();
653               }
654               disposeSasl();
655               //have granularity of milliseconds
656               //we are sleeping with the Connection lock held but since this
657               //connection instance is being used for connecting to the server
658               //in question, it is okay
659               Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
660               return null;
661             } else {
662               String msg = "Couldn't setup connection for " +
663               UserGroupInformation.getLoginUser().getUserName() +
664               " to " + serverPrincipal;
665               LOG.warn(msg, ex);
666               throw (IOException) new IOException(msg).initCause(ex);
667             }
668           } else {
669             LOG.warn("Exception encountered while connecting to " +
670                 "the server : " + ex);
671           }
672           if (ex instanceof RemoteException) {
673             throw (RemoteException)ex;
674           }
675           if (ex instanceof SaslException) {
676             String msg = "SASL authentication failed." +
677               " The most likely cause is missing or invalid credentials." +
678               " Consider 'kinit'.";
679             LOG.fatal(msg, ex);
680             throw new RuntimeException(msg, ex);
681           }
682           throw new IOException(ex);
683         }
684       });
685     }
686 
687     protected synchronized void setupIOstreams() throws IOException {
688       if (socket != null) {
689         // The connection is already available. Perfect.
690         return;
691       }
692 
693       if (shouldCloseConnection.get()){
694         throw new ConnectionClosingException("This connection is closing");
695       }
696 
697       if (failedServers.isFailedServer(remoteId.getAddress())) {
698         if (LOG.isDebugEnabled()) {
699           LOG.debug("Not trying to connect to " + server +
700               " this server is in the failed servers list");
701         }
702         IOException e = new FailedServerException(
703             "This server is in the failed servers list: " + server);
704         markClosed(e);
705         close();
706         throw e;
707       }
708 
709       try {
710         if (LOG.isDebugEnabled()) {
711           LOG.debug("Connecting to " + server);
712         }
713         short numRetries = 0;
714         final short MAX_RETRIES = 5;
715         Random rand = null;
716         while (true) {
717           setupConnection();
718           InputStream inStream = NetUtils.getInputStream(socket);
719           // This creates a socket with a write timeout. This timeout cannot be changed.
720           OutputStream outStream = NetUtils.getOutputStream(socket, writeTO);
721           // Write out the preamble -- MAGIC, version, and auth to use.
722           writeConnectionHeaderPreamble(outStream);
723           if (useSasl) {
724             final InputStream in2 = inStream;
725             final OutputStream out2 = outStream;
726             UserGroupInformation ticket = remoteId.getTicket().getUGI();
727             if (authMethod == AuthMethod.KERBEROS) {
728               if (ticket != null && ticket.getRealUser() != null) {
729                 ticket = ticket.getRealUser();
730               }
731             }
732             boolean continueSasl;
733             if (ticket == null) throw new FatalConnectionException("ticket/user is null");
734             try {
735               continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
736                 @Override
737                 public Boolean run() throws IOException {
738                   return setupSaslConnection(in2, out2);
739                 }
740               });
741             } catch (Exception ex) {
742               ExceptionUtil.rethrowIfInterrupt(ex);
743               if (rand == null) {
744                 rand = new Random();
745               }
746               handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, ticket);
747               continue;
748             }
749             if (continueSasl) {
750               // Sasl connect is successful. Let's set up Sasl i/o streams.
751               inStream = saslRpcClient.getInputStream(inStream);
752               outStream = saslRpcClient.getOutputStream(outStream);
753             } else {
754               // fall back to simple auth because server told us so.
755               authMethod = AuthMethod.SIMPLE;
756               useSasl = false;
757             }
758           }
759           this.in = new DataInputStream(new BufferedInputStream(inStream));
760           synchronized (this.outLock) {
761             this.out = new DataOutputStream(new BufferedOutputStream(outStream));
762           }
763           // Now write out the connection header
764           writeConnectionHeader();
765 
766           // start the receiver thread after the socket connection has been set up
767           start();
768           return;
769         }
770       } catch (Throwable t) {
771         IOException e = ExceptionUtil.asInterrupt(t);
772         if (e == null) {
773           failedServers.addToFailedServers(remoteId.address);
774           if (t instanceof LinkageError) {
775             // probably the hbase hadoop version does not match the running hadoop version
776             e = new DoNotRetryIOException(t);
777           } else if (t instanceof IOException) {
778             e = (IOException) t;
779           } else {
780             e = new IOException("Could not set up IO Streams to " + server, t);
781           }
782         }
783         markClosed(e);
784         close();
785         throw e;
786       }
787     }
788 
789     /**
790      * Write the RPC header: <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>
791      */
792     private void writeConnectionHeaderPreamble(OutputStream outStream) throws IOException {
793       // Assemble the preamble up in a buffer first and then send it.  Writing individual elements,
794       // they are getting sent across piecemeal according to wireshark and then server is messing
795       // up the reading on occasion (the passed in stream is not buffered yet).
796 
797       // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE
798       int rpcHeaderLen = HConstants.RPC_HEADER.length;
799       byte [] preamble = new byte [rpcHeaderLen + 2];
800       System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen);
801       preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
802       preamble[rpcHeaderLen + 1] = authMethod.code;
803       outStream.write(preamble);
804       outStream.flush();
805     }
806 
807     /**
808      * Write the connection header.
809      */
810     private synchronized void writeConnectionHeader() throws IOException {
811       synchronized (this.outLock) {
812         this.out.writeInt(this.header.getSerializedSize());
813         this.header.writeTo(this.out);
814         this.out.flush();
815       }
816     }
817 
818     /** Close the connection. */
819     protected synchronized void close() {
820       if (!shouldCloseConnection.get()) {
821         LOG.error(getName() + ": the connection is not in the closed state");
822         return;
823       }
824 
825       // release the resources
826       // first thing to do;take the connection out of the connection list
827       synchronized (connections) {
828         connections.removeValue(remoteId, this);
829       }
830 
831       // close the streams and therefore the socket
832       synchronized(this.outLock) {
833         if (this.out != null) {
834           IOUtils.closeStream(out);
835           this.out = null;
836         }
837       }
838       IOUtils.closeStream(in);
839       this.in = null;
840       if (this.socket != null) {
841         try {
842           this.socket.close();
843           this.socket = null;
844         } catch (IOException e) {
845           LOG.error("Error while closing socket", e);
846         }
847       }
848 
849       disposeSasl();
850 
851       // log the info
852       if (LOG.isTraceEnabled()) {
853         LOG.trace(getName() + ": closing ipc connection to " + server);
854       }
855 
856       cleanupCalls(true);
857 
858       if (LOG.isTraceEnabled()) {
859         LOG.trace(getName() + ": ipc connection to " + server + " closed");
860       }
861     }
862 
863     protected void tracedWriteRequest(Call call, int priority, Span span) throws IOException {
864       TraceScope ts = Trace.startSpan("RpcClientImpl.tracedWriteRequest", span);
865       try {
866         writeRequest(call, priority, span);
867       } finally {
868         ts.close();
869       }
870     }
871 
872     /**
873      * Initiates a call by sending the parameter to the remote server.
874      * Note: this is not called from the Connection thread, but by other
875      * threads.
876      * @see #readResponse()
877      */
878     private void writeRequest(Call call, final int priority, Span span) throws IOException {
879       RequestHeader.Builder builder = RequestHeader.newBuilder();
880       builder.setCallId(call.id);
881       if (span != null) {
882         builder.setTraceInfo(
883             RPCTInfo.newBuilder().setParentId(span.getSpanId()).setTraceId(span.getTraceId()));
884       }
885       builder.setMethodName(call.md.getName());
886       builder.setRequestParam(call.param != null);
887       ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
888       if (cellBlock != null) {
889         CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
890         cellBlockBuilder.setLength(cellBlock.limit());
891         builder.setCellBlockMeta(cellBlockBuilder.build());
892       }
893       // Only pass priority if there one.  Let zero be same as no priority.
894       if (priority != 0) builder.setPriority(priority);
895       RequestHeader header = builder.build();
896 
897       setupIOstreams();
898 
899       // Now we're going to write the call. We take the lock, then check that the connection
900       //  is still valid, and, if so we do the write to the socket. If the write fails, we don't
901       //  know where we stand, we have to close the connection.
902       checkIsOpen();
903       IOException writeException = null;
904       synchronized (this.outLock) {
905         if (Thread.interrupted()) throw new InterruptedIOException();
906 
907         calls.put(call.id, call); // We put first as we don't want the connection to become idle.
908         checkIsOpen(); // Now we're checking that it didn't became idle in between.
909 
910         try {
911           IPCUtil.write(this.out, header, call.param, cellBlock);
912         } catch (IOException e) {
913           // We set the value inside the synchronized block, this way the next in line
914           //  won't even try to write. Otherwise we might miss a call in the calls map?
915           shouldCloseConnection.set(true);
916           writeException = e;
917           interrupt();
918         }
919       }
920 
921       // call close outside of the synchronized (outLock) to prevent deadlock - HBASE-14474
922       if (writeException != null) {
923         markClosed(writeException);
924         close();
925       }
926 
927       // We added a call, and may be started the connection close. In both cases, we
928       //  need to notify the reader.
929       synchronized (this) {
930         notifyAll();
931       }
932 
933       // Now that we notified, we can rethrow the exception if any. Otherwise we're good.
934       if (writeException != null) throw writeException;
935     }
936 
937     /* Receive a response.
938      * Because only one receiver, so no synchronization on in.
939      */
940     protected void readResponse() {
941       if (shouldCloseConnection.get()) return;
942       Call call = null;
943       boolean expectedCall = false;
944       try {
945         // See HBaseServer.Call.setResponse for where we write out the response.
946         // Total size of the response.  Unused.  But have to read it in anyways.
947         int totalSize = in.readInt();
948 
949         // Read the header
950         ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
951         int id = responseHeader.getCallId();
952         call = calls.remove(id); // call.done have to be set before leaving this method
953         expectedCall = (call != null && !call.done);
954         if (!expectedCall) {
955           // So we got a response for which we have no corresponding 'call' here on the client-side.
956           // We probably timed out waiting, cleaned up all references, and now the server decides
957           // to return a response.  There is nothing we can do w/ the response at this stage. Clean
958           // out the wire of the response so its out of the way and we can get other responses on
959           // this connection.
960           int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
961           int whatIsLeftToRead = totalSize - readSoFar;
962           IOUtils.skipFully(in, whatIsLeftToRead);
963           return;
964         }
965         if (responseHeader.hasException()) {
966           ExceptionResponse exceptionResponse = responseHeader.getException();
967           RemoteException re = createRemoteException(exceptionResponse);
968           call.setException(re);
969           if (isFatalConnectionException(exceptionResponse)) {
970             markClosed(re);
971           }
972         } else {
973           Message value = null;
974           if (call.responseDefaultType != null) {
975             Builder builder = call.responseDefaultType.newBuilderForType();
976             ProtobufUtil.mergeDelimitedFrom(builder, in);
977             value = builder.build();
978           }
979           CellScanner cellBlockScanner = null;
980           if (responseHeader.hasCellBlockMeta()) {
981             int size = responseHeader.getCellBlockMeta().getLength();
982             byte [] cellBlock = new byte[size];
983             IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
984             cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
985           }
986           call.setResponse(value, cellBlockScanner);
987         }
988       } catch (IOException e) {
989         if (expectedCall) call.setException(e);
990         if (e instanceof SocketTimeoutException) {
991           // Clean up open calls but don't treat this as a fatal condition,
992           // since we expect certain responses to not make it by the specified
993           // {@link ConnectionId#rpcTimeout}.
994           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
995         } else {
996           // Treat this as a fatal condition and close this connection
997           markClosed(e);
998         }
999       } finally {
1000         cleanupCalls(false);
1001       }
1002     }
1003 
1004     /**
1005      * @return True if the exception is a fatal connection exception.
1006      */
1007     private boolean isFatalConnectionException(final ExceptionResponse e) {
1008       return e.getExceptionClassName().
1009         equals(FatalConnectionException.class.getName());
1010     }
1011 
1012     /**
1013      * @param e exception to be wrapped
1014      * @return RemoteException made from passed <code>e</code>
1015      */
1016     private RemoteException createRemoteException(final ExceptionResponse e) {
1017       String innerExceptionClassName = e.getExceptionClassName();
1018       boolean doNotRetry = e.getDoNotRetry();
1019       return e.hasHostname()?
1020         // If a hostname then add it to the RemoteWithExtrasException
1021         new RemoteWithExtrasException(innerExceptionClassName,
1022           e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
1023         new RemoteWithExtrasException(innerExceptionClassName,
1024           e.getStackTrace(), doNotRetry);
1025     }
1026 
1027     protected synchronized boolean markClosed(IOException e) {
1028       if (e == null) throw new NullPointerException();
1029 
1030       boolean ret = shouldCloseConnection.compareAndSet(false, true);
1031       if (ret) {
1032         if (LOG.isTraceEnabled()) {
1033           LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
1034         }
1035         if (callSender != null) {
1036           callSender.close();
1037         }
1038         notifyAll();
1039       }
1040       return ret;
1041     }
1042 
1043 
1044     /**
1045      * Cleanup the calls older than a given timeout, in milli seconds.
1046      * @param allCalls true for all calls, false for only the calls in timeout
1047      */
1048     protected synchronized void cleanupCalls(boolean allCalls) {
1049       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
1050       while (itor.hasNext()) {
1051         Call c = itor.next().getValue();
1052         if (c.done) {
1053           // To catch the calls without timeout that were cancelled.
1054           itor.remove();
1055         } else if (allCalls) {
1056           long waitTime = EnvironmentEdgeManager.currentTime() - c.getStartTime();
1057           IOException ie = new ConnectionClosingException("Connection to " + getRemoteAddress()
1058               + " is closing. Call id=" + c.id + ", waitTime=" + waitTime);
1059           c.setException(ie);
1060           itor.remove();
1061         } else if (c.checkAndSetTimeout()) {
1062           itor.remove();
1063         } else {
1064           // We expect the call to be ordered by timeout. It may not be the case, but stopping
1065           //  at the first valid call allows to be sure that we still have something to do without
1066           //  spending too much time by reading the full list.
1067           break;
1068         }
1069       }
1070     }
1071   }
1072 
/**
 * Construct an IPC cluster client whose values are of the {@link Message} class.
 * Delegates to the four-argument constructor, passing null as the local bind address.
 *
 * @param conf configuration
 * @param clusterId the cluster id
 * @param factory socket factory
 */
RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) {
  this(conf, clusterId, factory, null);
}
1082 
/**
 * Construct an IPC cluster client whose values are of the {@link Message} class.
 * Besides the superclass initialization this sets the socket factory, creates the
 * connection pool from the pool type and size found in the configuration, and creates the
 * failed servers list.
 *
 * @param conf configuration
 * @param clusterId the cluster id
 * @param factory socket factory
 * @param localAddr client socket bind address
 */
RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory,
    SocketAddress localAddr) {
  super(conf, clusterId, localAddr);

  socketFactory = factory;
  connections =
      new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf));
  failedServers = new FailedServers(conf);
}
1098 
/**
 * Construct an IPC client for the cluster <code>clusterId</code> with the default
 * SocketFactory and no local bind address.
 *
 * @param conf configuration
 * @param clusterId the cluster id
 */
public RpcClientImpl(Configuration conf, String clusterId) {
  this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null);
}
1107 
/**
 * Construct an IPC client for the cluster <code>clusterId</code> with the default
 * SocketFactory.
 *
 * This constructor is called with reflection by the RpcClientFactory to create an instance.
 *
 * @param conf configuration
 * @param clusterId the cluster id
 * @param localAddr client socket bind address.
 */
public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr) {
  this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr);
}
1120 
1121   /** Stop all threads related to this client.  No further calls may be made
1122    * using this client. */
1123   @Override
1124   public void close() {
1125     if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
1126     if (!running.compareAndSet(true, false)) return;
1127 
1128     Set<Connection> connsToClose = null;
1129     // wake up all connections
1130     synchronized (connections) {
1131       for (Connection conn : connections.values()) {
1132         conn.interrupt();
1133         if (conn.callSender != null) {
1134           conn.callSender.interrupt();
1135         }
1136 
1137         // In case the CallSender did not setupIOStreams() yet, the Connection may not be started
1138         // at all (if CallSender has a cancelled Call it can happen). See HBASE-13851
1139         if (!conn.isAlive()) {
1140           if (connsToClose == null) {
1141             connsToClose = new HashSet<Connection>();
1142           }
1143           connsToClose.add(conn);
1144         }
1145       }
1146     }
1147     if (connsToClose != null) {
1148       for (Connection conn : connsToClose) {
1149         conn.markClosed(new InterruptedIOException("RpcClient is closing"));
1150         conn.close();
1151       }
1152     }
1153     // wait until all connections are closed
1154     while (!connections.isEmpty()) {
1155       try {
1156         Thread.sleep(10);
1157       } catch (InterruptedException e) {
1158         LOG.info("Interrupted while stopping the client. We still have " + connections.size() +
1159             " connections.");
1160         Thread.currentThread().interrupt();
1161         return;
1162       }
1163     }
1164   }
1165 
1166   /** Make a call, passing <code>param</code>, to the IPC server running at
1167    * <code>address</code> which is servicing the <code>protocol</code> protocol,
1168    * with the <code>ticket</code> credentials, returning the value.
1169    * Throws exceptions if there are network problems or if the remote code
1170    * threw an exception.
1171    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
1172    *          {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
1173    *          new Connection each time.
1174    * @return A pair with the Message response and the Cell data (if any).
1175    * @throws InterruptedException
1176    * @throws IOException
1177    */
1178   @Override
1179   protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
1180       Message param, Message returnType, User ticket, InetSocketAddress addr)
1181       throws IOException, InterruptedException {
1182     if (pcrc == null) {
1183       pcrc = new PayloadCarryingRpcController();
1184     }
1185     CellScanner cells = pcrc.cellScanner();
1186 
1187     final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
1188         pcrc.getCallTimeout());
1189 
1190     final Connection connection = getConnection(ticket, call, addr);
1191 
1192     final CallFuture cts;
1193     if (connection.callSender != null) {
1194       cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan());
1195         pcrc.notifyOnCancel(new RpcCallback<Object>() {
1196           @Override
1197           public void run(Object parameter) {
1198             connection.callSender.remove(cts);
1199           }
1200         });
1201         if (pcrc.isCanceled()) {
1202           // To finish if the call was cancelled before we set the notification (race condition)
1203           call.callComplete();
1204           return new Pair<Message, CellScanner>(call.response, call.cells);
1205         }
1206     } else {
1207       cts = null;
1208       connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());
1209     }
1210 
1211     while (!call.done) {
1212       if (call.checkAndSetTimeout()) {
1213         if (cts != null) connection.callSender.remove(cts);
1214         break;
1215       }
1216       if (connection.shouldCloseConnection.get()) {
1217         throw new ConnectionClosingException("Call id=" + call.id +
1218             " on server " + addr + " aborted: connection is closing");
1219       }
1220       try {
1221         synchronized (call) {
1222           if (call.done) break;
1223           call.wait(Math.min(call.remainingTime(), 1000) + 1);
1224         }
1225       } catch (InterruptedException e) {
1226         call.setException(new InterruptedIOException());
1227         if (cts != null) connection.callSender.remove(cts);
1228         throw e;
1229       }
1230     }
1231 
1232     if (call.error != null) {
1233       if (call.error instanceof RemoteException) {
1234         call.error.fillInStackTrace();
1235         throw call.error;
1236       }
1237       // local exception
1238       throw wrapException(addr, call.error);
1239     }
1240 
1241     return new Pair<Message, CellScanner>(call.response, call.cells);
1242   }
1243 
1244 
1245   /**
1246    * Interrupt the connections to the given ip:port server. This should be called if the server
1247    *  is known as actually dead. This will not prevent current operation to be retried, and,
1248    *  depending on their own behavior, they may retry on the same server. This can be a feature,
1249    *  for example at startup. In any case, they're likely to get connection refused (if the
1250    *  process died) or no route to host: i.e. their next retries should be faster and with a
1251    *  safe exception.
1252    */
1253   @Override
1254   public void cancelConnections(ServerName sn) {
1255     synchronized (connections) {
1256       for (Connection connection : connections.values()) {
1257         if (connection.isAlive() &&
1258             connection.getRemoteAddress().getPort() == sn.getPort() &&
1259             connection.getRemoteAddress().getHostName().equals(sn.getHostname())) {
1260           LOG.info("The server on " + sn.toString() +
1261               " is dead - stopping the connection " + connection.remoteId);
1262           connection.interrupt(); // We're interrupting a Reader. It means we want it to finish.
1263                                   // This will close the connection as well.
1264         }
1265       }
1266     }
1267   }
1268 
1269   /**
1270    *  Get a connection from the pool, or create a new one and add it to the
1271    * pool. Connections to a given host/port are reused.
1272    */
1273   protected Connection getConnection(User ticket, Call call, InetSocketAddress addr)
1274   throws IOException {
1275     if (!running.get()) throw new StoppedRpcClientException();
1276     Connection connection;
1277     ConnectionId remoteId =
1278       new ConnectionId(ticket, call.md.getService().getName(), addr);
1279     synchronized (connections) {
1280       connection = connections.get(remoteId);
1281       if (connection == null) {
1282         connection = createConnection(remoteId, this.codec, this.compressor);
1283         connections.put(remoteId, connection);
1284       }
1285     }
1286 
1287     return connection;
1288   }
1289 }