View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.ServerSocket;
31  import java.net.Socket;
32  import java.net.SocketException;
33  import java.net.UnknownHostException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.Channels;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.GatheringByteChannel;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.security.PrivilegedExceptionAction;
46  import java.util.ArrayList;
47  import java.util.Arrays;
48  import java.util.Collections;
49  import java.util.HashMap;
50  import java.util.Iterator;
51  import java.util.LinkedList;
52  import java.util.List;
53  import java.util.Map;
54  import java.util.Random;
55  import java.util.Set;
56  import java.util.concurrent.ConcurrentHashMap;
57  import java.util.concurrent.ConcurrentLinkedDeque;
58  import java.util.concurrent.ExecutorService;
59  import java.util.concurrent.Executors;
60  import java.util.concurrent.atomic.AtomicInteger;
61  import java.util.concurrent.locks.Lock;
62  import java.util.concurrent.locks.ReentrantLock;
63  
64  import javax.security.sasl.Sasl;
65  import javax.security.sasl.SaslException;
66  import javax.security.sasl.SaslServer;
67  
68  import org.apache.commons.logging.Log;
69  import org.apache.commons.logging.LogFactory;
70  import org.apache.hadoop.hbase.CallQueueTooBigException;
71  import org.apache.hadoop.hbase.classification.InterfaceAudience;
72  import org.apache.hadoop.hbase.classification.InterfaceStability;
73  import org.apache.hadoop.conf.Configuration;
74  import org.apache.hadoop.hbase.CellScanner;
75  import org.apache.hadoop.hbase.DoNotRetryIOException;
76  import org.apache.hadoop.hbase.HBaseIOException;
77  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
78  import org.apache.hadoop.hbase.HConstants;
79  import org.apache.hadoop.hbase.HRegionInfo;
80  import org.apache.hadoop.hbase.Server;
81  import org.apache.hadoop.hbase.TableName;
82  import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
83  import org.apache.hadoop.hbase.client.Operation;
84  import org.apache.hadoop.hbase.client.VersionInfoUtil;
85  import org.apache.hadoop.hbase.codec.Codec;
86  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
87  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
88  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
89  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
90  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
91  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
92  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
93  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
94  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
95  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
96  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
97  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
98  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
99  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
100 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
101 import org.apache.hadoop.hbase.regionserver.HRegionServer;
102 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
103 import org.apache.hadoop.hbase.security.AccessDeniedException;
104 import org.apache.hadoop.hbase.security.AuthMethod;
105 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
106 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
107 import org.apache.hadoop.hbase.security.User;
108 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
109 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
110 import org.apache.hadoop.hbase.security.SaslStatus;
111 import org.apache.hadoop.hbase.security.SaslUtil;
112 import org.apache.hadoop.hbase.security.UserProvider;
113 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
114 import org.apache.hadoop.hbase.util.Bytes;
115 import org.apache.hadoop.hbase.util.Counter;
116 import org.apache.hadoop.hbase.util.Pair;
117 import org.apache.hadoop.io.BytesWritable;
118 import org.apache.hadoop.io.IntWritable;
119 import org.apache.hadoop.io.Writable;
120 import org.apache.hadoop.io.WritableUtils;
121 import org.apache.hadoop.io.compress.CompressionCodec;
122 import org.apache.hadoop.security.UserGroupInformation;
123 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
124 import org.apache.hadoop.security.authorize.AuthorizationException;
125 import org.apache.hadoop.security.authorize.PolicyProvider;
126 import org.apache.hadoop.security.authorize.ProxyUsers;
127 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
128 import org.apache.hadoop.security.token.SecretManager;
129 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
130 import org.apache.hadoop.security.token.TokenIdentifier;
131 import org.apache.hadoop.util.StringUtils;
132 import org.codehaus.jackson.map.ObjectMapper;
133 import org.apache.htrace.TraceInfo;
134 
135 import com.google.common.util.concurrent.ThreadFactoryBuilder;
136 import com.google.protobuf.BlockingService;
137 import com.google.protobuf.CodedInputStream;
138 import com.google.protobuf.Descriptors.MethodDescriptor;
139 import com.google.protobuf.Message;
140 import com.google.protobuf.ServiceException;
141 import com.google.protobuf.TextFormat;
142 
143 /**
144  * An RPC server that hosts protobuf described Services.
145  *
146  * An RpcServer instance has a Listener that hosts the socket.  Listener has fixed number
147  * of Readers in an ExecutorPool, 10 by default.  The Listener does an accept and then
148  * round robin a Reader is chosen to do the read.  The reader is registered on Selector.  Read does
149  * total read off the channel and the parse from which it makes a Call.  The call is wrapped in a
150  * CallRunner and passed to the scheduler to be run.  Reader goes back to see if more to be done
151  * and loops till done.
152  *
153  * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
154  * has given the queues into which calls (i.e. CallRunner instances) are inserted.  Handlers run
155  * taking from the queue.  They run the CallRunner#run method on each item gotten from queue
156  * and keep taking while the server is up.
157  *
158  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
159  * queue for Responder to pull from and return result to client.
160  *
161  * @see RpcClientImpl
162  */
163 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
164 @InterfaceStability.Evolving
165 public class RpcServer implements RpcServerInterface, ConfigurationObserver {
166   // LOG is being used in CallRunner and the log level is being changed in tests
167   public static final Log LOG = LogFactory.getLog(RpcServer.class);
168   private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
169       = new CallQueueTooBigException();
170 
171   private final boolean authorize;
172   private boolean isSecurityEnabled;
173 
174   public static final byte CURRENT_VERSION = 0;
175 
176   /**
177    * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
178    */
179   public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
180           "hbase.ipc.server.fallback-to-simple-auth-allowed";
181 
182   /**
183    * How many calls/handler are allowed in the queue.
184    */
185   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
186 
187   /**
188    * The maximum size that we can hold in the RPC queue
189    */
190   private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
191 
192   private final IPCUtil ipcUtil;
193 
194   private static final String AUTH_FAILED_FOR = "Auth failed for ";
195   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
196   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
197     Server.class.getName());
198   protected SecretManager<TokenIdentifier> secretManager;
199   protected ServiceAuthorizationManager authManager;
200 
201   /** This is set to Call object before Handler invokes an RPC and reset
202    * after the call returns.
203    */
204   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
205 
206   /** Keeps MonitoredRPCHandler per handler thread. */
207   static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
208       = new ThreadLocal<MonitoredRPCHandler>();
209 
210   protected final InetSocketAddress bindAddress;
211   protected int port;                             // port we listen on
212   protected InetSocketAddress address;            // inet address we listen on
213   private int readThreads;                        // number of read threads
214   protected int maxIdleTime;                      // the maximum idle time after
215                                                   // which a client may be
216                                                   // disconnected
217   protected int thresholdIdleConnections;         // the number of idle
218                                                   // connections after which we
219                                                   // will start cleaning up idle
220                                                   // connections
221   int maxConnectionsToNuke;                       // the max number of
222                                                   // connections to nuke
223                                                   // during a cleanup
224 
225   protected MetricsHBaseServer metrics;
226 
227   protected final Configuration conf;
228 
229   private int maxQueueSize;
230   protected int socketSendBufferSize;
231   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
232   protected final boolean tcpKeepAlive; // if T then use keepalives
233   protected final long purgeTimeout;    // in milliseconds
234 
235   /**
236    * This flag is used to indicate to sub threads when they should go down.  When we call
237    * {@link #start()}, all threads started will consult this flag on whether they should
238    * keep going.  It is set to false when {@link #stop()} is called.
239    */
240   volatile boolean running = true;
241 
242   /**
243    * This flag is set to true after all threads are up and 'running' and the server is then opened
244    * for business by the call to {@link #start()}.
245    */
246   volatile boolean started = false;
247 
248   /**
249    * This is a running count of the size of all outstanding calls by size.
250    */
251   protected final Counter callQueueSize = new Counter();
252 
253   protected final List<Connection> connectionList =
254     Collections.synchronizedList(new LinkedList<Connection>());
255   //maintain a list
256   //of client connections
257   private Listener listener = null;
258   protected Responder responder = null;
259   protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
260   protected int numConnections = 0;
261 
262   protected HBaseRPCErrorHandler errorHandler = null;
263 
264   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
265   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
266 
267   /** Default value for above params */
268   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
269   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
270 
271   private static final ObjectMapper MAPPER = new ObjectMapper();
272 
273   private final int warnResponseTime;
274   private final int warnResponseSize;
275   private final Server server;
276   private final List<BlockingServiceAndInterface> services;
277 
278   private final RpcScheduler scheduler;
279 
280   private UserProvider userProvider;
281 
282   private final BoundedByteBufferPool reservoir;
283 
284   private volatile boolean allowFallbackToSimpleAuth;
285 
286   /**
287    * Used to get details for scan with a scanner_id<br/>
288    * TODO try to figure out a better way and remove reference from regionserver package later.
289    */
290   private RSRpcServices rsRpcServices;
291 
292   /**
293    * Datastructure that holds all necessary to a method invocation and then afterward, carries
294    * the result.
295    */
296   @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
297   @InterfaceStability.Evolving
298   public class Call implements RpcCallContext {
299     protected int id;                             // the client's call id
300     protected BlockingService service;
301     protected MethodDescriptor md;
302     protected RequestHeader header;
303     protected Message param;                      // the parameter passed
304     // Optional cell data passed outside of protobufs.
305     protected CellScanner cellScanner;
306     protected Connection connection;              // connection to client
307     protected long timestamp;      // the time received when response is null
308                                    // the time served when response is not null
309     /**
310      * Chain of buffers to send as response.
311      */
312     protected BufferChain response;
313     protected Responder responder;
314 
315     protected long size;                          // size of current call
316     protected boolean isError;
317     protected TraceInfo tinfo;
318     private ByteBuffer cellBlock = null;
319 
320     private User user;
321     private InetAddress remoteAddress;
322 
323     private long responseCellSize = 0;
324     private long responseBlockSize = 0;
325     private boolean retryImmediatelySupported;
326 
327     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
328         justification="Can't figure why this complaint is happening... see below")
329     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
330          Message param, CellScanner cellScanner, Connection connection, Responder responder,
331          long size, TraceInfo tinfo, final InetAddress remoteAddress) {
332       this.id = id;
333       this.service = service;
334       this.md = md;
335       this.header = header;
336       this.param = param;
337       this.cellScanner = cellScanner;
338       this.connection = connection;
339       this.timestamp = System.currentTimeMillis();
340       this.response = null;
341       this.responder = responder;
342       this.isError = false;
343       this.size = size;
344       this.tinfo = tinfo;
345       this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
346       this.remoteAddress = remoteAddress;
347       this.retryImmediatelySupported =
348           connection == null? null: connection.retryImmediatelySupported;
349     }
350 
351     /**
352      * Call is done. Execution happened and we returned results to client. It is now safe to
353      * cleanup.
354      */
355     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
356         justification="Presume the lock on processing request held by caller is protection enough")
357     void done() {
358       if (this.cellBlock != null && reservoir != null) {
359         // Return buffer to reservoir now we are done with it.
360         reservoir.putBuffer(this.cellBlock);
361         this.cellBlock = null;
362       }
363       this.connection.decRpcCount();  // Say that we're done with this call.
364     }
365 
366     @Override
367     public String toString() {
368       return toShortString() + " param: " +
369         (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
370         " connection: " + connection.toString();
371     }
372 
373     protected RequestHeader getHeader() {
374       return this.header;
375     }
376 
377     public boolean hasPriority() {
378       return this.header.hasPriority();
379     }
380 
381     public int getPriority() {
382       return this.header.getPriority();
383     }
384 
385     /*
386      * Short string representation without param info because param itself could be huge depends on
387      * the payload of a command
388      */
389     String toShortString() {
390       String serviceName = this.connection.service != null ?
391           this.connection.service.getDescriptorForType().getName() : "null";
392       return "callId: " + this.id + " service: " + serviceName +
393           " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
394           " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
395           " connection: " + connection.toString();
396     }
397 
398     String toTraceString() {
399       String serviceName = this.connection.service != null ?
400                            this.connection.service.getDescriptorForType().getName() : "";
401       String methodName = (this.md != null) ? this.md.getName() : "";
402       return serviceName + "." + methodName;
403     }
404 
405     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
406       this.response = new BufferChain(response);
407     }
408 
409     protected synchronized void setResponse(Object m, final CellScanner cells,
410         Throwable t, String errorMsg) {
411       if (this.isError) return;
412       if (t != null) this.isError = true;
413       BufferChain bc = null;
414       try {
415         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
416         // Presume it a pb Message.  Could be null.
417         Message result = (Message)m;
418         // Call id.
419         headerBuilder.setCallId(this.id);
420         if (t != null) {
421           ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
422           exceptionBuilder.setExceptionClassName(t.getClass().getName());
423           exceptionBuilder.setStackTrace(errorMsg);
424           exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException ||
425             t instanceof NeedUnmanagedConnectionException);
426           if (t instanceof RegionMovedException) {
427             // Special casing for this exception.  This is only one carrying a payload.
428             // Do this instead of build a generic system for allowing exceptions carry
429             // any kind of payload.
430             RegionMovedException rme = (RegionMovedException)t;
431             exceptionBuilder.setHostname(rme.getHostname());
432             exceptionBuilder.setPort(rme.getPort());
433           }
434           // Set the exception as the result of the method invocation.
435           headerBuilder.setException(exceptionBuilder.build());
436         }
437         // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
438         // reservoir when finished. This is hacky and the hack is not contained but benefits are
439         // high when we can avoid a big buffer allocation on each rpc.
440         this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
441           this.connection.compressionCodec, cells, reservoir);
442         if (this.cellBlock != null) {
443           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
444           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
445           cellBlockBuilder.setLength(this.cellBlock.limit());
446           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
447         }
448         Message header = headerBuilder.build();
449 
450         // Organize the response as a set of bytebuffers rather than collect it all together inside
451         // one big byte array; save on allocations.
452         ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
453         ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
454         int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
455           (this.cellBlock == null? 0: this.cellBlock.limit());
456         ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
457         bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
458         if (connection.useWrap) {
459           bc = wrapWithSasl(bc);
460         }
461       } catch (IOException e) {
462         LOG.warn("Exception while creating response " + e);
463       }
464       this.response = bc;
465     }
466 
467     private BufferChain wrapWithSasl(BufferChain bc)
468         throws IOException {
469       if (!this.connection.useSasl) return bc;
470       // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
471       // THIS IS A BIG UGLY COPY.
472       byte [] responseBytes = bc.getBytes();
473       byte [] token;
474       // synchronization may be needed since there can be multiple Handler
475       // threads using saslServer to wrap responses.
476       synchronized (connection.saslServer) {
477         token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
478       }
479       if (LOG.isTraceEnabled()) {
480         LOG.trace("Adding saslServer wrapped token of size " + token.length
481             + " as call response.");
482       }
483 
484       ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
485       ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
486       return new BufferChain(bbTokenLength, bbTokenBytes);
487     }
488 
489     @Override
490     public boolean isClientCellBlockSupported() {
491       return this.connection != null && this.connection.codec != null;
492     }
493 
494     @Override
495     public long disconnectSince() {
496       if (!connection.channel.isOpen()) {
497         return System.currentTimeMillis() - timestamp;
498       } else {
499         return -1L;
500       }
501     }
502 
503     public long getSize() {
504       return this.size;
505     }
506 
507     public long getResponseCellSize() {
508       return responseCellSize;
509     }
510 
511     public void incrementResponseCellSize(long cellSize) {
512       responseCellSize += cellSize;
513     }
514 
515     @Override
516     public long getResponseBlockSize() {
517       return responseBlockSize;
518     }
519 
520     @Override
521     public void incrementResponseBlockSize(long blockSize) {
522       responseBlockSize += blockSize;
523     }
524 
525     public synchronized void sendResponseIfReady() throws IOException {
526       // set param null to reduce memory pressure
527       this.param = null;
528       this.responder.doRespond(this);
529     }
530 
531     public UserGroupInformation getRemoteUser() {
532       return connection.ugi;
533     }
534 
535     @Override
536     public User getRequestUser() {
537       return user;
538     }
539 
540     @Override
541     public String getRequestUserName() {
542       User user = getRequestUser();
543       return user == null? null: user.getShortName();
544     }
545 
546     @Override
547     public InetAddress getRemoteAddress() {
548       return remoteAddress;
549     }
550 
551     @Override
552     public VersionInfo getClientVersionInfo() {
553       return connection.getVersionInfo();
554     }
555 
556     @Override
557     public boolean isRetryImmediatelySupported() {
558       return retryImmediatelySupported;
559     }
560   }
561 
562   /** Listens on the socket. Creates jobs for the handler threads*/
563   private class Listener extends Thread {
564 
565     private ServerSocketChannel acceptChannel = null; //the accept channel
566     private Selector selector = null; //the selector that we use for the server
567     private Reader[] readers = null;
568     private int currentReader = 0;
569     private Random rand = new Random();
570     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
571                                          //-tion (for idle connections) ran
572     private long cleanupInterval = 10000; //the minimum interval between
573                                           //two cleanup runs
574     private int backlogLength;
575 
576     private ExecutorService readPool;
577 
578     public Listener(final String name) throws IOException {
579       super(name);
580       backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
581       // Create a new server socket and set to non blocking mode
582       acceptChannel = ServerSocketChannel.open();
583       acceptChannel.configureBlocking(false);
584 
585       // Bind the server socket to the binding addrees (can be different from the default interface)
586       bind(acceptChannel.socket(), bindAddress, backlogLength);
587       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
588       address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
589 
590       // create a selector;
591       selector= Selector.open();
592 
593       readers = new Reader[readThreads];
594       readPool = Executors.newFixedThreadPool(readThreads,
595         new ThreadFactoryBuilder().setNameFormat(
596           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
597           ",port=" + port).setDaemon(true).build());
598       for (int i = 0; i < readThreads; ++i) {
599         Reader reader = new Reader();
600         readers[i] = reader;
601         readPool.execute(reader);
602       }
603       LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
604 
605       // Register accepts on the server socket with the selector.
606       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
607       this.setName("RpcServer.listener,port=" + port);
608       this.setDaemon(true);
609     }
610 
611 
612     private class Reader implements Runnable {
613       private volatile boolean adding = false;
614       private final Selector readSelector;
615 
616       Reader() throws IOException {
617         this.readSelector = Selector.open();
618       }
619       @Override
620       public void run() {
621         try {
622           doRunLoop();
623         } finally {
624           try {
625             readSelector.close();
626           } catch (IOException ioe) {
627             LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
628           }
629         }
630       }
631 
632       private synchronized void doRunLoop() {
633         while (running) {
634           try {
635             readSelector.select();
636             while (adding) {
637               this.wait(1000);
638             }
639 
640             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
641             while (iter.hasNext()) {
642               SelectionKey key = iter.next();
643               iter.remove();
644               if (key.isValid()) {
645                 if (key.isReadable()) {
646                   doRead(key);
647                 }
648               }
649             }
650           } catch (InterruptedException e) {
651             LOG.debug("Interrupted while sleeping");
652             return;
653           } catch (IOException ex) {
654             LOG.info(getName() + ": IOException in Reader", ex);
655           } catch (OutOfMemoryError e) {
656             if (getErrorHandler() != null) {
657               if (getErrorHandler().checkOOME(e)) {
658                 RpcServer.LOG.info(Thread.currentThread().getName()
659                     + ": exiting on OutOfMemoryError");
660                 return;
661               }
662             } else {
663               // rethrow if no handler
664               throw e;
665             }
666           }
667         }
668       }
669 
670       /**
671        * This gets reader into the state that waits for the new channel
672        * to be registered with readSelector. If it was waiting in select()
673        * the thread will be woken up, otherwise whenever select() is called
674        * it will return even if there is nothing to read and wait
675        * in while(adding) for finishAdd call
676        */
677       public void startAdd() {
678         adding = true;
679         readSelector.wakeup();
680       }
681 
682       public synchronized SelectionKey registerChannel(SocketChannel channel)
683         throws IOException {
684         return channel.register(readSelector, SelectionKey.OP_READ);
685       }
686 
687       public synchronized void finishAdd() {
688         adding = false;
689         this.notify();
690       }
691     }
692 
693     /** cleanup connections from connectionList. Choose a random range
694      * to scan and also have a limit on the number of the connections
695      * that will be cleanedup per run. The criteria for cleanup is the time
696      * for which the connection was idle. If 'force' is true then all
697      * connections will be looked at for the cleanup.
698      * @param force all connections will be looked at for cleanup
699      */
700     private void cleanupConnections(boolean force) {
701       if (force || numConnections > thresholdIdleConnections) {
702         long currentTime = System.currentTimeMillis();
703         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
704           return;
705         }
706         int start = 0;
707         int end = numConnections - 1;
708         if (!force) {
709           start = rand.nextInt() % numConnections;
710           end = rand.nextInt() % numConnections;
711           int temp;
712           if (end < start) {
713             temp = start;
714             start = end;
715             end = temp;
716           }
717         }
718         int i = start;
719         int numNuked = 0;
720         while (i <= end) {
721           Connection c;
722           synchronized (connectionList) {
723             try {
724               c = connectionList.get(i);
725             } catch (Exception e) {return;}
726           }
727           if (c.timedOut(currentTime)) {
728             if (LOG.isDebugEnabled())
729               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
730             closeConnection(c);
731             numNuked++;
732             end--;
733             //noinspection UnusedAssignment
734             c = null;
735             if (!force && numNuked == maxConnectionsToNuke) break;
736           }
737           else i++;
738         }
739         lastCleanupRunTime = System.currentTimeMillis();
740       }
741     }
742 
743     @Override
744     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
745       justification="selector access is not synchronized; seems fine but concerned changing " +
746         "it will have per impact")
747     public void run() {
748       LOG.info(getName() + ": starting");
749       while (running) {
750         SelectionKey key = null;
751         try {
752           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
753           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
754           while (iter.hasNext()) {
755             key = iter.next();
756             iter.remove();
757             try {
758               if (key.isValid()) {
759                 if (key.isAcceptable())
760                   doAccept(key);
761               }
762             } catch (IOException ignored) {
763               if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
764             }
765             key = null;
766           }
767         } catch (OutOfMemoryError e) {
768           if (errorHandler != null) {
769             if (errorHandler.checkOOME(e)) {
770               LOG.info(getName() + ": exiting on OutOfMemoryError");
771               closeCurrentConnection(key, e);
772               cleanupConnections(true);
773               return;
774             }
775           } else {
776             // we can run out of memory if we have too many threads
777             // log the event and sleep for a minute and give
778             // some thread(s) a chance to finish
779             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
780             closeCurrentConnection(key, e);
781             cleanupConnections(true);
782             try {
783               Thread.sleep(60000);
784             } catch (InterruptedException ex) {
785               LOG.debug("Interrupted while sleeping");
786               return;
787             }
788           }
789         } catch (Exception e) {
790           closeCurrentConnection(key, e);
791         }
792         cleanupConnections(false);
793       }
794 
795       LOG.info(getName() + ": stopping");
796 
797       synchronized (this) {
798         try {
799           acceptChannel.close();
800           selector.close();
801         } catch (IOException ignored) {
802           if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
803         }
804 
805         selector= null;
806         acceptChannel= null;
807 
808         // clean up all connections
809         while (!connectionList.isEmpty()) {
810           closeConnection(connectionList.remove(0));
811         }
812       }
813     }
814 
815     private void closeCurrentConnection(SelectionKey key, Throwable e) {
816       if (key != null) {
817         Connection c = (Connection)key.attachment();
818         if (c != null) {
819           if (LOG.isDebugEnabled()) {
820             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
821                 (e != null ? " on error " + e.getMessage() : ""));
822           }
823           closeConnection(c);
824           key.attach(null);
825         }
826       }
827     }
828 
    /** @return the value of this listener's {@code address} field */
    InetSocketAddress getAddress() {
      return address;
    }
