View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.ServerSocket;
31  import java.net.Socket;
32  import java.net.SocketException;
33  import java.net.UnknownHostException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.Channels;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.GatheringByteChannel;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.security.PrivilegedExceptionAction;
46  import java.util.ArrayList;
47  import java.util.Arrays;
48  import java.util.Collections;
49  import java.util.HashMap;
50  import java.util.Iterator;
51  import java.util.LinkedList;
52  import java.util.List;
53  import java.util.Map;
54  import java.util.Random;
55  import java.util.Set;
56  import java.util.concurrent.ConcurrentHashMap;
57  import java.util.concurrent.ConcurrentLinkedDeque;
58  import java.util.concurrent.ExecutorService;
59  import java.util.concurrent.Executors;
60  import java.util.concurrent.atomic.AtomicInteger;
61  import java.util.concurrent.locks.Lock;
62  import java.util.concurrent.locks.ReentrantLock;
63  
64  import javax.security.sasl.Sasl;
65  import javax.security.sasl.SaslException;
66  import javax.security.sasl.SaslServer;
67  
68  import org.apache.commons.logging.Log;
69  import org.apache.commons.logging.LogFactory;
70  import org.apache.hadoop.hbase.CallQueueTooBigException;
71  import org.apache.hadoop.hbase.classification.InterfaceAudience;
72  import org.apache.hadoop.hbase.classification.InterfaceStability;
73  import org.apache.hadoop.conf.Configuration;
74  import org.apache.hadoop.hbase.CellScanner;
75  import org.apache.hadoop.hbase.DoNotRetryIOException;
76  import org.apache.hadoop.hbase.HBaseIOException;
77  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
78  import org.apache.hadoop.hbase.HConstants;
79  import org.apache.hadoop.hbase.HRegionInfo;
80  import org.apache.hadoop.hbase.Server;
81  import org.apache.hadoop.hbase.TableName;
82  import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
83  import org.apache.hadoop.hbase.client.Operation;
84  import org.apache.hadoop.hbase.client.VersionInfoUtil;
85  import org.apache.hadoop.hbase.codec.Codec;
86  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
87  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
88  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
89  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
90  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
91  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
92  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
93  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
94  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
95  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
96  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
97  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
98  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
99  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
100 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
101 import org.apache.hadoop.hbase.regionserver.HRegionServer;
102 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
103 import org.apache.hadoop.hbase.security.AccessDeniedException;
104 import org.apache.hadoop.hbase.security.AuthMethod;
105 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
106 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
107 import org.apache.hadoop.hbase.security.User;
108 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
109 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
110 import org.apache.hadoop.hbase.security.SaslStatus;
111 import org.apache.hadoop.hbase.security.SaslUtil;
112 import org.apache.hadoop.hbase.security.UserProvider;
113 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
114 import org.apache.hadoop.hbase.util.Bytes;
115 import org.apache.hadoop.hbase.util.Counter;
116 import org.apache.hadoop.hbase.util.Pair;
117 import org.apache.hadoop.io.BytesWritable;
118 import org.apache.hadoop.io.IntWritable;
119 import org.apache.hadoop.io.Writable;
120 import org.apache.hadoop.io.WritableUtils;
121 import org.apache.hadoop.io.compress.CompressionCodec;
122 import org.apache.hadoop.security.UserGroupInformation;
123 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
124 import org.apache.hadoop.security.authorize.AuthorizationException;
125 import org.apache.hadoop.security.authorize.PolicyProvider;
126 import org.apache.hadoop.security.authorize.ProxyUsers;
127 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
128 import org.apache.hadoop.security.token.SecretManager;
129 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
130 import org.apache.hadoop.security.token.TokenIdentifier;
131 import org.apache.hadoop.util.StringUtils;
132 import org.codehaus.jackson.map.ObjectMapper;
133 import org.apache.htrace.TraceInfo;
134 
135 import com.google.common.util.concurrent.ThreadFactoryBuilder;
136 import com.google.protobuf.BlockingService;
137 import com.google.protobuf.CodedInputStream;
138 import com.google.protobuf.Descriptors.MethodDescriptor;
139 import com.google.protobuf.Message;
140 import com.google.protobuf.ServiceException;
141 import com.google.protobuf.TextFormat;
142 
143 /**
144  * An RPC server that hosts protobuf described Services.
145  *
146  * An RpcServer instance has a Listener that hosts the socket.  Listener has fixed number
147  * of Readers in an ExecutorPool, 10 by default.  The Listener does an accept and then
148  * round robin a Reader is chosen to do the read.  The reader is registered on Selector.  Read does
149  * total read off the channel and the parse from which it makes a Call.  The call is wrapped in a
150  * CallRunner and passed to the scheduler to be run.  Reader goes back to see if more to be done
151  * and loops till done.
152  *
153  * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
154  * has given the queues into which calls (i.e. CallRunner instances) are inserted.  Handlers run
155  * taking from the queue.  They run the CallRunner#run method on each item gotten from queue
156  * and keep taking while the server is up.
157  *
158  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
159  * queue for Responder to pull from and return result to client.
160  *
161  * @see RpcClientImpl
162  */
163 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
164 @InterfaceStability.Evolving
165 public class RpcServer implements RpcServerInterface, ConfigurationObserver {
166   // LOG is being used in CallRunner and the log level is being changed in tests
167   public static final Log LOG = LogFactory.getLog(RpcServer.class);
168   private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
169       = new CallQueueTooBigException();
170 
171   private final boolean authorize;
172   private boolean isSecurityEnabled;
173 
174   public static final byte CURRENT_VERSION = 0;
175 
176   /**
177    * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
178    */
179   public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
180           "hbase.ipc.server.fallback-to-simple-auth-allowed";
181 
182   /**
183    * How many calls/handler are allowed in the queue.
184    */
185   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
186 
187   /**
188    * The maximum size that we can hold in the RPC queue
189    */
190   private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
191 
192   private final IPCUtil ipcUtil;
193 
194   private static final String AUTH_FAILED_FOR = "Auth failed for ";
195   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
196   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
197     Server.class.getName());
198   protected SecretManager<TokenIdentifier> secretManager;
199   protected ServiceAuthorizationManager authManager;
200 
201   /** This is set to Call object before Handler invokes an RPC and ybdie
202    * after the call returns.
203    */
204   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
205 
206   /** Keeps MonitoredRPCHandler per handler thread. */
207   static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
208       = new ThreadLocal<MonitoredRPCHandler>();
209 
210   protected final InetSocketAddress bindAddress;
211   protected int port;                             // port we listen on
212   protected InetSocketAddress address;            // inet address we listen on
213   private int readThreads;                        // number of read threads
214   protected int maxIdleTime;                      // the maximum idle time after
215                                                   // which a client may be
216                                                   // disconnected
217   protected int thresholdIdleConnections;         // the number of idle
218                                                   // connections after which we
219                                                   // will start cleaning up idle
220                                                   // connections
221   int maxConnectionsToNuke;                       // the max number of
222                                                   // connections to nuke
223                                                   // during a cleanup
224 
225   protected MetricsHBaseServer metrics;
226 
227   protected final Configuration conf;
228 
229   private int maxQueueSize;
230   protected int socketSendBufferSize;
231   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
232   protected final boolean tcpKeepAlive; // if T then use keepalives
233   protected final long purgeTimeout;    // in milliseconds
234 
235   /**
236    * This flag is used to indicate to sub threads when they should go down.  When we call
237    * {@link #start()}, all threads started will consult this flag on whether they should
238    * keep going.  It is set to false when {@link #stop()} is called.
239    */
240   volatile boolean running = true;
241 
242   /**
243    * This flag is set to true after all threads are up and 'running' and the server is then opened
244    * for business by the call to {@link #start()}.
245    */
246   volatile boolean started = false;
247 
248   /**
249    * This is a running count of the size of all outstanding calls by size.
250    */
251   protected final Counter callQueueSize = new Counter();
252 
253   protected final List<Connection> connectionList =
254     Collections.synchronizedList(new LinkedList<Connection>());
255   //maintain a list
256   //of client connections
257   private Listener listener = null;
258   protected Responder responder = null;
259   protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
260   protected int numConnections = 0;
261 
262   protected HBaseRPCErrorHandler errorHandler = null;
263 
264   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
265   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
266 
267   /** Default value for above params */
268   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
269   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
270 
271   private static final ObjectMapper MAPPER = new ObjectMapper();
272 
273   private final int warnResponseTime;
274   private final int warnResponseSize;
275   private final Server server;
276   private final List<BlockingServiceAndInterface> services;
277 
278   private final RpcScheduler scheduler;
279 
280   private UserProvider userProvider;
281 
282   private final BoundedByteBufferPool reservoir;
283 
284   private volatile boolean allowFallbackToSimpleAuth;
285 
286   /**
287    * Used to get details for scan with a scanner_id<br/>
288    * TODO try to figure out a better way and remove reference from regionserver package later.
289    */
290   private RSRpcServices rsRpcServices;
291 
292   /**
293    * Datastructure that holds all necessary to a method invocation and then afterward, carries
294    * the result.
295    */
296   @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
297   @InterfaceStability.Evolving
298   public class Call implements RpcCallContext {
299     protected int id;                             // the client's call id
300     protected BlockingService service;
301     protected MethodDescriptor md;
302     protected RequestHeader header;
303     protected Message param;                      // the parameter passed
304     // Optional cell data passed outside of protobufs.
305     protected CellScanner cellScanner;
306     protected Connection connection;              // connection to client
307     protected long timestamp;      // the time received when response is null
308                                    // the time served when response is not null
309     /**
310      * Chain of buffers to send as response.
311      */
312     protected BufferChain response;
313     protected Responder responder;
314 
315     protected long size;                          // size of current call
316     protected boolean isError;
317     protected TraceInfo tinfo;
318     private ByteBuffer cellBlock = null;
319 
320     private User user;
321     private InetAddress remoteAddress;
322 
323     private long responseCellSize = 0;
324     private long responseBlockSize = 0;
325     private boolean retryImmediatelySupported;
326 
327     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
328         justification="Can't figure why this complaint is happening... see below")
329     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
330          Message param, CellScanner cellScanner, Connection connection, Responder responder,
331          long size, TraceInfo tinfo, final InetAddress remoteAddress) {
332       this.id = id;
333       this.service = service;
334       this.md = md;
335       this.header = header;
336       this.param = param;
337       this.cellScanner = cellScanner;
338       this.connection = connection;
339       this.timestamp = System.currentTimeMillis();
340       this.response = null;
341       this.responder = responder;
342       this.isError = false;
343       this.size = size;
344       this.tinfo = tinfo;
345       this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
346       this.remoteAddress = remoteAddress;
347       this.retryImmediatelySupported =
348           connection == null? null: connection.retryImmediatelySupported;
349     }
350 
351     /**
352      * Call is done. Execution happened and we returned results to client. It is now safe to
353      * cleanup.
354      */
355     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
356         justification="Presume the lock on processing request held by caller is protection enough")
357     void done() {
358       if (this.cellBlock != null && reservoir != null) {
359         // Return buffer to reservoir now we are done with it.
360         reservoir.putBuffer(this.cellBlock);
361         this.cellBlock = null;
362       }
363       this.connection.decRpcCount();  // Say that we're done with this call.
364     }
365 
366     @Override
367     public String toString() {
368       return toShortString() + " param: " +
369         (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
370         " connection: " + connection.toString();
371     }
372 
373     protected RequestHeader getHeader() {
374       return this.header;
375     }
376 
377     public boolean hasPriority() {
378       return this.header.hasPriority();
379     }
380 
381     public int getPriority() {
382       return this.header.getPriority();
383     }
384 
385     /*
386      * Short string representation without param info because param itself could be huge depends on
387      * the payload of a command
388      */
389     String toShortString() {
390       String serviceName = this.connection.service != null ?
391           this.connection.service.getDescriptorForType().getName() : "null";
392       return "callId: " + this.id + " service: " + serviceName +
393           " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
394           " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
395           " connection: " + connection.toString();
396     }
397 
398     String toTraceString() {
399       String serviceName = this.connection.service != null ?
400                            this.connection.service.getDescriptorForType().getName() : "";
401       String methodName = (this.md != null) ? this.md.getName() : "";
402       return serviceName + "." + methodName;
403     }
404 
405     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
406       this.response = new BufferChain(response);
407     }
408 
409     protected synchronized void setResponse(Object m, final CellScanner cells,
410         Throwable t, String errorMsg) {
411       if (this.isError) return;
412       if (t != null) this.isError = true;
413       BufferChain bc = null;
414       try {
415         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
416         // Presume it a pb Message.  Could be null.
417         Message result = (Message)m;
418         // Call id.
419         headerBuilder.setCallId(this.id);
420         if (t != null) {
421           ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
422           exceptionBuilder.setExceptionClassName(t.getClass().getName());
423           exceptionBuilder.setStackTrace(errorMsg);
424           exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException ||
425             t instanceof NeedUnmanagedConnectionException);
426           if (t instanceof RegionMovedException) {
427             // Special casing for this exception.  This is only one carrying a payload.
428             // Do this instead of build a generic system for allowing exceptions carry
429             // any kind of payload.
430             RegionMovedException rme = (RegionMovedException)t;
431             exceptionBuilder.setHostname(rme.getHostname());
432             exceptionBuilder.setPort(rme.getPort());
433           }
434           // Set the exception as the result of the method invocation.
435           headerBuilder.setException(exceptionBuilder.build());
436         }
437         // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
438         // reservoir when finished. This is hacky and the hack is not contained but benefits are
439         // high when we can avoid a big buffer allocation on each rpc.
440         this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
441           this.connection.compressionCodec, cells, reservoir);
442         if (this.cellBlock != null) {
443           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
444           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
445           cellBlockBuilder.setLength(this.cellBlock.limit());
446           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
447         }
448         Message header = headerBuilder.build();
449 
450         // Organize the response as a set of bytebuffers rather than collect it all together inside
451         // one big byte array; save on allocations.
452         ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
453         ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
454         int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
455           (this.cellBlock == null? 0: this.cellBlock.limit());
456         ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
457         bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
458         if (connection.useWrap) {
459           bc = wrapWithSasl(bc);
460         }
461       } catch (IOException e) {
462         LOG.warn("Exception while creating response " + e);
463       }
464       this.response = bc;
465     }
466 
467     private BufferChain wrapWithSasl(BufferChain bc)
468         throws IOException {
469       if (!this.connection.useSasl) return bc;
470       // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
471       // THIS IS A BIG UGLY COPY.
472       byte [] responseBytes = bc.getBytes();
473       byte [] token;
474       // synchronization may be needed since there can be multiple Handler
475       // threads using saslServer to wrap responses.
476       synchronized (connection.saslServer) {
477         token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
478       }
479       if (LOG.isTraceEnabled()) {
480         LOG.trace("Adding saslServer wrapped token of size " + token.length
481             + " as call response.");
482       }
483 
484       ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
485       ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
486       return new BufferChain(bbTokenLength, bbTokenBytes);
487     }
488 
489     @Override
490     public boolean isClientCellBlockSupported() {
491       return this.connection != null && this.connection.codec != null;
492     }
493 
494     @Override
495     public long disconnectSince() {
496       if (!connection.channel.isOpen()) {
497         return System.currentTimeMillis() - timestamp;
498       } else {
499         return -1L;
500       }
501     }
502 
503     public long getSize() {
504       return this.size;
505     }
506 
507     public long getResponseCellSize() {
508       return responseCellSize;
509     }
510 
511     public void incrementResponseCellSize(long cellSize) {
512       responseCellSize += cellSize;
513     }
514 
515     @Override
516     public long getResponseBlockSize() {
517       return responseBlockSize;
518     }
519 
520     @Override
521     public void incrementResponseBlockSize(long blockSize) {
522       responseBlockSize += blockSize;
523     }
524 
525     public synchronized void sendResponseIfReady() throws IOException {
526       // set param null to reduce memory pressure
527       this.param = null;
528       this.responder.doRespond(this);
529     }
530 
531     public UserGroupInformation getRemoteUser() {
532       return connection.ugi;
533     }
534 
535     @Override
536     public User getRequestUser() {
537       return user;
538     }
539 
540     @Override
541     public String getRequestUserName() {
542       User user = getRequestUser();
543       return user == null? null: user.getShortName();
544     }
545 
546     @Override
547     public InetAddress getRemoteAddress() {
548       return remoteAddress;
549     }
550 
551     @Override
552     public VersionInfo getClientVersionInfo() {
553       return connection.getVersionInfo();
554     }
555 
556     @Override
557     public boolean isRetryImmediatelySupported() {
558       return retryImmediatelySupported;
559     }
560   }
561 
562   /** Listens on the socket. Creates jobs for the handler threads*/
563   private class Listener extends Thread {
564 
565     private ServerSocketChannel acceptChannel = null; //the accept channel
566     private Selector selector = null; //the selector that we use for the server
567     private Reader[] readers = null;
568     private int currentReader = 0;
569     private Random rand = new Random();
570     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
571                                          //-tion (for idle connections) ran
572     private long cleanupInterval = 10000; //the minimum interval between
573                                           //two cleanup runs
574     private int backlogLength;
575 
576     private ExecutorService readPool;
577 
578     public Listener(final String name) throws IOException {
579       super(name);
580       backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
581       // Create a new server socket and set to non blocking mode
582       acceptChannel = ServerSocketChannel.open();
583       acceptChannel.configureBlocking(false);
584 
585       // Bind the server socket to the binding addrees (can be different from the default interface)
586       bind(acceptChannel.socket(), bindAddress, backlogLength);
587       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
588       address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
589 
590       // create a selector;
591       selector= Selector.open();
592 
593       readers = new Reader[readThreads];
594       readPool = Executors.newFixedThreadPool(readThreads,
595         new ThreadFactoryBuilder().setNameFormat(
596           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
597           ",port=" + port).setDaemon(true).build());
598       for (int i = 0; i < readThreads; ++i) {
599         Reader reader = new Reader();
600         readers[i] = reader;
601         readPool.execute(reader);
602       }
603       LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
604 
605       // Register accepts on the server socket with the selector.
606       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
607       this.setName("RpcServer.listener,port=" + port);
608       this.setDaemon(true);
609     }
610 
611 
612     private class Reader implements Runnable {
613       private volatile boolean adding = false;
614       private final Selector readSelector;
615 
616       Reader() throws IOException {
617         this.readSelector = Selector.open();
618       }
619       @Override
620       public void run() {
621         try {
622           doRunLoop();
623         } finally {
624           try {
625             readSelector.close();
626           } catch (IOException ioe) {
627             LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
628           }
629         }
630       }
631 
632       private synchronized void doRunLoop() {
633         while (running) {
634           try {
635             readSelector.select();
636             while (adding) {
637               this.wait(1000);
638             }
639 
640             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
641             while (iter.hasNext()) {
642               SelectionKey key = iter.next();
643               iter.remove();
644               if (key.isValid()) {
645                 if (key.isReadable()) {
646                   doRead(key);
647                 }
648               }
649             }
650           } catch (InterruptedException e) {
651             LOG.debug("Interrupted while sleeping");
652             return;
653           } catch (IOException ex) {
654             LOG.info(getName() + ": IOException in Reader", ex);
655           }
656         }
657       }
658 
659       /**
660        * This gets reader into the state that waits for the new channel
661        * to be registered with readSelector. If it was waiting in select()
662        * the thread will be woken up, otherwise whenever select() is called
663        * it will return even if there is nothing to read and wait
664        * in while(adding) for finishAdd call
665        */
666       public void startAdd() {
667         adding = true;
668         readSelector.wakeup();
669       }
670 
671       public synchronized SelectionKey registerChannel(SocketChannel channel)
672         throws IOException {
673         return channel.register(readSelector, SelectionKey.OP_READ);
674       }
675 
676       public synchronized void finishAdd() {
677         adding = false;
678         this.notify();
679       }
680     }
681 
682     /** cleanup connections from connectionList. Choose a random range
683      * to scan and also have a limit on the number of the connections
684      * that will be cleanedup per run. The criteria for cleanup is the time
685      * for which the connection was idle. If 'force' is true then all
686      * connections will be looked at for the cleanup.
687      * @param force all connections will be looked at for cleanup
688      */
689     private void cleanupConnections(boolean force) {
690       if (force || numConnections > thresholdIdleConnections) {
691         long currentTime = System.currentTimeMillis();
692         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
693           return;
694         }
695         int start = 0;
696         int end = numConnections - 1;
697         if (!force) {
698           start = rand.nextInt() % numConnections;
699           end = rand.nextInt() % numConnections;
700           int temp;
701           if (end < start) {
702             temp = start;
703             start = end;
704             end = temp;
705           }
706         }
707         int i = start;
708         int numNuked = 0;
709         while (i <= end) {
710           Connection c;
711           synchronized (connectionList) {
712             try {
713               c = connectionList.get(i);
714             } catch (Exception e) {return;}
715           }
716           if (c.timedOut(currentTime)) {
717             if (LOG.isDebugEnabled())
718               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
719             closeConnection(c);
720             numNuked++;
721             end--;
722             //noinspection UnusedAssignment
723             c = null;
724             if (!force && numNuked == maxConnectionsToNuke) break;
725           }
726           else i++;
727         }
728         lastCleanupRunTime = System.currentTimeMillis();
729       }
730     }
731 
732     @Override
733     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
734       justification="selector access is not synchronized; seems fine but concerned changing " +
735         "it will have per impact")
736     public void run() {
737       LOG.info(getName() + ": starting");
738       while (running) {
739         SelectionKey key = null;
740         try {
741           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
742           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
743           while (iter.hasNext()) {
744             key = iter.next();
745             iter.remove();
746             try {
747               if (key.isValid()) {
748                 if (key.isAcceptable())
749                   doAccept(key);
750               }
751             } catch (IOException ignored) {
752               if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
753             }
754             key = null;
755           }
756         } catch (OutOfMemoryError e) {
757           if (errorHandler != null) {
758             if (errorHandler.checkOOME(e)) {
759               LOG.info(getName() + ": exiting on OutOfMemoryError");
760               closeCurrentConnection(key, e);
761               cleanupConnections(true);
762               return;
763             }
764           } else {
765             // we can run out of memory if we have too many threads
766             // log the event and sleep for a minute and give
767             // some thread(s) a chance to finish
768             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
769             closeCurrentConnection(key, e);
770             cleanupConnections(true);
771             try {
772               Thread.sleep(60000);
773             } catch (InterruptedException ex) {
774               LOG.debug("Interrupted while sleeping");
775               return;
776             }
777           }
778         } catch (Exception e) {
779           closeCurrentConnection(key, e);
780         }
781         cleanupConnections(false);
782       }
783 
784       LOG.info(getName() + ": stopping");
785 
786       synchronized (this) {
787         try {
788           acceptChannel.close();
789           selector.close();
790         } catch (IOException ignored) {
791           if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
792         }
793 
794         selector= null;
795         acceptChannel= null;
796 
797         // clean up all connections
798         while (!connectionList.isEmpty()) {
799           closeConnection(connectionList.remove(0));
800         }
801       }
802     }
803 
804     private void closeCurrentConnection(SelectionKey key, Throwable e) {
805       if (key != null) {
806         Connection c = (Connection)key.attachment();
807         if (c != null) {
808           if (LOG.isDebugEnabled()) {
809             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
810                 (e != null ? " on error " + e.getMessage() : ""));
811           }
812           closeConnection(c);
813           key.attach(null);
814         }
815       }
816     }
817 
818     InetSocketAddress getAddress() {
819       return address;
820     }
821 
822     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
823       Connection c;
824       ServerSocketChannel server = (ServerSocketChannel) key.channel();
825 
826       SocketChannel channel;
827       while ((channel = server.accept()) != null) {
828         try {
829           channel.configureBlocking(false);
830           channel.socket().setTcpNoDelay(tcpNoDelay);
831           channel.socket().setKeepAlive(tcpKeepAlive);
832         } catch (IOException ioe) {
833           channel.close();
834           throw ioe;
835         }
836 
837         Reader reader = getReader();
838         try {
839           reader.startAdd();
840           SelectionKey readKey = reader.registerChannel(channel);
841           c = getConnection(channel, System.currentTimeMillis());
842           readKey.attach(c);
843           synchronized (connectionList) {
844             connectionList.add(numConnections, c);
845             numConnections++;
846           }
847           if (LOG.isDebugEnabled())
848             LOG.debug(getName() + ": connection from " + c.toString() +
849                 "; # active connections: " + numConnections);
850         } finally {
851           reader.finishAdd();
852         }
853       }
854     }
855 
856     void doRead(SelectionKey key) throws InterruptedException {
857       int count;
858       Connection c = (Connection) key.attachment();
859       if (c == null) {
860         return;
861       }
862       c.setLastContact(System.currentTimeMillis());
863       try {
864         count = c.readAndProcess();
865 
866         if (count > 0) {
867           c.setLastContact(System.currentTimeMillis());
868         }
869 
870       } catch (InterruptedException ieo) {
871         throw ieo;
872       } catch (Exception e) {
873         if (LOG.isDebugEnabled()) {
874           LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage());
875         }
876         count = -1; //so that the (count < 0) block is executed
877       }
878       if (count < 0) {
879         if (LOG.isDebugEnabled()) {
880           LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
881               " because read count=" + count +
882               ". Number of active connections: " + numConnections);
883         }
884         closeConnection(c);
885       }
886     }
887 
888     synchronized void doStop() {
889       if (selector != null) {
890         selector.wakeup();
891         Thread.yield();
892       }
893       if (acceptChannel != null) {
894         try {
895           acceptChannel.socket().close();
896         } catch (IOException e) {
897           LOG.info(getName() + ": exception in closing listener socket. " + e);
898         }
899       }
900       readPool.shutdownNow();
901     }
902 
903     // The method that will return the next reader to work with
904     // Simplistic implementation of round robin for now
905     Reader getReader() {
906       currentReader = (currentReader + 1) % readers.length;
907       return readers[currentReader];
908     }
909   }
910 
911   // Sends responses of RPC back to clients.
912   protected class Responder extends Thread {
913     private final Selector writeSelector;
914     private final Set<Connection> writingCons =
915         Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
916 
917     Responder() throws IOException {
918       this.setName("RpcServer.responder");
919       this.setDaemon(true);
920       writeSelector = Selector.open(); // create a selector
921     }
922 
923     @Override
924     public void run() {
925       LOG.info(getName() + ": starting");
926       try {
927         doRunLoop();
928       } finally {
929         LOG.info(getName() + ": stopping");
930         try {
931           writeSelector.close();
932         } catch (IOException ioe) {
933           LOG.error(getName() + ": couldn't close write selector", ioe);
934         }
935       }
936     }
937 
938     /**
939      * Take the list of the connections that want to write, and register them
940      * in the selector.
941      */
942     private void registerWrites() {
943       Iterator<Connection> it = writingCons.iterator();
944       while (it.hasNext()) {
945         Connection c = it.next();
946         it.remove();
947         SelectionKey sk = c.channel.keyFor(writeSelector);
948         try {
949           if (sk == null) {
950             try {
951               c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
952             } catch (ClosedChannelException e) {
953               // ignore: the client went away.
954               if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
955             }
956           } else {
957             sk.interestOps(SelectionKey.OP_WRITE);
958           }
959         } catch (CancelledKeyException e) {
960           // ignore: the client went away.
961           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
962         }
963       }
964     }
965 
966     /**
967      * Add a connection to the list that want to write,
968      */
969     public void registerForWrite(Connection c) {
970       if (writingCons.add(c)) {
971         writeSelector.wakeup();
972       }
973     }
974 
975     private void doRunLoop() {
976       long lastPurgeTime = 0;   // last check for old calls.
977       while (running) {
978         try {
979           registerWrites();
980           int keyCt = writeSelector.select(purgeTimeout);
981           if (keyCt == 0) {
982             continue;
983           }
984 
985           Set<SelectionKey> keys = writeSelector.selectedKeys();
986           Iterator<SelectionKey> iter = keys.iterator();
987           while (iter.hasNext()) {
988             SelectionKey key = iter.next();
989             iter.remove();
990             try {
991               if (key.isValid() && key.isWritable()) {
992                 doAsyncWrite(key);
993               }
994             } catch (IOException e) {
995               LOG.debug(getName() + ": asyncWrite", e);
996             }
997           }
998 
999           lastPurgeTime = purge(lastPurgeTime);
1000 
1001         } catch (OutOfMemoryError e) {
1002           if (errorHandler != null) {
1003             if (errorHandler.checkOOME(e)) {
1004               LOG.info(getName() + ": exiting on OutOfMemoryError");
1005               return;
1006             }
1007           } else {
1008             //
1009             // we can run out of memory if we have too many threads
1010             // log the event and sleep for a minute and give
1011             // some thread(s) a chance to finish
1012             //
1013             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
1014             try {
1015               Thread.sleep(60000);
1016             } catch (InterruptedException ex) {
1017               LOG.debug("Interrupted while sleeping");
1018               return;
1019             }
1020           }
1021         } catch (Exception e) {
1022           LOG.warn(getName() + ": exception in Responder " +
1023               StringUtils.stringifyException(e), e);
1024         }
1025       }
1026       LOG.info(getName() + ": stopped");
1027     }
1028 
1029     /**
1030      * If there were some calls that have not been sent out for a
1031      * long time, we close the connection.
1032      * @return the time of the purge.
1033      */
1034     private long purge(long lastPurgeTime) {
1035       long now = System.currentTimeMillis();
1036       if (now < lastPurgeTime + purgeTimeout) {
1037         return lastPurgeTime;
1038       }
1039 
1040       ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1041       // get the list of channels from list of keys.
1042       synchronized (writeSelector.keys()) {
1043         for (SelectionKey key : writeSelector.keys()) {
1044           Connection connection = (Connection) key.attachment();
1045           if (connection == null) {
1046             throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1047           }
1048           Call call = connection.responseQueue.peekFirst();
1049           if (call != null && now > call.timestamp + purgeTimeout) {
1050             conWithOldCalls.add(call.connection);
1051           }
1052         }
1053       }
1054 
1055       // Seems safer to close the connection outside of the synchronized loop...
1056       for (Connection connection : conWithOldCalls) {
1057         closeConnection(connection);
1058       }
1059 
1060       return now;
1061     }
1062 
1063     private void doAsyncWrite(SelectionKey key) throws IOException {
1064       Connection connection = (Connection) key.attachment();
1065       if (connection == null) {
1066         throw new IOException("doAsyncWrite: no connection");
1067       }
1068       if (key.channel() != connection.channel) {
1069         throw new IOException("doAsyncWrite: bad channel");
1070       }
1071 
1072       if (processAllResponses(connection)) {
1073         try {
1074           // We wrote everything, so we don't need to be told when the socket is ready for
1075           //  write anymore.
1076          key.interestOps(0);
1077         } catch (CancelledKeyException e) {
1078           /* The Listener/reader might have closed the socket.
1079            * We don't explicitly cancel the key, so not sure if this will
1080            * ever fire.
1081            * This warning could be removed.
1082            */
1083           LOG.warn("Exception while changing ops : " + e);
1084         }
1085       }
1086     }
1087 
1088     /**
1089      * Process the response for this call. You need to have the lock on
1090      * {@link org.apache.hadoop.hbase.ipc.RpcServer.Connection#responseWriteLock}
1091      *
1092      * @param call the call
1093      * @return true if we proceed the call fully, false otherwise.
1094      * @throws IOException
1095      */
1096     private boolean processResponse(final Call call) throws IOException {
1097       boolean error = true;
1098       try {
1099         // Send as much data as we can in the non-blocking fashion
1100         long numBytes = channelWrite(call.connection.channel, call.response);
1101         if (numBytes < 0) {
1102           throw new HBaseIOException("Error writing on the socket " +
1103             "for the call:" + call.toShortString());
1104         }
1105         error = false;
1106       } finally {
1107         if (error) {
1108           LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1109           closeConnection(call.connection);
1110         }
1111       }
1112 
1113       if (!call.response.hasRemaining()) {
1114         call.done();
1115         return true;
1116       } else {
1117         return false; // Socket can't take more, we will have to come back.
1118       }
1119     }
1120 
1121     /**
1122      * Process all the responses for this connection
1123      *
1124      * @return true if all the calls were processed or that someone else is doing it.
1125      * false if there * is still some work to do. In this case, we expect the caller to
1126      * delay us.
1127      * @throws IOException
1128      */
1129     private boolean processAllResponses(final Connection connection) throws IOException {
1130       // We want only one writer on the channel for a connection at a time.
1131       connection.responseWriteLock.lock();
1132       try {
1133         for (int i = 0; i < 20; i++) {
1134           // protection if some handlers manage to need all the responder
1135           Call call = connection.responseQueue.pollFirst();
1136           if (call == null) {
1137             return true;
1138           }
1139           if (!processResponse(call)) {
1140             connection.responseQueue.addFirst(call);
1141             return false;
1142           }
1143         }
1144       } finally {
1145         connection.responseWriteLock.unlock();
1146       }
1147 
1148       return connection.responseQueue.isEmpty();
1149     }
1150 
1151     //
1152     // Enqueue a response from the application.
1153     //
1154     void doRespond(Call call) throws IOException {
1155       boolean added = false;
1156 
1157       // If there is already a write in progress, we don't wait. This allows to free the handlers
1158       //  immediately for other tasks.
1159       if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
1160         try {
1161           if (call.connection.responseQueue.isEmpty()) {
1162             // If we're alone, we can try to do a direct call to the socket. It's
1163             //  an optimisation to save on context switches and data transfer between cores..
1164             if (processResponse(call)) {
1165               return; // we're done.
1166             }
1167             // Too big to fit, putting ahead.
1168             call.connection.responseQueue.addFirst(call);
1169             added = true; // We will register to the selector later, outside of the lock.
1170           }
1171         } finally {
1172           call.connection.responseWriteLock.unlock();
1173         }
1174       }
1175 
1176       if (!added) {
1177         call.connection.responseQueue.addLast(call);
1178       }
1179       call.responder.registerForWrite(call.connection);
1180 
1181       // set the serve time when the response has to be sent later
1182       call.timestamp = System.currentTimeMillis();
1183     }
1184   }
1185 
1186   /** Reads calls from a connection and queues them for handling. */
1187   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1188       value="VO_VOLATILE_INCREMENT",
1189       justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1190   public class Connection {
1191     // If initial preamble with version and magic has been read or not.
1192     private boolean connectionPreambleRead = false;
1193     // If the connection header has been read or not.
1194     private boolean connectionHeaderRead = false;
1195     protected SocketChannel channel;
1196     private ByteBuffer data;
1197     private ByteBuffer dataLengthBuffer;
1198     private ByteBuffer preambleBuffer;
1199     protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
1200     private final Lock responseWriteLock = new ReentrantLock();
1201     private Counter rpcCount = new Counter(); // number of outstanding rpcs
1202     private long lastContact;
1203     private InetAddress addr;
1204     protected Socket socket;
1205     // Cache the remote host & port info so that even if the socket is
1206     // disconnected, we can say where it used to connect to.
1207     protected String hostAddress;
1208     protected int remotePort;
1209     ConnectionHeader connectionHeader;
1210     /**
1211      * Codec the client asked use.
1212      */
1213     private Codec codec;
1214     /**
1215      * Compression codec the client asked us use.
1216      */
1217     private CompressionCodec compressionCodec;
1218     BlockingService service;
1219 
1220     private AuthMethod authMethod;
1221     private boolean saslContextEstablished;
1222     private boolean skipInitialSaslHandshake;
1223     private ByteBuffer unwrappedData;
1224     // When is this set?  FindBugs wants to know!  Says NP
1225     private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1226     boolean useSasl;
1227     SaslServer saslServer;
1228     private boolean useWrap = false;
1229     // Fake 'call' for failed authorization response
1230     private static final int AUTHORIZATION_FAILED_CALLID = -1;
1231     private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
1232         null, null, this, null, 0, null, null);
1233     private ByteArrayOutputStream authFailedResponse =
1234         new ByteArrayOutputStream();
1235     // Fake 'call' for SASL context setup
1236     private static final int SASL_CALLID = -33;
1237     private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
1238         0, null, null);
1239 
1240     // was authentication allowed with a fallback to simple auth
1241     private boolean authenticatedWithFallback;
1242 
1243     private boolean retryImmediatelySupported = false;
1244 
1245     public UserGroupInformation attemptingUser = null; // user name before auth
1246     protected User user = null;
1247     protected UserGroupInformation ugi = null;
1248 
1249     public Connection(SocketChannel channel, long lastContact) {
1250       this.channel = channel;
1251       this.lastContact = lastContact;
1252       this.data = null;
1253       this.dataLengthBuffer = ByteBuffer.allocate(4);
1254       this.socket = channel.socket();
1255       this.addr = socket.getInetAddress();
1256       if (addr == null) {
1257         this.hostAddress = "*Unknown*";
1258       } else {
1259         this.hostAddress = addr.getHostAddress();
1260       }
1261       this.remotePort = socket.getPort();
1262       if (socketSendBufferSize != 0) {
1263         try {
1264           socket.setSendBufferSize(socketSendBufferSize);
1265         } catch (IOException e) {
1266           LOG.warn("Connection: unable to set socket send buffer size to " +
1267                    socketSendBufferSize);
1268         }
1269       }
1270     }
1271 
1272       @Override
1273     public String toString() {
1274       return getHostAddress() + ":" + remotePort;
1275     }
1276 
1277     public String getHostAddress() {
1278       return hostAddress;
1279     }
1280 
1281     public InetAddress getHostInetAddress() {
1282       return addr;
1283     }
1284 
1285     public int getRemotePort() {
1286       return remotePort;
1287     }
1288 
1289     public void setLastContact(long lastContact) {
1290       this.lastContact = lastContact;
1291     }
1292 
1293     public VersionInfo getVersionInfo() {
1294       if (connectionHeader.hasVersionInfo()) {
1295         return connectionHeader.getVersionInfo();
1296       }
1297       return null;
1298     }
1299 
1300     /* Return true if the connection has no outstanding rpc */
1301     private boolean isIdle() {
1302       return rpcCount.get() == 0;
1303     }
1304 
1305     /* Decrement the outstanding RPC count */
1306     protected void decRpcCount() {
1307       rpcCount.decrement();
1308     }
1309 
1310     /* Increment the outstanding RPC count */
1311     protected void incRpcCount() {
1312       rpcCount.increment();
1313     }
1314 
1315     protected boolean timedOut(long currentTime) {
1316       return isIdle() && currentTime - lastContact > maxIdleTime;
1317     }
1318 
1319     private UserGroupInformation getAuthorizedUgi(String authorizedId)
1320         throws IOException {
1321       UserGroupInformation authorizedUgi;
1322       if (authMethod == AuthMethod.DIGEST) {
1323         TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1324             secretManager);
1325         authorizedUgi = tokenId.getUser();
1326         if (authorizedUgi == null) {
1327           throw new AccessDeniedException(
1328               "Can't retrieve username from tokenIdentifier.");
1329         }
1330         authorizedUgi.addTokenIdentifier(tokenId);
1331       } else {
1332         authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
1333       }
1334       authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
1335       return authorizedUgi;
1336     }
1337 
1338     private void saslReadAndProcess(byte[] saslToken) throws IOException,
1339         InterruptedException {
1340       if (saslContextEstablished) {
1341         if (LOG.isTraceEnabled())
1342           LOG.trace("Have read input token of size " + saslToken.length
1343               + " for processing by saslServer.unwrap()");
1344 
1345         if (!useWrap) {
1346           processOneRpc(saslToken);
1347         } else {
1348           byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
1349           processUnwrappedData(plaintextData);
1350         }
1351       } else {
1352         byte[] replyToken;
1353         try {
1354           if (saslServer == null) {
1355             switch (authMethod) {
1356             case DIGEST:
1357               if (secretManager == null) {
1358                 throw new AccessDeniedException(
1359                     "Server is not configured to do DIGEST authentication.");
1360               }
1361               saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
1362                   .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
1363                   SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
1364                       secretManager, this));
1365               break;
1366             default:
1367               UserGroupInformation current = UserGroupInformation.getCurrentUser();
1368               String fullName = current.getUserName();
1369               if (LOG.isDebugEnabled()) {
1370                 LOG.debug("Kerberos principal name is " + fullName);
1371               }
1372               final String names[] = SaslUtil.splitKerberosName(fullName);
1373               if (names.length != 3) {
1374                 throw new AccessDeniedException(
1375                     "Kerberos principal name does NOT have the expected "
1376                         + "hostname part: " + fullName);
1377               }
1378               current.doAs(new PrivilegedExceptionAction<Object>() {
1379                 @Override
1380                 public Object run() throws SaslException {
1381                   saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
1382                       .getMechanismName(), names[0], names[1],
1383                       SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
1384                   return null;
1385                 }
1386               });
1387             }
1388             if (saslServer == null)
1389               throw new AccessDeniedException(
1390                   "Unable to find SASL server implementation for "
1391                       + authMethod.getMechanismName());
1392             if (LOG.isDebugEnabled()) {
1393               LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
1394             }
1395           }
1396           if (LOG.isDebugEnabled()) {
1397             LOG.debug("Have read input token of size " + saslToken.length
1398                 + " for processing by saslServer.evaluateResponse()");
1399           }
1400           replyToken = saslServer.evaluateResponse(saslToken);
1401         } catch (IOException e) {
1402           IOException sendToClient = e;
1403           Throwable cause = e;
1404           while (cause != null) {
1405             if (cause instanceof InvalidToken) {
1406               sendToClient = (InvalidToken) cause;
1407               break;
1408             }
1409             cause = cause.getCause();
1410           }
1411           doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
1412             sendToClient.getLocalizedMessage());
1413           metrics.authenticationFailure();
1414           String clientIP = this.toString();
1415           // attempting user could be null
1416           AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
1417           throw e;
1418         }
1419         if (replyToken != null) {
1420           if (LOG.isDebugEnabled()) {
1421             LOG.debug("Will send token of size " + replyToken.length
1422                 + " from saslServer.");
1423           }
1424           doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
1425               null);
1426         }
1427         if (saslServer.isComplete()) {
1428           String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
1429           useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
1430           ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
1431           if (LOG.isDebugEnabled()) {
1432             LOG.debug("SASL server context established. Authenticated client: "
1433               + ugi + ". Negotiated QoP is "
1434               + saslServer.getNegotiatedProperty(Sasl.QOP));
1435           }
1436           metrics.authenticationSuccess();
1437           AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1438           saslContextEstablished = true;
1439         }
1440       }
1441     }
1442 
1443     /**
1444      * No protobuf encoding of raw sasl messages
1445      */
1446     private void doRawSaslReply(SaslStatus status, Writable rv,
1447         String errorClass, String error) throws IOException {
1448       ByteBufferOutputStream saslResponse = null;
1449       DataOutputStream out = null;
1450       try {
1451         // In my testing, have noticed that sasl messages are usually
1452         // in the ballpark of 100-200. That's why the initial capacity is 256.
1453         saslResponse = new ByteBufferOutputStream(256);
1454         out = new DataOutputStream(saslResponse);
1455         out.writeInt(status.state); // write status
1456         if (status == SaslStatus.SUCCESS) {
1457           rv.write(out);
1458         } else {
1459           WritableUtils.writeString(out, errorClass);
1460           WritableUtils.writeString(out, error);
1461         }
1462         saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1463         saslCall.responder = responder;
1464         saslCall.sendResponseIfReady();
1465       } finally {
1466         if (saslResponse != null) {
1467           saslResponse.close();
1468         }
1469         if (out != null) {
1470           out.close();
1471         }
1472       }
1473     }
1474 
1475     private void disposeSasl() {
1476       if (saslServer != null) {
1477         try {
1478           saslServer.dispose();
1479           saslServer = null;
1480         } catch (SaslException ignored) {
1481           // Ignored. This is being disposed of anyway.
1482         }
1483       }
1484     }
1485 
1486     private int readPreamble() throws IOException {
1487       if (preambleBuffer == null) {
1488         preambleBuffer = ByteBuffer.allocate(6);
1489       }
1490       int count = channelRead(channel, preambleBuffer);
1491       if (count < 0 || preambleBuffer.remaining() > 0) {
1492         return count;
1493       }
1494       // Check for 'HBas' magic.
1495       preambleBuffer.flip();
1496       for (int i = 0; i < HConstants.RPC_HEADER.length; i++) {
1497         if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) {
1498           return doBadPreambleHandling("Expected HEADER=" +
1499               Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" +
1500               Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) +
1501               " from " + toString());
1502         }
1503       }
1504       int version = preambleBuffer.get(HConstants.RPC_HEADER.length);
1505       byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1);
1506       this.authMethod = AuthMethod.valueOf(authbyte);
1507       if (version != CURRENT_VERSION) {
1508         String msg = getFatalConnectionString(version, authbyte);
1509         return doBadPreambleHandling(msg, new WrongVersionException(msg));
1510       }
1511       if (authMethod == null) {
1512         String msg = getFatalConnectionString(version, authbyte);
1513         return doBadPreambleHandling(msg, new BadAuthException(msg));
1514       }
1515       if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
1516         if (allowFallbackToSimpleAuth) {
1517           metrics.authenticationFallback();
1518           authenticatedWithFallback = true;
1519         } else {
1520           AccessDeniedException ae = new AccessDeniedException("Authentication is required");
1521           setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1522           responder.doRespond(authFailedCall);
1523           throw ae;
1524         }
1525       }
1526       if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
1527         doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
1528             SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
1529         authMethod = AuthMethod.SIMPLE;
1530         // client has already sent the initial Sasl message and we
1531         // should ignore it. Both client and server should fall back
1532         // to simple auth from now on.
1533         skipInitialSaslHandshake = true;
1534       }
1535       if (authMethod != AuthMethod.SIMPLE) {
1536         useSasl = true;
1537       }
1538 
1539       preambleBuffer = null; // do not need it anymore
1540       connectionPreambleRead = true;
1541       return count;
1542     }
1543 
1544     private int read4Bytes() throws IOException {
1545       if (this.dataLengthBuffer.remaining() > 0) {
1546         return channelRead(channel, this.dataLengthBuffer);
1547       } else {
1548         return 0;
1549       }
1550     }
1551 
1552 
1553     /**
1554      * Read off the wire. If there is not enough data to read, update the connection state with
1555      *  what we have and returns.
1556      * @return Returns -1 if failure (and caller will close connection), else zero or more.
1557      * @throws IOException
1558      * @throws InterruptedException
1559      */
1560     public int readAndProcess() throws IOException, InterruptedException {
1561       // If we have not read the connection setup preamble, look to see if that is on the wire.
1562       if (!connectionPreambleRead) {
1563         int count = readPreamble();
1564         if (!connectionPreambleRead) {
1565           return count;
1566         }
1567       }
1568       // Try and read in an int. It will be length of the data to read (or -1 if a ping). We catch
1569       // the integer length into the 4-byte this.dataLengthBuffer.
1570       int count = read4Bytes();
1571       if (count < 0 || dataLengthBuffer.remaining() > 0) {
1572         return count;
1573       }
1574 
1575       // If we have not read the connection setup preamble, look to see if that is on the wire.
1576       if (!connectionPreambleRead) {
1577         count = readPreamble();
1578         if (!connectionPreambleRead) {
1579           return count;
1580         }
1581 
1582         count = read4Bytes();
1583         if (count < 0 || dataLengthBuffer.remaining() > 0) {
1584           return count;
1585         }
1586       }
1587 
1588       final boolean useWrap = this.useWrap;
1589 
1590       // we're guarding against data being modified concurrently
1591       // while trying to keep other instance members out of the block
1592       synchronized(this) {
1593         // We have read a length and we have read the preamble.  It is either the connection header
1594         // or it is a request.
1595         if (data == null) {
1596           dataLengthBuffer.flip();
1597           int dataLength = dataLengthBuffer.getInt();
1598           if (dataLength == RpcClient.PING_CALL_ID) {
1599             if (!useWrap) { //covers the !useSasl too
1600               dataLengthBuffer.clear();
1601               return 0;  //ping message
1602             }
1603           }
1604 
1605           if (dataLength < 0) { // A data length of zero is legal.
1606             throw new IllegalArgumentException("Unexpected data length "
1607                 + dataLength + "!! from " + getHostAddress());
1608           }
1609           data = ByteBuffer.allocate(dataLength);
1610 
1611           // Increment the rpc count. This counter will be decreased when we write
1612           //  the response.  If we want the connection to be detected as idle properly, we
1613           //  need to keep the inc / dec correct.
1614           incRpcCount();
1615         }
1616 
1617         count = channelRead(channel, data);
1618 
1619         if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
1620           process();
1621         }
1622       }
1623 
1624       return count;
1625     }
1626 
1627     /**
1628      * Process the data buffer and clean the connection state for the next call.
1629      */
1630     private void process() throws IOException, InterruptedException {
1631       data.flip();
1632       try {
1633         if (skipInitialSaslHandshake) {
1634           skipInitialSaslHandshake = false;
1635           return;
1636         }
1637 
1638         if (useSasl) {
1639           saslReadAndProcess(data.array());
1640         } else {
1641           processOneRpc(data.array());
1642         }
1643 
1644       } finally {
1645         dataLengthBuffer.clear(); // Clean for the next call
1646         data = null; // For the GC
1647       }
1648     }
1649 
1650     private String getFatalConnectionString(final int version, final byte authByte) {
1651       return "serverVersion=" + CURRENT_VERSION +
1652       ", clientVersion=" + version + ", authMethod=" + authByte +
1653       ", authSupported=" + (authMethod != null) + " from " + toString();
1654     }
1655 
1656     private int doBadPreambleHandling(final String msg) throws IOException {
1657       return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1658     }
1659 
1660     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1661       LOG.warn(msg);
1662       Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
1663       setupResponse(null, fakeCall, e, msg);
1664       responder.doRespond(fakeCall);
1665       // Returning -1 closes out the connection.
1666       return -1;
1667     }
1668 
1669     // Reads the connection header following version
1670     private void processConnectionHeader(byte[] buf) throws IOException {
1671       this.connectionHeader = ConnectionHeader.parseFrom(buf);
1672       String serviceName = connectionHeader.getServiceName();
1673       if (serviceName == null) throw new EmptyServiceNameException();
1674       this.service = getService(services, serviceName);
1675       if (this.service == null) throw new UnknownServiceException(serviceName);
1676       setupCellBlockCodecs(this.connectionHeader);
1677       UserGroupInformation protocolUser = createUser(connectionHeader);
1678       if (!useSasl) {
1679         ugi = protocolUser;
1680         if (ugi != null) {
1681           ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1682         }
1683         // audit logging for SASL authenticated users happens in saslReadAndProcess()
1684         if (authenticatedWithFallback) {
1685           LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
1686               + " connecting from " + getHostAddress());
1687         }
1688         AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1689       } else {
1690         // user is authenticated
1691         ugi.setAuthenticationMethod(authMethod.authenticationMethod);
1692         //Now we check if this is a proxy user case. If the protocol user is
1693         //different from the 'user', it is a proxy user scenario. However,
1694         //this is not allowed if user authenticated with DIGEST.
1695         if ((protocolUser != null)
1696             && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
1697           if (authMethod == AuthMethod.DIGEST) {
1698             // Not allowed to doAs if token authentication is used
1699             throw new AccessDeniedException("Authenticated user (" + ugi
1700                 + ") doesn't match what the client claims to be ("
1701                 + protocolUser + ")");
1702           } else {
1703             // Effective user can be different from authenticated user
1704             // for simple auth or kerberos auth
1705             // The user is the real user. Now we create a proxy user
1706             UserGroupInformation realUser = ugi;
1707             ugi = UserGroupInformation.createProxyUser(protocolUser
1708                 .getUserName(), realUser);
1709             // Now the user is a proxy user, set Authentication method Proxy.
1710             ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
1711           }
1712         }
1713       }
1714       if (connectionHeader.hasVersionInfo()) {
1715         // see if this connection will support RetryImmediatelyException
1716         retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
1717 
1718         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1719             + " with version info: "
1720             + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
1721       } else {
1722         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1723             + " with unknown version info");
1724       }
1725 
1726 
1727     }
1728 
1729     /**
1730      * Set up cell block codecs
1731      * @throws FatalConnectionException
1732      */
1733     private void setupCellBlockCodecs(final ConnectionHeader header)
1734     throws FatalConnectionException {
1735       // TODO: Plug in other supported decoders.
1736       if (!header.hasCellBlockCodecClass()) return;
1737       String className = header.getCellBlockCodecClass();
1738       if (className == null || className.length() == 0) return;
1739       try {
1740         this.codec = (Codec)Class.forName(className).newInstance();
1741       } catch (Exception e) {
1742         throw new UnsupportedCellCodecException(className, e);
1743       }
1744       if (!header.hasCellBlockCompressorClass()) return;
1745       className = header.getCellBlockCompressorClass();
1746       try {
1747         this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1748       } catch (Exception e) {
1749         throw new UnsupportedCompressionCodecException(className, e);
1750       }
1751     }
1752 
1753     private void processUnwrappedData(byte[] inBuf) throws IOException,
1754     InterruptedException {
1755       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1756       // Read all RPCs contained in the inBuf, even partial ones
1757       while (true) {
1758         int count;
1759         if (unwrappedDataLengthBuffer.remaining() > 0) {
1760           count = channelRead(ch, unwrappedDataLengthBuffer);
1761           if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1762             return;
1763         }
1764 
1765         if (unwrappedData == null) {
1766           unwrappedDataLengthBuffer.flip();
1767           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1768 
1769           if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1770             if (LOG.isDebugEnabled())
1771               LOG.debug("Received ping message");
1772             unwrappedDataLengthBuffer.clear();
1773             continue; // ping message
1774           }
1775           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1776         }
1777 
1778         count = channelRead(ch, unwrappedData);
1779         if (count <= 0 || unwrappedData.remaining() > 0)
1780           return;
1781 
1782         if (unwrappedData.remaining() == 0) {
1783           unwrappedDataLengthBuffer.clear();
1784           unwrappedData.flip();
1785           processOneRpc(unwrappedData.array());
1786           unwrappedData = null;
1787         }
1788       }
1789     }
1790 
1791     private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1792       if (connectionHeaderRead) {
1793         processRequest(buf);
1794       } else {
1795         processConnectionHeader(buf);
1796         this.connectionHeaderRead = true;
1797         if (!authorizeConnection()) {
1798           // Throw FatalConnectionException wrapping ACE so client does right thing and closes
1799           // down the connection instead of trying to read non-existent retun.
1800           throw new AccessDeniedException("Connection from " + this + " for service " +
1801             connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
1802         }
1803         this.user = userProvider.create(this.ugi);
1804       }
1805     }
1806 
1807     /**
1808      * @param buf Has the request header and the request param and optionally encoded data buffer
1809      * all in this one array.
1810      * @throws IOException
1811      * @throws InterruptedException
1812      */
1813     protected void processRequest(byte[] buf) throws IOException, InterruptedException {
1814       long totalRequestSize = buf.length;
1815       int offset = 0;
1816       // Here we read in the header.  We avoid having pb
1817       // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
1818       CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
1819       int headerSize = cis.readRawVarint32();
1820       offset = cis.getTotalBytesRead();
1821       Message.Builder builder = RequestHeader.newBuilder();
1822       ProtobufUtil.mergeFrom(builder, buf, offset, headerSize);
1823       RequestHeader header = (RequestHeader) builder.build();
1824       offset += headerSize;
1825       int id = header.getCallId();
1826       if (LOG.isTraceEnabled()) {
1827         LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1828           " totalRequestSize: " + totalRequestSize + " bytes");
1829       }
1830       // Enforcing the call queue size, this triggers a retry in the client
1831       // This is a bit late to be doing this check - we have already read in the total request.
1832       if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1833         final Call callTooBig =
1834           new Call(id, this.service, null, null, null, null, this,
1835             responder, totalRequestSize, null, null);
1836         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1837         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1838         setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
1839             "Call queue is full on " + server.getServerName() +
1840                 ", is hbase.ipc.server.max.callqueue.size too small?");
1841         responder.doRespond(callTooBig);
1842         return;
1843       }
1844       MethodDescriptor md = null;
1845       Message param = null;
1846       CellScanner cellScanner = null;
1847       try {
1848         if (header.hasRequestParam() && header.getRequestParam()) {
1849           md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1850           if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1851           builder = this.service.getRequestPrototype(md).newBuilderForType();
1852           // To read the varint, I need an inputstream; might as well be a CIS.
1853           cis = CodedInputStream.newInstance(buf, offset, buf.length);
1854           int paramSize = cis.readRawVarint32();
1855           offset += cis.getTotalBytesRead();
1856           if (builder != null) {
1857             ProtobufUtil.mergeFrom(builder, buf, offset, paramSize);
1858             param = builder.build();
1859           }
1860           offset += paramSize;
1861         }
1862         if (header.hasCellBlockMeta()) {
1863           cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
1864             buf, offset, buf.length);
1865         }
1866       } catch (Throwable t) {
1867         InetSocketAddress address = getListenerAddress();
1868         String msg = (address != null ? address : "(channel closed)") +
1869             " is unable to read call parameter from client " + getHostAddress();
1870         LOG.warn(msg, t);
1871 
1872         metrics.exception(t);
1873 
1874         // probably the hbase hadoop version does not match the running hadoop version
1875         if (t instanceof LinkageError) {
1876           t = new DoNotRetryIOException(t);
1877         }
1878         // If the method is not present on the server, do not retry.
1879         if (t instanceof UnsupportedOperationException) {
1880           t = new DoNotRetryIOException(t);
1881         }
1882 
1883         final Call readParamsFailedCall =
1884           new Call(id, this.service, null, null, null, null, this,
1885             responder, totalRequestSize, null, null);
1886         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1887         setupResponse(responseBuffer, readParamsFailedCall, t,
1888           msg + "; " + t.getMessage());
1889         responder.doRespond(readParamsFailedCall);
1890         return;
1891       }
1892 
1893       TraceInfo traceInfo = header.hasTraceInfo()
1894           ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1895           : null;
1896       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1897               totalRequestSize, traceInfo, this.addr);
1898 
1899       if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
1900         callQueueSize.add(-1 * call.getSize());
1901 
1902         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1903         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1904         setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
1905             "Call queue is full on " + server.getServerName() +
1906                 ", too many items queued ?");
1907         responder.doRespond(call);
1908       }
1909     }
1910 
1911     private boolean authorizeConnection() throws IOException {
1912       try {
1913         // If auth method is DIGEST, the token was obtained by the
1914         // real user for the effective user, therefore not required to
1915         // authorize real user. doAs is allowed only for simple or kerberos
1916         // authentication
1917         if (ugi != null && ugi.getRealUser() != null
1918             && (authMethod != AuthMethod.DIGEST)) {
1919           ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
1920         }
1921         authorize(ugi, connectionHeader, getHostInetAddress());
1922         metrics.authorizationSuccess();
1923       } catch (AuthorizationException ae) {
1924         if (LOG.isDebugEnabled()) {
1925           LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1926         }
1927         metrics.authorizationFailure();
1928         setupResponse(authFailedResponse, authFailedCall,
1929           new AccessDeniedException(ae), ae.getMessage());
1930         responder.doRespond(authFailedCall);
1931         return false;
1932       }
1933       return true;
1934     }
1935 
1936     protected synchronized void close() {
1937       disposeSasl();
1938       data = null;
1939       if (!channel.isOpen())
1940         return;
1941       try {socket.shutdownOutput();} catch(Exception ignored) {
1942         if (LOG.isTraceEnabled()) {
1943           LOG.trace(ignored);
1944         }
1945       }
1946       if (channel.isOpen()) {
1947         try {channel.close();} catch(Exception ignored) {
1948           if (LOG.isTraceEnabled()) {
1949             LOG.trace(ignored);
1950           }
1951         }
1952       }
1953       try {socket.close();} catch(Exception ignored) {
1954         if (LOG.isTraceEnabled()) {
1955           LOG.trace(ignored);
1956         }
1957       }
1958     }
1959 
1960     private UserGroupInformation createUser(ConnectionHeader head) {
1961       UserGroupInformation ugi = null;
1962 
1963       if (!head.hasUserInfo()) {
1964         return null;
1965       }
1966       UserInformation userInfoProto = head.getUserInfo();
1967       String effectiveUser = null;
1968       if (userInfoProto.hasEffectiveUser()) {
1969         effectiveUser = userInfoProto.getEffectiveUser();
1970       }
1971       String realUser = null;
1972       if (userInfoProto.hasRealUser()) {
1973         realUser = userInfoProto.getRealUser();
1974       }
1975       if (effectiveUser != null) {
1976         if (realUser != null) {
1977           UserGroupInformation realUserUgi =
1978               UserGroupInformation.createRemoteUser(realUser);
1979           ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1980         } else {
1981           ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1982         }
1983       }
1984       return ugi;
1985     }
1986   }
1987 
1988   /**
1989    * Datastructure for passing a {@link BlockingService} and its associated class of
1990    * protobuf service interface.  For example, a server that fielded what is defined
1991    * in the client protobuf service would pass in an implementation of the client blocking service
1992    * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
1993    */
1994   public static class BlockingServiceAndInterface {
1995     private final BlockingService service;
1996     private final Class<?> serviceInterface;
1997     public BlockingServiceAndInterface(final BlockingService service,
1998         final Class<?> serviceInterface) {
1999       this.service = service;
2000       this.serviceInterface = serviceInterface;
2001     }
2002     public Class<?> getServiceInterface() {
2003       return this.serviceInterface;
2004     }
2005     public BlockingService getBlockingService() {
2006       return this.service;
2007     }
2008   }
2009 
2010   /**
2011    * Constructs a server listening on the named port and address.
2012    * @param server hosting instance of {@link Server}. We will do authentications if an
2013    * instance else pass null for no authentication check.
2014    * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
2015    * @param services A list of services.
2016    * @param bindAddress Where to listen
2017    * @param conf
2018    * @param scheduler
2019    */
2020   public RpcServer(final Server server, final String name,
2021       final List<BlockingServiceAndInterface> services,
2022       final InetSocketAddress bindAddress, Configuration conf,
2023       RpcScheduler scheduler)
2024       throws IOException {
2025 
2026     if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
2027       this.reservoir = new BoundedByteBufferPool(
2028           conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
2029           conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
2030           // Make the max twice the number of handlers to be safe.
2031           conf.getInt("hbase.ipc.server.reservoir.initial.max",
2032               conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
2033                   HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
2034     } else {
2035       reservoir = null;
2036     }
2037     
2038     this.server = server;
2039     this.services = services;
2040     this.bindAddress = bindAddress;
2041     this.conf = conf;
2042     this.socketSendBufferSize = 0;
2043     this.maxQueueSize =
2044       this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
2045     this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
2046     this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
2047     this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
2048     this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
2049     this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
2050       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
2051     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
2052     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
2053 
2054     // Start the listener here and let it bind to the port
2055     listener = new Listener(name);
2056     this.port = listener.getAddress().getPort();
2057 
2058     this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
2059     this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
2060     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
2061 
2062     this.ipcUtil = new IPCUtil(conf);
2063 
2064     // Create the responder here
2065     responder = new Responder();
2066     this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
2067     this.userProvider = UserProvider.instantiate(conf);
2068     this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
2069     if (isSecurityEnabled) {
2070       HBaseSaslRpcServer.init(conf);
2071     }
2072     initReconfigurable(conf);
2073 
2074     this.scheduler = scheduler;
2075     this.scheduler.init(new RpcSchedulerContext(this));
2076   }
2077 
2078   @Override
2079   public void onConfigurationChange(Configuration newConf) {
2080     initReconfigurable(newConf);
2081   }
2082 
2083   private void initReconfigurable(Configuration confToLoad) {
2084     this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
2085     if (isSecurityEnabled && allowFallbackToSimpleAuth) {
2086       LOG.warn("********* WARNING! *********");
2087       LOG.warn("This server is configured to allow connections from INSECURE clients");
2088       LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
2089       LOG.warn("While this option is enabled, client identities cannot be secured, and user");
2090       LOG.warn("impersonation is possible!");
2091       LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
2092       LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
2093       LOG.warn("****************************");
2094     }
2095   }
2096 
2097   /**
2098    * Subclasses of HBaseServer can override this to provide their own
2099    * Connection implementations.
2100    */
2101   protected Connection getConnection(SocketChannel channel, long time) {
2102     return new Connection(channel, time);
2103   }
2104 
2105   /**
2106    * Setup response for the RPC Call.
2107    *
2108    * @param response buffer to serialize the response into
2109    * @param call {@link Call} to which we are setting up the response
2110    * @param error error message, if the call failed
2111    * @throws IOException
2112    */
2113   private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
2114   throws IOException {
2115     if (response != null) response.reset();
2116     call.setResponse(null, null, t, error);
2117   }
2118 
2119   protected void closeConnection(Connection connection) {
2120     synchronized (connectionList) {
2121       if (connectionList.remove(connection)) {
2122         numConnections--;
2123       }
2124     }
2125     connection.close();
2126   }
2127 
2128   Configuration getConf() {
2129     return conf;
2130   }
2131 
2132   /** Sets the socket buffer size used for responding to RPCs.
2133    * @param size send size
2134    */
2135   @Override
2136   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2137 
2138   @Override
2139   public boolean isStarted() {
2140     return this.started;
2141   }
2142 
2143   /** Starts the service.  Must be called before any calls will be handled. */
2144   @Override
2145   public synchronized void start() {
2146     if (started) return;
2147     authTokenSecretMgr = createSecretManager();
2148     if (authTokenSecretMgr != null) {
2149       setSecretManager(authTokenSecretMgr);
2150       authTokenSecretMgr.start();
2151     }
2152     this.authManager = new ServiceAuthorizationManager();
2153     HBasePolicyProvider.init(conf, authManager);
2154     responder.start();
2155     listener.start();
2156     scheduler.start();
2157     started = true;
2158   }
2159 
2160   @Override
2161   public synchronized void refreshAuthManager(PolicyProvider pp) {
2162     // Ignore warnings that this should be accessed in a static way instead of via an instance;
2163     // it'll break if you go via static route.
2164     this.authManager.refresh(this.conf, pp);
2165   }
2166 
2167   private AuthenticationTokenSecretManager createSecretManager() {
2168     if (!isSecurityEnabled) return null;
2169     if (server == null) return null;
2170     Configuration conf = server.getConfiguration();
2171     long keyUpdateInterval =
2172         conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2173     long maxAge =
2174         conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2175     return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2176         server.getServerName().toString(), keyUpdateInterval, maxAge);
2177   }
2178 
2179   public SecretManager<? extends TokenIdentifier> getSecretManager() {
2180     return this.secretManager;
2181   }
2182 
2183   @SuppressWarnings("unchecked")
2184   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2185     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2186   }
2187 
2188   /**
2189    * This is a server side method, which is invoked over RPC. On success
2190    * the return response has protobuf response payload. On failure, the
2191    * exception name and the stack trace are returned in the protobuf response.
2192    */
2193   @Override
2194   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2195       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2196   throws IOException {
2197     try {
2198       status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2199       // TODO: Review after we add in encoded data blocks.
2200       status.setRPCPacket(param);
2201       status.resume("Servicing call");
2202       //get an instance of the method arg type
2203       long startTime = System.currentTimeMillis();
2204       PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2205       Message result = service.callBlockingMethod(md, controller, param);
2206       long endTime = System.currentTimeMillis();
2207       int processingTime = (int) (endTime - startTime);
2208       int qTime = (int) (startTime - receiveTime);
2209       int totalTime = (int) (endTime - receiveTime);
2210       if (LOG.isTraceEnabled()) {
2211         LOG.trace(CurCall.get().toString() +
2212             ", response " + TextFormat.shortDebugString(result) +
2213             " queueTime: " + qTime +
2214             " processingTime: " + processingTime +
2215             " totalTime: " + totalTime);
2216       }
2217       long requestSize = param.getSerializedSize();
2218       long responseSize = result.getSerializedSize();
2219       metrics.dequeuedCall(qTime);
2220       metrics.processedCall(processingTime);
2221       metrics.totalCall(totalTime);
2222       metrics.receivedRequest(requestSize);
2223       metrics.sentResponse(responseSize);
2224       // log any RPC responses that are slower than the configured warn
2225       // response time or larger than configured warning size
2226       boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2227       boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2228       if (tooSlow || tooLarge) {
2229         // when tagging, we let TooLarge trump TooSmall to keep output simple
2230         // note that large responses will often also be slow.
2231         logResponse(param,
2232             md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
2233             (tooLarge ? "TooLarge" : "TooSlow"),
2234             status.getClient(), startTime, processingTime, qTime,
2235             responseSize);
2236       }
2237       return new Pair<Message, CellScanner>(result, controller.cellScanner());
2238     } catch (Throwable e) {
2239       // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
2240       // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
2241       // need to pass it over the wire.
2242       if (e instanceof ServiceException) {
2243         if (e.getCause() == null) {
2244           LOG.debug("Caught a ServiceException with null cause", e);
2245         } else {
2246           e = e.getCause();
2247         }
2248       }
2249 
2250       // increment the number of requests that were exceptions.
2251       metrics.exception(e);
2252 
2253       if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2254       if (e instanceof IOException) throw (IOException)e;
2255       LOG.error("Unexpected throwable object ", e);
2256       throw new IOException(e.getMessage(), e);
2257     }
2258   }
2259 
2260   /**
2261    * Logs an RPC response to the LOG file, producing valid JSON objects for
2262    * client Operations.
2263    * @param param The parameters received in the call.
2264    * @param methodName The name of the method invoked
2265    * @param call The string representation of the call
2266    * @param tag  The tag that will be used to indicate this event in the log.
2267    * @param clientAddress   The address of the client who made this call.
2268    * @param startTime       The time that the call was initiated, in ms.
2269    * @param processingTime  The duration that the call took to run, in ms.
2270    * @param qTime           The duration that the call spent on the queue
2271    *                        prior to being initiated, in ms.
2272    * @param responseSize    The size in bytes of the response buffer.
2273    */
2274   void logResponse(Message param, String methodName, String call, String tag,
2275       String clientAddress, long startTime, int processingTime, int qTime,
2276       long responseSize)
2277           throws IOException {
2278     // base information that is reported regardless of type of call
2279     Map<String, Object> responseInfo = new HashMap<String, Object>();
2280     responseInfo.put("starttimems", startTime);
2281     responseInfo.put("processingtimems", processingTime);
2282     responseInfo.put("queuetimems", qTime);
2283     responseInfo.put("responsesize", responseSize);
2284     responseInfo.put("client", clientAddress);
2285     responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2286     responseInfo.put("method", methodName);
2287     responseInfo.put("call", call);
2288     // The params could be really big, make sure they don't kill us at WARN
2289     String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
2290     if (stringifiedParam.length() > 150) {
2291       // Truncate to 1000 chars if TRACE is on, else to 150 chars
2292       stringifiedParam = stringifiedParam.subSequence(
2293           0, LOG.isTraceEnabled() ? 1000 : 150) + " <TRUNCATED>";
2294     }
2295     responseInfo.put("param", stringifiedParam);
2296     if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
2297       ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
2298       if (request.hasScannerId()) {
2299         long scannerId = request.getScannerId();
2300         String scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
2301         if (scanDetails != null) {
2302           responseInfo.put("scandetails", scanDetails);
2303         }
2304       }
2305     }
2306     LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2307   }
2308 
2309   /** Stops the service.  No new calls will be handled after this is called. */
2310   @Override
2311   public synchronized void stop() {
2312     LOG.info("Stopping server on " + port);
2313     running = false;
2314     if (authTokenSecretMgr != null) {
2315       authTokenSecretMgr.stop();
2316       authTokenSecretMgr = null;
2317     }
2318     listener.interrupt();
2319     listener.doStop();
2320     responder.interrupt();
2321     scheduler.stop();
2322     notifyAll();
2323   }
2324 
2325   /** Wait for the server to be stopped.
2326    * Does not wait for all subthreads to finish.
2327    *  See {@link #stop()}.
2328    * @throws InterruptedException e
2329    */
2330   @Override
2331   public synchronized void join() throws InterruptedException {
2332     while (running) {
2333       wait();
2334     }
2335   }
2336 
2337   /**
2338    * Return the socket (ip+port) on which the RPC server is listening to. May return null if
2339    * the listener channel is closed.
2340    * @return the socket (ip+port) on which the RPC server is listening to, or null if this
2341    * information cannot be determined
2342    */
2343   @Override
2344   public synchronized InetSocketAddress getListenerAddress() {
2345     if (listener == null) {
2346       return null;
2347     }
2348     return listener.getAddress();
2349   }
2350 
2351   /**
2352    * Set the handler for calling out of RPC for error conditions.
2353    * @param handler the handler implementation
2354    */
2355   @Override
2356   public void setErrorHandler(HBaseRPCErrorHandler handler) {
2357     this.errorHandler = handler;
2358   }
2359 
2360   @Override
2361   public HBaseRPCErrorHandler getErrorHandler() {
2362     return this.errorHandler;
2363   }
2364 
2365   /**
2366    * Returns the metrics instance for reporting RPC call statistics
2367    */
2368   @Override
2369   public MetricsHBaseServer getMetrics() {
2370     return metrics;
2371   }
2372 
2373   @Override
2374   public void addCallSize(final long diff) {
2375     this.callQueueSize.add(diff);
2376   }
2377 
2378   /**
2379    * Authorize the incoming client connection.
2380    *
2381    * @param user client user
2382    * @param connection incoming connection
2383    * @param addr InetAddress of incoming connection
2384    * @throws org.apache.hadoop.security.authorize.AuthorizationException
2385    *         when the client isn't authorized to talk the protocol
2386    */
2387   public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
2388       InetAddress addr)
2389   throws AuthorizationException {
2390     if (authorize) {
2391       Class<?> c = getServiceInterface(services, connection.getServiceName());
2392       this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2393     }
2394   }
2395 
2396   /**
2397    * When the read or write buffer size is larger than this limit, i/o will be
2398    * done in chunks of this size. Most RPC requests and responses would be
2399    * be smaller.
2400    */
2401   private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
2402 
2403   /**
2404    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
2405    * If the amount of data is large, it writes to channel in smaller chunks.
2406    * This is to avoid jdk from creating many direct buffers as the size of
2407    * buffer increases. This also minimizes extra copies in NIO layer
2408    * as a result of multiple write operations required to write a large
2409    * buffer.
2410    *
2411    * @param channel writable byte channel to write to
2412    * @param bufferChain Chain of buffers to write
2413    * @return number of bytes written
2414    * @throws java.io.IOException e
2415    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
2416    */
2417   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2418   throws IOException {
2419     long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
2420     if (count > 0) this.metrics.sentBytes(count);
2421     return count;
2422   }
2423 
2424   /**
2425    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
2426    * If the amount of data is large, it writes to channel in smaller chunks.
2427    * This is to avoid jdk from creating many direct buffers as the size of
2428    * ByteBuffer increases. There should not be any performance degredation.
2429    *
2430    * @param channel writable byte channel to write on
2431    * @param buffer buffer to write
2432    * @return number of bytes written
2433    * @throws java.io.IOException e
2434    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
2435    */
2436   protected int channelRead(ReadableByteChannel channel,
2437                                    ByteBuffer buffer) throws IOException {
2438 
2439     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2440            channel.read(buffer) : channelIO(channel, null, buffer);
2441     if (count > 0) {
2442       metrics.receivedBytes(count);
2443     }
2444     return count;
2445   }
2446 
2447   /**
2448    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
2449    * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
2450    * one of readCh or writeCh should be non-null.
2451    *
2452    * @param readCh read channel
2453    * @param writeCh write channel
2454    * @param buf buffer to read or write into/out of
2455    * @return bytes written
2456    * @throws java.io.IOException e
2457    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
2458    * @see #channelWrite(GatheringByteChannel, BufferChain)
2459    */
2460   private static int channelIO(ReadableByteChannel readCh,
2461                                WritableByteChannel writeCh,
2462                                ByteBuffer buf) throws IOException {
2463 
2464     int originalLimit = buf.limit();
2465     int initialRemaining = buf.remaining();
2466     int ret = 0;
2467 
2468     while (buf.remaining() > 0) {
2469       try {
2470         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2471         buf.limit(buf.position() + ioSize);
2472 
2473         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2474 
2475         if (ret < ioSize) {
2476           break;
2477         }
2478 
2479       } finally {
2480         buf.limit(originalLimit);
2481       }
2482     }
2483 
2484     int nBytes = initialRemaining - buf.remaining();
2485     return (nBytes > 0) ? nBytes : ret;
2486   }
2487 
2488   /**
2489    * Needed for features such as delayed calls.  We need to be able to store the current call
2490    * so that we can complete it later or ask questions of what is supported by the current ongoing
2491    * call.
2492    * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
2493    */
2494   public static RpcCallContext getCurrentCall() {
2495     return CurCall.get();
2496   }
2497 
2498   public static boolean isInRpcCallContext() {
2499     return CurCall.get() != null;
2500   }
2501 
2502   /**
2503    * Returns the user credentials associated with the current RPC request or
2504    * <code>null</code> if no credentials were provided.
2505    * @return A User
2506    */
2507   public static User getRequestUser() {
2508     RpcCallContext ctx = getCurrentCall();
2509     return ctx == null? null: ctx.getRequestUser();
2510   }
2511 
2512   /**
2513    * Returns the username for any user associated with the current RPC
2514    * request or <code>null</code> if no user is set.
2515    */
2516   public static String getRequestUserName() {
2517     User user = getRequestUser();
2518     return user == null? null: user.getShortName();
2519   }
2520 
2521   /**
2522    * @return Address of remote client if a request is ongoing, else null
2523    */
2524   public static InetAddress getRemoteAddress() {
2525     RpcCallContext ctx = getCurrentCall();
2526     return ctx == null? null: ctx.getRemoteAddress();
2527   }
2528 
2529   /**
2530    * @param serviceName Some arbitrary string that represents a 'service'.
2531    * @param services Available service instances
2532    * @return Matching BlockingServiceAndInterface pair
2533    */
2534   static BlockingServiceAndInterface getServiceAndInterface(
2535       final List<BlockingServiceAndInterface> services, final String serviceName) {
2536     for (BlockingServiceAndInterface bs : services) {
2537       if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2538         return bs;
2539       }
2540     }
2541     return null;
2542   }
2543 
2544   /**
2545    * @param serviceName Some arbitrary string that represents a 'service'.
2546    * @param services Available services and their service interfaces.
2547    * @return Service interface class for <code>serviceName</code>
2548    */
2549   static Class<?> getServiceInterface(
2550       final List<BlockingServiceAndInterface> services,
2551       final String serviceName) {
2552     BlockingServiceAndInterface bsasi =
2553         getServiceAndInterface(services, serviceName);
2554     return bsasi == null? null: bsasi.getServiceInterface();
2555   }
2556 
2557   /**
2558    * @param serviceName Some arbitrary string that represents a 'service'.
2559    * @param services Available services and their service interfaces.
2560    * @return BlockingService that goes with the passed <code>serviceName</code>
2561    */
2562   static BlockingService getService(
2563       final List<BlockingServiceAndInterface> services,
2564       final String serviceName) {
2565     BlockingServiceAndInterface bsasi =
2566         getServiceAndInterface(services, serviceName);
2567     return bsasi == null? null: bsasi.getBlockingService();
2568   }
2569 
2570   static MonitoredRPCHandler getStatus() {
2571     // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
2572     MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
2573     if (status != null) {
2574       return status;
2575     }
2576     status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
2577     status.pause("Waiting for a call");
2578     RpcServer.MONITORED_RPC.set(status);
2579     return status;
2580   }
2581 
2582   /** Returns the remote side ip address when invoked inside an RPC
2583    *  Returns null incase of an error.
2584    *  @return InetAddress
2585    */
2586   public static InetAddress getRemoteIp() {
2587     Call call = CurCall.get();
2588     if (call != null && call.connection != null && call.connection.socket != null) {
2589       return call.connection.socket.getInetAddress();
2590     }
2591     return null;
2592   }
2593 
2594 
2595   /**
2596    * A convenience method to bind to a given address and report
2597    * better exceptions if the address is not a valid host.
2598    * @param socket the socket to bind
2599    * @param address the address to bind to
2600    * @param backlog the number of connections allowed in the queue
2601    * @throws BindException if the address can't be bound
2602    * @throws UnknownHostException if the address isn't a valid host name
2603    * @throws IOException other random errors from bind
2604    */
2605   public static void bind(ServerSocket socket, InetSocketAddress address,
2606                           int backlog) throws IOException {
2607     try {
2608       socket.bind(address, backlog);
2609     } catch (BindException e) {
2610       BindException bindException =
2611         new BindException("Problem binding to " + address + " : " +
2612             e.getMessage());
2613       bindException.initCause(e);
2614       throw bindException;
2615     } catch (SocketException e) {
2616       // If they try to bind to a different host's address, give a better
2617       // error message.
2618       if ("Unresolved address".equals(e.getMessage())) {
2619         throw new UnknownHostException("Invalid hostname for server: " +
2620                                        address.getHostName());
2621       }
2622       throw e;
2623     }
2624   }
2625 
2626   @Override
2627   public RpcScheduler getScheduler() {
2628     return scheduler;
2629   }
2630 
2631   @Override
2632   public void setRsRpcServices(RSRpcServices rsRpcServices) {
2633     this.rsRpcServices = rsRpcServices;
2634   }
2635 }