1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.DataOutputStream;
26 import java.io.IOException;
27 import java.net.BindException;
28 import java.net.InetAddress;
29 import java.net.InetSocketAddress;
30 import java.net.ServerSocket;
31 import java.net.Socket;
32 import java.net.SocketException;
33 import java.net.UnknownHostException;
34 import java.nio.ByteBuffer;
35 import java.nio.channels.CancelledKeyException;
36 import java.nio.channels.Channels;
37 import java.nio.channels.ClosedChannelException;
38 import java.nio.channels.GatheringByteChannel;
39 import java.nio.channels.ReadableByteChannel;
40 import java.nio.channels.SelectionKey;
41 import java.nio.channels.Selector;
42 import java.nio.channels.ServerSocketChannel;
43 import java.nio.channels.SocketChannel;
44 import java.nio.channels.WritableByteChannel;
45 import java.security.PrivilegedExceptionAction;
46 import java.util.ArrayList;
47 import java.util.Arrays;
48 import java.util.Collections;
49 import java.util.HashMap;
50 import java.util.Iterator;
51 import java.util.LinkedList;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.Random;
55 import java.util.Set;
56 import java.util.concurrent.ConcurrentHashMap;
57 import java.util.concurrent.ConcurrentLinkedDeque;
58 import java.util.concurrent.ExecutorService;
59 import java.util.concurrent.Executors;
60 import java.util.concurrent.atomic.AtomicInteger;
61 import java.util.concurrent.locks.Lock;
62 import java.util.concurrent.locks.ReentrantLock;
63
64 import javax.security.sasl.Sasl;
65 import javax.security.sasl.SaslException;
66 import javax.security.sasl.SaslServer;
67
68 import org.apache.commons.logging.Log;
69 import org.apache.commons.logging.LogFactory;
70 import org.apache.hadoop.hbase.CallQueueTooBigException;
71 import org.apache.hadoop.hbase.classification.InterfaceAudience;
72 import org.apache.hadoop.hbase.classification.InterfaceStability;
73 import org.apache.hadoop.conf.Configuration;
74 import org.apache.hadoop.hbase.CellScanner;
75 import org.apache.hadoop.hbase.DoNotRetryIOException;
76 import org.apache.hadoop.hbase.HBaseIOException;
77 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
78 import org.apache.hadoop.hbase.HConstants;
79 import org.apache.hadoop.hbase.HRegionInfo;
80 import org.apache.hadoop.hbase.Server;
81 import org.apache.hadoop.hbase.TableName;
82 import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
83 import org.apache.hadoop.hbase.client.Operation;
84 import org.apache.hadoop.hbase.client.VersionInfoUtil;
85 import org.apache.hadoop.hbase.codec.Codec;
86 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
87 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
88 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
89 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
90 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
91 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
92 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
93 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
94 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
95 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
96 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
97 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
98 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
99 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
100 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
101 import org.apache.hadoop.hbase.regionserver.HRegionServer;
102 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
103 import org.apache.hadoop.hbase.security.AccessDeniedException;
104 import org.apache.hadoop.hbase.security.AuthMethod;
105 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
106 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
107 import org.apache.hadoop.hbase.security.User;
108 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
109 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
110 import org.apache.hadoop.hbase.security.SaslStatus;
111 import org.apache.hadoop.hbase.security.SaslUtil;
112 import org.apache.hadoop.hbase.security.UserProvider;
113 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
114 import org.apache.hadoop.hbase.util.Bytes;
115 import org.apache.hadoop.hbase.util.Counter;
116 import org.apache.hadoop.hbase.util.Pair;
117 import org.apache.hadoop.io.BytesWritable;
118 import org.apache.hadoop.io.IntWritable;
119 import org.apache.hadoop.io.Writable;
120 import org.apache.hadoop.io.WritableUtils;
121 import org.apache.hadoop.io.compress.CompressionCodec;
122 import org.apache.hadoop.security.UserGroupInformation;
123 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
124 import org.apache.hadoop.security.authorize.AuthorizationException;
125 import org.apache.hadoop.security.authorize.PolicyProvider;
126 import org.apache.hadoop.security.authorize.ProxyUsers;
127 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
128 import org.apache.hadoop.security.token.SecretManager;
129 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
130 import org.apache.hadoop.security.token.TokenIdentifier;
131 import org.apache.hadoop.util.StringUtils;
132 import org.codehaus.jackson.map.ObjectMapper;
133 import org.apache.htrace.TraceInfo;
134
135 import com.google.common.util.concurrent.ThreadFactoryBuilder;
136 import com.google.protobuf.BlockingService;
137 import com.google.protobuf.CodedInputStream;
138 import com.google.protobuf.Descriptors.MethodDescriptor;
139 import com.google.protobuf.Message;
140 import com.google.protobuf.ServiceException;
141 import com.google.protobuf.TextFormat;
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
164 @InterfaceStability.Evolving
165 public class RpcServer implements RpcServerInterface, ConfigurationObserver {
166
167 public static final Log LOG = LogFactory.getLog(RpcServer.class);
168 private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
169 = new CallQueueTooBigException();
170
171 private final boolean authorize;
172 private boolean isSecurityEnabled;
173
174 public static final byte CURRENT_VERSION = 0;
175
176
177
178
179 public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
180 "hbase.ipc.server.fallback-to-simple-auth-allowed";
181
182
183
184
185 static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
186
187
188
189
190 private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
191
192 private final IPCUtil ipcUtil;
193
194 private static final String AUTH_FAILED_FOR = "Auth failed for ";
195 private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
196 private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
197 Server.class.getName());
198 protected SecretManager<TokenIdentifier> secretManager;
199 protected ServiceAuthorizationManager authManager;
200
201
202
203
204 protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
205
206
207 static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
208 = new ThreadLocal<MonitoredRPCHandler>();
209
210 protected final InetSocketAddress bindAddress;
211 protected int port;
212 protected InetSocketAddress address;
213 private int readThreads;
214 protected int maxIdleTime;
215
216
217 protected int thresholdIdleConnections;
218
219
220
221 int maxConnectionsToNuke;
222
223
224
225 protected MetricsHBaseServer metrics;
226
227 protected final Configuration conf;
228
229 private int maxQueueSize;
230 protected int socketSendBufferSize;
231 protected final boolean tcpNoDelay;
232 protected final boolean tcpKeepAlive;
233 protected final long purgeTimeout;
234
235
236
237
238
239
240 volatile boolean running = true;
241
242
243
244
245
246 volatile boolean started = false;
247
248
249
250
251 protected final Counter callQueueSize = new Counter();
252
253 protected final List<Connection> connectionList =
254 Collections.synchronizedList(new LinkedList<Connection>());
255
256
257 private Listener listener = null;
258 protected Responder responder = null;
259 protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
260 protected int numConnections = 0;
261
262 protected HBaseRPCErrorHandler errorHandler = null;
263
264 private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
265 private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
266
267
268 private static final int DEFAULT_WARN_RESPONSE_TIME = 10000;
269 private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
270
271 private static final ObjectMapper MAPPER = new ObjectMapper();
272
273 private final int warnResponseTime;
274 private final int warnResponseSize;
275 private final Server server;
276 private final List<BlockingServiceAndInterface> services;
277
278 private final RpcScheduler scheduler;
279
280 private UserProvider userProvider;
281
282 private final BoundedByteBufferPool reservoir;
283
284 private volatile boolean allowFallbackToSimpleAuth;
285
286
287
288
289
290 private RSRpcServices rsRpcServices;
291
292
293
294
295
296 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
297 @InterfaceStability.Evolving
298 public class Call implements RpcCallContext {
299 protected int id;
300 protected BlockingService service;
301 protected MethodDescriptor md;
302 protected RequestHeader header;
303 protected Message param;
304
305 protected CellScanner cellScanner;
306 protected Connection connection;
307 protected long timestamp;
308
309
310
311
312 protected BufferChain response;
313 protected Responder responder;
314
315 protected long size;
316 protected boolean isError;
317 protected TraceInfo tinfo;
318 private ByteBuffer cellBlock = null;
319
320 private User user;
321 private InetAddress remoteAddress;
322
323 private long responseCellSize = 0;
324 private long responseBlockSize = 0;
325 private boolean retryImmediatelySupported;
326
327 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
328 justification="Can't figure why this complaint is happening... see below")
329 Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
330 Message param, CellScanner cellScanner, Connection connection, Responder responder,
331 long size, TraceInfo tinfo, final InetAddress remoteAddress) {
332 this.id = id;
333 this.service = service;
334 this.md = md;
335 this.header = header;
336 this.param = param;
337 this.cellScanner = cellScanner;
338 this.connection = connection;
339 this.timestamp = System.currentTimeMillis();
340 this.response = null;
341 this.responder = responder;
342 this.isError = false;
343 this.size = size;
344 this.tinfo = tinfo;
345 this.user = connection == null? null: connection.user;
346 this.remoteAddress = remoteAddress;
347 this.retryImmediatelySupported =
348 connection == null? null: connection.retryImmediatelySupported;
349 }
350
351
352
353
354
355 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
356 justification="Presume the lock on processing request held by caller is protection enough")
357 void done() {
358 if (this.cellBlock != null && reservoir != null) {
359
360 reservoir.putBuffer(this.cellBlock);
361 this.cellBlock = null;
362 }
363 this.connection.decRpcCount();
364 }
365
366 @Override
367 public String toString() {
368 return toShortString() + " param: " +
369 (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
370 " connection: " + connection.toString();
371 }
372
373 protected RequestHeader getHeader() {
374 return this.header;
375 }
376
377 public boolean hasPriority() {
378 return this.header.hasPriority();
379 }
380
381 public int getPriority() {
382 return this.header.getPriority();
383 }
384
385
386
387
388
389 String toShortString() {
390 String serviceName = this.connection.service != null ?
391 this.connection.service.getDescriptorForType().getName() : "null";
392 return "callId: " + this.id + " service: " + serviceName +
393 " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
394 " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
395 " connection: " + connection.toString();
396 }
397
398 String toTraceString() {
399 String serviceName = this.connection.service != null ?
400 this.connection.service.getDescriptorForType().getName() : "";
401 String methodName = (this.md != null) ? this.md.getName() : "";
402 return serviceName + "." + methodName;
403 }
404
405 protected synchronized void setSaslTokenResponse(ByteBuffer response) {
406 this.response = new BufferChain(response);
407 }
408
409 protected synchronized void setResponse(Object m, final CellScanner cells,
410 Throwable t, String errorMsg) {
411 if (this.isError) return;
412 if (t != null) this.isError = true;
413 BufferChain bc = null;
414 try {
415 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
416
417 Message result = (Message)m;
418
419 headerBuilder.setCallId(this.id);
420 if (t != null) {
421 ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
422 exceptionBuilder.setExceptionClassName(t.getClass().getName());
423 exceptionBuilder.setStackTrace(errorMsg);
424 exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException ||
425 t instanceof NeedUnmanagedConnectionException);
426 if (t instanceof RegionMovedException) {
427
428
429
430 RegionMovedException rme = (RegionMovedException)t;
431 exceptionBuilder.setHostname(rme.getHostname());
432 exceptionBuilder.setPort(rme.getPort());
433 }
434
435 headerBuilder.setException(exceptionBuilder.build());
436 }
437
438
439
440 this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
441 this.connection.compressionCodec, cells, reservoir);
442 if (this.cellBlock != null) {
443 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
444
445 cellBlockBuilder.setLength(this.cellBlock.limit());
446 headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
447 }
448 Message header = headerBuilder.build();
449
450
451
452 ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
453 ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
454 int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
455 (this.cellBlock == null? 0: this.cellBlock.limit());
456 ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
457 bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
458 if (connection.useWrap) {
459 bc = wrapWithSasl(bc);
460 }
461 } catch (IOException e) {
462 LOG.warn("Exception while creating response " + e);
463 }
464 this.response = bc;
465 }
466
467 private BufferChain wrapWithSasl(BufferChain bc)
468 throws IOException {
469 if (!this.connection.useSasl) return bc;
470
471
472 byte [] responseBytes = bc.getBytes();
473 byte [] token;
474
475
476 synchronized (connection.saslServer) {
477 token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
478 }
479 if (LOG.isTraceEnabled()) {
480 LOG.trace("Adding saslServer wrapped token of size " + token.length
481 + " as call response.");
482 }
483
484 ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
485 ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
486 return new BufferChain(bbTokenLength, bbTokenBytes);
487 }
488
489 @Override
490 public boolean isClientCellBlockSupported() {
491 return this.connection != null && this.connection.codec != null;
492 }
493
494 @Override
495 public long disconnectSince() {
496 if (!connection.channel.isOpen()) {
497 return System.currentTimeMillis() - timestamp;
498 } else {
499 return -1L;
500 }
501 }
502
503 public long getSize() {
504 return this.size;
505 }
506
507 public long getResponseCellSize() {
508 return responseCellSize;
509 }
510
511 public void incrementResponseCellSize(long cellSize) {
512 responseCellSize += cellSize;
513 }
514
515 @Override
516 public long getResponseBlockSize() {
517 return responseBlockSize;
518 }
519
520 @Override
521 public void incrementResponseBlockSize(long blockSize) {
522 responseBlockSize += blockSize;
523 }
524
525 public synchronized void sendResponseIfReady() throws IOException {
526
527 this.param = null;
528 this.responder.doRespond(this);
529 }
530
531 public UserGroupInformation getRemoteUser() {
532 return connection.ugi;
533 }
534
535 @Override
536 public User getRequestUser() {
537 return user;
538 }
539
540 @Override
541 public String getRequestUserName() {
542 User user = getRequestUser();
543 return user == null? null: user.getShortName();
544 }
545
546 @Override
547 public InetAddress getRemoteAddress() {
548 return remoteAddress;
549 }
550
551 @Override
552 public VersionInfo getClientVersionInfo() {
553 return connection.getVersionInfo();
554 }
555
556 @Override
557 public boolean isRetryImmediatelySupported() {
558 return retryImmediatelySupported;
559 }
560 }
561
562
563 private class Listener extends Thread {
564
565 private ServerSocketChannel acceptChannel = null;
566 private Selector selector = null;
567 private Reader[] readers = null;
568 private int currentReader = 0;
569 private Random rand = new Random();
570 private long lastCleanupRunTime = 0;
571
572 private long cleanupInterval = 10000;
573
574 private int backlogLength;
575
576 private ExecutorService readPool;
577
578 public Listener(final String name) throws IOException {
579 super(name);
580 backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
581
582 acceptChannel = ServerSocketChannel.open();
583 acceptChannel.configureBlocking(false);
584
585
586 bind(acceptChannel.socket(), bindAddress, backlogLength);
587 port = acceptChannel.socket().getLocalPort();
588 address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
589
590
591 selector= Selector.open();
592
593 readers = new Reader[readThreads];
594 readPool = Executors.newFixedThreadPool(readThreads,
595 new ThreadFactoryBuilder().setNameFormat(
596 "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
597 ",port=" + port).setDaemon(true).build());
598 for (int i = 0; i < readThreads; ++i) {
599 Reader reader = new Reader();
600 readers[i] = reader;
601 readPool.execute(reader);
602 }
603 LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
604
605
606 acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
607 this.setName("RpcServer.listener,port=" + port);
608 this.setDaemon(true);
609 }
610
611
612 private class Reader implements Runnable {
613 private volatile boolean adding = false;
614 private final Selector readSelector;
615
616 Reader() throws IOException {
617 this.readSelector = Selector.open();
618 }
619 @Override
620 public void run() {
621 try {
622 doRunLoop();
623 } finally {
624 try {
625 readSelector.close();
626 } catch (IOException ioe) {
627 LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
628 }
629 }
630 }
631
632 private synchronized void doRunLoop() {
633 while (running) {
634 try {
635 readSelector.select();
636 while (adding) {
637 this.wait(1000);
638 }
639
640 Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
641 while (iter.hasNext()) {
642 SelectionKey key = iter.next();
643 iter.remove();
644 if (key.isValid()) {
645 if (key.isReadable()) {
646 doRead(key);
647 }
648 }
649 }
650 } catch (InterruptedException e) {
651 LOG.debug("Interrupted while sleeping");
652 return;
653 } catch (IOException ex) {
654 LOG.info(getName() + ": IOException in Reader", ex);
655 } catch (OutOfMemoryError e) {
656 if (getErrorHandler() != null) {
657 if (getErrorHandler().checkOOME(e)) {
658 RpcServer.LOG.info(Thread.currentThread().getName()
659 + ": exiting on OutOfMemoryError");
660 return;
661 }
662 } else {
663
664 throw e;
665 }
666 }
667 }
668 }
669
670
671
672
673
674
675
676
677 public void startAdd() {
678 adding = true;
679 readSelector.wakeup();
680 }
681
682 public synchronized SelectionKey registerChannel(SocketChannel channel)
683 throws IOException {
684 return channel.register(readSelector, SelectionKey.OP_READ);
685 }
686
687 public synchronized void finishAdd() {
688 adding = false;
689 this.notify();
690 }
691 }
692
693
694
695
696
697
698
699
700 private void cleanupConnections(boolean force) {
701 if (force || numConnections > thresholdIdleConnections) {
702 long currentTime = System.currentTimeMillis();
703 if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
704 return;
705 }
706 int start = 0;
707 int end = numConnections - 1;
708 if (!force) {
709 start = rand.nextInt() % numConnections;
710 end = rand.nextInt() % numConnections;
711 int temp;
712 if (end < start) {
713 temp = start;
714 start = end;
715 end = temp;
716 }
717 }
718 int i = start;
719 int numNuked = 0;
720 while (i <= end) {
721 Connection c;
722 synchronized (connectionList) {
723 try {
724 c = connectionList.get(i);
725 } catch (Exception e) {return;}
726 }
727 if (c.timedOut(currentTime)) {
728 if (LOG.isDebugEnabled())
729 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
730 closeConnection(c);
731 numNuked++;
732 end--;
733
734 c = null;
735 if (!force && numNuked == maxConnectionsToNuke) break;
736 }
737 else i++;
738 }
739 lastCleanupRunTime = System.currentTimeMillis();
740 }
741 }
742
743 @Override
744 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
745 justification="selector access is not synchronized; seems fine but concerned changing " +
746 "it will have per impact")
747 public void run() {
748 LOG.info(getName() + ": starting");
749 while (running) {
750 SelectionKey key = null;
751 try {
752 selector.select();
753 Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
754 while (iter.hasNext()) {
755 key = iter.next();
756 iter.remove();
757 try {
758 if (key.isValid()) {
759 if (key.isAcceptable())
760 doAccept(key);
761 }
762 } catch (IOException ignored) {
763 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
764 }
765 key = null;
766 }
767 } catch (OutOfMemoryError e) {
768 if (errorHandler != null) {
769 if (errorHandler.checkOOME(e)) {
770 LOG.info(getName() + ": exiting on OutOfMemoryError");
771 closeCurrentConnection(key, e);
772 cleanupConnections(true);
773 return;
774 }
775 } else {
776
777
778
779 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
780 closeCurrentConnection(key, e);
781 cleanupConnections(true);
782 try {
783 Thread.sleep(60000);
784 } catch (InterruptedException ex) {
785 LOG.debug("Interrupted while sleeping");
786 return;
787 }
788 }
789 } catch (Exception e) {
790 closeCurrentConnection(key, e);
791 }
792 cleanupConnections(false);
793 }
794
795 LOG.info(getName() + ": stopping");
796
797 synchronized (this) {
798 try {
799 acceptChannel.close();
800 selector.close();
801 } catch (IOException ignored) {
802 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
803 }
804
805 selector= null;
806 acceptChannel= null;
807
808
809 while (!connectionList.isEmpty()) {
810 closeConnection(connectionList.remove(0));
811 }
812 }
813 }
814
815 private void closeCurrentConnection(SelectionKey key, Throwable e) {
816 if (key != null) {
817 Connection c = (Connection)key.attachment();
818 if (c != null) {
819 if (LOG.isDebugEnabled()) {
820 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
821 (e != null ? " on error " + e.getMessage() : ""));
822 }
823 closeConnection(c);
824 key.attach(null);
825 }
826 }
827 }
828
829 InetSocketAddress getAddress() {
830 return address;
831 }
832
833 void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
834 Connection c;
835 ServerSocketChannel server = (ServerSocketChannel) key.channel();
836
837 SocketChannel channel;
838 while ((channel = server.accept()) != null) {
839 try {
840 channel.configureBlocking(false);
841 channel.socket().setTcpNoDelay(tcpNoDelay);
842 channel.socket().setKeepAlive(tcpKeepAlive);
843 } catch (IOException ioe) {
844 channel.close();
845 throw ioe;
846 }
847
848 Reader reader = getReader();
849 try {
850 reader.startAdd();
851 SelectionKey readKey = reader.registerChannel(channel);
852 c = getConnection(channel, System.currentTimeMillis());
853 readKey.attach(c);
854 synchronized (connectionList) {
855 connectionList.add(numConnections, c);
856 numConnections++;
857 }
858 if (LOG.isDebugEnabled())
859 LOG.debug(getName() + ": connection from " + c.toString() +
860 "; # active connections: " + numConnections);
861 } finally {
862 reader.finishAdd();
863 }
864 }
865 }
866
867 void doRead(SelectionKey key) throws InterruptedException {
868 int count;
869 Connection c = (Connection) key.attachment();
870 if (c == null) {
871 return;
872 }
873 c.setLastContact(System.currentTimeMillis());
874 try {
875 count = c.readAndProcess();
876
877 if (count > 0) {
878 c.setLastContact(System.currentTimeMillis());
879 }
880
881 } catch (InterruptedException ieo) {
882 throw ieo;
883 } catch (Exception e) {
884 if (LOG.isDebugEnabled()) {
885 LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage());
886 }
887 count = -1;
888 }
889 if (count < 0) {
890 if (LOG.isDebugEnabled()) {
891 LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
892 " because read count=" + count +
893 ". Number of active connections: " + numConnections);
894 }
895 closeConnection(c);
896 }
897 }
898
899 synchronized void doStop() {
900 if (selector != null) {
901 selector.wakeup();
902 Thread.yield();
903 }
904 if (acceptChannel != null) {
905 try {
906 acceptChannel.socket().close();
907 } catch (IOException e) {
908 LOG.info(getName() + ": exception in closing listener socket. " + e);
909 }
910 }
911 readPool.shutdownNow();
912 }
913
914
915
916 Reader getReader() {
917 currentReader = (currentReader + 1) % readers.length;
918 return readers[currentReader];
919 }
920 }
921
922
923 protected class Responder extends Thread {
924 private final Selector writeSelector;
925 private final Set<Connection> writingCons =
926 Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
927
928 Responder() throws IOException {
929 this.setName("RpcServer.responder");
930 this.setDaemon(true);
931 writeSelector = Selector.open();
932 }
933
934 @Override
935 public void run() {
936 LOG.info(getName() + ": starting");
937 try {
938 doRunLoop();
939 } finally {
940 LOG.info(getName() + ": stopping");
941 try {
942 writeSelector.close();
943 } catch (IOException ioe) {
944 LOG.error(getName() + ": couldn't close write selector", ioe);
945 }
946 }
947 }
948
949
950
951
952
953 private void registerWrites() {
954 Iterator<Connection> it = writingCons.iterator();
955 while (it.hasNext()) {
956 Connection c = it.next();
957 it.remove();
958 SelectionKey sk = c.channel.keyFor(writeSelector);
959 try {
960 if (sk == null) {
961 try {
962 c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
963 } catch (ClosedChannelException e) {
964
965 if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
966 }
967 } else {
968 sk.interestOps(SelectionKey.OP_WRITE);
969 }
970 } catch (CancelledKeyException e) {
971
972 if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
973 }
974 }
975 }
976
977
978
979
980 public void registerForWrite(Connection c) {
981 if (writingCons.add(c)) {
982 writeSelector.wakeup();
983 }
984 }
985
986 private void doRunLoop() {
987 long lastPurgeTime = 0;
988 while (running) {
989 try {
990 registerWrites();
991 int keyCt = writeSelector.select(purgeTimeout);
992 if (keyCt == 0) {
993 continue;
994 }
995
996 Set<SelectionKey> keys = writeSelector.selectedKeys();
997 Iterator<SelectionKey> iter = keys.iterator();
998 while (iter.hasNext()) {
999 SelectionKey key = iter.next();
1000 iter.remove();
1001 try {
1002 if (key.isValid() && key.isWritable()) {
1003 doAsyncWrite(key);
1004 }
1005 } catch (IOException e) {
1006 LOG.debug(getName() + ": asyncWrite", e);
1007 }
1008 }
1009
1010 lastPurgeTime = purge(lastPurgeTime);
1011
1012 } catch (OutOfMemoryError e) {
1013 if (errorHandler != null) {
1014 if (errorHandler.checkOOME(e)) {
1015 LOG.info(getName() + ": exiting on OutOfMemoryError");
1016 return;
1017 }
1018 } else {
1019
1020
1021
1022
1023
1024 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
1025 try {
1026 Thread.sleep(60000);
1027 } catch (InterruptedException ex) {
1028 LOG.debug("Interrupted while sleeping");
1029 return;
1030 }
1031 }
1032 } catch (Exception e) {
1033 LOG.warn(getName() + ": exception in Responder " +
1034 StringUtils.stringifyException(e), e);
1035 }
1036 }
1037 LOG.info(getName() + ": stopped");
1038 }
1039
1040
1041
1042
1043
1044
1045 private long purge(long lastPurgeTime) {
1046 long now = System.currentTimeMillis();
1047 if (now < lastPurgeTime + purgeTimeout) {
1048 return lastPurgeTime;
1049 }
1050
1051 ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1052
1053 synchronized (writeSelector.keys()) {
1054 for (SelectionKey key : writeSelector.keys()) {
1055 Connection connection = (Connection) key.attachment();
1056 if (connection == null) {
1057 throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1058 }
1059 Call call = connection.responseQueue.peekFirst();
1060 if (call != null && now > call.timestamp + purgeTimeout) {
1061 conWithOldCalls.add(call.connection);
1062 }
1063 }
1064 }
1065
1066
1067 for (Connection connection : conWithOldCalls) {
1068 closeConnection(connection);
1069 }
1070
1071 return now;
1072 }
1073
1074 private void doAsyncWrite(SelectionKey key) throws IOException {
1075 Connection connection = (Connection) key.attachment();
1076 if (connection == null) {
1077 throw new IOException("doAsyncWrite: no connection");
1078 }
1079 if (key.channel() != connection.channel) {
1080 throw new IOException("doAsyncWrite: bad channel");
1081 }
1082
1083 if (processAllResponses(connection)) {
1084 try {
1085
1086
1087 key.interestOps(0);
1088 } catch (CancelledKeyException e) {
1089
1090
1091
1092
1093
1094 LOG.warn("Exception while changing ops : " + e);
1095 }
1096 }
1097 }
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107 private boolean processResponse(final Call call) throws IOException {
1108 boolean error = true;
1109 try {
1110
1111 long numBytes = channelWrite(call.connection.channel, call.response);
1112 if (numBytes < 0) {
1113 throw new HBaseIOException("Error writing on the socket " +
1114 "for the call:" + call.toShortString());
1115 }
1116 error = false;
1117 } finally {
1118 if (error) {
1119 LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1120 closeConnection(call.connection);
1121 }
1122 }
1123
1124 if (!call.response.hasRemaining()) {
1125 call.done();
1126 return true;
1127 } else {
1128 return false;
1129 }
1130 }
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140 private boolean processAllResponses(final Connection connection) throws IOException {
1141
1142 connection.responseWriteLock.lock();
1143 try {
1144 for (int i = 0; i < 20; i++) {
1145
1146 Call call = connection.responseQueue.pollFirst();
1147 if (call == null) {
1148 return true;
1149 }
1150 if (!processResponse(call)) {
1151 connection.responseQueue.addFirst(call);
1152 return false;
1153 }
1154 }
1155 } finally {
1156 connection.responseWriteLock.unlock();
1157 }
1158
1159 return connection.responseQueue.isEmpty();
1160 }
1161
1162
1163
1164
1165 void doRespond(Call call) throws IOException {
1166 boolean added = false;
1167
1168
1169
1170 if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
1171 try {
1172 if (call.connection.responseQueue.isEmpty()) {
1173
1174
1175 if (processResponse(call)) {
1176 return;
1177 }
1178
1179 call.connection.responseQueue.addFirst(call);
1180 added = true;
1181 }
1182 } finally {
1183 call.connection.responseWriteLock.unlock();
1184 }
1185 }
1186
1187 if (!added) {
1188 call.connection.responseQueue.addLast(call);
1189 }
1190 call.responder.registerForWrite(call.connection);
1191
1192
1193 call.timestamp = System.currentTimeMillis();
1194 }
1195 }
1196
1197
1198 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1199 value="VO_VOLATILE_INCREMENT",
1200 justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1201 public class Connection {
1202
1203 private boolean connectionPreambleRead = false;
1204
1205 private boolean connectionHeaderRead = false;
1206 protected SocketChannel channel;
1207 private ByteBuffer data;
1208 private ByteBuffer dataLengthBuffer;
1209 private ByteBuffer preambleBuffer;
1210 protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
1211 private final Lock responseWriteLock = new ReentrantLock();
1212 private Counter rpcCount = new Counter();
1213 private long lastContact;
1214 private InetAddress addr;
1215 protected Socket socket;
1216
1217
1218 protected String hostAddress;
1219 protected int remotePort;
1220 ConnectionHeader connectionHeader;
1221
1222
1223
1224 private Codec codec;
1225
1226
1227
1228 private CompressionCodec compressionCodec;
1229 BlockingService service;
1230
1231 private AuthMethod authMethod;
1232 private boolean saslContextEstablished;
1233 private boolean skipInitialSaslHandshake;
1234 private ByteBuffer unwrappedData;
1235
1236 private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1237 boolean useSasl;
1238 SaslServer saslServer;
1239 private boolean useWrap = false;
1240
1241 private static final int AUTHORIZATION_FAILED_CALLID = -1;
1242 private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
1243 null, null, this, null, 0, null, null);
1244 private ByteArrayOutputStream authFailedResponse =
1245 new ByteArrayOutputStream();
1246
1247 private static final int SASL_CALLID = -33;
1248 private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
1249 0, null, null);
1250
1251
1252 private boolean authenticatedWithFallback;
1253
1254 private boolean retryImmediatelySupported = false;
1255
1256 public UserGroupInformation attemptingUser = null;
1257 protected User user = null;
1258 protected UserGroupInformation ugi = null;
1259
1260 public Connection(SocketChannel channel, long lastContact) {
1261 this.channel = channel;
1262 this.lastContact = lastContact;
1263 this.data = null;
1264 this.dataLengthBuffer = ByteBuffer.allocate(4);
1265 this.socket = channel.socket();
1266 this.addr = socket.getInetAddress();
1267 if (addr == null) {
1268 this.hostAddress = "*Unknown*";
1269 } else {
1270 this.hostAddress = addr.getHostAddress();
1271 }
1272 this.remotePort = socket.getPort();
1273 if (socketSendBufferSize != 0) {
1274 try {
1275 socket.setSendBufferSize(socketSendBufferSize);
1276 } catch (IOException e) {
1277 LOG.warn("Connection: unable to set socket send buffer size to " +
1278 socketSendBufferSize);
1279 }
1280 }
1281 }
1282
1283 @Override
1284 public String toString() {
1285 return getHostAddress() + ":" + remotePort;
1286 }
1287
1288 public String getHostAddress() {
1289 return hostAddress;
1290 }
1291
1292 public InetAddress getHostInetAddress() {
1293 return addr;
1294 }
1295
1296 public int getRemotePort() {
1297 return remotePort;
1298 }
1299
1300 public void setLastContact(long lastContact) {
1301 this.lastContact = lastContact;
1302 }
1303
1304 public VersionInfo getVersionInfo() {
1305 if (connectionHeader.hasVersionInfo()) {
1306 return connectionHeader.getVersionInfo();
1307 }
1308 return null;
1309 }
1310
1311
1312 private boolean isIdle() {
1313 return rpcCount.get() == 0;
1314 }
1315
1316
1317 protected void decRpcCount() {
1318 rpcCount.decrement();
1319 }
1320
1321
1322 protected void incRpcCount() {
1323 rpcCount.increment();
1324 }
1325
1326 protected boolean timedOut(long currentTime) {
1327 return isIdle() && currentTime - lastContact > maxIdleTime;
1328 }
1329
1330 private UserGroupInformation getAuthorizedUgi(String authorizedId)
1331 throws IOException {
1332 UserGroupInformation authorizedUgi;
1333 if (authMethod == AuthMethod.DIGEST) {
1334 TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1335 secretManager);
1336 authorizedUgi = tokenId.getUser();
1337 if (authorizedUgi == null) {
1338 throw new AccessDeniedException(
1339 "Can't retrieve username from tokenIdentifier.");
1340 }
1341 authorizedUgi.addTokenIdentifier(tokenId);
1342 } else {
1343 authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
1344 }
1345 authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
1346 return authorizedUgi;
1347 }
1348
1349 private void saslReadAndProcess(byte[] saslToken) throws IOException,
1350 InterruptedException {
1351 if (saslContextEstablished) {
1352 if (LOG.isTraceEnabled())
1353 LOG.trace("Have read input token of size " + saslToken.length
1354 + " for processing by saslServer.unwrap()");
1355
1356 if (!useWrap) {
1357 processOneRpc(saslToken);
1358 } else {
1359 byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
1360 processUnwrappedData(plaintextData);
1361 }
1362 } else {
1363 byte[] replyToken;
1364 try {
1365 if (saslServer == null) {
1366 switch (authMethod) {
1367 case DIGEST:
1368 if (secretManager == null) {
1369 throw new AccessDeniedException(
1370 "Server is not configured to do DIGEST authentication.");
1371 }
1372 saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
1373 .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
1374 SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
1375 secretManager, this));
1376 break;
1377 default:
1378 UserGroupInformation current = UserGroupInformation.getCurrentUser();
1379 String fullName = current.getUserName();
1380 if (LOG.isDebugEnabled()) {
1381 LOG.debug("Kerberos principal name is " + fullName);
1382 }
1383 final String names[] = SaslUtil.splitKerberosName(fullName);
1384 if (names.length != 3) {
1385 throw new AccessDeniedException(
1386 "Kerberos principal name does NOT have the expected "
1387 + "hostname part: " + fullName);
1388 }
1389 current.doAs(new PrivilegedExceptionAction<Object>() {
1390 @Override
1391 public Object run() throws SaslException {
1392 saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
1393 .getMechanismName(), names[0], names[1],
1394 SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
1395 return null;
1396 }
1397 });
1398 }
1399 if (saslServer == null)
1400 throw new AccessDeniedException(
1401 "Unable to find SASL server implementation for "
1402 + authMethod.getMechanismName());
1403 if (LOG.isDebugEnabled()) {
1404 LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
1405 }
1406 }
1407 if (LOG.isDebugEnabled()) {
1408 LOG.debug("Have read input token of size " + saslToken.length
1409 + " for processing by saslServer.evaluateResponse()");
1410 }
1411 replyToken = saslServer.evaluateResponse(saslToken);
1412 } catch (IOException e) {
1413 IOException sendToClient = e;
1414 Throwable cause = e;
1415 while (cause != null) {
1416 if (cause instanceof InvalidToken) {
1417 sendToClient = (InvalidToken) cause;
1418 break;
1419 }
1420 cause = cause.getCause();
1421 }
1422 doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
1423 sendToClient.getLocalizedMessage());
1424 metrics.authenticationFailure();
1425 String clientIP = this.toString();
1426
1427 AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
1428 throw e;
1429 }
1430 if (replyToken != null) {
1431 if (LOG.isDebugEnabled()) {
1432 LOG.debug("Will send token of size " + replyToken.length
1433 + " from saslServer.");
1434 }
1435 doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
1436 null);
1437 }
1438 if (saslServer.isComplete()) {
1439 String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
1440 useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
1441 ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
1442 if (LOG.isDebugEnabled()) {
1443 LOG.debug("SASL server context established. Authenticated client: "
1444 + ugi + ". Negotiated QoP is "
1445 + saslServer.getNegotiatedProperty(Sasl.QOP));
1446 }
1447 metrics.authenticationSuccess();
1448 AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1449 saslContextEstablished = true;
1450 }
1451 }
1452 }
1453
1454
1455
1456
1457 private void doRawSaslReply(SaslStatus status, Writable rv,
1458 String errorClass, String error) throws IOException {
1459 ByteBufferOutputStream saslResponse = null;
1460 DataOutputStream out = null;
1461 try {
1462
1463
1464 saslResponse = new ByteBufferOutputStream(256);
1465 out = new DataOutputStream(saslResponse);
1466 out.writeInt(status.state);
1467 if (status == SaslStatus.SUCCESS) {
1468 rv.write(out);
1469 } else {
1470 WritableUtils.writeString(out, errorClass);
1471 WritableUtils.writeString(out, error);
1472 }
1473 saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1474 saslCall.responder = responder;
1475 saslCall.sendResponseIfReady();
1476 } finally {
1477 if (saslResponse != null) {
1478 saslResponse.close();
1479 }
1480 if (out != null) {
1481 out.close();
1482 }
1483 }
1484 }
1485
1486 private void disposeSasl() {
1487 if (saslServer != null) {
1488 try {
1489 saslServer.dispose();
1490 saslServer = null;
1491 } catch (SaslException ignored) {
1492
1493 }
1494 }
1495 }
1496
1497 private int readPreamble() throws IOException {
1498 if (preambleBuffer == null) {
1499 preambleBuffer = ByteBuffer.allocate(6);
1500 }
1501 int count = channelRead(channel, preambleBuffer);
1502 if (count < 0 || preambleBuffer.remaining() > 0) {
1503 return count;
1504 }
1505
1506 preambleBuffer.flip();
1507 for (int i = 0; i < HConstants.RPC_HEADER.length; i++) {
1508 if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) {
1509 return doBadPreambleHandling("Expected HEADER=" +
1510 Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" +
1511 Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) +
1512 " from " + toString());
1513 }
1514 }
1515 int version = preambleBuffer.get(HConstants.RPC_HEADER.length);
1516 byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1);
1517 this.authMethod = AuthMethod.valueOf(authbyte);
1518 if (version != CURRENT_VERSION) {
1519 String msg = getFatalConnectionString(version, authbyte);
1520 return doBadPreambleHandling(msg, new WrongVersionException(msg));
1521 }
1522 if (authMethod == null) {
1523 String msg = getFatalConnectionString(version, authbyte);
1524 return doBadPreambleHandling(msg, new BadAuthException(msg));
1525 }
1526 if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
1527 if (allowFallbackToSimpleAuth) {
1528 metrics.authenticationFallback();
1529 authenticatedWithFallback = true;
1530 } else {
1531 AccessDeniedException ae = new AccessDeniedException("Authentication is required");
1532 setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1533 responder.doRespond(authFailedCall);
1534 throw ae;
1535 }
1536 }
1537 if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
1538 doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
1539 SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
1540 authMethod = AuthMethod.SIMPLE;
1541
1542
1543
1544 skipInitialSaslHandshake = true;
1545 }
1546 if (authMethod != AuthMethod.SIMPLE) {
1547 useSasl = true;
1548 }
1549
1550 preambleBuffer = null;
1551 connectionPreambleRead = true;
1552 return count;
1553 }
1554
1555 private int read4Bytes() throws IOException {
1556 if (this.dataLengthBuffer.remaining() > 0) {
1557 return channelRead(channel, this.dataLengthBuffer);
1558 } else {
1559 return 0;
1560 }
1561 }
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571 public int readAndProcess() throws IOException, InterruptedException {
1572
1573 if (!connectionPreambleRead) {
1574 int count = readPreamble();
1575 if (!connectionPreambleRead) {
1576 return count;
1577 }
1578 }
1579
1580
1581 int count = read4Bytes();
1582 if (count < 0 || dataLengthBuffer.remaining() > 0) {
1583 return count;
1584 }
1585
1586
1587 if (!connectionPreambleRead) {
1588 count = readPreamble();
1589 if (!connectionPreambleRead) {
1590 return count;
1591 }
1592
1593 count = read4Bytes();
1594 if (count < 0 || dataLengthBuffer.remaining() > 0) {
1595 return count;
1596 }
1597 }
1598
1599 final boolean useWrap = this.useWrap;
1600
1601
1602
1603 synchronized(this) {
1604
1605
1606 if (data == null) {
1607 dataLengthBuffer.flip();
1608 int dataLength = dataLengthBuffer.getInt();
1609 if (dataLength == RpcClient.PING_CALL_ID) {
1610 if (!useWrap) {
1611 dataLengthBuffer.clear();
1612 return 0;
1613 }
1614 }
1615
1616 if (dataLength < 0) {
1617 throw new IllegalArgumentException("Unexpected data length "
1618 + dataLength + "!! from " + getHostAddress());
1619 }
1620 data = ByteBuffer.allocate(dataLength);
1621
1622
1623
1624
1625 incRpcCount();
1626 }
1627
1628 count = channelRead(channel, data);
1629
1630 if (count >= 0 && data.remaining() == 0) {
1631 process();
1632 }
1633 }
1634
1635 return count;
1636 }
1637
1638
1639
1640
1641 private void process() throws IOException, InterruptedException {
1642 data.flip();
1643 try {
1644 if (skipInitialSaslHandshake) {
1645 skipInitialSaslHandshake = false;
1646 return;
1647 }
1648
1649 if (useSasl) {
1650 saslReadAndProcess(data.array());
1651 } else {
1652 processOneRpc(data.array());
1653 }
1654
1655 } finally {
1656 dataLengthBuffer.clear();
1657 data = null;
1658 }
1659 }
1660
1661 private String getFatalConnectionString(final int version, final byte authByte) {
1662 return "serverVersion=" + CURRENT_VERSION +
1663 ", clientVersion=" + version + ", authMethod=" + authByte +
1664 ", authSupported=" + (authMethod != null) + " from " + toString();
1665 }
1666
1667 private int doBadPreambleHandling(final String msg) throws IOException {
1668 return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1669 }
1670
1671 private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1672 LOG.warn(msg);
1673 Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
1674 setupResponse(null, fakeCall, e, msg);
1675 responder.doRespond(fakeCall);
1676
1677 return -1;
1678 }
1679
1680
1681 private void processConnectionHeader(byte[] buf) throws IOException {
1682 this.connectionHeader = ConnectionHeader.parseFrom(buf);
1683 String serviceName = connectionHeader.getServiceName();
1684 if (serviceName == null) throw new EmptyServiceNameException();
1685 this.service = getService(services, serviceName);
1686 if (this.service == null) throw new UnknownServiceException(serviceName);
1687 setupCellBlockCodecs(this.connectionHeader);
1688 UserGroupInformation protocolUser = createUser(connectionHeader);
1689 if (!useSasl) {
1690 ugi = protocolUser;
1691 if (ugi != null) {
1692 ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1693 }
1694
1695 if (authenticatedWithFallback) {
1696 LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
1697 + " connecting from " + getHostAddress());
1698 }
1699 AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1700 } else {
1701
1702 ugi.setAuthenticationMethod(authMethod.authenticationMethod);
1703
1704
1705
1706 if ((protocolUser != null)
1707 && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
1708 if (authMethod == AuthMethod.DIGEST) {
1709
1710 throw new AccessDeniedException("Authenticated user (" + ugi
1711 + ") doesn't match what the client claims to be ("
1712 + protocolUser + ")");
1713 } else {
1714
1715
1716
1717 UserGroupInformation realUser = ugi;
1718 ugi = UserGroupInformation.createProxyUser(protocolUser
1719 .getUserName(), realUser);
1720
1721 ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
1722 }
1723 }
1724 }
1725 if (connectionHeader.hasVersionInfo()) {
1726
1727 retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
1728
1729 AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1730 + " with version info: "
1731 + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
1732 } else {
1733 AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1734 + " with unknown version info");
1735 }
1736
1737
1738 }
1739
1740
1741
1742
1743
1744 private void setupCellBlockCodecs(final ConnectionHeader header)
1745 throws FatalConnectionException {
1746
1747 if (!header.hasCellBlockCodecClass()) return;
1748 String className = header.getCellBlockCodecClass();
1749 if (className == null || className.length() == 0) return;
1750 try {
1751 this.codec = (Codec)Class.forName(className).newInstance();
1752 } catch (Exception e) {
1753 throw new UnsupportedCellCodecException(className, e);
1754 }
1755 if (!header.hasCellBlockCompressorClass()) return;
1756 className = header.getCellBlockCompressorClass();
1757 try {
1758 this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1759 } catch (Exception e) {
1760 throw new UnsupportedCompressionCodecException(className, e);
1761 }
1762 }
1763
1764 private void processUnwrappedData(byte[] inBuf) throws IOException,
1765 InterruptedException {
1766 ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1767
1768 while (true) {
1769 int count;
1770 if (unwrappedDataLengthBuffer.remaining() > 0) {
1771 count = channelRead(ch, unwrappedDataLengthBuffer);
1772 if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1773 return;
1774 }
1775
1776 if (unwrappedData == null) {
1777 unwrappedDataLengthBuffer.flip();
1778 int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1779
1780 if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1781 if (LOG.isDebugEnabled())
1782 LOG.debug("Received ping message");
1783 unwrappedDataLengthBuffer.clear();
1784 continue;
1785 }
1786 unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1787 }
1788
1789 count = channelRead(ch, unwrappedData);
1790 if (count <= 0 || unwrappedData.remaining() > 0)
1791 return;
1792
1793 if (unwrappedData.remaining() == 0) {
1794 unwrappedDataLengthBuffer.clear();
1795 unwrappedData.flip();
1796 processOneRpc(unwrappedData.array());
1797 unwrappedData = null;
1798 }
1799 }
1800 }
1801
1802 private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1803 if (connectionHeaderRead) {
1804 processRequest(buf);
1805 } else {
1806 processConnectionHeader(buf);
1807 this.connectionHeaderRead = true;
1808 if (!authorizeConnection()) {
1809
1810
1811 throw new AccessDeniedException("Connection from " + this + " for service " +
1812 connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
1813 }
1814 this.user = userProvider.create(this.ugi);
1815 }
1816 }
1817
1818
1819
1820
1821
1822
1823
1824 protected void processRequest(byte[] buf) throws IOException, InterruptedException {
1825 long totalRequestSize = buf.length;
1826 int offset = 0;
1827
1828
1829 CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
1830 int headerSize = cis.readRawVarint32();
1831 offset = cis.getTotalBytesRead();
1832 Message.Builder builder = RequestHeader.newBuilder();
1833 ProtobufUtil.mergeFrom(builder, buf, offset, headerSize);
1834 RequestHeader header = (RequestHeader) builder.build();
1835 offset += headerSize;
1836 int id = header.getCallId();
1837 if (LOG.isTraceEnabled()) {
1838 LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1839 " totalRequestSize: " + totalRequestSize + " bytes");
1840 }
1841
1842
1843 if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1844 final Call callTooBig =
1845 new Call(id, this.service, null, null, null, null, this,
1846 responder, totalRequestSize, null, null);
1847 ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1848 metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1849 setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
1850 "Call queue is full on " + server.getServerName() +
1851 ", is hbase.ipc.server.max.callqueue.size too small?");
1852 responder.doRespond(callTooBig);
1853 return;
1854 }
1855 MethodDescriptor md = null;
1856 Message param = null;
1857 CellScanner cellScanner = null;
1858 try {
1859 if (header.hasRequestParam() && header.getRequestParam()) {
1860 md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1861 if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1862 builder = this.service.getRequestPrototype(md).newBuilderForType();
1863
1864 cis = CodedInputStream.newInstance(buf, offset, buf.length);
1865 int paramSize = cis.readRawVarint32();
1866 offset += cis.getTotalBytesRead();
1867 if (builder != null) {
1868 ProtobufUtil.mergeFrom(builder, buf, offset, paramSize);
1869 param = builder.build();
1870 }
1871 offset += paramSize;
1872 }
1873 if (header.hasCellBlockMeta()) {
1874 cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
1875 buf, offset, buf.length);
1876 }
1877 } catch (Throwable t) {
1878 InetSocketAddress address = getListenerAddress();
1879 String msg = (address != null ? address : "(channel closed)") +
1880 " is unable to read call parameter from client " + getHostAddress();
1881 LOG.warn(msg, t);
1882
1883 metrics.exception(t);
1884
1885
1886 if (t instanceof LinkageError) {
1887 t = new DoNotRetryIOException(t);
1888 }
1889
1890 if (t instanceof UnsupportedOperationException) {
1891 t = new DoNotRetryIOException(t);
1892 }
1893
1894 final Call readParamsFailedCall =
1895 new Call(id, this.service, null, null, null, null, this,
1896 responder, totalRequestSize, null, null);
1897 ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1898 setupResponse(responseBuffer, readParamsFailedCall, t,
1899 msg + "; " + t.getMessage());
1900 responder.doRespond(readParamsFailedCall);
1901 return;
1902 }
1903
1904 TraceInfo traceInfo = header.hasTraceInfo()
1905 ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1906 : null;
1907 Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1908 totalRequestSize, traceInfo, this.addr);
1909
1910 if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
1911 callQueueSize.add(-1 * call.getSize());
1912
1913 ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1914 metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1915 setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
1916 "Call queue is full on " + server.getServerName() +
1917 ", too many items queued ?");
1918 responder.doRespond(call);
1919 }
1920 }
1921
1922 private boolean authorizeConnection() throws IOException {
1923 try {
1924
1925
1926
1927
1928 if (ugi != null && ugi.getRealUser() != null
1929 && (authMethod != AuthMethod.DIGEST)) {
1930 ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
1931 }
1932 authorize(ugi, connectionHeader, getHostInetAddress());
1933 metrics.authorizationSuccess();
1934 } catch (AuthorizationException ae) {
1935 if (LOG.isDebugEnabled()) {
1936 LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1937 }
1938 metrics.authorizationFailure();
1939 setupResponse(authFailedResponse, authFailedCall,
1940 new AccessDeniedException(ae), ae.getMessage());
1941 responder.doRespond(authFailedCall);
1942 return false;
1943 }
1944 return true;
1945 }
1946
1947 protected synchronized void close() {
1948 disposeSasl();
1949 data = null;
1950 if (!channel.isOpen())
1951 return;
1952 try {socket.shutdownOutput();} catch(Exception ignored) {
1953 if (LOG.isTraceEnabled()) {
1954 LOG.trace(ignored);
1955 }
1956 }
1957 if (channel.isOpen()) {
1958 try {channel.close();} catch(Exception ignored) {
1959 if (LOG.isTraceEnabled()) {
1960 LOG.trace(ignored);
1961 }
1962 }
1963 }
1964 try {socket.close();} catch(Exception ignored) {
1965 if (LOG.isTraceEnabled()) {
1966 LOG.trace(ignored);
1967 }
1968 }
1969 }
1970
1971 private UserGroupInformation createUser(ConnectionHeader head) {
1972 UserGroupInformation ugi = null;
1973
1974 if (!head.hasUserInfo()) {
1975 return null;
1976 }
1977 UserInformation userInfoProto = head.getUserInfo();
1978 String effectiveUser = null;
1979 if (userInfoProto.hasEffectiveUser()) {
1980 effectiveUser = userInfoProto.getEffectiveUser();
1981 }
1982 String realUser = null;
1983 if (userInfoProto.hasRealUser()) {
1984 realUser = userInfoProto.getRealUser();
1985 }
1986 if (effectiveUser != null) {
1987 if (realUser != null) {
1988 UserGroupInformation realUserUgi =
1989 UserGroupInformation.createRemoteUser(realUser);
1990 ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1991 } else {
1992 ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1993 }
1994 }
1995 return ugi;
1996 }
1997 }
1998
1999
2000
2001
2002
2003
2004
2005 public static class BlockingServiceAndInterface {
2006 private final BlockingService service;
2007 private final Class<?> serviceInterface;
2008 public BlockingServiceAndInterface(final BlockingService service,
2009 final Class<?> serviceInterface) {
2010 this.service = service;
2011 this.serviceInterface = serviceInterface;
2012 }
2013 public Class<?> getServiceInterface() {
2014 return this.serviceInterface;
2015 }
2016 public BlockingService getBlockingService() {
2017 return this.service;
2018 }
2019 }
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031 public RpcServer(final Server server, final String name,
2032 final List<BlockingServiceAndInterface> services,
2033 final InetSocketAddress bindAddress, Configuration conf,
2034 RpcScheduler scheduler)
2035 throws IOException {
2036
2037 if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
2038 this.reservoir = new BoundedByteBufferPool(
2039 conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
2040 conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
2041
2042 conf.getInt("hbase.ipc.server.reservoir.initial.max",
2043 conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
2044 HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
2045 } else {
2046 reservoir = null;
2047 }
2048
2049 this.server = server;
2050 this.services = services;
2051 this.bindAddress = bindAddress;
2052 this.conf = conf;
2053 this.socketSendBufferSize = 0;
2054 this.maxQueueSize =
2055 this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
2056 this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
2057 this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
2058 this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
2059 this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
2060 this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
2061 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
2062 this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
2063 this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
2064
2065
2066 listener = new Listener(name);
2067 this.port = listener.getAddress().getPort();
2068
2069 this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
2070 this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
2071 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
2072
2073 this.ipcUtil = new IPCUtil(conf);
2074
2075
2076 responder = new Responder();
2077 this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
2078 this.userProvider = UserProvider.instantiate(conf);
2079 this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
2080 if (isSecurityEnabled) {
2081 HBaseSaslRpcServer.init(conf);
2082 }
2083 initReconfigurable(conf);
2084
2085 this.scheduler = scheduler;
2086 this.scheduler.init(new RpcSchedulerContext(this));
2087 }
2088
2089 @Override
2090 public void onConfigurationChange(Configuration newConf) {
2091 initReconfigurable(newConf);
2092 }
2093
2094 private void initReconfigurable(Configuration confToLoad) {
2095 this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
2096 if (isSecurityEnabled && allowFallbackToSimpleAuth) {
2097 LOG.warn("********* WARNING! *********");
2098 LOG.warn("This server is configured to allow connections from INSECURE clients");
2099 LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
2100 LOG.warn("While this option is enabled, client identities cannot be secured, and user");
2101 LOG.warn("impersonation is possible!");
2102 LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
2103 LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
2104 LOG.warn("****************************");
2105 }
2106 }
2107
2108
2109
2110
2111
2112 protected Connection getConnection(SocketChannel channel, long time) {
2113 return new Connection(channel, time);
2114 }
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124 private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
2125 throws IOException {
2126 if (response != null) response.reset();
2127 call.setResponse(null, null, t, error);
2128 }
2129
2130 protected void closeConnection(Connection connection) {
2131 synchronized (connectionList) {
2132 if (connectionList.remove(connection)) {
2133 numConnections--;
2134 }
2135 }
2136 connection.close();
2137 }
2138
2139 Configuration getConf() {
2140 return conf;
2141 }
2142
2143
2144
2145
2146 @Override
2147 public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2148
2149 @Override
2150 public boolean isStarted() {
2151 return this.started;
2152 }
2153
2154
2155 @Override
2156 public synchronized void start() {
2157 if (started) return;
2158 authTokenSecretMgr = createSecretManager();
2159 if (authTokenSecretMgr != null) {
2160 setSecretManager(authTokenSecretMgr);
2161 authTokenSecretMgr.start();
2162 }
2163 this.authManager = new ServiceAuthorizationManager();
2164 HBasePolicyProvider.init(conf, authManager);
2165 responder.start();
2166 listener.start();
2167 scheduler.start();
2168 started = true;
2169 }
2170
2171 @Override
2172 public synchronized void refreshAuthManager(PolicyProvider pp) {
2173
2174
2175 this.authManager.refresh(this.conf, pp);
2176 }
2177
2178 private AuthenticationTokenSecretManager createSecretManager() {
2179 if (!isSecurityEnabled) return null;
2180 if (server == null) return null;
2181 Configuration conf = server.getConfiguration();
2182 long keyUpdateInterval =
2183 conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2184 long maxAge =
2185 conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2186 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2187 server.getServerName().toString(), keyUpdateInterval, maxAge);
2188 }
2189
2190 public SecretManager<? extends TokenIdentifier> getSecretManager() {
2191 return this.secretManager;
2192 }
2193
2194 @SuppressWarnings("unchecked")
2195 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2196 this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2197 }
2198
2199
2200
2201
2202
2203
2204 @Override
2205 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2206 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2207 throws IOException {
2208 try {
2209 status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2210
2211 status.setRPCPacket(param);
2212 status.resume("Servicing call");
2213
2214 long startTime = System.currentTimeMillis();
2215 PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2216 Message result = service.callBlockingMethod(md, controller, param);
2217 long endTime = System.currentTimeMillis();
2218 int processingTime = (int) (endTime - startTime);
2219 int qTime = (int) (startTime - receiveTime);
2220 int totalTime = (int) (endTime - receiveTime);
2221 if (LOG.isTraceEnabled()) {
2222 LOG.trace(CurCall.get().toString() +
2223 ", response " + TextFormat.shortDebugString(result) +
2224 " queueTime: " + qTime +
2225 " processingTime: " + processingTime +
2226 " totalTime: " + totalTime);
2227 }
2228 long requestSize = param.getSerializedSize();
2229 long responseSize = result.getSerializedSize();
2230 metrics.dequeuedCall(qTime);
2231 metrics.processedCall(processingTime);
2232 metrics.totalCall(totalTime);
2233 metrics.receivedRequest(requestSize);
2234 metrics.sentResponse(responseSize);
2235
2236
2237 boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2238 boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2239 if (tooSlow || tooLarge) {
2240
2241
2242 logResponse(param,
2243 md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
2244 (tooLarge ? "TooLarge" : "TooSlow"),
2245 status.getClient(), startTime, processingTime, qTime,
2246 responseSize);
2247 }
2248 return new Pair<Message, CellScanner>(result, controller.cellScanner());
2249 } catch (Throwable e) {
2250
2251
2252
2253 if (e instanceof ServiceException) {
2254 if (e.getCause() == null) {
2255 LOG.debug("Caught a ServiceException with null cause", e);
2256 } else {
2257 e = e.getCause();
2258 }
2259 }
2260
2261
2262 metrics.exception(e);
2263
2264 if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2265 if (e instanceof IOException) throw (IOException)e;
2266 LOG.error("Unexpected throwable object ", e);
2267 throw new IOException(e.getMessage(), e);
2268 }
2269 }
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285 void logResponse(Message param, String methodName, String call, String tag,
2286 String clientAddress, long startTime, int processingTime, int qTime,
2287 long responseSize)
2288 throws IOException {
2289
2290 Map<String, Object> responseInfo = new HashMap<String, Object>();
2291 responseInfo.put("starttimems", startTime);
2292 responseInfo.put("processingtimems", processingTime);
2293 responseInfo.put("queuetimems", qTime);
2294 responseInfo.put("responsesize", responseSize);
2295 responseInfo.put("client", clientAddress);
2296 responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2297 responseInfo.put("method", methodName);
2298 responseInfo.put("call", call);
2299
2300 String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
2301 if (stringifiedParam.length() > 150) {
2302
2303 stringifiedParam = stringifiedParam.subSequence(
2304 0, LOG.isTraceEnabled() ? 1000 : 150) + " <TRUNCATED>";
2305 }
2306 responseInfo.put("param", stringifiedParam);
2307 if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
2308 ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
2309 if (request.hasScannerId()) {
2310 long scannerId = request.getScannerId();
2311 String scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
2312 if (scanDetails != null) {
2313 responseInfo.put("scandetails", scanDetails);
2314 }
2315 }
2316 }
2317 LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2318 }
2319
2320
2321 @Override
2322 public synchronized void stop() {
2323 LOG.info("Stopping server on " + port);
2324 running = false;
2325 if (authTokenSecretMgr != null) {
2326 authTokenSecretMgr.stop();
2327 authTokenSecretMgr = null;
2328 }
2329 listener.interrupt();
2330 listener.doStop();
2331 responder.interrupt();
2332 scheduler.stop();
2333 notifyAll();
2334 }
2335
2336
2337
2338
2339
2340
2341 @Override
2342 public synchronized void join() throws InterruptedException {
2343 while (running) {
2344 wait();
2345 }
2346 }
2347
2348
2349
2350
2351
2352
2353
2354 @Override
2355 public synchronized InetSocketAddress getListenerAddress() {
2356 if (listener == null) {
2357 return null;
2358 }
2359 return listener.getAddress();
2360 }
2361
2362
2363
2364
2365
2366 @Override
2367 public void setErrorHandler(HBaseRPCErrorHandler handler) {
2368 this.errorHandler = handler;
2369 }
2370
2371 @Override
2372 public HBaseRPCErrorHandler getErrorHandler() {
2373 return this.errorHandler;
2374 }
2375
2376
2377
2378
2379 @Override
2380 public MetricsHBaseServer getMetrics() {
2381 return metrics;
2382 }
2383
2384 @Override
2385 public void addCallSize(final long diff) {
2386 this.callQueueSize.add(diff);
2387 }
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398 public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
2399 InetAddress addr)
2400 throws AuthorizationException {
2401 if (authorize) {
2402 Class<?> c = getServiceInterface(services, connection.getServiceName());
2403 this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2404 }
2405 }
2406
2407
2408
2409
2410
2411
2412 private static int NIO_BUFFER_LIMIT = 64 * 1024;
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428 protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2429 throws IOException {
2430 long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
2431 if (count > 0) this.metrics.sentBytes(count);
2432 return count;
2433 }
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447 protected int channelRead(ReadableByteChannel channel,
2448 ByteBuffer buffer) throws IOException {
2449
2450 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2451 channel.read(buffer) : channelIO(channel, null, buffer);
2452 if (count > 0) {
2453 metrics.receivedBytes(count);
2454 }
2455 return count;
2456 }
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471 private static int channelIO(ReadableByteChannel readCh,
2472 WritableByteChannel writeCh,
2473 ByteBuffer buf) throws IOException {
2474
2475 int originalLimit = buf.limit();
2476 int initialRemaining = buf.remaining();
2477 int ret = 0;
2478
2479 while (buf.remaining() > 0) {
2480 try {
2481 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2482 buf.limit(buf.position() + ioSize);
2483
2484 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2485
2486 if (ret < ioSize) {
2487 break;
2488 }
2489
2490 } finally {
2491 buf.limit(originalLimit);
2492 }
2493 }
2494
2495 int nBytes = initialRemaining - buf.remaining();
2496 return (nBytes > 0) ? nBytes : ret;
2497 }
2498
2499
2500
2501
2502
2503
2504
2505 public static RpcCallContext getCurrentCall() {
2506 return CurCall.get();
2507 }
2508
2509 public static boolean isInRpcCallContext() {
2510 return CurCall.get() != null;
2511 }
2512
2513
2514
2515
2516
2517
2518 public static User getRequestUser() {
2519 RpcCallContext ctx = getCurrentCall();
2520 return ctx == null? null: ctx.getRequestUser();
2521 }
2522
2523
2524
2525
2526
2527 public static String getRequestUserName() {
2528 User user = getRequestUser();
2529 return user == null? null: user.getShortName();
2530 }
2531
2532
2533
2534
2535 public static InetAddress getRemoteAddress() {
2536 RpcCallContext ctx = getCurrentCall();
2537 return ctx == null? null: ctx.getRemoteAddress();
2538 }
2539
2540
2541
2542
2543
2544
2545 static BlockingServiceAndInterface getServiceAndInterface(
2546 final List<BlockingServiceAndInterface> services, final String serviceName) {
2547 for (BlockingServiceAndInterface bs : services) {
2548 if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2549 return bs;
2550 }
2551 }
2552 return null;
2553 }
2554
2555
2556
2557
2558
2559
2560 static Class<?> getServiceInterface(
2561 final List<BlockingServiceAndInterface> services,
2562 final String serviceName) {
2563 BlockingServiceAndInterface bsasi =
2564 getServiceAndInterface(services, serviceName);
2565 return bsasi == null? null: bsasi.getServiceInterface();
2566 }
2567
2568
2569
2570
2571
2572
2573 static BlockingService getService(
2574 final List<BlockingServiceAndInterface> services,
2575 final String serviceName) {
2576 BlockingServiceAndInterface bsasi =
2577 getServiceAndInterface(services, serviceName);
2578 return bsasi == null? null: bsasi.getBlockingService();
2579 }
2580
2581 static MonitoredRPCHandler getStatus() {
2582
2583 MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
2584 if (status != null) {
2585 return status;
2586 }
2587 status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
2588 status.pause("Waiting for a call");
2589 RpcServer.MONITORED_RPC.set(status);
2590 return status;
2591 }
2592
2593
2594
2595
2596
2597 public static InetAddress getRemoteIp() {
2598 Call call = CurCall.get();
2599 if (call != null && call.connection != null && call.connection.socket != null) {
2600 return call.connection.socket.getInetAddress();
2601 }
2602 return null;
2603 }
2604
2605
2606
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616 public static void bind(ServerSocket socket, InetSocketAddress address,
2617 int backlog) throws IOException {
2618 try {
2619 socket.bind(address, backlog);
2620 } catch (BindException e) {
2621 BindException bindException =
2622 new BindException("Problem binding to " + address + " : " +
2623 e.getMessage());
2624 bindException.initCause(e);
2625 throw bindException;
2626 } catch (SocketException e) {
2627
2628
2629 if ("Unresolved address".equals(e.getMessage())) {
2630 throw new UnknownHostException("Invalid hostname for server: " +
2631 address.getHostName());
2632 }
2633 throw e;
2634 }
2635 }
2636
2637 @Override
2638 public RpcScheduler getScheduler() {
2639 return scheduler;
2640 }
2641
2642 @Override
2643 public void setRsRpcServices(RSRpcServices rsRpcServices) {
2644 this.rsRpcServices = rsRpcServices;
2645 }
2646 }