832 
833     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
834       Connection c;
835       ServerSocketChannel server = (ServerSocketChannel) key.channel();
836 
837       SocketChannel channel;
838       while ((channel = server.accept()) != null) {
839         try {
840           channel.configureBlocking(false);
841           channel.socket().setTcpNoDelay(tcpNoDelay);
842           channel.socket().setKeepAlive(tcpKeepAlive);
843         } catch (IOException ioe) {
844           channel.close();
845           throw ioe;
846         }
847 
848         Reader reader = getReader();
849         try {
850           reader.startAdd();
851           SelectionKey readKey = reader.registerChannel(channel);
852           c = getConnection(channel, System.currentTimeMillis());
853           readKey.attach(c);
854           synchronized (connectionList) {
855             connectionList.add(numConnections, c);
856             numConnections++;
857           }
858           if (LOG.isDebugEnabled())
859             LOG.debug(getName() + ": connection from " + c.toString() +
860                 "; # active connections: " + numConnections);
861         } finally {
862           reader.finishAdd();
863         }
864       }
865     }
866 
867     void doRead(SelectionKey key) throws InterruptedException {
868       int count;
869       Connection c = (Connection) key.attachment();
870       if (c == null) {
871         return;
872       }
873       c.setLastContact(System.currentTimeMillis());
874       try {
875         count = c.readAndProcess();
876 
877         if (count > 0) {
878           c.setLastContact(System.currentTimeMillis());
879         }
880 
881       } catch (InterruptedException ieo) {
882         throw ieo;
883       } catch (Exception e) {
884         if (LOG.isDebugEnabled()) {
885           LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage());
886         }
887         count = -1; //so that the (count < 0) block is executed
888       }
889       if (count < 0) {
890         if (LOG.isDebugEnabled()) {
891           LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
892               " because read count=" + count +
893               ". Number of active connections: " + numConnections);
894         }
895         closeConnection(c);
896       }
897     }
898 
    /**
     * Stops this listener's resources: wakes up the selector (if still set), closes the
     * listening socket (if still set) and shuts down the reader pool.
     */
    synchronized void doStop() {
      if (selector != null) {
        // Wake the thread blocked in select() and yield so it gets a chance to run.
        selector.wakeup();
        Thread.yield();
      }
      if (acceptChannel != null) {
        try {
          acceptChannel.socket().close();
        } catch (IOException e) {
          // A failure to close is only logged; the shutdown continues.
          LOG.info(getName() + ": exception in closing listener socket. " + e);
        }
      }
      readPool.shutdownNow();
    }
