View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import io.netty.bootstrap.Bootstrap;
21  import io.netty.buffer.ByteBuf;
22  import io.netty.buffer.ByteBufOutputStream;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelFuture;
25  import io.netty.channel.ChannelFutureListener;
26  import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
27  import io.netty.util.Timeout;
28  import io.netty.util.TimerTask;
29  import io.netty.util.concurrent.GenericFutureListener;
30  import io.netty.util.concurrent.Promise;
31  
32  import java.io.IOException;
33  import java.net.ConnectException;
34  import java.net.InetSocketAddress;
35  import java.net.SocketException;
36  import java.nio.ByteBuffer;
37  import java.security.PrivilegedExceptionAction;
38  import java.util.ArrayList;
39  import java.util.HashMap;
40  import java.util.Iterator;
41  import java.util.List;
42  import java.util.Map;
43  import java.util.Random;
44  import java.util.concurrent.TimeUnit;
45  
46  import javax.security.sasl.SaslException;
47  
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.classification.InterfaceAudience;
52  import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
53  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
54  import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
55  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
56  import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
57  import org.apache.hadoop.hbase.security.AuthMethod;
58  import org.apache.hadoop.hbase.security.SaslClientHandler;
59  import org.apache.hadoop.hbase.security.SaslUtil;
60  import org.apache.hadoop.hbase.security.SecurityInfo;
61  import org.apache.hadoop.hbase.security.User;
62  import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
63  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
64  import org.apache.hadoop.io.Text;
65  import org.apache.hadoop.ipc.RemoteException;
66  import org.apache.hadoop.security.SecurityUtil;
67  import org.apache.hadoop.security.UserGroupInformation;
68  import org.apache.hadoop.security.token.Token;
69  import org.apache.hadoop.security.token.TokenIdentifier;
70  import org.apache.hadoop.security.token.TokenSelector;
71  import org.apache.htrace.Span;
72  import org.apache.htrace.Trace;
73  
74  import com.google.protobuf.Descriptors;
75  import com.google.protobuf.Message;
76  import com.google.protobuf.RpcCallback;
77  
78  /**
79   * Netty RPC channel
80   */
81  @InterfaceAudience.Private
82  public class AsyncRpcChannel {
83    public static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName());
84  
85    private static final int MAX_SASL_RETRIES = 5;
86  
87    protected final static Map<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? extends
88        TokenIdentifier>> tokenHandlers = new HashMap<>();
89  
90    static {
91      tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
92          new AuthenticationTokenSelector());
93    }
94  
95    final AsyncRpcClient client;
96  
97    // Contains the channel to work with.
98    // Only exists when connected
99    private Channel channel;
100 
101   String name;
102   final User ticket;
103   final String serviceName;
104   final InetSocketAddress address;
105 
106   private int ioFailureCounter = 0;
107   private int connectFailureCounter = 0;
108 
109   boolean useSasl;
110   AuthMethod authMethod;
111   private int reloginMaxBackoff;
112   private Token<? extends TokenIdentifier> token;
113   private String serverPrincipal;
114 
115 
116   // NOTE: closed and connected flags below are only changed when a lock on pendingCalls
117   private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
118   private boolean connected = false;
119   private boolean closed = false;
120 
121   private Timeout cleanupTimer;
122 
123   private final TimerTask timeoutTask = new TimerTask() {
124     @Override
125     public void run(Timeout timeout) throws Exception {
126       cleanupCalls();
127     }
128   };
129 
130   /**
131    * Constructor for netty RPC channel
132    *
133    * @param bootstrap to construct channel on
134    * @param client    to connect with
135    * @param ticket of user which uses connection
136    *               @param serviceName name of service to connect to
137    * @param address to connect to
138    */
139   public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, String
140       serviceName, InetSocketAddress address) {
141     this.client = client;
142 
143     this.ticket = ticket;
144     this.serviceName = serviceName;
145     this.address = address;
146 
147     this.channel = connect(bootstrap).channel();
148 
149     name = ("IPC Client (" + channel.hashCode() + ") to " +
150         address.toString() +
151         ((ticket == null) ?
152             " from unknown user" :
153             (" from " + ticket.getName())));
154   }
155 
156   /**
157    * Connect to channel
158    *
159    * @param bootstrap to connect to
160    * @return future of connection
161    */
162   private ChannelFuture connect(final Bootstrap bootstrap) {
163     return bootstrap.remoteAddress(address).connect()
164         .addListener(new GenericFutureListener<ChannelFuture>() {
165           @Override
166           public void operationComplete(final ChannelFuture f) throws Exception {
167             if (!f.isSuccess()) {
168               if (f.cause() instanceof SocketException) {
169                 retryOrClose(bootstrap, connectFailureCounter++, f.cause());
170               } else {
171                 retryOrClose(bootstrap, ioFailureCounter++, f.cause());
172               }
173               return;
174             }
175             channel = f.channel();
176 
177             setupAuthorization();
178 
179             ByteBuf b = channel.alloc().directBuffer(6);
180             createPreamble(b, authMethod);
181             channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
182             if (useSasl) {
183               UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
184               if (authMethod == AuthMethod.KERBEROS) {
185                 if (ticket != null && ticket.getRealUser() != null) {
186                   ticket = ticket.getRealUser();
187                 }
188               }
189               SaslClientHandler saslHandler;
190               if (ticket == null) {
191                 throw new FatalConnectionException("ticket/user is null");
192               }
193               final UserGroupInformation realTicket = ticket;
194               saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
195                 @Override
196                 public SaslClientHandler run() throws IOException {
197                   return getSaslHandler(realTicket, bootstrap);
198                 }
199               });
200               if (saslHandler != null) {
201                 // Sasl connect is successful. Let's set up Sasl channel handler
202                 channel.pipeline().addFirst(saslHandler);
203               } else {
204                 // fall back to simple auth because server told us so.
205                 authMethod = AuthMethod.SIMPLE;
206                 useSasl = false;
207               }
208             } else {
209               startHBaseConnection(f.channel());
210             }
211           }
212         });
213   }
214 
215   /**
216    * Start HBase connection
217    *
218    * @param ch channel to start connection on
219    */
220   private void startHBaseConnection(Channel ch) {
221     ch.pipeline()
222         .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
223     ch.pipeline().addLast(new AsyncServerResponseHandler(this));
224     try {
225       writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
226         @Override
227         public void operationComplete(ChannelFuture future) throws Exception {
228           if (!future.isSuccess()) {
229             close(future.cause());
230             return;
231           }
232           List<AsyncCall> callsToWrite;
233           synchronized (pendingCalls) {
234             connected = true;
235             callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
236           }
237           for (AsyncCall call : callsToWrite) {
238             writeRequest(call);
239           }
240         }
241       });
242     } catch (IOException e) {
243       close(e);
244     }
245   }
246 
247   /**
248    * Get SASL handler
249    * @param bootstrap to reconnect to
250    * @return new SASL handler
251    * @throws java.io.IOException if handler failed to create
252    */
253   private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
254       final Bootstrap bootstrap) throws IOException {
255     return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
256         client.fallbackAllowed, client.conf.get("hbase.rpc.protection",
257           SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
258         new SaslClientHandler.SaslExceptionHandler() {
259           @Override
260           public void handle(int retryCount, Random random, Throwable cause) {
261             try {
262               // Handle Sasl failure. Try to potentially get new credentials
263               handleSaslConnectionFailure(retryCount, cause, realTicket);
264 
265               // Try to reconnect
266               client.newTimeout(new TimerTask() {
267                 @Override
268                 public void run(Timeout timeout) throws Exception {
269                   connect(bootstrap);
270                 }
271               }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
272             } catch (IOException | InterruptedException e) {
273               close(e);
274             }
275           }
276         }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
277           @Override
278           public void onSuccess(Channel channel) {
279             startHBaseConnection(channel);
280           }
281         });
282   }
283 
284   /**
285    * Retry to connect or close
286    *
287    * @param bootstrap      to connect with
288    * @param connectCounter amount of tries
289    * @param e              exception of fail
290    */
291   private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
292     if (connectCounter < client.maxRetries) {
293       client.newTimeout(new TimerTask() {
294         @Override public void run(Timeout timeout) throws Exception {
295           connect(bootstrap);
296         }
297       }, client.failureSleep, TimeUnit.MILLISECONDS);
298     } else {
299       client.failedServers.addToFailedServers(address);
300       close(e);
301     }
302   }
303 
304   /**
305    * Calls method on channel
306    * @param method to call
307    * @param controller to run call with
308    * @param request to send
309    * @param responsePrototype to construct response with
310    */
311   public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
312       final PayloadCarryingRpcController controller, final Message request,
313       final Message responsePrototype) {
314     final AsyncCall call =
315         new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request,
316             controller, responsePrototype);
317     controller.notifyOnCancel(new RpcCallback<Object>() {
318       @Override
319       public void run(Object parameter) {
320         // TODO: do not need to call AsyncCall.setFailed?
321         synchronized (pendingCalls) {
322           pendingCalls.remove(call.id);
323         }
324       }
325     });
326     // TODO: this should be handled by PayloadCarryingRpcController.
327     if (controller.isCanceled()) {
328       // To finish if the call was cancelled before we set the notification (race condition)
329       call.cancel(true);
330       return call;
331     }
332 
333     synchronized (pendingCalls) {
334       if (closed) {
335         Promise<Message> promise = channel.eventLoop().newPromise();
336         promise.setFailure(new ConnectException());
337         return promise;
338       }
339       pendingCalls.put(call.id, call);
340       // Add timeout for cleanup if none is present
341       if (cleanupTimer == null && call.getRpcTimeout() > 0) {
342         cleanupTimer =
343             client.newTimeout(timeoutTask, call.getRpcTimeout(),
344               TimeUnit.MILLISECONDS);
345       }
346       if (!connected) {
347         return call;
348       }
349     }
350     writeRequest(call);
351     return call;
352   }
353 
354   AsyncCall removePendingCall(int id) {
355     synchronized (pendingCalls) {
356       return pendingCalls.remove(id);
357     }
358   }
359 
360   /**
361    * Write the channel header
362    *
363    * @param channel to write to
364    * @return future of write
365    * @throws java.io.IOException on failure to write
366    */
367   private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
368     RPCProtos.ConnectionHeader.Builder headerBuilder =
369         RPCProtos.ConnectionHeader.newBuilder().setServiceName(serviceName);
370 
371     RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
372     if (userInfoPB != null) {
373       headerBuilder.setUserInfo(userInfoPB);
374     }
375 
376     if (client.codec != null) {
377       headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
378     }
379     if (client.compressor != null) {
380       headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
381     }
382 
383     headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
384     RPCProtos.ConnectionHeader header = headerBuilder.build();
385 
386 
387     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
388 
389     ByteBuf b = channel.alloc().directBuffer(totalSize);
390 
391     b.writeInt(header.getSerializedSize());
392     b.writeBytes(header.toByteArray());
393 
394     return channel.writeAndFlush(b);
395   }
396 
397   /**
398    * Write request to channel
399    *
400    * @param call    to write
401    */
402   private void writeRequest(final AsyncCall call) {
403     try {
404       final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
405           .newBuilder();
406       requestHeaderBuilder.setCallId(call.id)
407               .setMethodName(call.method.getName()).setRequestParam(call.param != null);
408 
409       if (Trace.isTracing()) {
410         Span s = Trace.currentSpan();
411         requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder().
412             setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
413       }
414 
415       ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner());
416       if (cellBlock != null) {
417         final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
418             .newBuilder();
419         cellBlockBuilder.setLength(cellBlock.limit());
420         requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
421       }
422       // Only pass priority if there one.  Let zero be same as no priority.
423       if (call.controller.getPriority() != 0) {
424         requestHeaderBuilder.setPriority(call.controller.getPriority());
425       }
426 
427       RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
428 
429       int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
430       if (cellBlock != null) {
431         totalSize += cellBlock.remaining();
432       }
433 
434       ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
435       try(ByteBufOutputStream out = new ByteBufOutputStream(b)) {
436         IPCUtil.write(out, rh, call.param, cellBlock);
437       }
438 
439       channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
440     } catch (IOException e) {
441       close(e);
442     }
443   }
444 
445   /**
446    * Set up server authorization
447    *
448    * @throws java.io.IOException if auth setup failed
449    */
450   private void setupAuthorization() throws IOException {
451     SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
452     this.useSasl = client.userProvider.isHBaseSecurityEnabled();
453 
454     this.token = null;
455     if (useSasl && securityInfo != null) {
456       AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
457       if (tokenKind != null) {
458         TokenSelector<? extends TokenIdentifier> tokenSelector = tokenHandlers.get(tokenKind);
459         if (tokenSelector != null) {
460           token = tokenSelector
461               .selectToken(new Text(client.clusterId), ticket.getUGI().getTokens());
462         } else if (LOG.isDebugEnabled()) {
463           LOG.debug("No token selector found for type " + tokenKind);
464         }
465       }
466       String serverKey = securityInfo.getServerPrincipal();
467       if (serverKey == null) {
468         throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
469       }
470       this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
471           address.getAddress().getCanonicalHostName().toLowerCase());
472       if (LOG.isDebugEnabled()) {
473         LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
474             + serverPrincipal);
475       }
476     }
477 
478     if (!useSasl) {
479       authMethod = AuthMethod.SIMPLE;
480     } else if (token != null) {
481       authMethod = AuthMethod.DIGEST;
482     } else {
483       authMethod = AuthMethod.KERBEROS;
484     }
485 
486     if (LOG.isDebugEnabled()) {
487       LOG.debug("Use " + authMethod + " authentication for service " + serviceName +
488           ", sasl=" + useSasl);
489     }
490     reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
491   }
492 
493   /**
494    * Build the user information
495    *
496    * @param ugi        User Group Information
497    * @param authMethod Authorization method
498    * @return UserInformation protobuf
499    */
500   private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
501     if (ugi == null || authMethod == AuthMethod.DIGEST) {
502       // Don't send user for token auth
503       return null;
504     }
505     RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
506     if (authMethod == AuthMethod.KERBEROS) {
507       // Send effective user for Kerberos auth
508       userInfoPB.setEffectiveUser(ugi.getUserName());
509     } else if (authMethod == AuthMethod.SIMPLE) {
510       //Send both effective user and real user for simple auth
511       userInfoPB.setEffectiveUser(ugi.getUserName());
512       if (ugi.getRealUser() != null) {
513         userInfoPB.setRealUser(ugi.getRealUser().getUserName());
514       }
515     }
516     return userInfoPB.build();
517   }
518 
519   /**
520    * Create connection preamble
521    *
522    * @param byteBuf    to write to
523    * @param authMethod to write
524    */
525   private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
526     byteBuf.writeBytes(HConstants.RPC_HEADER);
527     byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
528     byteBuf.writeByte(authMethod.code);
529   }
530 
531   /**
532    * Close connection
533    *
534    * @param e exception on close
535    */
536   public void close(final Throwable e) {
537     client.removeConnection(this);
538 
539     // Move closing from the requesting thread to the channel thread
540     channel.eventLoop().execute(new Runnable() {
541       @Override
542       public void run() {
543         List<AsyncCall> toCleanup;
544         synchronized (pendingCalls) {
545           if (closed) {
546             return;
547           }
548           closed = true;
549           toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
550           pendingCalls.clear();
551         }
552         IOException closeException = null;
553         if (e != null) {
554           if (e instanceof IOException) {
555             closeException = (IOException) e;
556           } else {
557             closeException = new IOException(e);
558           }
559         }
560         // log the info
561         if (LOG.isDebugEnabled() && closeException != null) {
562           LOG.debug(name + ": closing ipc connection to " + address, closeException);
563         }
564         if (cleanupTimer != null) {
565           cleanupTimer.cancel();
566           cleanupTimer = null;
567         }
568         for (AsyncCall call : toCleanup) {
569           call.setFailed(closeException != null ? closeException : new ConnectionClosingException(
570               "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
571         }
572         channel.disconnect().addListener(ChannelFutureListener.CLOSE);
573         if (LOG.isDebugEnabled()) {
574           LOG.debug(name + ": closed");
575         }
576       }
577     });
578   }
579 
580   /**
581    * Clean up calls.
582    *
583    * @param cleanAll true if all calls should be cleaned, false for only the timed out calls
584    */
585   private void cleanupCalls() {
586     List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
587     long currentTime = EnvironmentEdgeManager.currentTime();
588     long nextCleanupTaskDelay = -1L;
589     synchronized (pendingCalls) {
590       for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
591         AsyncCall call = iter.next();
592         long timeout = call.getRpcTimeout();
593         if (timeout > 0) {
594           if (currentTime - call.getStartTime() >= timeout) {
595             iter.remove();
596             toCleanup.add(call);
597           } else {
598             if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
599               nextCleanupTaskDelay = timeout;
600             }
601           }
602         }
603       }
604       if (nextCleanupTaskDelay > 0) {
605         cleanupTimer =
606             client.newTimeout(timeoutTask, nextCleanupTaskDelay,
607               TimeUnit.MILLISECONDS);
608       } else {
609         cleanupTimer = null;
610       }
611     }
612     for (AsyncCall call : toCleanup) {
613       call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
614           + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
615     }
616   }
617 
618   /**
619    * Check if the connection is alive
620    *
621    * @return true if alive
622    */
623   public boolean isAlive() {
624     return channel.isOpen();
625   }
626 
627   /**
628    * Check if user should authenticate over Kerberos
629    *
630    * @return true if should be authenticated over Kerberos
631    * @throws java.io.IOException on failure of check
632    */
633   private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
634     UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
635     UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
636     UserGroupInformation realUser = currentUser.getRealUser();
637     return authMethod == AuthMethod.KERBEROS &&
638         loginUser != null &&
639         //Make sure user logged in using Kerberos either keytab or TGT
640         loginUser.hasKerberosCredentials() &&
641         // relogin only in case it is the login user (e.g. JT)
642         // or superuser (like oozie).
643         (loginUser.equals(currentUser) || loginUser.equals(realUser));
644   }
645 
646   /**
647    * If multiple clients with the same principal try to connect
648    * to the same server at the same time, the server assumes a
649    * replay attack is in progress. This is a feature of kerberos.
650    * In order to work around this, what is done is that the client
651    * backs off randomly and tries to initiate the connection
652    * again.
653    * The other problem is to do with ticket expiry. To handle that,
654    * a relogin is attempted.
655    * <p>
656    * The retry logic is governed by the {@link #shouldAuthenticateOverKrb}
657    * method. In case when the user doesn't have valid credentials, we don't
658    * need to retry (from cache or ticket). In such cases, it is prudent to
659    * throw a runtime exception when we receive a SaslException from the
660    * underlying authentication implementation, so there is no retry from
661    * other high level (for eg, HCM or HBaseAdmin).
662    * </p>
663    *
664    * @param currRetries retry count
665    * @param ex          exception describing fail
666    * @param user        which is trying to connect
667    * @throws java.io.IOException  if IO fail
668    * @throws InterruptedException if thread is interrupted
669    */
670   private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
671       final UserGroupInformation user) throws IOException, InterruptedException {
672     user.doAs(new PrivilegedExceptionAction<Void>() {
673       public Void run() throws IOException, InterruptedException {
674         if (shouldAuthenticateOverKrb()) {
675           if (currRetries < MAX_SASL_RETRIES) {
676             LOG.debug("Exception encountered while connecting to the server : " + ex);
677             //try re-login
678             if (UserGroupInformation.isLoginKeytabBased()) {
679               UserGroupInformation.getLoginUser().reloginFromKeytab();
680             } else {
681               UserGroupInformation.getLoginUser().reloginFromTicketCache();
682             }
683 
684             // Should reconnect
685             return null;
686           } else {
687             String msg = "Couldn't setup connection for " +
688                 UserGroupInformation.getLoginUser().getUserName() +
689                 " to " + serverPrincipal;
690             LOG.warn(msg);
691             throw (IOException) new IOException(msg).initCause(ex);
692           }
693         } else {
694           LOG.warn("Exception encountered while connecting to " +
695               "the server : " + ex);
696         }
697         if (ex instanceof RemoteException) {
698           throw (RemoteException) ex;
699         }
700         if (ex instanceof SaslException) {
701           String msg = "SASL authentication failed." +
702               " The most likely cause is missing or invalid credentials." +
703               " Consider 'kinit'.";
704           LOG.fatal(msg, ex);
705           throw new RuntimeException(msg, ex);
706         }
707         throw new IOException(ex);
708       }
709     });
710   }
711 
712   public int getConnectionHashCode() {
713     return ConnectionId.hashCode(ticket, serviceName, address);
714   }
715   
716   @Override
717   public int hashCode() {
718     return getConnectionHashCode();
719   }
720      
721   @Override
722   public boolean equals(Object obj) {
723     if (obj instanceof AsyncRpcChannel) {
724       AsyncRpcChannel channel = (AsyncRpcChannel) obj;
725       return channel.hashCode() == obj.hashCode();
726     }
727     return false;
728   }
729 
730 
731   @Override
732   public String toString() {
733     return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
734   }
735 
736   /**
737    * Listens to call writes and fails if write failed
738    */
739   private static final class CallWriteListener implements ChannelFutureListener {
740     private final AsyncRpcChannel rpcChannel;
741     private final int id;
742 
743     public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) {
744       this.rpcChannel = asyncRpcChannel;
745       this.id = id;
746     }
747 
748     @Override
749     public void operationComplete(ChannelFuture future) throws Exception {
750       if (!future.isSuccess()) {
751         AsyncCall call = rpcChannel.removePendingCall(id);
752         if (call != null) {
753           if (future.cause() instanceof IOException) {
754             call.setFailed((IOException) future.cause());
755           } else {
756             call.setFailed(new IOException(future.cause()));
757           }
758         }
759       }
760     }
761   }
762 }