View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.ipc;
21  
22  import java.io.BufferedInputStream;
23  import java.io.BufferedOutputStream;
24  import java.io.Closeable;
25  import java.io.DataInputStream;
26  import java.io.DataOutputStream;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.io.InterruptedIOException;
30  import java.io.OutputStream;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.SocketTimeoutException;
35  import java.net.UnknownHostException;
36  import java.nio.ByteBuffer;
37  import java.security.PrivilegedExceptionAction;
38  import java.util.HashMap;
39  import java.util.HashSet;
40  import java.util.Iterator;
41  import java.util.Map;
42  import java.util.Map.Entry;
43  import java.util.Random;
44  import java.util.Set;
45  import java.util.concurrent.ArrayBlockingQueue;
46  import java.util.concurrent.BlockingQueue;
47  import java.util.concurrent.ConcurrentSkipListMap;
48  import java.util.concurrent.atomic.AtomicBoolean;
49  import java.util.concurrent.atomic.AtomicInteger;
50  
51  import javax.net.SocketFactory;
52  import javax.security.sasl.SaslException;
53  
54  import org.apache.commons.logging.Log;
55  import org.apache.commons.logging.LogFactory;
56  import org.apache.hadoop.conf.Configuration;
57  import org.apache.hadoop.hbase.CellScanner;
58  import org.apache.hadoop.hbase.DoNotRetryIOException;
59  import org.apache.hadoop.hbase.HConstants;
60  import org.apache.hadoop.hbase.ServerName;
61  import org.apache.hadoop.hbase.classification.InterfaceAudience;
62  import org.apache.hadoop.hbase.client.MetricsConnection;
63  import org.apache.hadoop.hbase.codec.Codec;
64  import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
65  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
66  import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
67  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
68  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
69  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
70  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
71  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
72  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
73  import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo;
74  import org.apache.hadoop.hbase.security.AuthMethod;
75  import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
76  import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
77  import org.apache.hadoop.hbase.security.SecurityInfo;
78  import org.apache.hadoop.hbase.security.User;
79  import org.apache.hadoop.hbase.security.UserProvider;
80  import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
81  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
82  import org.apache.hadoop.hbase.util.ExceptionUtil;
83  import org.apache.hadoop.hbase.util.Pair;
84  import org.apache.hadoop.hbase.util.PoolMap;
85  import org.apache.hadoop.io.IOUtils;
86  import org.apache.hadoop.io.Text;
87  import org.apache.hadoop.io.compress.CompressionCodec;
88  import org.apache.hadoop.ipc.RemoteException;
89  import org.apache.hadoop.net.NetUtils;
90  import org.apache.hadoop.security.SecurityUtil;
91  import org.apache.hadoop.security.UserGroupInformation;
92  import org.apache.hadoop.security.token.Token;
93  import org.apache.hadoop.security.token.TokenIdentifier;
94  import org.apache.hadoop.security.token.TokenSelector;
95  import org.apache.htrace.Span;
96  import org.apache.htrace.Trace;
97  import org.apache.htrace.TraceScope;
98  
99  import com.google.common.annotations.VisibleForTesting;
100 import com.google.protobuf.Descriptors.MethodDescriptor;
101 import com.google.protobuf.Message;
102 import com.google.protobuf.Message.Builder;
103 import com.google.protobuf.RpcCallback;
104 
105 /**
106  * Does RPC against a cluster.  Manages connections per regionserver in the cluster.
107  * <p>See HBaseServer
108  */
109 @InterfaceAudience.Private
110 public class RpcClientImpl extends AbstractRpcClient {
111   private static final Log LOG = LogFactory.getLog(RpcClientImpl.class);
112   protected final AtomicInteger callIdCnt = new AtomicInteger();
113 
114   protected final PoolMap<ConnectionId, Connection> connections;
115 
116   protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
117 
118   protected final FailedServers failedServers;
119 
120   protected final SocketFactory socketFactory;           // how to create sockets
121 
122   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_MUTABLE_COLLECTION_PKGPROTECT",
123       justification="the rest of the system which live in the different package can use")
124   protected final static Map<AuthenticationProtos.TokenIdentifier.Kind,
125       TokenSelector<? extends TokenIdentifier>> tokenHandlers =
126       new HashMap<AuthenticationProtos.TokenIdentifier.Kind,
127         TokenSelector<? extends TokenIdentifier>>();
128   static {
129     tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
130         new AuthenticationTokenSelector());
131   }
132 
133   /**
134    * Creates a connection. Can be overridden by a subclass for testing.
135    * @param remoteId - the ConnectionId to use for the connection creation.
136    */
137   protected Connection createConnection(ConnectionId remoteId, final Codec codec,
138       final CompressionCodec compressor)
139   throws IOException {
140     return new Connection(remoteId, codec, compressor);
141   }
142 
143   /**
144    * see {@link RpcClientImpl.Connection.CallSender}
145    */
146   private static class CallFuture {
147     final Call call;
148     final int priority;
149     final Span span;
150 
151     // We will use this to stop the writer
152     final static CallFuture DEATH_PILL = new CallFuture(null, -1, null);
153 
154     CallFuture(Call call, int priority, Span span) {
155       this.call = call;
156       this.priority = priority;
157       this.span = span;
158     }
159   }
160 
161   /** Thread that reads responses and notifies callers.  Each connection owns a
162    * socket connected to a remote address.  Calls are multiplexed through this
163    * socket: responses may be delivered out of order. */
164   protected class Connection extends Thread {
165     private ConnectionHeader header;              // connection header
166     protected ConnectionId remoteId;
167     protected Socket socket = null;                 // connected socket
168     protected DataInputStream in;
169     protected DataOutputStream out;
170     private Object outLock = new Object();
171     private InetSocketAddress server;             // server ip:port
172     private String serverPrincipal;  // server's krb5 principal name
173     private AuthMethod authMethod; // authentication method
174     private boolean useSasl;
175     private Token<? extends TokenIdentifier> token;
176     private HBaseSaslRpcClient saslRpcClient;
177     private int reloginMaxBackoff; // max pause before relogin on sasl failure
178     private final Codec codec;
179     private final CompressionCodec compressor;
180 
181     // currently active calls
182     protected final ConcurrentSkipListMap<Integer, Call> calls =
183       new ConcurrentSkipListMap<Integer, Call>();
184 
185     protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
186     protected final CallSender callSender;
187 
188 
189     /**
190      * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt),
191      *  it gets into a java issue: an interruption during a write closes the socket/channel.
192      * A way to avoid this is to use a different thread for writing. This way, on interruptions,
193      *  we either cancel the writes or ignore the answer if the write is already done, but we
194      *  don't stop the write in the middle.
195      * This adds a thread per region server in the client, so it's kept as an option.
196      * <p>
197      * The implementation is simple: the client threads adds their call to the queue, and then
198      *  wait for an answer. The CallSender blocks on the queue, and writes the calls one
199      *  after the other. On interruption, the client cancels its call. The CallSender checks that
200      *  the call has not been canceled before writing it.
201      * </p>
202      * When the connection closes, all the calls not yet sent are dismissed. The client thread
203      *  is notified with an appropriate exception, as if the call was already sent but the answer
204      *  not yet received.
205      * </p>
206      */
207     private class CallSender extends Thread implements Closeable {
208       protected final BlockingQueue<CallFuture> callsToWrite;
209 
210 
211       public CallFuture sendCall(Call call, int priority, Span span)
212           throws InterruptedException, IOException {
213         CallFuture cts = new CallFuture(call, priority, span);
214         if (!callsToWrite.offer(cts)) {
215           throw new IOException("Can't add the call " + call.id +
216               " to the write queue. callsToWrite.size()=" + callsToWrite.size());
217         }
218         checkIsOpen(); // We check after the put, to be sure that the call we added won't stay
219                        //  in the list while the cleanup was already done.
220         return cts;
221       }
222 
223       @Override
224       public void close(){
225         assert shouldCloseConnection.get();
226         callsToWrite.offer(CallFuture.DEATH_PILL);
227         // We don't care if we can't add the death pill to the queue: the writer
228         //  won't be blocked in the 'take', as its queue is full.
229       }
230 
231       CallSender(String name, Configuration conf) {
232         int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
233         callsToWrite = new ArrayBlockingQueue<CallFuture>(queueSize);
234         setDaemon(true);
235         setName(name + " - writer");
236       }
237 
238       public void remove(CallFuture cts){
239         callsToWrite.remove(cts);
240 
241         // By removing the call from the expected call list, we make the list smaller, but
242         //  it means as well that we don't know how many calls we cancelled.
243         calls.remove(cts.call.id);
244         cts.call.callComplete();
245       }
246 
247       /**
248        * Reads the call from the queue, write them on the socket.
249        */
250       @Override
251       public void run() {
252         while (!shouldCloseConnection.get()) {
253           CallFuture cts = null;
254           try {
255             cts = callsToWrite.take();
256           } catch (InterruptedException e) {
257             markClosed(new InterruptedIOException());
258           }
259 
260           if (cts == null || cts == CallFuture.DEATH_PILL) {
261             assert shouldCloseConnection.get();
262             break;
263           }
264 
265           if (cts.call.done) {
266             continue;
267           }
268 
269           if (cts.call.checkAndSetTimeout()) {
270             continue;
271           }
272 
273           try {
274             Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
275           } catch (IOException e) {
276             if (LOG.isDebugEnabled()) {
277               LOG.debug("call write error for call #" + cts.call.id
278                 + ", message =" + e.getMessage());
279             }
280             cts.call.setException(e);
281             markClosed(e);
282           }
283         }
284 
285         cleanup();
286       }
287 
288       /**
289        * Cleans the call not yet sent when we finish.
290        */
291       private void cleanup() {
292         assert shouldCloseConnection.get();
293 
294         IOException ie = new ConnectionClosingException("Connection to " + server + " is closing.");
295         while (true) {
296           CallFuture cts = callsToWrite.poll();
297           if (cts == null) {
298             break;
299           }
300           if (cts.call != null && !cts.call.done) {
301             cts.call.setException(ie);
302           }
303         }
304       }
305     }
306 
307     Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
308     throws IOException {
309       if (remoteId.getAddress().isUnresolved()) {
310         throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
311       }
312       this.server = remoteId.getAddress();
313       this.codec = codec;
314       this.compressor = compressor;
315 
316       UserGroupInformation ticket = remoteId.getTicket().getUGI();
317       SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
318       this.useSasl = userProvider.isHBaseSecurityEnabled();
319       if (useSasl && securityInfo != null) {
320         AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
321         if (tokenKind != null) {
322           TokenSelector<? extends TokenIdentifier> tokenSelector =
323               tokenHandlers.get(tokenKind);
324           if (tokenSelector != null) {
325             token = tokenSelector.selectToken(new Text(clusterId),
326                 ticket.getTokens());
327           } else if (LOG.isDebugEnabled()) {
328             LOG.debug("No token selector found for type "+tokenKind);
329           }
330         }
331         String serverKey = securityInfo.getServerPrincipal();
332         if (serverKey == null) {
333           throw new IOException(
334               "Can't obtain server Kerberos config key from SecurityInfo");
335         }
336         serverPrincipal = SecurityUtil.getServerPrincipal(
337             conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
338         if (LOG.isDebugEnabled()) {
339           LOG.debug("RPC Server Kerberos principal name for service="
340               + remoteId.getServiceName() + " is " + serverPrincipal);
341         }
342       }
343 
344       if (!useSasl) {
345         authMethod = AuthMethod.SIMPLE;
346       } else if (token != null) {
347         authMethod = AuthMethod.DIGEST;
348       } else {
349         authMethod = AuthMethod.KERBEROS;
350       }
351 
352       if (LOG.isDebugEnabled()) {
353         LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName +
354           ", sasl=" + useSasl);
355       }
356       reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
357       this.remoteId = remoteId;
358 
359       ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
360       builder.setServiceName(remoteId.getServiceName());
361       UserInformation userInfoPB = getUserInfo(ticket);
362       if (userInfoPB != null) {
363         builder.setUserInfo(userInfoPB);
364       }
365       if (this.codec != null) {
366         builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
367       }
368       if (this.compressor != null) {
369         builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
370       }
371       builder.setVersionInfo(ProtobufUtil.getVersionInfo());
372       this.header = builder.build();
373 
374       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
375         remoteId.getAddress().toString() +
376         ((ticket==null)?" from an unknown user": (" from "
377         + ticket.getUserName())));
378       this.setDaemon(true);
379 
380       if (conf.getBoolean(SPECIFIC_WRITE_THREAD, false)) {
381         callSender = new CallSender(getName(), conf);
382         callSender.start();
383       } else {
384         callSender = null;
385       }
386     }
387 
388     private synchronized UserInformation getUserInfo(UserGroupInformation ugi) {
389       if (ugi == null || authMethod == AuthMethod.DIGEST) {
390         // Don't send user for token auth
391         return null;
392       }
393       UserInformation.Builder userInfoPB = UserInformation.newBuilder();
394       if (authMethod == AuthMethod.KERBEROS) {
395         // Send effective user for Kerberos auth
396         userInfoPB.setEffectiveUser(ugi.getUserName());
397       } else if (authMethod == AuthMethod.SIMPLE) {
398         //Send both effective user and real user for simple auth
399         userInfoPB.setEffectiveUser(ugi.getUserName());
400         if (ugi.getRealUser() != null) {
401           userInfoPB.setRealUser(ugi.getRealUser().getUserName());
402         }
403       }
404       return userInfoPB.build();
405     }
406 
407     protected synchronized void setupConnection() throws IOException {
408       short ioFailures = 0;
409       short timeoutFailures = 0;
410       while (true) {
411         try {
412           this.socket = socketFactory.createSocket();
413           this.socket.setTcpNoDelay(tcpNoDelay);
414           this.socket.setKeepAlive(tcpKeepAlive);
415           if (localAddr != null) {
416             this.socket.bind(localAddr);
417           }
418           NetUtils.connect(this.socket, remoteId.getAddress(), connectTO);
419           this.socket.setSoTimeout(readTO);
420           return;
421         } catch (SocketTimeoutException toe) {
422           /* The max number of retries is 45,
423            * which amounts to 20s*45 = 15 minutes retries.
424            */
425           handleConnectionFailure(timeoutFailures++, maxRetries, toe);
426         } catch (IOException ie) {
427           handleConnectionFailure(ioFailures++, maxRetries, ie);
428         }
429       }
430     }
431 
432     protected synchronized void closeConnection() {
433       if (socket == null) {
434         return;
435       }
436 
437       // close the current connection
438       try {
439         if (socket.getOutputStream() != null) {
440           socket.getOutputStream().close();
441         }
442       } catch (IOException ignored) {  // Can happen if the socket is already closed
443         if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
444       }
445       try {
446         if (socket.getInputStream() != null) {
447           socket.getInputStream().close();
448         }
449       } catch (IOException ignored) {  // Can happen if the socket is already closed
450         if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
451       }
452       try {
453         if (socket.getChannel() != null) {
454           socket.getChannel().close();
455         }
456       } catch (IOException ignored) {  // Can happen if the socket is already closed
457         if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
458       }
459       try {
460         socket.close();
461       } catch (IOException e) {
462         LOG.warn("Not able to close a socket", e);
463       }
464 
465       // set socket to null so that the next call to setupIOstreams
466       // can start the process of connect all over again.
467       socket = null;
468     }
469 
470     /**
471      *  Handle connection failures
472      *
473      * If the current number of retries is equal to the max number of retries,
474      * stop retrying and throw the exception; Otherwise backoff N seconds and
475      * try connecting again.
476      *
477      * This Method is only called from inside setupIOstreams(), which is
478      * synchronized. Hence the sleep is synchronized; the locks will be retained.
479      *
480      * @param curRetries current number of retries
481      * @param maxRetries max number of retries allowed
482      * @param ioe failure reason
483      * @throws IOException if max number of retries is reached
484      */
485     private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
486     throws IOException {
487       closeConnection();
488 
489       // throw the exception if the maximum number of retries is reached
490       if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
491         throw ioe;
492       }
493 
494       // otherwise back off and retry
495       try {
496         Thread.sleep(failureSleep);
497       } catch (InterruptedException ie) {
498         ExceptionUtil.rethrowIfInterrupt(ie);
499       }
500 
501       LOG.info("Retrying connect to server: " + remoteId.getAddress() +
502         " after sleeping " + failureSleep + "ms. Already tried " + curRetries +
503         " time(s).");
504     }
505 
506     /**
507      * @throws IOException if the connection is not open.
508      */
509     private void checkIsOpen() throws IOException {
510       if (shouldCloseConnection.get()) {
511         throw new ConnectionClosingException(getName() + " is closing");
512       }
513     }
514 
515     /* wait till someone signals us to start reading RPC response or
516      * it is idle too long, it is marked as to be closed,
517      * or the client is marked as not running.
518      *
519      * @return true if it is time to read a response; false otherwise.
520      */
521     protected synchronized boolean waitForWork() throws InterruptedException {
522       // beware of the concurrent access to the calls list: we can add calls, but as well
523       //  remove them.
524       long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
525 
526       while (true) {
527         if (shouldCloseConnection.get()) {
528           return false;
529         }
530 
531         if (!running.get()) {
532           markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
533           return false;
534         }
535 
536         if (!calls.isEmpty()) {
537           // shouldCloseConnection can be set to true by a parallel thread here. The caller
538           //  will need to check anyway.
539           return true;
540         }
541 
542         if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
543           // Connection is idle.
544           // We expect the number of calls to be zero here, but actually someone can
545           //  adds a call at the any moment, as there is no synchronization between this task
546           //  and adding new calls. It's not a big issue, but it will get an exception.
547           markClosed(new IOException(
548               "idle connection closed with " + calls.size() + " pending request(s)"));
549           return false;
550         }
551 
552         wait(Math.min(minIdleTimeBeforeClose, 1000));
553       }
554     }
555 
556     public InetSocketAddress getRemoteAddress() {
557       return remoteId.getAddress();
558     }
559 
560     @Override
561     public void run() {
562       if (LOG.isTraceEnabled()) {
563         LOG.trace(getName() + ": starting, connections " + connections.size());
564       }
565 
566       try {
567         while (waitForWork()) { // Wait here for work - read or close connection
568           readResponse();
569         }
570       } catch (InterruptedException t) {
571         if (LOG.isTraceEnabled()) {
572           LOG.trace(getName() + ": interrupted while waiting for call responses");
573         }
574         markClosed(ExceptionUtil.asInterrupt(t));
575       } catch (Throwable t) {
576         if (LOG.isDebugEnabled()) {
577           LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
578         }
579         markClosed(new IOException("Unexpected throwable while waiting call responses", t));
580       }
581 
582       close();
583 
584       if (LOG.isTraceEnabled()) {
585         LOG.trace(getName() + ": stopped, connections " + connections.size());
586       }
587     }
588 
589     private synchronized void disposeSasl() {
590       if (saslRpcClient != null) {
591         try {
592           saslRpcClient.dispose();
593           saslRpcClient = null;
594         } catch (IOException ioe) {
595           LOG.error("Error disposing of SASL client", ioe);
596         }
597       }
598     }
599 
600     private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
601       UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
602       UserGroupInformation currentUser =
603         UserGroupInformation.getCurrentUser();
604       UserGroupInformation realUser = currentUser.getRealUser();
605       return authMethod == AuthMethod.KERBEROS &&
606           loginUser != null &&
607           //Make sure user logged in using Kerberos either keytab or TGT
608           loginUser.hasKerberosCredentials() &&
609           // relogin only in case it is the login user (e.g. JT)
610           // or superuser (like oozie).
611           (loginUser.equals(currentUser) || loginUser.equals(realUser));
612     }
613 
614     private synchronized boolean setupSaslConnection(final InputStream in2,
615         final OutputStream out2) throws IOException {
616       saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal, fallbackAllowed,
617           conf.get("hbase.rpc.protection",
618               QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
619       return saslRpcClient.saslConnect(in2, out2);
620     }
621 
622     /**
623      * If multiple clients with the same principal try to connect
624      * to the same server at the same time, the server assumes a
625      * replay attack is in progress. This is a feature of kerberos.
626      * In order to work around this, what is done is that the client
627      * backs off randomly and tries to initiate the connection
628      * again.
629      * The other problem is to do with ticket expiry. To handle that,
630      * a relogin is attempted.
631      * <p>
632      * The retry logic is governed by the {@link #shouldAuthenticateOverKrb}
633      * method. In case when the user doesn't have valid credentials, we don't
634      * need to retry (from cache or ticket). In such cases, it is prudent to
635      * throw a runtime exception when we receive a SaslException from the
636      * underlying authentication implementation, so there is no retry from
637      * other high level (for eg, HCM or HBaseAdmin).
638      * </p>
639      */
640     private synchronized void handleSaslConnectionFailure(
641         final int currRetries,
642         final int maxRetries, final Exception ex, final Random rand,
643         final UserGroupInformation user)
644     throws IOException, InterruptedException{
645       user.doAs(new PrivilegedExceptionAction<Object>() {
646         @Override
647         public Object run() throws IOException, InterruptedException {
648           closeConnection();
649           if (shouldAuthenticateOverKrb()) {
650             if (currRetries < maxRetries) {
651               if (LOG.isDebugEnabled()) {
652                 LOG.debug("Exception encountered while connecting to " +
653                     "the server : " + ex);
654               }
655               //try re-login
656               if (UserGroupInformation.isLoginKeytabBased()) {
657                 UserGroupInformation.getLoginUser().reloginFromKeytab();
658               } else {
659                 UserGroupInformation.getLoginUser().reloginFromTicketCache();
660               }
661               disposeSasl();
662               //have granularity of milliseconds
663               //we are sleeping with the Connection lock held but since this
664               //connection instance is being used for connecting to the server
665               //in question, it is okay
666               Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
667               return null;
668             } else {
669               String msg = "Couldn't setup connection for " +
670               UserGroupInformation.getLoginUser().getUserName() +
671               " to " + serverPrincipal;
672               LOG.warn(msg, ex);
673               throw (IOException) new IOException(msg).initCause(ex);
674             }
675           } else {
676             LOG.warn("Exception encountered while connecting to " +
677                 "the server : " + ex);
678           }
679           if (ex instanceof RemoteException) {
680             throw (RemoteException)ex;
681           }
682           if (ex instanceof SaslException) {
683             String msg = "SASL authentication failed." +
684               " The most likely cause is missing or invalid credentials." +
685               " Consider 'kinit'.";
686             LOG.fatal(msg, ex);
687             throw new RuntimeException(msg, ex);
688           }
689           throw new IOException(ex);
690         }
691       });
692     }
693 
694     protected synchronized void setupIOstreams() throws IOException {
695       if (socket != null) {
696         // The connection is already available. Perfect.
697         return;
698       }
699 
700       if (shouldCloseConnection.get()){
701         throw new ConnectionClosingException("This connection is closing");
702       }
703 
704       if (failedServers.isFailedServer(remoteId.getAddress())) {
705         if (LOG.isDebugEnabled()) {
706           LOG.debug("Not trying to connect to " + server +
707               " this server is in the failed servers list");
708         }
709         IOException e = new FailedServerException(
710             "This server is in the failed servers list: " + server);
711         markClosed(e);
712         close();
713         throw e;
714       }
715 
716       try {
717         if (LOG.isDebugEnabled()) {
718           LOG.debug("Connecting to " + server);
719         }
720         short numRetries = 0;
721         final short MAX_RETRIES = 5;
722         Random rand = null;
723         while (true) {
724           setupConnection();
725           InputStream inStream = NetUtils.getInputStream(socket);
726           // This creates a socket with a write timeout. This timeout cannot be changed.
727           OutputStream outStream = NetUtils.getOutputStream(socket, writeTO);
728           // Write out the preamble -- MAGIC, version, and auth to use.
729           writeConnectionHeaderPreamble(outStream);
730           if (useSasl) {
731             final InputStream in2 = inStream;
732             final OutputStream out2 = outStream;
733             UserGroupInformation ticket = remoteId.getTicket().getUGI();
734             if (authMethod == AuthMethod.KERBEROS) {
735               if (ticket != null && ticket.getRealUser() != null) {
736                 ticket = ticket.getRealUser();
737               }
738             }
739             boolean continueSasl;
740             if (ticket == null) throw new FatalConnectionException("ticket/user is null");
741             try {
742               continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
743                 @Override
744                 public Boolean run() throws IOException {
745                   return setupSaslConnection(in2, out2);
746                 }
747               });
748             } catch (Exception ex) {
749               ExceptionUtil.rethrowIfInterrupt(ex);
750               if (rand == null) {
751                 rand = new Random();
752               }
753               handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, ticket);
754               continue;
755             }
756             if (continueSasl) {
757               // Sasl connect is successful. Let's set up Sasl i/o streams.
758               inStream = saslRpcClient.getInputStream(inStream);
759               outStream = saslRpcClient.getOutputStream(outStream);
760             } else {
761               // fall back to simple auth because server told us so.
762               authMethod = AuthMethod.SIMPLE;
763               useSasl = false;
764             }
765           }
766           this.in = new DataInputStream(new BufferedInputStream(inStream));
767           synchronized (this.outLock) {
768             this.out = new DataOutputStream(new BufferedOutputStream(outStream));
769           }
770           // Now write out the connection header
771           writeConnectionHeader();
772 
773           // start the receiver thread after the socket connection has been set up
774           start();
775           return;
776         }
777       } catch (Throwable t) {
778         IOException e = ExceptionUtil.asInterrupt(t);
779         if (e == null) {
780           failedServers.addToFailedServers(remoteId.address);
781           if (t instanceof LinkageError) {
782             // probably the hbase hadoop version does not match the running hadoop version
783             e = new DoNotRetryIOException(t);
784           } else if (t instanceof IOException) {
785             e = (IOException) t;
786           } else {
787             e = new IOException("Could not set up IO Streams to " + server, t);
788           }
789         }
790         markClosed(e);
791         close();
792         throw e;
793       }
794     }
795 
796     /**
797      * Write the RPC header: <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>
798      */
799     private void writeConnectionHeaderPreamble(OutputStream outStream) throws IOException {
800       // Assemble the preamble up in a buffer first and then send it.  Writing individual elements,
801       // they are getting sent across piecemeal according to wireshark and then server is messing
802       // up the reading on occasion (the passed in stream is not buffered yet).
803 
804       // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE
805       int rpcHeaderLen = HConstants.RPC_HEADER.length;
806       byte [] preamble = new byte [rpcHeaderLen + 2];
807       System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen);
808       preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
809       synchronized (this) {
810         preamble[rpcHeaderLen + 1] = authMethod.code;
811       }
812       outStream.write(preamble);
813       outStream.flush();
814     }
815 
816     /**
817      * Write the connection header.
818      */
819     private synchronized void writeConnectionHeader() throws IOException {
820       synchronized (this.outLock) {
821         this.out.writeInt(this.header.getSerializedSize());
822         this.header.writeTo(this.out);
823         this.out.flush();
824       }
825     }
826 
    /**
     * Close the connection and release its resources.
     * <p>
     * Expects the connection to have been marked closed first (see {@code markClosed}). If
     * {@code shouldCloseConnection} is not set, an error is logged and nothing is released.
     * Otherwise this method does the following, in order:
     * <ul>
     * <li>removes this connection from the client's connection pool;</li>
     * <li>closes the output stream (under {@code outLock}), the input stream and the socket;</li>
     * <li>disposes the SASL state;</li>
     * <li>fails every call that is not yet done with a ConnectionClosingException.</li>
     * </ul>
     */
    protected synchronized void close() {
      if (!shouldCloseConnection.get()) {
        LOG.error(getName() + ": the connection is not in the closed state");
        return;
      }

      // release the resources
      // first thing to do: take the connection out of the connection list, so that
      // getConnection() can no longer hand it out
      synchronized (connections) {
        connections.removeValue(remoteId, this);
      }

      // close the streams and therefore the socket
      synchronized(this.outLock) {
        if (this.out != null) {
          IOUtils.closeStream(out);
          this.out = null;
        }
      }
      IOUtils.closeStream(in);
      this.in = null;
      if (this.socket != null) {
        try {
          this.socket.close();
          // only cleared after a successful close; if close() throws, the reference is kept
          this.socket = null;
        } catch (IOException e) {
          LOG.error("Error while closing socket", e);
        }
      }

      disposeSasl();

      // log the info
      if (LOG.isTraceEnabled()) {
        LOG.trace(getName() + ": closing ipc connection to " + server);
      }

      // fail (ConnectionClosingException) and drop every call still registered on this connection
      cleanupCalls(true);

      if (LOG.isTraceEnabled()) {
        LOG.trace(getName() + ": ipc connection to " + server + " closed");
      }
    }