913 
914     // The method that will return the next reader to work with
915     // Simplistic implementation of round robin for now
916     Reader getReader() {
917       currentReader = (currentReader + 1) % readers.length;
918       return readers[currentReader];
919     }
920   }
921 
922   // Sends responses of RPC back to clients.
923   protected class Responder extends Thread {
924     private final Selector writeSelector;
925     private final Set<Connection> writingCons =
926         Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
927 
928     Responder() throws IOException {
929       this.setName("RpcServer.responder");
930       this.setDaemon(true);
931       writeSelector = Selector.open(); // create a selector
932     }
933 
934     @Override
935     public void run() {
936       LOG.info(getName() + ": starting");
937       try {
938         doRunLoop();
939       } finally {
940         LOG.info(getName() + ": stopping");
941         try {
942           writeSelector.close();
943         } catch (IOException ioe) {
944           LOG.error(getName() + ": couldn't close write selector", ioe);
945         }
946       }
947     }
948 
949     /**
950      * Take the list of the connections that want to write, and register them
951      * in the selector.
952      */
953     private void registerWrites() {
954       Iterator<Connection> it = writingCons.iterator();
955       while (it.hasNext()) {
956         Connection c = it.next();
957         it.remove();
958         SelectionKey sk = c.channel.keyFor(writeSelector);
959         try {
960           if (sk == null) {
961             try {
962               c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
963             } catch (ClosedChannelException e) {
964               // ignore: the client went away.
965               if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
966             }
967           } else {
968             sk.interestOps(SelectionKey.OP_WRITE);
969           }
970         } catch (CancelledKeyException e) {
971           // ignore: the client went away.
972           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
973         }
974       }
975     }
976 
977     /**
978      * Add a connection to the list that want to write,
979      */
980     public void registerForWrite(Connection c) {
981       if (writingCons.add(c)) {
982         writeSelector.wakeup();
983       }
984     }
985 
986     private void doRunLoop() {
987       long lastPurgeTime = 0;   // last check for old calls.
988       while (running) {
989         try {
990           registerWrites();
991           int keyCt = writeSelector.select(purgeTimeout);
992           if (keyCt == 0) {
993             continue;
994           }
995 
996           Set<SelectionKey> keys = writeSelector.selectedKeys();
997           Iterator<SelectionKey> iter = keys.iterator();
998           while (iter.hasNext()) {
999             SelectionKey key = iter.next();
1000             iter.remove();
1001             try {
1002               if (key.isValid() && key.isWritable()) {
1003                 doAsyncWrite(key);
1004               }
1005             } catch (IOException e) {
1006               LOG.debug(getName() + ": asyncWrite", e);
1007             }
1008           }
1009 
1010           lastPurgeTime = purge(lastPurgeTime);
1011 
1012         } catch (OutOfMemoryError e) {
1013           if (errorHandler != null) {
1014             if (errorHandler.checkOOME(e)) {
1015               LOG.info(getName() + ": exiting on OutOfMemoryError");
1016               return;
1017             }
1018           } else {
1019             //
1020             // we can run out of memory if we have too many threads
1021             // log the event and sleep for a minute and give
1022             // some thread(s) a chance to finish
1023             //
1024             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
1025             try {
1026               Thread.sleep(60000);
1027             } catch (InterruptedException ex) {
1028               LOG.debug("Interrupted while sleeping");
1029               return;
1030             }
1031           }
1032         } catch (Exception e) {
1033           LOG.warn(getName() + ": exception in Responder " +
1034               StringUtils.stringifyException(e), e);
1035         }
1036       }
1037       LOG.info(getName() + ": stopped");
1038     }
1039 
1040     /**
1041      * If there were some calls that have not been sent out for a
1042      * long time, we close the connection.
1043      * @return the time of the purge.
1044      */
1045     private long purge(long lastPurgeTime) {
1046       long now = System.currentTimeMillis();
1047       if (now < lastPurgeTime + purgeTimeout) {
1048         return lastPurgeTime;
1049       }
1050 
1051       ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1052       // get the list of channels from list of keys.
1053       synchronized (writeSelector.keys()) {
1054         for (SelectionKey key : writeSelector.keys()) {
1055           Connection connection = (Connection) key.attachment();
1056           if (connection == null) {
1057             throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1058           }
1059           Call call = connection.responseQueue.peekFirst();
1060           if (call != null && now > call.timestamp + purgeTimeout) {
1061             conWithOldCalls.add(call.connection);
1062           }
1063         }
1064       }
1065 
1066       // Seems safer to close the connection outside of the synchronized loop...
1067       for (Connection connection : conWithOldCalls) {
1068         closeConnection(connection);
1069       }
1070 
1071       return now;
1072     }
1073 
1074     private void doAsyncWrite(SelectionKey key) throws IOException {
1075       Connection connection = (Connection) key.attachment();
1076       if (connection == null) {
1077         throw new IOException("doAsyncWrite: no connection");
1078       }
1079       if (key.channel() != connection.channel) {
1080         throw new IOException("doAsyncWrite: bad channel");
1081       }
1082 
1083       if (processAllResponses(connection)) {
1084         try {
1085           // We wrote everything, so we don't need to be told when the socket is ready for
1086           //  write anymore.
1087          key.interestOps(0);
1088         } catch (CancelledKeyException e) {
1089           /* The Listener/reader might have closed the socket.
1090            * We don't explicitly cancel the key, so not sure if this will
1091            * ever fire.
1092            * This warning could be removed.
1093            */
1094           LOG.warn("Exception while changing ops : " + e);
1095         }
1096       }
1097     }
1098 
1099     /**
1100      * Process the response for this call. You need to have the lock on
1101      * {@link org.apache.hadoop.hbase.ipc.RpcServer.Connection#responseWriteLock}
1102      *
1103      * @param call the call
1104      * @return true if we proceed the call fully, false otherwise.
1105      * @throws IOException
1106      */
1107     private boolean processResponse(final Call call) throws IOException {
1108       boolean error = true;
1109       try {
1110         // Send as much data as we can in the non-blocking fashion
1111         long numBytes = channelWrite(call.connection.channel, call.response);
1112         if (numBytes < 0) {
1113           throw new HBaseIOException("Error writing on the socket " +
1114             "for the call:" + call.toShortString());
1115         }
1116         error = false;
1117       } finally {
1118         if (error) {
1119           LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1120           closeConnection(call.connection);
1121         }
1122       }
1123 
1124       if (!call.response.hasRemaining()) {
1125         call.done();
1126         return true;
1127       } else {
1128         return false; // Socket can't take more, we will have to come back.
1129       }
1130     }
1131 
1132     /**
1133      * Process all the responses for this connection
1134      *
1135      * @return true if all the calls were processed or that someone else is doing it.
1136      * false if there * is still some work to do. In this case, we expect the caller to
1137      * delay us.
1138      * @throws IOException
1139      */
1140     private boolean processAllResponses(final Connection connection) throws IOException {
1141       // We want only one writer on the channel for a connection at a time.
1142       connection.responseWriteLock.lock();
1143       try {
1144         for (int i = 0; i < 20; i++) {
1145           // protection if some handlers manage to need all the responder
1146           Call call = connection.responseQueue.pollFirst();
1147           if (call == null) {
1148             return true;
1149           }
1150           if (!processResponse(call)) {
1151             connection.responseQueue.addFirst(call);
1152             return false;
1153           }
1154         }
1155       } finally {
1156         connection.responseWriteLock.unlock();
1157       }
1158 
1159       return connection.responseQueue.isEmpty();
1160     }
1161 
1162     //
1163     // Enqueue a response from the application.
1164     //
1165     void doRespond(Call call) throws IOException {
1166       boolean added = false;
1167 
1168       // If there is already a write in progress, we don't wait. This allows to free the handlers
1169       //  immediately for other tasks.
1170       if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
1171         try {
1172           if (call.connection.responseQueue.isEmpty()) {
1173             // If we're alone, we can try to do a direct call to the socket. It's
1174             //  an optimisation to save on context switches and data transfer between cores..
1175             if (processResponse(call)) {
1176               return; // we're done.
1177             }
1178             // Too big to fit, putting ahead.
1179             call.connection.responseQueue.addFirst(call);
1180             added = true; // We will register to the selector later, outside of the lock.
1181           }
1182         } finally {
1183           call.connection.responseWriteLock.unlock();
1184         }
1185       }
1186 
1187       if (!added) {
1188         call.connection.responseQueue.addLast(call);
1189       }
1190       call.responder.registerForWrite(call.connection);
1191 
1192       // set the serve time when the response has to be sent later
1193       call.timestamp = System.currentTimeMillis();
1194     }
1195   }
1196 
1197   /** Reads calls from a connection and queues them for handling. */
1198   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1199       value="VO_VOLATILE_INCREMENT",
1200       justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1201   public class Connection {
1202     // If initial preamble with version and magic has been read or not.
1203     private boolean connectionPreambleRead = false;
1204     // If the connection header has been read or not.
1205     private boolean connectionHeaderRead = false;
1206     protected SocketChannel channel;
1207     private ByteBuffer data;
1208     private ByteBuffer dataLengthBuffer;
1209     private ByteBuffer preambleBuffer;
1210     protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
1211     private final Lock responseWriteLock = new ReentrantLock();
1212     private Counter rpcCount = new Counter(); // number of outstanding rpcs
1213     private long lastContact;
1214     private InetAddress addr;
1215     protected Socket socket;
1216     // Cache the remote host & port info so that even if the socket is
1217     // disconnected, we can say where it used to connect to.
1218     protected String hostAddress;
1219     protected int remotePort;
1220     ConnectionHeader connectionHeader;
1221     /**
1222      * Codec the client asked us to use.
1223      */
1224     private Codec codec;
1225     /**
1226      * Compression codec the client asked us to use.
1227      */
1228     private CompressionCodec compressionCodec;
1229     BlockingService service;
1230 
1231     private AuthMethod authMethod;
1232     private boolean saslContextEstablished;
1233     private boolean skipInitialSaslHandshake;
1234     private ByteBuffer unwrappedData;
1235     // When is this set?  FindBugs wants to know!  Says NP
1236     private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1237     boolean useSasl;
1238     SaslServer saslServer;
1239     private boolean useWrap = false;
1240     // Fake 'call' for failed authorization response
1241     private static final int AUTHORIZATION_FAILED_CALLID = -1;
1242     private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
1243         null, null, this, null, 0, null, null);
1244     private ByteArrayOutputStream authFailedResponse =
1245         new ByteArrayOutputStream();
1246     // Fake 'call' for SASL context setup
1247     private static final int SASL_CALLID = -33;
1248     private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
1249         0, null, null);
1250 
1251     // was authentication allowed with a fallback to simple auth
1252     private boolean authenticatedWithFallback;
1253 
1254     private boolean retryImmediatelySupported = false;
1255 
1256     public UserGroupInformation attemptingUser = null; // user name before auth
1257     protected User user = null;
1258     protected UserGroupInformation ugi = null;
1259 
1260     public Connection(SocketChannel channel, long lastContact) {
1261       this.channel = channel;
1262       this.lastContact = lastContact;
1263       this.data = null;
1264       this.dataLengthBuffer = ByteBuffer.allocate(4);
1265       this.socket = channel.socket();
1266       this.addr = socket.getInetAddress();
1267       if (addr == null) {
1268         this.hostAddress = "*Unknown*";
1269       } else {
1270         this.hostAddress = addr.getHostAddress();
1271       }
1272       this.remotePort = socket.getPort();
1273       if (socketSendBufferSize != 0) {
1274         try {
1275           socket.setSendBufferSize(socketSendBufferSize);
1276         } catch (IOException e) {
1277           LOG.warn("Connection: unable to set socket send buffer size to " +
1278                    socketSendBufferSize);
1279         }
1280       }
1281     }
1282 
      @Override
    public String toString() {
      // "<host address>:<remote port>", built from the values cached in the constructor.
      return getHostAddress() + ":" + remotePort;
    }
