1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.ipc;
21
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
50
51 import javax.net.SocketFactory;
52 import javax.security.sasl.SaslException;
53
54 import org.apache.commons.logging.Log;
55 import org.apache.commons.logging.LogFactory;
56 import org.apache.hadoop.conf.Configuration;
57 import org.apache.hadoop.hbase.CellScanner;
58 import org.apache.hadoop.hbase.DoNotRetryIOException;
59 import org.apache.hadoop.hbase.HConstants;
60 import org.apache.hadoop.hbase.ServerName;
61 import org.apache.hadoop.hbase.classification.InterfaceAudience;
62 import org.apache.hadoop.hbase.client.MetricsConnection;
63 import org.apache.hadoop.hbase.codec.Codec;
64 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
65 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
66 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
67 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
68 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
69 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
70 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
71 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
72 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
73 import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo;
74 import org.apache.hadoop.hbase.security.AuthMethod;
75 import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
76 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
77 import org.apache.hadoop.hbase.security.SecurityInfo;
78 import org.apache.hadoop.hbase.security.User;
79 import org.apache.hadoop.hbase.security.UserProvider;
80 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
81 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
82 import org.apache.hadoop.hbase.util.ExceptionUtil;
83 import org.apache.hadoop.hbase.util.Pair;
84 import org.apache.hadoop.hbase.util.PoolMap;
85 import org.apache.hadoop.io.IOUtils;
86 import org.apache.hadoop.io.Text;
87 import org.apache.hadoop.io.compress.CompressionCodec;
88 import org.apache.hadoop.ipc.RemoteException;
89 import org.apache.hadoop.net.NetUtils;
90 import org.apache.hadoop.security.SecurityUtil;
91 import org.apache.hadoop.security.UserGroupInformation;
92 import org.apache.hadoop.security.token.Token;
93 import org.apache.hadoop.security.token.TokenIdentifier;
94 import org.apache.hadoop.security.token.TokenSelector;
95 import org.apache.htrace.Span;
96 import org.apache.htrace.Trace;
97 import org.apache.htrace.TraceScope;
98
99 import com.google.common.annotations.VisibleForTesting;
100 import com.google.protobuf.Descriptors.MethodDescriptor;
101 import com.google.protobuf.Message;
102 import com.google.protobuf.Message.Builder;
103 import com.google.protobuf.RpcCallback;
104
105
106
107
108
109 @InterfaceAudience.Private
110 public class RpcClientImpl extends AbstractRpcClient {
111 private static final Log LOG = LogFactory.getLog(RpcClientImpl.class);
112 protected final AtomicInteger callIdCnt = new AtomicInteger();
113
114 protected final PoolMap<ConnectionId, Connection> connections;
115
116 protected final AtomicBoolean running = new AtomicBoolean(true);
117
118 protected final FailedServers failedServers;
119
120 protected final SocketFactory socketFactory;
121
122 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_MUTABLE_COLLECTION_PKGPROTECT",
123 justification="the rest of the system which live in the different package can use")
124 protected final static Map<AuthenticationProtos.TokenIdentifier.Kind,
125 TokenSelector<? extends TokenIdentifier>> tokenHandlers =
126 new HashMap<AuthenticationProtos.TokenIdentifier.Kind,
127 TokenSelector<? extends TokenIdentifier>>();
128 static {
129 tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
130 new AuthenticationTokenSelector());
131 }
132
133
134
135
136
137 protected Connection createConnection(ConnectionId remoteId, final Codec codec,
138 final CompressionCodec compressor)
139 throws IOException {
140 return new Connection(remoteId, codec, compressor);
141 }
142
143
144
145
146 private static class CallFuture {
147 final Call call;
148 final int priority;
149 final Span span;
150
151
152 final static CallFuture DEATH_PILL = new CallFuture(null, -1, null);
153
154 CallFuture(Call call, int priority, Span span) {
155 this.call = call;
156 this.priority = priority;
157 this.span = span;
158 }
159 }
160
161
162
163
164 protected class Connection extends Thread {
// Built once in the constructor; written to the server by writeConnectionHeader().
private ConnectionHeader header;
// Address, service name and user this connection serves.
protected ConnectionId remoteId;
// Null until setupConnection() succeeds; reset to null by closeConnection()/close().
protected Socket socket = null;
// Buffered streams over the socket (SASL-wrapped when SASL negotiation succeeded).
protected DataInputStream in;
protected DataOutputStream out;
// Guards every access to 'out' (writers, header write, close).
private Object outLock = new Object();
// Resolved remote address (taken from remoteId in the constructor).
private InetSocketAddress server;
// Kerberos principal of the server; only set when SASL is on and SecurityInfo exists.
private String serverPrincipal;
// SIMPLE, DIGEST or KERBEROS; may be downgraded to SIMPLE on SASL fallback.
private AuthMethod authMethod;
// Whether SASL is used; cleared when the server lets us fall back to SIMPLE.
private boolean useSasl;
// Token chosen by the token selector, or null (which implies KERBEROS when SASL is on).
private Token<? extends TokenIdentifier> token;
// Active SASL client; null when SASL is not in use or has been disposed.
private HBaseSaslRpcClient saslRpcClient;
// Upper bound (ms) of the random sleep before retrying a failed SASL handshake.
private int reloginMaxBackoff;
private final Codec codec;
private final CompressionCodec compressor;

// Outstanding calls keyed by call id. The map is sorted by id; cleanupCalls(false) relies on
// that iteration order when it stops at the first call that is neither done nor timed out.
protected final ConcurrentSkipListMap<Integer, Call> calls =
    new ConcurrentSkipListMap<Integer, Call>();

// Flipped to true once when the connection must shut down; never reset.
protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
// Dedicated writer thread, or null when SPECIFIC_WRITE_THREAD is disabled.
protected final CallSender callSender;
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207 private class CallSender extends Thread implements Closeable {
208 protected final BlockingQueue<CallFuture> callsToWrite;
209
210
211 public CallFuture sendCall(Call call, int priority, Span span)
212 throws InterruptedException, IOException {
213 CallFuture cts = new CallFuture(call, priority, span);
214 if (!callsToWrite.offer(cts)) {
215 throw new IOException("Can't add the call " + call.id +
216 " to the write queue. callsToWrite.size()=" + callsToWrite.size());
217 }
218 checkIsOpen();
219
220 return cts;
221 }
222
223 @Override
224 public void close(){
225 assert shouldCloseConnection.get();
226 callsToWrite.offer(CallFuture.DEATH_PILL);
227
228
229 }
230
231 CallSender(String name, Configuration conf) {
232 int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
233 callsToWrite = new ArrayBlockingQueue<CallFuture>(queueSize);
234 setDaemon(true);
235 setName(name + " - writer");
236 }
237
238 public void remove(CallFuture cts){
239 callsToWrite.remove(cts);
240
241
242
243 calls.remove(cts.call.id);
244 cts.call.callComplete();
245 }
246
247
248
249
250 @Override
251 public void run() {
252 while (!shouldCloseConnection.get()) {
253 CallFuture cts = null;
254 try {
255 cts = callsToWrite.take();
256 } catch (InterruptedException e) {
257 markClosed(new InterruptedIOException());
258 }
259
260 if (cts == null || cts == CallFuture.DEATH_PILL) {
261 assert shouldCloseConnection.get();
262 break;
263 }
264
265 if (cts.call.done) {
266 continue;
267 }
268
269 if (cts.call.checkAndSetTimeout()) {
270 continue;
271 }
272
273 try {
274 Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
275 } catch (IOException e) {
276 if (LOG.isDebugEnabled()) {
277 LOG.debug("call write error for call #" + cts.call.id
278 + ", message =" + e.getMessage());
279 }
280 cts.call.setException(e);
281 markClosed(e);
282 }
283 }
284
285 cleanup();
286 }
287
288
289
290
291 private void cleanup() {
292 assert shouldCloseConnection.get();
293
294 IOException ie = new ConnectionClosingException("Connection to " + server + " is closing.");
295 while (true) {
296 CallFuture cts = callsToWrite.poll();
297 if (cts == null) {
298 break;
299 }
300 if (cts.call != null && !cts.call.done) {
301 cts.call.setException(ie);
302 }
303 }
304 }
305 }
306
307 Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
308 throws IOException {
309 if (remoteId.getAddress().isUnresolved()) {
310 throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
311 }
312 this.server = remoteId.getAddress();
313 this.codec = codec;
314 this.compressor = compressor;
315
316 UserGroupInformation ticket = remoteId.getTicket().getUGI();
317 SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
318 this.useSasl = userProvider.isHBaseSecurityEnabled();
319 if (useSasl && securityInfo != null) {
320 AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
321 if (tokenKind != null) {
322 TokenSelector<? extends TokenIdentifier> tokenSelector =
323 tokenHandlers.get(tokenKind);
324 if (tokenSelector != null) {
325 token = tokenSelector.selectToken(new Text(clusterId),
326 ticket.getTokens());
327 } else if (LOG.isDebugEnabled()) {
328 LOG.debug("No token selector found for type "+tokenKind);
329 }
330 }
331 String serverKey = securityInfo.getServerPrincipal();
332 if (serverKey == null) {
333 throw new IOException(
334 "Can't obtain server Kerberos config key from SecurityInfo");
335 }
336 serverPrincipal = SecurityUtil.getServerPrincipal(
337 conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
338 if (LOG.isDebugEnabled()) {
339 LOG.debug("RPC Server Kerberos principal name for service="
340 + remoteId.getServiceName() + " is " + serverPrincipal);
341 }
342 }
343
344 if (!useSasl) {
345 authMethod = AuthMethod.SIMPLE;
346 } else if (token != null) {
347 authMethod = AuthMethod.DIGEST;
348 } else {
349 authMethod = AuthMethod.KERBEROS;
350 }
351
352 if (LOG.isDebugEnabled()) {
353 LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName +
354 ", sasl=" + useSasl);
355 }
356 reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
357 this.remoteId = remoteId;
358
359 ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
360 builder.setServiceName(remoteId.getServiceName());
361 UserInformation userInfoPB = getUserInfo(ticket);
362 if (userInfoPB != null) {
363 builder.setUserInfo(userInfoPB);
364 }
365 if (this.codec != null) {
366 builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
367 }
368 if (this.compressor != null) {
369 builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
370 }
371 builder.setVersionInfo(ProtobufUtil.getVersionInfo());
372 this.header = builder.build();
373
374 this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
375 remoteId.getAddress().toString() +
376 ((ticket==null)?" from an unknown user": (" from "
377 + ticket.getUserName())));
378 this.setDaemon(true);
379
380 if (conf.getBoolean(SPECIFIC_WRITE_THREAD, false)) {
381 callSender = new CallSender(getName(), conf);
382 callSender.start();
383 } else {
384 callSender = null;
385 }
386 }
387
388 private synchronized UserInformation getUserInfo(UserGroupInformation ugi) {
389 if (ugi == null || authMethod == AuthMethod.DIGEST) {
390
391 return null;
392 }
393 UserInformation.Builder userInfoPB = UserInformation.newBuilder();
394 if (authMethod == AuthMethod.KERBEROS) {
395
396 userInfoPB.setEffectiveUser(ugi.getUserName());
397 } else if (authMethod == AuthMethod.SIMPLE) {
398
399 userInfoPB.setEffectiveUser(ugi.getUserName());
400 if (ugi.getRealUser() != null) {
401 userInfoPB.setRealUser(ugi.getRealUser().getUserName());
402 }
403 }
404 return userInfoPB.build();
405 }
406
407 protected synchronized void setupConnection() throws IOException {
408 short ioFailures = 0;
409 short timeoutFailures = 0;
410 while (true) {
411 try {
412 this.socket = socketFactory.createSocket();
413 this.socket.setTcpNoDelay(tcpNoDelay);
414 this.socket.setKeepAlive(tcpKeepAlive);
415 if (localAddr != null) {
416 this.socket.bind(localAddr);
417 }
418 NetUtils.connect(this.socket, remoteId.getAddress(), connectTO);
419 this.socket.setSoTimeout(readTO);
420 return;
421 } catch (SocketTimeoutException toe) {
422
423
424
425 handleConnectionFailure(timeoutFailures++, maxRetries, toe);
426 } catch (IOException ie) {
427 handleConnectionFailure(ioFailures++, maxRetries, ie);
428 }
429 }
430 }
431
432 protected synchronized void closeConnection() {
433 if (socket == null) {
434 return;
435 }
436
437
438 try {
439 if (socket.getOutputStream() != null) {
440 socket.getOutputStream().close();
441 }
442 } catch (IOException ignored) {
443 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
444 }
445 try {
446 if (socket.getInputStream() != null) {
447 socket.getInputStream().close();
448 }
449 } catch (IOException ignored) {
450 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
451 }
452 try {
453 if (socket.getChannel() != null) {
454 socket.getChannel().close();
455 }
456 } catch (IOException ignored) {
457 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
458 }
459 try {
460 socket.close();
461 } catch (IOException e) {
462 LOG.warn("Not able to close a socket", e);
463 }
464
465
466
467 socket = null;
468 }
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485 private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
486 throws IOException {
487 closeConnection();
488
489
490 if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
491 throw ioe;
492 }
493
494
495 try {
496 Thread.sleep(failureSleep);
497 } catch (InterruptedException ie) {
498 ExceptionUtil.rethrowIfInterrupt(ie);
499 }
500
501 LOG.info("Retrying connect to server: " + remoteId.getAddress() +
502 " after sleeping " + failureSleep + "ms. Already tried " + curRetries +
503 " time(s).");
504 }
505
506
507
508
509 private void checkIsOpen() throws IOException {
510 if (shouldCloseConnection.get()) {
511 throw new ConnectionClosingException(getName() + " is closing");
512 }
513 }
514
515
516
517
518
519
520
521 protected synchronized boolean waitForWork() throws InterruptedException {
522
523
524 long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
525
526 while (true) {
527 if (shouldCloseConnection.get()) {
528 return false;
529 }
530
531 if (!running.get()) {
532 markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
533 return false;
534 }
535
536 if (!calls.isEmpty()) {
537
538
539 return true;
540 }
541
542 if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
543
544
545
546
547 markClosed(new IOException(
548 "idle connection closed with " + calls.size() + " pending request(s)"));
549 return false;
550 }
551
552 wait(Math.min(minIdleTimeBeforeClose, 1000));
553 }
554 }
555
556 public InetSocketAddress getRemoteAddress() {
557 return remoteId.getAddress();
558 }
559
560 @Override
561 public void run() {
562 if (LOG.isTraceEnabled()) {
563 LOG.trace(getName() + ": starting, connections " + connections.size());
564 }
565
566 try {
567 while (waitForWork()) {
568 readResponse();
569 }
570 } catch (InterruptedException t) {
571 if (LOG.isTraceEnabled()) {
572 LOG.trace(getName() + ": interrupted while waiting for call responses");
573 }
574 markClosed(ExceptionUtil.asInterrupt(t));
575 } catch (Throwable t) {
576 if (LOG.isDebugEnabled()) {
577 LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
578 }
579 markClosed(new IOException("Unexpected throwable while waiting call responses", t));
580 }
581
582 close();
583
584 if (LOG.isTraceEnabled()) {
585 LOG.trace(getName() + ": stopped, connections " + connections.size());
586 }
587 }
588
589 private synchronized void disposeSasl() {
590 if (saslRpcClient != null) {
591 try {
592 saslRpcClient.dispose();
593 saslRpcClient = null;
594 } catch (IOException ioe) {
595 LOG.error("Error disposing of SASL client", ioe);
596 }
597 }
598 }
599
600 private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
601 UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
602 UserGroupInformation currentUser =
603 UserGroupInformation.getCurrentUser();
604 UserGroupInformation realUser = currentUser.getRealUser();
605 return authMethod == AuthMethod.KERBEROS &&
606 loginUser != null &&
607
608 loginUser.hasKerberosCredentials() &&
609
610
611 (loginUser.equals(currentUser) || loginUser.equals(realUser));
612 }
613
614 private synchronized boolean setupSaslConnection(final InputStream in2,
615 final OutputStream out2) throws IOException {
616 saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal, fallbackAllowed,
617 conf.get("hbase.rpc.protection",
618 QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
619 return saslRpcClient.saslConnect(in2, out2);
620 }
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640 private synchronized void handleSaslConnectionFailure(
641 final int currRetries,
642 final int maxRetries, final Exception ex, final Random rand,
643 final UserGroupInformation user)
644 throws IOException, InterruptedException{
645 user.doAs(new PrivilegedExceptionAction<Object>() {
646 @Override
647 public Object run() throws IOException, InterruptedException {
648 closeConnection();
649 if (shouldAuthenticateOverKrb()) {
650 if (currRetries < maxRetries) {
651 if (LOG.isDebugEnabled()) {
652 LOG.debug("Exception encountered while connecting to " +
653 "the server : " + ex);
654 }
655
656 if (UserGroupInformation.isLoginKeytabBased()) {
657 UserGroupInformation.getLoginUser().reloginFromKeytab();
658 } else {
659 UserGroupInformation.getLoginUser().reloginFromTicketCache();
660 }
661 disposeSasl();
662
663
664
665
666 Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
667 return null;
668 } else {
669 String msg = "Couldn't setup connection for " +
670 UserGroupInformation.getLoginUser().getUserName() +
671 " to " + serverPrincipal;
672 LOG.warn(msg, ex);
673 throw (IOException) new IOException(msg).initCause(ex);
674 }
675 } else {
676 LOG.warn("Exception encountered while connecting to " +
677 "the server : " + ex);
678 }
679 if (ex instanceof RemoteException) {
680 throw (RemoteException)ex;
681 }
682 if (ex instanceof SaslException) {
683 String msg = "SASL authentication failed." +
684 " The most likely cause is missing or invalid credentials." +
685 " Consider 'kinit'.";
686 LOG.fatal(msg, ex);
687 throw new RuntimeException(msg, ex);
688 }
689 throw new IOException(ex);
690 }
691 });
692 }
693
/**
 * Lazily connects this connection: opens the socket, sends the preamble, optionally
 * negotiates SASL, wraps the streams, sends the connection header and finally starts this
 * connection's reader thread. Returns immediately if a socket is already set.
 * <p>
 * Retries:
 * <ul>
 *   <li>Socket connect retries happen inside {@link #setupConnection()}.</li>
 *   <li>A failed SASL handshake loops again only when {@link #handleSaslConnectionFailure}
 *       returns normally (bounded by MAX_RETRIES).</li>
 * </ul>
 * Any other failure inside the try:
 * <ol>
 *   <li>adds the server to the failed-servers list, unless the failure is an interrupt;</li>
 *   <li>marks the connection closed and closes it;</li>
 *   <li>rethrows the failure as an IOException.</li>
 * </ol>
 *
 * @throws ConnectionClosingException if the connection is already marked for closing
 * @throws FailedServerException if the server is in the failed servers list (the connection
 *     is marked closed and closed first)
 * @throws IOException on any other setup failure
 */
protected synchronized void setupIOstreams() throws IOException {
  if (socket != null) {
    // Already connected.
    return;
  }

  if (shouldCloseConnection.get()){
    throw new ConnectionClosingException("This connection is closing");
  }

  if (failedServers.isFailedServer(remoteId.getAddress())) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Not trying to connect to " + server +
          " this server is in the failed servers list");
    }
    IOException e = new FailedServerException(
        "This server is in the failed servers list: " + server);
    markClosed(e);
    close();
    throw e;
  }

  try {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Connecting to " + server);
    }
    short numRetries = 0;
    final short MAX_RETRIES = 5;
    Random rand = null;
    while (true) {
      setupConnection();
      InputStream inStream = NetUtils.getInputStream(socket);
      // Output stream created with the configured write timeout (writeTO).
      OutputStream outStream = NetUtils.getOutputStream(socket, writeTO);
      // Preamble goes out on the raw stream, before any SASL negotiation.
      writeConnectionHeaderPreamble(outStream);
      if (useSasl) {
        final InputStream in2 = inStream;
        final OutputStream out2 = outStream;
        UserGroupInformation ticket = remoteId.getTicket().getUGI();
        if (authMethod == AuthMethod.KERBEROS) {
          // For a proxy user, authenticate as the real (Kerberos-holding) user.
          if (ticket != null && ticket.getRealUser() != null) {
            ticket = ticket.getRealUser();
          }
        }
        boolean continueSasl;
        if (ticket == null) throw new FatalConnectionException("ticket/user is null");
        try {
          continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws IOException {
              return setupSaslConnection(in2, out2);
            }
          });
        } catch (Exception ex) {
          ExceptionUtil.rethrowIfInterrupt(ex);
          if (rand == null) {
            rand = new Random();
          }
          // Either throws (leaving via the outer catch) or returns to retry from scratch.
          handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, ticket);
          continue;
        }
        if (continueSasl) {
          // Handshake succeeded: route all further traffic through the SASL streams.
          inStream = saslRpcClient.getInputStream(inStream);
          outStream = saslRpcClient.getOutputStream(outStream);
        } else {
          // saslConnect returned false: fall back to SIMPLE authentication.
          authMethod = AuthMethod.SIMPLE;
          useSasl = false;
        }
      }
      this.in = new DataInputStream(new BufferedInputStream(inStream));
      synchronized (this.outLock) {
        this.out = new DataOutputStream(new BufferedOutputStream(outStream));
      }
      // Connection header goes out on the (possibly SASL-wrapped) buffered stream.
      writeConnectionHeader();

      // Start this Connection's reader thread (run()) only once the streams exist.
      start();
      return;
    }
  } catch (Throwable t) {
    // asInterrupt returns non-null for interrupts; those don't blacklist the server.
    IOException e = ExceptionUtil.asInterrupt(t);
    if (e == null) {
      failedServers.addToFailedServers(remoteId.address);
      if (t instanceof LinkageError) {
        // Presumably a library/version mismatch on the classpath; retrying won't help.
        e = new DoNotRetryIOException(t);
      } else if (t instanceof IOException) {
        e = (IOException) t;
      } else {
        e = new IOException("Could not set up IO Streams to " + server, t);
      }
    }
    markClosed(e);
    close();
    throw e;
  }
}
795
796
797
798
799 private void writeConnectionHeaderPreamble(OutputStream outStream) throws IOException {
800
801
802
803
804
805 int rpcHeaderLen = HConstants.RPC_HEADER.length;
806 byte [] preamble = new byte [rpcHeaderLen + 2];
807 System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen);
808 preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
809 synchronized (this) {
810 preamble[rpcHeaderLen + 1] = authMethod.code;
811 }
812 outStream.write(preamble);
813 outStream.flush();
814 }
815
816
817
818
819 private synchronized void writeConnectionHeader() throws IOException {
820 synchronized (this.outLock) {
821 this.out.writeInt(this.header.getSerializedSize());
822 this.header.writeTo(this.out);
823 this.out.flush();
824 }
825 }
826
827
828 protected synchronized void close() {
829 if (!shouldCloseConnection.get()) {
830 LOG.error(getName() + ": the connection is not in the closed state");
831 return;
832 }
833
834
835
836 synchronized (connections) {
837 connections.removeValue(remoteId, this);
838 }
839
840
841 synchronized(this.outLock) {
842 if (this.out != null) {
843 IOUtils.closeStream(out);
844 this.out = null;
845 }
846 }
847 IOUtils.closeStream(in);
848 this.in = null;
849 if (this.socket != null) {
850 try {
851 this.socket.close();
852 this.socket = null;
853 } catch (IOException e) {
854 LOG.error("Error while closing socket", e);
855 }
856 }
857
858 disposeSasl();
859
860
861 if (LOG.isTraceEnabled()) {
862 LOG.trace(getName() + ": closing ipc connection to " + server);
863 }
864
865 cleanupCalls(true);
866
867 if (LOG.isTraceEnabled()) {
868 LOG.trace(getName() + ": ipc connection to " + server + " closed");
869 }
870 }
871
872 protected void tracedWriteRequest(Call call, int priority, Span span) throws IOException {
873 TraceScope ts = Trace.startSpan("RpcClientImpl.tracedWriteRequest", span);
874 try {
875 writeRequest(call, priority, span);
876 } finally {
877 ts.close();
878 }
879 }
880
881
882
883
884
885
886
/**
 * Serializes and sends one request on this connection, connecting first if needed.
 * <p>
 * The call is registered in {@link #calls} before it is written. If the write fails:
 * <ul>
 *   <li>the connection is flagged for closing and the reader thread is interrupted;</li>
 *   <li>outside {@code outLock}, the connection is marked closed and closed (which fails all
 *       pending calls);</li>
 *   <li>the failure is rethrown.</li>
 * </ul>
 * Waiters on this connection's monitor are notified in both the success and failure cases.
 *
 * @throws InterruptedIOException if the thread's interrupt flag is set when taking the lock
 *     (the flag is cleared)
 * @throws ConnectionClosingException if the connection is closing
 * @throws IOException on connection setup or write failure
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
    justification="Findbugs is misinterpreting locking missing fact that this.outLock is held")
private void writeRequest(Call call, final int priority, Span span) throws IOException {
  RequestHeader.Builder builder = RequestHeader.newBuilder();
  builder.setCallId(call.id);
  if (span != null) {
    builder.setTraceInfo(
        RPCTInfo.newBuilder().setParentId(span.getSpanId()).setTraceId(span.getTraceId()));
  }
  builder.setMethodName(call.md.getName());
  builder.setRequestParam(call.param != null);
  ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
  if (cellBlock != null) {
    CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
    cellBlockBuilder.setLength(cellBlock.limit());
    builder.setCellBlockMeta(cellBlockBuilder.build());
  }
  // Priority 0 means "not set": the field is omitted.
  if (priority != 0) builder.setPriority(priority);
  RequestHeader header = builder.build();

  setupIOstreams();

  // Cheap check before taking the write lock.
  checkIsOpen();
  IOException writeException = null;
  synchronized (this.outLock) {
    // Thread.interrupted() clears the flag.
    if (Thread.interrupted()) throw new InterruptedIOException();

    // Register first, so waitForWork() sees a pending call and doesn't idle-close meanwhile.
    calls.put(call.id, call);
    // Re-check: the connection may have been marked closed between the checks.
    checkIsOpen();

    try {
      call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, header, call.param,
          cellBlock));
    } catch (Throwable t) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Error while writing call, call_id:" + call.id, t);
      }
      // Set while still holding outLock, so the next writer's checkIsOpen() fails fast.
      // NOTE(review): because the flag is already true, the markClosed() below returns
      // false and skips its callSender.close()/notifyAll(). Waking the reader relies on
      // interrupt() and doNotify() instead.
      shouldCloseConnection.set(true);
      writeException = IPCUtil.toIOE(t);
      // Interrupts this Connection's reader thread.
      interrupt();
    }
  }

  // Closing happens outside outLock; close() itself takes outLock.
  if (writeException != null) {
    markClosed(writeException);
    close();
  }

  // Wake the reader: either a new call is pending, or the connection is closing.
  doNotify();

  // Rethrow only after notifying.
  if (writeException != null) throw writeException;
}
949
950 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
951 justification="Presume notifyAll is because we are closing/shutting down")
952 private synchronized void doNotify() {
953
954
955 notifyAll();
956 }
957
958
959
960
/**
 * Reads one response frame from {@link #in} and completes the matching call.
 * <p>
 * The frame is laid out as:
 * <ol>
 *   <li>an int frame size;</li>
 *   <li>a delimited {@link ResponseHeader};</li>
 *   <li>either an exception carried in the header, or an optional delimited response
 *       message followed by an optional cell block.</li>
 * </ol>
 * Responses for unknown or already-finished calls are skipped.
 * <p>
 * Error handling:
 * <ul>
 *   <li>A server-side FatalConnectionException marks the connection closed.</li>
 *   <li>Any other read error marks the connection closed, except a socket read timeout,
 *       which is only traced.</li>
 * </ul>
 * Every invocation ends with cleanupCalls(false), which removes done/timed-out calls.
 */
