View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.ipc;
21  
22  import java.io.BufferedInputStream;
23  import java.io.BufferedOutputStream;
24  import java.io.Closeable;
25  import java.io.DataInputStream;
26  import java.io.DataOutputStream;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.io.InterruptedIOException;
30  import java.io.OutputStream;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.SocketTimeoutException;
35  import java.net.UnknownHostException;
36  import java.nio.ByteBuffer;
37  import java.security.PrivilegedExceptionAction;
38  import java.util.HashMap;
39  import java.util.HashSet;
40  import java.util.Iterator;
41  import java.util.Map;
42  import java.util.Map.Entry;
43  import java.util.Random;
44  import java.util.Set;
45  import java.util.concurrent.ArrayBlockingQueue;
46  import java.util.concurrent.BlockingQueue;
47  import java.util.concurrent.ConcurrentSkipListMap;
48  import java.util.concurrent.atomic.AtomicBoolean;
49  import java.util.concurrent.atomic.AtomicInteger;
50  
51  import javax.net.SocketFactory;
52  import javax.security.sasl.SaslException;
53  
54  import org.apache.commons.logging.Log;
55  import org.apache.commons.logging.LogFactory;
56  import org.apache.hadoop.conf.Configuration;
57  import org.apache.hadoop.hbase.CellScanner;
58  import org.apache.hadoop.hbase.DoNotRetryIOException;
59  import org.apache.hadoop.hbase.HConstants;
60  import org.apache.hadoop.hbase.ServerName;
61  import org.apache.hadoop.hbase.classification.InterfaceAudience;
62  import org.apache.hadoop.hbase.client.MetricsConnection;
63  import org.apache.hadoop.hbase.codec.Codec;
64  import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
65  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
66  import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
67  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
68  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
69  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
70  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
71  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
72  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
73  import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo;
74  import org.apache.hadoop.hbase.security.AuthMethod;
75  import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
76  import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
77  import org.apache.hadoop.hbase.security.SecurityInfo;
78  import org.apache.hadoop.hbase.security.User;
79  import org.apache.hadoop.hbase.security.UserProvider;
80  import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
81  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
82  import org.apache.hadoop.hbase.util.ExceptionUtil;
83  import org.apache.hadoop.hbase.util.Pair;
84  import org.apache.hadoop.hbase.util.PoolMap;
85  import org.apache.hadoop.io.IOUtils;
86  import org.apache.hadoop.io.Text;
87  import org.apache.hadoop.io.compress.CompressionCodec;
88  import org.apache.hadoop.ipc.RemoteException;
89  import org.apache.hadoop.net.NetUtils;
90  import org.apache.hadoop.security.SecurityUtil;
91  import org.apache.hadoop.security.UserGroupInformation;
92  import org.apache.hadoop.security.token.Token;
93  import org.apache.hadoop.security.token.TokenIdentifier;
94  import org.apache.hadoop.security.token.TokenSelector;
95  import org.apache.htrace.Span;
96  import org.apache.htrace.Trace;
97  import org.apache.htrace.TraceScope;
98  
99  import com.google.common.annotations.VisibleForTesting;
100 import com.google.protobuf.Descriptors.MethodDescriptor;
101 import com.google.protobuf.Message;
102 import com.google.protobuf.Message.Builder;
103 import com.google.protobuf.RpcCallback;
104 
105 /**
106  * Does RPC against a cluster.  Manages connections per regionserver in the cluster.
107  * <p>See HBaseServer
108  */
109 @InterfaceAudience.Private
110 public class RpcClientImpl extends AbstractRpcClient {
private static final Log LOG = LogFactory.getLog(RpcClientImpl.class);

/**
 * Token selectors keyed by the token kind a service declares in its {@link SecurityInfo}.
 * Only the HBase authentication token selector is registered by this class.
 */
protected static final Map<AuthenticationProtos.TokenIdentifier.Kind,
    TokenSelector<? extends TokenIdentifier>> tokenHandlers =
    new HashMap<AuthenticationProtos.TokenIdentifier.Kind,
        TokenSelector<? extends TokenIdentifier>>();

static {
  TokenSelector<? extends TokenIdentifier> hbaseTokenSelector = new AuthenticationTokenSelector();
  tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
      hbaseTokenSelector);
}

/** Counter, presumably used to allocate call ids (usage not visible here — confirm). */
protected final AtomicInteger callIdCnt = new AtomicInteger();

/** Open connections, keyed by the {@link ConnectionId} they were created for. */
protected final PoolMap<ConnectionId, Connection> connections;

/** True while the client runs; idle connections close themselves once it is false. */
protected final AtomicBoolean running = new AtomicBoolean(true);

/** Servers for which connection setup failed; new connections to them fail fast. */
protected final FailedServers failedServers;

