1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import javax.security.sasl.SaslException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.SaslClientHandler;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SecurityInfo;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
78
79
80
81
82 @InterfaceAudience.Private
83 public class AsyncRpcChannel {
84 private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName());
85
86 private static final int MAX_SASL_RETRIES = 5;
87
88 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_MUTABLE_COLLECTION_PKGPROTECT",
89 justification="the rest of the system which live in the different package can use")
90 protected final static Map<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? extends
91 TokenIdentifier>> tokenHandlers = new HashMap<>();
92
93 static {
94 tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
95 new AuthenticationTokenSelector());
96 }
97
98 final AsyncRpcClient client;
99
100
101
102 private Channel channel;
103
104 String name;
105 final User ticket;
106 final String serviceName;
107 final InetSocketAddress address;
108
109 private int ioFailureCounter = 0;
110 private int connectFailureCounter = 0;
111
112 boolean useSasl;
113 AuthMethod authMethod;
114 private int reloginMaxBackoff;
115 private Token<? extends TokenIdentifier> token;
116 private String serverPrincipal;
117
118
119
120 private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
121 private boolean connected = false;
122 private boolean closed = false;
123
124 private Timeout cleanupTimer;
125
126 private final TimerTask timeoutTask = new TimerTask() {
127 @Override
128 public void run(Timeout timeout) throws Exception {
129 cleanupCalls();
130 }
131 };
132
133
134
135
136
137
138
139
140
141
142 public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, String
143 serviceName, InetSocketAddress address) {
144 this.client = client;
145
146 this.ticket = ticket;
147 this.serviceName = serviceName;
148 this.address = address;
149
150 this.channel = connect(bootstrap).channel();
151
152 name = ("IPC Client (" + channel.hashCode() + ") to " +
153 address.toString() +
154 ((ticket == null) ?
155 " from unknown user" :
156 (" from " + ticket.getName())));
157 }
158
159
160
161
162
163
164
165 private ChannelFuture connect(final Bootstrap bootstrap) {
166 return bootstrap.remoteAddress(address).connect()
167 .addListener(new GenericFutureListener<ChannelFuture>() {
168 @Override
169 public void operationComplete(final ChannelFuture f) throws Exception {
170 if (!f.isSuccess()) {
171 if (f.cause() instanceof SocketException) {
172 retryOrClose(bootstrap, connectFailureCounter++, f.cause());
173 } else {
174 retryOrClose(bootstrap, ioFailureCounter++, f.cause());
175 }
176 return;
177 }
178 channel = f.channel();
179
180 setupAuthorization();
181
182 ByteBuf b = channel.alloc().directBuffer(6);
183 createPreamble(b, authMethod);
184 channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
185 if (useSasl) {
186 UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
187 if (authMethod == AuthMethod.KERBEROS) {
188 if (ticket != null && ticket.getRealUser() != null) {
189 ticket = ticket.getRealUser();
190 }
191 }
192 SaslClientHandler saslHandler;
193 if (ticket == null) {
194 throw new FatalConnectionException("ticket/user is null");
195 }
196 final UserGroupInformation realTicket = ticket;
197 saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
198 @Override
199 public SaslClientHandler run() throws IOException {
200 return getSaslHandler(realTicket, bootstrap);
201 }
202 });
203 if (saslHandler != null) {
204
205 channel.pipeline().addFirst(saslHandler);
206 } else {
207
208 authMethod = AuthMethod.SIMPLE;
209 useSasl = false;
210 }
211 } else {
212 startHBaseConnection(f.channel());
213 }
214 }
215 });
216 }
217
218
219
220
221
222
223 private void startHBaseConnection(Channel ch) {
224 ch.pipeline()
225 .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
226 ch.pipeline().addLast(new AsyncServerResponseHandler(this));
227 try {
228 writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
229 @Override
230 public void operationComplete(ChannelFuture future) throws Exception {
231 if (!future.isSuccess()) {
232 close(future.cause());
233 return;
234 }
235 List<AsyncCall> callsToWrite;
236 synchronized (pendingCalls) {
237 connected = true;
238 callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
239 }
240 for (AsyncCall call : callsToWrite) {
241 writeRequest(call);
242 }
243 }
244 });
245 } catch (IOException e) {
246 close(e);
247 }
248 }
249
250
251
252
253
254
255
256 private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
257 final Bootstrap bootstrap) throws IOException {
258 return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
259 client.fallbackAllowed, client.conf.get("hbase.rpc.protection",
260 SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
261 new SaslClientHandler.SaslExceptionHandler() {
262 @Override
263 public void handle(int retryCount, Random random, Throwable cause) {
264 try {
265
266 handleSaslConnectionFailure(retryCount, cause, realTicket);
267
268
269 client.newTimeout(new TimerTask() {
270 @Override
271 public void run(Timeout timeout) throws Exception {
272 connect(bootstrap);
273 }
274 }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
275 } catch (IOException | InterruptedException e) {
276 close(e);
277 }
278 }
279 }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
280 @Override
281 public void onSuccess(Channel channel) {
282 startHBaseConnection(channel);
283 }
284 });
285 }
286
287
288
289
290
291
292
293
294 private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
295 if (connectCounter < client.maxRetries) {
296 client.newTimeout(new TimerTask() {
297 @Override public void run(Timeout timeout) throws Exception {
298 connect(bootstrap);
299 }
300 }, client.failureSleep, TimeUnit.MILLISECONDS);
301 } else {
302 client.failedServers.addToFailedServers(address);
303 close(e);
304 }
305 }
306
307
308
309
310
311
312
313
314 public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
315 final PayloadCarryingRpcController controller, final Message request,
316 final Message responsePrototype, MetricsConnection.CallStats callStats) {
317 final AsyncCall call =
318 new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request,
319 controller, responsePrototype, callStats);
320 controller.notifyOnCancel(new RpcCallback<Object>() {
321 @Override
322 public void run(Object parameter) {
323
324 synchronized (pendingCalls) {
325 pendingCalls.remove(call.id);
326 }
327 }
328 });
329
330 if (controller.isCanceled()) {
331
332 call.cancel(true);
333 return call;
334 }
335
336 synchronized (pendingCalls) {
337 if (closed) {
338 Promise<Message> promise = channel.eventLoop().newPromise();
339 promise.setFailure(new ConnectException());
340 return promise;
341 }
342 pendingCalls.put(call.id, call);
343
344 if (cleanupTimer == null && call.getRpcTimeout() > 0) {
345 cleanupTimer =
346 client.newTimeout(timeoutTask, call.getRpcTimeout(),
347 TimeUnit.MILLISECONDS);
348 }
349 if (!connected) {
350 return call;
351 }
352 }
353 writeRequest(call);
354 return call;
355 }
356
357 AsyncCall removePendingCall(int id) {
358 synchronized (pendingCalls) {
359 return pendingCalls.remove(id);
360 }
361 }
362
363
364
365
366
367
368
369
370 private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
371 RPCProtos.ConnectionHeader.Builder headerBuilder =
372 RPCProtos.ConnectionHeader.newBuilder().setServiceName(serviceName);
373
374 RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
375 if (userInfoPB != null) {
376 headerBuilder.setUserInfo(userInfoPB);
377 }
378
379 if (client.codec != null) {
380 headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
381 }
382 if (client.compressor != null) {
383 headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
384 }
385
386 headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
387 RPCProtos.ConnectionHeader header = headerBuilder.build();
388
389
390 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
391
392 ByteBuf b = channel.alloc().directBuffer(totalSize);
393
394 b.writeInt(header.getSerializedSize());
395 b.writeBytes(header.toByteArray());
396
397 return channel.writeAndFlush(b);
398 }
399
400
401
402
403
404
405 private void writeRequest(final AsyncCall call) {
406 try {
407 final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
408 .newBuilder();
409 requestHeaderBuilder.setCallId(call.id)
410 .setMethodName(call.method.getName()).setRequestParam(call.param != null);
411
412 if (Trace.isTracing()) {
413 Span s = Trace.currentSpan();
414 requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder().
415 setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
416 }
417
418 ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner());
419 if (cellBlock != null) {
420 final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
421 .newBuilder();
422 cellBlockBuilder.setLength(cellBlock.limit());
423 requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
424 }
425
426 if (call.controller.getPriority() != 0) {
427 requestHeaderBuilder.setPriority(call.controller.getPriority());
428 }
429
430 RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
431
432 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
433 if (cellBlock != null) {
434 totalSize += cellBlock.remaining();
435 }
436
437 ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
438 try(ByteBufOutputStream out = new ByteBufOutputStream(b)) {
439 call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
440 }
441
442 channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
443 } catch (IOException e) {
444 close(e);
445 }
446 }
447
448
449
450
451
452
453 private void setupAuthorization() throws IOException {
454 SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
455 this.useSasl = client.userProvider.isHBaseSecurityEnabled();
456
457 this.token = null;
458 if (useSasl && securityInfo != null) {
459 AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
460 if (tokenKind != null) {
461 TokenSelector<? extends TokenIdentifier> tokenSelector = tokenHandlers.get(tokenKind);
462 if (tokenSelector != null) {
463 token = tokenSelector
464 .selectToken(new Text(client.clusterId), ticket.getUGI().getTokens());
465 } else if (LOG.isDebugEnabled()) {
466 LOG.debug("No token selector found for type " + tokenKind);
467 }
468 }
469 String serverKey = securityInfo.getServerPrincipal();
470 if (serverKey == null) {
471 throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
472 }
473 this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
474 address.getAddress().getCanonicalHostName().toLowerCase());
475 if (LOG.isDebugEnabled()) {
476 LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
477 + serverPrincipal);
478 }
479 }
480
481 if (!useSasl) {
482 authMethod = AuthMethod.SIMPLE;
483 } else if (token != null) {
484 authMethod = AuthMethod.DIGEST;
485 } else {
486 authMethod = AuthMethod.KERBEROS;
487 }
488
489 if (LOG.isDebugEnabled()) {
490 LOG.debug("Use " + authMethod + " authentication for service " + serviceName +
491 ", sasl=" + useSasl);
492 }
493 reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
494 }
495
496
497
498
499
500
501
502
503 private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
504 if (ugi == null || authMethod == AuthMethod.DIGEST) {
505
506 return null;
507 }
508 RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
509 if (authMethod == AuthMethod.KERBEROS) {
510
511 userInfoPB.setEffectiveUser(ugi.getUserName());
512 } else if (authMethod == AuthMethod.SIMPLE) {
513
514 userInfoPB.setEffectiveUser(ugi.getUserName());
515 if (ugi.getRealUser() != null) {
516 userInfoPB.setRealUser(ugi.getRealUser().getUserName());
517 }
518 }
519 return userInfoPB.build();
520 }
521
522
523
524
525
526
527
528 private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
529 byteBuf.writeBytes(HConstants.RPC_HEADER);
530 byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
531 byteBuf.writeByte(authMethod.code);
532 }
533
534
535
536
537
538
539 public void close(final Throwable e) {
540 client.removeConnection(this);
541
542
543 channel.eventLoop().execute(new Runnable() {
544 @Override
545 public void run() {
546 List<AsyncCall> toCleanup;
547 synchronized (pendingCalls) {
548 if (closed) {
549 return;
550 }
551 closed = true;
552 toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
553 pendingCalls.clear();
554 }
555 IOException closeException = null;
556 if (e != null) {
557 if (e instanceof IOException) {
558 closeException = (IOException) e;
559 } else {
560 closeException = new IOException(e);
561 }
562 }
563
564 if (LOG.isDebugEnabled() && closeException != null) {
565 LOG.debug(name + ": closing ipc connection to " + address, closeException);
566 }
567 if (cleanupTimer != null) {
568 cleanupTimer.cancel();
569 cleanupTimer = null;
570 }
571 for (AsyncCall call : toCleanup) {
572 call.setFailed(closeException != null ? closeException : new ConnectionClosingException(
573 "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
574 }
575 channel.disconnect().addListener(ChannelFutureListener.CLOSE);
576 if (LOG.isDebugEnabled()) {
577 LOG.debug(name + ": closed");
578 }
579 }
580 });
581 }
582
583
584
585
586 private void cleanupCalls() {
587 List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
588 long currentTime = EnvironmentEdgeManager.currentTime();
589 long nextCleanupTaskDelay = -1L;
590 synchronized (pendingCalls) {
591 for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
592 AsyncCall call = iter.next();
593 long timeout = call.getRpcTimeout();
594 if (timeout > 0) {
595 if (currentTime - call.getStartTime() >= timeout) {
596 iter.remove();
597 toCleanup.add(call);
598 } else {
599 if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
600 nextCleanupTaskDelay = timeout;
601 }
602 }
603 }
604 }
605 if (nextCleanupTaskDelay > 0) {
606 cleanupTimer =
607 client.newTimeout(timeoutTask, nextCleanupTaskDelay,
608 TimeUnit.MILLISECONDS);
609 } else {
610 cleanupTimer = null;
611 }
612 }
613 for (AsyncCall call : toCleanup) {
614 call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
615 + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
616 }
617 }
618
619
620
621
622
623
624 public boolean isAlive() {
625 return channel.isOpen();
626 }
627
628
629
630
631
632
633
634 private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
635 UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
636 UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
637 UserGroupInformation realUser = currentUser.getRealUser();
638 return authMethod == AuthMethod.KERBEROS &&
639 loginUser != null &&
640
641 loginUser.hasKerberosCredentials() &&
642
643
644 (loginUser.equals(currentUser) || loginUser.equals(realUser));
645 }
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671 private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
672 final UserGroupInformation user) throws IOException, InterruptedException {
673 user.doAs(new PrivilegedExceptionAction<Void>() {
674 public Void run() throws IOException, InterruptedException {
675 if (shouldAuthenticateOverKrb()) {
676 if (currRetries < MAX_SASL_RETRIES) {
677 LOG.debug("Exception encountered while connecting to the server : " + ex);
678
679 if (UserGroupInformation.isLoginKeytabBased()) {
680 UserGroupInformation.getLoginUser().reloginFromKeytab();
681 } else {
682 UserGroupInformation.getLoginUser().reloginFromTicketCache();
683 }
684
685
686 return null;
687 } else {
688 String msg = "Couldn't setup connection for " +
689 UserGroupInformation.getLoginUser().getUserName() +
690 " to " + serverPrincipal;
691 LOG.warn(msg);
692 throw (IOException) new IOException(msg).initCause(ex);
693 }
694 } else {
695 LOG.warn("Exception encountered while connecting to " +
696 "the server : " + ex);
697 }
698 if (ex instanceof RemoteException) {
699 throw (RemoteException) ex;
700 }
701 if (ex instanceof SaslException) {
702 String msg = "SASL authentication failed." +
703 " The most likely cause is missing or invalid credentials." +
704 " Consider 'kinit'.";
705 LOG.fatal(msg, ex);
706 throw new RuntimeException(msg, ex);
707 }
708 throw new IOException(ex);
709 }
710 });
711 }
712
713 public int getConnectionHashCode() {
714 return ConnectionId.hashCode(ticket, serviceName, address);
715 }
716
717 @Override
718 public int hashCode() {
719 return getConnectionHashCode();
720 }
721
722 @Override
723 public boolean equals(Object obj) {
724 if (obj instanceof AsyncRpcChannel) {
725 AsyncRpcChannel channel = (AsyncRpcChannel) obj;
726 return channel.hashCode() == obj.hashCode();
727 }
728 return false;
729 }
730
731
732 @Override
733 public String toString() {
734 return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
735 }
736
737
738
739
740 private static final class CallWriteListener implements ChannelFutureListener {
741 private final AsyncRpcChannel rpcChannel;
742 private final int id;
743
744 public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) {
745 this.rpcChannel = asyncRpcChannel;
746 this.id = id;
747 }
748
749 @Override
750 public void operationComplete(ChannelFuture future) throws Exception {
751 if (!future.isSuccess()) {
752 AsyncCall call = rpcChannel.removePendingCall(id);
753 if (call != null) {
754 if (future.cause() instanceof IOException) {
755 call.setFailed((IOException) future.cause());
756 } else {
757 call.setFailed(new IOException(future.cause()));
758 }
759 }
760 }
761 }
762 }
763 }