1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.ipc;
21
22 import java.io.BufferedInputStream;
23 import java.io.BufferedOutputStream;
24 import java.io.Closeable;
25 import java.io.DataInputStream;
26 import java.io.DataOutputStream;
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.io.InterruptedIOException;
30 import java.io.OutputStream;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.net.SocketAddress;
34 import java.net.SocketTimeoutException;
35 import java.net.UnknownHostException;
36 import java.nio.ByteBuffer;
37 import java.security.PrivilegedExceptionAction;
38 import java.util.HashMap;
39 import java.util.HashSet;
40 import java.util.Iterator;
41 import java.util.Map;
42 import java.util.Map.Entry;
43 import java.util.Random;
44 import java.util.Set;
45 import java.util.concurrent.ArrayBlockingQueue;
46 import java.util.concurrent.BlockingQueue;
47 import java.util.concurrent.ConcurrentSkipListMap;
48 import java.util.concurrent.atomic.AtomicBoolean;
49 import java.util.concurrent.atomic.AtomicInteger;
50
51 import javax.net.SocketFactory;
52 import javax.security.sasl.SaslException;
53
54 import org.apache.hadoop.conf.Configuration;
55 import org.apache.hadoop.hbase.CellScanner;
56 import org.apache.hadoop.hbase.DoNotRetryIOException;
57 import org.apache.hadoop.hbase.HConstants;
58 import org.apache.hadoop.hbase.ServerName;
59 import org.apache.hadoop.hbase.classification.InterfaceAudience;
60 import org.apache.hadoop.hbase.codec.Codec;
61 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
62 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
63 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
64 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
65 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
66 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
67 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
68 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
69 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
70 import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo;
71 import org.apache.hadoop.hbase.security.AuthMethod;
72 import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
73 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
74 import org.apache.hadoop.hbase.security.SecurityInfo;
75 import org.apache.hadoop.hbase.security.User;
76 import org.apache.hadoop.hbase.security.UserProvider;
77 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
78 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
79 import org.apache.hadoop.hbase.util.ExceptionUtil;
80 import org.apache.hadoop.hbase.util.Pair;
81 import org.apache.hadoop.hbase.util.PoolMap;
82 import org.apache.hadoop.io.IOUtils;
83 import org.apache.hadoop.io.Text;
84 import org.apache.hadoop.io.compress.CompressionCodec;
85 import org.apache.hadoop.ipc.RemoteException;
86 import org.apache.hadoop.net.NetUtils;
87 import org.apache.hadoop.security.SecurityUtil;
88 import org.apache.hadoop.security.UserGroupInformation;
89 import org.apache.hadoop.security.token.Token;
90 import org.apache.hadoop.security.token.TokenIdentifier;
91 import org.apache.hadoop.security.token.TokenSelector;
92 import org.apache.htrace.Span;
93 import org.apache.htrace.Trace;
94 import org.apache.htrace.TraceScope;
95
96 import com.google.protobuf.Descriptors.MethodDescriptor;
97 import com.google.protobuf.Message;
98 import com.google.protobuf.Message.Builder;
99 import com.google.protobuf.RpcCallback;
100
101
102
103
104
105 @InterfaceAudience.Private
106 public class RpcClientImpl extends AbstractRpcClient {
107 protected final AtomicInteger callIdCnt = new AtomicInteger();
108
109 protected final PoolMap<ConnectionId, Connection> connections;
110
111 protected final AtomicBoolean running = new AtomicBoolean(true);
112
113 protected final FailedServers failedServers;
114
115 protected final SocketFactory socketFactory;
116
117 protected final static Map<AuthenticationProtos.TokenIdentifier.Kind,
118 TokenSelector<? extends TokenIdentifier>> tokenHandlers =
119 new HashMap<AuthenticationProtos.TokenIdentifier.Kind,
120 TokenSelector<? extends TokenIdentifier>>();
121 static {
122 tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
123 new AuthenticationTokenSelector());
124 }
125
126
127
128
129
130 protected Connection createConnection(ConnectionId remoteId, final Codec codec,
131 final CompressionCodec compressor)
132 throws IOException {
133 return new Connection(remoteId, codec, compressor);
134 }
135
136
137
138
139 private static class CallFuture {
140 final Call call;
141 final int priority;
142 final Span span;
143
144
145 final static CallFuture DEATH_PILL = new CallFuture(null, -1, null);
146
147 CallFuture(Call call, int priority, Span span) {
148 this.call = call;
149 this.priority = priority;
150 this.span = span;
151 }
152 }
153
154
155
156
157 protected class Connection extends Thread {
158 private ConnectionHeader header;
159 protected ConnectionId remoteId;
160 protected Socket socket = null;
161 protected DataInputStream in;
162 protected DataOutputStream out;
163 private Object outLock = new Object();
164 private InetSocketAddress server;
165 private String serverPrincipal;
166 private AuthMethod authMethod;
167 private boolean useSasl;
168 private Token<? extends TokenIdentifier> token;
169 private HBaseSaslRpcClient saslRpcClient;
170 private int reloginMaxBackoff;
171 private final Codec codec;
172 private final CompressionCodec compressor;
173
174
175 protected final ConcurrentSkipListMap<Integer, Call> calls =
176 new ConcurrentSkipListMap<Integer, Call>();
177
178 protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
179 protected final CallSender callSender;
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200 private class CallSender extends Thread implements Closeable {
201 protected final BlockingQueue<CallFuture> callsToWrite;
202
203
204 public CallFuture sendCall(Call call, int priority, Span span)
205 throws InterruptedException, IOException {
206 CallFuture cts = new CallFuture(call, priority, span);
207 if (!callsToWrite.offer(cts)) {
208 throw new IOException("Can't add the call " + call.id +
209 " to the write queue. callsToWrite.size()=" + callsToWrite.size());
210 }
211 checkIsOpen();
212
213 return cts;
214 }
215
216 @Override
217 public void close(){
218 assert shouldCloseConnection.get();
219 callsToWrite.offer(CallFuture.DEATH_PILL);
220
221
222 }
223
224 CallSender(String name, Configuration conf) {
225 int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
226 callsToWrite = new ArrayBlockingQueue<CallFuture>(queueSize);
227 setDaemon(true);
228 setName(name + " - writer");
229 }
230
231 public void remove(CallFuture cts){
232 callsToWrite.remove(cts);
233
234
235
236 calls.remove(cts.call.id);
237 cts.call.callComplete();
238 }
239
240
241
242
243 @Override
244 public void run() {
245 while (!shouldCloseConnection.get()) {
246 CallFuture cts = null;
247 try {
248 cts = callsToWrite.take();
249 } catch (InterruptedException e) {
250 markClosed(new InterruptedIOException());
251 }
252
253 if (cts == null || cts == CallFuture.DEATH_PILL) {
254 assert shouldCloseConnection.get();
255 break;
256 }
257
258 if (cts.call.done) {
259 continue;
260 }
261
262 if (cts.call.checkAndSetTimeout()) {
263 continue;
264 }
265
266 try {
267 Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
268 } catch (IOException e) {
269 if (LOG.isDebugEnabled()) {
270 LOG.debug("call write error for call #" + cts.call.id
271 + ", message =" + e.getMessage());
272 }
273 cts.call.setException(e);
274 markClosed(e);
275 }
276 }
277
278 cleanup();
279 }
280
281
282
283
284 private void cleanup() {
285 assert shouldCloseConnection.get();
286
287 IOException ie = new ConnectionClosingException("Connection to " + server + " is closing.");
288 while (true) {
289 CallFuture cts = callsToWrite.poll();
290 if (cts == null) {
291 break;
292 }
293 if (cts.call != null && !cts.call.done) {
294 cts.call.setException(ie);
295 }
296 }
297 }
298 }
299
300 Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
301 throws IOException {
302 if (remoteId.getAddress().isUnresolved()) {
303 throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
304 }
305 this.server = remoteId.getAddress();
306 this.codec = codec;
307 this.compressor = compressor;
308
309 UserGroupInformation ticket = remoteId.getTicket().getUGI();
310 SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
311 this.useSasl = userProvider.isHBaseSecurityEnabled();
312 if (useSasl && securityInfo != null) {
313 AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
314 if (tokenKind != null) {
315 TokenSelector<? extends TokenIdentifier> tokenSelector =
316 tokenHandlers.get(tokenKind);
317 if (tokenSelector != null) {
318 token = tokenSelector.selectToken(new Text(clusterId),
319 ticket.getTokens());
320 } else if (LOG.isDebugEnabled()) {
321 LOG.debug("No token selector found for type "+tokenKind);
322 }
323 }
324 String serverKey = securityInfo.getServerPrincipal();
325 if (serverKey == null) {
326 throw new IOException(
327 "Can't obtain server Kerberos config key from SecurityInfo");
328 }
329 serverPrincipal = SecurityUtil.getServerPrincipal(
330 conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
331 if (LOG.isDebugEnabled()) {
332 LOG.debug("RPC Server Kerberos principal name for service="
333 + remoteId.getServiceName() + " is " + serverPrincipal);
334 }
335 }
336
337 if (!useSasl) {
338 authMethod = AuthMethod.SIMPLE;
339 } else if (token != null) {
340 authMethod = AuthMethod.DIGEST;
341 } else {
342 authMethod = AuthMethod.KERBEROS;
343 }
344
345 if (LOG.isDebugEnabled()) {
346 LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName +
347 ", sasl=" + useSasl);
348 }
349 reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
350 this.remoteId = remoteId;
351
352 ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
353 builder.setServiceName(remoteId.getServiceName());
354 UserInformation userInfoPB = getUserInfo(ticket);
355 if (userInfoPB != null) {
356 builder.setUserInfo(userInfoPB);
357 }
358 if (this.codec != null) {
359 builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
360 }
361 if (this.compressor != null) {
362 builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
363 }
364 builder.setVersionInfo(ProtobufUtil.getVersionInfo());
365 this.header = builder.build();
366
367 this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
368 remoteId.getAddress().toString() +
369 ((ticket==null)?" from an unknown user": (" from "
370 + ticket.getUserName())));
371 this.setDaemon(true);
372
373 if (conf.getBoolean(SPECIFIC_WRITE_THREAD, false)) {
374 callSender = new CallSender(getName(), conf);
375 callSender.start();
376 } else {
377 callSender = null;
378 }
379 }
380
381 private UserInformation getUserInfo(UserGroupInformation ugi) {
382 if (ugi == null || authMethod == AuthMethod.DIGEST) {
383
384 return null;
385 }
386 UserInformation.Builder userInfoPB = UserInformation.newBuilder();
387 if (authMethod == AuthMethod.KERBEROS) {
388
389 userInfoPB.setEffectiveUser(ugi.getUserName());
390 } else if (authMethod == AuthMethod.SIMPLE) {
391
392 userInfoPB.setEffectiveUser(ugi.getUserName());
393 if (ugi.getRealUser() != null) {
394 userInfoPB.setRealUser(ugi.getRealUser().getUserName());
395 }
396 }
397 return userInfoPB.build();
398 }
399
400 protected synchronized void setupConnection() throws IOException {
401 short ioFailures = 0;
402 short timeoutFailures = 0;
403 while (true) {
404 try {
405 this.socket = socketFactory.createSocket();
406 this.socket.setTcpNoDelay(tcpNoDelay);
407 this.socket.setKeepAlive(tcpKeepAlive);
408 if (localAddr != null) {
409 this.socket.bind(localAddr);
410 }
411 NetUtils.connect(this.socket, remoteId.getAddress(), connectTO);
412 this.socket.setSoTimeout(readTO);
413 return;
414 } catch (SocketTimeoutException toe) {
415
416
417
418 handleConnectionFailure(timeoutFailures++, maxRetries, toe);
419 } catch (IOException ie) {
420 handleConnectionFailure(ioFailures++, maxRetries, ie);
421 }
422 }
423 }
424
425 protected synchronized void closeConnection() {
426 if (socket == null) {
427 return;
428 }
429
430
431 try {
432 if (socket.getOutputStream() != null) {
433 socket.getOutputStream().close();
434 }
435 } catch (IOException ignored) {
436 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
437 }
438 try {
439 if (socket.getInputStream() != null) {
440 socket.getInputStream().close();
441 }
442 } catch (IOException ignored) {
443 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
444 }
445 try {
446 if (socket.getChannel() != null) {
447 socket.getChannel().close();
448 }
449 } catch (IOException ignored) {
450 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
451 }
452 try {
453 socket.close();
454 } catch (IOException e) {
455 LOG.warn("Not able to close a socket", e);
456 }
457
458
459
460 socket = null;
461 }
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478 private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
479 throws IOException {
480 closeConnection();
481
482
483 if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
484 throw ioe;
485 }
486
487
488 try {
489 Thread.sleep(failureSleep);
490 } catch (InterruptedException ie) {
491 ExceptionUtil.rethrowIfInterrupt(ie);
492 }
493
494 LOG.info("Retrying connect to server: " + remoteId.getAddress() +
495 " after sleeping " + failureSleep + "ms. Already tried " + curRetries +
496 " time(s).");
497 }
498
499
500
501
502 private void checkIsOpen() throws IOException {
503 if (shouldCloseConnection.get()) {
504 throw new ConnectionClosingException(getName() + " is closing");
505 }
506 }
507
508
509
510
511
512
513
514 protected synchronized boolean waitForWork() throws InterruptedException {
515
516
517 long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
518
519 while (true) {
520 if (shouldCloseConnection.get()) {
521 return false;
522 }
523
524 if (!running.get()) {
525 markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
526 return false;
527 }
528
529 if (!calls.isEmpty()) {
530
531
532 return true;
533 }
534
535 if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
536
537
538
539
540 markClosed(new IOException(
541 "idle connection closed with " + calls.size() + " pending request(s)"));
542 return false;
543 }
544
545 wait(Math.min(minIdleTimeBeforeClose, 1000));
546 }
547 }
548
549 public InetSocketAddress getRemoteAddress() {
550 return remoteId.getAddress();
551 }
552
553 @Override
554 public void run() {
555 if (LOG.isTraceEnabled()) {
556 LOG.trace(getName() + ": starting, connections " + connections.size());
557 }
558
559 try {
560 while (waitForWork()) {
561 readResponse();
562 }
563 } catch (InterruptedException t) {
564 if (LOG.isTraceEnabled()) {
565 LOG.trace(getName() + ": interrupted while waiting for call responses");
566 }
567 markClosed(ExceptionUtil.asInterrupt(t));
568 } catch (Throwable t) {
569 if (LOG.isDebugEnabled()) {
570 LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
571 }
572 markClosed(new IOException("Unexpected throwable while waiting call responses", t));
573 }
574
575 close();
576
577 if (LOG.isTraceEnabled()) {
578 LOG.trace(getName() + ": stopped, connections " + connections.size());
579 }
580 }
581
582 private synchronized void disposeSasl() {
583 if (saslRpcClient != null) {
584 try {
585 saslRpcClient.dispose();
586 saslRpcClient = null;
587 } catch (IOException ioe) {
588 LOG.error("Error disposing of SASL client", ioe);
589 }
590 }
591 }
592
593 private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
594 UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
595 UserGroupInformation currentUser =
596 UserGroupInformation.getCurrentUser();
597 UserGroupInformation realUser = currentUser.getRealUser();
598 return authMethod == AuthMethod.KERBEROS &&
599 loginUser != null &&
600
601 loginUser.hasKerberosCredentials() &&
602
603
604 (loginUser.equals(currentUser) || loginUser.equals(realUser));
605 }
606
607 private synchronized boolean setupSaslConnection(final InputStream in2,
608 final OutputStream out2) throws IOException {
609 saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal, fallbackAllowed,
610 conf.get("hbase.rpc.protection",
611 QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
612 return saslRpcClient.saslConnect(in2, out2);
613 }
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633 private synchronized void handleSaslConnectionFailure(
634 final int currRetries,
635 final int maxRetries, final Exception ex, final Random rand,
636 final UserGroupInformation user)
637 throws IOException, InterruptedException{
638 user.doAs(new PrivilegedExceptionAction<Object>() {
639 @Override
640 public Object run() throws IOException, InterruptedException {
641 closeConnection();
642 if (shouldAuthenticateOverKrb()) {
643 if (currRetries < maxRetries) {
644 if (LOG.isDebugEnabled()) {
645 LOG.debug("Exception encountered while connecting to " +
646 "the server : " + ex);
647 }
648
649 if (UserGroupInformation.isLoginKeytabBased()) {
650 UserGroupInformation.getLoginUser().reloginFromKeytab();
651 } else {
652 UserGroupInformation.getLoginUser().reloginFromTicketCache();
653 }
654 disposeSasl();
655
656
657
658
659 Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
660 return null;
661 } else {
662 String msg = "Couldn't setup connection for " +
663 UserGroupInformation.getLoginUser().getUserName() +
664 " to " + serverPrincipal;
665 LOG.warn(msg, ex);
666 throw (IOException) new IOException(msg).initCause(ex);
667 }
668 } else {
669 LOG.warn("Exception encountered while connecting to " +
670 "the server : " + ex);
671 }
672 if (ex instanceof RemoteException) {
673 throw (RemoteException)ex;
674 }
675 if (ex instanceof SaslException) {
676 String msg = "SASL authentication failed." +
677 " The most likely cause is missing or invalid credentials." +
678 " Consider 'kinit'.";
679 LOG.fatal(msg, ex);
680 throw new RuntimeException(msg, ex);
681 }
682 throw new IOException(ex);
683 }
684 });
685 }
686
687 protected synchronized void setupIOstreams() throws IOException {
688 if (socket != null) {
689
690 return;
691 }
692
693 if (shouldCloseConnection.get()){
694 throw new ConnectionClosingException("This connection is closing");
695 }
696
697 if (failedServers.isFailedServer(remoteId.getAddress())) {
698 if (LOG.isDebugEnabled()) {
699 LOG.debug("Not trying to connect to " + server +
700 " this server is in the failed servers list");
701 }
702 IOException e = new FailedServerException(
703 "This server is in the failed servers list: " + server);
704 markClosed(e);
705 close();
706 throw e;
707 }
708
709 try {
710 if (LOG.isDebugEnabled()) {
711 LOG.debug("Connecting to " + server);
712 }
713 short numRetries = 0;
714 final short MAX_RETRIES = 5;
715 Random rand = null;
716 while (true) {
717 setupConnection();
718 InputStream inStream = NetUtils.getInputStream(socket);
719
720 OutputStream outStream = NetUtils.getOutputStream(socket, writeTO);
721
722 writeConnectionHeaderPreamble(outStream);
723 if (useSasl) {
724 final InputStream in2 = inStream;
725 final OutputStream out2 = outStream;
726 UserGroupInformation ticket = remoteId.getTicket().getUGI();
727 if (authMethod == AuthMethod.KERBEROS) {
728 if (ticket != null && ticket.getRealUser() != null) {
729 ticket = ticket.getRealUser();
730 }
731 }
732 boolean continueSasl;
733 if (ticket == null) throw new FatalConnectionException("ticket/user is null");
734 try {
735 continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
736 @Override
737 public Boolean run() throws IOException {
738 return setupSaslConnection(in2, out2);
739 }
740 });
741 } catch (Exception ex) {
742 ExceptionUtil.rethrowIfInterrupt(ex);
743 if (rand == null) {
744 rand = new Random();
745 }
746 handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, ticket);
747 continue;
748 }
749 if (continueSasl) {
750
751 inStream = saslRpcClient.getInputStream(inStream);
752 outStream = saslRpcClient.getOutputStream(outStream);
753 } else {
754
755 authMethod = AuthMethod.SIMPLE;
756 useSasl = false;
757 }
758 }
759 this.in = new DataInputStream(new BufferedInputStream(inStream));
760 synchronized (this.outLock) {
761 this.out = new DataOutputStream(new BufferedOutputStream(outStream));
762 }
763
764 writeConnectionHeader();
765
766
767 start();
768 return;
769 }
770 } catch (Throwable t) {
771 IOException e = ExceptionUtil.asInterrupt(t);
772 if (e == null) {
773 failedServers.addToFailedServers(remoteId.address);
774 if (t instanceof LinkageError) {
775
776 e = new DoNotRetryIOException(t);
777 } else if (t instanceof IOException) {
778 e = (IOException) t;
779 } else {
780 e = new IOException("Could not set up IO Streams to " + server, t);
781 }
782 }
783 markClosed(e);
784 close();
785 throw e;
786 }
787 }
788
789
790
791
792 private void writeConnectionHeaderPreamble(OutputStream outStream) throws IOException {
793
794
795
796
797
798 int rpcHeaderLen = HConstants.RPC_HEADER.length;
799 byte [] preamble = new byte [rpcHeaderLen + 2];
800 System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen);
801 preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
802 preamble[rpcHeaderLen + 1] = authMethod.code;
803 outStream.write(preamble);
804 outStream.flush();
805 }
806
807
808
809
810 private synchronized void writeConnectionHeader() throws IOException {
811 synchronized (this.outLock) {
812 this.out.writeInt(this.header.getSerializedSize());
813 this.header.writeTo(this.out);
814 this.out.flush();
815 }
816 }
817
818
819 protected synchronized void close() {
820 if (!shouldCloseConnection.get()) {
821 LOG.error(getName() + ": the connection is not in the closed state");
822 return;
823 }
824
825
826
827 synchronized (connections) {
828 connections.removeValue(remoteId, this);
829 }
830
831
832 synchronized(this.outLock) {
833 if (this.out != null) {
834 IOUtils.closeStream(out);
835 this.out = null;
836 }
837 }
838 IOUtils.closeStream(in);
839 this.in = null;
840 if (this.socket != null) {
841 try {
842 this.socket.close();
843 this.socket = null;
844 } catch (IOException e) {
845 LOG.error("Error while closing socket", e);
846 }
847 }
848
849 disposeSasl();
850
851
852 if (LOG.isTraceEnabled()) {
853 LOG.trace(getName() + ": closing ipc connection to " + server);
854 }
855
856 cleanupCalls(true);
857
858 if (LOG.isTraceEnabled()) {
859 LOG.trace(getName() + ": ipc connection to " + server + " closed");
860 }
861 }
862
863 protected void tracedWriteRequest(Call call, int priority, Span span) throws IOException {
864 TraceScope ts = Trace.startSpan("RpcClientImpl.tracedWriteRequest", span);
865 try {
866 writeRequest(call, priority, span);
867 } finally {
868 ts.close();
869 }
870 }
871
872
873
874
875
876
877
878 private void writeRequest(Call call, final int priority, Span span) throws IOException {
879 RequestHeader.Builder builder = RequestHeader.newBuilder();
880 builder.setCallId(call.id);
881 if (span != null) {
882 builder.setTraceInfo(
883 RPCTInfo.newBuilder().setParentId(span.getSpanId()).setTraceId(span.getTraceId()));
884 }
885 builder.setMethodName(call.md.getName());
886 builder.setRequestParam(call.param != null);
887 ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
888 if (cellBlock != null) {
889 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
890 cellBlockBuilder.setLength(cellBlock.limit());
891 builder.setCellBlockMeta(cellBlockBuilder.build());
892 }
893
894 if (priority != 0) builder.setPriority(priority);
895 RequestHeader header = builder.build();
896
897 setupIOstreams();
898
899
900
901
902 checkIsOpen();
903 IOException writeException = null;
904 synchronized (this.outLock) {
905 if (Thread.interrupted()) throw new InterruptedIOException();
906
907 calls.put(call.id, call);
908 checkIsOpen();
909
910 try {
911 IPCUtil.write(this.out, header, call.param, cellBlock);
912 } catch (IOException e) {
913
914
915 shouldCloseConnection.set(true);
916 writeException = e;
917 interrupt();
918 }
919 }
920
921
922 if (writeException != null) {
923 markClosed(writeException);
924 close();
925 }
926
927
928
929 synchronized (this) {
930 notifyAll();
931 }
932
933
934 if (writeException != null) throw writeException;
935 }
936
937
938
939
940 protected void readResponse() {
941 if (shouldCloseConnection.get()) return;
942 Call call = null;
943 boolean expectedCall = false;
944 try {
945
946
947 int totalSize = in.readInt();
948
949
950 ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
951 int id = responseHeader.getCallId();
952 call = calls.remove(id);
953 expectedCall = (call != null && !call.done);
954 if (!expectedCall) {
955
956
957
958
959
960 int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
961 int whatIsLeftToRead = totalSize - readSoFar;
962 IOUtils.skipFully(in, whatIsLeftToRead);
963 return;
964 }
965 if (responseHeader.hasException()) {
966 ExceptionResponse exceptionResponse = responseHeader.getException();
967 RemoteException re = createRemoteException(exceptionResponse);
968 call.setException(re);
969 if (isFatalConnectionException(exceptionResponse)) {
970 markClosed(re);
971 }
972 } else {
973 Message value = null;
974 if (call.responseDefaultType != null) {
975 Builder builder = call.responseDefaultType.newBuilderForType();
976 ProtobufUtil.mergeDelimitedFrom(builder, in);
977 value = builder.build();
978 }
979 CellScanner cellBlockScanner = null;
980 if (responseHeader.hasCellBlockMeta()) {
981 int size = responseHeader.getCellBlockMeta().getLength();
982 byte [] cellBlock = new byte[size];
983 IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
984 cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
985 }
986 call.setResponse(value, cellBlockScanner);
987 }
988 } catch (IOException e) {
989 if (expectedCall) call.setException(e);
990 if (e instanceof SocketTimeoutException) {
991
992
993
994 if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
995 } else {
996
997 markClosed(e);
998 }
999 } finally {
1000 cleanupCalls(false);
1001 }
1002 }
1003
1004
1005
1006
1007 private boolean isFatalConnectionException(final ExceptionResponse e) {
1008 return e.getExceptionClassName().
1009 equals(FatalConnectionException.class.getName());
1010 }
1011
1012
1013
1014
1015
1016 private RemoteException createRemoteException(final ExceptionResponse e) {
1017 String innerExceptionClassName = e.getExceptionClassName();
1018 boolean doNotRetry = e.getDoNotRetry();
1019 return e.hasHostname()?
1020
1021 new RemoteWithExtrasException(innerExceptionClassName,
1022 e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
1023 new RemoteWithExtrasException(innerExceptionClassName,
1024 e.getStackTrace(), doNotRetry);
1025 }
1026
1027 protected synchronized boolean markClosed(IOException e) {
1028 if (e == null) throw new NullPointerException();
1029
1030 boolean ret = shouldCloseConnection.compareAndSet(false, true);
1031 if (ret) {
1032 if (LOG.isTraceEnabled()) {
1033 LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
1034 }
1035 if (callSender != null) {
1036 callSender.close();
1037 }
1038 notifyAll();
1039 }
1040 return ret;
1041 }
1042
1043
1044
1045
1046
1047
1048 protected synchronized void cleanupCalls(boolean allCalls) {
1049 Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
1050 while (itor.hasNext()) {
1051 Call c = itor.next().getValue();
1052 if (c.done) {
1053
1054 itor.remove();
1055 } else if (allCalls) {
1056 long waitTime = EnvironmentEdgeManager.currentTime() - c.getStartTime();
1057 IOException ie = new ConnectionClosingException("Connection to " + getRemoteAddress()
1058 + " is closing. Call id=" + c.id + ", waitTime=" + waitTime);
1059 c.setException(ie);
1060 itor.remove();
1061 } else if (c.checkAndSetTimeout()) {
1062 itor.remove();
1063 } else {
1064
1065
1066
1067 break;
1068 }
1069 }
1070 }
1071 }
1072
1073
1074
1075
1076
1077
1078
1079 RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) {
1080 this(conf, clusterId, factory, null);
1081 }
1082
1083
1084
1085
1086
1087
1088
1089
1090 RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory,
1091 SocketAddress localAddr) {
1092 super(conf, clusterId, localAddr);
1093
1094 this.socketFactory = factory;
1095 this.connections = new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf));
1096 this.failedServers = new FailedServers(conf);
1097 }
1098
1099
1100
1101
1102
1103
1104 public RpcClientImpl(Configuration conf, String clusterId) {
1105 this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null);
1106 }
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117 public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr) {
1118 this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr);
1119 }
1120
1121
1122
1123 @Override
1124 public void close() {
1125 if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
1126 if (!running.compareAndSet(true, false)) return;
1127
1128 Set<Connection> connsToClose = null;
1129
1130 synchronized (connections) {
1131 for (Connection conn : connections.values()) {
1132 conn.interrupt();
1133 if (conn.callSender != null) {
1134 conn.callSender.interrupt();
1135 }
1136
1137
1138
1139 if (!conn.isAlive()) {
1140 if (connsToClose == null) {
1141 connsToClose = new HashSet<Connection>();
1142 }
1143 connsToClose.add(conn);
1144 }
1145 }
1146 }
1147 if (connsToClose != null) {
1148 for (Connection conn : connsToClose) {
1149 conn.markClosed(new InterruptedIOException("RpcClient is closing"));
1150 conn.close();
1151 }
1152 }
1153
1154 while (!connections.isEmpty()) {
1155 try {
1156 Thread.sleep(10);
1157 } catch (InterruptedException e) {
1158 LOG.info("Interrupted while stopping the client. We still have " + connections.size() +
1159 " connections.");
1160 Thread.currentThread().interrupt();
1161 return;
1162 }
1163 }
1164 }
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178 @Override
1179 protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
1180 Message param, Message returnType, User ticket, InetSocketAddress addr)
1181 throws IOException, InterruptedException {
1182 if (pcrc == null) {
1183 pcrc = new PayloadCarryingRpcController();
1184 }
1185 CellScanner cells = pcrc.cellScanner();
1186
1187 final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
1188 pcrc.getCallTimeout());
1189
1190 final Connection connection = getConnection(ticket, call, addr);
1191
1192 final CallFuture cts;
1193 if (connection.callSender != null) {
1194 cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan());
1195 pcrc.notifyOnCancel(new RpcCallback<Object>() {
1196 @Override
1197 public void run(Object parameter) {
1198 connection.callSender.remove(cts);
1199 }
1200 });
1201 if (pcrc.isCanceled()) {
1202
1203 call.callComplete();
1204 return new Pair<Message, CellScanner>(call.response, call.cells);
1205 }
1206 } else {
1207 cts = null;
1208 connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());
1209 }
1210
1211 while (!call.done) {
1212 if (call.checkAndSetTimeout()) {
1213 if (cts != null) connection.callSender.remove(cts);
1214 break;
1215 }
1216 if (connection.shouldCloseConnection.get()) {
1217 throw new ConnectionClosingException("Call id=" + call.id +
1218 " on server " + addr + " aborted: connection is closing");
1219 }
1220 try {
1221 synchronized (call) {
1222 if (call.done) break;
1223 call.wait(Math.min(call.remainingTime(), 1000) + 1);
1224 }
1225 } catch (InterruptedException e) {
1226 call.setException(new InterruptedIOException());
1227 if (cts != null) connection.callSender.remove(cts);
1228 throw e;
1229 }
1230 }
1231
1232 if (call.error != null) {
1233 if (call.error instanceof RemoteException) {
1234 call.error.fillInStackTrace();
1235 throw call.error;
1236 }
1237
1238 throw wrapException(addr, call.error);
1239 }
1240
1241 return new Pair<Message, CellScanner>(call.response, call.cells);
1242 }
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253 @Override
1254 public void cancelConnections(ServerName sn) {
1255 synchronized (connections) {
1256 for (Connection connection : connections.values()) {
1257 if (connection.isAlive() &&
1258 connection.getRemoteAddress().getPort() == sn.getPort() &&
1259 connection.getRemoteAddress().getHostName().equals(sn.getHostname())) {
1260 LOG.info("The server on " + sn.toString() +
1261 " is dead - stopping the connection " + connection.remoteId);
1262 connection.interrupt();
1263
1264 }
1265 }
1266 }
1267 }
1268
1269
1270
1271
1272
1273 protected Connection getConnection(User ticket, Call call, InetSocketAddress addr)
1274 throws IOException {
1275 if (!running.get()) throw new StoppedRpcClientException();
1276 Connection connection;
1277 ConnectionId remoteId =
1278 new ConnectionId(ticket, call.md.getService().getName(), addr);
1279 synchronized (connections) {
1280 connection = connections.get(remoteId);
1281 if (connection == null) {
1282 connection = createConnection(remoteId, this.codec, this.compressor);
1283 connections.put(remoteId, connection);
1284 }
1285 }
1286
1287 return connection;
1288 }
1289 }