protected void readResponse() {
  if (shouldCloseConnection.get()) return;
  Call call = null;
  boolean expectedCall = false;
  try {
    // Size of the remainder of this frame (the skip path below assumes it includes the header).
    int totalSize = in.readInt();

    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
    int id = responseHeader.getCallId();
    call = calls.remove(id);
    expectedCall = (call != null && !call.done);
    if (!expectedCall) {
      // Unknown id (e.g. cancelled via CallSender.remove or dropped by cleanupCalls) or an
      // already-finished call. Skip the rest of the frame so the next read starts aligned.
      int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
      int whatIsLeftToRead = totalSize - readSoFar;
      IOUtils.skipFully(in, whatIsLeftToRead);
      if (call != null) {
        call.callStats.setResponseSizeBytes(totalSize);
        call.callStats.setCallTimeMs(
            EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
      }
      return;
    }
    if (responseHeader.hasException()) {
      ExceptionResponse exceptionResponse = responseHeader.getException();
      RemoteException re = createRemoteException(exceptionResponse);
      call.setException(re);
      call.callStats.setResponseSizeBytes(totalSize);
      call.callStats.setCallTimeMs(
          EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
      if (isFatalConnectionException(exceptionResponse)) {
        markClosed(re);
      }
    } else {
      Message value = null;
      // Only parse a response message when the call declared a response type.
      if (call.responseDefaultType != null) {
        Builder builder = call.responseDefaultType.newBuilderForType();
        ProtobufUtil.mergeDelimitedFrom(builder, in);
        value = builder.build();
      }
      CellScanner cellBlockScanner = null;
      if (responseHeader.hasCellBlockMeta()) {
        int size = responseHeader.getCellBlockMeta().getLength();
        byte [] cellBlock = new byte[size];
        IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
        cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
      }
      call.setResponse(value, cellBlockScanner);
      call.callStats.setResponseSizeBytes(totalSize);
      call.callStats.setCallTimeMs(
          EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
    }
  } catch (IOException e) {
    // Only fail the call if we had claimed it; otherwise it is unknown or already done.
    if (expectedCall) call.setException(e);
    if (e instanceof SocketTimeoutException) {
      // A read timeout does not close the connection; expired calls are handled by
      // cleanupCalls(false) in the finally block.
      // NOTE(review): if the timeout hits mid-frame, the stream is left misaligned for the
      // next readResponse(); confirm that this cannot happen in practice.
      if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
    } else {
      // Presumably the stream position is unknown after other errors, so give up on it.
      markClosed(e);
    }
  } finally {
    cleanupCalls(false);
  }
}
1035
1036
1037
1038
1039 private boolean isFatalConnectionException(final ExceptionResponse e) {
1040 return e.getExceptionClassName().
1041 equals(FatalConnectionException.class.getName());
1042 }
1043
1044
1045
1046
1047
1048 private RemoteException createRemoteException(final ExceptionResponse e) {
1049 String innerExceptionClassName = e.getExceptionClassName();
1050 boolean doNotRetry = e.getDoNotRetry();
1051 return e.hasHostname()?
1052
1053 new RemoteWithExtrasException(innerExceptionClassName,
1054 e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
1055 new RemoteWithExtrasException(innerExceptionClassName,
1056 e.getStackTrace(), doNotRetry);
1057 }
1058
1059 protected synchronized boolean markClosed(IOException e) {
1060 if (e == null) throw new NullPointerException();
1061
1062 boolean ret = shouldCloseConnection.compareAndSet(false, true);
1063 if (ret) {
1064 if (LOG.isTraceEnabled()) {
1065 LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
1066 }
1067 if (callSender != null) {
1068 callSender.close();
1069 }
1070 notifyAll();
1071 }
1072 return ret;
1073 }
1074
1075
1076
1077
1078
1079
1080 protected synchronized void cleanupCalls(boolean allCalls) {
1081 Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
1082 while (itor.hasNext()) {
1083 Call c = itor.next().getValue();
1084 if (c.done) {
1085
1086 itor.remove();
1087 } else if (allCalls) {
1088 long waitTime = EnvironmentEdgeManager.currentTime() - c.getStartTime();
1089 IOException ie = new ConnectionClosingException("Connection to " + getRemoteAddress()
1090 + " is closing. Call id=" + c.id + ", waitTime=" + waitTime);
1091 c.setException(ie);
1092 itor.remove();
1093 } else if (c.checkAndSetTimeout()) {
1094 itor.remove();
1095 } else {
1096
1097
1098
1099 break;
1100 }
1101 }
1102 }
1103 }
1104
1105
1106
1107
1108
1109
1110
1111
/**
 * Package-private constructor exposed for tests. It uses the given socket factory, with no
 * local bind address and no metrics.
 */
@VisibleForTesting
RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) {
  this(conf, clusterId, factory, /* localAddr */ null, /* metrics */ null);
}
1116
1117
1118
1119
1120
1121
1122
1123
1124
/**
 * Main constructor; all other constructors delegate here.
 *
 * @param conf client configuration
 * @param clusterId cluster id (used when selecting SASL tokens)
 * @param factory factory for every socket the client opens
 * @param localAddr local address to bind sockets to, or null for no bind
 * @param metrics connection metrics handed to the superclass (null is passed by some callers)
 */
RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory,
    SocketAddress localAddr, MetricsConnection metrics) {
  super(conf, clusterId, localAddr, metrics);

  this.failedServers = new FailedServers(conf);
  this.socketFactory = factory;
  this.connections =
      new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf));
}
1133
1134
1135
1136
1137
/**
 * Package-private constructor exposed for tests. It uses Hadoop's default socket factory for
 * {@code conf}, with no local bind address and no metrics.
 */