1287 
    /** @return the cached remote host address; "*Unknown*" if the socket had no address */
    public String getHostAddress() {
      return hostAddress;
    }
1291 
    /** @return the remote address obtained from the socket at construction; may be null */
    public InetAddress getHostInetAddress() {
      return addr;
    }
1295 
    /** @return the remote port obtained from the socket at construction */
    public int getRemotePort() {
      return remotePort;
    }
1299 
    /**
     * Records the time of the last contact with the client; used by the idle check in
     * timedOut(long).
     * @param lastContact the new last-contact time
     */
    public void setLastContact(long lastContact) {
      this.lastContact = lastContact;
    }
1303 
1304     public VersionInfo getVersionInfo() {
1305       if (connectionHeader.hasVersionInfo()) {
1306         return connectionHeader.getVersionInfo();
1307       }
1308       return null;
1309     }
1310 
    /* Return true if the connection has no outstanding rpc, i.e. rpcCount is zero */
    private boolean isIdle() {
      return rpcCount.get() == 0;
    }
1315 
    /* Decrement the outstanding RPC count (the counter read by isIdle()) */
    protected void decRpcCount() {
      rpcCount.decrement();
    }
1320 
    /* Increment the outstanding RPC count (the counter read by isIdle()) */
    protected void incRpcCount() {
      rpcCount.increment();
    }
1325 
    /**
     * @param currentTime the time to compare the last contact against
     * @return true if the connection has no outstanding rpc and more than maxIdleTime has
     *   elapsed between the last contact and {@code currentTime}
     */
    protected boolean timedOut(long currentTime) {
      return isIdle() && currentTime - lastContact > maxIdleTime;
    }
1329 
1330     private UserGroupInformation getAuthorizedUgi(String authorizedId)
1331         throws IOException {
1332       UserGroupInformation authorizedUgi;
1333       if (authMethod == AuthMethod.DIGEST) {
1334         TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1335             secretManager);
1336         authorizedUgi = tokenId.getUser();
1337         if (authorizedUgi == null) {
1338           throw new AccessDeniedException(
1339               "Can't retrieve username from tokenIdentifier.");
1340         }
1341         authorizedUgi.addTokenIdentifier(tokenId);
1342       } else {
1343         authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
1344       }
1345       authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
1346       return authorizedUgi;
1347     }
1348 
    /**
     * Handles one SASL token received from the client.
     * <p>
     * Once the SASL context is established, the token is an RPC payload: it is processed
     * directly, or unwrapped by the SASL server first when wrapping is in use. Before that,
     * the token is part of the negotiation: the SASL server is created on first use, the
     * token is evaluated, any reply token is sent back, and when the negotiation completes
     * the authorized UGI is recorded and the context is marked established.
     *
     * @param saslToken the bytes read from the client
     * @throws IOException if the negotiation fails (an error reply is sent to the client
     *   first), or if processing the payload fails
     * @throws InterruptedException declared for the RPC-processing calls made once the
     *   context is established
     */
    private void saslReadAndProcess(byte[] saslToken) throws IOException,
        InterruptedException {
      if (saslContextEstablished) {
        if (LOG.isTraceEnabled())
          LOG.trace("Have read input token of size " + saslToken.length
              + " for processing by saslServer.unwrap()");

        if (!useWrap) {
          processOneRpc(saslToken);
        } else {
          byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
          processUnwrappedData(plaintextData);
        }
      } else {
        byte[] replyToken;
        try {
          // Create the SASL server lazily, on the first negotiation token.
          if (saslServer == null) {
            switch (authMethod) {
            case DIGEST:
              if (secretManager == null) {
                throw new AccessDeniedException(
                    "Server is not configured to do DIGEST authentication.");
              }
              saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
                  .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
                  SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
                      secretManager, this));
              break;
            default:
              // Every other auth method takes the Kerberos path.
              UserGroupInformation current = UserGroupInformation.getCurrentUser();
              String fullName = current.getUserName();
              if (LOG.isDebugEnabled()) {
                LOG.debug("Kerberos principal name is " + fullName);
              }
              final String names[] = SaslUtil.splitKerberosName(fullName);
              if (names.length != 3) {
                throw new AccessDeniedException(
                    "Kerberos principal name does NOT have the expected "
                        + "hostname part: " + fullName);
              }
              // The server is created inside doAs() of the current user; the anonymous
              // class assigns the enclosing connection's saslServer field.
              current.doAs(new PrivilegedExceptionAction<Object>() {
                @Override
                public Object run() throws SaslException {
                  saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
                      .getMechanismName(), names[0], names[1],
                      SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
                  return null;
                }
              });
            }
            if (saslServer == null)
              throw new AccessDeniedException(
                  "Unable to find SASL server implementation for "
                      + authMethod.getMechanismName());
            if (LOG.isDebugEnabled()) {
              LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
            }
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug("Have read input token of size " + saslToken.length
                + " for processing by saslServer.evaluateResponse()");
          }
          replyToken = saslServer.evaluateResponse(saslToken);
        } catch (IOException e) {
          // Report the failure to the client. If an InvalidToken is found in the cause
          // chain, that exception is reported instead of the outer one.
          IOException sendToClient = e;
          Throwable cause = e;
          while (cause != null) {
            if (cause instanceof InvalidToken) {
              sendToClient = (InvalidToken) cause;
              break;
            }
            cause = cause.getCause();
          }
          doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
            sendToClient.getLocalizedMessage());
          metrics.authenticationFailure();
          String clientIP = this.toString();
          // attempting user could be null
          AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
          // The original exception (not the unwrapped one) is rethrown to the caller.
          throw e;
        }
        if (replyToken != null) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Will send token of size " + replyToken.length
                + " from saslServer.");
          }
          doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
              null);
        }
        if (saslServer.isComplete()) {
          String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
          // Wrapping is used for any negotiated QoP other than "auth".
          useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
          ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
          if (LOG.isDebugEnabled()) {
            LOG.debug("SASL server context established. Authenticated client: "
              + ugi + ". Negotiated QoP is "
              + saslServer.getNegotiatedProperty(Sasl.QOP));
          }
          metrics.authenticationSuccess();
          AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
          saslContextEstablished = true;
        }
      }
    }
