View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import io.netty.bootstrap.Bootstrap;
21  import io.netty.buffer.ByteBuf;
22  import io.netty.buffer.ByteBufOutputStream;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelFuture;
25  import io.netty.channel.ChannelFutureListener;
26  import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
27  import io.netty.util.Timeout;
28  import io.netty.util.TimerTask;
29  import io.netty.util.concurrent.GenericFutureListener;
30  import io.netty.util.concurrent.Promise;
31  
32  import java.io.IOException;
33  import java.net.ConnectException;
34  import java.net.InetSocketAddress;
35  import java.net.SocketException;
36  import java.nio.ByteBuffer;
37  import java.security.PrivilegedExceptionAction;
38  import java.util.ArrayList;
39  import java.util.HashMap;
40  import java.util.Iterator;
41  import java.util.List;
42  import java.util.Map;
43  import java.util.Random;
44  import java.util.concurrent.TimeUnit;
45  
46  import javax.security.sasl.SaslException;
47  
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.classification.InterfaceAudience;
52  import org.apache.hadoop.hbase.client.MetricsConnection;
53  import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
54  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
55  import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
56  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
57  import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
58  import org.apache.hadoop.hbase.security.AuthMethod;
59  import org.apache.hadoop.hbase.security.SaslClientHandler;
60  import org.apache.hadoop.hbase.security.SaslUtil;
61  import org.apache.hadoop.hbase.security.SecurityInfo;
62  import org.apache.hadoop.hbase.security.User;
63  import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
64  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
65  import org.apache.hadoop.io.Text;
66  import org.apache.hadoop.ipc.RemoteException;
67  import org.apache.hadoop.security.SecurityUtil;
68  import org.apache.hadoop.security.UserGroupInformation;
69  import org.apache.hadoop.security.token.Token;
70  import org.apache.hadoop.security.token.TokenIdentifier;
71  import org.apache.hadoop.security.token.TokenSelector;
72  import org.apache.htrace.Span;
73  import org.apache.htrace.Trace;
74  
75  import com.google.protobuf.Descriptors;
76  import com.google.protobuf.Message;
77  import com.google.protobuf.RpcCallback;
78  
79  /**
80   * Netty RPC channel
81   */
82  @InterfaceAudience.Private
83  public class AsyncRpcChannel {
84    private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName());
85  
86    private static final int MAX_SASL_RETRIES = 5;
87  
88    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_MUTABLE_COLLECTION_PKGPROTECT",
89        justification="the rest of the system which live in the different package can use")
90    protected final static Map<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? extends
91        TokenIdentifier>> tokenHandlers = new HashMap<>();
92  
93    static {
94      tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
95          new AuthenticationTokenSelector());
96    }
97  
98    final AsyncRpcClient client;
99  
100   // Contains the channel to work with.
101   // Only exists when connected
102   private Channel channel;
103 
104   String name;
105   final User ticket;
106   final String serviceName;
107   final InetSocketAddress address;
108 
109   private int ioFailureCounter = 0;
110   private int connectFailureCounter = 0;
111 
112   boolean useSasl;
113   AuthMethod authMethod;
114   private int reloginMaxBackoff;
115   private Token<? extends TokenIdentifier> token;
116   private String serverPrincipal;
117 
118 
119   // NOTE: closed and connected flags below are only changed when a lock on pendingCalls
120   private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
121   private boolean connected = false;
122   private boolean closed = false;
123 
124   private Timeout cleanupTimer;
125 
126   private final TimerTask timeoutTask = new TimerTask() {
127     @Override
128     public void run(Timeout timeout) throws Exception {
129       cleanupCalls();
130     }
131   };
132 
133   /**
134    * Constructor for netty RPC channel
135    *
136    * @param bootstrap to construct channel on
137    * @param client    to connect with
138    * @param ticket of user which uses connection
139    *               @param serviceName name of service to connect to
140    * @param address to connect to
141    */
142   public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, String
143       serviceName, InetSocketAddress address) {
144     this.client = client;
145 
146     this.ticket = ticket;
147     this.serviceName = serviceName;
148     this.address = address;
149 
150     this.channel = connect(bootstrap).channel();
151 
152     name = ("IPC Client (" + channel.hashCode() + ") to " +
153         address.toString() +
154         ((ticket == null) ?
155             " from unknown user" :
156             (" from " + ticket.getName())));
157   }
158 
159   /**
160    * Connect to channel
161    *
162    * @param bootstrap to connect to
163    * @return future of connection
164    */
165   private ChannelFuture connect(final Bootstrap bootstrap) {
166     return bootstrap.remoteAddress(address).connect()
167         .addListener(new GenericFutureListener<ChannelFuture>() {
168           @Override
169           public void operationComplete(final ChannelFuture f) throws Exception {
170             if (!f.isSuccess()) {
171               if (f.cause() instanceof SocketException) {
172                 retryOrClose(bootstrap, connectFailureCounter++, f.cause());
173               } else {
174                 retryOrClose(bootstrap, ioFailureCounter++, f.cause());
175               }
176               return;
177             }
178             channel = f.channel();
179 
180             setupAuthorization();
181 
182             ByteBuf b = channel.alloc().directBuffer(6);
183             createPreamble(b, authMethod);
184             channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
185             if (useSasl) {
186               UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
187               if (authMethod == AuthMethod.KERBEROS) {
188                 if (ticket != null && ticket.getRealUser() != null) {
189                   ticket = ticket.getRealUser();
190                 }
191               }
192               SaslClientHandler saslHandler;
193               if (ticket == null) {
194                 throw new FatalConnectionException("ticket/user is null");
195               }
196               final UserGroupInformation realTicket = ticket;
197               saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
198                 @Override
199                 public SaslClientHandler run() throws IOException {
200                   return getSaslHandler(realTicket, bootstrap);
201                 }
202               });
203               if (saslHandler != null) {
204                 // Sasl connect is successful. Let's set up Sasl channel handler
205                 channel.pipeline().addFirst(saslHandler);
206               } else {
207                 // fall back to simple auth because server told us so.
208                 authMethod = AuthMethod.SIMPLE;
209                 useSasl = false;
210               }
211             } else {
212               startHBaseConnection(f.channel());
213             }
214           }
215         });
216   }
217 
218   /**
219    * Start HBase connection
220    *
221    * @param ch channel to start connection on
222    */
223   private void startHBaseConnection(Channel ch) {
224     ch.pipeline()
225         .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
226     ch.pipeline().addLast(new AsyncServerResponseHandler(this));
227     try {
228       writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
229         @Override
230         public void operationComplete(ChannelFuture future) throws Exception {
231           if (!future.isSuccess()) {
232             close(future.cause());
233             return;
234           }
235           List<AsyncCall> callsToWrite;
236           synchronized (pendingCalls) {
237             connected = true;
238             callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
239           }
240           for (AsyncCall call : callsToWrite) {
241             writeRequest(call);
242           }
243         }
244       });
245     } catch (IOException e) {
246       close(e);
247     }
248   }
249 
250   /**
251    * Get SASL handler
252    * @param bootstrap to reconnect to
253    * @return new SASL handler
254    * @throws java.io.IOException if handler failed to create
255    */
256   private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
257       final Bootstrap bootstrap) throws IOException {
258     return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
259         client.fallbackAllowed, client.conf.get("hbase.rpc.protection",
260           SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
261         new SaslClientHandler.SaslExceptionHandler() {
262           @Override
263           public void handle(int retryCount, Random random, Throwable cause) {
264             try {
265               // Handle Sasl failure. Try to potentially get new credentials
266               handleSaslConnectionFailure(retryCount, cause, realTicket);
267 
268               // Try to reconnect
269               client.newTimeout(new TimerTask() {
270                 @Override
271                 public void run(Timeout timeout) throws Exception {
272                   connect(bootstrap);
273                 }
274               }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
275             } catch (IOException | InterruptedException e) {
276               close(e);
277             }
278           }
279         }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
280           @Override
281           public void onSuccess(Channel channel) {
282             startHBaseConnection(channel);
283           }
284         });
285   }
286 
287   /**
288    * Retry to connect or close
289    *
290    * @param bootstrap      to connect with
291    * @param connectCounter amount of tries
292    * @param e              exception of fail
293    */
294   private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
295     if (connectCounter < client.maxRetries) {
296       client.newTimeout(new TimerTask() {
297         @Override public void run(Timeout timeout) throws Exception {
298           connect(bootstrap);
299         }
300       }, client.failureSleep, TimeUnit.MILLISECONDS);
301     } else {
302       client.failedServers.addToFailedServers(address);
303       close(e);
304     }
305   }
306 
307   /**
308    * Calls method on channel
309    * @param method to call
310    * @param controller to run call with
311    * @param request to send
312    * @param responsePrototype to construct response with
313    */
314   public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
315       final PayloadCarryingRpcController controller, final Message request,
316       final Message responsePrototype, MetricsConnection.CallStats callStats) {
317     final AsyncCall call =
318         new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request,
319             controller, responsePrototype, callStats);
320     controller.notifyOnCancel(new RpcCallback<Object>() {
321       @Override
322       public void run(Object parameter) {
323         // TODO: do not need to call AsyncCall.setFailed?
324         synchronized (pendingCalls) {
325           pendingCalls.remove(call.id);
326         }
327       }
328     });
329     // TODO: this should be handled by PayloadCarryingRpcController.
330     if (controller.isCanceled()) {
331       // To finish if the call was cancelled before we set the notification (race condition)
332       call.cancel(true);
333       return call;
334     }
335 
336     synchronized (pendingCalls) {
337       if (closed) {
338         Promise<Message> promise = channel.eventLoop().newPromise();
339         promise.setFailure(new ConnectException());
340         return promise;
341       }
342       pendingCalls.put(call.id, call);
343       // Add timeout for cleanup if none is present
344       if (cleanupTimer == null && call.getRpcTimeout() > 0) {
345         cleanupTimer =
346             client.newTimeout(timeoutTask, call.getRpcTimeout(),
347               TimeUnit.MILLISECONDS);
348       }
349       if (!connected) {
350         return call;
351       }
352     }
353     writeRequest(call);
354     return call;
355   }
356 
357   AsyncCall removePendingCall(int id) {
358     synchronized (pendingCalls) {
359       return pendingCalls.remove(id);
360     }
361   }
362 
363   /**
364    * Write the channel header
365    *
366    * @param channel to write to
367    * @return future of write
368    * @throws java.io.IOException on failure to write
369    */
370   private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
371     RPCProtos.ConnectionHeader.Builder headerBuilder =
372         RPCProtos.ConnectionHeader.newBuilder().setServiceName(serviceName);
373 
374     RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
375     if (userInfoPB != null) {
376       headerBuilder.setUserInfo(userInfoPB);
377     }
378 
379     if (client.codec != null) {
380       headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
381     }
382     if (client.compressor != null) {
383       headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
384     }
385 
386     headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
387     RPCProtos.ConnectionHeader header = headerBuilder.build();
388 
389 
390     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
391 
392     ByteBuf b = channel.alloc().directBuffer(totalSize);
393 
394     b.writeInt(header.getSerializedSize());
395     b.writeBytes(header.toByteArray());
396 
397     return channel.writeAndFlush(b);
398   }
399 
400   /**
401    * Write request to channel
402    *
403    * @param call    to write
404    */
405   private void writeRequest(final AsyncCall call) {
406     try {
407       final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
408           .newBuilder();
409       requestHeaderBuilder.setCallId(call.id)
410               .setMethodName(call.method.getName()).setRequestParam(call.param != null);
411 
412       if (Trace.isTracing()) {
413         Span s = Trace.currentSpan();
414         requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder().
415             setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
416       }
417 
418       ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner());
419       if (cellBlock != null) {
420         final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
421             .newBuilder();
422         cellBlockBuilder.setLength(cellBlock.limit());
423         requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
424       }
425       // Only pass priority if there one.  Let zero be same as no priority.
426       if (call.controller.getPriority() != 0) {
427         requestHeaderBuilder.setPriority(call.controller.getPriority());
428       }
429 
430       RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
431 
432       int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
433       if (cellBlock != null) {
434         totalSize += cellBlock.remaining();
435       }
436 
437       ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
438       try(ByteBufOutputStream out = new ByteBufOutputStream(b)) {
439         call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
440       }
441 
442       channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
443     } catch (IOException e) {
444       close(e);
445     }
446   }
447 
448   /**
449    * Set up server authorization
450    *
451    * @throws java.io.IOException if auth setup failed
452    */
453   private void setupAuthorization() throws IOException {
454     SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
455     this.useSasl = client.userProvider.isHBaseSecurityEnabled();
456 
457     this.token = null;
458     if (useSasl && securityInfo != null) {
459       AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
460       if (tokenKind != null) {
461         TokenSelector<? extends TokenIdentifier> tokenSelector = tokenHandlers.get(tokenKind);
462         if (tokenSelector != null) {
463           token = tokenSelector
464               .selectToken(new Text(client.clusterId), ticket.getUGI().getTokens());
465         } else if (LOG.isDebugEnabled()) {
466           LOG.debug("No token selector found for type " + tokenKind);
467         }
468       }
469       String serverKey = securityInfo.getServerPrincipal();
470       if (serverKey == null) {
471         throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
472       }
473       this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
474           address.getAddress().getCanonicalHostName().toLowerCase());
475       if (LOG.isDebugEnabled()) {
476         LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
477             + serverPrincipal);
478       }
479     }
480 
481     if (!useSasl) {
482       authMethod = AuthMethod.SIMPLE;
483     } else if (token != null) {
484       authMethod = AuthMethod.DIGEST;
485     } else {
486       authMethod = AuthMethod.KERBEROS;
487     }
488 
489     if (LOG.isDebugEnabled()) {
490       LOG.debug("Use " + authMethod + " authentication for service " + serviceName +
491           ", sasl=" + useSasl);
492     }
493     reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
494   }
495 
496   /**
497    * Build the user information
498    *
499    * @param ugi        User Group Information
500    * @param authMethod Authorization method
501    * @return UserInformation protobuf
502    */
503   private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
504     if (ugi == null || authMethod == AuthMethod.DIGEST) {
505       // Don't send user for token auth
506       return null;
507     }
508     RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
509     if (authMethod == AuthMethod.KERBEROS) {
510       // Send effective user for Kerberos auth
511       userInfoPB.setEffectiveUser(ugi.getUserName());
512     } else if (authMethod == AuthMethod.SIMPLE) {
513       //Send both effective user and real user for simple auth
514       userInfoPB.setEffectiveUser(ugi.getUserName());
515       if (ugi.getRealUser() != null) {
516         userInfoPB.setRealUser(ugi.getRealUser().getUserName());
517       }
518     }
519     return userInfoPB.build();
520   }
521 
522   /**
523    * Create connection preamble
524    *
525    * @param byteBuf    to write to
526    * @param authMethod to write
527    */
528   private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
529     byteBuf.writeBytes(HConstants.RPC_HEADER);
530     byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
531     byteBuf.writeByte(authMethod.code);
532   }
533 
534   /**
535    * Close connection
536    *
537    * @param e exception on close
538    */
539   public void close(final Throwable e) {
540     client.removeConnection(this);
541 
542     // Move closing from the requesting thread to the channel thread
543     channel.eventLoop().execute(new Runnable() {
544       @Override
545       public void run() {
546         List<AsyncCall> toCleanup;
547         synchronized (pendingCalls) {
548           if (closed) {
549             return;
550           }
551           closed = true;
552           toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
553           pendingCalls.clear();
554         }
555         IOException closeException = null;
556         if (e != null) {
557           if (e instanceof IOException) {
558             closeException = (IOException) e;
559           } else {
560             closeException = new IOException(e);
561           }
562         }
563         // log the info
564         if (LOG.isDebugEnabled() && closeException != null) {
565           LOG.debug(name + ": closing ipc connection to " + address, closeException);
566         }
567         if (cleanupTimer != null) {
568           cleanupTimer.cancel();
569           cleanupTimer = null;
570         }
571         for (AsyncCall call : toCleanup) {
572           call.setFailed(closeException != null ? closeException : new ConnectionClosingException(
573               "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
574         }
575         channel.disconnect().addListener(ChannelFutureListener.CLOSE);
576         if (LOG.isDebugEnabled()) {
577           LOG.debug(name + ": closed");
578         }
579       }
580     });
581   }
582 
583   /**
584    * Clean up calls.
585    */
586   private void cleanupCalls() {
587     List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
588     long currentTime = EnvironmentEdgeManager.currentTime();
589     long nextCleanupTaskDelay = -1L;
590     synchronized (pendingCalls) {
591       for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
592         AsyncCall call = iter.next();
593         long timeout = call.getRpcTimeout();
594         if (timeout > 0) {
595           if (currentTime - call.getStartTime() >= timeout) {
596             iter.remove();
597             toCleanup.add(call);
598           } else {
599             if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
600               nextCleanupTaskDelay = timeout;
601             }
602           }
603         }
604       }
605       if (nextCleanupTaskDelay > 0) {
606         cleanupTimer =
607             client.newTimeout(timeoutTask, nextCleanupTaskDelay,
608               TimeUnit.MILLISECONDS);
609       } else {
610         cleanupTimer = null;
611       }
612     }
613     for (AsyncCall call : toCleanup) {
614       call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
615           + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
616     }
617   }
618 
619   /**
620    * Check if the connection is alive
621    *
622    * @return true if alive
623    */
624   public boolean isAlive() {
625     return channel.isOpen();
626   }
627 
628   /**
629    * Check if user should authenticate over Kerberos
630    *
631    * @return true if should be authenticated over Kerberos
632    * @throws java.io.IOException on failure of check
633    */
634   private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
635     UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
636     UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
637     UserGroupInformation realUser = currentUser.getRealUser();
638     return authMethod == AuthMethod.KERBEROS &&
639         loginUser != null &&
640         //Make sure user logged in using Kerberos either keytab or TGT
641         loginUser.hasKerberosCredentials() &&
642         // relogin only in case it is the login user (e.g. JT)
643         // or superuser (like oozie).
644         (loginUser.equals(currentUser) || loginUser.equals(realUser));
645   }
646 
647   /**
648    * If multiple clients with the same principal try to connect
649    * to the same server at the same time, the server assumes a
650    * replay attack is in progress. This is a feature of kerberos.
651    * In order to work around this, what is done is that the client
652    * backs off randomly and tries to initiate the connection
653    * again.
654    * The other problem is to do with ticket expiry. To handle that,
655    * a relogin is attempted.
656    * <p>
657    * The retry logic is governed by the {@link #shouldAuthenticateOverKrb}
658    * method. In case when the user doesn't have valid credentials, we don't
659    * need to retry (from cache or ticket). In such cases, it is prudent to
660    * throw a runtime exception when we receive a SaslException from the
661    * underlying authentication implementation, so there is no retry from
662    * other high level (for eg, HCM or HBaseAdmin).
663    * </p>
664    *
665    * @param currRetries retry count
666    * @param ex          exception describing fail
667    * @param user        which is trying to connect
668    * @throws java.io.IOException  if IO fail
669    * @throws InterruptedException if thread is interrupted
670    */
671   private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
672       final UserGroupInformation user) throws IOException, InterruptedException {
673     user.doAs(new PrivilegedExceptionAction<Void>() {
674       public Void run() throws IOException, InterruptedException {
675         if (shouldAuthenticateOverKrb()) {
676           if (currRetries < MAX_SASL_RETRIES) {
677             LOG.debug("Exception encountered while connecting to the server : " + ex);
678             //try re-login
679             if (UserGroupInformation.isLoginKeytabBased()) {
680               UserGroupInformation.getLoginUser().reloginFromKeytab();
681             } else {
682               UserGroupInformation.getLoginUser().reloginFromTicketCache();
683             }
684 
685             // Should reconnect
686             return null;
687           } else {
688             String msg = "Couldn't setup connection for " +
689                 UserGroupInformation.getLoginUser().getUserName() +
690                 " to " + serverPrincipal;
691             LOG.warn(msg);
692             throw (IOException) new IOException(msg).initCause(ex);
693           }
694         } else {
695           LOG.warn("Exception encountered while connecting to " +
696               "the server : " + ex);
697         }
698         if (ex instanceof RemoteException) {
699           throw (RemoteException) ex;
700         }
701         if (ex instanceof SaslException) {
702           String msg = "SASL authentication failed." +
703               " The most likely cause is missing or invalid credentials." +
704               " Consider 'kinit'.";
705           LOG.fatal(msg, ex);
706           throw new RuntimeException(msg, ex);
707         }
708         throw new IOException(ex);
709       }
710     });
711   }
712 
713   public int getConnectionHashCode() {
714     return ConnectionId.hashCode(ticket, serviceName, address);
715   }
716   
717   @Override
718   public int hashCode() {
719     return getConnectionHashCode();
720   }
721      
722   @Override
723   public boolean equals(Object obj) {
724     if (obj instanceof AsyncRpcChannel) {
725       AsyncRpcChannel channel = (AsyncRpcChannel) obj;
726       return channel.hashCode() == obj.hashCode();
727     }
728     return false;
729   }
730 
731 
732   @Override
733   public String toString() {
734     return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
735   }
736 
737   /**
738    * Listens to call writes and fails if write failed
739    */
740   private static final class CallWriteListener implements ChannelFutureListener {
741     private final AsyncRpcChannel rpcChannel;
742     private final int id;
743 
744     public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) {
745       this.rpcChannel = asyncRpcChannel;
746       this.id = id;
747     }
748 
749     @Override
750     public void operationComplete(ChannelFuture future) throws Exception {
751       if (!future.isSuccess()) {
752         AsyncCall call = rpcChannel.removePendingCall(id);
753         if (call != null) {
754           if (future.cause() instanceof IOException) {
755             call.setFailed((IOException) future.cause());
756           } else {
757             call.setFailed(new IOException(future.cause()));
758           }
759         }
760       }
761     }
762   }
763 }