1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import javax.security.sasl.SaslException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.SaslClientHandler;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SecurityInfo;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
77
78
79
80
81 @InterfaceAudience.Private
82 public class AsyncRpcChannel {
/** Logger shared by this class. */
public static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName());

/** Retry bound that handleSaslConnectionFailure() compares the current retry count against. */
private static final int MAX_SASL_RETRIES = 5;

/** Token selectors keyed by token kind; looked up in setupAuthorization(). */
protected final static Map<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? extends
    TokenIdentifier>> tokenHandlers = new HashMap<>();

static {
  tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
      new AuthenticationTokenSelector());
}

/** Owning client: supplies configuration, timers, codec/compressor and the call id counter. */
final AsyncRpcClient client;

// Netty channel currently backing this connection. Assigned in the constructor and
// re-assigned by the connect() listener each time a (re)connect succeeds.
private Channel channel;

/** Human-readable label built in the constructor; used in debug log messages. */
String name;
/** User on whose behalf calls are made; the constructor tolerates null. */
final User ticket;
/** Service name sent in the connection header and used to look up SecurityInfo. */
final String serviceName;
/** Remote server address. */
final InetSocketAddress address;

// Failure counters passed to retryOrClose(): the connect counter is used for
// SocketException causes, the io counter for every other connect failure.
private int ioFailureCounter = 0;
private int connectFailureCounter = 0;

// Authentication state, (re)computed by setupAuthorization() on every successful connect.
boolean useSasl;
AuthMethod authMethod;
// Read from "hbase.security.relogin.maxbackoff" (default 5000); bounds the random SASL retry
// delay, which is scheduled in milliseconds.
private int reloginMaxBackoff;
private Token<? extends TokenIdentifier> token;
private String serverPrincipal;

// Calls that have been submitted but not yet completed, keyed by call id.
// Accessed under synchronized (pendingCalls), which callMethod() and cleanupCalls() also use
// to guard cleanupTimer, and which guards every write of connected and closed.
private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
// Set to true once the connection header has been written successfully.
private boolean connected = false;
// Set to true by close(); callMethod() rejects new calls afterwards.
private boolean closed = false;

// Pending timer that will run timeoutTask, or null when no cleanup is scheduled.
private Timeout cleanupTimer;