1453 
1454     /**
1455      * No protobuf encoding of raw sasl messages
1456      */
1457     private void doRawSaslReply(SaslStatus status, Writable rv,
1458         String errorClass, String error) throws IOException {
1459       ByteBufferOutputStream saslResponse = null;
1460       DataOutputStream out = null;
1461       try {
1462         // In my testing, have noticed that sasl messages are usually
1463         // in the ballpark of 100-200. That's why the initial capacity is 256.
1464         saslResponse = new ByteBufferOutputStream(256);
1465         out = new DataOutputStream(saslResponse);
1466         out.writeInt(status.state); // write status
1467         if (status == SaslStatus.SUCCESS) {
1468           rv.write(out);
1469         } else {
1470           WritableUtils.writeString(out, errorClass);
1471           WritableUtils.writeString(out, error);
1472         }
1473         saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1474         saslCall.responder = responder;
1475         saslCall.sendResponseIfReady();
1476       } finally {
1477         if (saslResponse != null) {
1478           saslResponse.close();
1479         }
1480         if (out != null) {
1481           out.close();
1482         }
1483       }
1484     }
1485 
1486     private void disposeSasl() {
1487       if (saslServer != null) {
1488         try {
1489           saslServer.dispose();
1490           saslServer = null;
1491         } catch (SaslException ignored) {
1492           // Ignored. This is being disposed of anyway.
1493         }
1494       }
1495     }
1496 
    /**
     * Reads and validates the 6-byte connection preamble: the RPC header magic, then a
     * version byte, then an auth-method byte. The preamble may arrive over several calls;
     * the partially filled buffer is kept between calls.
     * <p>
     * On success, authMethod, useSasl and the fallback flags are set and
     * connectionPreambleRead becomes true.
     *
     * @return the count returned by the channel read (possibly negative, and returned
     *   as-is while the preamble is incomplete), or the result of the bad-preamble
     *   handling when the magic, version or auth byte is not acceptable
     * @throws IOException if reading fails, or AccessDeniedException when security is
     *   enabled, the client asked for SIMPLE auth and fallback is not allowed
     */
    private int readPreamble() throws IOException {
      if (preambleBuffer == null) {
        preambleBuffer = ByteBuffer.allocate(6);
      }
      int count = channelRead(channel, preambleBuffer);
      // Read failure, or preamble not complete yet: report the count and wait for more.
      if (count < 0 || preambleBuffer.remaining() > 0) {
        return count;
      }
      // Check for 'HBas' magic.
      preambleBuffer.flip();
      for (int i = 0; i < HConstants.RPC_HEADER.length; i++) {
        if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) {
          return doBadPreambleHandling("Expected HEADER=" +
              Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" +
              Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) +
              " from " + toString());
        }
      }
      // The two bytes following the magic: version, then auth method.
      int version = preambleBuffer.get(HConstants.RPC_HEADER.length);
      byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1);
      // authMethod is assigned before the version check, so it is set even when the
      // version is then rejected.
      this.authMethod = AuthMethod.valueOf(authbyte);
      if (version != CURRENT_VERSION) {
        String msg = getFatalConnectionString(version, authbyte);
        return doBadPreambleHandling(msg, new WrongVersionException(msg));
      }
      if (authMethod == null) {
        String msg = getFatalConnectionString(version, authbyte);
        return doBadPreambleHandling(msg, new BadAuthException(msg));
      }
      // Secure server, client asking for SIMPLE auth: accept only if fallback is allowed.
      if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
        if (allowFallbackToSimpleAuth) {
          metrics.authenticationFallback();
          authenticatedWithFallback = true;
        } else {
          AccessDeniedException ae = new AccessDeniedException("Authentication is required");
          setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
          responder.doRespond(authFailedCall);
          throw ae;
        }
      }
      // Non-secure server, client asking for SASL: tell the client to switch to simple auth.
      if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
        doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
            SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
        authMethod = AuthMethod.SIMPLE;
        // client has already sent the initial Sasl message and we
        // should ignore it. Both client and server should fall back
        // to simple auth from now on.
        skipInitialSaslHandshake = true;
      }
      if (authMethod != AuthMethod.SIMPLE) {
        useSasl = true;
      }

      preambleBuffer = null; // do not need it anymore
      connectionPreambleRead = true;
      return count;
    }