871 
872     protected void tracedWriteRequest(Call call, int priority, Span span) throws IOException {
873       TraceScope ts = Trace.startSpan("RpcClientImpl.tracedWriteRequest", span);
874       try {
875         writeRequest(call, priority, span);
876       } finally {
877         ts.close();
878       }
879     }
880 
    /**
     * Initiates a call by sending the parameter to the remote server.
     * Note: this is not called from the Connection thread, but by other
     * threads.
     * <p>
     * The method proceeds in this order:
     * <ol>
     * <li>Builds the request header: call id, optional trace info, method name, whether a param
     * is present, optional cell-block length and optional priority.</li>
     * <li>Makes sure the IO streams are set up.</li>
     * <li>While holding {@code outLock}, registers the call in {@code calls} and writes the
     * header, param and cell block to {@code this.out}.</li>
     * </ol>
     * If the write fails, the connection is marked for closing and closed outside
     * {@code outLock}. Waiting threads are then notified and the failure is rethrown.
     * @param call the call to send; its request size is recorded in its call stats
     * @param priority request priority; 0 means "no priority" and is not sent
     * @param span tracing span whose ids are put in the header, or null
     * @throws InterruptedIOException if the current thread was interrupted before writing
     * @throws IOException if the streams cannot be set up, {@code checkIsOpen()} fails, or the
     *   write fails
     * @see #readResponse()
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
        justification="Findbugs is misinterpreting locking missing fact that this.outLock is held")
    private void writeRequest(Call call, final int priority, Span span) throws IOException {
      RequestHeader.Builder builder = RequestHeader.newBuilder();
      builder.setCallId(call.id);
      if (span != null) {
        // propagate the caller's trace ids to the server
        builder.setTraceInfo(
            RPCTInfo.newBuilder().setParentId(span.getSpanId()).setTraceId(span.getTraceId()));
      }
      builder.setMethodName(call.md.getName());
      builder.setRequestParam(call.param != null);
      ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
      if (cellBlock != null) {
        // the header only carries the cell block's length; the block itself is written by
        // IPCUtil.write below
        CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
        cellBlockBuilder.setLength(cellBlock.limit());
        builder.setCellBlockMeta(cellBlockBuilder.build());
      }
      // Only pass priority if there is one.  Let zero be same as no priority.
      if (priority != 0) builder.setPriority(priority);
      RequestHeader header = builder.build();

      setupIOstreams();

      // Now we're going to write the call. We take the lock, then check that the connection
      //  is still valid, and, if so we do the write to the socket. If the write fails, we don't
      //  know where we stand, we have to close the connection.
      checkIsOpen();
      IOException writeException = null;
      synchronized (this.outLock) {
        if (Thread.interrupted()) throw new InterruptedIOException();

        calls.put(call.id, call); // We put first as we don't want the connection to become idle.
        checkIsOpen(); // Now we're checking that it didn't become idle in between.

        try {
          call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, header, call.param,
              cellBlock));
        } catch (Throwable t) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Error while writing call, call_id:" + call.id, t);
          }
          // We set the value inside the synchronized block, this way the next in line
          //  won't even try to write. Otherwise we might miss a call in the calls map?
          shouldCloseConnection.set(true);
          writeException = IPCUtil.toIOE(t);
          interrupt();
        }
      }

      // call close outside of the synchronized (outLock) to prevent deadlock - HBASE-14474
      if (writeException != null) {
        markClosed(writeException);
        close();
      }

      // We added a call, and may be started the connection close. In both cases, we
      //  need to notify the reader.
      doNotify();

      // Now that we notified, we can rethrow the exception if any. Otherwise we're good.
      if (writeException != null) throw writeException;
    }
949 
950     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
951         justification="Presume notifyAll is because we are closing/shutting down")
952     private synchronized void doNotify() {
953       // Make a separate method so can do synchronize and add findbugs annotation; only one
954       // annotation at at time in source 1.7.
955       notifyAll(); // Findbugs: NN_NAKED_NOTIFY
956     }
957 
    /**
     * Receive one response from the server and hand it to the matching call.
     * <p>
     * Because there is only one receiver, there is no synchronization on {@code in}. Does
     * nothing if the connection is already marked to be closed.
     * <p>
     * The data read is, in order:
     * <ol>
     * <li>an int total size;</li>
     * <li>a delimited {@code ResponseHeader};</li>
     * <li>when the header has no exception, an optional delimited response message (if the call
     * has a response type) and an optional cell block whose length comes from the header.</li>
     * </ol>
     * <p>
     * Error handling:
     * <ul>
     * <li>Responses for unknown or already-done calls are skipped.</li>
     * <li>A {@link SocketTimeoutException} is not treated as fatal.</li>
     * <li>Any other IOException marks the connection closed.</li>
     * </ul>
     * Timed-out and finished calls are cleaned up on every exit path.
     */
    protected void readResponse() {
      if (shouldCloseConnection.get()) return;
      Call call = null;
      boolean expectedCall = false;
      try {
        // See HBaseServer.Call.setResponse for where we write out the response.
        // Total size of the response. Used below to skip responses nobody waits for and to
        // record the response size in the call stats.
        int totalSize = in.readInt();

        // Read the header
        ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
        int id = responseHeader.getCallId();
        call = calls.remove(id); // call.done have to be set before leaving this method
        expectedCall = (call != null && !call.done);
        if (!expectedCall) {
          // So we got a response for which we have no corresponding 'call' here on the client-side.
          // We probably timed out waiting, cleaned up all references, and now the server decides
          // to return a response.  There is nothing we can do w/ the response at this stage. Clean
          // out the wire of the response so its out of the way and we can get other responses on
          // this connection.
          int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
          int whatIsLeftToRead = totalSize - readSoFar;
          IOUtils.skipFully(in, whatIsLeftToRead);
          if (call != null) {
            // the call exists but is already done (e.g. cancelled); still record its stats
            call.callStats.setResponseSizeBytes(totalSize);
            call.callStats.setCallTimeMs(
                EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
          }
          return;
        }
        if (responseHeader.hasException()) {
          // The server reported an error for this call: fail the call with it.
          ExceptionResponse exceptionResponse = responseHeader.getException();
          RemoteException re = createRemoteException(exceptionResponse);
          call.setException(re);
          call.callStats.setResponseSizeBytes(totalSize);
          call.callStats.setCallTimeMs(
              EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
          if (isFatalConnectionException(exceptionResponse)) {
            // the server says this connection can no longer be used
            markClosed(re);
          }
        } else {
          Message value = null;
          if (call.responseDefaultType != null) {
            // parse the delimited response message using the call's expected message type
            Builder builder = call.responseDefaultType.newBuilderForType();
            ProtobufUtil.mergeDelimitedFrom(builder, in);
            value = builder.build();
          }
          CellScanner cellBlockScanner = null;
          if (responseHeader.hasCellBlockMeta()) {
            // the cell block length is given in the header
            int size = responseHeader.getCellBlockMeta().getLength();
            byte [] cellBlock = new byte[size];
            IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
            cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
          }
          call.setResponse(value, cellBlockScanner);
          call.callStats.setResponseSizeBytes(totalSize);
          call.callStats.setCallTimeMs(
              EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
        }
      } catch (IOException e) {
        // the call was already removed from 'calls', so cleanupCalls will not see it: fail it here
        if (expectedCall) call.setException(e);
        if (e instanceof SocketTimeoutException) {
          // Clean up open calls but don't treat this as a fatal condition,
          // since we expect certain responses to not make it by the specified
          // {@link ConnectionId#rpcTimeout}.
          // NOTE(review): if the timeout fires part-way through a response, the unread remainder
          // stays on the wire; confirm the next readResponse() cannot mis-frame it.
          if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
        } else {
          // Treat this as a fatal condition and close this connection
          markClosed(e);
        }
      } finally {
        // drop finished calls and expire timed-out ones
        cleanupCalls(false);
      }
    }
1035 
1036     /**
1037      * @return True if the exception is a fatal connection exception.
1038      */
1039     private boolean isFatalConnectionException(final ExceptionResponse e) {
1040       return e.getExceptionClassName().
1041         equals(FatalConnectionException.class.getName());
1042     }
1043 
1044     /**
1045      * @param e exception to be wrapped
1046      * @return RemoteException made from passed <code>e</code>
1047      */
1048     private RemoteException createRemoteException(final ExceptionResponse e) {
1049       String innerExceptionClassName = e.getExceptionClassName();
1050       boolean doNotRetry = e.getDoNotRetry();
1051       return e.hasHostname()?
1052         // If a hostname then add it to the RemoteWithExtrasException
1053         new RemoteWithExtrasException(innerExceptionClassName,
1054           e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
1055         new RemoteWithExtrasException(innerExceptionClassName,
1056           e.getStackTrace(), doNotRetry);
1057     }
1058 
1059     protected synchronized boolean markClosed(IOException e) {
1060       if (e == null) throw new NullPointerException();
1061 
1062       boolean ret = shouldCloseConnection.compareAndSet(false, true);
1063       if (ret) {
1064         if (LOG.isTraceEnabled()) {
1065           LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
1066         }
1067         if (callSender != null) {
1068           callSender.close();
1069         }
1070         notifyAll();
1071       }
1072       return ret;
1073     }
1074 
1075 
1076     /**
1077      * Cleanup the calls older than a given timeout, in milli seconds.
1078      * @param allCalls true for all calls, false for only the calls in timeout
1079      */
1080     protected synchronized void cleanupCalls(boolean allCalls) {
1081       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
1082       while (itor.hasNext()) {
1083         Call c = itor.next().getValue();
1084         if (c.done) {
1085           // To catch the calls without timeout that were cancelled.
1086           itor.remove();
1087         } else if (allCalls) {
1088           long waitTime = EnvironmentEdgeManager.currentTime() - c.getStartTime();
1089           IOException ie = new ConnectionClosingException("Connection to " + getRemoteAddress()
1090               + " is closing. Call id=" + c.id + ", waitTime=" + waitTime);
1091           c.setException(ie);
1092           itor.remove();
1093         } else if (c.checkAndSetTimeout()) {
1094           itor.remove();
1095         } else {
1096           // We expect the call to be ordered by timeout. It may not be the case, but stopping
1097           //  at the first valid call allows to be sure that we still have something to do without
1098           //  spending too much time by reading the full list.
1099           break;
1100         }
1101       }
1102     }
1103   }
1104 
1105   /**
1106    * Used in test only. Construct an IPC cluster client whose values are of the
1107    * {@link Message} class.
1108    * @param conf configuration
1109    * @param clusterId the cluster id
1110    * @param factory socket factory
1111    */
1112   @VisibleForTesting
1113   RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) {
1114     this(conf, clusterId, factory, null, null);
1115   }
1116 
1117   /**
1118    * Construct an IPC cluster client whose values are of the {@link Message} class.
1119    * @param conf configuration
1120    * @param clusterId the cluster id
1121    * @param factory socket factory
1122    * @param localAddr client socket bind address
1123    * @param metrics the connection metrics
1124    */
1125   RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory,
1126       SocketAddress localAddr, MetricsConnection metrics) {
1127     super(conf, clusterId, localAddr, metrics);
1128 
1129     this.socketFactory = factory;
1130     this.connections = new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf));
1131     this.failedServers = new FailedServers(conf);
1132   }
1133 
1134   /**
1135    * Used in test only. Construct an IPC client for the cluster {@code clusterId} with
1136    * the default SocketFactory
1137    */
1138   @VisibleForTesting
1139   RpcClientImpl(Configuration conf, String clusterId) {
1140     this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null, null);
1141   }
1142 
1143   /**
1144    * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory
1145    *
1146    * This method is called with reflection by the RpcClientFactory to create an instance
1147    *
1148    * @param conf configuration
1149    * @param clusterId the cluster id
1150    * @param localAddr client socket bind address.
1151    * @param metrics the connection metrics
1152    */
1153   public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr,
1154       MetricsConnection metrics) {
1155     this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr, metrics);
1156   }
1157 
1158   /** Stop all threads related to this client.  No further calls may be made
1159    * using this client. */
1160   @Override
1161   public void close() {
1162     if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
1163     if (!running.compareAndSet(true, false)) return;
1164 
1165     Set<Connection> connsToClose = null;
1166     // wake up all connections
1167     synchronized (connections) {
1168       for (Connection conn : connections.values()) {
1169         conn.interrupt();
1170         if (conn.callSender != null) {
1171           conn.callSender.interrupt();
1172         }
1173 
1174         // In case the CallSender did not setupIOStreams() yet, the Connection may not be started
1175         // at all (if CallSender has a cancelled Call it can happen). See HBASE-13851
1176         if (!conn.isAlive()) {
1177           if (connsToClose == null) {
1178             connsToClose = new HashSet<Connection>();
1179           }
1180           connsToClose.add(conn);
1181         }
1182       }
1183     }
1184     if (connsToClose != null) {
1185       for (Connection conn : connsToClose) {
1186         conn.markClosed(new InterruptedIOException("RpcClient is closing"));
1187         conn.close();
1188       }
1189     }
1190     // wait until all connections are closed
1191     while (!connections.isEmpty()) {
1192       try {
1193         Thread.sleep(10);
1194       } catch (InterruptedException e) {
1195         LOG.info("Interrupted while stopping the client. We still have " + connections.size() +
1196             " connections.");
1197         Thread.currentThread().interrupt();
1198         return;
1199       }
1200     }
1201   }
1202 
1203   /** Make a call, passing <code>param</code>, to the IPC server running at
1204    * <code>address</code> which is servicing the <code>protocol</code> protocol,
1205    * with the <code>ticket</code> credentials, returning the value.
1206    * Throws exceptions if there are network problems or if the remote code
1207    * threw an exception.
1208    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
1209    *          {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
1210    *          new Connection each time.
1211    * @return A pair with the Message response and the Cell data (if any).
1212    * @throws InterruptedException
1213    * @throws IOException
1214    */
1215   @Override
1216   protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
1217       Message param, Message returnType, User ticket, InetSocketAddress addr,
1218       MetricsConnection.CallStats callStats)
1219       throws IOException, InterruptedException {
1220     if (pcrc == null) {
1221       pcrc = new PayloadCarryingRpcController();
1222     }
1223     CellScanner cells = pcrc.cellScanner();
1224 
1225     final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
1226         pcrc.getCallTimeout(), MetricsConnection.newCallStats());
1227 
1228     final Connection connection = getConnection(ticket, call, addr);
1229 
1230     final CallFuture cts;
1231     if (connection.callSender != null) {
1232       cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan());
1233         pcrc.notifyOnCancel(new RpcCallback<Object>() {
1234           @Override
1235           public void run(Object parameter) {
1236             connection.callSender.remove(cts);
1237           }
1238         });
1239         if (pcrc.isCanceled()) {
1240           // To finish if the call was cancelled before we set the notification (race condition)
1241           call.callComplete();
1242           return new Pair<Message, CellScanner>(call.response, call.cells);
1243         }
1244     } else {
1245       cts = null;
1246       connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());
1247     }
1248 
1249     while (!call.done) {
1250       if (call.checkAndSetTimeout()) {
1251         if (cts != null) connection.callSender.remove(cts);
1252         break;
1253       }
1254       if (connection.shouldCloseConnection.get()) {
1255         throw new ConnectionClosingException("Call id=" + call.id +
1256             " on server " + addr + " aborted: connection is closing");
1257       }
1258       try {
1259         synchronized (call) {
1260           if (call.done) break;
1261           call.wait(Math.min(call.remainingTime(), 1000) + 1);
1262         }
1263       } catch (InterruptedException e) {
1264         call.setException(new InterruptedIOException());
1265         if (cts != null) connection.callSender.remove(cts);
1266         throw e;
1267       }
1268     }
1269 
1270     if (call.error != null) {
1271       if (call.error instanceof RemoteException) {
1272         call.error.fillInStackTrace();
1273         throw call.error;
1274       }
1275       // local exception
1276       throw wrapException(addr, call.error);
1277     }
1278 
1279     return new Pair<Message, CellScanner>(call.response, call.cells);
1280   }
1281 
1282 
1283   /**
1284    * Interrupt the connections to the given ip:port server. This should be called if the server
1285    *  is known as actually dead. This will not prevent current operation to be retried, and,
1286    *  depending on their own behavior, they may retry on the same server. This can be a feature,
1287    *  for example at startup. In any case, they're likely to get connection refused (if the
1288    *  process died) or no route to host: i.e. their next retries should be faster and with a
1289    *  safe exception.
1290    */
1291   @Override
1292   public void cancelConnections(ServerName sn) {
1293     synchronized (connections) {
1294       for (Connection connection : connections.values()) {
1295         if (connection.isAlive() &&
1296             connection.getRemoteAddress().getPort() == sn.getPort() &&
1297             connection.getRemoteAddress().getHostName().equals(sn.getHostname())) {
1298           LOG.info("The server on " + sn.toString() +
1299               " is dead - stopping the connection " + connection.remoteId);
1300           connection.interrupt(); // We're interrupting a Reader. It means we want it to finish.
1301                                   // This will close the connection as well.
1302         }
1303       }
1304     }
1305   }
1306 
1307   /**
1308    *  Get a connection from the pool, or create a new one and add it to the
1309    * pool. Connections to a given host/port are reused.
1310    */
1311   protected Connection getConnection(User ticket, Call call, InetSocketAddress addr)
1312   throws IOException {
1313     if (!running.get()) throw new StoppedRpcClientException();
1314     Connection connection;
1315     ConnectionId remoteId =
1316       new ConnectionId(ticket, call.md.getService().getName(), addr);
1317     synchronized (connections) {
1318       connection = connections.get(remoteId);
1319       if (connection == null) {
1320         connection = createConnection(remoteId, this.codec, this.compressor);
1321         connections.put(remoteId, connection);
1322       }
1323     }
1324 
1325     return connection;
1326   }
1327 }