// Timer callback that expires timed-out calls via cleanupCalls().
private final TimerTask timeoutTask = new TimerTask() {
  @Override
  public void run(Timeout timeout) throws Exception {
    cleanupCalls();
  }
};
129
130
131
132
133
134
135
136
137
138
139 public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, String
140 serviceName, InetSocketAddress address) {
141 this.client = client;
142
143 this.ticket = ticket;
144 this.serviceName = serviceName;
145 this.address = address;
146
147 this.channel = connect(bootstrap).channel();
148
149 name = ("IPC Client (" + channel.hashCode() + ") to " +
150 address.toString() +
151 ((ticket == null) ?
152 " from unknown user" :
153 (" from " + ticket.getName())));
154 }
155
156
157
158
159
160
161
162 private ChannelFuture connect(final Bootstrap bootstrap) {
163 return bootstrap.remoteAddress(address).connect()
164 .addListener(new GenericFutureListener<ChannelFuture>() {
165 @Override
166 public void operationComplete(final ChannelFuture f) throws Exception {
167 if (!f.isSuccess()) {
168 if (f.cause() instanceof SocketException) {
169 retryOrClose(bootstrap, connectFailureCounter++, f.cause());
170 } else {
171 retryOrClose(bootstrap, ioFailureCounter++, f.cause());
172 }
173 return;
174 }
175 channel = f.channel();
176
177 setupAuthorization();
178
179 ByteBuf b = channel.alloc().directBuffer(6);
180 createPreamble(b, authMethod);
181 channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
182 if (useSasl) {
183 UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
184 if (authMethod == AuthMethod.KERBEROS) {
185 if (ticket != null && ticket.getRealUser() != null) {
186 ticket = ticket.getRealUser();
187 }
188 }
189 SaslClientHandler saslHandler;
190 if (ticket == null) {
191 throw new FatalConnectionException("ticket/user is null");
192 }
193 final UserGroupInformation realTicket = ticket;
194 saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
195 @Override
196 public SaslClientHandler run() throws IOException {
197 return getSaslHandler(realTicket, bootstrap);
198 }
199 });
200 if (saslHandler != null) {
201
202 channel.pipeline().addFirst(saslHandler);
203 } else {
204
205 authMethod = AuthMethod.SIMPLE;
206 useSasl = false;
207 }
208 } else {
209 startHBaseConnection(f.channel());
210 }
211 }
212 });
213 }
214
215
216
217
218
219
220 private void startHBaseConnection(Channel ch) {
221 ch.pipeline()
222 .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
223 ch.pipeline().addLast(new AsyncServerResponseHandler(this));
224 try {
225 writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
226 @Override
227 public void operationComplete(ChannelFuture future) throws Exception {
228 if (!future.isSuccess()) {
229 close(future.cause());
230 return;
231 }
232 List<AsyncCall> callsToWrite;
233 synchronized (pendingCalls) {
234 connected = true;
235 callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
236 }
237 for (AsyncCall call : callsToWrite) {
238 writeRequest(call);
239 }
240 }
241 });
242 } catch (IOException e) {
243 close(e);
244 }
245 }
246
247
248
249
250
251
252
253 private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
254 final Bootstrap bootstrap) throws IOException {
255 return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
256 client.fallbackAllowed, client.conf.get("hbase.rpc.protection",
257 SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
258 new SaslClientHandler.SaslExceptionHandler() {
259 @Override
260 public void handle(int retryCount, Random random, Throwable cause) {
261 try {
262
263 handleSaslConnectionFailure(retryCount, cause, realTicket);
264
265
266 client.newTimeout(new TimerTask() {
267 @Override
268 public void run(Timeout timeout) throws Exception {
269 connect(bootstrap);
270 }
271 }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
272 } catch (IOException | InterruptedException e) {
273 close(e);
274 }
275 }
276 }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
277 @Override
278 public void onSuccess(Channel channel) {
279 startHBaseConnection(channel);
280 }
281 });
282 }
283
284
285
286
287
288
289
290
291 private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
292 if (connectCounter < client.maxRetries) {
293 client.newTimeout(new TimerTask() {
294 @Override public void run(Timeout timeout) throws Exception {
295 connect(bootstrap);
296 }
297 }, client.failureSleep, TimeUnit.MILLISECONDS);
298 } else {
299 client.failedServers.addToFailedServers(address);
300 close(e);
301 }
302 }
303
304
305
306
307
308
309
310
311 public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
312 final PayloadCarryingRpcController controller, final Message request,
313 final Message responsePrototype) {
314 final AsyncCall call =
315 new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request,
316 controller, responsePrototype);
317 controller.notifyOnCancel(new RpcCallback<Object>() {
318 @Override
319 public void run(Object parameter) {
320
321 synchronized (pendingCalls) {
322 pendingCalls.remove(call.id);
323 }
324 }
325 });
326
327 if (controller.isCanceled()) {
328
329 call.cancel(true);
330 return call;
331 }
332
333 synchronized (pendingCalls) {
334 if (closed) {
335 Promise<Message> promise = channel.eventLoop().newPromise();
336 promise.setFailure(new ConnectException());
337 return promise;
338 }
339 pendingCalls.put(call.id, call);
340
341 if (cleanupTimer == null && call.getRpcTimeout() > 0) {
342 cleanupTimer =
343 client.newTimeout(timeoutTask, call.getRpcTimeout(),
344 TimeUnit.MILLISECONDS);
345 }
346 if (!connected) {
347 return call;
348 }
349 }
350 writeRequest(call);
351 return call;
352 }
353
354 AsyncCall removePendingCall(int id) {
355 synchronized (pendingCalls) {
356 return pendingCalls.remove(id);
357 }
358 }
359
360
361
362
363
364
365
366
367 private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
368 RPCProtos.ConnectionHeader.Builder headerBuilder =
369 RPCProtos.ConnectionHeader.newBuilder().setServiceName(serviceName);
370
371 RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
372 if (userInfoPB != null) {
373 headerBuilder.setUserInfo(userInfoPB);
374 }
375
376 if (client.codec != null) {
377 headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
378 }
379 if (client.compressor != null) {
380 headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
381 }
382
383 headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
384 RPCProtos.ConnectionHeader header = headerBuilder.build();
385
386
387 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
388
389 ByteBuf b = channel.alloc().directBuffer(totalSize);
390
391 b.writeInt(header.getSerializedSize());
392 b.writeBytes(header.toByteArray());
393
394 return channel.writeAndFlush(b);
395 }
396
397
398
399
400
401
402 private void writeRequest(final AsyncCall call) {
403 try {
404 final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
405 .newBuilder();
406 requestHeaderBuilder.setCallId(call.id)
407 .setMethodName(call.method.getName()).setRequestParam(call.param != null);
408
409 if (Trace.isTracing()) {
410 Span s = Trace.currentSpan();
411 requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder().
412 setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
413 }
414
415 ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner());
416 if (cellBlock != null) {
417 final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
418 .newBuilder();
419 cellBlockBuilder.setLength(cellBlock.limit());
420 requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
421 }
422
423 if (call.controller.getPriority() != 0) {
424 requestHeaderBuilder.setPriority(call.controller.getPriority());
425 }
426
427 RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
428
429 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
430 if (cellBlock != null) {
431 totalSize += cellBlock.remaining();
432 }
433
434 ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
435 try(ByteBufOutputStream out = new ByteBufOutputStream(b)) {
436 IPCUtil.write(out, rh, call.param, cellBlock);
437 }
438
439 channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
440 } catch (IOException e) {
441 close(e);
442 }
443 }
444
445
446
447
448
449
450 private void setupAuthorization() throws IOException {
451 SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
452 this.useSasl = client.userProvider.isHBaseSecurityEnabled();
453
454 this.token = null;
455 if (useSasl && securityInfo != null) {
456 AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
457 if (tokenKind != null) {
458 TokenSelector<? extends TokenIdentifier> tokenSelector = tokenHandlers.get(tokenKind);
459 if (tokenSelector != null) {
460 token = tokenSelector
461 .selectToken(new Text(client.clusterId), ticket.getUGI().getTokens());
462 } else if (LOG.isDebugEnabled()) {
463 LOG.debug("No token selector found for type " + tokenKind);
464 }
465 }
466 String serverKey = securityInfo.getServerPrincipal();
467 if (serverKey == null) {
468 throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
469 }
470 this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
471 address.getAddress().getCanonicalHostName().toLowerCase());
472 if (LOG.isDebugEnabled()) {
473 LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
474 + serverPrincipal);
475 }
476 }
477
478 if (!useSasl) {
479 authMethod = AuthMethod.SIMPLE;
480 } else if (token != null) {
481 authMethod = AuthMethod.DIGEST;
482 } else {
483 authMethod = AuthMethod.KERBEROS;
484 }
485
486 if (LOG.isDebugEnabled()) {
487 LOG.debug("Use " + authMethod + " authentication for service " + serviceName +
488 ", sasl=" + useSasl);
489 }
490 reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
491 }
492
493
494
495
496
497
498
499
500 private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
501 if (ugi == null || authMethod == AuthMethod.DIGEST) {
502
503 return null;
504 }
505 RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
506 if (authMethod == AuthMethod.KERBEROS) {
507
508 userInfoPB.setEffectiveUser(ugi.getUserName());
509 } else if (authMethod == AuthMethod.SIMPLE) {
510
511 userInfoPB.setEffectiveUser(ugi.getUserName());
512 if (ugi.getRealUser() != null) {
513 userInfoPB.setRealUser(ugi.getRealUser().getUserName());
514 }
515 }
516 return userInfoPB.build();
517 }
518
519
520
521
522
523
524
525 private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
526 byteBuf.writeBytes(HConstants.RPC_HEADER);
527 byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
528 byteBuf.writeByte(authMethod.code);
529 }
530
531
532
533
534
535
536 public void close(final Throwable e) {
537 client.removeConnection(this);
538
539
540 channel.eventLoop().execute(new Runnable() {
541 @Override
542 public void run() {
543 List<AsyncCall> toCleanup;
544 synchronized (pendingCalls) {
545 if (closed) {
546 return;
547 }
548 closed = true;
549 toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
550 pendingCalls.clear();
551 }
552 IOException closeException = null;
553 if (e != null) {
554 if (e instanceof IOException) {
555 closeException = (IOException) e;
556 } else {
557 closeException = new IOException(e);
558 }
559 }
560
561 if (LOG.isDebugEnabled() && closeException != null) {
562 LOG.debug(name + ": closing ipc connection to " + address, closeException);
563 }
564 if (cleanupTimer != null) {
565 cleanupTimer.cancel();
566 cleanupTimer = null;
567 }
568 for (AsyncCall call : toCleanup) {
569 call.setFailed(closeException != null ? closeException : new ConnectionClosingException(
570 "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
571 }
572 channel.disconnect().addListener(ChannelFutureListener.CLOSE);
573 if (LOG.isDebugEnabled()) {
574 LOG.debug(name + ": closed");
575 }
576 }
577 });
578 }
579
580
581
582
583
584
585 private void cleanupCalls() {
586 List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
587 long currentTime = EnvironmentEdgeManager.currentTime();
588 long nextCleanupTaskDelay = -1L;
589 synchronized (pendingCalls) {
590 for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
591 AsyncCall call = iter.next();
592 long timeout = call.getRpcTimeout();
593 if (timeout > 0) {
594 if (currentTime - call.getStartTime() >= timeout) {
595 iter.remove();
596 toCleanup.add(call);
597 } else {
598 if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
599 nextCleanupTaskDelay = timeout;
600 }
601 }
602 }
603 }
604 if (nextCleanupTaskDelay > 0) {
605 cleanupTimer =
606 client.newTimeout(timeoutTask, nextCleanupTaskDelay,
607 TimeUnit.MILLISECONDS);
608 } else {
609 cleanupTimer = null;
610 }
611 }
612 for (AsyncCall call : toCleanup) {
613 call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
614 + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
615 }
616 }
617
618
619
620
621
622
623 public boolean isAlive() {
624 return channel.isOpen();
625 }
626
627
628
629
630
631
632
633 private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
634 UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
635 UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
636 UserGroupInformation realUser = currentUser.getRealUser();
637 return authMethod == AuthMethod.KERBEROS &&
638 loginUser != null &&
639
640 loginUser.hasKerberosCredentials() &&
641
642
643 (loginUser.equals(currentUser) || loginUser.equals(realUser));
644 }
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670 private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
671 final UserGroupInformation user) throws IOException, InterruptedException {
672 user.doAs(new PrivilegedExceptionAction<Void>() {
673 public Void run() throws IOException, InterruptedException {
674 if (shouldAuthenticateOverKrb()) {
675 if (currRetries < MAX_SASL_RETRIES) {
676 LOG.debug("Exception encountered while connecting to the server : " + ex);
677
678 if (UserGroupInformation.isLoginKeytabBased()) {
679 UserGroupInformation.getLoginUser().reloginFromKeytab();
680 } else {
681 UserGroupInformation.getLoginUser().reloginFromTicketCache();
682 }
683
684
685 return null;
686 } else {
687 String msg = "Couldn't setup connection for " +
688 UserGroupInformation.getLoginUser().getUserName() +
689 " to " + serverPrincipal;
690 LOG.warn(msg);
691 throw (IOException) new IOException(msg).initCause(ex);
692 }
693 } else {
694 LOG.warn("Exception encountered while connecting to " +
695 "the server : " + ex);
696 }
697 if (ex instanceof RemoteException) {
698 throw (RemoteException) ex;
699 }
700 if (ex instanceof SaslException) {
701 String msg = "SASL authentication failed." +
702 " The most likely cause is missing or invalid credentials." +
703 " Consider 'kinit'.";
704 LOG.fatal(msg, ex);
705 throw new RuntimeException(msg, ex);
706 }
707 throw new IOException(ex);
708 }
709 });
710 }
711
712 public int getConnectionHashCode() {
713 return ConnectionId.hashCode(ticket, serviceName, address);
714 }
715
716 @Override
717 public int hashCode() {
718 return getConnectionHashCode();
719 }
720
721 @Override
722 public boolean equals(Object obj) {
723 if (obj instanceof AsyncRpcChannel) {
724 AsyncRpcChannel channel = (AsyncRpcChannel) obj;
725 return channel.hashCode() == obj.hashCode();
726 }
727 return false;
728 }
729
730
731 @Override
732 public String toString() {
733 return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
734 }
735
736
737
738
739 private static final class CallWriteListener implements ChannelFutureListener {
740 private final AsyncRpcChannel rpcChannel;
741 private final int id;
742
743 public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) {
744 this.rpcChannel = asyncRpcChannel;
745 this.id = id;
746 }
747
748 @Override
749 public void operationComplete(ChannelFuture future) throws Exception {
750 if (!future.isSuccess()) {
751 AsyncCall call = rpcChannel.removePendingCall(id);
752 if (call != null) {
753 if (future.cause() instanceof IOException) {
754 call.setFailed((IOException) future.cause());
755 } else {
756 call.setFailed(new IOException(future.cause()));
757 }
758 }
759 }
760 }
761 }
762 }