View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import io.netty.bootstrap.Bootstrap;
21  import io.netty.buffer.ByteBuf;
22  import io.netty.buffer.ByteBufOutputStream;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelFuture;
25  import io.netty.channel.ChannelFutureListener;
26  import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
27  import io.netty.util.Timeout;
28  import io.netty.util.TimerTask;
29  import io.netty.util.concurrent.GenericFutureListener;
30  import io.netty.util.concurrent.Promise;
31  
32  import java.io.IOException;
33  import java.net.ConnectException;
34  import java.net.InetSocketAddress;
35  import java.net.SocketException;
36  import java.nio.ByteBuffer;
37  import java.security.PrivilegedExceptionAction;
38  import java.util.ArrayList;
39  import java.util.HashMap;
40  import java.util.Iterator;
41  import java.util.List;
42  import java.util.Map;
43  import java.util.Random;
44  import java.util.concurrent.TimeUnit;
45  
46  import javax.security.sasl.SaslException;
47  
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.classification.InterfaceAudience;
52  import org.apache.hadoop.hbase.client.MetricsConnection;
53  import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
54  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
55  import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
56  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
57  import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
58  import org.apache.hadoop.hbase.security.AuthMethod;
59  import org.apache.hadoop.hbase.security.SaslClientHandler;
60  import org.apache.hadoop.hbase.security.SaslUtil;
61  import org.apache.hadoop.hbase.security.SecurityInfo;
62  import org.apache.hadoop.hbase.security.User;
63  import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
64  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
65  import org.apache.hadoop.io.Text;
66  import org.apache.hadoop.ipc.RemoteException;
67  import org.apache.hadoop.security.SecurityUtil;
68  import org.apache.hadoop.security.UserGroupInformation;
69  import org.apache.hadoop.security.token.Token;
70  import org.apache.hadoop.security.token.TokenIdentifier;
71  import org.apache.hadoop.security.token.TokenSelector;
72  import org.apache.htrace.Span;
73  import org.apache.htrace.Trace;
74  
75  import com.google.protobuf.Descriptors;
76  import com.google.protobuf.Message;
77  import com.google.protobuf.RpcCallback;
78  
79  /**
80   * Netty RPC channel
81   */
82  @InterfaceAudience.Private
83  public class AsyncRpcChannel {
84    private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName());
85  
86    private static final int MAX_SASL_RETRIES = 5;
87  
88    protected final static Map<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? extends
89        TokenIdentifier>> tokenHandlers = new HashMap<>();
90  
91    static {
92      tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
93          new AuthenticationTokenSelector());
94    }
95  
96    final AsyncRpcClient client;
97  
98    // Contains the channel to work with.
99    // Only exists when connected
100   private Channel channel;
101 
102   String name;
103   final User ticket;
104   final String serviceName;
105   final InetSocketAddress address;
106 
107   private int ioFailureCounter = 0;
108   private int connectFailureCounter = 0;
109 
110   boolean useSasl;
111   AuthMethod authMethod;
112   private int reloginMaxBackoff;
113   private Token<? extends TokenIdentifier> token;
114   private String serverPrincipal;
115 
116 
117   // NOTE: closed and connected flags below are only changed when a lock on pendingCalls
118   private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
119   private boolean connected = false;
120   private boolean closed = false;
121 
122   private Timeout cleanupTimer;
123 
124   private final TimerTask timeoutTask = new TimerTask() {
125     @Override
126     public void run(Timeout timeout) throws Exception {
127       cleanupCalls();
128     }
129   };
130 
131   /**
132    * Constructor for netty RPC channel
133    *
134    * @param bootstrap to construct channel on
135    * @param client    to connect with
136    * @param ticket of user which uses connection
137    *               @param serviceName name of service to connect to
138    * @param address to connect to
139    */
140   public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, String
141       serviceName, InetSocketAddress address) {
142     this.client = client;
143 
144     this.ticket = ticket;
145     this.serviceName = serviceName;
146     this.address = address;
147 
148     this.channel = connect(bootstrap).channel();
149 
150     name = ("IPC Client (" + channel.hashCode() + ") to " +
151         address.toString() +
152         ((ticket == null) ?
153             " from unknown user" :
154             (" from " + ticket.getName())));
155   }
156 
157   /**
158    * Connect to channel
159    *
160    * @param bootstrap to connect to
161    * @return future of connection
162    */
163   private ChannelFuture connect(final Bootstrap bootstrap) {
164     return bootstrap.remoteAddress(address).connect()
165         .addListener(new GenericFutureListener<ChannelFuture>() {
166           @Override
167           public void operationComplete(final ChannelFuture f) throws Exception {
168             if (!f.isSuccess()) {
169               if (f.cause() instanceof SocketException) {
170                 retryOrClose(bootstrap, connectFailureCounter++, f.cause());
171               } else {
172                 retryOrClose(bootstrap, ioFailureCounter++, f.cause());
173               }
174               return;
175             }
176             channel = f.channel();
177 
178             setupAuthorization();
179 
180             ByteBuf b = channel.alloc().directBuffer(6);
181             createPreamble(b, authMethod);
182             channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
183             if (useSasl) {
184               UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
185               if (authMethod == AuthMethod.KERBEROS) {
186                 if (ticket != null && ticket.getRealUser() != null) {
187                   ticket = ticket.getRealUser();
188                 }
189               }
190               SaslClientHandler saslHandler;
191               if (ticket == null) {
192                 throw new FatalConnectionException("ticket/user is null");
193               }
194               final UserGroupInformation realTicket = ticket;
195               saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
196                 @Override
197                 public SaslClientHandler run() throws IOException {
198                   return getSaslHandler(realTicket, bootstrap);
199                 }
200               });
201               if (saslHandler != null) {
202                 // Sasl connect is successful. Let's set up Sasl channel handler
203                 channel.pipeline().addFirst(saslHandler);
204               } else {
205                 // fall back to simple auth because server told us so.
206                 authMethod = AuthMethod.SIMPLE;
207                 useSasl = false;
208               }
209             } else {
210               startHBaseConnection(f.channel());
211             }
212           }
213         });
214   }
215 
216   /**
217    * Start HBase connection
218    *
219    * @param ch channel to start connection on
220    */
221   private void startHBaseConnection(Channel ch) {
222     ch.pipeline()
223         .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
224     ch.pipeline().addLast(new AsyncServerResponseHandler(this));
225     try {
226       writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
227         @Override
228         public void operationComplete(ChannelFuture future) throws Exception {
229           if (!future.isSuccess()) {
230             close(future.cause());
231             return;
232           }
233           List<AsyncCall> callsToWrite;
234           synchronized (pendingCalls) {
235             connected = true;
236             callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
237           }
238           for (AsyncCall call : callsToWrite) {
239             writeRequest(call);
240           }
241         }
242       });
243     } catch (IOException e) {
244       close(e);
245     }
246   }
247 
248   /**
249    * Get SASL handler
250    * @param bootstrap to reconnect to
251    * @return new SASL handler
252    * @throws java.io.IOException if handler failed to create
253    */
254   private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
255       final Bootstrap bootstrap) throws IOException {
256     return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
257         client.fallbackAllowed, client.conf.get("hbase.rpc.protection",
258           SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
259         new SaslClientHandler.SaslExceptionHandler() {
260           @Override
261           public void handle(int retryCount, Random random, Throwable cause) {
262             try {
263               // Handle Sasl failure. Try to potentially get new credentials
264               handleSaslConnectionFailure(retryCount, cause, realTicket);
265 
266               // Try to reconnect
267               client.newTimeout(new TimerTask() {
268                 @Override
269                 public void run(Timeout timeout) throws Exception {
270                   connect(bootstrap);
271                 }
272               }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
273             } catch (IOException | InterruptedException e) {
274               close(e);
275             }
276           }
277         }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
278           @Override
279           public void onSuccess(Channel channel) {
280             startHBaseConnection(channel);
281           }
282         });
283   }
284 
285   /**
286    * Retry to connect or close
287    *
288    * @param bootstrap      to connect with
289    * @param connectCounter amount of tries
290    * @param e              exception of fail
291    */
292   private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
293     if (connectCounter < client.maxRetries) {
294       client.newTimeout(new TimerTask() {
295         @Override public void run(Timeout timeout) throws Exception {
296           connect(bootstrap);
297         }
298       }, client.failureSleep, TimeUnit.MILLISECONDS);
299     } else {
300       client.failedServers.addToFailedServers(address);
301       close(e);
302     }
303   }
304 
305   /**
306    * Calls method on channel
307    * @param method to call
308    * @param controller to run call with
309    * @param request to send
310    * @param responsePrototype to construct response with
311    */
312   public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
313       final PayloadCarryingRpcController controller, final Message request,
314       final Message responsePrototype, MetricsConnection.CallStats callStats) {
315     final AsyncCall call =
316         new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request,
317             controller, responsePrototype, callStats);
318     controller.notifyOnCancel(new RpcCallback<Object>() {
319       @Override
320       public void run(Object parameter) {
321         // TODO: do not need to call AsyncCall.setFailed?
322         synchronized (pendingCalls) {
323           pendingCalls.remove(call.id);
324         }
325       }
326     });
327     // TODO: this should be handled by PayloadCarryingRpcController.
328     if (controller.isCanceled()) {
329       // To finish if the call was cancelled before we set the notification (race condition)
330       call.cancel(true);
331       return call;
332     }
333 
334     synchronized (pendingCalls) {
335       if (closed) {
336         Promise<Message> promise = channel.eventLoop().newPromise();
337         promise.setFailure(new ConnectException());
338         return promise;
339       }
340       pendingCalls.put(call.id, call);
341       // Add timeout for cleanup if none is present
342       if (cleanupTimer == null && call.getRpcTimeout() > 0) {
343         cleanupTimer =
344             client.newTimeout(timeoutTask, call.getRpcTimeout(),
345               TimeUnit.MILLISECONDS);
346       }
347       if (!connected) {
348         return call;
349       }
350     }
351     writeRequest(call);
352     return call;
353   }
354 
355   AsyncCall removePendingCall(int id) {
356     synchronized (pendingCalls) {
357       return pendingCalls.remove(id);
358     }
359   }
360 
361   /**
362    * Write the channel header
363    *
364    * @param channel to write to
365    * @return future of write
366    * @throws java.io.IOException on failure to write
367    */
368   private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
369     RPCProtos.ConnectionHeader.Builder headerBuilder =
370         RPCProtos.ConnectionHeader.newBuilder().setServiceName(serviceName);
371 
372     RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
373     if (userInfoPB != null) {
374       headerBuilder.setUserInfo(userInfoPB);
375     }
376 
377     if (client.codec != null) {
378       headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
379     }
380     if (client.compressor != null) {
381       headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
382     }
383 
384     headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
385     RPCProtos.ConnectionHeader header = headerBuilder.build();
386 
387 
388     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
389 
390     ByteBuf b = channel.alloc().directBuffer(totalSize);
391 
392     b.writeInt(header.getSerializedSize());
393     b.writeBytes(header.toByteArray());
394 
395     return channel.writeAndFlush(b);
396   }
397 
398   /**
399    * Write request to channel
400    *
401    * @param call    to write
402    */
403   private void writeRequest(final AsyncCall call) {
404     try {
405       final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
406           .newBuilder();
407       requestHeaderBuilder.setCallId(call.id)
408               .setMethodName(call.method.getName()).setRequestParam(call.param != null);
409 
410       if (Trace.isTracing()) {
411         Span s = Trace.currentSpan();
412         requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder().
413             setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
414       }
415 
416       ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner());
417       if (cellBlock != null) {
418         final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
419             .newBuilder();
420         cellBlockBuilder.setLength(cellBlock.limit());
421         requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
422       }
423       // Only pass priority if there one.  Let zero be same as no priority.
424       if (call.controller.getPriority() != 0) {
425         requestHeaderBuilder.setPriority(call.controller.getPriority());
426       }
427 
428       RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
429 
430       int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
431       if (cellBlock != null) {
432         totalSize += cellBlock.remaining();
433       }
434 
435       ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
436       try(ByteBufOutputStream out = new ByteBufOutputStream(b)) {
437         call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
438       }
439 
440       channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
441     } catch (IOException e) {
442       close(e);
443     }
444   }
445 
446   /**
447    * Set up server authorization
448    *
449    * @throws java.io.IOException if auth setup failed
450    */
451   private void setupAuthorization() throws IOException {
452     SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
453     this.useSasl = client.userProvider.isHBaseSecurityEnabled();
454 
455     this.token = null;
456     if (useSasl && securityInfo != null) {
457       AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
458       if (tokenKind != null) {
459         TokenSelector<? extends TokenIdentifier> tokenSelector = tokenHandlers.get(tokenKind);
460         if (tokenSelector != null) {
461           token = tokenSelector
462               .selectToken(new Text(client.clusterId), ticket.getUGI().getTokens());
463         } else if (LOG.isDebugEnabled()) {
464           LOG.debug("No token selector found for type " + tokenKind);
465         }
466       }
467       String serverKey = securityInfo.getServerPrincipal();
468       if (serverKey == null) {
469         throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
470       }
471       this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
472           address.getAddress().getCanonicalHostName().toLowerCase());
473       if (LOG.isDebugEnabled()) {
474         LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
475             + serverPrincipal);
476       }
477     }
478 
479     if (!useSasl) {
480       authMethod = AuthMethod.SIMPLE;
481     } else if (token != null) {
482       authMethod = AuthMethod.DIGEST;
483     } else {
484       authMethod = AuthMethod.KERBEROS;
485     }
486 
487     if (LOG.isDebugEnabled()) {
488       LOG.debug("Use " + authMethod + " authentication for service " + serviceName +
489           ", sasl=" + useSasl);
490     }
491     reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
492   }
493 
494   /**
495    * Build the user information
496    *
497    * @param ugi        User Group Information
498    * @param authMethod Authorization method
499    * @return UserInformation protobuf
500    */
501   private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
502     if (ugi == null || authMethod == AuthMethod.DIGEST) {
503       // Don't send user for token auth
504       return null;
505     }
506     RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
507     if (authMethod == AuthMethod.KERBEROS) {
508       // Send effective user for Kerberos auth
509       userInfoPB.setEffectiveUser(ugi.getUserName());
510     } else if (authMethod == AuthMethod.SIMPLE) {
511       //Send both effective user and real user for simple auth
512       userInfoPB.setEffectiveUser(ugi.getUserName());
513       if (ugi.getRealUser() != null) {
514         userInfoPB.setRealUser(ugi.getRealUser().getUserName());
515       }
516     }
517     return userInfoPB.build();
518   }
519 
520   /**
521    * Create connection preamble
522    *
523    * @param byteBuf    to write to
524    * @param authMethod to write
525    */
526   private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
527     byteBuf.writeBytes(HConstants.RPC_HEADER);
528     byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
529     byteBuf.writeByte(authMethod.code);
530   }
531 
532   /**
533    * Close connection
534    *
535    * @param e exception on close
536    */
537   public void close(final Throwable e) {
538     client.removeConnection(this);
539 
540     // Move closing from the requesting thread to the channel thread
541     channel.eventLoop().execute(new Runnable() {
542       @Override
543       public void run() {
544         List<AsyncCall> toCleanup;
545         synchronized (pendingCalls) {
546           if (closed) {
547             return;
548           }
549           closed = true;
550           toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
551           pendingCalls.clear();
552         }
553         IOException closeException = null;
554         if (e != null) {
555           if (e instanceof IOException) {
556             closeException = (IOException) e;
557           } else {
558             closeException = new IOException(e);
559           }
560         }
561         // log the info
562         if (LOG.isDebugEnabled() && closeException != null) {
563           LOG.debug(name + ": closing ipc connection to " + address, closeException);
564         }
565         if (cleanupTimer != null) {
566           cleanupTimer.cancel();
567           cleanupTimer = null;
568         }
569         for (AsyncCall call : toCleanup) {
570           call.setFailed(closeException != null ? closeException : new ConnectionClosingException(
571               "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
572         }
573         channel.disconnect().addListener(ChannelFutureListener.CLOSE);
574         if (LOG.isDebugEnabled()) {
575           LOG.debug(name + ": closed");
576         }
577       }
578     });
579   }
580 
581   /**
582    * Clean up calls.
583    */
584   private void cleanupCalls() {
585     List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
586     long currentTime = EnvironmentEdgeManager.currentTime();
587     long nextCleanupTaskDelay = -1L;
588     synchronized (pendingCalls) {
589       for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
590         AsyncCall call = iter.next();
591         long timeout = call.getRpcTimeout();
592         if (timeout > 0) {
593           if (currentTime - call.getStartTime() >= timeout) {
594             iter.remove();
595             toCleanup.add(call);
596           } else {
597             if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
598               nextCleanupTaskDelay = timeout;
599             }
600           }
601         }
602       }
603       if (nextCleanupTaskDelay > 0) {
604         cleanupTimer =
605             client.newTimeout(timeoutTask, nextCleanupTaskDelay,
606               TimeUnit.MILLISECONDS);
607       } else {
608         cleanupTimer = null;
609       }
610     }
611     for (AsyncCall call : toCleanup) {
612       call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
613           + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
614     }
615   }
616 
617   /**
618    * Check if the connection is alive
619    *
620    * @return true if alive
621    */
622   public boolean isAlive() {
623     return channel.isOpen();
624   }
625 
626   /**
627    * Check if user should authenticate over Kerberos
628    *
629    * @return true if should be authenticated over Kerberos
630    * @throws java.io.IOException on failure of check
631    */
632   private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
633     UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
634     UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
635     UserGroupInformation realUser = currentUser.getRealUser();
636     return authMethod == AuthMethod.KERBEROS &&
637         loginUser != null &&
638         //Make sure user logged in using Kerberos either keytab or TGT
639         loginUser.hasKerberosCredentials() &&
640         // relogin only in case it is the login user (e.g. JT)
641         // or superuser (like oozie).
642         (loginUser.equals(currentUser) || loginUser.equals(realUser));
643   }
644 
645   /**
646    * If multiple clients with the same principal try to connect
647    * to the same server at the same time, the server assumes a
648    * replay attack is in progress. This is a feature of kerberos.
649    * In order to work around this, what is done is that the client
650    * backs off randomly and tries to initiate the connection
651    * again.
652    * The other problem is to do with ticket expiry. To handle that,
653    * a relogin is attempted.
654    * <p>
655    * The retry logic is governed by the {@link #shouldAuthenticateOverKrb}
656    * method. In case when the user doesn't have valid credentials, we don't
657    * need to retry (from cache or ticket). In such cases, it is prudent to
658    * throw a runtime exception when we receive a SaslException from the
659    * underlying authentication implementation, so there is no retry from
660    * other high level (for eg, HCM or HBaseAdmin).
661    * </p>
662    *
663    * @param currRetries retry count
664    * @param ex          exception describing fail
665    * @param user        which is trying to connect
666    * @throws java.io.IOException  if IO fail
667    * @throws InterruptedException if thread is interrupted
668    */
669   private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
670       final UserGroupInformation user) throws IOException, InterruptedException {
671     user.doAs(new PrivilegedExceptionAction<Void>() {
672       public Void run() throws IOException, InterruptedException {
673         if (shouldAuthenticateOverKrb()) {
674           if (currRetries < MAX_SASL_RETRIES) {
675             LOG.debug("Exception encountered while connecting to the server : " + ex);
676             //try re-login
677             if (UserGroupInformation.isLoginKeytabBased()) {
678               UserGroupInformation.getLoginUser().reloginFromKeytab();
679             } else {
680               UserGroupInformation.getLoginUser().reloginFromTicketCache();
681             }
682 
683             // Should reconnect
684             return null;
685           } else {
686             String msg = "Couldn't setup connection for " +
687                 UserGroupInformation.getLoginUser().getUserName() +
688                 " to " + serverPrincipal;
689             LOG.warn(msg);
690             throw (IOException) new IOException(msg).initCause(ex);
691           }
692         } else {
693           LOG.warn("Exception encountered while connecting to " +
694               "the server : " + ex);
695         }
696         if (ex instanceof RemoteException) {
697           throw (RemoteException) ex;
698         }
699         if (ex instanceof SaslException) {
700           String msg = "SASL authentication failed." +
701               " The most likely cause is missing or invalid credentials." +
702               " Consider 'kinit'.";
703           LOG.fatal(msg, ex);
704           throw new RuntimeException(msg, ex);
705         }
706         throw new IOException(ex);
707       }
708     });
709   }
710 
711   public int getConnectionHashCode() {
712     return ConnectionId.hashCode(ticket, serviceName, address);
713   }
714   
715   @Override
716   public int hashCode() {
717     return getConnectionHashCode();
718   }
719      
720   @Override
721   public boolean equals(Object obj) {
722     if (obj instanceof AsyncRpcChannel) {
723       AsyncRpcChannel channel = (AsyncRpcChannel) obj;
724       return channel.hashCode() == obj.hashCode();
725     }
726     return false;
727   }
728 
729 
730   @Override
731   public String toString() {
732     return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
733   }
734 
735   /**
736    * Listens to call writes and fails if write failed
737    */
738   private static final class CallWriteListener implements ChannelFutureListener {
739     private final AsyncRpcChannel rpcChannel;
740     private final int id;
741 
742     public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) {
743       this.rpcChannel = asyncRpcChannel;
744       this.id = id;
745     }
746 
747     @Override
748     public void operationComplete(ChannelFuture future) throws Exception {
749       if (!future.isSuccess()) {
750         AsyncCall call = rpcChannel.removePendingCall(id);
751         if (call != null) {
752           if (future.cause() instanceof IOException) {
753             call.setFailed((IOException) future.cause());
754           } else {
755             call.setFailed(new IOException(future.cause()));
756           }
757         }
758       }
759     }
760   }
761 }