/** Factory used to create the socket of every connection. */
protected final SocketFactory socketFactory;
130 
131   /**
132    * Creates a connection. Can be overridden by a subclass for testing.
133    * @param remoteId - the ConnectionId to use for the connection creation.
134    */
135   protected Connection createConnection(ConnectionId remoteId, final Codec codec,
136       final CompressionCodec compressor)
137   throws IOException {
138     return new Connection(remoteId, codec, compressor);
139   }
140 
141   /**
142    * see {@link RpcClientImpl.Connection.CallSender}
143    */
144   private static class CallFuture {
145     final Call call;
146     final int priority;
147     final Span span;
148 
149     // We will use this to stop the writer
150     final static CallFuture DEATH_PILL = new CallFuture(null, -1, null);
151 
152     CallFuture(Call call, int priority, Span span) {
153       this.call = call;
154       this.priority = priority;
155       this.span = span;
156     }
157   }
158 
159   /** Thread that reads responses and notifies callers.  Each connection owns a
160    * socket connected to a remote address.  Calls are multiplexed through this
161    * socket: responses may be delivered out of order. */
162   protected class Connection extends Thread {
163     private ConnectionHeader header;              // connection header
164     protected ConnectionId remoteId;
165     protected Socket socket = null;                 // connected socket
166     protected DataInputStream in;
167     protected DataOutputStream out;
168     private Object outLock = new Object();
169     private InetSocketAddress server;             // server ip:port
170     private String serverPrincipal;  // server's krb5 principal name
171     private AuthMethod authMethod; // authentication method
172     private boolean useSasl;
173     private Token<? extends TokenIdentifier> token;
174     private HBaseSaslRpcClient saslRpcClient;
175     private int reloginMaxBackoff; // max pause before relogin on sasl failure
176     private final Codec codec;
177     private final CompressionCodec compressor;
178 
179     // currently active calls
180     protected final ConcurrentSkipListMap<Integer, Call> calls =
181       new ConcurrentSkipListMap<Integer, Call>();
182 
183     protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
184     protected final CallSender callSender;
185 
186 
187     /**
188      * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt),
189      *  it gets into a java issue: an interruption during a write closes the socket/channel.
190      * A way to avoid this is to use a different thread for writing. This way, on interruptions,
191      *  we either cancel the writes or ignore the answer if the write is already done, but we
192      *  don't stop the write in the middle.
193      * This adds a thread per region server in the client, so it's kept as an option.
194      * <p>
195      * The implementation is simple: the client threads adds their call to the queue, and then
196      *  wait for an answer. The CallSender blocks on the queue, and writes the calls one
197      *  after the other. On interruption, the client cancels its call. The CallSender checks that
198      *  the call has not been canceled before writing it.
199      * </p>
200      * When the connection closes, all the calls not yet sent are dismissed. The client thread
201      *  is notified with an appropriate exception, as if the call was already sent but the answer
202      *  not yet received.
203      * </p>
204      */
205     private class CallSender extends Thread implements Closeable {
206       protected final BlockingQueue<CallFuture> callsToWrite;
207 
208 
209       public CallFuture sendCall(Call call, int priority, Span span)
210           throws InterruptedException, IOException {
211         CallFuture cts = new CallFuture(call, priority, span);
212         if (!callsToWrite.offer(cts)) {
213           throw new IOException("Can't add the call " + call.id +
214               " to the write queue. callsToWrite.size()=" + callsToWrite.size());
215         }
216         checkIsOpen(); // We check after the put, to be sure that the call we added won't stay
217                        //  in the list while the cleanup was already done.
218         return cts;
219       }
220 
221       @Override
222       public void close(){
223         assert shouldCloseConnection.get();
224         callsToWrite.offer(CallFuture.DEATH_PILL);
225         // We don't care if we can't add the death pill to the queue: the writer
226         //  won't be blocked in the 'take', as its queue is full.
227       }
228 
229       CallSender(String name, Configuration conf) {
230         int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
231         callsToWrite = new ArrayBlockingQueue<CallFuture>(queueSize);
232         setDaemon(true);
233         setName(name + " - writer");
234       }
235 
236       public void remove(CallFuture cts){
237         callsToWrite.remove(cts);
238 
239         // By removing the call from the expected call list, we make the list smaller, but
240         //  it means as well that we don't know how many calls we cancelled.
241         calls.remove(cts.call.id);
242         cts.call.callComplete();
243       }
244 
245       /**
246        * Reads the call from the queue, write them on the socket.
247        */
248       @Override
249       public void run() {
250         while (!shouldCloseConnection.get()) {
251           CallFuture cts = null;
252           try {
253             cts = callsToWrite.take();
254           } catch (InterruptedException e) {
255             markClosed(new InterruptedIOException());
256           }
257 
258           if (cts == null || cts == CallFuture.DEATH_PILL) {
259             assert shouldCloseConnection.get();
260             break;
261           }
262 
263           if (cts.call.done) {
264             continue;
265           }
266 
267           if (cts.call.checkAndSetTimeout()) {
268             continue;
269           }
270 
271           try {
272             Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
273           } catch (IOException e) {
274             if (LOG.isDebugEnabled()) {
275               LOG.debug("call write error for call #" + cts.call.id
276                 + ", message =" + e.getMessage());
277             }
278             cts.call.setException(e);
279             markClosed(e);
280           }
281         }
282 
283         cleanup();
284       }
285 
286       /**
287        * Cleans the call not yet sent when we finish.
288        */
289       private void cleanup() {
290         assert shouldCloseConnection.get();
291 
292         IOException ie = new ConnectionClosingException("Connection to " + server + " is closing.");
293         while (true) {
294           CallFuture cts = callsToWrite.poll();
295           if (cts == null) {
296             break;
297           }
298           if (cts.call != null && !cts.call.done) {
299             cts.call.setException(ie);
300           }
301         }
302       }
303     }
304 
305     Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
306     throws IOException {
307       if (remoteId.getAddress().isUnresolved()) {
308         throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
309       }
310       this.server = remoteId.getAddress();
311       this.codec = codec;
312       this.compressor = compressor;
313 
314       UserGroupInformation ticket = remoteId.getTicket().getUGI();
315       SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
316       this.useSasl = userProvider.isHBaseSecurityEnabled();
317       if (useSasl && securityInfo != null) {
318         AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
319         if (tokenKind != null) {
320           TokenSelector<? extends TokenIdentifier> tokenSelector =
321               tokenHandlers.get(tokenKind);
322           if (tokenSelector != null) {
323             token = tokenSelector.selectToken(new Text(clusterId),
324                 ticket.getTokens());
325           } else if (LOG.isDebugEnabled()) {
326             LOG.debug("No token selector found for type "+tokenKind);
327           }
328         }
329         String serverKey = securityInfo.getServerPrincipal();
330         if (serverKey == null) {
331           throw new IOException(
332               "Can't obtain server Kerberos config key from SecurityInfo");
333         }
334         serverPrincipal = SecurityUtil.getServerPrincipal(
335             conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
336         if (LOG.isDebugEnabled()) {
337           LOG.debug("RPC Server Kerberos principal name for service="
338               + remoteId.getServiceName() + " is " + serverPrincipal);
339         }
340       }
341 
342       if (!useSasl) {
343         authMethod = AuthMethod.SIMPLE;
344       } else if (token != null) {
345         authMethod = AuthMethod.DIGEST;
346       } else {
347         authMethod = AuthMethod.KERBEROS;
348       }
349 
350       if (LOG.isDebugEnabled()) {
351         LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName +
352           ", sasl=" + useSasl);
353       }
354       reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
355       this.remoteId = remoteId;
356 
357       ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
358       builder.setServiceName(remoteId.getServiceName());
359       UserInformation userInfoPB = getUserInfo(ticket);
360       if (userInfoPB != null) {
361         builder.setUserInfo(userInfoPB);
362       }
363       if (this.codec != null) {
364         builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
365       }
366       if (this.compressor != null) {
367         builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
368       }
369       builder.setVersionInfo(ProtobufUtil.getVersionInfo());
370       this.header = builder.build();
371 
372       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
373         remoteId.getAddress().toString() +
374         ((ticket==null)?" from an unknown user": (" from "
375         + ticket.getUserName())));
376       this.setDaemon(true);
377 
378       if (conf.getBoolean(SPECIFIC_WRITE_THREAD, false)) {
379         callSender = new CallSender(getName(), conf);
380         callSender.start();
381       } else {
382         callSender = null;
383       }
384     }
385 
386     private synchronized UserInformation getUserInfo(UserGroupInformation ugi) {
387       if (ugi == null || authMethod == AuthMethod.DIGEST) {
388         // Don't send user for token auth
389         return null;
390       }
391       UserInformation.Builder userInfoPB = UserInformation.newBuilder();
392       if (authMethod == AuthMethod.KERBEROS) {
393         // Send effective user for Kerberos auth
394         userInfoPB.setEffectiveUser(ugi.getUserName());
395       } else if (authMethod == AuthMethod.SIMPLE) {
396         //Send both effective user and real user for simple auth
397         userInfoPB.setEffectiveUser(ugi.getUserName());
398         if (ugi.getRealUser() != null) {
399           userInfoPB.setRealUser(ugi.getRealUser().getUserName());
400         }
401       }
402       return userInfoPB.build();
403     }
404 
405     protected synchronized void setupConnection() throws IOException {
406       short ioFailures = 0;
407       short timeoutFailures = 0;
408       while (true) {
409         try {
410           this.socket = socketFactory.createSocket();
411           this.socket.setTcpNoDelay(tcpNoDelay);
412           this.socket.setKeepAlive(tcpKeepAlive);
413           if (localAddr != null) {
414             this.socket.bind(localAddr);
415           }
416           NetUtils.connect(this.socket, remoteId.getAddress(), connectTO);
417           this.socket.setSoTimeout(readTO);
418           return;
419         } catch (SocketTimeoutException toe) {
420           /* The max number of retries is 45,
421            * which amounts to 20s*45 = 15 minutes retries.
422            */
423           handleConnectionFailure(timeoutFailures++, maxRetries, toe);
424         } catch (IOException ie) {
425           handleConnectionFailure(ioFailures++, maxRetries, ie);
426         }
427       }
428     }
429 
430     protected synchronized void closeConnection() {
431       if (socket == null) {
432         return;
433       }
434 
435       // close the current connection
436       try {
437         if (socket.getOutputStream() != null) {
438           socket.getOutputStream().close();
439         }
440       } catch (IOException ignored) {  // Can happen if the socket is already closed
441         if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
442       }
443       try {
444         if (socket.getInputStream() != null) {
445           socket.getInputStream().close();
446         }
447       } catch (IOException ignored) {  // Can happen if the socket is already closed
448         if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
449       }
450       try {
451         if (socket.getChannel() != null) {
452           socket.getChannel().close();
453         }
454       } catch (IOException ignored) {  // Can happen if the socket is already closed
455         if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
456       }
457       try {
458         socket.close();
459       } catch (IOException e) {
460         LOG.warn("Not able to close a socket", e);
461       }
462 
463       // set socket to null so that the next call to setupIOstreams
464       // can start the process of connect all over again.
465       socket = null;
466     }
467 
468     /**
469      *  Handle connection failures
470      *
471      * If the current number of retries is equal to the max number of retries,
472      * stop retrying and throw the exception; Otherwise backoff N seconds and
473      * try connecting again.
474      *
475      * This Method is only called from inside setupIOstreams(), which is
476      * synchronized. Hence the sleep is synchronized; the locks will be retained.
477      *
478      * @param curRetries current number of retries
479      * @param maxRetries max number of retries allowed
480      * @param ioe failure reason
481      * @throws IOException if max number of retries is reached
482      */
483     private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
484     throws IOException {
485       closeConnection();
486 
487       // throw the exception if the maximum number of retries is reached
488       if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
489         throw ioe;
490       }
491 
492       // otherwise back off and retry
493       try {
494         Thread.sleep(failureSleep);
495       } catch (InterruptedException ie) {
496         ExceptionUtil.rethrowIfInterrupt(ie);
497       }
498 
499       LOG.info("Retrying connect to server: " + remoteId.getAddress() +
500         " after sleeping " + failureSleep + "ms. Already tried " + curRetries +
501         " time(s).");
502     }
503 
504     /**
505      * @throws IOException if the connection is not open.
506      */
507     private void checkIsOpen() throws IOException {
508       if (shouldCloseConnection.get()) {
509         throw new ConnectionClosingException(getName() + " is closing");
510       }
511     }
512 
513     /* wait till someone signals us to start reading RPC response or
514      * it is idle too long, it is marked as to be closed,
515      * or the client is marked as not running.
516      *
517      * @return true if it is time to read a response; false otherwise.
518      */
519     protected synchronized boolean waitForWork() throws InterruptedException {
520       // beware of the concurrent access to the calls list: we can add calls, but as well
521       //  remove them.
522       long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
523 
524       while (true) {
525         if (shouldCloseConnection.get()) {
526           return false;
527         }
528 
529         if (!running.get()) {
530           markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
531           return false;
532         }
533 
534         if (!calls.isEmpty()) {
535           // shouldCloseConnection can be set to true by a parallel thread here. The caller
536           //  will need to check anyway.
537           return true;
538         }
539 
540         if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
541           // Connection is idle.
542           // We expect the number of calls to be zero here, but actually someone can
543           //  adds a call at the any moment, as there is no synchronization between this task
544           //  and adding new calls. It's not a big issue, but it will get an exception.
545           markClosed(new IOException(
546               "idle connection closed with " + calls.size() + " pending request(s)"));
547           return false;
548         }
549 
550         wait(Math.min(minIdleTimeBeforeClose, 1000));
551       }
552     }
553 
554     public InetSocketAddress getRemoteAddress() {
555       return remoteId.getAddress();
556     }
557 
558     @Override
559     public void run() {
560       if (LOG.isTraceEnabled()) {
561         LOG.trace(getName() + ": starting, connections " + connections.size());
562       }
563 
564       try {
565         while (waitForWork()) { // Wait here for work - read or close connection
566           readResponse();
567         }
568       } catch (InterruptedException t) {
569         if (LOG.isTraceEnabled()) {
570           LOG.trace(getName() + ": interrupted while waiting for call responses");
571         }
572         markClosed(ExceptionUtil.asInterrupt(t));
573       } catch (Throwable t) {
574         if (LOG.isDebugEnabled()) {
575           LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
576         }
577         markClosed(new IOException("Unexpected throwable while waiting call responses", t));
578       }
579 
580       close();
581 
582       if (LOG.isTraceEnabled()) {
583         LOG.trace(getName() + ": stopped, connections " + connections.size());
584       }
585     }
586 
587     private synchronized void disposeSasl() {
588       if (saslRpcClient != null) {
589         try {
590           saslRpcClient.dispose();
591           saslRpcClient = null;
592         } catch (IOException ioe) {
593           LOG.error("Error disposing of SASL client", ioe);
594         }
595       }
596     }
597 
598     private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
599       UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
600       UserGroupInformation currentUser =
601         UserGroupInformation.getCurrentUser();
602       UserGroupInformation realUser = currentUser.getRealUser();
603       return authMethod == AuthMethod.KERBEROS &&
604           loginUser != null &&
605           //Make sure user logged in using Kerberos either keytab or TGT
606           loginUser.hasKerberosCredentials() &&
607           // relogin only in case it is the login user (e.g. JT)
608           // or superuser (like oozie).
609           (loginUser.equals(currentUser) || loginUser.equals(realUser));
610     }
611 
612     private synchronized boolean setupSaslConnection(final InputStream in2,
613         final OutputStream out2) throws IOException {
614       saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal, fallbackAllowed,
615           conf.get("hbase.rpc.protection",
616               QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
617       return saslRpcClient.saslConnect(in2, out2);
618     }
619 
620     /**
621      * If multiple clients with the same principal try to connect
622      * to the same server at the same time, the server assumes a
623      * replay attack is in progress. This is a feature of kerberos.
624      * In order to work around this, what is done is that the client
625      * backs off randomly and tries to initiate the connection
626      * again.
627      * The other problem is to do with ticket expiry. To handle that,
628      * a relogin is attempted.
629      * <p>
630      * The retry logic is governed by the {@link #shouldAuthenticateOverKrb}
631      * method. In case when the user doesn't have valid credentials, we don't
632      * need to retry (from cache or ticket). In such cases, it is prudent to
633      * throw a runtime exception when we receive a SaslException from the
634      * underlying authentication implementation, so there is no retry from
635      * other high level (for eg, HCM or HBaseAdmin).
636      * </p>
637      */
638     private synchronized void handleSaslConnectionFailure(
639         final int currRetries,
640         final int maxRetries, final Exception ex, final Random rand,
641         final UserGroupInformation user)
642     throws IOException, InterruptedException{
643       user.doAs(new PrivilegedExceptionAction<Object>() {
644         @Override
645         public Object run() throws IOException, InterruptedException {
646           closeConnection();
647           if (shouldAuthenticateOverKrb()) {
648             if (currRetries < maxRetries) {
649               if (LOG.isDebugEnabled()) {
650                 LOG.debug("Exception encountered while connecting to " +
651                     "the server : " + ex);
652               }
653               //try re-login
654               if (UserGroupInformation.isLoginKeytabBased()) {
655                 UserGroupInformation.getLoginUser().reloginFromKeytab();
656               } else {
657                 UserGroupInformation.getLoginUser().reloginFromTicketCache();
658               }
659               disposeSasl();
660               //have granularity of milliseconds
661               //we are sleeping with the Connection lock held but since this
662               //connection instance is being used for connecting to the server
663               //in question, it is okay
664               Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
665               return null;
666             } else {
667               String msg = "Couldn't setup connection for " +
668               UserGroupInformation.getLoginUser().getUserName() +
669               " to " + serverPrincipal;
670               LOG.warn(msg, ex);
671               throw (IOException) new IOException(msg).initCause(ex);
672             }
673           } else {
674             LOG.warn("Exception encountered while connecting to " +
675                 "the server : " + ex);
676           }
677           if (ex instanceof RemoteException) {
678             throw (RemoteException)ex;
679           }
680           if (ex instanceof SaslException) {
681             String msg = "SASL authentication failed." +
682               " The most likely cause is missing or invalid credentials." +
683               " Consider 'kinit'.";
684             LOG.fatal(msg, ex);
685             throw new RuntimeException(msg, ex);
686           }
687           throw new IOException(ex);
688         }
689       });
690     }
691 
692     protected synchronized void setupIOstreams() throws IOException {
693       if (socket != null) {
694         // The connection is already available. Perfect.
695         return;
696       }
697 
698       if (shouldCloseConnection.get()){
699         throw new ConnectionClosingException("This connection is closing");
700       }
701 
702       if (failedServers.isFailedServer(remoteId.getAddress())) {
703         if (LOG.isDebugEnabled()) {
704           LOG.debug("Not trying to connect to " + server +
705               " this server is in the failed servers list");
706         }
707         IOException e = new FailedServerException(
708             "This server is in the failed servers list: " + server);
709         markClosed(e);
710         close();
711         throw e;
712       }
713 
714       try {
715         if (LOG.isDebugEnabled()) {
716           LOG.debug("Connecting to " + server);
717         }
718         short numRetries = 0;
719         final short MAX_RETRIES = 5;
720         Random rand = null;
721         while (true) {
722           setupConnection();
723           InputStream inStream = NetUtils.getInputStream(socket);
724           // This creates a socket with a write timeout. This timeout cannot be changed.
725           OutputStream outStream = NetUtils.getOutputStream(socket, writeTO);
726           // Write out the preamble -- MAGIC, version, and auth to use.
727           writeConnectionHeaderPreamble(outStream);
728           if (useSasl) {
729             final InputStream in2 = inStream;
730             final OutputStream out2 = outStream;
731             UserGroupInformation ticket = remoteId.getTicket().getUGI();
732             if (authMethod == AuthMethod.KERBEROS) {
733               if (ticket != null && ticket.getRealUser() != null) {
734                 ticket = ticket.getRealUser();
735               }
736             }
737             boolean continueSasl;
738             if (ticket == null) throw new FatalConnectionException("ticket/user is null");
739             try {
740               continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
741                 @Override
742                 public Boolean run() throws IOException {
743                   return setupSaslConnection(in2, out2);
744                 }
745               });
746             } catch (Exception ex) {
747               ExceptionUtil.rethrowIfInterrupt(ex);
748               if (rand == null) {
749                 rand = new Random();
750               }
751               handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, ticket);
752               continue;
753             }
754             if (continueSasl) {
755               // Sasl connect is successful. Let's set up Sasl i/o streams.
756               inStream = saslRpcClient.getInputStream(inStream);
757               outStream = saslRpcClient.getOutputStream(outStream);
758             } else {
759               // fall back to simple auth because server told us so.
760               authMethod = AuthMethod.SIMPLE;
761               useSasl = false;
762             }
763           }
764           this.in = new DataInputStream(new BufferedInputStream(inStream));
765           synchronized (this.outLock) {
766             this.out = new DataOutputStream(new BufferedOutputStream(outStream));
767           }
768           // Now write out the connection header
769           writeConnectionHeader();
770 
771           // start the receiver thread after the socket connection has been set up
772           start();
773           return;
774         }
775       } catch (Throwable t) {
776         IOException e = ExceptionUtil.asInterrupt(t);
777         if (e == null) {
778           failedServers.addToFailedServers(remoteId.address);
779           if (t instanceof LinkageError) {
780             // probably the hbase hadoop version does not match the running hadoop version
781             e = new DoNotRetryIOException(t);
782           } else if (t instanceof IOException) {
783             e = (IOException) t;
784           } else {
785             e = new IOException("Could not set up IO Streams to " + server, t);
786           }
787         }
788         markClosed(e);
789         close();
790         throw e;
791       }
792     }
793 
794     /**
795      * Write the RPC header: <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>
796      */
797     private void writeConnectionHeaderPreamble(OutputStream outStream) throws IOException {
798       // Assemble the preamble up in a buffer first and then send it.  Writing individual elements,
799       // they are getting sent across piecemeal according to wireshark and then server is messing
800       // up the reading on occasion (the passed in stream is not buffered yet).
801 
802       // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE
803       int rpcHeaderLen = HConstants.RPC_HEADER.length;
804       byte [] preamble = new byte [rpcHeaderLen + 2];
805       System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen);
806       preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
807       synchronized (this) {
808         preamble[rpcHeaderLen + 1] = authMethod.code;
809       }
810       outStream.write(preamble);
811       outStream.flush();
812     }
813 
814     /**
815      * Write the connection header.
816      */
817     private synchronized void writeConnectionHeader() throws IOException {
818       synchronized (this.outLock) {
819         this.out.writeInt(this.header.getSerializedSize());
820         this.header.writeTo(this.out);
821         this.out.flush();
822       }
823     }
824 
825     /** Close the connection. */
826     protected synchronized void close() {
827       if (!shouldCloseConnection.get()) {
828         LOG.error(getName() + ": the connection is not in the closed state");
829         return;
830       }
831 
832       // release the resources
833       // first thing to do;take the connection out of the connection list
834       synchronized (connections) {
835         connections.removeValue(remoteId, this);
836       }
837 
838       // close the streams and therefore the socket
839       synchronized(this.outLock) {
840         if (this.out != null) {
841           IOUtils.closeStream(out);
842           this.out = null;
843         }
844       }
845       IOUtils.closeStream(in);
846       this.in = null;
847       if (this.socket != null) {
848         try {
849           this.socket.close();
850           this.socket = null;
851         } catch (IOException e) {
852           LOG.error("Error while closing socket", e);
853         }
854       }
855 
856       disposeSasl();
857 
858       // log the info
859       if (LOG.isTraceEnabled()) {
860         LOG.trace(getName() + ": closing ipc connection to " + server);
861       }
862 
863       cleanupCalls(true);
864 
865       if (LOG.isTraceEnabled()) {
866         LOG.trace(getName() + ": ipc connection to " + server + " closed");
867       }
868     }
869 
870     protected void tracedWriteRequest(Call call, int priority, Span span) throws IOException {
871       TraceScope ts = Trace.startSpan("RpcClientImpl.tracedWriteRequest", span);
872       try {
873         writeRequest(call, priority, span);
874       } finally {
875         ts.close();
876       }
877     }
878 
879     /**
880      * Initiates a call by sending the parameter to the remote server.
881      * Note: this is not called from the Connection thread, but by other
882      * threads.
883      * @see #readResponse()
884      */
885     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
886         justification="Findbugs is misinterpreting locking missing fact that this.outLock is held")
887     private void writeRequest(Call call, final int priority, Span span) throws IOException {
888       RequestHeader.Builder builder = RequestHeader.newBuilder();
889       builder.setCallId(call.id);
890       if (span != null) {
891         builder.setTraceInfo(
892             RPCTInfo.newBuilder().setParentId(span.getSpanId()).setTraceId(span.getTraceId()));
893       }
894       builder.setMethodName(call.md.getName());
895       builder.setRequestParam(call.param != null);
896       ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
897       if (cellBlock != null) {
898         CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
899         cellBlockBuilder.setLength(cellBlock.limit());
900         builder.setCellBlockMeta(cellBlockBuilder.build());
901       }
902       // Only pass priority if there one.  Let zero be same as no priority.
903       if (priority != 0) builder.setPriority(priority);
904       RequestHeader header = builder.build();
905 
906       setupIOstreams();
907 
908       // Now we're going to write the call. We take the lock, then check that the connection
909       //  is still valid, and, if so we do the write to the socket. If the write fails, we don't
910       //  know where we stand, we have to close the connection.
911       checkIsOpen();
912       IOException writeException = null;
913       synchronized (this.outLock) {
914         if (Thread.interrupted()) throw new InterruptedIOException();
915 
916         calls.put(call.id, call); // We put first as we don't want the connection to become idle.
917         checkIsOpen(); // Now we're checking that it didn't became idle in between.
918 
919         try {
920           call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, header, call.param,
921               cellBlock));
922         } catch (IOException e) {
923           // We set the value inside the synchronized block, this way the next in line
924           //  won't even try to write. Otherwise we might miss a call in the calls map?
925           shouldCloseConnection.set(true);
926           writeException = e;
927           interrupt();
928         }
929       }
930 
931       // call close outside of the synchronized (outLock) to prevent deadlock - HBASE-14474
932       if (writeException != null) {
933         markClosed(writeException);
934         close();
935       }
936 
937       // We added a call, and may be started the connection close. In both cases, we
938       //  need to notify the reader.
939       doNotify();
940 
941       // Now that we notified, we can rethrow the exception if any. Otherwise we're good.
942       if (writeException != null) throw writeException;
943     }
944 
945     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
946         justification="Presume notifyAll is because we are closing/shutting down")
947     private synchronized void doNotify() {
948       // Make a separate method so can do synchronize and add findbugs annotation; only one
949       // annotation at at time in source 1.7.
950       notifyAll(); // Findbugs: NN_NAKED_NOTIFY
951     }
952 
953     /* Receive a response.
954      * Because only one receiver, so no synchronization on in.
955      */
956     protected void readResponse() {
957       if (shouldCloseConnection.get()) return;
958       Call call = null;
959       boolean expectedCall = false;
960       try {
961         // See HBaseServer.Call.setResponse for where we write out the response.
962         // Total size of the response.  Unused.  But have to read it in anyways.
963         int totalSize = in.readInt();
964 
965         // Read the header
966         ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
967         int id = responseHeader.getCallId();
968         call = calls.remove(id); // call.done have to be set before leaving this method
969         expectedCall = (call != null && !call.done);
970         if (!expectedCall) {
971           // So we got a response for which we have no corresponding 'call' here on the client-side.
972           // We probably timed out waiting, cleaned up all references, and now the server decides
973           // to return a response.  There is nothing we can do w/ the response at this stage. Clean
974           // out the wire of the response so its out of the way and we can get other responses on
975           // this connection.
976           int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
977           int whatIsLeftToRead = totalSize - readSoFar;
978           IOUtils.skipFully(in, whatIsLeftToRead);
979           if (call != null) {
980             call.callStats.setResponseSizeBytes(totalSize);
981             call.callStats.setCallTimeMs(
982                 EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
983           }
984           return;
985         }
986         if (responseHeader.hasException()) {
987           ExceptionResponse exceptionResponse = responseHeader.getException();
988           RemoteException re = createRemoteException(exceptionResponse);
989           call.setException(re);
990           call.callStats.setResponseSizeBytes(totalSize);
991           call.callStats.setCallTimeMs(
992               EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
993           if (isFatalConnectionException(exceptionResponse)) {
994             markClosed(re);
995           }
996         } else {
997           Message value = null;
998           if (call.responseDefaultType != null) {
999             Builder builder = call.responseDefaultType.newBuilderForType();
1000             ProtobufUtil.mergeDelimitedFrom(builder, in);
1001             value = builder.build();
1002           }
1003           CellScanner cellBlockScanner = null;
1004           if (responseHeader.hasCellBlockMeta()) {
1005             int size = responseHeader.getCellBlockMeta().getLength();
1006             byte [] cellBlock = new byte[size];
1007             IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
1008             cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
1009           }
1010           call.setResponse(value, cellBlockScanner);
1011           call.callStats.setResponseSizeBytes(totalSize);
1012           call.callStats.setCallTimeMs(
1013               EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
1014         }
1015       } catch (IOException e) {
1016         if (expectedCall) call.setException(e);
1017         if (e instanceof SocketTimeoutException) {
1018           // Clean up open calls but don't treat this as a fatal condition,
1019           // since we expect certain responses to not make it by the specified
1020           // {@link ConnectionId#rpcTimeout}.
1021           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
1022         } else {
1023           // Treat this as a fatal condition and close this connection
1024           markClosed(e);
1025         }
1026       } finally {
1027         cleanupCalls(false);
1028       }
1029     }
1030 
1031     /**
1032      * @return True if the exception is a fatal connection exception.
1033      */
1034     private boolean isFatalConnectionException(final ExceptionResponse e) {
1035       return e.getExceptionClassName().
1036         equals(FatalConnectionException.class.getName());
1037     }
1038 
1039     /**
1040      * @param e exception to be wrapped
1041      * @return RemoteException made from passed <code>e</code>
1042      */
1043     private RemoteException createRemoteException(final ExceptionResponse e) {
1044       String innerExceptionClassName = e.getExceptionClassName();
1045       boolean doNotRetry = e.getDoNotRetry();
1046       return e.hasHostname()?
1047         // If a hostname then add it to the RemoteWithExtrasException
1048         new RemoteWithExtrasException(innerExceptionClassName,
1049           e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
1050         new RemoteWithExtrasException(innerExceptionClassName,
1051           e.getStackTrace(), doNotRetry);
1052     }
1053 
1054     protected synchronized boolean markClosed(IOException e) {
1055       if (e == null) throw new NullPointerException();
1056 
1057       boolean ret = shouldCloseConnection.compareAndSet(false, true);
1058       if (ret) {
1059         if (LOG.isTraceEnabled()) {
1060           LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
1061         }
1062         if (callSender != null) {
1063           callSender.close();
1064         }
1065         notifyAll();
1066       }
1067       return ret;
1068     }
1069 
1070 
1071     /**
1072      * Cleanup the calls older than a given timeout, in milli seconds.
1073      * @param allCalls true for all calls, false for only the calls in timeout
1074      */
1075     protected synchronized void cleanupCalls(boolean allCalls) {
1076       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
1077       while (itor.hasNext()) {
1078         Call c = itor.next().getValue();
1079         if (c.done) {
1080           // To catch the calls without timeout that were cancelled.
1081           itor.remove();
1082         } else if (allCalls) {
1083           long waitTime = EnvironmentEdgeManager.currentTime() - c.getStartTime();
1084           IOException ie = new ConnectionClosingException("Connection to " + getRemoteAddress()
1085               + " is closing. Call id=" + c.id + ", waitTime=" + waitTime);
1086           c.setException(ie);
1087           itor.remove();
1088         } else if (c.checkAndSetTimeout()) {
1089           itor.remove();
1090         } else {
1091           // We expect the call to be ordered by timeout. It may not be the case, but stopping
1092           //  at the first valid call allows to be sure that we still have something to do without
1093           //  spending too much time by reading the full list.
1094           break;
1095         }
1096       }
1097     }
1098   }
1099 
1100   /**
1101    * Used in test only. Construct an IPC cluster client whose values are of the
1102    * {@link Message} class.
1103    * @param conf configuration
1104    * @param clusterId the cluster id
1105    * @param factory socket factory
1106    */
1107   @VisibleForTesting
1108   RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) {
1109     this(conf, clusterId, factory, null, null);
1110   }
1111 
1112   /**
1113    * Construct an IPC cluster client whose values are of the {@link Message} class.
1114    * @param conf configuration
1115    * @param clusterId the cluster id
1116    * @param factory socket factory
1117    * @param localAddr client socket bind address
1118    * @param metrics the connection metrics
1119    */
1120   RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory,
1121       SocketAddress localAddr, MetricsConnection metrics) {
1122     super(conf, clusterId, localAddr, metrics);
1123 
1124     this.socketFactory = factory;
1125     this.connections = new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf));
1126     this.failedServers = new FailedServers(conf);
1127   }
1128 
1129   /**
1130    * Used in test only. Construct an IPC client for the cluster {@code clusterId} with
1131    * the default SocketFactory
1132    */
1133   @VisibleForTesting
1134   RpcClientImpl(Configuration conf, String clusterId) {
1135     this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null, null);
1136   }
1137 
1138   /**
1139    * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory
1140    *
1141    * This method is called with reflection by the RpcClientFactory to create an instance
1142    *
1143    * @param conf configuration
1144    * @param clusterId the cluster id
1145    * @param localAddr client socket bind address.
1146    * @param metrics the connection metrics
1147    */
1148   public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr,
1149       MetricsConnection metrics) {
1150     this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr, metrics);
1151   }
1152 
1153   /** Stop all threads related to this client.  No further calls may be made
1154    * using this client. */
1155   @Override
1156   public void close() {
1157     if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
1158     if (!running.compareAndSet(true, false)) return;
1159 
1160     Set<Connection> connsToClose = null;
1161     // wake up all connections
1162     synchronized (connections) {
1163       for (Connection conn : connections.values()) {
1164         conn.interrupt();
1165         if (conn.callSender != null) {
1166           conn.callSender.interrupt();
1167         }
1168 
1169         // In case the CallSender did not setupIOStreams() yet, the Connection may not be started
1170         // at all (if CallSender has a cancelled Call it can happen). See HBASE-13851
1171         if (!conn.isAlive()) {
1172           if (connsToClose == null) {
1173             connsToClose = new HashSet<Connection>();
1174           }
1175           connsToClose.add(conn);
1176         }
1177       }
1178     }
1179     if (connsToClose != null) {
1180       for (Connection conn : connsToClose) {
1181         conn.markClosed(new InterruptedIOException("RpcClient is closing"));
1182         conn.close();
1183       }
1184     }
1185     // wait until all connections are closed
1186     while (!connections.isEmpty()) {
1187       try {
1188         Thread.sleep(10);
1189       } catch (InterruptedException e) {
1190         LOG.info("Interrupted while stopping the client. We still have " + connections.size() +
1191             " connections.");
1192         Thread.currentThread().interrupt();
1193         return;
1194       }
1195     }
1196   }
1197 
1198   /** Make a call, passing <code>param</code>, to the IPC server running at
1199    * <code>address</code> which is servicing the <code>protocol</code> protocol,
1200    * with the <code>ticket</code> credentials, returning the value.
1201    * Throws exceptions if there are network problems or if the remote code
1202    * threw an exception.
1203    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
1204    *          {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
1205    *          new Connection each time.
1206    * @return A pair with the Message response and the Cell data (if any).
1207    * @throws InterruptedException
1208    * @throws IOException
1209    */
1210   @Override
1211   protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
1212       Message param, Message returnType, User ticket, InetSocketAddress addr,
1213       MetricsConnection.CallStats callStats)
1214       throws IOException, InterruptedException {
1215     if (pcrc == null) {
1216       pcrc = new PayloadCarryingRpcController();
1217     }
1218     CellScanner cells = pcrc.cellScanner();
1219 
1220     final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
1221         pcrc.getCallTimeout(), MetricsConnection.newCallStats());
1222 
1223     final Connection connection = getConnection(ticket, call, addr);
1224 
1225     final CallFuture cts;
1226     if (connection.callSender != null) {
1227       cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan());
1228         pcrc.notifyOnCancel(new RpcCallback<Object>() {
1229           @Override
1230           public void run(Object parameter) {
1231             connection.callSender.remove(cts);
1232           }
1233         });
1234         if (pcrc.isCanceled()) {
1235           // To finish if the call was cancelled before we set the notification (race condition)
1236           call.callComplete();
1237           return new Pair<Message, CellScanner>(call.response, call.cells);
1238         }
1239     } else {
1240       cts = null;
1241       connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());
1242     }
1243 
1244     while (!call.done) {
1245       if (call.checkAndSetTimeout()) {
1246         if (cts != null) connection.callSender.remove(cts);
1247         break;
1248       }
1249       if (connection.shouldCloseConnection.get()) {
1250         throw new ConnectionClosingException("Call id=" + call.id +
1251             " on server " + addr + " aborted: connection is closing");
1252       }
1253       try {
1254         synchronized (call) {
1255           if (call.done) break;
1256           call.wait(Math.min(call.remainingTime(), 1000) + 1);
1257         }
1258       } catch (InterruptedException e) {
1259         call.setException(new InterruptedIOException());
1260         if (cts != null) connection.callSender.remove(cts);
1261         throw e;
1262       }
1263     }
1264 
1265     if (call.error != null) {
1266       if (call.error instanceof RemoteException) {
1267         call.error.fillInStackTrace();
1268         throw call.error;
1269       }
1270       // local exception
1271       throw wrapException(addr, call.error);
1272     }
1273 
1274     return new Pair<Message, CellScanner>(call.response, call.cells);
1275   }
1276 
1277 
1278   /**
1279    * Interrupt the connections to the given ip:port server. This should be called if the server
1280    *  is known as actually dead. This will not prevent current operation to be retried, and,
1281    *  depending on their own behavior, they may retry on the same server. This can be a feature,
1282    *  for example at startup. In any case, they're likely to get connection refused (if the
1283    *  process died) or no route to host: i.e. their next retries should be faster and with a
1284    *  safe exception.
1285    */
1286   @Override
1287   public void cancelConnections(ServerName sn) {
1288     synchronized (connections) {
1289       for (Connection connection : connections.values()) {
1290         if (connection.isAlive() &&
1291             connection.getRemoteAddress().getPort() == sn.getPort() &&
1292             connection.getRemoteAddress().getHostName().equals(sn.getHostname())) {
1293           LOG.info("The server on " + sn.toString() +
1294               " is dead - stopping the connection " + connection.remoteId);
1295           connection.interrupt(); // We're interrupting a Reader. It means we want it to finish.
1296                                   // This will close the connection as well.
1297         }
1298       }
1299     }
1300   }
1301 
1302   /**
1303    *  Get a connection from the pool, or create a new one and add it to the
1304    * pool. Connections to a given host/port are reused.
1305    */
1306   protected Connection getConnection(User ticket, Call call, InetSocketAddress addr)
1307   throws IOException {
1308     if (!running.get()) throw new StoppedRpcClientException();
1309     Connection connection;
1310     ConnectionId remoteId =
1311       new ConnectionId(ticket, call.md.getService().getName(), addr);
1312     synchronized (connections) {
1313       connection = connections.get(remoteId);
1314       if (connection == null) {
1315         connection = createConnection(remoteId, this.codec, this.compressor);
1316         connections.put(remoteId, connection);
1317       }
1318     }
1319 
1320     return connection;
1321   }
1322 }