1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.DataOutputStream;
26 import java.io.IOException;
27 import java.net.BindException;
28 import java.net.InetAddress;
29 import java.net.InetSocketAddress;
30 import java.net.ServerSocket;
31 import java.net.Socket;
32 import java.net.SocketException;
33 import java.net.UnknownHostException;
34 import java.nio.ByteBuffer;
35 import java.nio.channels.CancelledKeyException;
36 import java.nio.channels.Channels;
37 import java.nio.channels.ClosedChannelException;
38 import java.nio.channels.GatheringByteChannel;
39 import java.nio.channels.ReadableByteChannel;
40 import java.nio.channels.SelectionKey;
41 import java.nio.channels.Selector;
42 import java.nio.channels.ServerSocketChannel;
43 import java.nio.channels.SocketChannel;
44 import java.nio.channels.WritableByteChannel;
45 import java.security.PrivilegedExceptionAction;
46 import java.util.ArrayList;
47 import java.util.Arrays;
48 import java.util.Collections;
49 import java.util.HashMap;
50 import java.util.Iterator;
51 import java.util.LinkedList;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.Random;
55 import java.util.Set;
56 import java.util.concurrent.ConcurrentHashMap;
57 import java.util.concurrent.ConcurrentLinkedDeque;
58 import java.util.concurrent.ExecutorService;
59 import java.util.concurrent.Executors;
60 import java.util.concurrent.atomic.AtomicInteger;
61 import java.util.concurrent.locks.Lock;
62 import java.util.concurrent.locks.ReentrantLock;
63
64 import javax.security.sasl.Sasl;
65 import javax.security.sasl.SaslException;
66 import javax.security.sasl.SaslServer;
67
68 import org.apache.commons.logging.Log;
69 import org.apache.commons.logging.LogFactory;
70 import org.apache.hadoop.hbase.classification.InterfaceAudience;
71 import org.apache.hadoop.hbase.classification.InterfaceStability;
72 import org.apache.hadoop.conf.Configuration;
73 import org.apache.hadoop.hbase.CellScanner;
74 import org.apache.hadoop.hbase.DoNotRetryIOException;
75 import org.apache.hadoop.hbase.HBaseIOException;
76 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
77 import org.apache.hadoop.hbase.HConstants;
78 import org.apache.hadoop.hbase.HRegionInfo;
79 import org.apache.hadoop.hbase.Server;
80 import org.apache.hadoop.hbase.TableName;
81 import org.apache.hadoop.hbase.client.Operation;
82 import org.apache.hadoop.hbase.codec.Codec;
83 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
84 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
85 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
86 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
87 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
88 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
89 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
90 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
91 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
92 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
93 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
94 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
95 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.VersionInfo;
96 import org.apache.hadoop.hbase.regionserver.HRegionServer;
97 import org.apache.hadoop.hbase.security.AccessDeniedException;
98 import org.apache.hadoop.hbase.security.AuthMethod;
99 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
100 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
101 import org.apache.hadoop.hbase.security.User;
102 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
103 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
104 import org.apache.hadoop.hbase.security.SaslStatus;
105 import org.apache.hadoop.hbase.security.SaslUtil;
106 import org.apache.hadoop.hbase.security.UserProvider;
107 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
108 import org.apache.hadoop.hbase.util.Bytes;
109 import org.apache.hadoop.hbase.util.Counter;
110 import org.apache.hadoop.hbase.util.Pair;
111 import org.apache.hadoop.io.BytesWritable;
112 import org.apache.hadoop.io.IntWritable;
113 import org.apache.hadoop.io.Writable;
114 import org.apache.hadoop.io.WritableUtils;
115 import org.apache.hadoop.io.compress.CompressionCodec;
116 import org.apache.hadoop.security.UserGroupInformation;
117 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
118 import org.apache.hadoop.security.authorize.AuthorizationException;
119 import org.apache.hadoop.security.authorize.PolicyProvider;
120 import org.apache.hadoop.security.authorize.ProxyUsers;
121 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
122 import org.apache.hadoop.security.token.SecretManager;
123 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
124 import org.apache.hadoop.security.token.TokenIdentifier;
125 import org.apache.hadoop.util.StringUtils;
126 import org.codehaus.jackson.map.ObjectMapper;
127 import org.apache.htrace.TraceInfo;
128
129 import com.google.common.util.concurrent.ThreadFactoryBuilder;
130 import com.google.protobuf.BlockingService;
131 import com.google.protobuf.CodedInputStream;
132 import com.google.protobuf.Descriptors.MethodDescriptor;
133 import com.google.protobuf.Message;
134 import com.google.protobuf.ServiceException;
135 import com.google.protobuf.TextFormat;
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
158 @InterfaceStability.Evolving
159 public class RpcServer implements RpcServerInterface {
160 public static final Log LOG = LogFactory.getLog(RpcServer.class);
161 private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
162 = new CallQueueTooBigException();
163
164 private final boolean authorize;
165 private boolean isSecurityEnabled;
166
167 public static final byte CURRENT_VERSION = 0;
168
169
170
171
172 static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
173
174
175
176
177 private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
178
179 private static final String WARN_DELAYED_CALLS = "hbase.ipc.warn.delayedrpc.number";
180
181 private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;
182
183 private final int warnDelayedCalls;
184
185 private AtomicInteger delayedCalls;
186 private final IPCUtil ipcUtil;
187
188 private static final String AUTH_FAILED_FOR = "Auth failed for ";
189 private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
190 private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
191 Server.class.getName());
192 protected SecretManager<TokenIdentifier> secretManager;
193 protected ServiceAuthorizationManager authManager;
194
195
196
197
198 protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
199
200
201 static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
202 = new ThreadLocal<MonitoredRPCHandler>();
203
204 protected final InetSocketAddress bindAddress;
205 protected int port;
206 private int readThreads;
207 protected int maxIdleTime;
208
209
210 protected int thresholdIdleConnections;
211
212
213
214 int maxConnectionsToNuke;
215
216
217
218 protected MetricsHBaseServer metrics;
219
220 protected final Configuration conf;
221
222 private int maxQueueSize;
223 protected int socketSendBufferSize;
224 protected final boolean tcpNoDelay;
225 protected final boolean tcpKeepAlive;
226 protected final long purgeTimeout;
227
228
229
230
231
232
233 volatile boolean running = true;
234
235
236
237
238
239 volatile boolean started = false;
240
241
242
243
244 protected final Counter callQueueSize = new Counter();
245
246 protected final List<Connection> connectionList =
247 Collections.synchronizedList(new LinkedList<Connection>());
248
249
250 private Listener listener = null;
251 protected Responder responder = null;
252 protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
253 protected int numConnections = 0;
254
255 protected HBaseRPCErrorHandler errorHandler = null;
256
257 private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
258 private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
259
260
261 private static final int DEFAULT_WARN_RESPONSE_TIME = 10000;
262 private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
263
264 private static final ObjectMapper MAPPER = new ObjectMapper();
265
266 private final int warnResponseTime;
267 private final int warnResponseSize;
268 private final Server server;
269 private final List<BlockingServiceAndInterface> services;
270
271 private final RpcScheduler scheduler;
272
273 private UserProvider userProvider;
274
275 private final BoundedByteBufferPool reservoir;
276
277
278
279
280
281
282 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
283 @InterfaceStability.Evolving
284 public class Call implements RpcCallContext {
285 protected int id;
286 protected BlockingService service;
287 protected MethodDescriptor md;
288 protected RequestHeader header;
289 protected Message param;
290
291 protected CellScanner cellScanner;
292 protected Connection connection;
293 protected long timestamp;
294
295
296
297
298 protected BufferChain response;
299 protected boolean delayResponse;
300 protected Responder responder;
301 protected boolean delayReturnValue;
302
303 protected long size;
304 protected boolean isError;
305 protected TraceInfo tinfo;
306 private ByteBuffer cellBlock = null;
307
308 private User user;
309 private InetAddress remoteAddress;
310
311 Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
312 Message param, CellScanner cellScanner, Connection connection, Responder responder,
313 long size, TraceInfo tinfo, final InetAddress remoteAddress) {
314 this.id = id;
315 this.service = service;
316 this.md = md;
317 this.header = header;
318 this.param = param;
319 this.cellScanner = cellScanner;
320 this.connection = connection;
321 this.timestamp = System.currentTimeMillis();
322 this.response = null;
323 this.delayResponse = false;
324 this.responder = responder;
325 this.isError = false;
326 this.size = size;
327 this.tinfo = tinfo;
328 this.user = connection.user == null? null: userProvider.create(connection.user);
329 this.remoteAddress = remoteAddress;
330 }
331
332
333
334
335
336 void done() {
337 if (this.cellBlock != null) {
338
339 reservoir.putBuffer(this.cellBlock);
340 this.cellBlock = null;
341 }
342 this.connection.decRpcCount();
343 }
344
345 @Override
346 public String toString() {
347 return toShortString() + " param: " +
348 (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
349 " connection: " + connection.toString();
350 }
351
352 protected RequestHeader getHeader() {
353 return this.header;
354 }
355
356 public boolean hasPriority() {
357 return this.header.hasPriority();
358 }
359
360 public int getPriority() {
361 return this.header.getPriority();
362 }
363
364
365
366
367
368 String toShortString() {
369 String serviceName = this.connection.service != null ?
370 this.connection.service.getDescriptorForType().getName() : "null";
371 return "callId: " + this.id + " service: " + serviceName +
372 " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
373 " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
374 " connection: " + connection.toString();
375 }
376
377 String toTraceString() {
378 String serviceName = this.connection.service != null ?
379 this.connection.service.getDescriptorForType().getName() : "";
380 String methodName = (this.md != null) ? this.md.getName() : "";
381 return serviceName + "." + methodName;
382 }
383
384 protected synchronized void setSaslTokenResponse(ByteBuffer response) {
385 this.response = new BufferChain(response);
386 }
387
388 protected synchronized void setResponse(Object m, final CellScanner cells,
389 Throwable t, String errorMsg) {
390 if (this.isError) return;
391 if (t != null) this.isError = true;
392 BufferChain bc = null;
393 try {
394 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
395
396 Message result = (Message)m;
397
398 headerBuilder.setCallId(this.id);
399 if (t != null) {
400 ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
401 exceptionBuilder.setExceptionClassName(t.getClass().getName());
402 exceptionBuilder.setStackTrace(errorMsg);
403 exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
404 if (t instanceof RegionMovedException) {
405
406
407
408 RegionMovedException rme = (RegionMovedException)t;
409 exceptionBuilder.setHostname(rme.getHostname());
410 exceptionBuilder.setPort(rme.getPort());
411 }
412
413 headerBuilder.setException(exceptionBuilder.build());
414 }
415
416
417
418 this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
419 this.connection.compressionCodec, cells, reservoir);
420 if (this.cellBlock != null) {
421 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
422
423 cellBlockBuilder.setLength(this.cellBlock.limit());
424 headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
425 }
426 Message header = headerBuilder.build();
427
428
429
430 ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
431 ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
432 int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
433 (this.cellBlock == null? 0: this.cellBlock.limit());
434 ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
435 bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
436 if (connection.useWrap) {
437 bc = wrapWithSasl(bc);
438 }
439 } catch (IOException e) {
440 LOG.warn("Exception while creating response " + e);
441 }
442 this.response = bc;
443 }
444
445 private BufferChain wrapWithSasl(BufferChain bc)
446 throws IOException {
447 if (!this.connection.useSasl) return bc;
448
449
450 byte [] responseBytes = bc.getBytes();
451 byte [] token;
452
453
454 synchronized (connection.saslServer) {
455 token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
456 }
457 if (LOG.isTraceEnabled()) {
458 LOG.trace("Adding saslServer wrapped token of size " + token.length
459 + " as call response.");
460 }
461
462 ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
463 ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
464 return new BufferChain(bbTokenLength, bbTokenBytes);
465 }
466
467 @Override
468 public synchronized void endDelay(Object result) throws IOException {
469 assert this.delayResponse;
470 assert this.delayReturnValue || result == null;
471 this.delayResponse = false;
472 delayedCalls.decrementAndGet();
473 if (this.delayReturnValue) {
474 this.setResponse(result, null, null, null);
475 }
476 this.responder.doRespond(this);
477 }
478
479 @Override
480 public synchronized void endDelay() throws IOException {
481 this.endDelay(null);
482 }
483
484 @Override
485 public synchronized void startDelay(boolean delayReturnValue) {
486 assert !this.delayResponse;
487 this.delayResponse = true;
488 this.delayReturnValue = delayReturnValue;
489 int numDelayed = delayedCalls.incrementAndGet();
490 if (numDelayed > warnDelayedCalls) {
491 LOG.warn("Too many delayed calls: limit " + warnDelayedCalls + " current " + numDelayed);
492 }
493 }
494
495 @Override
496 public synchronized void endDelayThrowing(Throwable t) throws IOException {
497 this.setResponse(null, null, t, StringUtils.stringifyException(t));
498 this.delayResponse = false;
499 this.sendResponseIfReady();
500 }
501
502 @Override
503 public synchronized boolean isDelayed() {
504 return this.delayResponse;
505 }
506
507 @Override
508 public synchronized boolean isReturnValueDelayed() {
509 return this.delayReturnValue;
510 }
511
512 @Override
513 public boolean isClientCellBlockSupport() {
514 return this.connection != null && this.connection.codec != null;
515 }
516
517 @Override
518 public long disconnectSince() {
519 if (!connection.channel.isOpen()) {
520 return System.currentTimeMillis() - timestamp;
521 } else {
522 return -1L;
523 }
524 }
525
526 public long getSize() {
527 return this.size;
528 }
529
530
531
532
533
534
535 public synchronized void sendResponseIfReady() throws IOException {
536
537 this.param = null;
538 if (!this.delayResponse) {
539 this.responder.doRespond(this);
540 }
541 }
542
543 public UserGroupInformation getRemoteUser() {
544 return connection.user;
545 }
546
547 @Override
548 public User getRequestUser() {
549 return user;
550 }
551
552 @Override
553 public String getRequestUserName() {
554 User user = getRequestUser();
555 return user == null? null: user.getShortName();
556 }
557
558 @Override
559 public InetAddress getRemoteAddress() {
560 return remoteAddress;
561 }
562
563 @Override
564 public VersionInfo getClientVersionInfo() {
565 return connection.getVersionInfo();
566 }
567 }
568
569
570 private class Listener extends Thread {
571
572 private ServerSocketChannel acceptChannel = null;
573 private Selector selector = null;
574 private Reader[] readers = null;
575 private int currentReader = 0;
576 private Random rand = new Random();
577 private long lastCleanupRunTime = 0;
578
579 private long cleanupInterval = 10000;
580
581 private int backlogLength;
582
583 private ExecutorService readPool;
584
585 public Listener(final String name) throws IOException {
586 super(name);
587 backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
588
589 acceptChannel = ServerSocketChannel.open();
590 acceptChannel.configureBlocking(false);
591
592
593 bind(acceptChannel.socket(), bindAddress, backlogLength);
594 port = acceptChannel.socket().getLocalPort();
595
596 selector= Selector.open();
597
598 readers = new Reader[readThreads];
599 readPool = Executors.newFixedThreadPool(readThreads,
600 new ThreadFactoryBuilder().setNameFormat(
601 "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
602 ",port=" + port).setDaemon(true).build());
603 for (int i = 0; i < readThreads; ++i) {
604 Reader reader = new Reader();
605 readers[i] = reader;
606 readPool.execute(reader);
607 }
608 LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
609
610
611 acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
612 this.setName("RpcServer.listener,port=" + port);
613 this.setDaemon(true);
614 }
615
616
617 private class Reader implements Runnable {
618 private volatile boolean adding = false;
619 private final Selector readSelector;
620
621 Reader() throws IOException {
622 this.readSelector = Selector.open();
623 }
624 @Override
625 public void run() {
626 try {
627 doRunLoop();
628 } finally {
629 try {
630 readSelector.close();
631 } catch (IOException ioe) {
632 LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
633 }
634 }
635 }
636
637 private synchronized void doRunLoop() {
638 while (running) {
639 try {
640 readSelector.select();
641 while (adding) {
642 this.wait(1000);
643 }
644
645 Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
646 while (iter.hasNext()) {
647 SelectionKey key = iter.next();
648 iter.remove();
649 if (key.isValid()) {
650 if (key.isReadable()) {
651 doRead(key);
652 }
653 }
654 }
655 } catch (InterruptedException e) {
656 LOG.debug("Interrupted while sleeping");
657 return;
658 } catch (IOException ex) {
659 LOG.info(getName() + ": IOException in Reader", ex);
660 }
661 }
662 }
663
664
665
666
667
668
669
670
671 public void startAdd() {
672 adding = true;
673 readSelector.wakeup();
674 }
675
676 public synchronized SelectionKey registerChannel(SocketChannel channel)
677 throws IOException {
678 return channel.register(readSelector, SelectionKey.OP_READ);
679 }
680
681 public synchronized void finishAdd() {
682 adding = false;
683 this.notify();
684 }
685 }
686
687
688
689
690
691
692
693
694 private void cleanupConnections(boolean force) {
695 if (force || numConnections > thresholdIdleConnections) {
696 long currentTime = System.currentTimeMillis();
697 if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
698 return;
699 }
700 int start = 0;
701 int end = numConnections - 1;
702 if (!force) {
703 start = rand.nextInt() % numConnections;
704 end = rand.nextInt() % numConnections;
705 int temp;
706 if (end < start) {
707 temp = start;
708 start = end;
709 end = temp;
710 }
711 }
712 int i = start;
713 int numNuked = 0;
714 while (i <= end) {
715 Connection c;
716 synchronized (connectionList) {
717 try {
718 c = connectionList.get(i);
719 } catch (Exception e) {return;}
720 }
721 if (c.timedOut(currentTime)) {
722 if (LOG.isDebugEnabled())
723 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
724 closeConnection(c);
725 numNuked++;
726 end--;
727
728 c = null;
729 if (!force && numNuked == maxConnectionsToNuke) break;
730 }
731 else i++;
732 }
733 lastCleanupRunTime = System.currentTimeMillis();
734 }
735 }
736
737 @Override
738 public void run() {
739 LOG.info(getName() + ": starting");
740 while (running) {
741 SelectionKey key = null;
742 try {
743 selector.select();
744 Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
745 while (iter.hasNext()) {
746 key = iter.next();
747 iter.remove();
748 try {
749 if (key.isValid()) {
750 if (key.isAcceptable())
751 doAccept(key);
752 }
753 } catch (IOException ignored) {
754 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
755 }
756 key = null;
757 }
758 } catch (OutOfMemoryError e) {
759 if (errorHandler != null) {
760 if (errorHandler.checkOOME(e)) {
761 LOG.info(getName() + ": exiting on OutOfMemoryError");
762 closeCurrentConnection(key, e);
763 cleanupConnections(true);
764 return;
765 }
766 } else {
767
768
769
770 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
771 closeCurrentConnection(key, e);
772 cleanupConnections(true);
773 try {
774 Thread.sleep(60000);
775 } catch (InterruptedException ex) {
776 LOG.debug("Interrupted while sleeping");
777 return;
778 }
779 }
780 } catch (Exception e) {
781 closeCurrentConnection(key, e);
782 }
783 cleanupConnections(false);
784 }
785
786 LOG.info(getName() + ": stopping");
787
788 synchronized (this) {
789 try {
790 acceptChannel.close();
791 selector.close();
792 } catch (IOException ignored) {
793 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
794 }
795
796 selector= null;
797 acceptChannel= null;
798
799
800 while (!connectionList.isEmpty()) {
801 closeConnection(connectionList.remove(0));
802 }
803 }
804 }
805
806 private void closeCurrentConnection(SelectionKey key, Throwable e) {
807 if (key != null) {
808 Connection c = (Connection)key.attachment();
809 if (c != null) {
810 if (LOG.isDebugEnabled()) {
811 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
812 (e != null ? " on error " + e.getMessage() : ""));
813 }
814 closeConnection(c);
815 key.attach(null);
816 }
817 }
818 }
819
820 InetSocketAddress getAddress() {
821 return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
822 }
823
824 void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
825 Connection c;
826 ServerSocketChannel server = (ServerSocketChannel) key.channel();
827
828 SocketChannel channel;
829 while ((channel = server.accept()) != null) {
830 try {
831 channel.configureBlocking(false);
832 channel.socket().setTcpNoDelay(tcpNoDelay);
833 channel.socket().setKeepAlive(tcpKeepAlive);
834 } catch (IOException ioe) {
835 channel.close();
836 throw ioe;
837 }
838
839 Reader reader = getReader();
840 try {
841 reader.startAdd();
842 SelectionKey readKey = reader.registerChannel(channel);
843 c = getConnection(channel, System.currentTimeMillis());
844 readKey.attach(c);
845 synchronized (connectionList) {
846 connectionList.add(numConnections, c);
847 numConnections++;
848 }
849 if (LOG.isDebugEnabled())
850 LOG.debug(getName() + ": connection from " + c.toString() +
851 "; # active connections: " + numConnections);
852 } finally {
853 reader.finishAdd();
854 }
855 }
856 }
857
858 void doRead(SelectionKey key) throws InterruptedException {
859 int count;
860 Connection c = (Connection) key.attachment();
861 if (c == null) {
862 return;
863 }
864 c.setLastContact(System.currentTimeMillis());
865 try {
866 count = c.readAndProcess();
867
868 if (count > 0) {
869 c.setLastContact(System.currentTimeMillis());
870 }
871
872 } catch (InterruptedException ieo) {
873 throw ieo;
874 } catch (Exception e) {
875 if (LOG.isDebugEnabled()) {
876 LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage());
877 }
878 count = -1;
879 }
880 if (count < 0) {
881 if (LOG.isDebugEnabled()) {
882 LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
883 " because read count=" + count +
884 ". Number of active connections: " + numConnections);
885 }
886 closeConnection(c);
887 }
888 }
889
890 synchronized void doStop() {
891 if (selector != null) {
892 selector.wakeup();
893 Thread.yield();
894 }
895 if (acceptChannel != null) {
896 try {
897 acceptChannel.socket().close();
898 } catch (IOException e) {
899 LOG.info(getName() + ": exception in closing listener socket. " + e);
900 }
901 }
902 readPool.shutdownNow();
903 }
904
905
906
907 Reader getReader() {
908 currentReader = (currentReader + 1) % readers.length;
909 return readers[currentReader];
910 }
911 }
912
913
914 protected class Responder extends Thread {
915 private final Selector writeSelector;
916 private final Set<Connection> writingCons =
917 Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
918
919 Responder() throws IOException {
920 this.setName("RpcServer.responder");
921 this.setDaemon(true);
922 writeSelector = Selector.open();
923 }
924
925 @Override
926 public void run() {
927 LOG.info(getName() + ": starting");
928 try {
929 doRunLoop();
930 } finally {
931 LOG.info(getName() + ": stopping");
932 try {
933 writeSelector.close();
934 } catch (IOException ioe) {
935 LOG.error(getName() + ": couldn't close write selector", ioe);
936 }
937 }
938 }
939
940
941
942
943
944 private void registerWrites() {
945 Iterator<Connection> it = writingCons.iterator();
946 while (it.hasNext()) {
947 Connection c = it.next();
948 it.remove();
949 SelectionKey sk = c.channel.keyFor(writeSelector);
950 try {
951 if (sk == null) {
952 try {
953 c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
954 } catch (ClosedChannelException e) {
955
956 if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
957 }
958 } else {
959 sk.interestOps(SelectionKey.OP_WRITE);
960 }
961 } catch (CancelledKeyException e) {
962
963 if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
964 }
965 }
966 }
967
968
969
970
971 public void registerForWrite(Connection c) {
972 if (writingCons.add(c)) {
973 writeSelector.wakeup();
974 }
975 }
976
977 private void doRunLoop() {
978 long lastPurgeTime = 0;
979 while (running) {
980 try {
981 registerWrites();
982 int keyCt = writeSelector.select(purgeTimeout);
983 if (keyCt == 0) {
984 continue;
985 }
986
987 Set<SelectionKey> keys = writeSelector.selectedKeys();
988 Iterator<SelectionKey> iter = keys.iterator();
989 while (iter.hasNext()) {
990 SelectionKey key = iter.next();
991 iter.remove();
992 try {
993 if (key.isValid() && key.isWritable()) {
994 doAsyncWrite(key);
995 }
996 } catch (IOException e) {
997 LOG.debug(getName() + ": asyncWrite", e);
998 }
999 }
1000
1001 lastPurgeTime = purge(lastPurgeTime);
1002
1003 } catch (OutOfMemoryError e) {
1004 if (errorHandler != null) {
1005 if (errorHandler.checkOOME(e)) {
1006 LOG.info(getName() + ": exiting on OutOfMemoryError");
1007 return;
1008 }
1009 } else {
1010
1011
1012
1013
1014
1015 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
1016 try {
1017 Thread.sleep(60000);
1018 } catch (InterruptedException ex) {
1019 LOG.debug("Interrupted while sleeping");
1020 return;
1021 }
1022 }
1023 } catch (Exception e) {
1024 LOG.warn(getName() + ": exception in Responder " +
1025 StringUtils.stringifyException(e), e);
1026 }
1027 }
1028 LOG.info(getName() + ": stopped");
1029 }
1030
1031
1032
1033
1034
1035
1036 private long purge(long lastPurgeTime) {
1037 long now = System.currentTimeMillis();
1038 if (now < lastPurgeTime + purgeTimeout) {
1039 return lastPurgeTime;
1040 }
1041
1042 ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1043
1044 synchronized (writeSelector.keys()) {
1045 for (SelectionKey key : writeSelector.keys()) {
1046 Connection connection = (Connection) key.attachment();
1047 if (connection == null) {
1048 throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1049 }
1050 Call call = connection.responseQueue.peekFirst();
1051 if (call != null && now > call.timestamp + purgeTimeout) {
1052 conWithOldCalls.add(call.connection);
1053 }
1054 }
1055 }
1056
1057
1058 for (Connection connection : conWithOldCalls) {
1059 closeConnection(connection);
1060 }
1061
1062 return now;
1063 }
1064
1065 private void doAsyncWrite(SelectionKey key) throws IOException {
1066 Connection connection = (Connection) key.attachment();
1067 if (connection == null) {
1068 throw new IOException("doAsyncWrite: no connection");
1069 }
1070 if (key.channel() != connection.channel) {
1071 throw new IOException("doAsyncWrite: bad channel");
1072 }
1073
1074 if (processAllResponses(connection)) {
1075 try {
1076
1077
1078 key.interestOps(0);
1079 } catch (CancelledKeyException e) {
1080
1081
1082
1083
1084
1085 LOG.warn("Exception while changing ops : " + e);
1086 }
1087 }
1088 }
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098 private boolean processResponse(final Call call) throws IOException {
1099 boolean error = true;
1100 try {
1101
1102 long numBytes = channelWrite(call.connection.channel, call.response);
1103 if (numBytes < 0) {
1104 throw new HBaseIOException("Error writing on the socket " +
1105 "for the call:" + call.toShortString());
1106 }
1107 error = false;
1108 } finally {
1109 if (error) {
1110 LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1111 closeConnection(call.connection);
1112 }
1113 }
1114
1115 if (!call.response.hasRemaining()) {
1116 call.done();
1117 return true;
1118 } else {
1119 return false;
1120 }
1121 }
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131 private boolean processAllResponses(final Connection connection) throws IOException {
1132
1133 connection.responseWriteLock.lock();
1134 try {
1135 for (int i = 0; i < 20; i++) {
1136
1137 Call call = connection.responseQueue.pollFirst();
1138 if (call == null) {
1139 return true;
1140 }
1141 if (!processResponse(call)) {
1142 connection.responseQueue.addFirst(call);
1143 return false;
1144 }
1145 }
1146 } finally {
1147 connection.responseWriteLock.unlock();
1148 }
1149
1150 return connection.responseQueue.isEmpty();
1151 }
1152
1153
1154
1155
1156 void doRespond(Call call) throws IOException {
1157 boolean added = false;
1158
1159
1160
1161 if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
1162 try {
1163 if (call.connection.responseQueue.isEmpty()) {
1164
1165
1166 if (processResponse(call)) {
1167 return;
1168 }
1169
1170 call.connection.responseQueue.addFirst(call);
1171 added = true;
1172 }
1173 } finally {
1174 call.connection.responseWriteLock.unlock();
1175 }
1176 }
1177
1178 if (!added) {
1179 call.connection.responseQueue.addLast(call);
1180 }
1181 call.responder.registerForWrite(call.connection);
1182
1183
1184 call.timestamp = System.currentTimeMillis();
1185 }
1186 }
1187
1188 @SuppressWarnings("serial")
1189 public static class CallQueueTooBigException extends IOException {
1190 CallQueueTooBigException() {
1191 super();
1192 }
1193 }
1194
1195
1196 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1197 value="VO_VOLATILE_INCREMENT",
1198 justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1199 public class Connection {
1200
1201 private boolean connectionPreambleRead = false;
1202
1203 private boolean connectionHeaderRead = false;
1204 protected SocketChannel channel;
1205 private ByteBuffer data;
1206 private ByteBuffer dataLengthBuffer;
1207 protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
1208 private final Lock responseWriteLock = new ReentrantLock();
1209 private Counter rpcCount = new Counter();
1210 private long lastContact;
1211 private InetAddress addr;
1212 protected Socket socket;
1213
1214
1215 protected String hostAddress;
1216 protected int remotePort;
1217 ConnectionHeader connectionHeader;
1218
1219
1220
1221 private Codec codec;
1222
1223
1224
1225 private CompressionCodec compressionCodec;
1226 BlockingService service;
1227 protected UserGroupInformation user = null;
1228 private AuthMethod authMethod;
1229 private boolean saslContextEstablished;
1230 private boolean skipInitialSaslHandshake;
1231 private ByteBuffer unwrappedData;
1232
1233 private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1234 boolean useSasl;
1235 SaslServer saslServer;
1236 private boolean useWrap = false;
1237
1238 private static final int AUTHORIZATION_FAILED_CALLID = -1;
1239 private final Call authFailedCall =
1240 new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, null, 0, null,
1241 null);
1242 private ByteArrayOutputStream authFailedResponse =
1243 new ByteArrayOutputStream();
1244
1245 private static final int SASL_CALLID = -33;
1246 private final Call saslCall =
1247 new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null);
1248
1249 public UserGroupInformation attemptingUser = null;
1250
1251 public Connection(SocketChannel channel, long lastContact) {
1252 this.channel = channel;
1253 this.lastContact = lastContact;
1254 this.data = null;
1255 this.dataLengthBuffer = ByteBuffer.allocate(4);
1256 this.socket = channel.socket();
1257 this.addr = socket.getInetAddress();
1258 if (addr == null) {
1259 this.hostAddress = "*Unknown*";
1260 } else {
1261 this.hostAddress = addr.getHostAddress();
1262 }
1263 this.remotePort = socket.getPort();
1264 if (socketSendBufferSize != 0) {
1265 try {
1266 socket.setSendBufferSize(socketSendBufferSize);
1267 } catch (IOException e) {
1268 LOG.warn("Connection: unable to set socket send buffer size to " +
1269 socketSendBufferSize);
1270 }
1271 }
1272 }
1273
1274 @Override
1275 public String toString() {
1276 return getHostAddress() + ":" + remotePort;
1277 }
1278
1279 public String getHostAddress() {
1280 return hostAddress;
1281 }
1282
1283 public InetAddress getHostInetAddress() {
1284 return addr;
1285 }
1286
1287 public int getRemotePort() {
1288 return remotePort;
1289 }
1290
1291 public void setLastContact(long lastContact) {
1292 this.lastContact = lastContact;
1293 }
1294
1295 public VersionInfo getVersionInfo() {
1296 if (connectionHeader.hasVersionInfo()) {
1297 return connectionHeader.getVersionInfo();
1298 }
1299 return null;
1300 }
1301
1302
1303 private boolean isIdle() {
1304 return rpcCount.get() == 0;
1305 }
1306
1307
1308 protected void decRpcCount() {
1309 rpcCount.decrement();
1310 }
1311
1312
1313 protected void incRpcCount() {
1314 rpcCount.increment();
1315 }
1316
1317 protected boolean timedOut(long currentTime) {
1318 return isIdle() && currentTime - lastContact > maxIdleTime;
1319 }
1320
1321 private UserGroupInformation getAuthorizedUgi(String authorizedId)
1322 throws IOException {
1323 if (authMethod == AuthMethod.DIGEST) {
1324 TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1325 secretManager);
1326 UserGroupInformation ugi = tokenId.getUser();
1327 if (ugi == null) {
1328 throw new AccessDeniedException(
1329 "Can't retrieve username from tokenIdentifier.");
1330 }
1331 ugi.addTokenIdentifier(tokenId);
1332 return ugi;
1333 } else {
1334 return UserGroupInformation.createRemoteUser(authorizedId);
1335 }
1336 }
1337
1338 private void saslReadAndProcess(byte[] saslToken) throws IOException,
1339 InterruptedException {
1340 if (saslContextEstablished) {
1341 if (LOG.isTraceEnabled())
1342 LOG.trace("Have read input token of size " + saslToken.length
1343 + " for processing by saslServer.unwrap()");
1344
1345 if (!useWrap) {
1346 processOneRpc(saslToken);
1347 } else {
1348 byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
1349 processUnwrappedData(plaintextData);
1350 }
1351 } else {
1352 byte[] replyToken;
1353 try {
1354 if (saslServer == null) {
1355 switch (authMethod) {
1356 case DIGEST:
1357 if (secretManager == null) {
1358 throw new AccessDeniedException(
1359 "Server is not configured to do DIGEST authentication.");
1360 }
1361 saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
1362 .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
1363 SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
1364 secretManager, this));
1365 break;
1366 default:
1367 UserGroupInformation current = UserGroupInformation.getCurrentUser();
1368 String fullName = current.getUserName();
1369 if (LOG.isDebugEnabled()) {
1370 LOG.debug("Kerberos principal name is " + fullName);
1371 }
1372 final String names[] = SaslUtil.splitKerberosName(fullName);
1373 if (names.length != 3) {
1374 throw new AccessDeniedException(
1375 "Kerberos principal name does NOT have the expected "
1376 + "hostname part: " + fullName);
1377 }
1378 current.doAs(new PrivilegedExceptionAction<Object>() {
1379 @Override
1380 public Object run() throws SaslException {
1381 saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
1382 .getMechanismName(), names[0], names[1],
1383 SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
1384 return null;
1385 }
1386 });
1387 }
1388 if (saslServer == null)
1389 throw new AccessDeniedException(
1390 "Unable to find SASL server implementation for "
1391 + authMethod.getMechanismName());
1392 if (LOG.isDebugEnabled()) {
1393 LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
1394 }
1395 }
1396 if (LOG.isDebugEnabled()) {
1397 LOG.debug("Have read input token of size " + saslToken.length
1398 + " for processing by saslServer.evaluateResponse()");
1399 }
1400 replyToken = saslServer.evaluateResponse(saslToken);
1401 } catch (IOException e) {
1402 IOException sendToClient = e;
1403 Throwable cause = e;
1404 while (cause != null) {
1405 if (cause instanceof InvalidToken) {
1406 sendToClient = (InvalidToken) cause;
1407 break;
1408 }
1409 cause = cause.getCause();
1410 }
1411 doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
1412 sendToClient.getLocalizedMessage());
1413 metrics.authenticationFailure();
1414 String clientIP = this.toString();
1415
1416 AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
1417 throw e;
1418 }
1419 if (replyToken != null) {
1420 if (LOG.isDebugEnabled()) {
1421 LOG.debug("Will send token of size " + replyToken.length
1422 + " from saslServer.");
1423 }
1424 doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
1425 null);
1426 }
1427 if (saslServer.isComplete()) {
1428 String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
1429 useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
1430 user = getAuthorizedUgi(saslServer.getAuthorizationID());
1431 if (LOG.isDebugEnabled()) {
1432 LOG.debug("SASL server context established. Authenticated client: "
1433 + user + ". Negotiated QoP is "
1434 + saslServer.getNegotiatedProperty(Sasl.QOP));
1435 }
1436 metrics.authenticationSuccess();
1437 AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
1438 saslContextEstablished = true;
1439 }
1440 }
1441 }
1442
1443
1444
1445
1446 private void doRawSaslReply(SaslStatus status, Writable rv,
1447 String errorClass, String error) throws IOException {
1448 ByteBufferOutputStream saslResponse = null;
1449 DataOutputStream out = null;
1450 try {
1451
1452
1453 saslResponse = new ByteBufferOutputStream(256);
1454 out = new DataOutputStream(saslResponse);
1455 out.writeInt(status.state);
1456 if (status == SaslStatus.SUCCESS) {
1457 rv.write(out);
1458 } else {
1459 WritableUtils.writeString(out, errorClass);
1460 WritableUtils.writeString(out, error);
1461 }
1462 saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1463 saslCall.responder = responder;
1464 saslCall.sendResponseIfReady();
1465 } finally {
1466 if (saslResponse != null) {
1467 saslResponse.close();
1468 }
1469 if (out != null) {
1470 out.close();
1471 }
1472 }
1473 }
1474
1475 private void disposeSasl() {
1476 if (saslServer != null) {
1477 try {
1478 saslServer.dispose();
1479 saslServer = null;
1480 } catch (SaslException ignored) {
1481
1482 }
1483 }
1484 }
1485
1486 private int readPreamble() throws IOException {
1487 int count;
1488
1489 this.dataLengthBuffer.flip();
1490 if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
1491 return doBadPreambleHandling("Expected HEADER=" +
1492 Bytes.toStringBinary(HConstants.RPC_HEADER) +
1493 " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
1494 " from " + toString());
1495 }
1496
1497 ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
1498 count = channelRead(channel, versionAndAuthBytes);
1499 if (count < 0 || versionAndAuthBytes.remaining() > 0) {
1500 return count;
1501 }
1502 int version = versionAndAuthBytes.get(0);
1503 byte authbyte = versionAndAuthBytes.get(1);
1504 this.authMethod = AuthMethod.valueOf(authbyte);
1505 if (version != CURRENT_VERSION) {
1506 String msg = getFatalConnectionString(version, authbyte);
1507 return doBadPreambleHandling(msg, new WrongVersionException(msg));
1508 }
1509 if (authMethod == null) {
1510 String msg = getFatalConnectionString(version, authbyte);
1511 return doBadPreambleHandling(msg, new BadAuthException(msg));
1512 }
1513 if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
1514 AccessDeniedException ae = new AccessDeniedException("Authentication is required");
1515 setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1516 responder.doRespond(authFailedCall);
1517 throw ae;
1518 }
1519 if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
1520 doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
1521 SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
1522 authMethod = AuthMethod.SIMPLE;
1523
1524
1525
1526 skipInitialSaslHandshake = true;
1527 }
1528 if (authMethod != AuthMethod.SIMPLE) {
1529 useSasl = true;
1530 }
1531
1532 dataLengthBuffer.clear();
1533 connectionPreambleRead = true;
1534 return count;
1535 }
1536
1537 private int read4Bytes() throws IOException {
1538 if (this.dataLengthBuffer.remaining() > 0) {
1539 return channelRead(channel, this.dataLengthBuffer);
1540 } else {
1541 return 0;
1542 }
1543 }
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553 public int readAndProcess() throws IOException, InterruptedException {
1554
1555
1556
1557
1558 int count = read4Bytes();
1559 if (count < 0 || dataLengthBuffer.remaining() > 0) {
1560 return count;
1561 }
1562
1563
1564 if (!connectionPreambleRead) {
1565 count = readPreamble();
1566 if (!connectionPreambleRead) {
1567 return count;
1568 }
1569
1570 count = read4Bytes();
1571 if (count < 0 || dataLengthBuffer.remaining() > 0) {
1572 return count;
1573 }
1574 }
1575
1576
1577
1578 if (data == null) {
1579 dataLengthBuffer.flip();
1580 int dataLength = dataLengthBuffer.getInt();
1581 if (dataLength == RpcClient.PING_CALL_ID) {
1582 if (!useWrap) {
1583 dataLengthBuffer.clear();
1584 return 0;
1585 }
1586 }
1587 if (dataLength < 0) {
1588 throw new IllegalArgumentException("Unexpected data length "
1589 + dataLength + "!! from " + getHostAddress());
1590 }
1591 data = ByteBuffer.allocate(dataLength);
1592
1593
1594
1595
1596 incRpcCount();
1597 }
1598
1599 count = channelRead(channel, data);
1600
1601 if (count >= 0 && data.remaining() == 0) {
1602 process();
1603 }
1604
1605 return count;
1606 }
1607
1608
1609
1610
1611 private void process() throws IOException, InterruptedException {
1612 data.flip();
1613 try {
1614 if (skipInitialSaslHandshake) {
1615 skipInitialSaslHandshake = false;
1616 return;
1617 }
1618
1619 if (useSasl) {
1620 saslReadAndProcess(data.array());
1621 } else {
1622 processOneRpc(data.array());
1623 }
1624
1625 } finally {
1626 dataLengthBuffer.clear();
1627 data = null;
1628 }
1629 }
1630
1631 private String getFatalConnectionString(final int version, final byte authByte) {
1632 return "serverVersion=" + CURRENT_VERSION +
1633 ", clientVersion=" + version + ", authMethod=" + authByte +
1634 ", authSupported=" + (authMethod != null) + " from " + toString();
1635 }
1636
1637 private int doBadPreambleHandling(final String msg) throws IOException {
1638 return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1639 }
1640
1641 private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1642 LOG.warn(msg);
1643 Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
1644 setupResponse(null, fakeCall, e, msg);
1645 responder.doRespond(fakeCall);
1646
1647 return -1;
1648 }
1649
1650
1651 private void processConnectionHeader(byte[] buf) throws IOException {
1652 this.connectionHeader = ConnectionHeader.parseFrom(buf);
1653 String serviceName = connectionHeader.getServiceName();
1654 if (serviceName == null) throw new EmptyServiceNameException();
1655 this.service = getService(services, serviceName);
1656 if (this.service == null) throw new UnknownServiceException(serviceName);
1657 setupCellBlockCodecs(this.connectionHeader);
1658 UserGroupInformation protocolUser = createUser(connectionHeader);
1659 if (!useSasl) {
1660 user = protocolUser;
1661 if (user != null) {
1662 user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1663 }
1664 } else {
1665
1666 user.setAuthenticationMethod(authMethod.authenticationMethod);
1667
1668
1669
1670 if ((protocolUser != null)
1671 && (!protocolUser.getUserName().equals(user.getUserName()))) {
1672 if (authMethod == AuthMethod.DIGEST) {
1673
1674 throw new AccessDeniedException("Authenticated user (" + user
1675 + ") doesn't match what the client claims to be ("
1676 + protocolUser + ")");
1677 } else {
1678
1679
1680
1681 UserGroupInformation realUser = user;
1682 user = UserGroupInformation.createProxyUser(protocolUser
1683 .getUserName(), realUser);
1684
1685 user.setAuthenticationMethod(AuthenticationMethod.PROXY);
1686 }
1687 }
1688 }
1689 if (connectionHeader.hasVersionInfo()) {
1690 AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1691 + " with version info: "
1692 + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
1693 } else {
1694 AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1695 + " with unknown version info");
1696 }
1697 }
1698
1699
1700
1701
1702
1703 private void setupCellBlockCodecs(final ConnectionHeader header)
1704 throws FatalConnectionException {
1705
1706 if (!header.hasCellBlockCodecClass()) return;
1707 String className = header.getCellBlockCodecClass();
1708 if (className == null || className.length() == 0) return;
1709 try {
1710 this.codec = (Codec)Class.forName(className).newInstance();
1711 } catch (Exception e) {
1712 throw new UnsupportedCellCodecException(className, e);
1713 }
1714 if (!header.hasCellBlockCompressorClass()) return;
1715 className = header.getCellBlockCompressorClass();
1716 try {
1717 this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1718 } catch (Exception e) {
1719 throw new UnsupportedCompressionCodecException(className, e);
1720 }
1721 }
1722
1723 private void processUnwrappedData(byte[] inBuf) throws IOException,
1724 InterruptedException {
1725 ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1726
1727 while (true) {
1728 int count;
1729 if (unwrappedDataLengthBuffer.remaining() > 0) {
1730 count = channelRead(ch, unwrappedDataLengthBuffer);
1731 if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1732 return;
1733 }
1734
1735 if (unwrappedData == null) {
1736 unwrappedDataLengthBuffer.flip();
1737 int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1738
1739 if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1740 if (LOG.isDebugEnabled())
1741 LOG.debug("Received ping message");
1742 unwrappedDataLengthBuffer.clear();
1743 continue;
1744 }
1745 unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1746 }
1747
1748 count = channelRead(ch, unwrappedData);
1749 if (count <= 0 || unwrappedData.remaining() > 0)
1750 return;
1751
1752 if (unwrappedData.remaining() == 0) {
1753 unwrappedDataLengthBuffer.clear();
1754 unwrappedData.flip();
1755 processOneRpc(unwrappedData.array());
1756 unwrappedData = null;
1757 }
1758 }
1759 }
1760
1761 private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1762 if (connectionHeaderRead) {
1763 processRequest(buf);
1764 } else {
1765 processConnectionHeader(buf);
1766 this.connectionHeaderRead = true;
1767 if (!authorizeConnection()) {
1768
1769
1770 throw new AccessDeniedException("Connection from " + this + " for service " +
1771 connectionHeader.getServiceName() + " is unauthorized for user: " + user);
1772 }
1773 }
1774 }
1775
1776
1777
1778
1779
1780
1781
1782 protected void processRequest(byte[] buf) throws IOException, InterruptedException {
1783 long totalRequestSize = buf.length;
1784 int offset = 0;
1785
1786
1787 CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
1788 int headerSize = cis.readRawVarint32();
1789 offset = cis.getTotalBytesRead();
1790 Message.Builder builder = RequestHeader.newBuilder();
1791 ProtobufUtil.mergeFrom(builder, buf, offset, headerSize);
1792 RequestHeader header = (RequestHeader) builder.build();
1793 offset += headerSize;
1794 int id = header.getCallId();
1795 if (LOG.isTraceEnabled()) {
1796 LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1797 " totalRequestSize: " + totalRequestSize + " bytes");
1798 }
1799
1800
1801 if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1802 final Call callTooBig =
1803 new Call(id, this.service, null, null, null, null, this,
1804 responder, totalRequestSize, null, null);
1805 ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1806 metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1807 InetSocketAddress address = getListenerAddress();
1808 setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
1809 "Call queue is full on " + (address != null ? address : "(channel closed)") +
1810 ", is hbase.ipc.server.max.callqueue.size too small?");
1811 responder.doRespond(callTooBig);
1812 return;
1813 }
1814 MethodDescriptor md = null;
1815 Message param = null;
1816 CellScanner cellScanner = null;
1817 try {
1818 if (header.hasRequestParam() && header.getRequestParam()) {
1819 md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1820 if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1821 builder = this.service.getRequestPrototype(md).newBuilderForType();
1822
1823 cis = CodedInputStream.newInstance(buf, offset, buf.length);
1824 int paramSize = cis.readRawVarint32();
1825 offset += cis.getTotalBytesRead();
1826 if (builder != null) {
1827 ProtobufUtil.mergeFrom(builder, buf, offset, paramSize);
1828 param = builder.build();
1829 }
1830 offset += paramSize;
1831 }
1832 if (header.hasCellBlockMeta()) {
1833 cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
1834 buf, offset, buf.length);
1835 }
1836 } catch (Throwable t) {
1837 InetSocketAddress address = getListenerAddress();
1838 String msg = (address != null ? address : "(channel closed)") +
1839 " is unable to read call parameter from client " + getHostAddress();
1840 LOG.warn(msg, t);
1841
1842 metrics.exception(t);
1843
1844
1845 if (t instanceof LinkageError) {
1846 t = new DoNotRetryIOException(t);
1847 }
1848
1849 if (t instanceof UnsupportedOperationException) {
1850 t = new DoNotRetryIOException(t);
1851 }
1852
1853 final Call readParamsFailedCall =
1854 new Call(id, this.service, null, null, null, null, this,
1855 responder, totalRequestSize, null, null);
1856 ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1857 setupResponse(responseBuffer, readParamsFailedCall, t,
1858 msg + "; " + t.getMessage());
1859 responder.doRespond(readParamsFailedCall);
1860 return;
1861 }
1862
1863 TraceInfo traceInfo = header.hasTraceInfo()
1864 ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1865 : null;
1866 Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1867 totalRequestSize, traceInfo, RpcServer.getRemoteIp());
1868 scheduler.dispatch(new CallRunner(RpcServer.this, call));
1869 }
1870
1871 private boolean authorizeConnection() throws IOException {
1872 try {
1873
1874
1875
1876
1877 if (user != null && user.getRealUser() != null
1878 && (authMethod != AuthMethod.DIGEST)) {
1879 ProxyUsers.authorize(user, this.getHostAddress(), conf);
1880 }
1881 authorize(user, connectionHeader, getHostInetAddress());
1882 metrics.authorizationSuccess();
1883 } catch (AuthorizationException ae) {
1884 if (LOG.isDebugEnabled()) {
1885 LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1886 }
1887 metrics.authorizationFailure();
1888 setupResponse(authFailedResponse, authFailedCall,
1889 new AccessDeniedException(ae), ae.getMessage());
1890 responder.doRespond(authFailedCall);
1891 return false;
1892 }
1893 return true;
1894 }
1895
1896 protected synchronized void close() {
1897 disposeSasl();
1898 data = null;
1899 if (!channel.isOpen())
1900 return;
1901 try {socket.shutdownOutput();} catch(Exception ignored) {}
1902 if (channel.isOpen()) {
1903 try {channel.close();} catch(Exception ignored) {}
1904 }
1905 try {socket.close();} catch(Exception ignored) {}
1906 }
1907
1908 private UserGroupInformation createUser(ConnectionHeader head) {
1909 UserGroupInformation ugi = null;
1910
1911 if (!head.hasUserInfo()) {
1912 return null;
1913 }
1914 UserInformation userInfoProto = head.getUserInfo();
1915 String effectiveUser = null;
1916 if (userInfoProto.hasEffectiveUser()) {
1917 effectiveUser = userInfoProto.getEffectiveUser();
1918 }
1919 String realUser = null;
1920 if (userInfoProto.hasRealUser()) {
1921 realUser = userInfoProto.getRealUser();
1922 }
1923 if (effectiveUser != null) {
1924 if (realUser != null) {
1925 UserGroupInformation realUserUgi =
1926 UserGroupInformation.createRemoteUser(realUser);
1927 ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1928 } else {
1929 ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1930 }
1931 }
1932 return ugi;
1933 }
1934 }
1935
1936
1937
1938
1939
1940
1941
1942 public static class BlockingServiceAndInterface {
1943 private final BlockingService service;
1944 private final Class<?> serviceInterface;
1945 public BlockingServiceAndInterface(final BlockingService service,
1946 final Class<?> serviceInterface) {
1947 this.service = service;
1948 this.serviceInterface = serviceInterface;
1949 }
1950 public Class<?> getServiceInterface() {
1951 return this.serviceInterface;
1952 }
1953 public BlockingService getBlockingService() {
1954 return this.service;
1955 }
1956 }
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968 public RpcServer(final Server server, final String name,
1969 final List<BlockingServiceAndInterface> services,
1970 final InetSocketAddress bindAddress, Configuration conf,
1971 RpcScheduler scheduler)
1972 throws IOException {
1973 this.reservoir = new BoundedByteBufferPool(
1974 conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
1975 conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
1976
1977 conf.getInt("hbase.ipc.server.reservoir.initial.max",
1978 conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
1979 HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
1980 this.server = server;
1981 this.services = services;
1982 this.bindAddress = bindAddress;
1983 this.conf = conf;
1984 this.socketSendBufferSize = 0;
1985 this.maxQueueSize =
1986 this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
1987 this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
1988 this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
1989 this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
1990 this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
1991 this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
1992 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
1993 this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
1994 this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
1995
1996
1997 listener = new Listener(name);
1998 this.port = listener.getAddress().getPort();
1999
2000 this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
2001 this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
2002 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
2003
2004 this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
2005 this.delayedCalls = new AtomicInteger(0);
2006 this.ipcUtil = new IPCUtil(conf);
2007
2008
2009
2010 responder = new Responder();
2011 this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
2012 this.userProvider = UserProvider.instantiate(conf);
2013 this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
2014 if (isSecurityEnabled) {
2015 HBaseSaslRpcServer.init(conf);
2016 }
2017 this.scheduler = scheduler;
2018 this.scheduler.init(new RpcSchedulerContext(this));
2019 }
2020
2021
2022
2023
2024
2025 protected Connection getConnection(SocketChannel channel, long time) {
2026 return new Connection(channel, time);
2027 }
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037 private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
2038 throws IOException {
2039 if (response != null) response.reset();
2040 call.setResponse(null, null, t, error);
2041 }
2042
2043 protected void closeConnection(Connection connection) {
2044 synchronized (connectionList) {
2045 if (connectionList.remove(connection)) {
2046 numConnections--;
2047 }
2048 }
2049 connection.close();
2050 }
2051
2052 Configuration getConf() {
2053 return conf;
2054 }
2055
2056
2057
2058
2059 @Override
2060 public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2061
2062 @Override
2063 public boolean isStarted() {
2064 return this.started;
2065 }
2066
2067
2068 @Override
2069 public synchronized void start() {
2070 if (started) return;
2071 authTokenSecretMgr = createSecretManager();
2072 if (authTokenSecretMgr != null) {
2073 setSecretManager(authTokenSecretMgr);
2074 authTokenSecretMgr.start();
2075 }
2076 this.authManager = new ServiceAuthorizationManager();
2077 HBasePolicyProvider.init(conf, authManager);
2078 responder.start();
2079 listener.start();
2080 scheduler.start();
2081 started = true;
2082 }
2083
2084 @Override
2085 public void refreshAuthManager(PolicyProvider pp) {
2086
2087
2088 this.authManager.refresh(this.conf, pp);
2089 }
2090
2091 private AuthenticationTokenSecretManager createSecretManager() {
2092 if (!isSecurityEnabled) return null;
2093 if (server == null) return null;
2094 Configuration conf = server.getConfiguration();
2095 long keyUpdateInterval =
2096 conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2097 long maxAge =
2098 conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2099 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2100 server.getServerName().toString(), keyUpdateInterval, maxAge);
2101 }
2102
2103 public SecretManager<? extends TokenIdentifier> getSecretManager() {
2104 return this.secretManager;
2105 }
2106
2107 @SuppressWarnings("unchecked")
2108 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2109 this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2110 }
2111
2112
2113
2114
2115
2116
2117 @Override
2118 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2119 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2120 throws IOException {
2121 try {
2122 status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2123
2124 status.setRPCPacket(param);
2125 status.resume("Servicing call");
2126
2127 long startTime = System.currentTimeMillis();
2128 PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2129 Message result = service.callBlockingMethod(md, controller, param);
2130 long endTime = System.currentTimeMillis();
2131 int processingTime = (int) (endTime - startTime);
2132 int qTime = (int) (startTime - receiveTime);
2133 int totalTime = (int) (endTime - receiveTime);
2134 if (LOG.isTraceEnabled()) {
2135 LOG.trace(CurCall.get().toString() +
2136 ", response " + TextFormat.shortDebugString(result) +
2137 " queueTime: " + qTime +
2138 " processingTime: " + processingTime +
2139 " totalTime: " + totalTime);
2140 }
2141 metrics.dequeuedCall(qTime);
2142 metrics.processedCall(processingTime);
2143 metrics.totalCall(totalTime);
2144 long responseSize = result.getSerializedSize();
2145
2146
2147 boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2148 boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2149 if (tooSlow || tooLarge) {
2150
2151
2152 logResponse(new Object[]{param},
2153 md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
2154 (tooLarge ? "TooLarge" : "TooSlow"),
2155 status.getClient(), startTime, processingTime, qTime,
2156 responseSize);
2157 }
2158 return new Pair<Message, CellScanner>(result, controller.cellScanner());
2159 } catch (Throwable e) {
2160
2161
2162
2163 if (e instanceof ServiceException) {
2164 if (e.getCause() == null) {
2165 LOG.debug("Caught a ServiceException with null cause", e);
2166 } else {
2167 e = e.getCause();
2168 }
2169 }
2170
2171
2172 metrics.exception(e);
2173
2174 if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2175 if (e instanceof IOException) throw (IOException)e;
2176 LOG.error("Unexpected throwable object ", e);
2177 throw new IOException(e.getMessage(), e);
2178 }
2179 }
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195 void logResponse(Object[] params, String methodName, String call, String tag,
2196 String clientAddress, long startTime, int processingTime, int qTime,
2197 long responseSize)
2198 throws IOException {
2199
2200 Map<String, Object> responseInfo = new HashMap<String, Object>();
2201 responseInfo.put("starttimems", startTime);
2202 responseInfo.put("processingtimems", processingTime);
2203 responseInfo.put("queuetimems", qTime);
2204 responseInfo.put("responsesize", responseSize);
2205 responseInfo.put("client", clientAddress);
2206 responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2207 responseInfo.put("method", methodName);
2208 if (params.length == 2 && server instanceof HRegionServer &&
2209 params[0] instanceof byte[] &&
2210 params[1] instanceof Operation) {
2211
2212
2213 TableName tableName = TableName.valueOf(
2214 HRegionInfo.parseRegionName((byte[]) params[0])[0]);
2215 responseInfo.put("table", tableName.getNameAsString());
2216
2217 responseInfo.putAll(((Operation) params[1]).toMap());
2218
2219 LOG.warn("(operation" + tag + "): " +
2220 MAPPER.writeValueAsString(responseInfo));
2221 } else if (params.length == 1 && server instanceof HRegionServer &&
2222 params[0] instanceof Operation) {
2223
2224 responseInfo.putAll(((Operation) params[0]).toMap());
2225
2226 LOG.warn("(operation" + tag + "): " +
2227 MAPPER.writeValueAsString(responseInfo));
2228 } else {
2229
2230
2231 responseInfo.put("call", call);
2232 LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2233 }
2234 }
2235
2236
2237 @Override
2238 public synchronized void stop() {
2239 LOG.info("Stopping server on " + port);
2240 running = false;
2241 if (authTokenSecretMgr != null) {
2242 authTokenSecretMgr.stop();
2243 authTokenSecretMgr = null;
2244 }
2245 listener.interrupt();
2246 listener.doStop();
2247 responder.interrupt();
2248 scheduler.stop();
2249 notifyAll();
2250 }
2251
2252
2253
2254
2255
2256
2257 @Override
2258 public synchronized void join() throws InterruptedException {
2259 while (running) {
2260 wait();
2261 }
2262 }
2263
2264
2265
2266
2267
2268
2269
2270 @Override
2271 public synchronized InetSocketAddress getListenerAddress() {
2272 if (listener == null) {
2273 return null;
2274 }
2275 return listener.getAddress();
2276 }
2277
2278
2279
2280
2281
2282 @Override
2283 public void setErrorHandler(HBaseRPCErrorHandler handler) {
2284 this.errorHandler = handler;
2285 }
2286
2287 @Override
2288 public HBaseRPCErrorHandler getErrorHandler() {
2289 return this.errorHandler;
2290 }
2291
2292
2293
2294
2295 @Override
2296 public MetricsHBaseServer getMetrics() {
2297 return metrics;
2298 }
2299
2300 @Override
2301 public void addCallSize(final long diff) {
2302 this.callQueueSize.add(diff);
2303 }
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314 public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
2315 throws AuthorizationException {
2316 if (authorize) {
2317 Class<?> c = getServiceInterface(services, connection.getServiceName());
2318 this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2319 }
2320 }
2321
2322
2323
2324
2325
2326
2327 private static int NIO_BUFFER_LIMIT = 64 * 1024;
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343 protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2344 throws IOException {
2345 long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
2346 if (count > 0) this.metrics.sentBytes(count);
2347 return count;
2348 }
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362 protected int channelRead(ReadableByteChannel channel,
2363 ByteBuffer buffer) throws IOException {
2364
2365 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2366 channel.read(buffer) : channelIO(channel, null, buffer);
2367 if (count > 0) {
2368 metrics.receivedBytes(count);
2369 }
2370 return count;
2371 }
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386 private static int channelIO(ReadableByteChannel readCh,
2387 WritableByteChannel writeCh,
2388 ByteBuffer buf) throws IOException {
2389
2390 int originalLimit = buf.limit();
2391 int initialRemaining = buf.remaining();
2392 int ret = 0;
2393
2394 while (buf.remaining() > 0) {
2395 try {
2396 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2397 buf.limit(buf.position() + ioSize);
2398
2399 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2400
2401 if (ret < ioSize) {
2402 break;
2403 }
2404
2405 } finally {
2406 buf.limit(originalLimit);
2407 }
2408 }
2409
2410 int nBytes = initialRemaining - buf.remaining();
2411 return (nBytes > 0) ? nBytes : ret;
2412 }
2413
2414
2415
2416
2417
2418
2419
2420 public static RpcCallContext getCurrentCall() {
2421 return CurCall.get();
2422 }
2423
2424 public static boolean isInRpcCallContext() {
2425 return CurCall.get() != null;
2426 }
2427
2428
2429
2430
2431
2432
2433 public static User getRequestUser() {
2434 RpcCallContext ctx = getCurrentCall();
2435 return ctx == null? null: ctx.getRequestUser();
2436 }
2437
2438
2439
2440
2441
2442 public static String getRequestUserName() {
2443 User user = getRequestUser();
2444 return user == null? null: user.getShortName();
2445 }
2446
2447
2448
2449
2450 public static InetAddress getRemoteAddress() {
2451 RpcCallContext ctx = getCurrentCall();
2452 return ctx == null? null: ctx.getRemoteAddress();
2453 }
2454
2455
2456
2457
2458
2459
2460 static BlockingServiceAndInterface getServiceAndInterface(
2461 final List<BlockingServiceAndInterface> services, final String serviceName) {
2462 for (BlockingServiceAndInterface bs : services) {
2463 if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2464 return bs;
2465 }
2466 }
2467 return null;
2468 }
2469
2470
2471
2472
2473
2474
2475 static Class<?> getServiceInterface(
2476 final List<BlockingServiceAndInterface> services,
2477 final String serviceName) {
2478 BlockingServiceAndInterface bsasi =
2479 getServiceAndInterface(services, serviceName);
2480 return bsasi == null? null: bsasi.getServiceInterface();
2481 }
2482
2483
2484
2485
2486
2487
2488 static BlockingService getService(
2489 final List<BlockingServiceAndInterface> services,
2490 final String serviceName) {
2491 BlockingServiceAndInterface bsasi =
2492 getServiceAndInterface(services, serviceName);
2493 return bsasi == null? null: bsasi.getBlockingService();
2494 }
2495
2496 static MonitoredRPCHandler getStatus() {
2497
2498 MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
2499 if (status != null) {
2500 return status;
2501 }
2502 status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
2503 status.pause("Waiting for a call");
2504 RpcServer.MONITORED_RPC.set(status);
2505 return status;
2506 }
2507
2508
2509
2510
2511
2512 public static InetAddress getRemoteIp() {
2513 Call call = CurCall.get();
2514 if (call != null && call.connection.socket != null) {
2515 return call.connection.socket.getInetAddress();
2516 }
2517 return null;
2518 }
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531 public static void bind(ServerSocket socket, InetSocketAddress address,
2532 int backlog) throws IOException {
2533 try {
2534 socket.bind(address, backlog);
2535 } catch (BindException e) {
2536 BindException bindException =
2537 new BindException("Problem binding to " + address + " : " +
2538 e.getMessage());
2539 bindException.initCause(e);
2540 throw bindException;
2541 } catch (SocketException e) {
2542
2543
2544 if ("Unresolved address".equals(e.getMessage())) {
2545 throw new UnknownHostException("Invalid hostname for server: " +
2546 address.getHostName());
2547 }
2548 throw e;
2549 }
2550 }
2551
2552 @Override
2553 public RpcScheduler getScheduler() {
2554 return scheduler;
2555 }
2556 }