1554 
1555     private int read4Bytes() throws IOException {
1556       if (this.dataLengthBuffer.remaining() > 0) {
1557         return channelRead(channel, this.dataLengthBuffer);
1558       } else {
1559         return 0;
1560       }
1561     }
1562 
1563 
1564     /**
1565      * Read off the wire. If there is not enough data to read, update the connection state with
1566      *  what we have and returns.
1567      * @return Returns -1 if failure (and caller will close connection), else zero or more.
1568      * @throws IOException
1569      * @throws InterruptedException
1570      */
1571     public int readAndProcess() throws IOException, InterruptedException {
1572       // If we have not read the connection setup preamble, look to see if that is on the wire.
1573       if (!connectionPreambleRead) {
1574         int count = readPreamble();
1575         if (!connectionPreambleRead) {
1576           return count;
1577         }
1578       }
1579       // Try and read in an int. It will be length of the data to read (or -1 if a ping). We catch
1580       // the integer length into the 4-byte this.dataLengthBuffer.
1581       int count = read4Bytes();
1582       if (count < 0 || dataLengthBuffer.remaining() > 0) {
1583         return count;
1584       }
1585 
1586       // If we have not read the connection setup preamble, look to see if that is on the wire.
1587       if (!connectionPreambleRead) {
1588         count = readPreamble();
1589         if (!connectionPreambleRead) {
1590           return count;
1591         }
1592 
1593         count = read4Bytes();
1594         if (count < 0 || dataLengthBuffer.remaining() > 0) {
1595           return count;
1596         }
1597       }
1598 
1599       final boolean useWrap = this.useWrap;
1600 
1601       // we're guarding against data being modified concurrently
1602       // while trying to keep other instance members out of the block
1603       synchronized(this) {
1604         // We have read a length and we have read the preamble.  It is either the connection header
1605         // or it is a request.
1606         if (data == null) {
1607           dataLengthBuffer.flip();
1608           int dataLength = dataLengthBuffer.getInt();
1609           if (dataLength == RpcClient.PING_CALL_ID) {
1610             if (!useWrap) { //covers the !useSasl too
1611               dataLengthBuffer.clear();
1612               return 0;  //ping message
1613             }
1614           }
1615 
1616           if (dataLength < 0) { // A data length of zero is legal.
1617             throw new IllegalArgumentException("Unexpected data length "
1618                 + dataLength + "!! from " + getHostAddress());
1619           }
1620           data = ByteBuffer.allocate(dataLength);
1621 
1622           // Increment the rpc count. This counter will be decreased when we write
1623           //  the response.  If we want the connection to be detected as idle properly, we
1624           //  need to keep the inc / dec correct.
1625           incRpcCount();
1626         }
1627 
1628         count = channelRead(channel, data);
1629 
1630         if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
1631           process();
1632         }
1633       }
1634 
1635       return count;
1636     }
1637 
1638     /**
1639      * Process the data buffer and clean the connection state for the next call.
1640      */
1641     private void process() throws IOException, InterruptedException {
1642       data.flip();
1643       try {
1644         if (skipInitialSaslHandshake) {
1645           skipInitialSaslHandshake = false;
1646           return;
1647         }
1648 
1649         if (useSasl) {
1650           saslReadAndProcess(data.array());
1651         } else {
1652           processOneRpc(data.array());
1653         }
1654 
1655       } finally {
1656         dataLengthBuffer.clear(); // Clean for the next call
1657         data = null; // For the GC
1658       }
1659     }
1660 
1661     private String getFatalConnectionString(final int version, final byte authByte) {
1662       return "serverVersion=" + CURRENT_VERSION +
1663       ", clientVersion=" + version + ", authMethod=" + authByte +
1664       ", authSupported=" + (authMethod != null) + " from " + toString();
1665     }
1666 
1667     private int doBadPreambleHandling(final String msg) throws IOException {
1668       return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1669     }
1670 
1671     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1672       LOG.warn(msg);
1673       Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
1674       setupResponse(null, fakeCall, e, msg);
1675       responder.doRespond(fakeCall);
1676       // Returning -1 closes out the connection.
1677       return -1;
1678     }
1679 
1680     // Reads the connection header following version
1681     private void processConnectionHeader(byte[] buf) throws IOException {
1682       this.connectionHeader = ConnectionHeader.parseFrom(buf);
1683       String serviceName = connectionHeader.getServiceName();
1684       if (serviceName == null) throw new EmptyServiceNameException();
1685       this.service = getService(services, serviceName);
1686       if (this.service == null) throw new UnknownServiceException(serviceName);
1687       setupCellBlockCodecs(this.connectionHeader);
1688       UserGroupInformation protocolUser = createUser(connectionHeader);
1689       if (!useSasl) {
1690         ugi = protocolUser;
1691         if (ugi != null) {
1692           ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1693         }
1694         // audit logging for SASL authenticated users happens in saslReadAndProcess()
1695         if (authenticatedWithFallback) {
1696           LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
1697               + " connecting from " + getHostAddress());
1698         }
1699         AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1700       } else {
1701         // user is authenticated
1702         ugi.setAuthenticationMethod(authMethod.authenticationMethod);
1703         //Now we check if this is a proxy user case. If the protocol user is
1704         //different from the 'user', it is a proxy user scenario. However,
1705         //this is not allowed if user authenticated with DIGEST.
1706         if ((protocolUser != null)
1707             && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
1708           if (authMethod == AuthMethod.DIGEST) {
1709             // Not allowed to doAs if token authentication is used
1710             throw new AccessDeniedException("Authenticated user (" + ugi
1711                 + ") doesn't match what the client claims to be ("
1712                 + protocolUser + ")");
1713           } else {
1714             // Effective user can be different from authenticated user
1715             // for simple auth or kerberos auth
1716             // The user is the real user. Now we create a proxy user
1717             UserGroupInformation realUser = ugi;
1718             ugi = UserGroupInformation.createProxyUser(protocolUser
1719                 .getUserName(), realUser);
1720             // Now the user is a proxy user, set Authentication method Proxy.
1721             ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
1722           }
1723         }
1724       }
1725       if (connectionHeader.hasVersionInfo()) {
1726         // see if this connection will support RetryImmediatelyException
1727         retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
1728 
1729         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1730             + " with version info: "
1731             + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
1732       } else {
1733         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1734             + " with unknown version info");
1735       }
1736 
1737 
1738     }
1739 
1740     /**
1741      * Set up cell block codecs
1742      * @throws FatalConnectionException
1743      */
1744     private void setupCellBlockCodecs(final ConnectionHeader header)
1745     throws FatalConnectionException {
1746       // TODO: Plug in other supported decoders.
1747       if (!header.hasCellBlockCodecClass()) return;
1748       String className = header.getCellBlockCodecClass();
1749       if (className == null || className.length() == 0) return;
1750       try {
1751         this.codec = (Codec)Class.forName(className).newInstance();
1752       } catch (Exception e) {
1753         throw new UnsupportedCellCodecException(className, e);
1754       }
1755       if (!header.hasCellBlockCompressorClass()) return;
1756       className = header.getCellBlockCompressorClass();
1757       try {
1758         this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1759       } catch (Exception e) {
1760         throw new UnsupportedCompressionCodecException(className, e);
1761       }
1762     }
1763 
1764     private void processUnwrappedData(byte[] inBuf) throws IOException,
1765     InterruptedException {
1766       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1767       // Read all RPCs contained in the inBuf, even partial ones
1768       while (true) {
1769         int count;
1770         if (unwrappedDataLengthBuffer.remaining() > 0) {
1771           count = channelRead(ch, unwrappedDataLengthBuffer);
1772           if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1773             return;
1774         }
1775 
1776         if (unwrappedData == null) {
1777           unwrappedDataLengthBuffer.flip();
1778           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1779 
1780           if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1781             if (LOG.isDebugEnabled())
1782               LOG.debug("Received ping message");
1783             unwrappedDataLengthBuffer.clear();
1784             continue; // ping message
1785           }
1786           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1787         }
1788 
1789         count = channelRead(ch, unwrappedData);
1790         if (count <= 0 || unwrappedData.remaining() > 0)
1791           return;
1792 
1793         if (unwrappedData.remaining() == 0) {
1794           unwrappedDataLengthBuffer.clear();
1795           unwrappedData.flip();
1796           processOneRpc(unwrappedData.array());
1797           unwrappedData = null;
1798         }
1799       }
1800     }
1801 
1802     private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1803       if (connectionHeaderRead) {
1804         processRequest(buf);
1805       } else {
1806         processConnectionHeader(buf);
1807         this.connectionHeaderRead = true;
1808         if (!authorizeConnection()) {
1809           // Throw FatalConnectionException wrapping ACE so client does right thing and closes
1810           // down the connection instead of trying to read non-existent retun.
1811           throw new AccessDeniedException("Connection from " + this + " for service " +
1812             connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
1813         }
1814         this.user = userProvider.create(this.ugi);
1815       }
1816     }
1817 
1818     /**
1819      * @param buf Has the request header and the request param and optionally encoded data buffer
1820      * all in this one array.
1821      * @throws IOException
1822      * @throws InterruptedException
1823      */
1824     protected void processRequest(byte[] buf) throws IOException, InterruptedException {
1825       long totalRequestSize = buf.length;
1826       int offset = 0;
1827       // Here we read in the header.  We avoid having pb
1828       // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
1829       CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
1830       int headerSize = cis.readRawVarint32();
1831       offset = cis.getTotalBytesRead();
1832       Message.Builder builder = RequestHeader.newBuilder();
1833       ProtobufUtil.mergeFrom(builder, buf, offset, headerSize);
1834       RequestHeader header = (RequestHeader) builder.build();
1835       offset += headerSize;
1836       int id = header.getCallId();
1837       if (LOG.isTraceEnabled()) {
1838         LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1839           " totalRequestSize: " + totalRequestSize + " bytes");
1840       }
1841       // Enforcing the call queue size, this triggers a retry in the client
1842       // This is a bit late to be doing this check - we have already read in the total request.
1843       if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1844         final Call callTooBig =
1845           new Call(id, this.service, null, null, null, null, this,
1846             responder, totalRequestSize, null, null);
1847         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1848         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1849         setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
1850             "Call queue is full on " + server.getServerName() +
1851                 ", is hbase.ipc.server.max.callqueue.size too small?");
1852         responder.doRespond(callTooBig);
1853         return;
1854       }
1855       MethodDescriptor md = null;
1856       Message param = null;
1857       CellScanner cellScanner = null;
1858       try {
1859         if (header.hasRequestParam() && header.getRequestParam()) {
1860           md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1861           if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1862           builder = this.service.getRequestPrototype(md).newBuilderForType();
1863           // To read the varint, I need an inputstream; might as well be a CIS.
1864           cis = CodedInputStream.newInstance(buf, offset, buf.length);
1865           int paramSize = cis.readRawVarint32();
1866           offset += cis.getTotalBytesRead();
1867           if (builder != null) {
1868             ProtobufUtil.mergeFrom(builder, buf, offset, paramSize);
1869             param = builder.build();
1870           }
1871           offset += paramSize;
1872         }
1873         if (header.hasCellBlockMeta()) {
1874           cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
1875             buf, offset, buf.length);
1876         }
1877       } catch (Throwable t) {
1878         InetSocketAddress address = getListenerAddress();
1879         String msg = (address != null ? address : "(channel closed)") +
1880             " is unable to read call parameter from client " + getHostAddress();
1881         LOG.warn(msg, t);
1882 
1883         metrics.exception(t);
1884 
1885         // probably the hbase hadoop version does not match the running hadoop version
1886         if (t instanceof LinkageError) {
1887           t = new DoNotRetryIOException(t);
1888         }
1889         // If the method is not present on the server, do not retry.
1890         if (t instanceof UnsupportedOperationException) {
1891           t = new DoNotRetryIOException(t);
1892         }
1893 
1894         final Call readParamsFailedCall =
1895           new Call(id, this.service, null, null, null, null, this,
1896             responder, totalRequestSize, null, null);
1897         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1898         setupResponse(responseBuffer, readParamsFailedCall, t,
1899           msg + "; " + t.getMessage());
1900         responder.doRespond(readParamsFailedCall);
1901         return;
1902       }
1903 
1904       TraceInfo traceInfo = header.hasTraceInfo()
1905           ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1906           : null;
1907       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1908               totalRequestSize, traceInfo, this.addr);
1909 
1910       if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
1911         callQueueSize.add(-1 * call.getSize());
1912 
1913         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1914         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1915         setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
1916             "Call queue is full on " + server.getServerName() +
1917                 ", too many items queued ?");
1918         responder.doRespond(call);
1919       }
1920     }
1921 
1922     private boolean authorizeConnection() throws IOException {
1923       try {
1924         // If auth method is DIGEST, the token was obtained by the
1925         // real user for the effective user, therefore not required to
1926         // authorize real user. doAs is allowed only for simple or kerberos
1927         // authentication
1928         if (ugi != null && ugi.getRealUser() != null
1929             && (authMethod != AuthMethod.DIGEST)) {
1930           ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
1931         }
1932         authorize(ugi, connectionHeader, getHostInetAddress());
1933         metrics.authorizationSuccess();
1934       } catch (AuthorizationException ae) {
1935         if (LOG.isDebugEnabled()) {
1936           LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1937         }
1938         metrics.authorizationFailure();
1939         setupResponse(authFailedResponse, authFailedCall,
1940           new AccessDeniedException(ae), ae.getMessage());
1941         responder.doRespond(authFailedCall);
1942         return false;
1943       }
1944       return true;
1945     }
1946 
1947     protected synchronized void close() {
1948       disposeSasl();
1949       data = null;
1950       if (!channel.isOpen())
1951         return;
1952       try {socket.shutdownOutput();} catch(Exception ignored) {
1953         if (LOG.isTraceEnabled()) {
1954           LOG.trace(ignored);
1955         }
1956       }
1957       if (channel.isOpen()) {
1958         try {channel.close();} catch(Exception ignored) {
1959           if (LOG.isTraceEnabled()) {
1960             LOG.trace(ignored);
1961           }
1962         }
1963       }
1964       try {socket.close();} catch(Exception ignored) {
1965         if (LOG.isTraceEnabled()) {
1966           LOG.trace(ignored);
1967         }
1968       }
1969     }
1970 
1971     private UserGroupInformation createUser(ConnectionHeader head) {
1972       UserGroupInformation ugi = null;
1973 
1974       if (!head.hasUserInfo()) {
1975         return null;
1976       }
1977       UserInformation userInfoProto = head.getUserInfo();
1978       String effectiveUser = null;
1979       if (userInfoProto.hasEffectiveUser()) {
1980         effectiveUser = userInfoProto.getEffectiveUser();
1981       }
1982       String realUser = null;
1983       if (userInfoProto.hasRealUser()) {
1984         realUser = userInfoProto.getRealUser();
1985       }
1986       if (effectiveUser != null) {
1987         if (realUser != null) {
1988           UserGroupInformation realUserUgi =
1989               UserGroupInformation.createRemoteUser(realUser);
1990           ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1991         } else {
1992           ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1993         }
1994       }
1995       return ugi;
1996     }
1997   }
1998 
1999   /**
2000    * Datastructure for passing a {@link BlockingService} and its associated class of
2001    * protobuf service interface.  For example, a server that fielded what is defined
2002    * in the client protobuf service would pass in an implementation of the client blocking service
2003    * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
2004    */
2005   public static class BlockingServiceAndInterface {
2006     private final BlockingService service;
2007     private final Class<?> serviceInterface;
2008     public BlockingServiceAndInterface(final BlockingService service,
2009         final Class<?> serviceInterface) {
2010       this.service = service;
2011       this.serviceInterface = serviceInterface;
2012     }
2013     public Class<?> getServiceInterface() {
2014       return this.serviceInterface;
2015     }
2016     public BlockingService getBlockingService() {
2017       return this.service;
2018     }
2019   }
2020 
2021   /**
2022    * Constructs a server listening on the named port and address.
2023    * @param server hosting instance of {@link Server}. We will do authentications if an
2024    * instance else pass null for no authentication check.
2025    * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
2026    * @param services A list of services.
2027    * @param bindAddress Where to listen
2028    * @param conf
2029    * @param scheduler
2030    */
2031   public RpcServer(final Server server, final String name,
2032       final List<BlockingServiceAndInterface> services,
2033       final InetSocketAddress bindAddress, Configuration conf,
2034       RpcScheduler scheduler)
2035       throws IOException {
2036 
2037     if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
2038       this.reservoir = new BoundedByteBufferPool(
2039           conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
2040           conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
2041           // Make the max twice the number of handlers to be safe.
2042           conf.getInt("hbase.ipc.server.reservoir.initial.max",
2043               conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
2044                   HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
2045     } else {
2046       reservoir = null;
2047     }
2048     
2049     this.server = server;
2050     this.services = services;
2051     this.bindAddress = bindAddress;
2052     this.conf = conf;
2053     this.socketSendBufferSize = 0;
2054     this.maxQueueSize =
2055       this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
2056     this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
2057     this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
2058     this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
2059     this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
2060     this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
2061       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
2062     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
2063     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
2064 
2065     // Start the listener here and let it bind to the port
2066     listener = new Listener(name);
2067     this.port = listener.getAddress().getPort();
2068 
2069     this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
2070     this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
2071     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
2072 
2073     this.ipcUtil = new IPCUtil(conf);
2074 
2075     // Create the responder here
2076     responder = new Responder();
2077     this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
2078     this.userProvider = UserProvider.instantiate(conf);
2079     this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
2080     if (isSecurityEnabled) {
2081       HBaseSaslRpcServer.init(conf);
2082     }
2083     initReconfigurable(conf);
2084 
2085     this.scheduler = scheduler;
2086     this.scheduler.init(new RpcSchedulerContext(this));
2087   }
2088 
2089   @Override
2090   public void onConfigurationChange(Configuration newConf) {
2091     initReconfigurable(newConf);
2092   }
2093 
2094   private void initReconfigurable(Configuration confToLoad) {
2095     this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
2096     if (isSecurityEnabled && allowFallbackToSimpleAuth) {
2097       LOG.warn("********* WARNING! *********");
2098       LOG.warn("This server is configured to allow connections from INSECURE clients");
2099       LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
2100       LOG.warn("While this option is enabled, client identities cannot be secured, and user");
2101       LOG.warn("impersonation is possible!");
2102       LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
2103       LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
2104       LOG.warn("****************************");
2105     }
2106   }
2107 
2108   /**
2109    * Subclasses of HBaseServer can override this to provide their own
2110    * Connection implementations.
2111    */
2112   protected Connection getConnection(SocketChannel channel, long time) {
2113     return new Connection(channel, time);
2114   }
2115 
2116   /**
2117    * Setup response for the RPC Call.
2118    *
2119    * @param response buffer to serialize the response into
2120    * @param call {@link Call} to which we are setting up the response
2121    * @param error error message, if the call failed
2122    * @throws IOException
2123    */
2124   private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
2125   throws IOException {
2126     if (response != null) response.reset();
2127     call.setResponse(null, null, t, error);
2128   }
2129 
2130   protected void closeConnection(Connection connection) {
2131     synchronized (connectionList) {
2132       if (connectionList.remove(connection)) {
2133         numConnections--;
2134       }
2135     }
2136     connection.close();
2137   }
2138 
2139   Configuration getConf() {
2140     return conf;
2141   }
2142 
2143   /** Sets the socket buffer size used for responding to RPCs.
2144    * @param size send size
2145    */
2146   @Override
2147   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2148 
2149   @Override
2150   public boolean isStarted() {
2151     return this.started;
2152   }
2153 
2154   /** Starts the service.  Must be called before any calls will be handled. */
2155   @Override
2156   public synchronized void start() {
2157     if (started) return;
2158     authTokenSecretMgr = createSecretManager();
2159     if (authTokenSecretMgr != null) {
2160       setSecretManager(authTokenSecretMgr);
2161       authTokenSecretMgr.start();
2162     }
2163     this.authManager = new ServiceAuthorizationManager();
2164     HBasePolicyProvider.init(conf, authManager);
2165     responder.start();
2166     listener.start();
2167     scheduler.start();
2168     started = true;
2169   }
2170 
2171   @Override
2172   public synchronized void refreshAuthManager(PolicyProvider pp) {
2173     // Ignore warnings that this should be accessed in a static way instead of via an instance;
2174     // it'll break if you go via static route.
2175     this.authManager.refresh(this.conf, pp);
2176   }
2177 
2178   private AuthenticationTokenSecretManager createSecretManager() {
2179     if (!isSecurityEnabled) return null;
2180     if (server == null) return null;
2181     Configuration conf = server.getConfiguration();
2182     long keyUpdateInterval =
2183         conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2184     long maxAge =
2185         conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2186     return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2187         server.getServerName().toString(), keyUpdateInterval, maxAge);
2188   }
2189 
2190   public SecretManager<? extends TokenIdentifier> getSecretManager() {
2191     return this.secretManager;
2192   }
2193 
2194   @SuppressWarnings("unchecked")
2195   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2196     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2197   }
2198 
2199   /**
2200    * This is a server side method, which is invoked over RPC. On success
2201    * the return response has protobuf response payload. On failure, the
2202    * exception name and the stack trace are returned in the protobuf response.
2203    */
2204   @Override
2205   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2206       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2207   throws IOException {
2208     try {
2209       status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2210       // TODO: Review after we add in encoded data blocks.
2211       status.setRPCPacket(param);
2212       status.resume("Servicing call");
2213       //get an instance of the method arg type
2214       long startTime = System.currentTimeMillis();
2215       PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2216       Message result = service.callBlockingMethod(md, controller, param);
2217       long endTime = System.currentTimeMillis();
2218       int processingTime = (int) (endTime - startTime);
2219       int qTime = (int) (startTime - receiveTime);
2220       int totalTime = (int) (endTime - receiveTime);
2221       if (LOG.isTraceEnabled()) {
2222         LOG.trace(CurCall.get().toString() +
2223             ", response " + TextFormat.shortDebugString(result) +
2224             " queueTime: " + qTime +
2225             " processingTime: " + processingTime +
2226             " totalTime: " + totalTime);
2227       }
2228       long requestSize = param.getSerializedSize();
2229       long responseSize = result.getSerializedSize();
2230       metrics.dequeuedCall(qTime);
2231       metrics.processedCall(processingTime);
2232       metrics.totalCall(totalTime);
2233       metrics.receivedRequest(requestSize);
2234       metrics.sentResponse(responseSize);
2235       // log any RPC responses that are slower than the configured warn
2236       // response time or larger than configured warning size
2237       boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2238       boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2239       if (tooSlow || tooLarge) {
2240         // when tagging, we let TooLarge trump TooSmall to keep output simple
2241         // note that large responses will often also be slow.
2242         logResponse(param,
2243             md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
2244             (tooLarge ? "TooLarge" : "TooSlow"),
2245             status.getClient(), startTime, processingTime, qTime,
2246             responseSize);
2247       }
2248       return new Pair<Message, CellScanner>(result, controller.cellScanner());
2249     } catch (Throwable e) {
2250       // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
2251       // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
2252       // need to pass it over the wire.
2253       if (e instanceof ServiceException) {
2254         if (e.getCause() == null) {
2255           LOG.debug("Caught a ServiceException with null cause", e);
2256         } else {
2257           e = e.getCause();
2258         }
2259       }
2260 
2261       // increment the number of requests that were exceptions.
2262       metrics.exception(e);
2263 
2264       if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2265       if (e instanceof IOException) throw (IOException)e;
2266       LOG.error("Unexpected throwable object ", e);
2267       throw new IOException(e.getMessage(), e);
2268     }
2269   }
2270 
2271   /**
2272    * Logs an RPC response to the LOG file, producing valid JSON objects for
2273    * client Operations.
2274    * @param param The parameters received in the call.
2275    * @param methodName The name of the method invoked
2276    * @param call The string representation of the call
2277    * @param tag  The tag that will be used to indicate this event in the log.
2278    * @param clientAddress   The address of the client who made this call.
2279    * @param startTime       The time that the call was initiated, in ms.
2280    * @param processingTime  The duration that the call took to run, in ms.
2281    * @param qTime           The duration that the call spent on the queue
2282    *                        prior to being initiated, in ms.
2283    * @param responseSize    The size in bytes of the response buffer.
2284    */
2285   void logResponse(Message param, String methodName, String call, String tag,
2286       String clientAddress, long startTime, int processingTime, int qTime,
2287       long responseSize)
2288           throws IOException {
2289     // base information that is reported regardless of type of call
2290     Map<String, Object> responseInfo = new HashMap<String, Object>();
2291     responseInfo.put("starttimems", startTime);
2292     responseInfo.put("processingtimems", processingTime);
2293     responseInfo.put("queuetimems", qTime);
2294     responseInfo.put("responsesize", responseSize);
2295     responseInfo.put("client", clientAddress);
2296     responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2297     responseInfo.put("method", methodName);
2298     responseInfo.put("call", call);
2299     // The params could be really big, make sure they don't kill us at WARN
2300     String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
2301     if (stringifiedParam.length() > 150) {
2302       // Truncate to 1000 chars if TRACE is on, else to 150 chars
2303       stringifiedParam = stringifiedParam.subSequence(
2304           0, LOG.isTraceEnabled() ? 1000 : 150) + " <TRUNCATED>";
2305     }
2306     responseInfo.put("param", stringifiedParam);
2307     if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
2308       ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
2309       if (request.hasScannerId()) {
2310         long scannerId = request.getScannerId();
2311         String scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
2312         if (scanDetails != null) {
2313           responseInfo.put("scandetails", scanDetails);
2314         }
2315       }
2316     }
2317     LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2318   }
2319 
2320   /** Stops the service.  No new calls will be handled after this is called. */
2321   @Override
2322   public synchronized void stop() {
2323     LOG.info("Stopping server on " + port);
2324     running = false;
2325     if (authTokenSecretMgr != null) {
2326       authTokenSecretMgr.stop();
2327       authTokenSecretMgr = null;
2328     }
2329     listener.interrupt();
2330     listener.doStop();
2331     responder.interrupt();
2332     scheduler.stop();
2333     notifyAll();
2334   }
2335 
2336   /** Wait for the server to be stopped.
2337    * Does not wait for all subthreads to finish.
2338    *  See {@link #stop()}.
2339    * @throws InterruptedException e
2340    */
2341   @Override
2342   public synchronized void join() throws InterruptedException {
2343     while (running) {
2344       wait();
2345     }
2346   }
2347 
2348   /**
2349    * Return the socket (ip+port) on which the RPC server is listening to. May return null if
2350    * the listener channel is closed.
2351    * @return the socket (ip+port) on which the RPC server is listening to, or null if this
2352    * information cannot be determined
2353    */
2354   @Override
2355   public synchronized InetSocketAddress getListenerAddress() {
2356     if (listener == null) {
2357       return null;
2358     }
2359     return listener.getAddress();
2360   }
2361 
2362   /**
2363    * Set the handler for calling out of RPC for error conditions.
2364    * @param handler the handler implementation
2365    */
2366   @Override
2367   public void setErrorHandler(HBaseRPCErrorHandler handler) {
2368     this.errorHandler = handler;
2369   }
2370 
2371   @Override
2372   public HBaseRPCErrorHandler getErrorHandler() {
2373     return this.errorHandler;
2374   }
2375 
2376   /**
2377    * Returns the metrics instance for reporting RPC call statistics
2378    */
2379   @Override
2380   public MetricsHBaseServer getMetrics() {
2381     return metrics;
2382   }
2383 
2384   @Override
2385   public void addCallSize(final long diff) {
2386     this.callQueueSize.add(diff);
2387   }
2388 
2389   /**
2390    * Authorize the incoming client connection.
2391    *
2392    * @param user client user
2393    * @param connection incoming connection
2394    * @param addr InetAddress of incoming connection
2395    * @throws org.apache.hadoop.security.authorize.AuthorizationException
2396    *         when the client isn't authorized to talk the protocol
2397    */
2398   public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
2399       InetAddress addr)
2400   throws AuthorizationException {
2401     if (authorize) {
2402       Class<?> c = getServiceInterface(services, connection.getServiceName());
2403       this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2404     }
2405   }
2406 
2407   /**
2408    * When the read or write buffer size is larger than this limit, i/o will be
2409    * done in chunks of this size. Most RPC requests and responses would be
2410    * be smaller.
2411    */
2412   private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
2413 
2414   /**
2415    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
2416    * If the amount of data is large, it writes to channel in smaller chunks.
2417    * This is to avoid jdk from creating many direct buffers as the size of
2418    * buffer increases. This also minimizes extra copies in NIO layer
2419    * as a result of multiple write operations required to write a large
2420    * buffer.
2421    *
2422    * @param channel writable byte channel to write to
2423    * @param bufferChain Chain of buffers to write
2424    * @return number of bytes written
2425    * @throws java.io.IOException e
2426    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
2427    */
2428   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2429   throws IOException {
2430     long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
2431     if (count > 0) this.metrics.sentBytes(count);
2432     return count;
2433   }
2434 
2435   /**
2436    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
2437    * If the amount of data is large, it writes to channel in smaller chunks.
2438    * This is to avoid jdk from creating many direct buffers as the size of
2439    * ByteBuffer increases. There should not be any performance degredation.
2440    *
2441    * @param channel writable byte channel to write on
2442    * @param buffer buffer to write
2443    * @return number of bytes written
2444    * @throws java.io.IOException e
2445    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
2446    */
2447   protected int channelRead(ReadableByteChannel channel,
2448                                    ByteBuffer buffer) throws IOException {
2449 
2450     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2451            channel.read(buffer) : channelIO(channel, null, buffer);
2452     if (count > 0) {
2453       metrics.receivedBytes(count);
2454     }
2455     return count;
2456   }
2457 
2458   /**
2459    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
2460    * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
2461    * one of readCh or writeCh should be non-null.
2462    *
2463    * @param readCh read channel
2464    * @param writeCh write channel
2465    * @param buf buffer to read or write into/out of
2466    * @return bytes written
2467    * @throws java.io.IOException e
2468    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
2469    * @see #channelWrite(GatheringByteChannel, BufferChain)
2470    */
2471   private static int channelIO(ReadableByteChannel readCh,
2472                                WritableByteChannel writeCh,
2473                                ByteBuffer buf) throws IOException {
2474 
2475     int originalLimit = buf.limit();
2476     int initialRemaining = buf.remaining();
2477     int ret = 0;
2478 
2479     while (buf.remaining() > 0) {
2480       try {
2481         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2482         buf.limit(buf.position() + ioSize);
2483 
2484         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2485 
2486         if (ret < ioSize) {
2487           break;
2488         }
2489 
2490       } finally {
2491         buf.limit(originalLimit);
2492       }
2493     }
2494 
2495     int nBytes = initialRemaining - buf.remaining();
2496     return (nBytes > 0) ? nBytes : ret;
2497   }
2498 
2499   /**
2500    * Needed for features such as delayed calls.  We need to be able to store the current call
2501    * so that we can complete it later or ask questions of what is supported by the current ongoing
2502    * call.
2503    * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
2504    */
2505   public static RpcCallContext getCurrentCall() {
2506     return CurCall.get();
2507   }
2508 
2509   public static boolean isInRpcCallContext() {
2510     return CurCall.get() != null;
2511   }
2512 
2513   /**
2514    * Returns the user credentials associated with the current RPC request or
2515    * <code>null</code> if no credentials were provided.
2516    * @return A User
2517    */
2518   public static User getRequestUser() {
2519     RpcCallContext ctx = getCurrentCall();
2520     return ctx == null? null: ctx.getRequestUser();
2521   }
2522 
2523   /**
2524    * Returns the username for any user associated with the current RPC
2525    * request or <code>null</code> if no user is set.
2526    */
2527   public static String getRequestUserName() {
2528     User user = getRequestUser();
2529     return user == null? null: user.getShortName();
2530   }
2531 
2532   /**
2533    * @return Address of remote client if a request is ongoing, else null
2534    */
2535   public static InetAddress getRemoteAddress() {
2536     RpcCallContext ctx = getCurrentCall();
2537     return ctx == null? null: ctx.getRemoteAddress();
2538   }
2539 
2540   /**
2541    * @param serviceName Some arbitrary string that represents a 'service'.
2542    * @param services Available service instances
2543    * @return Matching BlockingServiceAndInterface pair
2544    */
2545   static BlockingServiceAndInterface getServiceAndInterface(
2546       final List<BlockingServiceAndInterface> services, final String serviceName) {
2547     for (BlockingServiceAndInterface bs : services) {
2548       if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2549         return bs;
2550       }
2551     }
2552     return null;
2553   }
2554 
2555   /**
2556    * @param serviceName Some arbitrary string that represents a 'service'.
2557    * @param services Available services and their service interfaces.
2558    * @return Service interface class for <code>serviceName</code>
2559    */
2560   static Class<?> getServiceInterface(
2561       final List<BlockingServiceAndInterface> services,
2562       final String serviceName) {
2563     BlockingServiceAndInterface bsasi =
2564         getServiceAndInterface(services, serviceName);
2565     return bsasi == null? null: bsasi.getServiceInterface();
2566   }
2567 
2568   /**
2569    * @param serviceName Some arbitrary string that represents a 'service'.
2570    * @param services Available services and their service interfaces.
2571    * @return BlockingService that goes with the passed <code>serviceName</code>
2572    */
2573   static BlockingService getService(
2574       final List<BlockingServiceAndInterface> services,
2575       final String serviceName) {
2576     BlockingServiceAndInterface bsasi =
2577         getServiceAndInterface(services, serviceName);
2578     return bsasi == null? null: bsasi.getBlockingService();
2579   }
2580 
2581   static MonitoredRPCHandler getStatus() {
2582     // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
2583     MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
2584     if (status != null) {
2585       return status;
2586     }
2587     status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
2588     status.pause("Waiting for a call");
2589     RpcServer.MONITORED_RPC.set(status);
2590     return status;
2591   }
2592 
2593   /** Returns the remote side ip address when invoked inside an RPC
2594    *  Returns null incase of an error.
2595    *  @return InetAddress
2596    */
2597   public static InetAddress getRemoteIp() {
2598     Call call = CurCall.get();
2599     if (call != null && call.connection != null && call.connection.socket != null) {
2600       return call.connection.socket.getInetAddress();
2601     }
2602     return null;
2603   }
2604 
2605 
2606   /**
2607    * A convenience method to bind to a given address and report
2608    * better exceptions if the address is not a valid host.
2609    * @param socket the socket to bind
2610    * @param address the address to bind to
2611    * @param backlog the number of connections allowed in the queue
2612    * @throws BindException if the address can't be bound
2613    * @throws UnknownHostException if the address isn't a valid host name
2614    * @throws IOException other random errors from bind
2615    */
2616   public static void bind(ServerSocket socket, InetSocketAddress address,
2617                           int backlog) throws IOException {
2618     try {
2619       socket.bind(address, backlog);
2620     } catch (BindException e) {
2621       BindException bindException =
2622         new BindException("Problem binding to " + address + " : " +
2623             e.getMessage());
2624       bindException.initCause(e);
2625       throw bindException;
2626     } catch (SocketException e) {
2627       // If they try to bind to a different host's address, give a better
2628       // error message.
2629       if ("Unresolved address".equals(e.getMessage())) {
2630         throw new UnknownHostException("Invalid hostname for server: " +
2631                                        address.getHostName());
2632       }
2633       throw e;
2634     }
2635   }
2636 
2637   @Override
2638   public RpcScheduler getScheduler() {
2639     return scheduler;
2640   }
2641 
2642   @Override
2643   public void setRsRpcServices(RSRpcServices rsRpcServices) {
2644     this.rsRpcServices = rsRpcServices;
2645   }
2646 }