@VisibleForTesting
RpcClientImpl(Configuration conf, String clusterId) {
  this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf),
      /* localAddr */ null, /* metrics */ null);
}
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
/**
 * Public constructor using Hadoop's default socket factory for {@code conf}.
 *
 * @param conf client configuration
 * @param clusterId cluster id (used when selecting SASL tokens)
 * @param localAddr local address to bind sockets to, or null
 * @param metrics connection metrics passed to the superclass
 */
public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr,
    MetricsConnection metrics) {
  this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr, metrics);
}
1157
1158
1159
1160 @Override
1161 public void close() {
1162 if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
1163 if (!running.compareAndSet(true, false)) return;
1164
1165 Set<Connection> connsToClose = null;
1166
1167 synchronized (connections) {
1168 for (Connection conn : connections.values()) {
1169 conn.interrupt();
1170 if (conn.callSender != null) {
1171 conn.callSender.interrupt();
1172 }
1173
1174
1175
1176 if (!conn.isAlive()) {
1177 if (connsToClose == null) {
1178 connsToClose = new HashSet<Connection>();
1179 }
1180 connsToClose.add(conn);
1181 }
1182 }
1183 }
1184 if (connsToClose != null) {
1185 for (Connection conn : connsToClose) {
1186 conn.markClosed(new InterruptedIOException("RpcClient is closing"));
1187 conn.close();
1188 }
1189 }
1190
1191 while (!connections.isEmpty()) {
1192 try {
1193 Thread.sleep(10);
1194 } catch (InterruptedException e) {
1195 LOG.info("Interrupted while stopping the client. We still have " + connections.size() +
1196 " connections.");
1197 Thread.currentThread().interrupt();
1198 return;
1199 }
1200 }
1201 }
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215 @Override
1216 protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
1217 Message param, Message returnType, User ticket, InetSocketAddress addr,
1218 MetricsConnection.CallStats callStats)
1219 throws IOException, InterruptedException {
1220 if (pcrc == null) {
1221 pcrc = new PayloadCarryingRpcController();
1222 }
1223 CellScanner cells = pcrc.cellScanner();
1224
1225 final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
1226 pcrc.getCallTimeout(), MetricsConnection.newCallStats());
1227
1228 final Connection connection = getConnection(ticket, call, addr);
1229
1230 final CallFuture cts;
1231 if (connection.callSender != null) {
1232 cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan());
1233 pcrc.notifyOnCancel(new RpcCallback<Object>() {
1234 @Override
1235 public void run(Object parameter) {
1236 connection.callSender.remove(cts);
1237 }
1238 });
1239 if (pcrc.isCanceled()) {
1240
1241 call.callComplete();
1242 return new Pair<Message, CellScanner>(call.response, call.cells);
1243 }
1244 } else {
1245 cts = null;
1246 connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());
1247 }
1248
1249 while (!call.done) {
1250 if (call.checkAndSetTimeout()) {
1251 if (cts != null) connection.callSender.remove(cts);
1252 break;
1253 }
1254 if (connection.shouldCloseConnection.get()) {
1255 throw new ConnectionClosingException("Call id=" + call.id +
1256 " on server " + addr + " aborted: connection is closing");
1257 }
1258 try {
1259 synchronized (call) {
1260 if (call.done) break;
1261 call.wait(Math.min(call.remainingTime(), 1000) + 1);
1262 }
1263 } catch (InterruptedException e) {
1264 call.setException(new InterruptedIOException());
1265 if (cts != null) connection.callSender.remove(cts);
1266 throw e;
1267 }
1268 }
1269
1270 if (call.error != null) {
1271 if (call.error instanceof RemoteException) {
1272 call.error.fillInStackTrace();
1273 throw call.error;
1274 }
1275
1276 throw wrapException(addr, call.error);
1277 }
1278
1279 return new Pair<Message, CellScanner>(call.response, call.cells);
1280 }
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291 @Override
1292 public void cancelConnections(ServerName sn) {
1293 synchronized (connections) {
1294 for (Connection connection : connections.values()) {
1295 if (connection.isAlive() &&
1296 connection.getRemoteAddress().getPort() == sn.getPort() &&
1297 connection.getRemoteAddress().getHostName().equals(sn.getHostname())) {
1298 LOG.info("The server on " + sn.toString() +
1299 " is dead - stopping the connection " + connection.remoteId);
1300 connection.interrupt();
1301
1302 }
1303 }
1304 }
1305 }
1306
1307
1308
1309
1310
1311 protected Connection getConnection(User ticket, Call call, InetSocketAddress addr)
1312 throws IOException {
1313 if (!running.get()) throw new StoppedRpcClientException();
1314 Connection connection;
1315 ConnectionId remoteId =
1316 new ConnectionId(ticket, call.md.getService().getName(), addr);
1317 synchronized (connections) {
1318 connection = connections.get(remoteId);
1319 if (connection == null) {
1320 connection = createConnection(remoteId, this.codec, this.compressor);
1321 connections.put(remoteId, connection);
1322 }
1323 }
1324
1325 return connection;
1326 }
1327 }