1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.ipc;
21
22 import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION;
23
24 import java.io.ByteArrayInputStream;
25 import java.io.ByteArrayOutputStream;
26 import java.io.DataOutputStream;
27 import java.io.IOException;
28 import java.net.BindException;
29 import java.net.InetAddress;
30 import java.net.InetSocketAddress;
31 import java.net.ServerSocket;
32 import java.net.Socket;
33 import java.net.SocketException;
34 import java.net.UnknownHostException;
35 import java.nio.ByteBuffer;
36 import java.nio.channels.CancelledKeyException;
37 import java.nio.channels.Channels;
38 import java.nio.channels.ClosedChannelException;
39 import java.nio.channels.ReadableByteChannel;
40 import java.nio.channels.SelectionKey;
41 import java.nio.channels.Selector;
42 import java.nio.channels.ServerSocketChannel;
43 import java.nio.channels.SocketChannel;
44 import java.nio.channels.WritableByteChannel;
45 import java.security.PrivilegedExceptionAction;
46 import java.util.ArrayList;
47 import java.util.Collections;
48 import java.util.HashMap;
49 import java.util.Iterator;
50 import java.util.LinkedList;
51 import java.util.List;
52 import java.util.Map;
53 import java.util.Random;
54 import java.util.concurrent.BlockingQueue;
55 import java.util.concurrent.ExecutorService;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.LinkedBlockingQueue;
58 import java.util.concurrent.atomic.AtomicInteger;
59
60 import javax.security.sasl.Sasl;
61 import javax.security.sasl.SaslException;
62 import javax.security.sasl.SaslServer;
63
64 import com.google.common.collect.Lists;
65 import org.apache.commons.logging.Log;
66 import org.apache.commons.logging.LogFactory;
67 import org.apache.hadoop.classification.InterfaceAudience;
68 import org.apache.hadoop.conf.Configuration;
69 import org.apache.hadoop.hbase.CellScanner;
70 import org.apache.hadoop.hbase.HConstants;
71 import org.apache.hadoop.hbase.HRegionInfo;
72 import org.apache.hadoop.hbase.client.Operation;
73 import org.apache.hadoop.hbase.codec.Codec;
74 import org.apache.hadoop.hbase.exceptions.CallerDisconnectedException;
75 import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
76 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
77 import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException;
78 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
79 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
80 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
81 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
82 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
83 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
84 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
85 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
86 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
87 import org.apache.hadoop.hbase.regionserver.HRegionServer;
88 import org.apache.hadoop.hbase.security.AuthMethod;
89 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
90 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
91 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
92 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
93 import org.apache.hadoop.hbase.security.SaslStatus;
94 import org.apache.hadoop.hbase.security.SaslUtil;
95 import org.apache.hadoop.hbase.security.User;
96 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
97 import org.apache.hadoop.hbase.util.Bytes;
98 import org.apache.hadoop.hbase.util.Pair;
99 import org.apache.hadoop.io.BytesWritable;
100 import org.apache.hadoop.io.IntWritable;
101 import org.apache.hadoop.io.Writable;
102 import org.apache.hadoop.io.WritableUtils;
103 import org.apache.hadoop.io.compress.CompressionCodec;
104 import org.apache.hadoop.hbase.Server;
105 import org.apache.hadoop.security.AccessControlException;
106 import org.apache.hadoop.security.UserGroupInformation;
107 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
108 import org.apache.hadoop.security.authorize.AuthorizationException;
109 import org.apache.hadoop.security.authorize.PolicyProvider;
110 import org.apache.hadoop.security.authorize.ProxyUsers;
111 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
112 import org.apache.hadoop.security.token.SecretManager;
113 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
114 import org.apache.hadoop.security.token.TokenIdentifier;
115 import org.apache.hadoop.util.StringUtils;
116 import org.cliffc.high_scale_lib.Counter;
117 import org.cloudera.htrace.Sampler;
118 import org.cloudera.htrace.Span;
119 import org.cloudera.htrace.Trace;
120 import org.cloudera.htrace.TraceInfo;
121 import org.cloudera.htrace.impl.NullSpan;
122 import org.codehaus.jackson.map.ObjectMapper;
123
124 import com.google.common.base.Function;
125 import com.google.common.util.concurrent.ThreadFactoryBuilder;
126 import com.google.protobuf.BlockingService;
127 import com.google.protobuf.CodedInputStream;
128 import com.google.protobuf.Descriptors.MethodDescriptor;
129 import com.google.protobuf.Message;
130 import com.google.protobuf.Message.Builder;
131 import com.google.protobuf.ServiceException;
132 import com.google.protobuf.TextFormat;
133
134
135
136
137
138
139
140
141 @InterfaceAudience.Private
142 public class RpcServer implements RpcServerInterface {
143
144
145 public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcServer");
146
147 private final boolean authorize;
148 private boolean isSecurityEnabled;
149
150 public static final byte CURRENT_VERSION = 0;
151
152
153
154
155 private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
156
157
158
159
160 private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
161
162 static final int BUFFER_INITIAL_SIZE = 1024;
163
164 private static final String WARN_DELAYED_CALLS = "hbase.ipc.warn.delayedrpc.number";
165
166 private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;
167
168 private final int warnDelayedCalls;
169
170 private AtomicInteger delayedCalls;
171 private final IPCUtil ipcUtil;
172
173 private static final String AUTH_FAILED_FOR = "Auth failed for ";
174 private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
175 private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
176 Server.class.getName());
177 protected SecretManager<TokenIdentifier> secretManager;
178 protected ServiceAuthorizationManager authManager;
179
180 protected static final ThreadLocal<RpcServerInterface> SERVER = new ThreadLocal<RpcServerInterface>();
181 private volatile boolean started = false;
182
183
184
185
186 protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
187
188 protected final InetSocketAddress isa;
189 protected int port;
190 private int handlerCount;
191 private int priorityHandlerCount;
192 private int readThreads;
193 protected int maxIdleTime;
194
195
196 protected int thresholdIdleConnections;
197
198
199
200 int maxConnectionsToNuke;
201
202
203
204 protected MetricsHBaseServer metrics;
205
206 protected final Configuration conf;
207
208 private int maxQueueLength;
209 private int maxQueueSize;
210 protected int socketSendBufferSize;
211 protected final boolean tcpNoDelay;
212 protected final boolean tcpKeepAlive;
213 protected final long purgeTimeout;
214
215 volatile protected boolean running = true;
216 protected BlockingQueue<Call> callQueue;
217 protected final Counter callQueueSize = new Counter();
218 protected BlockingQueue<Call> priorityCallQueue;
219
220 protected int highPriorityLevel;
221
222 protected final List<Connection> connectionList =
223 Collections.synchronizedList(new LinkedList<Connection>());
224
225
226 private Listener listener = null;
227 protected Responder responder = null;
228 protected int numConnections = 0;
229 private Handler[] handlers = null;
230 private Handler[] priorityHandlers = null;
231
232 protected BlockingQueue<Call> replicationQueue;
233 private int numOfReplicationHandlers = 0;
234 private Handler[] replicationHandlers = null;
235
236 protected HBaseRPCErrorHandler errorHandler = null;
237
238 private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
239 private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
240
241
242 private static final int DEFAULT_WARN_RESPONSE_TIME = 10000;
243 private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
244
245 private final int warnResponseTime;
246 private final int warnResponseSize;
247 private final Object serverInstance;
248 private final List<BlockingServiceAndInterface> services;
249
250
251
252
253
254 class Call implements RpcCallContext {
255 protected int id;
256 protected BlockingService service;
257 protected MethodDescriptor md;
258 protected Message param;
259
260 protected CellScanner cellScanner;
261 protected Connection connection;
262 protected long timestamp;
263
264 protected ByteBuffer response;
265 protected boolean delayResponse;
266 protected Responder responder;
267 protected boolean delayReturnValue;
268
269 protected long size;
270 protected boolean isError;
271 protected TraceInfo tinfo;
272
273 Call(int id, final BlockingService service, final MethodDescriptor md, Message param,
274 CellScanner cellScanner, Connection connection, Responder responder, long size,
275 TraceInfo tinfo) {
276 this.id = id;
277 this.service = service;
278 this.md = md;
279 this.param = param;
280 this.cellScanner = cellScanner;
281 this.connection = connection;
282 this.timestamp = System.currentTimeMillis();
283 this.response = null;
284 this.delayResponse = false;
285 this.responder = responder;
286 this.isError = false;
287 this.size = size;
288 this.tinfo = tinfo;
289 }
290
291 @Override
292 public String toString() {
293 String serviceName = this.connection.service != null?
294 this.connection.service.getDescriptorForType().getName(): "null";
295 return "callId: " + this.id + " service: " + serviceName + " methodName: " +
296 ((this.md != null)? this.md.getName(): null) + " param: " +
297 (this.param != null? IPCUtil.getRequestShortTextFormat(this.param): "") +
298 " connection: " + connection.toString();
299 }
300
301 protected synchronized void setSaslTokenResponse(ByteBuffer response) {
302 this.response = response;
303 }
304
305 protected synchronized void setResponse(Object m, final CellScanner cells,
306 Throwable t, String errorMsg) {
307 if (this.isError) return;
308 if (t != null) this.isError = true;
309 ByteBufferOutputStream bbos = null;
310 try {
311 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
312
313 Message result = (Message)m;
314
315 headerBuilder.setCallId(this.id);
316 if (t != null) {
317 ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
318 exceptionBuilder.setExceptionClassName(t.getClass().getName());
319 exceptionBuilder.setStackTrace(errorMsg);
320 exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
321 if (t instanceof RegionMovedException) {
322
323
324
325 RegionMovedException rme = (RegionMovedException)t;
326 exceptionBuilder.setHostname(rme.getHostname());
327 exceptionBuilder.setPort(rme.getPort());
328 }
329
330 headerBuilder.setException(exceptionBuilder.build());
331 }
332 ByteBuffer cellBlock =
333 ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells);
334 if (cellBlock != null) {
335 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
336
337 cellBlockBuilder.setLength(cellBlock.limit());
338 headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
339 }
340 Message header = headerBuilder.build();
341 bbos = IPCUtil.write(header, result, cellBlock);
342 if (connection.useWrap) {
343 wrapWithSasl(bbos);
344 }
345 } catch (IOException e) {
346 LOG.warn("Exception while creating response " + e);
347 }
348 ByteBuffer bb = null;
349 if (bbos != null) {
350
351 bb = bbos.getByteBuffer();
352 bb.position(0);
353 }
354 this.response = bb;
355 }
356
357 private void wrapWithSasl(ByteBufferOutputStream response)
358 throws IOException {
359 if (connection.useSasl) {
360
361 ByteBuffer buf = response.getByteBuffer();
362 byte[] token;
363
364
365 synchronized (connection.saslServer) {
366 token = connection.saslServer.wrap(buf.array(),
367 buf.arrayOffset(), buf.remaining());
368 }
369 if (LOG.isDebugEnabled())
370 LOG.debug("Adding saslServer wrapped token of size " + token.length
371 + " as call response.");
372 buf.clear();
373 DataOutputStream saslOut = new DataOutputStream(response);
374 saslOut.writeInt(token.length);
375 saslOut.write(token, 0, token.length);
376 }
377 }
378
379 @Override
380 public synchronized void endDelay(Object result) throws IOException {
381 assert this.delayResponse;
382 assert this.delayReturnValue || result == null;
383 this.delayResponse = false;
384 delayedCalls.decrementAndGet();
385 if (this.delayReturnValue) {
386 this.setResponse(result, null, null, null);
387 }
388 this.responder.doRespond(this);
389 }
390
391 @Override
392 public synchronized void endDelay() throws IOException {
393 this.endDelay(null);
394 }
395
396 @Override
397 public synchronized void startDelay(boolean delayReturnValue) {
398 assert !this.delayResponse;
399 this.delayResponse = true;
400 this.delayReturnValue = delayReturnValue;
401 int numDelayed = delayedCalls.incrementAndGet();
402 if (numDelayed > warnDelayedCalls) {
403 LOG.warn("Too many delayed calls: limit " + warnDelayedCalls + " current " + numDelayed);
404 }
405 }
406
407 @Override
408 public synchronized void endDelayThrowing(Throwable t) throws IOException {
409 this.setResponse(null, null, t, StringUtils.stringifyException(t));
410 this.delayResponse = false;
411 this.sendResponseIfReady();
412 }
413
414 @Override
415 public synchronized boolean isDelayed() {
416 return this.delayResponse;
417 }
418
419 @Override
420 public synchronized boolean isReturnValueDelayed() {
421 return this.delayReturnValue;
422 }
423
424 @Override
425 public void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException {
426 if (!connection.channel.isOpen()) {
427 long afterTime = System.currentTimeMillis() - timestamp;
428 throw new CallerDisconnectedException(
429 "Aborting call " + this + " after " + afterTime + " ms, since " +
430 "caller disconnected");
431 }
432 }
433
434 public long getSize() {
435 return this.size;
436 }
437
438
439
440
441
442
443 public synchronized void sendResponseIfReady() throws IOException {
444 if (!this.delayResponse) {
445 this.responder.doRespond(this);
446 }
447 }
448 }
449
450
451 private class Listener extends Thread {
452
453 private ServerSocketChannel acceptChannel = null;
454 private Selector selector = null;
455 private Reader[] readers = null;
456 private int currentReader = 0;
457 private Random rand = new Random();
458 private long lastCleanupRunTime = 0;
459
460 private long cleanupInterval = 10000;
461
462 private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
463
464 private ExecutorService readPool;
465
466 public Listener(final String name) throws IOException {
467 super(name);
468
469 acceptChannel = ServerSocketChannel.open();
470 acceptChannel.configureBlocking(false);
471
472
473 bind(acceptChannel.socket(), isa, backlogLength);
474 port = acceptChannel.socket().getLocalPort();
475
476 selector= Selector.open();
477
478 readers = new Reader[readThreads];
479 readPool = Executors.newFixedThreadPool(readThreads,
480 new ThreadFactoryBuilder().setNameFormat(
481 "IPC Reader %d on port " + port).setDaemon(true).build());
482 for (int i = 0; i < readThreads; ++i) {
483 Reader reader = new Reader();
484 readers[i] = reader;
485 readPool.execute(reader);
486 }
487 LOG.info(getName() + ": started " + readThreads + " reader(s).");
488
489
490 acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
491 this.setName("IPC Server listener on " + port);
492 this.setDaemon(true);
493 }
494
495
496 private class Reader implements Runnable {
497 private volatile boolean adding = false;
498 private final Selector readSelector;
499
500 Reader() throws IOException {
501 this.readSelector = Selector.open();
502 }
503 public void run() {
504 try {
505 doRunLoop();
506 } finally {
507 try {
508 readSelector.close();
509 } catch (IOException ioe) {
510 LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
511 }
512 }
513 }
514
515 private synchronized void doRunLoop() {
516 while (running) {
517 SelectionKey key = null;
518 try {
519 readSelector.select();
520 while (adding) {
521 this.wait(1000);
522 }
523
524 Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
525 while (iter.hasNext()) {
526 key = iter.next();
527 iter.remove();
528 if (key.isValid()) {
529 if (key.isReadable()) {
530 doRead(key);
531 }
532 }
533 key = null;
534 }
535 } catch (InterruptedException e) {
536 if (running) {
537 LOG.info(getName() + ": unexpectedly interrupted: " +
538 StringUtils.stringifyException(e));
539 }
540 } catch (IOException ex) {
541 LOG.error(getName() + ": error in Reader", ex);
542 }
543 }
544 }
545
546
547
548
549
550
551
552
553 public void startAdd() {
554 adding = true;
555 readSelector.wakeup();
556 }
557
558 public synchronized SelectionKey registerChannel(SocketChannel channel)
559 throws IOException {
560 return channel.register(readSelector, SelectionKey.OP_READ);
561 }
562
563 public synchronized void finishAdd() {
564 adding = false;
565 this.notify();
566 }
567 }
568
569
570
571
572
573
574
575
576 private void cleanupConnections(boolean force) {
577 if (force || numConnections > thresholdIdleConnections) {
578 long currentTime = System.currentTimeMillis();
579 if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
580 return;
581 }
582 int start = 0;
583 int end = numConnections - 1;
584 if (!force) {
585 start = rand.nextInt() % numConnections;
586 end = rand.nextInt() % numConnections;
587 int temp;
588 if (end < start) {
589 temp = start;
590 start = end;
591 end = temp;
592 }
593 }
594 int i = start;
595 int numNuked = 0;
596 while (i <= end) {
597 Connection c;
598 synchronized (connectionList) {
599 try {
600 c = connectionList.get(i);
601 } catch (Exception e) {return;}
602 }
603 if (c.timedOut(currentTime)) {
604 if (LOG.isDebugEnabled())
605 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
606 closeConnection(c);
607 numNuked++;
608 end--;
609
610 c = null;
611 if (!force && numNuked == maxConnectionsToNuke) break;
612 }
613 else i++;
614 }
615 lastCleanupRunTime = System.currentTimeMillis();
616 }
617 }
618
619 @Override
620 public void run() {
621 LOG.info(getName() + ": starting");
622 SERVER.set(RpcServer.this);
623
624 while (running) {
625 SelectionKey key = null;
626 try {
627 selector.select();
628 Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
629 while (iter.hasNext()) {
630 key = iter.next();
631 iter.remove();
632 try {
633 if (key.isValid()) {
634 if (key.isAcceptable())
635 doAccept(key);
636 }
637 } catch (IOException ignored) {
638 }
639 key = null;
640 }
641 } catch (OutOfMemoryError e) {
642 if (errorHandler != null) {
643 if (errorHandler.checkOOME(e)) {
644 LOG.info(getName() + ": exiting on OutOfMemoryError");
645 closeCurrentConnection(key, e);
646 cleanupConnections(true);
647 return;
648 }
649 } else {
650
651
652
653 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
654 closeCurrentConnection(key, e);
655 cleanupConnections(true);
656 try { Thread.sleep(60000); } catch (Exception ignored) {}
657 }
658 } catch (Exception e) {
659 closeCurrentConnection(key, e);
660 }
661 cleanupConnections(false);
662 }
663 LOG.info(getName() + ": stopping");
664
665 synchronized (this) {
666 try {
667 acceptChannel.close();
668 selector.close();
669 } catch (IOException ignored) { }
670
671 selector= null;
672 acceptChannel= null;
673
674
675 while (!connectionList.isEmpty()) {
676 closeConnection(connectionList.remove(0));
677 }
678 }
679 }
680
681 private void closeCurrentConnection(SelectionKey key, Throwable e) {
682 if (key != null) {
683 Connection c = (Connection)key.attachment();
684 if (c != null) {
685 if (LOG.isDebugEnabled()) {
686 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
687 (e != null ? " on error " + e.getMessage() : ""));
688 }
689 closeConnection(c);
690 key.attach(null);
691 }
692 }
693 }
694
695 InetSocketAddress getAddress() {
696 return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
697 }
698
699 void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
700 Connection c;
701 ServerSocketChannel server = (ServerSocketChannel) key.channel();
702
703 SocketChannel channel;
704 while ((channel = server.accept()) != null) {
705 channel.configureBlocking(false);
706 channel.socket().setTcpNoDelay(tcpNoDelay);
707 channel.socket().setKeepAlive(tcpKeepAlive);
708
709 Reader reader = getReader();
710 try {
711 reader.startAdd();
712 SelectionKey readKey = reader.registerChannel(channel);
713 c = getConnection(channel, System.currentTimeMillis());
714 readKey.attach(c);
715 synchronized (connectionList) {
716 connectionList.add(numConnections, c);
717 numConnections++;
718 }
719 if (LOG.isDebugEnabled())
720 LOG.debug(getName() + ": connection from " + c.toString() +
721 "; # active connections: " + numConnections +
722 "; # queued calls: " + callQueue.size());
723 } finally {
724 reader.finishAdd();
725 }
726 }
727 }
728
729 void doRead(SelectionKey key) throws InterruptedException {
730 int count = 0;
731 Connection c = (Connection)key.attachment();
732 if (c == null) {
733 return;
734 }
735 c.setLastContact(System.currentTimeMillis());
736 try {
737 count = c.readAndProcess();
738 } catch (InterruptedException ieo) {
739 throw ieo;
740 } catch (Exception e) {
741 LOG.warn(getName() + ": count of bytes read: " + count, e);
742 count = -1;
743 }
744 if (count < 0) {
745 if (LOG.isDebugEnabled()) {
746 LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
747 " because read count=" + count +
748 ". Number of active connections: " + numConnections);
749 }
750 closeConnection(c);
751
752 } else {
753 c.setLastContact(System.currentTimeMillis());
754 }
755 }
756
757 synchronized void doStop() {
758 if (selector != null) {
759 selector.wakeup();
760 Thread.yield();
761 }
762 if (acceptChannel != null) {
763 try {
764 acceptChannel.socket().close();
765 } catch (IOException e) {
766 LOG.info(getName() + ": exception in closing listener socket. " + e);
767 }
768 }
769 readPool.shutdownNow();
770 }
771
772
773
774 Reader getReader() {
775 currentReader = (currentReader + 1) % readers.length;
776 return readers[currentReader];
777 }
778 }
779
780
781 protected class Responder extends Thread {
782 private final Selector writeSelector;
783 private int pending;
784
785 Responder() throws IOException {
786 this.setName("IPC Server Responder");
787 this.setDaemon(true);
788 writeSelector = Selector.open();
789 pending = 0;
790 }
791
792 @Override
793 public void run() {
794 LOG.info(getName() + ": starting");
795 SERVER.set(RpcServer.this);
796 try {
797 doRunLoop();
798 } finally {
799 LOG.info(getName() + ": stopping");
800 try {
801 writeSelector.close();
802 } catch (IOException ioe) {
803 LOG.error(getName() + ": couldn't close write selector", ioe);
804 }
805 }
806 }
807
808 private void doRunLoop() {
809 long lastPurgeTime = 0;
810
811 while (running) {
812 try {
813 waitPending();
814 writeSelector.select(purgeTimeout);
815 Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
816 while (iter.hasNext()) {
817 SelectionKey key = iter.next();
818 iter.remove();
819 try {
820 if (key.isValid() && key.isWritable()) {
821 doAsyncWrite(key);
822 }
823 } catch (IOException e) {
824 LOG.info(getName() + ": asyncWrite", e);
825 }
826 }
827 long now = System.currentTimeMillis();
828 if (now < lastPurgeTime + purgeTimeout) {
829 continue;
830 }
831 lastPurgeTime = now;
832
833
834
835
836 if (LOG.isDebugEnabled()) LOG.debug(getName() + ": checking for old call responses.");
837 ArrayList<Call> calls;
838
839
840 synchronized (writeSelector.keys()) {
841 calls = new ArrayList<Call>(writeSelector.keys().size());
842 iter = writeSelector.keys().iterator();
843 while (iter.hasNext()) {
844 SelectionKey key = iter.next();
845 Call call = (Call)key.attachment();
846 if (call != null && key.channel() == call.connection.channel) {
847 calls.add(call);
848 }
849 }
850 }
851
852 for(Call call : calls) {
853 try {
854 doPurge(call, now);
855 } catch (IOException e) {
856 LOG.warn(getName() + ": error in purging old calls " + e);
857 }
858 }
859 } catch (OutOfMemoryError e) {
860 if (errorHandler != null) {
861 if (errorHandler.checkOOME(e)) {
862 LOG.info(getName() + ": exiting on OutOfMemoryError");
863 return;
864 }
865 } else {
866
867
868
869
870
871 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
872 try { Thread.sleep(60000); } catch (Exception ignored) {}
873 }
874 } catch (Exception e) {
875 LOG.warn(getName() + ": exception in Responder " +
876 StringUtils.stringifyException(e));
877 }
878 }
879 LOG.info(getName() + ": stopped");
880 }
881
882 private void doAsyncWrite(SelectionKey key) throws IOException {
883 Call call = (Call)key.attachment();
884 if (call == null) {
885 return;
886 }
887 if (key.channel() != call.connection.channel) {
888 throw new IOException("doAsyncWrite: bad channel");
889 }
890
891 synchronized(call.connection.responseQueue) {
892 if (processResponse(call.connection.responseQueue, false)) {
893 try {
894 key.interestOps(0);
895 } catch (CancelledKeyException e) {
896
897
898
899
900
901 LOG.warn("Exception while changing ops : " + e);
902 }
903 }
904 }
905 }
906
907
908
909
910
911 private void doPurge(Call call, long now) throws IOException {
912 synchronized (call.connection.responseQueue) {
913 Iterator<Call> iter = call.connection.responseQueue.listIterator(0);
914 while (iter.hasNext()) {
915 Call nextCall = iter.next();
916 if (now > nextCall.timestamp + purgeTimeout) {
917 closeConnection(nextCall.connection);
918 break;
919 }
920 }
921 }
922 }
923
924
925
926
927 private boolean processResponse(final LinkedList<Call> responseQueue, boolean inHandler)
928 throws IOException {
929 boolean error = true;
930 boolean done = false;
931 int numElements;
932 Call call = null;
933 try {
934
935 synchronized (responseQueue) {
936
937
938
939 numElements = responseQueue.size();
940 if (numElements == 0) {
941 error = false;
942 return true;
943 }
944
945
946
947 call = responseQueue.removeFirst();
948 SocketChannel channel = call.connection.channel;
949
950
951
952 int numBytes = channelWrite(channel, call.response);
953 if (numBytes < 0) {
954 return true;
955 }
956 if (!call.response.hasRemaining()) {
957 call.connection.decRpcCount();
958
959 if (numElements == 1) {
960 done = true;
961 } else {
962 done = false;
963 }
964 if (LOG.isDebugEnabled()) {
965 LOG.debug(getName() + ": callId: " + call.id + " wrote " + numBytes + " bytes.");
966 }
967 } else {
968
969
970
971
972 call.connection.responseQueue.addFirst(call);
973
974 if (inHandler) {
975
976 call.timestamp = System.currentTimeMillis();
977 if (enqueueInSelector(call))
978 done = true;
979 }
980 if (LOG.isDebugEnabled()) {
981 LOG.debug(getName() + call.toString() + " partially sent, wrote " +
982 numBytes + " bytes.");
983 }
984 }
985 error = false;
986 }
987 } finally {
988 if (error && call != null) {
989 LOG.warn(getName() + call.toString() + ": output error");
990 done = true;
991 closeConnection(call.connection);
992 }
993 }
994 return done;
995 }
996
997
998
999
1000 private boolean enqueueInSelector(Call call) throws IOException {
1001 boolean done = false;
1002 incPending();
1003 try {
1004
1005
1006 SocketChannel channel = call.connection.channel;
1007 writeSelector.wakeup();
1008 channel.register(writeSelector, SelectionKey.OP_WRITE, call);
1009 } catch (ClosedChannelException e) {
1010
1011 done = true;
1012 } finally {
1013 decPending();
1014 }
1015 return done;
1016 }
1017
1018
1019
1020
1021 void doRespond(Call call) throws IOException {
1022
1023 call.timestamp = System.currentTimeMillis();
1024
1025 boolean doRegister = false;
1026 synchronized (call.connection.responseQueue) {
1027 call.connection.responseQueue.addLast(call);
1028 if (call.connection.responseQueue.size() == 1) {
1029 doRegister = !processResponse(call.connection.responseQueue, false);
1030 }
1031 }
1032 if (doRegister) {
1033 enqueueInSelector(call);
1034 }
1035 }
1036
1037 private synchronized void incPending() {
1038 pending++;
1039 }
1040
1041 private synchronized void decPending() {
1042 pending--;
1043 notify();
1044 }
1045
1046 private synchronized void waitPending() throws InterruptedException {
1047 while (pending > 0) {
1048 wait();
1049 }
1050 }
1051 }
1052
1053 @SuppressWarnings("serial")
1054 public static class CallQueueTooBigException extends IOException {
1055 CallQueueTooBigException() {
1056 super();
1057 }
1058 }
1059
1060 private Function<Pair<RequestHeader, Message>, Integer> qosFunction = null;
1061
1062
1063
1064
1065
1066
1067
1068 @Override
1069 public void setQosFunction(Function<Pair<RequestHeader, Message>, Integer> newFunc) {
1070 qosFunction = newFunc;
1071 }
1072
1073 protected int getQosLevel(Pair<RequestHeader, Message> headerAndParam) {
1074 if (qosFunction == null) return 0;
1075 Integer res = qosFunction.apply(headerAndParam);
1076 return res == null? 0: res;
1077 }
1078
1079
1080 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1081 value="VO_VOLATILE_INCREMENT",
1082 justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1083 public class Connection {
1084
1085 private boolean connectionPreambleRead = false;
1086
1087 private boolean connectionHeaderRead = false;
1088 protected SocketChannel channel;
1089 private ByteBuffer data;
1090 private ByteBuffer dataLengthBuffer;
1091 protected final LinkedList<Call> responseQueue;
1092 private volatile int rpcCount = 0;
1093 private long lastContact;
1094 private InetAddress addr;
1095 protected Socket socket;
1096
1097
1098 protected String hostAddress;
1099 protected int remotePort;
1100 ConnectionHeader connectionHeader;
1101
1102
1103
1104 private Codec codec;
1105
1106
1107
1108 private CompressionCodec compressionCodec;
1109 BlockingService service;
1110 protected UserGroupInformation user = null;
1111 private AuthMethod authMethod;
1112 private boolean saslContextEstablished;
1113 private boolean skipInitialSaslHandshake;
1114 private ByteBuffer unwrappedData;
1115
1116 private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1117 boolean useSasl;
1118 SaslServer saslServer;
1119 private boolean useWrap = false;
1120
1121 private static final int AUTHROIZATION_FAILED_CALLID = -1;
1122 private final Call authFailedCall =
1123 new Call(AUTHROIZATION_FAILED_CALLID, this.service, null, null, null, this, null, 0, null);
1124 private ByteArrayOutputStream authFailedResponse =
1125 new ByteArrayOutputStream();
1126
1127 private static final int SASL_CALLID = -33;
1128 private final Call saslCall =
1129 new Call(SASL_CALLID, this.service, null, null, null, this, null, 0, null);
1130
1131 public UserGroupInformation attemptingUser = null;
1132
1133 public Connection(SocketChannel channel, long lastContact) {
1134 this.channel = channel;
1135 this.lastContact = lastContact;
1136 this.data = null;
1137 this.dataLengthBuffer = ByteBuffer.allocate(4);
1138 this.socket = channel.socket();
1139 this.addr = socket.getInetAddress();
1140 if (addr == null) {
1141 this.hostAddress = "*Unknown*";
1142 } else {
1143 this.hostAddress = addr.getHostAddress();
1144 }
1145 this.remotePort = socket.getPort();
1146 this.responseQueue = new LinkedList<Call>();
1147 if (socketSendBufferSize != 0) {
1148 try {
1149 socket.setSendBufferSize(socketSendBufferSize);
1150 } catch (IOException e) {
1151 LOG.warn("Connection: unable to set socket send buffer size to " +
1152 socketSendBufferSize);
1153 }
1154 }
1155 }
1156
1157 @Override
1158 public String toString() {
1159 return getHostAddress() + ":" + remotePort;
1160 }
1161
1162 public String getHostAddress() {
1163 return hostAddress;
1164 }
1165
1166 public InetAddress getHostInetAddress() {
1167 return addr;
1168 }
1169
1170 public int getRemotePort() {
1171 return remotePort;
1172 }
1173
1174 public void setLastContact(long lastContact) {
1175 this.lastContact = lastContact;
1176 }
1177
1178 public long getLastContact() {
1179 return lastContact;
1180 }
1181
1182
1183 private boolean isIdle() {
1184 return rpcCount == 0;
1185 }
1186
1187
1188 protected void decRpcCount() {
1189 rpcCount--;
1190 }
1191
1192
1193 protected void incRpcCount() {
1194 rpcCount++;
1195 }
1196
1197 protected boolean timedOut(long currentTime) {
1198 return isIdle() && currentTime - lastContact > maxIdleTime;
1199 }
1200
1201 private UserGroupInformation getAuthorizedUgi(String authorizedId)
1202 throws IOException {
1203 if (authMethod == AuthMethod.DIGEST) {
1204 TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1205 secretManager);
1206 UserGroupInformation ugi = tokenId.getUser();
1207 if (ugi == null) {
1208 throw new AccessControlException(
1209 "Can't retrieve username from tokenIdentifier.");
1210 }
1211 ugi.addTokenIdentifier(tokenId);
1212 return ugi;
1213 } else {
1214 return UserGroupInformation.createRemoteUser(authorizedId);
1215 }
1216 }
1217
1218 private void saslReadAndProcess(byte[] saslToken) throws IOException,
1219 InterruptedException {
1220 if (saslContextEstablished) {
1221 if (LOG.isDebugEnabled())
1222 LOG.debug("Have read input token of size " + saslToken.length
1223 + " for processing by saslServer.unwrap()");
1224
1225 if (!useWrap) {
1226 processOneRpc(saslToken);
1227 } else {
1228 byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
1229 processUnwrappedData(plaintextData);
1230 }
1231 } else {
1232 byte[] replyToken = null;
1233 try {
1234 if (saslServer == null) {
1235 switch (authMethod) {
1236 case DIGEST:
1237 if (secretManager == null) {
1238 throw new AccessControlException(
1239 "Server is not configured to do DIGEST authentication.");
1240 }
1241 saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
1242 .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
1243 SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
1244 secretManager, this));
1245 break;
1246 default:
1247 UserGroupInformation current = UserGroupInformation
1248 .getCurrentUser();
1249 String fullName = current.getUserName();
1250 if (LOG.isDebugEnabled()) {
1251 LOG.debug("Kerberos principal name is " + fullName);
1252 }
1253 final String names[] = SaslUtil.splitKerberosName(fullName);
1254 if (names.length != 3) {
1255 throw new AccessControlException(
1256 "Kerberos principal name does NOT have the expected "
1257 + "hostname part: " + fullName);
1258 }
1259 current.doAs(new PrivilegedExceptionAction<Object>() {
1260 @Override
1261 public Object run() throws SaslException {
1262 saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
1263 .getMechanismName(), names[0], names[1],
1264 SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
1265 return null;
1266 }
1267 });
1268 }
1269 if (saslServer == null)
1270 throw new AccessControlException(
1271 "Unable to find SASL server implementation for "
1272 + authMethod.getMechanismName());
1273 if (LOG.isDebugEnabled()) {
1274 LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
1275 }
1276 }
1277 if (LOG.isDebugEnabled()) {
1278 LOG.debug("Have read input token of size " + saslToken.length
1279 + " for processing by saslServer.evaluateResponse()");
1280 }
1281 replyToken = saslServer.evaluateResponse(saslToken);
1282 } catch (IOException e) {
1283 IOException sendToClient = e;
1284 Throwable cause = e;
1285 while (cause != null) {
1286 if (cause instanceof InvalidToken) {
1287 sendToClient = (InvalidToken) cause;
1288 break;
1289 }
1290 cause = cause.getCause();
1291 }
1292 doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
1293 sendToClient.getLocalizedMessage());
1294 metrics.authenticationFailure();
1295 String clientIP = this.toString();
1296
1297 AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
1298 throw e;
1299 }
1300 if (replyToken != null) {
1301 if (LOG.isDebugEnabled()) {
1302 LOG.debug("Will send token of size " + replyToken.length
1303 + " from saslServer.");
1304 }
1305 doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
1306 null);
1307 }
1308 if (saslServer.isComplete()) {
1309 String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
1310 useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
1311 user = getAuthorizedUgi(saslServer.getAuthorizationID());
1312 if (LOG.isDebugEnabled()) {
1313 LOG.debug("SASL server context established. Authenticated client: "
1314 + user + ". Negotiated QoP is "
1315 + saslServer.getNegotiatedProperty(Sasl.QOP));
1316 }
1317 metrics.authenticationSuccess();
1318 AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
1319 saslContextEstablished = true;
1320 }
1321 }
1322 }
1323
1324
1325
1326 private void doRawSaslReply(SaslStatus status, Writable rv,
1327 String errorClass, String error) throws IOException {
1328
1329
1330 ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
1331 DataOutputStream out = new DataOutputStream(saslResponse);
1332 out.writeInt(status.state);
1333 if (status == SaslStatus.SUCCESS) {
1334 rv.write(out);
1335 } else {
1336 WritableUtils.writeString(out, errorClass);
1337 WritableUtils.writeString(out, error);
1338 }
1339 saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1340 saslCall.responder = responder;
1341 saslCall.sendResponseIfReady();
1342 }
1343
1344 private void disposeSasl() {
1345 if (saslServer != null) {
1346 try {
1347 saslServer.dispose();
1348 saslServer = null;
1349 } catch (SaslException ignored) {
1350 }
1351 }
1352 }
1353
1354
1355
1356
1357
1358
1359
1360
1361 public int readAndProcess() throws IOException, InterruptedException {
1362 while (true) {
1363
1364
1365
1366
1367 int count;
1368 if (this.dataLengthBuffer.remaining() > 0) {
1369 count = channelRead(channel, this.dataLengthBuffer);
1370 if (count < 0 || this.dataLengthBuffer.remaining() > 0) {
1371 return count;
1372 }
1373 }
1374
1375 if (!connectionPreambleRead) {
1376
1377 this.dataLengthBuffer.flip();
1378 if (!HConstants.RPC_HEADER.equals(dataLengthBuffer)) {
1379 return doBadPreambleHandling("Expected HEADER=" +
1380 Bytes.toStringBinary(HConstants.RPC_HEADER.array()) +
1381 " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
1382 " from " + toString());
1383 }
1384
1385 ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
1386 count = channelRead(channel, versionAndAuthBytes);
1387 if (count < 0 || versionAndAuthBytes.remaining() > 0) {
1388 return count;
1389 }
1390 int version = versionAndAuthBytes.get(0);
1391 byte authbyte = versionAndAuthBytes.get(1);
1392 this.authMethod = AuthMethod.valueOf(authbyte);
1393 if (version != CURRENT_VERSION) {
1394 String msg = getFatalConnectionString(version, authbyte);
1395 return doBadPreambleHandling(msg, new WrongVersionException(msg));
1396 }
1397 if (authMethod == null) {
1398 String msg = getFatalConnectionString(version, authbyte);
1399 return doBadPreambleHandling(msg, new BadAuthException(msg));
1400 }
1401 if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
1402 AccessControlException ae = new AccessControlException("Authentication is required");
1403 setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1404 responder.doRespond(authFailedCall);
1405 throw ae;
1406 }
1407 if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
1408 doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
1409 SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
1410 authMethod = AuthMethod.SIMPLE;
1411
1412
1413
1414 skipInitialSaslHandshake = true;
1415 }
1416 if (authMethod != AuthMethod.SIMPLE) {
1417 useSasl = true;
1418 }
1419 connectionPreambleRead = true;
1420
1421 dataLengthBuffer.clear();
1422 continue;
1423 }
1424
1425
1426 if (data == null) {
1427 dataLengthBuffer.flip();
1428 int dataLength = dataLengthBuffer.getInt();
1429 if (dataLength == RpcClient.PING_CALL_ID) {
1430 if (!useWrap) {
1431 dataLengthBuffer.clear();
1432 return 0;
1433 }
1434 }
1435 if (dataLength < 0) {
1436 throw new IllegalArgumentException("Unexpected data length "
1437 + dataLength + "!! from " + getHostAddress());
1438 }
1439 data = ByteBuffer.allocate(dataLength);
1440 incRpcCount();
1441 }
1442 count = channelRead(channel, data);
1443 if (data.remaining() == 0) {
1444 dataLengthBuffer.clear();
1445 data.flip();
1446 if (skipInitialSaslHandshake) {
1447 data = null;
1448 skipInitialSaslHandshake = false;
1449 continue;
1450 }
1451 boolean headerRead = connectionHeaderRead;
1452 if (useSasl) {
1453 saslReadAndProcess(data.array());
1454 } else {
1455 processOneRpc(data.array());
1456 }
1457 this.data = null;
1458 if (!headerRead) {
1459 continue;
1460 }
1461 } else {
1462
1463 if (LOG.isTraceEnabled()) LOG.trace("Continue to read rest of data " + data.remaining());
1464 continue;
1465 }
1466 return count;
1467 }
1468 }
1469
1470 private String getFatalConnectionString(final int version, final byte authByte) {
1471 return "serverVersion=" + CURRENT_VERSION +
1472 ", clientVersion=" + version + ", authMethod=" + authByte +
1473 ", authSupported=" + (authMethod != null) + " from " + toString();
1474 }
1475
1476 private int doBadPreambleHandling(final String msg) throws IOException {
1477 return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1478 }
1479
1480 private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1481 LOG.warn(msg);
1482 Call fakeCall = new Call(-1, null, null, null, null, this, responder, -1, null);
1483 setupResponse(null, fakeCall, e, msg);
1484 responder.doRespond(fakeCall);
1485
1486 return -1;
1487 }
1488
1489
1490 private void processConnectionHeader(byte[] buf) throws IOException {
1491 this.connectionHeader = ConnectionHeader.parseFrom(buf);
1492 String serviceName = connectionHeader.getServiceName();
1493 if (serviceName == null) throw new EmptyServiceNameException();
1494 this.service = getService(services, serviceName);
1495 if (this.service == null) throw new UnknownServiceException(serviceName);
1496 setupCellBlockCodecs(this.connectionHeader);
1497 UserGroupInformation protocolUser = createUser(connectionHeader);
1498 if (!useSasl) {
1499 user = protocolUser;
1500 if (user != null) {
1501 user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1502 }
1503 } else {
1504
1505 user.setAuthenticationMethod(authMethod.authenticationMethod);
1506
1507
1508
1509 if ((protocolUser != null)
1510 && (!protocolUser.getUserName().equals(user.getUserName()))) {
1511 if (authMethod == AuthMethod.DIGEST) {
1512
1513 throw new AccessControlException("Authenticated user (" + user
1514 + ") doesn't match what the client claims to be ("
1515 + protocolUser + ")");
1516 } else {
1517
1518
1519
1520 UserGroupInformation realUser = user;
1521 user = UserGroupInformation.createProxyUser(protocolUser
1522 .getUserName(), realUser);
1523
1524 user.setAuthenticationMethod(AuthenticationMethod.PROXY);
1525 }
1526 }
1527 }
1528 }
1529
1530
1531
1532
1533
1534
1535 private void setupCellBlockCodecs(final ConnectionHeader header)
1536 throws FatalConnectionException {
1537
1538 if (!header.hasCellBlockCodecClass()) throw new FatalConnectionException("No codec");
1539 String className = header.getCellBlockCodecClass();
1540 try {
1541 this.codec = (Codec)Class.forName(className).newInstance();
1542 } catch (Exception e) {
1543 throw new UnsupportedCellCodecException(className, e);
1544 }
1545 if (!header.hasCellBlockCompressorClass()) return;
1546 className = header.getCellBlockCompressorClass();
1547 try {
1548 this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1549 } catch (Exception e) {
1550 throw new UnsupportedCompressionCodecException(className, e);
1551 }
1552 }
1553
1554 private void processUnwrappedData(byte[] inBuf) throws IOException,
1555 InterruptedException {
1556 ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1557
1558 while (true) {
1559 int count = -1;
1560 if (unwrappedDataLengthBuffer.remaining() > 0) {
1561 count = channelRead(ch, unwrappedDataLengthBuffer);
1562 if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1563 return;
1564 }
1565
1566 if (unwrappedData == null) {
1567 unwrappedDataLengthBuffer.flip();
1568 int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1569
1570 if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1571 if (LOG.isDebugEnabled())
1572 LOG.debug("Received ping message");
1573 unwrappedDataLengthBuffer.clear();
1574 continue;
1575 }
1576 unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1577 }
1578
1579 count = channelRead(ch, unwrappedData);
1580 if (count <= 0 || unwrappedData.remaining() > 0)
1581 return;
1582
1583 if (unwrappedData.remaining() == 0) {
1584 unwrappedDataLengthBuffer.clear();
1585 unwrappedData.flip();
1586 processOneRpc(unwrappedData.array());
1587 unwrappedData = null;
1588 }
1589 }
1590 }
1591
1592 private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1593 if (connectionHeaderRead) {
1594 processRequest(buf);
1595 } else {
1596 processConnectionHeader(buf);
1597 this.connectionHeaderRead = true;
1598 if (!authorizeConnection()) {
1599
1600
1601 throw new AccessControlException("Connection from " + this + " for service " +
1602 connectionHeader.getServiceName() + " is unauthorized for user: " + user);
1603 }
1604 }
1605 }
1606
1607
1608
1609
1610
1611
1612
1613 protected void processRequest(byte[] buf) throws IOException, InterruptedException {
1614 long totalRequestSize = buf.length;
1615 int offset = 0;
1616
1617
1618 CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
1619 int headerSize = cis.readRawVarint32();
1620 offset = cis.getTotalBytesRead();
1621 RequestHeader header = RequestHeader.newBuilder().mergeFrom(buf, offset, headerSize).build();
1622 offset += headerSize;
1623 int id = header.getCallId();
1624 if (LOG.isTraceEnabled()) {
1625 LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1626 " totalRequestSize: " + totalRequestSize + " bytes");
1627 }
1628
1629
1630 if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1631 final Call callTooBig =
1632 new Call(id, this.service, null, null, null, this, responder, totalRequestSize, null);
1633 ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1634 setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
1635 "Call queue is full, is ipc.server.max.callqueue.size too small?");
1636 responder.doRespond(callTooBig);
1637 return;
1638 }
1639 MethodDescriptor md = null;
1640 Message param = null;
1641 CellScanner cellScanner = null;
1642 try {
1643 if (header.hasRequestParam() && header.getRequestParam()) {
1644 md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1645 Builder builder = this.service.getRequestPrototype(md).newBuilderForType();
1646
1647 cis = CodedInputStream.newInstance(buf, offset, buf.length);
1648 int paramSize = cis.readRawVarint32();
1649 offset += cis.getTotalBytesRead();
1650 if (builder != null) {
1651 param = builder.mergeFrom(buf, offset, paramSize).build();
1652 }
1653 offset += paramSize;
1654 }
1655 if (header.hasCellBlockMeta()) {
1656 cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
1657 buf, offset, buf.length);
1658 }
1659 } catch (Throwable t) {
1660 String msg = "Unable to read call parameter from client " + getHostAddress();
1661 LOG.warn(msg, t);
1662 final Call readParamsFailedCall =
1663 new Call(id, this.service, null, null, null, this, responder, totalRequestSize, null);
1664 ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1665 setupResponse(responseBuffer, readParamsFailedCall, t,
1666 msg + "; " + t.getMessage());
1667 responder.doRespond(readParamsFailedCall);
1668 return;
1669 }
1670
1671 Call call = null;
1672 if (header.hasTraceInfo()) {
1673 call = new Call(id, this.service, md, param, cellScanner, this, responder, totalRequestSize,
1674 new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()));
1675 } else {
1676 call = new Call(id, this.service, md, param, cellScanner, this, responder,
1677 totalRequestSize, null);
1678 }
1679 callQueueSize.add(totalRequestSize);
1680 Pair<RequestHeader, Message> headerAndParam = new Pair<RequestHeader, Message>(header, param);
1681 if (priorityCallQueue != null && getQosLevel(headerAndParam) > highPriorityLevel) {
1682 priorityCallQueue.put(call);
1683 } else if (replicationQueue != null &&
1684 getQosLevel(headerAndParam) == HConstants.REPLICATION_QOS) {
1685 replicationQueue.put(call);
1686 } else {
1687 callQueue.put(call);
1688 }
1689 }
1690
1691 private boolean authorizeConnection() throws IOException {
1692 try {
1693
1694
1695
1696
1697 if (user != null && user.getRealUser() != null
1698 && (authMethod != AuthMethod.DIGEST)) {
1699 ProxyUsers.authorize(user, this.getHostAddress(), conf);
1700 }
1701 authorize(user, connectionHeader, getHostInetAddress());
1702 if (LOG.isDebugEnabled()) {
1703 LOG.debug("Authorized " + TextFormat.shortDebugString(connectionHeader));
1704 }
1705 metrics.authorizationSuccess();
1706 } catch (AuthorizationException ae) {
1707 LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1708 metrics.authorizationFailure();
1709 setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1710 responder.doRespond(authFailedCall);
1711 return false;
1712 }
1713 return true;
1714 }
1715
1716 protected synchronized void close() {
1717 disposeSasl();
1718 data = null;
1719 this.dataLengthBuffer = null;
1720 if (!channel.isOpen())
1721 return;
1722 try {socket.shutdownOutput();} catch(Exception ignored) {}
1723 if (channel.isOpen()) {
1724 try {channel.close();} catch(Exception ignored) {}
1725 }
1726 try {socket.close();} catch(Exception ignored) {}
1727 }
1728
1729 private UserGroupInformation createUser(ConnectionHeader head) {
1730 UserGroupInformation ugi = null;
1731
1732 if (!head.hasUserInfo()) {
1733 return null;
1734 }
1735 UserInformation userInfoProto = head.getUserInfo();
1736 String effectiveUser = null;
1737 if (userInfoProto.hasEffectiveUser()) {
1738 effectiveUser = userInfoProto.getEffectiveUser();
1739 }
1740 String realUser = null;
1741 if (userInfoProto.hasRealUser()) {
1742 realUser = userInfoProto.getRealUser();
1743 }
1744 if (effectiveUser != null) {
1745 if (realUser != null) {
1746 UserGroupInformation realUserUgi =
1747 UserGroupInformation.createRemoteUser(realUser);
1748 ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1749 } else {
1750 ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1751 }
1752 }
1753 return ugi;
1754 }
1755 }
1756
1757
1758 private class Handler extends Thread {
1759 private final BlockingQueue<Call> myCallQueue;
1760 private MonitoredRPCHandler status;
1761
1762 public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
1763 this.myCallQueue = cq;
1764 this.setDaemon(true);
1765
1766 String threadName = "IPC Server handler " + instanceNumber + " on " + port;
1767 if (cq == priorityCallQueue) {
1768
1769 threadName = "PRI " + threadName;
1770 } else if (cq == replicationQueue) {
1771 threadName = "REPL " + threadName;
1772 }
1773 this.setName(threadName);
1774 this.status = TaskMonitor.get().createRPCStatus(threadName);
1775 }
1776
1777 @Override
1778 public void run() {
1779 LOG.info(getName() + ": starting");
1780 status.setStatus("starting");
1781 SERVER.set(RpcServer.this);
1782 while (running) {
1783 try {
1784 status.pause("Waiting for a call");
1785 Call call = myCallQueue.take();
1786 status.setStatus("Setting up call");
1787 status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
1788 if (LOG.isDebugEnabled()) {
1789 UserGroupInformation remoteUser = call.connection.user;
1790 LOG.debug(call.toString() + " executing as " +
1791 ((remoteUser == null)? "NULL principal": remoteUser.getUserName()));
1792 }
1793 Throwable errorThrowable = null;
1794 String error = null;
1795 Pair<Message, CellScanner> resultPair = null;
1796 CurCall.set(call);
1797 Span currentRequestSpan = NullSpan.getInstance();
1798 try {
1799 if (!started) {
1800 throw new ServerNotRunningYetException("Server is not running yet");
1801 }
1802 if (call.tinfo != null) {
1803 currentRequestSpan = Trace.startSpan(
1804 "handling " + call.toString(), call.tinfo, Sampler.ALWAYS);
1805 }
1806 RequestContext.set(User.create(call.connection.user), getRemoteIp(),
1807 call.connection.service);
1808
1809
1810 resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp,
1811 status);
1812 } catch (Throwable e) {
1813 LOG.debug(getName() + ": " + call.toString(), e);
1814 errorThrowable = e;
1815 error = StringUtils.stringifyException(e);
1816 } finally {
1817 currentRequestSpan.stop();
1818
1819
1820 RequestContext.clear();
1821 }
1822 CurCall.set(null);
1823 callQueueSize.add(call.getSize() * -1);
1824
1825
1826 if (!call.isDelayed() || !call.isReturnValueDelayed()) {
1827 Message param = resultPair != null? resultPair.getFirst(): null;
1828 CellScanner cells = resultPair != null? resultPair.getSecond(): null;
1829 call.setResponse(param, cells, errorThrowable, error);
1830 }
1831 call.sendResponseIfReady();
1832 status.markComplete("Sent response");
1833 } catch (InterruptedException e) {
1834 if (running) {
1835 LOG.info(getName() + ": caught: " + StringUtils.stringifyException(e));
1836 }
1837 } catch (OutOfMemoryError e) {
1838 if (errorHandler != null) {
1839 if (errorHandler.checkOOME(e)) {
1840 LOG.info(getName() + ": exiting on OutOfMemoryError");
1841 return;
1842 }
1843 } else {
1844
1845 throw e;
1846 }
1847 } catch (ClosedChannelException cce) {
1848 LOG.warn(getName() + ": caught a ClosedChannelException, " +
1849 "this means that the server was processing a " +
1850 "request but the client went away. The error message was: " +
1851 cce.getMessage());
1852 } catch (Exception e) {
1853 LOG.warn(getName() + ": caught: " + StringUtils.stringifyException(e));
1854 }
1855 }
1856 LOG.info(getName() + ": exiting");
1857 }
1858 }
1859
1860
1861
1862
1863
1864
1865
1866 public static class BlockingServiceAndInterface {
1867 private final BlockingService service;
1868 private final Class<?> serviceInterface;
1869 public BlockingServiceAndInterface(final BlockingService service,
1870 final Class<?> serviceInterface) {
1871 this.service = service;
1872 this.serviceInterface = serviceInterface;
1873 }
1874 public Class<?> getServiceInterface() {
1875 return this.serviceInterface;
1876 }
1877 public BlockingService getBlockingService() {
1878 return this.service;
1879 }
1880 }
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890 public RpcServer(final BlockingService service, final InetSocketAddress isa,
1891 final Configuration conf)
1892 throws IOException {
1893 this(null, "generic", Lists.newArrayList(new BlockingServiceAndInterface(service, null)),
1894 isa, 3, 3, conf,
1895 HConstants.QOS_THRESHOLD);
1896 }
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911 public RpcServer(final Server serverInstance, final String name,
1912 final List<BlockingServiceAndInterface> services,
1913 final InetSocketAddress isa, int handlerCount, int priorityHandlerCount, Configuration conf,
1914 int highPriorityLevel)
1915 throws IOException {
1916 this.serverInstance = serverInstance;
1917 this.services = services;
1918 this.isa = isa;
1919 this.conf = conf;
1920 this.handlerCount = handlerCount;
1921 this.priorityHandlerCount = priorityHandlerCount;
1922 this.socketSendBufferSize = 0;
1923 this.maxQueueLength = this.conf.getInt("ipc.server.max.callqueue.length",
1924 handlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
1925 this.maxQueueSize =
1926 this.conf.getInt("ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
1927 this.readThreads = conf.getInt("ipc.server.read.threadpool.size", 10);
1928 this.callQueue = new LinkedBlockingQueue<Call>(maxQueueLength);
1929 if (priorityHandlerCount > 0) {
1930 this.priorityCallQueue = new LinkedBlockingQueue<Call>(maxQueueLength);
1931 } else {
1932 this.priorityCallQueue = null;
1933 }
1934 this.highPriorityLevel = highPriorityLevel;
1935 this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
1936 this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
1937 this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
1938 this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout",
1939 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
1940 this.numOfReplicationHandlers = conf.getInt("hbase.regionserver.replication.handler.count", 3);
1941 if (numOfReplicationHandlers > 0) {
1942 this.replicationQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
1943 }
1944
1945 this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
1946 this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
1947
1948
1949 listener = new Listener(name);
1950 this.port = listener.getAddress().getPort();
1951
1952 this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
1953 this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", true);
1954 this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
1955
1956 this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
1957 this.delayedCalls = new AtomicInteger(0);
1958 this.ipcUtil = new IPCUtil(conf);
1959
1960
1961
1962 responder = new Responder();
1963 this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
1964 this.isSecurityEnabled = User.isHBaseSecurityEnabled(this.conf);
1965 if (isSecurityEnabled) {
1966 HBaseSaslRpcServer.init(conf);
1967 }
1968 }
1969
1970
1971
1972
1973
1974 protected Connection getConnection(SocketChannel channel, long time) {
1975 return new Connection(channel, time);
1976 }
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987 private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
1988 throws IOException {
1989 if (response != null) response.reset();
1990 call.setResponse(null, null, t, error);
1991 }
1992
1993 protected void closeConnection(Connection connection) {
1994 synchronized (connectionList) {
1995 if (connectionList.remove(connection)) {
1996 numConnections--;
1997 }
1998 }
1999 connection.close();
2000 }
2001
2002 Configuration getConf() {
2003 return conf;
2004 }
2005
2006
2007
2008
2009 @Override
2010 public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2011
2012
2013 @Override
2014 public void start() {
2015 startThreads();
2016 openServer();
2017 }
2018
2019
2020
2021
2022 @Override
2023 public void openServer() {
2024 started = true;
2025 }
2026
2027
2028
2029
2030
2031 @Override
2032 public synchronized void startThreads() {
2033 AuthenticationTokenSecretManager mgr = createSecretManager();
2034 if (mgr != null) {
2035 setSecretManager(mgr);
2036 mgr.start();
2037 }
2038 this.authManager = new ServiceAuthorizationManager();
2039 HBasePolicyProvider.init(conf, authManager);
2040 responder.start();
2041 listener.start();
2042 handlers = startHandlers(callQueue, handlerCount);
2043 priorityHandlers = startHandlers(priorityCallQueue, priorityHandlerCount);
2044 replicationHandlers = startHandlers(replicationQueue, numOfReplicationHandlers);
2045 }
2046
2047 @Override
2048 public void refreshAuthManager(PolicyProvider pp) {
2049 this.authManager.refresh(this.conf, pp);
2050 }
2051
2052 private Handler[] startHandlers(BlockingQueue<Call> queue, int numOfHandlers) {
2053 if (numOfHandlers <= 0) {
2054 return null;
2055 }
2056 Handler[] handlers = new Handler[numOfHandlers];
2057 for (int i = 0; i < numOfHandlers; i++) {
2058 handlers[i] = new Handler(queue, i);
2059 handlers[i].start();
2060 }
2061 return handlers;
2062 }
2063
2064 private AuthenticationTokenSecretManager createSecretManager() {
2065 if (!isSecurityEnabled) return null;
2066 if (serverInstance == null) return null;
2067 if (!(serverInstance instanceof org.apache.hadoop.hbase.Server)) return null;
2068 org.apache.hadoop.hbase.Server server = (org.apache.hadoop.hbase.Server)serverInstance;
2069 Configuration conf = server.getConfiguration();
2070 long keyUpdateInterval =
2071 conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2072 long maxAge =
2073 conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2074 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2075 server.getServerName().toString(), keyUpdateInterval, maxAge);
2076 }
2077
2078 public SecretManager<? extends TokenIdentifier> getSecretManager() {
2079 return this.secretManager;
2080 }
2081
2082 @SuppressWarnings("unchecked")
2083 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2084 this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2085 }
2086
2087
2088
2089
2090
2091
2092 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2093 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2094 throws IOException {
2095 try {
2096 status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2097
2098 status.setRPCPacket(param);
2099 status.resume("Servicing call");
2100
2101 long startTime = System.currentTimeMillis();
2102 PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2103 Message result = service.callBlockingMethod(md, controller, param);
2104 int processingTime = (int) (System.currentTimeMillis() - startTime);
2105 int qTime = (int) (startTime - receiveTime);
2106 if (LOG.isTraceEnabled()) {
2107 LOG.trace(CurCall.get().toString() +
2108 ", response " + TextFormat.shortDebugString(result) +
2109 " queueTime: " + qTime +
2110 " processingTime: " + processingTime);
2111 }
2112 metrics.dequeuedCall(qTime);
2113 metrics.processedCall(processingTime);
2114 long responseSize = result.getSerializedSize();
2115
2116
2117 boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2118 boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2119 if (tooSlow || tooLarge) {
2120
2121
2122 StringBuilder buffer = new StringBuilder(256);
2123 buffer.append(md.getName());
2124 buffer.append("(");
2125 buffer.append(param.getClass().getName());
2126 buffer.append(")");
2127 logResponse(new Object[]{param},
2128 md.getName(), buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
2129 status.getClient(), startTime, processingTime, qTime,
2130 responseSize);
2131 }
2132 return new Pair<Message, CellScanner>(result,
2133 controller != null? controller.cellScanner(): null);
2134 } catch (Throwable e) {
2135
2136
2137
2138 if (e instanceof ServiceException) e = e.getCause();
2139 if (e instanceof IOException) throw (IOException)e;
2140 LOG.error("Unexpected throwable object ", e);
2141 throw new IOException(e.getMessage(), e);
2142 }
2143 }
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159 void logResponse(Object[] params, String methodName, String call, String tag,
2160 String clientAddress, long startTime, int processingTime, int qTime,
2161 long responseSize)
2162 throws IOException {
2163
2164 ObjectMapper mapper = new ObjectMapper();
2165
2166 Map<String, Object> responseInfo = new HashMap<String, Object>();
2167 responseInfo.put("starttimems", startTime);
2168 responseInfo.put("processingtimems", processingTime);
2169 responseInfo.put("queuetimems", qTime);
2170 responseInfo.put("responsesize", responseSize);
2171 responseInfo.put("client", clientAddress);
2172 responseInfo.put("class", serverInstance == null? "": serverInstance.getClass().getSimpleName());
2173 responseInfo.put("method", methodName);
2174 if (params.length == 2 && serverInstance instanceof HRegionServer &&
2175 params[0] instanceof byte[] &&
2176 params[1] instanceof Operation) {
2177
2178
2179 byte [] tableName =
2180 HRegionInfo.parseRegionName((byte[]) params[0])[0];
2181 responseInfo.put("table", Bytes.toStringBinary(tableName));
2182
2183 responseInfo.putAll(((Operation) params[1]).toMap());
2184
2185 LOG.warn("(operation" + tag + "): " +
2186 mapper.writeValueAsString(responseInfo));
2187 } else if (params.length == 1 && serverInstance instanceof HRegionServer &&
2188 params[0] instanceof Operation) {
2189
2190 responseInfo.putAll(((Operation) params[0]).toMap());
2191
2192 LOG.warn("(operation" + tag + "): " +
2193 mapper.writeValueAsString(responseInfo));
2194 } else {
2195
2196
2197 responseInfo.put("call", call);
2198 LOG.warn("(response" + tag + "): " + mapper.writeValueAsString(responseInfo));
2199 }
2200 }
2201
2202
2203 @Override
2204 public synchronized void stop() {
2205 LOG.info("Stopping server on " + port);
2206 running = false;
2207 stopHandlers(handlers);
2208 stopHandlers(priorityHandlers);
2209 stopHandlers(replicationHandlers);
2210 listener.interrupt();
2211 listener.doStop();
2212 responder.interrupt();
2213 notifyAll();
2214 }
2215
2216 private void stopHandlers(Handler[] handlers) {
2217 if (handlers != null) {
2218 for (Handler handler : handlers) {
2219 if (handler != null) {
2220 handler.interrupt();
2221 }
2222 }
2223 }
2224 }
2225
2226
2227
2228
2229
2230
2231 @Override
2232 public synchronized void join() throws InterruptedException {
2233 while (running) {
2234 wait();
2235 }
2236 }
2237
2238
2239
2240
2241
2242 @Override
2243 public synchronized InetSocketAddress getListenerAddress() {
2244 return listener.getAddress();
2245 }
2246
2247
2248
2249
2250
2251 @Override
2252 public void setErrorHandler(HBaseRPCErrorHandler handler) {
2253 this.errorHandler = handler;
2254 }
2255
2256
2257
2258
2259 public MetricsHBaseServer getMetrics() {
2260 return metrics;
2261 }
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271 @SuppressWarnings("static-access")
2272 public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
2273 throws AuthorizationException {
2274 if (authorize) {
2275 Class<?> c = getServiceInterface(services, connection.getServiceName());
2276 this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2277 }
2278 }
2279
2280
2281
2282
2283
2284
2285 private static int NIO_BUFFER_LIMIT = 64 * 1024;
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301 protected int channelWrite(WritableByteChannel channel,
2302 ByteBuffer buffer) throws IOException {
2303
2304 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2305 channel.write(buffer) : channelIO(null, channel, buffer);
2306 if (count > 0) {
2307 metrics.sentBytes(count);
2308 }
2309 return count;
2310 }
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324 protected int channelRead(ReadableByteChannel channel,
2325 ByteBuffer buffer) throws IOException {
2326
2327 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2328 channel.read(buffer) : channelIO(channel, null, buffer);
2329 if (count > 0) {
2330 metrics.receivedBytes(count);
2331 }
2332 return count;
2333 }
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348 private static int channelIO(ReadableByteChannel readCh,
2349 WritableByteChannel writeCh,
2350 ByteBuffer buf) throws IOException {
2351
2352 int originalLimit = buf.limit();
2353 int initialRemaining = buf.remaining();
2354 int ret = 0;
2355
2356 while (buf.remaining() > 0) {
2357 try {
2358 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2359 buf.limit(buf.position() + ioSize);
2360
2361 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2362
2363 if (ret < ioSize) {
2364 break;
2365 }
2366
2367 } finally {
2368 buf.limit(originalLimit);
2369 }
2370 }
2371
2372 int nBytes = initialRemaining - buf.remaining();
2373 return (nBytes > 0) ? nBytes : ret;
2374 }
2375
2376
2377
2378
2379
2380
2381 public static RpcCallContext getCurrentCall() {
2382 return CurCall.get();
2383 }
2384
2385
2386
2387
2388
2389
2390 static BlockingServiceAndInterface getServiceAndInterface(
2391 final List<BlockingServiceAndInterface> services, final String serviceName) {
2392 for (BlockingServiceAndInterface bs : services) {
2393 if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2394 return bs;
2395 }
2396 }
2397 return null;
2398 }
2399
2400
2401
2402
2403
2404
2405 static Class<?> getServiceInterface(
2406 final List<BlockingServiceAndInterface> services,
2407 final String serviceName) {
2408 BlockingServiceAndInterface bsasi =
2409 getServiceAndInterface(services, serviceName);
2410 return bsasi == null? null: bsasi.getServiceInterface();
2411 }
2412
2413
2414
2415
2416
2417
2418 static BlockingService getService(
2419 final List<BlockingServiceAndInterface> services,
2420 final String serviceName) {
2421 BlockingServiceAndInterface bsasi =
2422 getServiceAndInterface(services, serviceName);
2423 return bsasi == null? null: bsasi.getBlockingService();
2424 }
2425
2426
2427
2428
2429
2430 public static InetAddress getRemoteIp() {
2431 Call call = CurCall.get();
2432 if (call != null) {
2433 return call.connection.socket.getInetAddress();
2434 }
2435 return null;
2436 }
2437
2438
2439
2440
2441
2442 public static String getRemoteAddress() {
2443 Call call = CurCall.get();
2444 if (call != null) {
2445 return call.connection.getHostAddress();
2446 }
2447 return null;
2448 }
2449
2450
2451
2452
2453
2454
2455
2456
2457 public static RpcServerInterface get() {
2458 return SERVER.get();
2459 }
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471 public static void bind(ServerSocket socket, InetSocketAddress address,
2472 int backlog) throws IOException {
2473 try {
2474 socket.bind(address, backlog);
2475 } catch (BindException e) {
2476 BindException bindException =
2477 new BindException("Problem binding to " + address + " : " +
2478 e.getMessage());
2479 bindException.initCause(e);
2480 throw bindException;
2481 } catch (SocketException e) {
2482
2483
2484 if ("Unresolved address".equals(e.getMessage())) {
2485 throw new UnknownHostException("Invalid hostname for server: " +
2486 address.getHostName());
2487 }
2488 throw e;
2489 }
2490 }
2491 }