View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.ServerSocket;
31  import java.net.Socket;
32  import java.net.SocketException;
33  import java.net.UnknownHostException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.Channels;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.GatheringByteChannel;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.security.PrivilegedExceptionAction;
46  import java.util.ArrayList;
47  import java.util.Arrays;
48  import java.util.Collections;
49  import java.util.HashMap;
50  import java.util.Iterator;
51  import java.util.LinkedList;
52  import java.util.List;
53  import java.util.Map;
54  import java.util.Random;
55  import java.util.Set;
56  import java.util.concurrent.ConcurrentHashMap;
57  import java.util.concurrent.ConcurrentLinkedDeque;
58  import java.util.concurrent.ExecutorService;
59  import java.util.concurrent.Executors;
60  import java.util.concurrent.atomic.AtomicInteger;
61  import java.util.concurrent.locks.Lock;
62  import java.util.concurrent.locks.ReentrantLock;
63  
64  import javax.security.sasl.Sasl;
65  import javax.security.sasl.SaslException;
66  import javax.security.sasl.SaslServer;
67  
68  import org.apache.commons.logging.Log;
69  import org.apache.commons.logging.LogFactory;
70  import org.apache.hadoop.hbase.CallQueueTooBigException;
71  import org.apache.hadoop.hbase.classification.InterfaceAudience;
72  import org.apache.hadoop.hbase.classification.InterfaceStability;
73  import org.apache.hadoop.conf.Configuration;
74  import org.apache.hadoop.hbase.CellScanner;
75  import org.apache.hadoop.hbase.DoNotRetryIOException;
76  import org.apache.hadoop.hbase.HBaseIOException;
77  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
78  import org.apache.hadoop.hbase.HConstants;
79  import org.apache.hadoop.hbase.HRegionInfo;
80  import org.apache.hadoop.hbase.Server;
81  import org.apache.hadoop.hbase.TableName;
82  import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
83  import org.apache.hadoop.hbase.client.Operation;
84  import org.apache.hadoop.hbase.client.VersionInfoUtil;
85  import org.apache.hadoop.hbase.codec.Codec;
86  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
87  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
88  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
89  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
90  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
91  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
92  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
93  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
94  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
95  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
96  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
97  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
98  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
99  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
100 import org.apache.hadoop.hbase.regionserver.HRegionServer;
101 import org.apache.hadoop.hbase.security.AccessDeniedException;
102 import org.apache.hadoop.hbase.security.AuthMethod;
103 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
104 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
105 import org.apache.hadoop.hbase.security.User;
106 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
107 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
108 import org.apache.hadoop.hbase.security.SaslStatus;
109 import org.apache.hadoop.hbase.security.SaslUtil;
110 import org.apache.hadoop.hbase.security.UserProvider;
111 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
112 import org.apache.hadoop.hbase.util.Bytes;
113 import org.apache.hadoop.hbase.util.Counter;
114 import org.apache.hadoop.hbase.util.Pair;
115 import org.apache.hadoop.io.BytesWritable;
116 import org.apache.hadoop.io.IntWritable;
117 import org.apache.hadoop.io.Writable;
118 import org.apache.hadoop.io.WritableUtils;
119 import org.apache.hadoop.io.compress.CompressionCodec;
120 import org.apache.hadoop.security.UserGroupInformation;
121 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
122 import org.apache.hadoop.security.authorize.AuthorizationException;
123 import org.apache.hadoop.security.authorize.PolicyProvider;
124 import org.apache.hadoop.security.authorize.ProxyUsers;
125 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
126 import org.apache.hadoop.security.token.SecretManager;
127 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
128 import org.apache.hadoop.security.token.TokenIdentifier;
129 import org.apache.hadoop.util.StringUtils;
130 import org.codehaus.jackson.map.ObjectMapper;
131 import org.apache.htrace.TraceInfo;
132 
133 import com.google.common.util.concurrent.ThreadFactoryBuilder;
134 import com.google.protobuf.BlockingService;
135 import com.google.protobuf.CodedInputStream;
136 import com.google.protobuf.Descriptors.MethodDescriptor;
137 import com.google.protobuf.Message;
138 import com.google.protobuf.ServiceException;
139 import com.google.protobuf.TextFormat;
140 
141 /**
142  * An RPC server that hosts protobuf described Services.
143  *
144  * An RpcServer instance has a Listener that hosts the socket.  The Listener has a fixed number
145  * of Readers in an ExecutorPool, 10 by default.  The Listener does an accept and then a Reader
146  * is chosen round-robin to do the read.  The Reader is registered on a Selector.  A read takes
147  * the whole request off the channel and parses it into a Call.  The Call is wrapped in a
148  * CallRunner and passed to the scheduler to be run.  The Reader then checks whether there is
149  * more to be done and loops till done.
150  *
151  * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
152  * has given the queues into which calls (i.e. CallRunner instances) are inserted.  Handlers run
153  * taking from the queue.  They run the CallRunner#run method on each item gotten from queue
154  * and keep taking while the server is up.
155  *
156  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
157  * queue for Responder to pull from and return result to client.
158  *
159  * @see RpcClientImpl
160  */
161 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
162 @InterfaceStability.Evolving
163 public class RpcServer implements RpcServerInterface, ConfigurationObserver {
164   // LOG is being used in CallRunner and the log level is being changed in tests
165   public static final Log LOG = LogFactory.getLog(RpcServer.class);
166   private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
167       = new CallQueueTooBigException();
168 
169   private final boolean authorize;
170   private boolean isSecurityEnabled;
171 
172   public static final byte CURRENT_VERSION = 0;
173 
174   /**
175    * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
176    */
177   public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
178           "hbase.ipc.server.fallback-to-simple-auth-allowed";
179 
180   /**
181    * How many calls/handler are allowed in the queue.
182    */
183   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
184 
185   /**
186    * The maximum size that we can hold in the RPC queue
187    */
188   private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
189 
190   private final IPCUtil ipcUtil;
191 
192   private static final String AUTH_FAILED_FOR = "Auth failed for ";
193   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
194   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
195     Server.class.getName());
196   protected SecretManager<TokenIdentifier> secretManager;
197   protected ServiceAuthorizationManager authManager;
198 
199   /** This is set to the Call object before a Handler invokes an RPC and is reset
200    * after the call returns.
201    */
202   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
203 
204   /** Keeps MonitoredRPCHandler per handler thread. */
205   static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
206       = new ThreadLocal<MonitoredRPCHandler>();
207 
208   protected final InetSocketAddress bindAddress;
209   protected int port;                             // port we listen on
210   private int readThreads;                        // number of read threads
211   protected int maxIdleTime;                      // the maximum idle time after
212                                                   // which a client may be
213                                                   // disconnected
214   protected int thresholdIdleConnections;         // the number of idle
215                                                   // connections after which we
216                                                   // will start cleaning up idle
217                                                   // connections
218   int maxConnectionsToNuke;                       // the max number of
219                                                   // connections to nuke
220                                                   // during a cleanup
221 
222   protected MetricsHBaseServer metrics;
223 
224   protected final Configuration conf;
225 
226   private int maxQueueSize;
227   protected int socketSendBufferSize;
228   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
229   protected final boolean tcpKeepAlive; // if T then use keepalives
230   protected final long purgeTimeout;    // in milliseconds
231 
232   /**
233    * This flag is used to indicate to sub threads when they should go down.  When we call
234    * {@link #start()}, all threads started will consult this flag on whether they should
235    * keep going.  It is set to false when {@link #stop()} is called.
236    */
237   volatile boolean running = true;
238 
239   /**
240    * This flag is set to true after all threads are up and 'running' and the server is then opened
241    * for business by the call to {@link #start()}.
242    */
243   volatile boolean started = false;
244 
245   /**
246    * This is a running count of the total size of all outstanding calls.
247    */
248   protected final Counter callQueueSize = new Counter();
249 
250   protected final List<Connection> connectionList =
251     Collections.synchronizedList(new LinkedList<Connection>());
252   //maintain a list
253   //of client connections
254   private Listener listener = null;
255   protected Responder responder = null;
256   protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
257   protected int numConnections = 0;
258 
259   protected HBaseRPCErrorHandler errorHandler = null;
260 
261   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
262   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
263 
264   /** Default value for above params */
265   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
266   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
267 
268   private static final ObjectMapper MAPPER = new ObjectMapper();
269 
270   private final int warnResponseTime;
271   private final int warnResponseSize;
272   private final Server server;
273   private final List<BlockingServiceAndInterface> services;
274 
275   private final RpcScheduler scheduler;
276 
277   private UserProvider userProvider;
278 
279   private final BoundedByteBufferPool reservoir;
280 
281   private volatile boolean allowFallbackToSimpleAuth;
282 
283   /**
284    * Data structure that holds everything needed for a method invocation and, afterwards,
285    * carries the result.
286    */
287   @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
288   @InterfaceStability.Evolving
289   public class Call implements RpcCallContext {
290     protected int id;                             // the client's call id
291     protected BlockingService service;
292     protected MethodDescriptor md;
293     protected RequestHeader header;
294     protected Message param;                      // the parameter passed
295     // Optional cell data passed outside of protobufs.
296     protected CellScanner cellScanner;
297     protected Connection connection;              // connection to client
298     protected long timestamp;      // the time received when response is null
299                                    // the time served when response is not null
300     /**
301      * Chain of buffers to send as response.
302      */
303     protected BufferChain response;
304     protected Responder responder;
305 
306     protected long size;                          // size of current call
307     protected boolean isError;
308     protected TraceInfo tinfo;
309     private ByteBuffer cellBlock = null;
310 
311     private User user;
312     private InetAddress remoteAddress;
313 
314     private long responseCellSize = 0;
315     private long responseBlockSize = 0;
316     private boolean retryImmediatelySupported;
317 
        /**
         * Captures everything needed to run and answer one request. The creation time is recorded
         * in {@code timestamp} and the response starts out as {@code null}.
         *
         * @param connection connection the request arrived on; may be null, in which case the call
         *   has no user and reports that retry-immediately is not supported
         * @param responder responder used by {@link #sendResponseIfReady()}
         * @param size size of this call
         * @param remoteAddress address returned by {@link #getRemoteAddress()}
         */
318     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
319         justification="Can't figure why this complaint is happening... see below")
320     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
321          Message param, CellScanner cellScanner, Connection connection, Responder responder,
322          long size, TraceInfo tinfo, final InetAddress remoteAddress) {
323       this.id = id;
324       this.service = service;
325       this.md = md;
326       this.header = header;
327       this.param = param;
328       this.cellScanner = cellScanner;
329       this.connection = connection;
330       this.timestamp = System.currentTimeMillis();
331       this.response = null;
332       this.responder = responder;
333       this.isError = false;
334       this.size = size;
335       this.tinfo = tinfo;
336       this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
337       this.remoteAddress = remoteAddress;
          // Do not write 'connection == null? null: ...' here: that conditional has type Boolean
          // and unboxing its null result into the primitive field throws NullPointerException.
338       this.retryImmediatelySupported =
339           connection != null && connection.retryImmediatelySupported;
340     }
341 
342     /**
343      * Call is done. Execution happened and we returned results to client. It is now safe to
344      * cleanup.
345      */
346     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
347         justification="Presume the lock on processing request held by caller is protection enough")
348     void done() {
349       if (this.cellBlock != null && reservoir != null) {
350         // Return buffer to reservoir now we are done with it.
351         reservoir.putBuffer(this.cellBlock);
352         this.cellBlock = null;
353       }
354       this.connection.decRpcCount();  // Say that we're done with this call.
355     }
356 
        /**
         * @return {@link #toShortString()} followed by the short text form of the parameter (empty
         *   when the parameter is null) and the connection
         */
357     @Override
358     public String toString() {
359       return toShortString() + " param: " +
360         (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
361         " connection: " + connection.toString();
362     }
363 
        /** @return the request header this call was created with */
364     protected RequestHeader getHeader() {
365       return this.header;
366     }
367 
        /** @return whether the request header carries a priority */
368     public boolean hasPriority() {
369       return this.header.hasPriority();
370     }
371 
        /** @return the priority from the request header */
372     public int getPriority() {
373       return this.header.getPriority();
374     }
375 
376     /*
377      * Short string representation without param info, because the param itself could be huge
378      * depending on the payload of a command.
379      */
380     String toShortString() {
381       String serviceName = this.connection.service != null ?
382           this.connection.service.getDescriptorForType().getName() : "null";
383       return "callId: " + this.id + " service: " + serviceName +
384           " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
385           " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
386           " connection: " + connection.toString();
387     }
388 
        /**
         * @return "service.method", where either part is empty when the connection's service or
         *   the method descriptor is null
         */
389     String toTraceString() {
390       String serviceName = this.connection.service != null ?
391                            this.connection.service.getDescriptorForType().getName() : "";
392       String methodName = (this.md != null) ? this.md.getName() : "";
393       return serviceName + "." + methodName;
394     }
395 
        /** Sets the response to a chain holding only the given buffer. */
396     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
397       this.response = new BufferChain(response);
398     }
399 
        /**
         * Builds the response buffers for this call and stores them in {@code response}. Does
         * nothing if an error response has already been set. If building the response fails with
         * an IOException, the failure is logged and {@code response} is set to null.
         *
         * @param m result of the invocation; presumed to be a protobuf Message, may be null
         * @param cells cells to encode into the cell block that follows the result
         * @param t exception to report to the client, or null when the call succeeded
         * @param errorMsg text sent as the stack trace of the exception response
         */
400     protected synchronized void setResponse(Object m, final CellScanner cells,
401         Throwable t, String errorMsg) {
402       if (this.isError) return;
403       if (t != null) this.isError = true;
404       BufferChain bc = null;
405       try {
406         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
407         // Presume it a pb Message.  Could be null.
408         Message result = (Message)m;
409         // Call id.
410         headerBuilder.setCallId(this.id);
411         if (t != null) {
412           ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
413           exceptionBuilder.setExceptionClassName(t.getClass().getName());
414           exceptionBuilder.setStackTrace(errorMsg);
415           exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException ||
416             t instanceof NeedUnmanagedConnectionException);
417           if (t instanceof RegionMovedException) {
418             // Special casing for this exception.  This is only one carrying a payload.
419             // Do this instead of build a generic system for allowing exceptions carry
420             // any kind of payload.
421             RegionMovedException rme = (RegionMovedException)t;
422             exceptionBuilder.setHostname(rme.getHostname());
423             exceptionBuilder.setPort(rme.getPort());
424           }
425           // Set the exception as the result of the method invocation.
426           headerBuilder.setException(exceptionBuilder.build());
427         }
428         // Pass reservoir to buildCellBlock. Keep a reference to the returned buffer so it can be
429         // added back to the reservoir when finished. This is hacky and the hack is not contained
430         // but benefits are high when we can avoid a big buffer allocation on each rpc.
431         this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
432           this.connection.compressionCodec, cells, reservoir);
433         if (this.cellBlock != null) {
434           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
435           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
436           cellBlockBuilder.setLength(this.cellBlock.limit());
437           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
438         }
439         Message header = headerBuilder.build();
440 
441         // Organize the response as a set of bytebuffers rather than collect it all together inside
442         // one big byte array; save on allocations.
443         ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
444         ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
445         int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
446           (this.cellBlock == null? 0: this.cellBlock.limit());
447         ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
448         bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
449         if (connection.useWrap) {
450           bc = wrapWithSasl(bc);
451         }
452       } catch (IOException e) {
            // Pass the exception itself so the stack trace is logged, not just its toString().
453         LOG.warn("Exception while creating response", e);
454       }
455       this.response = bc;
456     }
457 
        /**
         * Wraps the response with the connection's SaslServer.
         *
         * @param bc the response to wrap
         * @return {@code bc} unchanged when the connection does not use SASL; otherwise a new chain
         *   holding the wrapped token's length followed by the token itself
         * @throws IOException declared for failures while wrapping, e.g. from the SaslServer
         */
458     private BufferChain wrapWithSasl(BufferChain bc)
459         throws IOException {
460       if (!this.connection.useSasl) return bc;
461       // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
462       // THIS IS A BIG UGLY COPY.
463       byte [] responseBytes = bc.getBytes();
464       byte [] token;
465       // synchronization may be needed since there can be multiple Handler
466       // threads using saslServer to wrap responses.
467       synchronized (connection.saslServer) {
468         token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
469       }
470       if (LOG.isTraceEnabled()) {
471         LOG.trace("Adding saslServer wrapped token of size " + token.length
472             + " as call response.");
473       }
474 
475       ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
476       ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
477       return new BufferChain(bbTokenLength, bbTokenBytes);
478     }
479 
        /** @return true when this call has a connection and that connection has a codec set */
480     @Override
481     public boolean isClientCellBlockSupported() {
482       return this.connection != null && this.connection.codec != null;
483     }
484 
        /**
         * @return milliseconds elapsed since {@code timestamp} when the connection's channel is no
         *   longer open, otherwise -1
         */
485     @Override
486     public long disconnectSince() {
487       if (!connection.channel.isOpen()) {
488         return System.currentTimeMillis() - timestamp;
489       } else {
490         return -1L;
491       }
492     }
493 
        /** @return the size of this call as passed to the constructor */
494     public long getSize() {
495       return this.size;
496     }
497 
        /** @return the response cell size accumulated so far */
498     public long getResponseCellSize() {
499       return responseCellSize;
500     }
501 
        /** Adds {@code cellSize} to the accumulated response cell size. */
502     public void incrementResponseCellSize(long cellSize) {
503       responseCellSize += cellSize;
504     }
505 
        /** @return the response block size accumulated so far */
506     @Override
507     public long getResponseBlockSize() {
508       return responseBlockSize;
509     }
510 
        /** Adds {@code blockSize} to the accumulated response block size. */
511     @Override
512     public void incrementResponseBlockSize(long blockSize) {
513       responseBlockSize += blockSize;
514     }
515 
        /**
         * Drops the reference to the request parameter and hands this call to the responder.
         *
         * @throws IOException if the responder fails
         */
516     public synchronized void sendResponseIfReady() throws IOException {
517       // set param null to reduce memory pressure
518       this.param = null;
519       this.responder.doRespond(this);
520     }
521 
        /** @return the UserGroupInformation held by the connection */
522     public UserGroupInformation getRemoteUser() {
523       return connection.ugi;
524     }
525 
        /** @return the user captured from the connection at construction; may be null */
526     @Override
527     public User getRequestUser() {
528       return user;
529     }
530 
        /** @return the short name of the request user, or null when there is no request user */
531     @Override
532     public String getRequestUserName() {
533       User user = getRequestUser();
534       return user == null? null: user.getShortName();
535     }
536 
        /** @return the remote address passed to the constructor */
537     @Override
538     public InetAddress getRemoteAddress() {
539       return remoteAddress;
540     }
541 
        /** @return the version info reported by the connection */
542     @Override
543     public VersionInfo getClientVersionInfo() {
544       return connection.getVersionInfo();
545     }
546 
        /** @return the retry-immediately flag captured from the connection at construction */
547     @Override
548     public boolean isRetryImmediatelySupported() {
549       return retryImmediatelySupported;
550     }
551   }
552 
553   /** Listens on the socket. Creates jobs for the handler threads*/
554   private class Listener extends Thread {
555 
556     private ServerSocketChannel acceptChannel = null; //the accept channel
557     private Selector selector = null; //the selector that we use for the server
558     private Reader[] readers = null;
559     private int currentReader = 0;
560     private Random rand = new Random();
561     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
562                                          //-tion (for idle connections) ran
563     private long cleanupInterval = 10000; //the minimum interval between
564                                           //two cleanup runs
565     private int backlogLength;
566 
567     private ExecutorService readPool;
568 
569     public Listener(final String name) throws IOException {
570       super(name);
571       backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
572       // Create a new server socket and set to non blocking mode
573       acceptChannel = ServerSocketChannel.open();
574       acceptChannel.configureBlocking(false);
575 
576       // Bind the server socket to the binding addrees (can be different from the default interface)
577       bind(acceptChannel.socket(), bindAddress, backlogLength);
578       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
579       // create a selector;
580       selector= Selector.open();
581 
582       readers = new Reader[readThreads];
583       readPool = Executors.newFixedThreadPool(readThreads,
584         new ThreadFactoryBuilder().setNameFormat(
585           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
586           ",port=" + port).setDaemon(true).build());
587       for (int i = 0; i < readThreads; ++i) {
588         Reader reader = new Reader();
589         readers[i] = reader;
590         readPool.execute(reader);
591       }
592       LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
593 
594       // Register accepts on the server socket with the selector.
595       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
596       this.setName("RpcServer.listener,port=" + port);
597       this.setDaemon(true);
598     }
599 
600     /**
         * Task run on the read pool. Each Reader owns a Selector that is opened in the constructor
         * and closed when {@link #run()} exits. While the server is running it selects on that
         * Selector and calls doRead for every valid, readable key.
         */
601     private class Reader implements Runnable {
602       private volatile boolean adding = false;
603       private final Selector readSelector;
604 
605       Reader() throws IOException {
606         this.readSelector = Selector.open();
607       }
608       @Override
609       public void run() {
610         try {
611           doRunLoop();
612         } finally {
613           try {
614             readSelector.close();
615           } catch (IOException ioe) {
616             LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
617           }
618         }
619       }
620 
          /**
           * Select/read loop. Runs until the server's running flag is cleared or this thread is
           * interrupted. The loop survives IOException and CancelledKeyException: both are logged
           * and the loop continues.
           */
621       private synchronized void doRunLoop() {
622         while (running) {
623           try {
624             readSelector.select();
625             while (adding) {
626               this.wait(1000);
627             }
628 
629             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
630             while (iter.hasNext()) {
631               SelectionKey key = iter.next();
632               iter.remove();
633               if (key.isValid()) {
634                 if (key.isReadable()) {
635                   doRead(key);
636                 }
637               }
638             }
639           } catch (InterruptedException e) {
640             LOG.debug("Interrupted while sleeping");
                // Restore the interrupt status so code further up the stack can see it.
                Thread.currentThread().interrupt();
641             return;
              } catch (CancelledKeyException e) {
                // A key can be cancelled between isValid() and isReadable(). This exception is
                // unchecked, so without this catch it would end the loop and this reader for good.
                LOG.error(getName() + ": CancelledKeyException in Reader", e);
642           } catch (IOException ex) {
643             LOG.info(getName() + ": IOException in Reader", ex);
644           }
645         }
646       }
647 
648       /**
649        * This gets reader into the state that waits for the new channel
650        * to be registered with readSelector. If it was waiting in select()
651        * the thread will be woken up, otherwise whenever select() is called
652        * it will return even if there is nothing to read and wait
653        * in while(adding) for finishAdd call
654        */
655       public void startAdd() {
656         adding = true;
657         readSelector.wakeup();
658       }
659 
          /**
           * Registers the channel with this reader's selector for OP_READ.
           *
           * @param channel channel to register
           * @return the key representing the registration
           * @throws IOException if the registration fails
           */
660       public synchronized SelectionKey registerChannel(SocketChannel channel)
661         throws IOException {
662         return channel.register(readSelector, SelectionKey.OP_READ);
663       }
664 
          /** Clears the adding flag and notifies the run loop waiting in while(adding). */
665       public synchronized void finishAdd() {
666         adding = false;
667         this.notify();
668       }
669     }
670 
671     /** cleanup connections from connectionList. Choose a random range
672      * to scan and also have a limit on the number of the connections
673      * that will be cleanedup per run. The criteria for cleanup is the time
674      * for which the connection was idle. If 'force' is true then all
675      * connections will be looked at for the cleanup.
676      * @param force all connections will be looked at for cleanup
677      */
678     private void cleanupConnections(boolean force) {
679       if (force || numConnections > thresholdIdleConnections) {
680         long currentTime = System.currentTimeMillis();
681         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
682           return;
683         }
684         int start = 0;
685         int end = numConnections - 1;
686         if (!force) {
687           start = rand.nextInt() % numConnections;
688           end = rand.nextInt() % numConnections;
689           int temp;
690           if (end < start) {
691             temp = start;
692             start = end;
693             end = temp;
694           }
695         }
696         int i = start;
697         int numNuked = 0;
698         while (i <= end) {
699           Connection c;
700           synchronized (connectionList) {
701             try {
702               c = connectionList.get(i);
703             } catch (Exception e) {return;}
704           }
705           if (c.timedOut(currentTime)) {
706             if (LOG.isDebugEnabled())
707               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
708             closeConnection(c);
709             numNuked++;
710             end--;
711             //noinspection UnusedAssignment
712             c = null;
713             if (!force && numNuked == maxConnectionsToNuke) break;
714           }
715           else i++;
716         }
717         lastCleanupRunTime = System.currentTimeMillis();
718       }
719     }
720 
721     @Override
722     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
723       justification="selector access is not synchronized; seems fine but concerned changing " +
724         "it will have per impact")
725     public void run() {
726       LOG.info(getName() + ": starting");
727       while (running) {
728         SelectionKey key = null;
729         try {
730           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
731           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
732           while (iter.hasNext()) {
733             key = iter.next();
734             iter.remove();
735             try {
736               if (key.isValid()) {
737                 if (key.isAcceptable())
738                   doAccept(key);
739               }
740             } catch (IOException ignored) {
741               if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
742             }
743             key = null;
744           }
745         } catch (OutOfMemoryError e) {
746           if (errorHandler != null) {
747             if (errorHandler.checkOOME(e)) {
748               LOG.info(getName() + ": exiting on OutOfMemoryError");
749               closeCurrentConnection(key, e);
750               cleanupConnections(true);
751               return;
752             }
753           } else {
754             // we can run out of memory if we have too many threads
755             // log the event and sleep for a minute and give
756             // some thread(s) a chance to finish
757             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
758             closeCurrentConnection(key, e);
759             cleanupConnections(true);
760             try {
761               Thread.sleep(60000);
762             } catch (InterruptedException ex) {
763               LOG.debug("Interrupted while sleeping");
764               return;
765             }
766           }
767         } catch (Exception e) {
768           closeCurrentConnection(key, e);
769         }
770         cleanupConnections(false);
771       }
772 
773       LOG.info(getName() + ": stopping");
774 
775       synchronized (this) {
776         try {
777           acceptChannel.close();
778           selector.close();
779         } catch (IOException ignored) {
780           if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
781         }
782 
783         selector= null;
784         acceptChannel= null;
785 
786         // clean up all connections
787         while (!connectionList.isEmpty()) {
788           closeConnection(connectionList.remove(0));
789         }
790       }
791     }
792 
793     private void closeCurrentConnection(SelectionKey key, Throwable e) {
794       if (key != null) {
795         Connection c = (Connection)key.attachment();
796         if (c != null) {
797           if (LOG.isDebugEnabled()) {
798             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
799                 (e != null ? " on error " + e.getMessage() : ""));
800           }
801           closeConnection(c);
802           key.attach(null);
803         }
804       }
805     }
806 
807     InetSocketAddress getAddress() {
808       return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
809     }
810 
811     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
812       Connection c;
813       ServerSocketChannel server = (ServerSocketChannel) key.channel();
814 
815       SocketChannel channel;
816       while ((channel = server.accept()) != null) {
817         try {
818           channel.configureBlocking(false);
819           channel.socket().setTcpNoDelay(tcpNoDelay);
820           channel.socket().setKeepAlive(tcpKeepAlive);
821         } catch (IOException ioe) {
822           channel.close();
823           throw ioe;
824         }
825 
826         Reader reader = getReader();
827         try {
828           reader.startAdd();
829           SelectionKey readKey = reader.registerChannel(channel);
830           c = getConnection(channel, System.currentTimeMillis());
831           readKey.attach(c);
832           synchronized (connectionList) {
833             connectionList.add(numConnections, c);
834             numConnections++;
835           }
836           if (LOG.isDebugEnabled())
837             LOG.debug(getName() + ": connection from " + c.toString() +
838                 "; # active connections: " + numConnections);
839         } finally {
840           reader.finishAdd();
841         }
842       }
843     }
844 
845     void doRead(SelectionKey key) throws InterruptedException {
846       int count;
847       Connection c = (Connection) key.attachment();
848       if (c == null) {
849         return;
850       }
851       c.setLastContact(System.currentTimeMillis());
852       try {
853         count = c.readAndProcess();
854 
855         if (count > 0) {
856           c.setLastContact(System.currentTimeMillis());
857         }
858 
859       } catch (InterruptedException ieo) {
860         throw ieo;
861       } catch (Exception e) {
862         if (LOG.isDebugEnabled()) {
863           LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage());
864         }
865         count = -1; //so that the (count < 0) block is executed
866       }
867       if (count < 0) {
868         if (LOG.isDebugEnabled()) {
869           LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
870               " because read count=" + count +
871               ". Number of active connections: " + numConnections);
872         }
873         closeConnection(c);
874       }
875     }
876 
877     synchronized void doStop() {
878       if (selector != null) {
879         selector.wakeup();
880         Thread.yield();
881       }
882       if (acceptChannel != null) {
883         try {
884           acceptChannel.socket().close();
885         } catch (IOException e) {
886           LOG.info(getName() + ": exception in closing listener socket. " + e);
887         }
888       }
889       readPool.shutdownNow();
890     }
891 
892     // The method that will return the next reader to work with
893     // Simplistic implementation of round robin for now
894     Reader getReader() {
895       currentReader = (currentReader + 1) % readers.length;
896       return readers[currentReader];
897     }
898   }
899 
900   // Sends responses of RPC back to clients.
901   protected class Responder extends Thread {
    // Selector on which connections with pending responses are registered for OP_WRITE.
    private final Selector writeSelector;
    // Connections queued by registerForWrite() and drained by registerWrites(); backed by a
    // ConcurrentHashMap so it can be filled and drained without extra locking.
    private final Set<Connection> writingCons =
        Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
905 
906     Responder() throws IOException {
907       this.setName("RpcServer.responder");
908       this.setDaemon(true);
909       writeSelector = Selector.open(); // create a selector
910     }
911 
912     @Override
913     public void run() {
914       LOG.info(getName() + ": starting");
915       try {
916         doRunLoop();
917       } finally {
918         LOG.info(getName() + ": stopping");
919         try {
920           writeSelector.close();
921         } catch (IOException ioe) {
922           LOG.error(getName() + ": couldn't close write selector", ioe);
923         }
924       }
925     }
926 
927     /**
928      * Take the list of the connections that want to write, and register them
929      * in the selector.
930      */
931     private void registerWrites() {
932       Iterator<Connection> it = writingCons.iterator();
933       while (it.hasNext()) {
934         Connection c = it.next();
935         it.remove();
936         SelectionKey sk = c.channel.keyFor(writeSelector);
937         try {
938           if (sk == null) {
939             try {
940               c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
941             } catch (ClosedChannelException e) {
942               // ignore: the client went away.
943               if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
944             }
945           } else {
946             sk.interestOps(SelectionKey.OP_WRITE);
947           }
948         } catch (CancelledKeyException e) {
949           // ignore: the client went away.
950           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
951         }
952       }
953     }
954 
955     /**
956      * Add a connection to the list that want to write,
957      */
958     public void registerForWrite(Connection c) {
959       if (writingCons.add(c)) {
960         writeSelector.wakeup();
961       }
962     }
963 
964     private void doRunLoop() {
965       long lastPurgeTime = 0;   // last check for old calls.
966       while (running) {
967         try {
968           registerWrites();
969           int keyCt = writeSelector.select(purgeTimeout);
970           if (keyCt == 0) {
971             continue;
972           }
973 
974           Set<SelectionKey> keys = writeSelector.selectedKeys();
975           Iterator<SelectionKey> iter = keys.iterator();
976           while (iter.hasNext()) {
977             SelectionKey key = iter.next();
978             iter.remove();
979             try {
980               if (key.isValid() && key.isWritable()) {
981                 doAsyncWrite(key);
982               }
983             } catch (IOException e) {
984               LOG.debug(getName() + ": asyncWrite", e);
985             }
986           }
987 
988           lastPurgeTime = purge(lastPurgeTime);
989 
990         } catch (OutOfMemoryError e) {
991           if (errorHandler != null) {
992             if (errorHandler.checkOOME(e)) {
993               LOG.info(getName() + ": exiting on OutOfMemoryError");
994               return;
995             }
996           } else {
997             //
998             // we can run out of memory if we have too many threads
999             // log the event and sleep for a minute and give
1000             // some thread(s) a chance to finish
1001             //
1002             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
1003             try {
1004               Thread.sleep(60000);
1005             } catch (InterruptedException ex) {
1006               LOG.debug("Interrupted while sleeping");
1007               return;
1008             }
1009           }
1010         } catch (Exception e) {
1011           LOG.warn(getName() + ": exception in Responder " +
1012               StringUtils.stringifyException(e), e);
1013         }
1014       }
1015       LOG.info(getName() + ": stopped");
1016     }
1017 
1018     /**
1019      * If there were some calls that have not been sent out for a
1020      * long time, we close the connection.
1021      * @return the time of the purge.
1022      */
1023     private long purge(long lastPurgeTime) {
1024       long now = System.currentTimeMillis();
1025       if (now < lastPurgeTime + purgeTimeout) {
1026         return lastPurgeTime;
1027       }
1028 
1029       ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1030       // get the list of channels from list of keys.
1031       synchronized (writeSelector.keys()) {
1032         for (SelectionKey key : writeSelector.keys()) {
1033           Connection connection = (Connection) key.attachment();
1034           if (connection == null) {
1035             throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1036           }
1037           Call call = connection.responseQueue.peekFirst();
1038           if (call != null && now > call.timestamp + purgeTimeout) {
1039             conWithOldCalls.add(call.connection);
1040           }
1041         }
1042       }
1043 
1044       // Seems safer to close the connection outside of the synchronized loop...
1045       for (Connection connection : conWithOldCalls) {
1046         closeConnection(connection);
1047       }
1048 
1049       return now;
1050     }
1051 
1052     private void doAsyncWrite(SelectionKey key) throws IOException {
1053       Connection connection = (Connection) key.attachment();
1054       if (connection == null) {
1055         throw new IOException("doAsyncWrite: no connection");
1056       }
1057       if (key.channel() != connection.channel) {
1058         throw new IOException("doAsyncWrite: bad channel");
1059       }
1060 
1061       if (processAllResponses(connection)) {
1062         try {
1063           // We wrote everything, so we don't need to be told when the socket is ready for
1064           //  write anymore.
1065          key.interestOps(0);
1066         } catch (CancelledKeyException e) {
1067           /* The Listener/reader might have closed the socket.
1068            * We don't explicitly cancel the key, so not sure if this will
1069            * ever fire.
1070            * This warning could be removed.
1071            */
1072           LOG.warn("Exception while changing ops : " + e);
1073         }
1074       }
1075     }
1076 
1077     /**
1078      * Process the response for this call. You need to have the lock on
1079      * {@link org.apache.hadoop.hbase.ipc.RpcServer.Connection#responseWriteLock}
1080      *
1081      * @param call the call
1082      * @return true if we proceed the call fully, false otherwise.
1083      * @throws IOException
1084      */
1085     private boolean processResponse(final Call call) throws IOException {
1086       boolean error = true;
1087       try {
1088         // Send as much data as we can in the non-blocking fashion
1089         long numBytes = channelWrite(call.connection.channel, call.response);
1090         if (numBytes < 0) {
1091           throw new HBaseIOException("Error writing on the socket " +
1092             "for the call:" + call.toShortString());
1093         }
1094         error = false;
1095       } finally {
1096         if (error) {
1097           LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1098           closeConnection(call.connection);
1099         }
1100       }
1101 
1102       if (!call.response.hasRemaining()) {
1103         call.done();
1104         return true;
1105       } else {
1106         return false; // Socket can't take more, we will have to come back.
1107       }
1108     }
1109 
1110     /**
1111      * Process all the responses for this connection
1112      *
1113      * @return true if all the calls were processed or that someone else is doing it.
1114      * false if there * is still some work to do. In this case, we expect the caller to
1115      * delay us.
1116      * @throws IOException
1117      */
1118     private boolean processAllResponses(final Connection connection) throws IOException {
1119       // We want only one writer on the channel for a connection at a time.
1120       connection.responseWriteLock.lock();
1121       try {
1122         for (int i = 0; i < 20; i++) {
1123           // protection if some handlers manage to need all the responder
1124           Call call = connection.responseQueue.pollFirst();
1125           if (call == null) {
1126             return true;
1127           }
1128           if (!processResponse(call)) {
1129             connection.responseQueue.addFirst(call);
1130             return false;
1131           }
1132         }
1133       } finally {
1134         connection.responseWriteLock.unlock();
1135       }
1136 
1137       return connection.responseQueue.isEmpty();
1138     }
1139 
1140     //
1141     // Enqueue a response from the application.
1142     //
1143     void doRespond(Call call) throws IOException {
1144       boolean added = false;
1145 
1146       // If there is already a write in progress, we don't wait. This allows to free the handlers
1147       //  immediately for other tasks.
1148       if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
1149         try {
1150           if (call.connection.responseQueue.isEmpty()) {
1151             // If we're alone, we can try to do a direct call to the socket. It's
1152             //  an optimisation to save on context switches and data transfer between cores..
1153             if (processResponse(call)) {
1154               return; // we're done.
1155             }
1156             // Too big to fit, putting ahead.
1157             call.connection.responseQueue.addFirst(call);
1158             added = true; // We will register to the selector later, outside of the lock.
1159           }
1160         } finally {
1161           call.connection.responseWriteLock.unlock();
1162         }
1163       }
1164 
1165       if (!added) {
1166         call.connection.responseQueue.addLast(call);
1167       }
1168       call.responder.registerForWrite(call.connection);
1169 
1170       // set the serve time when the response has to be sent later
1171       call.timestamp = System.currentTimeMillis();
1172     }
1173   }
1174 
1175   /** Reads calls from a connection and queues them for handling. */
1176   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1177       value="VO_VOLATILE_INCREMENT",
1178       justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1179   public class Connection {
    // If initial preamble with version and magic has been read or not.
    private boolean connectionPreambleRead = false;
    // If the connection header has been read or not.
    private boolean connectionHeaderRead = false;
    // Client channel: requests are read from it and responses are written to it.
    protected SocketChannel channel;
    // Payload buffer for the message being read; null until a length prefix has been read.
    private ByteBuffer data;
    // 4-byte buffer that receives the preamble magic first and message length prefixes after.
    private ByteBuffer dataLengthBuffer;
    // Responses waiting to be written to the channel.
    protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
    // Held while writing responses so there is only one writer on the channel at a time.
    private final Lock responseWriteLock = new ReentrantLock();
    private Counter rpcCount = new Counter(); // number of outstanding rpcs
    // Time of the last activity on this connection; timedOut() compares it with maxIdleTime.
    private long lastContact;
    private InetAddress addr;
    protected Socket socket;
    // Cache the remote host & port info so that even if the socket is
    // disconnected, we can say where it used to connect to.
    protected String hostAddress;
    protected int remotePort;
    ConnectionHeader connectionHeader;
    /**
     * Codec the client asked us to use.
     */
    private Codec codec;
    /**
     * Compression codec the client asked us to use.
     */
    private CompressionCodec compressionCodec;
    BlockingService service;

    // Auth method taken from the preamble's auth byte (may be switched to SIMPLE afterwards).
    private AuthMethod authMethod;
    // Set once the SASL negotiation has completed.
    private boolean saslContextEstablished;
    // Set when the server falls back to simple auth although the client started a SASL exchange.
    private boolean skipInitialSaslHandshake;
    private ByteBuffer unwrappedData;
    // When is this set?  FindBugs wants to know!  Says NP
    private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
    boolean useSasl;
    SaslServer saslServer;
    // True when the negotiated SASL QOP is something other than "auth", i.e. data is wrapped.
    private boolean useWrap = false;
    // Fake 'call' for failed authorization response
    private static final int AUTHORIZATION_FAILED_CALLID = -1;
    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
        null, null, this, null, 0, null, null);
    private ByteArrayOutputStream authFailedResponse =
        new ByteArrayOutputStream();
    // Fake 'call' for SASL context setup
    private static final int SASL_CALLID = -33;
    private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
        0, null, null);

    // was authentication allowed with a fallback to simple auth
    private boolean authenticatedWithFallback;

    private boolean retryImmediatelySupported = false;

    public UserGroupInformation attemptingUser = null; // user name before auth
    protected User user = null;
    // Authorized user; assigned when the SASL negotiation completes.
    protected UserGroupInformation ugi = null;
1236 
1237     public Connection(SocketChannel channel, long lastContact) {
1238       this.channel = channel;
1239       this.lastContact = lastContact;
1240       this.data = null;
1241       this.dataLengthBuffer = ByteBuffer.allocate(4);
1242       this.socket = channel.socket();
1243       this.addr = socket.getInetAddress();
1244       if (addr == null) {
1245         this.hostAddress = "*Unknown*";
1246       } else {
1247         this.hostAddress = addr.getHostAddress();
1248       }
1249       this.remotePort = socket.getPort();
1250       if (socketSendBufferSize != 0) {
1251         try {
1252           socket.setSendBufferSize(socketSendBufferSize);
1253         } catch (IOException e) {
1254           LOG.warn("Connection: unable to set socket send buffer size to " +
1255                    socketSendBufferSize);
1256         }
1257       }
1258     }
1259 
1260       @Override
1261     public String toString() {
1262       return getHostAddress() + ":" + remotePort;
1263     }
1264 
1265     public String getHostAddress() {
1266       return hostAddress;
1267     }
1268 
1269     public InetAddress getHostInetAddress() {
1270       return addr;
1271     }
1272 
1273     public int getRemotePort() {
1274       return remotePort;
1275     }
1276 
1277     public void setLastContact(long lastContact) {
1278       this.lastContact = lastContact;
1279     }
1280 
1281     public VersionInfo getVersionInfo() {
1282       if (connectionHeader.hasVersionInfo()) {
1283         return connectionHeader.getVersionInfo();
1284       }
1285       return null;
1286     }
1287 
1288     /* Return true if the connection has no outstanding rpc */
1289     private boolean isIdle() {
1290       return rpcCount.get() == 0;
1291     }
1292 
1293     /* Decrement the outstanding RPC count */
1294     protected void decRpcCount() {
1295       rpcCount.decrement();
1296     }
1297 
1298     /* Increment the outstanding RPC count */
1299     protected void incRpcCount() {
1300       rpcCount.increment();
1301     }
1302 
1303     protected boolean timedOut(long currentTime) {
1304       return isIdle() && currentTime - lastContact > maxIdleTime;
1305     }
1306 
1307     private UserGroupInformation getAuthorizedUgi(String authorizedId)
1308         throws IOException {
1309       UserGroupInformation authorizedUgi;
1310       if (authMethod == AuthMethod.DIGEST) {
1311         TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1312             secretManager);
1313         authorizedUgi = tokenId.getUser();
1314         if (authorizedUgi == null) {
1315           throw new AccessDeniedException(
1316               "Can't retrieve username from tokenIdentifier.");
1317         }
1318         authorizedUgi.addTokenIdentifier(tokenId);
1319       } else {
1320         authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
1321       }
1322       authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
1323       return authorizedUgi;
1324     }
1325 
    /**
     * Handles one SASL-framed message from the client.
     * <p>
     * Once the SASL context is established the token is an RPC: it is processed directly, or
     * unwrapped first when wrapping is in use. Before that, the token is a negotiation step:
     * the SASL server is created on first use, the token is evaluated, any reply token is sent
     * back, and when the negotiation completes the authorized user and wrap mode are recorded.
     *
     * @param saslToken the bytes read off the wire for this message
     * @throws IOException if negotiation or processing fails; negotiation failures are
     *     reported to the client, counted and audit-logged before being rethrown
     * @throws InterruptedException declared for the RPC processing calls made from here
     */
    private void saslReadAndProcess(byte[] saslToken) throws IOException,
        InterruptedException {
      if (saslContextEstablished) {
        // Note: this trace message is emitted whether or not unwrap() is actually used below.
        if (LOG.isTraceEnabled())
          LOG.trace("Have read input token of size " + saslToken.length
              + " for processing by saslServer.unwrap()");

        if (!useWrap) {
          processOneRpc(saslToken);
        } else {
          byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
          processUnwrappedData(plaintextData);
        }
      } else {
        byte[] replyToken;
        try {
          // Lazily create the SASL server the first time a negotiation token arrives.
          if (saslServer == null) {
            switch (authMethod) {
            case DIGEST:
              if (secretManager == null) {
                throw new AccessDeniedException(
                    "Server is not configured to do DIGEST authentication.");
              }
              saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
                  .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
                  SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
                      secretManager, this));
              break;
            default:
              // Every other auth method is handled as Kerberos, using the current user's
              // principal split into its parts.
              UserGroupInformation current = UserGroupInformation.getCurrentUser();
              String fullName = current.getUserName();
              if (LOG.isDebugEnabled()) {
                LOG.debug("Kerberos principal name is " + fullName);
              }
              final String names[] = SaslUtil.splitKerberosName(fullName);
              if (names.length != 3) {
                throw new AccessDeniedException(
                    "Kerberos principal name does NOT have the expected "
                        + "hostname part: " + fullName);
              }
              // The server is created inside doAs(); the action assigns the field directly.
              current.doAs(new PrivilegedExceptionAction<Object>() {
                @Override
                public Object run() throws SaslException {
                  saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
                      .getMechanismName(), names[0], names[1],
                      SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
                  return null;
                }
              });
            }
            if (saslServer == null)
              throw new AccessDeniedException(
                  "Unable to find SASL server implementation for "
                      + authMethod.getMechanismName());
            if (LOG.isDebugEnabled()) {
              LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
            }
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug("Have read input token of size " + saslToken.length
                + " for processing by saslServer.evaluateResponse()");
          }
          replyToken = saslServer.evaluateResponse(saslToken);
        } catch (IOException e) {
          // Report the failure to the client. If an InvalidToken is found anywhere in the
          // cause chain, that is what gets reported instead of the outer exception.
          IOException sendToClient = e;
          Throwable cause = e;
          while (cause != null) {
            if (cause instanceof InvalidToken) {
              sendToClient = (InvalidToken) cause;
              break;
            }
            cause = cause.getCause();
          }
          doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
            sendToClient.getLocalizedMessage());
          metrics.authenticationFailure();
          String clientIP = this.toString();
          // attempting user could be null
          AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
          // The original exception (not the one sent to the client) is rethrown.
          throw e;
        }
        if (replyToken != null) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Will send token of size " + replyToken.length
                + " from saslServer.");
          }
          doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
              null);
        }
        if (saslServer.isComplete()) {
          // Negotiation finished: wrapping is needed for any QOP other than plain "auth".
          String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
          useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
          ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
          if (LOG.isDebugEnabled()) {
            LOG.debug("SASL server context established. Authenticated client: "
              + ugi + ". Negotiated QoP is "
              + saslServer.getNegotiatedProperty(Sasl.QOP));
          }
          metrics.authenticationSuccess();
          AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
          saslContextEstablished = true;
        }
      }
    }
1430 
1431     /**
1432      * No protobuf encoding of raw sasl messages
1433      */
1434     private void doRawSaslReply(SaslStatus status, Writable rv,
1435         String errorClass, String error) throws IOException {
1436       ByteBufferOutputStream saslResponse = null;
1437       DataOutputStream out = null;
1438       try {
1439         // In my testing, have noticed that sasl messages are usually
1440         // in the ballpark of 100-200. That's why the initial capacity is 256.
1441         saslResponse = new ByteBufferOutputStream(256);
1442         out = new DataOutputStream(saslResponse);
1443         out.writeInt(status.state); // write status
1444         if (status == SaslStatus.SUCCESS) {
1445           rv.write(out);
1446         } else {
1447           WritableUtils.writeString(out, errorClass);
1448           WritableUtils.writeString(out, error);
1449         }
1450         saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1451         saslCall.responder = responder;
1452         saslCall.sendResponseIfReady();
1453       } finally {
1454         if (saslResponse != null) {
1455           saslResponse.close();
1456         }
1457         if (out != null) {
1458           out.close();
1459         }
1460       }
1461     }
1462 
1463     private void disposeSasl() {
1464       if (saslServer != null) {
1465         try {
1466           saslServer.dispose();
1467           saslServer = null;
1468         } catch (SaslException ignored) {
1469           // Ignored. This is being disposed of anyway.
1470         }
1471       }
1472     }
1473 
    /**
     * Validates the connection preamble. The four magic bytes are expected to be in
     * {@code dataLengthBuffer} already; this method reads the following two bytes (version and
     * auth method) from the channel, validates them, and sets up the authentication mode.
     * On success {@code connectionPreambleRead} is set and {@code dataLengthBuffer} is cleared
     * for the next length prefix.
     *
     * @return the count from reading the version/auth bytes, or whatever
     *     {@code doBadPreambleHandling} returns for a bad magic, version or auth byte
     * @throws IOException on read failure, or AccessDeniedException when security is enabled,
     *     the client asks for simple auth and fallback is not allowed
     */
    private int readPreamble() throws IOException {
      int count;
      // Check for 'HBas' magic.
      this.dataLengthBuffer.flip();
      if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
        return doBadPreambleHandling("Expected HEADER=" +
            Bytes.toStringBinary(HConstants.RPC_HEADER) +
            " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
            " from " + toString());
      }
      // Now read the next two bytes, the version and the auth to use.
      ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
      count = channelRead(channel, versionAndAuthBytes);
      // NOTE(review): on a short read (fewer than 2 bytes) we return with the preamble still
      // unread, but versionAndAuthBytes is a local and dataLengthBuffer has already been
      // flipped. It looks like the bytes consumed so far are lost for the next
      // readAndProcess() call -- TODO confirm how this case is meant to recover.
      if (count < 0 || versionAndAuthBytes.remaining() > 0) {
        return count;
      }
      int version = versionAndAuthBytes.get(0);
      byte authbyte = versionAndAuthBytes.get(1);
      this.authMethod = AuthMethod.valueOf(authbyte);
      if (version != CURRENT_VERSION) {
        String msg = getFatalConnectionString(version, authbyte);
        return doBadPreambleHandling(msg, new WrongVersionException(msg));
      }
      // authMethod is null here when the auth byte did not map to a known method.
      if (authMethod == null) {
        String msg = getFatalConnectionString(version, authbyte);
        return doBadPreambleHandling(msg, new BadAuthException(msg));
      }
      // Secure server, client wants simple auth: allow only if fallback is configured.
      if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
        if (allowFallbackToSimpleAuth) {
          metrics.authenticationFallback();
          authenticatedWithFallback = true;
        } else {
          AccessDeniedException ae = new AccessDeniedException("Authentication is required");
          setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
          responder.doRespond(authFailedCall);
          throw ae;
        }
      }
      // Insecure server, client wants SASL: tell the client to switch to simple auth.
      if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
        doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
            SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
        authMethod = AuthMethod.SIMPLE;
        // client has already sent the initial Sasl message and we
        // should ignore it. Both client and server should fall back
        // to simple auth from now on.
        skipInitialSaslHandshake = true;
      }
      if (authMethod != AuthMethod.SIMPLE) {
        useSasl = true;
      }

      // Make the 4-byte buffer ready to receive the next length prefix.
      dataLengthBuffer.clear();
      connectionPreambleRead = true;
      return count;
    }
1529 
1530     private int read4Bytes() throws IOException {
1531       if (this.dataLengthBuffer.remaining() > 0) {
1532         return channelRead(channel, this.dataLengthBuffer);
1533       } else {
1534         return 0;
1535       }
1536     }
1537 
1538 
    /**
     * Read off the wire. If there is not enough data to read, update the connection state with
     * what we have and return; a later invocation resumes from the partially filled buffers.
     * <p>
     * The method first fills the 4-byte length buffer, then (on a new connection) hands off to
     * {@code readPreamble()}, then allocates and fills the {@code data} buffer, and finally calls
     * {@code process()} once the buffer is complete.
     * @return Returns -1 if failure (and caller will close connection), else zero or more.
     * @throws IOException propagated from the channel reads or from processing the payload
     * @throws InterruptedException propagated from {@code process()}
     * @throws IllegalArgumentException if the length read off the wire is negative and is not
     *   handled as a ping
     */
    public int readAndProcess() throws IOException, InterruptedException {
      // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
      // does, read in the rest of the connection preamble, the version and the auth method.
      // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
      // length into the 4-byte this.dataLengthBuffer.
      int count = read4Bytes();
      if (count < 0 || dataLengthBuffer.remaining() > 0) {
        // Either the read failed or the four bytes are not all here yet.
        return count;
      }

      // If we have not read the connection setup preamble, look to see if that is on the wire.
      if (!connectionPreambleRead) {
        count = readPreamble();
        if (!connectionPreambleRead) {
          // Preamble incomplete or rejected; readPreamble's return value tells the caller which.
          return count;
        }

        // Preamble accepted (readPreamble cleared dataLengthBuffer); now read the length of
        // what follows it.
        count = read4Bytes();
        if (count < 0 || dataLengthBuffer.remaining() > 0) {
          return count;
        }
      }

      // We have read a length and we have read the preamble.  It is either the connection header
      // or it is a request.
      if (data == null) {
        // First time through for this message: decode the length and allocate the payload buffer.
        dataLengthBuffer.flip();
        int dataLength = dataLengthBuffer.getInt();
        if (dataLength == RpcClient.PING_CALL_ID) {
          if (!useWrap) { //covers the !useSasl too
            dataLengthBuffer.clear();
            return 0;  //ping message
          }
          // With wrapping enabled the ping marker is not honoured here and falls through to the
          // length validation below.
        }
        if (dataLength < 0) { // A data length of zero is legal.
          throw new IllegalArgumentException("Unexpected data length "
              + dataLength + "!! from " + getHostAddress());
        }
        data = ByteBuffer.allocate(dataLength);

        // Increment the rpc count. This counter will be decreased when we write
        //  the response.  If we want the connection to be detected as idle properly, we
        //  need to keep the inc / dec correct.
        incRpcCount();
      }

      // data != null here: either freshly allocated above or left over from a partial read.
      count = channelRead(channel, data);

      if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
        process();
      }

      return count;
    }
1600 
1601     /**
1602      * Process the data buffer and clean the connection state for the next call.
1603      */
1604     private void process() throws IOException, InterruptedException {
1605       data.flip();
1606       try {
1607         if (skipInitialSaslHandshake) {
1608           skipInitialSaslHandshake = false;
1609           return;
1610         }
1611 
1612         if (useSasl) {
1613           saslReadAndProcess(data.array());
1614         } else {
1615           processOneRpc(data.array());
1616         }
1617 
1618       } finally {
1619         dataLengthBuffer.clear(); // Clean for the next call
1620         data = null; // For the GC
1621       }
1622     }
1623 
1624     private String getFatalConnectionString(final int version, final byte authByte) {
1625       return "serverVersion=" + CURRENT_VERSION +
1626       ", clientVersion=" + version + ", authMethod=" + authByte +
1627       ", authSupported=" + (authMethod != null) + " from " + toString();
1628     }
1629 
1630     private int doBadPreambleHandling(final String msg) throws IOException {
1631       return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1632     }
1633 
1634     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1635       LOG.warn(msg);
1636       Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
1637       setupResponse(null, fakeCall, e, msg);
1638       responder.doRespond(fakeCall);
1639       // Returning -1 closes out the connection.
1640       return -1;
1641     }
1642 
1643     // Reads the connection header following version
1644     private void processConnectionHeader(byte[] buf) throws IOException {
1645       this.connectionHeader = ConnectionHeader.parseFrom(buf);
1646       String serviceName = connectionHeader.getServiceName();
1647       if (serviceName == null) throw new EmptyServiceNameException();
1648       this.service = getService(services, serviceName);
1649       if (this.service == null) throw new UnknownServiceException(serviceName);
1650       setupCellBlockCodecs(this.connectionHeader);
1651       UserGroupInformation protocolUser = createUser(connectionHeader);
1652       if (!useSasl) {
1653         ugi = protocolUser;
1654         if (ugi != null) {
1655           ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1656         }
1657         // audit logging for SASL authenticated users happens in saslReadAndProcess()
1658         if (authenticatedWithFallback) {
1659           LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
1660               + " connecting from " + getHostAddress());
1661         }
1662         AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1663       } else {
1664         // user is authenticated
1665         ugi.setAuthenticationMethod(authMethod.authenticationMethod);
1666         //Now we check if this is a proxy user case. If the protocol user is
1667         //different from the 'user', it is a proxy user scenario. However,
1668         //this is not allowed if user authenticated with DIGEST.
1669         if ((protocolUser != null)
1670             && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
1671           if (authMethod == AuthMethod.DIGEST) {
1672             // Not allowed to doAs if token authentication is used
1673             throw new AccessDeniedException("Authenticated user (" + ugi
1674                 + ") doesn't match what the client claims to be ("
1675                 + protocolUser + ")");
1676           } else {
1677             // Effective user can be different from authenticated user
1678             // for simple auth or kerberos auth
1679             // The user is the real user. Now we create a proxy user
1680             UserGroupInformation realUser = ugi;
1681             ugi = UserGroupInformation.createProxyUser(protocolUser
1682                 .getUserName(), realUser);
1683             // Now the user is a proxy user, set Authentication method Proxy.
1684             ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
1685           }
1686         }
1687       }
1688       if (connectionHeader.hasVersionInfo()) {
1689         // see if this connection will support RetryImmediatelyException
1690         retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
1691 
1692         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1693             + " with version info: "
1694             + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
1695       } else {
1696         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1697             + " with unknown version info");
1698       }
1699 
1700 
1701     }
1702 
1703     /**
1704      * Set up cell block codecs
1705      * @throws FatalConnectionException
1706      */
1707     private void setupCellBlockCodecs(final ConnectionHeader header)
1708     throws FatalConnectionException {
1709       // TODO: Plug in other supported decoders.
1710       if (!header.hasCellBlockCodecClass()) return;
1711       String className = header.getCellBlockCodecClass();
1712       if (className == null || className.length() == 0) return;
1713       try {
1714         this.codec = (Codec)Class.forName(className).newInstance();
1715       } catch (Exception e) {
1716         throw new UnsupportedCellCodecException(className, e);
1717       }
1718       if (!header.hasCellBlockCompressorClass()) return;
1719       className = header.getCellBlockCompressorClass();
1720       try {
1721         this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1722       } catch (Exception e) {
1723         throw new UnsupportedCompressionCodecException(className, e);
1724       }
1725     }
1726 
1727     private void processUnwrappedData(byte[] inBuf) throws IOException,
1728     InterruptedException {
1729       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1730       // Read all RPCs contained in the inBuf, even partial ones
1731       while (true) {
1732         int count;
1733         if (unwrappedDataLengthBuffer.remaining() > 0) {
1734           count = channelRead(ch, unwrappedDataLengthBuffer);
1735           if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1736             return;
1737         }
1738 
1739         if (unwrappedData == null) {
1740           unwrappedDataLengthBuffer.flip();
1741           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1742 
1743           if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1744             if (LOG.isDebugEnabled())
1745               LOG.debug("Received ping message");
1746             unwrappedDataLengthBuffer.clear();
1747             continue; // ping message
1748           }
1749           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1750         }
1751 
1752         count = channelRead(ch, unwrappedData);
1753         if (count <= 0 || unwrappedData.remaining() > 0)
1754           return;
1755 
1756         if (unwrappedData.remaining() == 0) {
1757           unwrappedDataLengthBuffer.clear();
1758           unwrappedData.flip();
1759           processOneRpc(unwrappedData.array());
1760           unwrappedData = null;
1761         }
1762       }
1763     }
1764 
1765     private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1766       if (connectionHeaderRead) {
1767         processRequest(buf);
1768       } else {
1769         processConnectionHeader(buf);
1770         this.connectionHeaderRead = true;
1771         if (!authorizeConnection()) {
1772           // Throw FatalConnectionException wrapping ACE so client does right thing and closes
1773           // down the connection instead of trying to read non-existent retun.
1774           throw new AccessDeniedException("Connection from " + this + " for service " +
1775             connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
1776         }
1777         this.user = userProvider.create(this.ugi);
1778       }
1779     }
1780 
1781     /**
1782      * @param buf Has the request header and the request param and optionally encoded data buffer
1783      * all in this one array.
1784      * @throws IOException
1785      * @throws InterruptedException
1786      */
1787     protected void processRequest(byte[] buf) throws IOException, InterruptedException {
1788       long totalRequestSize = buf.length;
1789       int offset = 0;
1790       // Here we read in the header.  We avoid having pb
1791       // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
1792       CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
1793       int headerSize = cis.readRawVarint32();
1794       offset = cis.getTotalBytesRead();
1795       Message.Builder builder = RequestHeader.newBuilder();
1796       ProtobufUtil.mergeFrom(builder, buf, offset, headerSize);
1797       RequestHeader header = (RequestHeader) builder.build();
1798       offset += headerSize;
1799       int id = header.getCallId();
1800       if (LOG.isTraceEnabled()) {
1801         LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1802           " totalRequestSize: " + totalRequestSize + " bytes");
1803       }
1804       // Enforcing the call queue size, this triggers a retry in the client
1805       // This is a bit late to be doing this check - we have already read in the total request.
1806       if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1807         final Call callTooBig =
1808           new Call(id, this.service, null, null, null, null, this,
1809             responder, totalRequestSize, null, null);
1810         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1811         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1812         setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
1813             "Call queue is full on " + server.getServerName() +
1814                 ", is hbase.ipc.server.max.callqueue.size too small?");
1815         responder.doRespond(callTooBig);
1816         return;
1817       }
1818       MethodDescriptor md = null;
1819       Message param = null;
1820       CellScanner cellScanner = null;
1821       try {
1822         if (header.hasRequestParam() && header.getRequestParam()) {
1823           md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1824           if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1825           builder = this.service.getRequestPrototype(md).newBuilderForType();
1826           // To read the varint, I need an inputstream; might as well be a CIS.
1827           cis = CodedInputStream.newInstance(buf, offset, buf.length);
1828           int paramSize = cis.readRawVarint32();
1829           offset += cis.getTotalBytesRead();
1830           if (builder != null) {
1831             ProtobufUtil.mergeFrom(builder, buf, offset, paramSize);
1832             param = builder.build();
1833           }
1834           offset += paramSize;
1835         }
1836         if (header.hasCellBlockMeta()) {
1837           cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
1838             buf, offset, buf.length);
1839         }
1840       } catch (Throwable t) {
1841         InetSocketAddress address = getListenerAddress();
1842         String msg = (address != null ? address : "(channel closed)") +
1843             " is unable to read call parameter from client " + getHostAddress();
1844         LOG.warn(msg, t);
1845 
1846         metrics.exception(t);
1847 
1848         // probably the hbase hadoop version does not match the running hadoop version
1849         if (t instanceof LinkageError) {
1850           t = new DoNotRetryIOException(t);
1851         }
1852         // If the method is not present on the server, do not retry.
1853         if (t instanceof UnsupportedOperationException) {
1854           t = new DoNotRetryIOException(t);
1855         }
1856 
1857         final Call readParamsFailedCall =
1858           new Call(id, this.service, null, null, null, null, this,
1859             responder, totalRequestSize, null, null);
1860         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1861         setupResponse(responseBuffer, readParamsFailedCall, t,
1862           msg + "; " + t.getMessage());
1863         responder.doRespond(readParamsFailedCall);
1864         return;
1865       }
1866 
1867       TraceInfo traceInfo = header.hasTraceInfo()
1868           ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1869           : null;
1870       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1871               totalRequestSize, traceInfo, this.addr);
1872 
1873       if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
1874         callQueueSize.add(-1 * call.getSize());
1875 
1876         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1877         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1878         setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
1879             "Call queue is full on " + server.getServerName() +
1880                 ", too many items queued ?");
1881         responder.doRespond(call);
1882       }
1883     }
1884 
1885     private boolean authorizeConnection() throws IOException {
1886       try {
1887         // If auth method is DIGEST, the token was obtained by the
1888         // real user for the effective user, therefore not required to
1889         // authorize real user. doAs is allowed only for simple or kerberos
1890         // authentication
1891         if (ugi != null && ugi.getRealUser() != null
1892             && (authMethod != AuthMethod.DIGEST)) {
1893           ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
1894         }
1895         authorize(ugi, connectionHeader, getHostInetAddress());
1896         metrics.authorizationSuccess();
1897       } catch (AuthorizationException ae) {
1898         if (LOG.isDebugEnabled()) {
1899           LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1900         }
1901         metrics.authorizationFailure();
1902         setupResponse(authFailedResponse, authFailedCall,
1903           new AccessDeniedException(ae), ae.getMessage());
1904         responder.doRespond(authFailedCall);
1905         return false;
1906       }
1907       return true;
1908     }
1909 
1910     protected synchronized void close() {
1911       disposeSasl();
1912       data = null;
1913       if (!channel.isOpen())
1914         return;
1915       try {socket.shutdownOutput();} catch(Exception ignored) {
1916         if (LOG.isTraceEnabled()) {
1917           LOG.trace(ignored);
1918         }
1919       }
1920       if (channel.isOpen()) {
1921         try {channel.close();} catch(Exception ignored) {
1922           if (LOG.isTraceEnabled()) {
1923             LOG.trace(ignored);
1924           }
1925         }
1926       }
1927       try {socket.close();} catch(Exception ignored) {
1928         if (LOG.isTraceEnabled()) {
1929           LOG.trace(ignored);
1930         }
1931       }
1932     }
1933 
1934     private UserGroupInformation createUser(ConnectionHeader head) {
1935       UserGroupInformation ugi = null;
1936 
1937       if (!head.hasUserInfo()) {
1938         return null;
1939       }
1940       UserInformation userInfoProto = head.getUserInfo();
1941       String effectiveUser = null;
1942       if (userInfoProto.hasEffectiveUser()) {
1943         effectiveUser = userInfoProto.getEffectiveUser();
1944       }
1945       String realUser = null;
1946       if (userInfoProto.hasRealUser()) {
1947         realUser = userInfoProto.getRealUser();
1948       }
1949       if (effectiveUser != null) {
1950         if (realUser != null) {
1951           UserGroupInformation realUserUgi =
1952               UserGroupInformation.createRemoteUser(realUser);
1953           ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1954         } else {
1955           ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1956         }
1957       }
1958       return ugi;
1959     }
1960   }
1961 
1962   /**
1963    * Datastructure for passing a {@link BlockingService} and its associated class of
1964    * protobuf service interface.  For example, a server that fielded what is defined
1965    * in the client protobuf service would pass in an implementation of the client blocking service
1966    * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
1967    */
1968   public static class BlockingServiceAndInterface {
1969     private final BlockingService service;
1970     private final Class<?> serviceInterface;
1971     public BlockingServiceAndInterface(final BlockingService service,
1972         final Class<?> serviceInterface) {
1973       this.service = service;
1974       this.serviceInterface = serviceInterface;
1975     }
1976     public Class<?> getServiceInterface() {
1977       return this.serviceInterface;
1978     }
1979     public BlockingService getBlockingService() {
1980       return this.service;
1981     }
1982   }
1983 
  /**
   * Constructs a server listening on the named port and address.
   * <p>
   * The constructor creates the listener (binding the port), the metrics, the responder and
   * initializes the scheduler; the threads themselves are only started by {@code start()}.
   * @param server hosting instance of {@link Server}. We will do authentications if an
   * instance else pass null for no authentication check.
   * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
   * @param services A list of services.
   * @param bindAddress Where to listen
   * @param conf configuration the buffer reservoir, call queue, reader thread, idle-connection,
   * response-warning, TCP and security settings below are read from
   * @param scheduler scheduler kept by this server; it is initialized here with a
   * {@code RpcSchedulerContext} wrapping this server
   * @throws IOException presumably when the listener cannot be set up -- confirm against
   * {@code Listener}
   */
  public RpcServer(final Server server, final String name,
      final List<BlockingServiceAndInterface> services,
      final InetSocketAddress bindAddress, Configuration conf,
      RpcScheduler scheduler)
      throws IOException {

    // Optional pool of reusable byte buffers; disabled means reservoir stays null.
    if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
      this.reservoir = new BoundedByteBufferPool(
          conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
          conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
          // Make the max twice the number of handlers to be safe.
          conf.getInt("hbase.ipc.server.reservoir.initial.max",
              conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
                  HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
    } else {
      reservoir = null;
    }

    this.server = server;
    this.services = services;
    this.bindAddress = bindAddress;
    this.conf = conf;
    this.socketSendBufferSize = 0;
    this.maxQueueSize =
      this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
    this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
    // Note: the idle time kept here is twice the configured client-side value.
    this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
    this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
    this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
    this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
    this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);

    // Start the listener here and let it bind to the port
    listener = new Listener(name);
    // Take the port from the listener's address rather than from bindAddress.
    this.port = listener.getAddress().getPort();

    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);

    this.ipcUtil = new IPCUtil(conf);


    // Create the responder here
    responder = new Responder();
    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
    this.userProvider = UserProvider.instantiate(conf);
    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
    if (isSecurityEnabled) {
      HBaseSaslRpcServer.init(conf);
    }
    // Loads the settings that onConfigurationChange() can later reload.
    initReconfigurable(conf);

    this.scheduler = scheduler;
    this.scheduler.init(new RpcSchedulerContext(this));
  }
2052 
2053   @Override
2054   public void onConfigurationChange(Configuration newConf) {
2055     initReconfigurable(newConf);
2056   }
2057 
2058   private void initReconfigurable(Configuration confToLoad) {
2059     this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
2060     if (isSecurityEnabled && allowFallbackToSimpleAuth) {
2061       LOG.warn("********* WARNING! *********");
2062       LOG.warn("This server is configured to allow connections from INSECURE clients");
2063       LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
2064       LOG.warn("While this option is enabled, client identities cannot be secured, and user");
2065       LOG.warn("impersonation is possible!");
2066       LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
2067       LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
2068       LOG.warn("****************************");
2069     }
2070   }
2071 
2072   /**
2073    * Subclasses of HBaseServer can override this to provide their own
2074    * Connection implementations.
2075    */
2076   protected Connection getConnection(SocketChannel channel, long time) {
2077     return new Connection(channel, time);
2078   }
2079 
2080   /**
2081    * Setup response for the RPC Call.
2082    *
2083    * @param response buffer to serialize the response into
2084    * @param call {@link Call} to which we are setting up the response
2085    * @param error error message, if the call failed
2086    * @throws IOException
2087    */
2088   private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
2089   throws IOException {
2090     if (response != null) response.reset();
2091     call.setResponse(null, null, t, error);
2092   }
2093 
2094   protected void closeConnection(Connection connection) {
2095     synchronized (connectionList) {
2096       if (connectionList.remove(connection)) {
2097         numConnections--;
2098       }
2099     }
2100     connection.close();
2101   }
2102 
2103   Configuration getConf() {
2104     return conf;
2105   }
2106 
2107   /** Sets the socket buffer size used for responding to RPCs.
2108    * @param size send size
2109    */
2110   @Override
2111   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2112 
2113   @Override
2114   public boolean isStarted() {
2115     return this.started;
2116   }
2117 
2118   /** Starts the service.  Must be called before any calls will be handled. */
2119   @Override
2120   public synchronized void start() {
2121     if (started) return;
2122     authTokenSecretMgr = createSecretManager();
2123     if (authTokenSecretMgr != null) {
2124       setSecretManager(authTokenSecretMgr);
2125       authTokenSecretMgr.start();
2126     }
2127     this.authManager = new ServiceAuthorizationManager();
2128     HBasePolicyProvider.init(conf, authManager);
2129     responder.start();
2130     listener.start();
2131     scheduler.start();
2132     started = true;
2133   }
2134 
2135   @Override
2136   public synchronized void refreshAuthManager(PolicyProvider pp) {
2137     // Ignore warnings that this should be accessed in a static way instead of via an instance;
2138     // it'll break if you go via static route.
2139     this.authManager.refresh(this.conf, pp);
2140   }
2141 
2142   private AuthenticationTokenSecretManager createSecretManager() {
2143     if (!isSecurityEnabled) return null;
2144     if (server == null) return null;
2145     Configuration conf = server.getConfiguration();
2146     long keyUpdateInterval =
2147         conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2148     long maxAge =
2149         conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2150     return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2151         server.getServerName().toString(), keyUpdateInterval, maxAge);
2152   }
2153 
2154   public SecretManager<? extends TokenIdentifier> getSecretManager() {
2155     return this.secretManager;
2156   }
2157 
2158   @SuppressWarnings("unchecked")
2159   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2160     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2161   }
2162 
  /**
   * This is a server side method, which is invoked over RPC. On success
   * the return response has protobuf response payload. On failure, the
   * exception name and the stack trace are returned in the protobuf response.
   * <p>
   * Besides invoking the service method, this records queue, processing and total times and
   * request/response sizes in the metrics, and logs responses that exceed the configured
   * warning time or size.
   * @param service service to invoke the method on
   * @param md descriptor of the method to invoke
   * @param param request message handed to the method
   * @param cellScanner cells accompanying the request; wrapped in the controller passed to
   *   the method
   * @param receiveTime time the call was received, in ms; used to compute queue and total time
   * @param status handler status object updated while the call is serviced
   * @return the result message paired with the controller's cell scanner after the call
   * @throws IOException the failure raised by the service method (unwrapped from
   *   {@code ServiceException} when it has a cause); non-IOException failures are wrapped
   */
  @Override
  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
  throws IOException {
    try {
      status.setRPC(md.getName(), new Object[]{param}, receiveTime);
      // TODO: Review after we add in encoded data blocks.
      status.setRPCPacket(param);
      status.resume("Servicing call");
      //get an instance of the method arg type
      long startTime = System.currentTimeMillis();
      PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
      Message result = service.callBlockingMethod(md, controller, param);
      long endTime = System.currentTimeMillis();
      // All three durations are in milliseconds, narrowed to int.
      int processingTime = (int) (endTime - startTime);
      int qTime = (int) (startTime - receiveTime);
      int totalTime = (int) (endTime - receiveTime);
      if (LOG.isTraceEnabled()) {
        LOG.trace(CurCall.get().toString() +
            ", response " + TextFormat.shortDebugString(result) +
            " queueTime: " + qTime +
            " processingTime: " + processingTime +
            " totalTime: " + totalTime);
      }
      long requestSize = param.getSerializedSize();
      long responseSize = result.getSerializedSize();
      metrics.dequeuedCall(qTime);
      metrics.processedCall(processingTime);
      metrics.totalCall(totalTime);
      metrics.receivedRequest(requestSize);
      metrics.sentResponse(responseSize);
      // log any RPC responses that are slower than the configured warn
      // response time or larger than configured warning size
      // (a threshold of -1 or lower disables the corresponding check)
      boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
      boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
      if (tooSlow || tooLarge) {
        // when tagging, we let TooLarge trump TooSlow to keep output simple
        // note that large responses will often also be slow.
        logResponse(new Object[]{param},
            md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
            (tooLarge ? "TooLarge" : "TooSlow"),
            status.getClient(), startTime, processingTime, qTime,
            responseSize);
      }
      return new Pair<Message, CellScanner>(result, controller.cellScanner());
    } catch (Throwable e) {
      // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
      // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
      // need to pass it over the wire.
      if (e instanceof ServiceException) {
        if (e.getCause() == null) {
          LOG.debug("Caught a ServiceException with null cause", e);
        } else {
          e = e.getCause();
        }
      }

      // increment the number of requests that were exceptions.
      metrics.exception(e);

      // Rethrow in a form the declared signature allows: linkage problems become
      // non-retriable, IOExceptions pass through, anything else is wrapped.
      if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
      if (e instanceof IOException) throw (IOException)e;
      LOG.error("Unexpected throwable object ", e);
      throw new IOException(e.getMessage(), e);
    }
  }
2234 
2235   /**
2236    * Logs an RPC response to the LOG file, producing valid JSON objects for
2237    * client Operations.
2238    * @param params The parameters received in the call.
2239    * @param methodName The name of the method invoked
2240    * @param call The string representation of the call
2241    * @param tag  The tag that will be used to indicate this event in the log.
2242    * @param clientAddress   The address of the client who made this call.
2243    * @param startTime       The time that the call was initiated, in ms.
2244    * @param processingTime  The duration that the call took to run, in ms.
2245    * @param qTime           The duration that the call spent on the queue
2246    *                        prior to being initiated, in ms.
2247    * @param responseSize    The size in bytes of the response buffer.
2248    */
2249   void logResponse(Object[] params, String methodName, String call, String tag,
2250       String clientAddress, long startTime, int processingTime, int qTime,
2251       long responseSize)
2252           throws IOException {
2253     // base information that is reported regardless of type of call
2254     Map<String, Object> responseInfo = new HashMap<String, Object>();
2255     responseInfo.put("starttimems", startTime);
2256     responseInfo.put("processingtimems", processingTime);
2257     responseInfo.put("queuetimems", qTime);
2258     responseInfo.put("responsesize", responseSize);
2259     responseInfo.put("client", clientAddress);
2260     responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2261     responseInfo.put("method", methodName);
2262     if (params.length == 2 && server instanceof HRegionServer &&
2263         params[0] instanceof byte[] &&
2264         params[1] instanceof Operation) {
2265       // if the slow process is a query, we want to log its table as well
2266       // as its own fingerprint
2267       TableName tableName = TableName.valueOf(
2268           HRegionInfo.parseRegionName((byte[]) params[0])[0]);
2269       responseInfo.put("table", tableName.getNameAsString());
2270       // annotate the response map with operation details
2271       responseInfo.putAll(((Operation) params[1]).toMap());
2272       // report to the log file
2273       LOG.warn("(operation" + tag + "): " +
2274                MAPPER.writeValueAsString(responseInfo));
2275     } else if (params.length == 1 && server instanceof HRegionServer &&
2276         params[0] instanceof Operation) {
2277       // annotate the response map with operation details
2278       responseInfo.putAll(((Operation) params[0]).toMap());
2279       // report to the log file
2280       LOG.warn("(operation" + tag + "): " +
2281                MAPPER.writeValueAsString(responseInfo));
2282     } else {
2283       // can't get JSON details, so just report call.toString() along with
2284       // a more generic tag.
2285       responseInfo.put("call", call);
2286       LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2287     }
2288   }
2289 
2290   /** Stops the service.  No new calls will be handled after this is called. */
2291   @Override
2292   public synchronized void stop() {
2293     LOG.info("Stopping server on " + port);
2294     running = false;
2295     if (authTokenSecretMgr != null) {
2296       authTokenSecretMgr.stop();
2297       authTokenSecretMgr = null;
2298     }
2299     listener.interrupt();
2300     listener.doStop();
2301     responder.interrupt();
2302     scheduler.stop();
2303     notifyAll();
2304   }
2305 
2306   /** Wait for the server to be stopped.
2307    * Does not wait for all subthreads to finish.
2308    *  See {@link #stop()}.
2309    * @throws InterruptedException e
2310    */
2311   @Override
2312   public synchronized void join() throws InterruptedException {
2313     while (running) {
2314       wait();
2315     }
2316   }
2317 
2318   /**
2319    * Return the socket (ip+port) on which the RPC server is listening to. May return null if
2320    * the listener channel is closed.
2321    * @return the socket (ip+port) on which the RPC server is listening to, or null if this
2322    * information cannot be determined
2323    */
2324   @Override
2325   public synchronized InetSocketAddress getListenerAddress() {
2326     if (listener == null) {
2327       return null;
2328     }
2329     return listener.getAddress();
2330   }
2331 
2332   /**
2333    * Set the handler for calling out of RPC for error conditions.
2334    * @param handler the handler implementation
2335    */
2336   @Override
2337   public void setErrorHandler(HBaseRPCErrorHandler handler) {
2338     this.errorHandler = handler;
2339   }
2340 
2341   @Override
2342   public HBaseRPCErrorHandler getErrorHandler() {
2343     return this.errorHandler;
2344   }
2345 
2346   /**
2347    * Returns the metrics instance for reporting RPC call statistics
2348    */
2349   @Override
2350   public MetricsHBaseServer getMetrics() {
2351     return metrics;
2352   }
2353 
2354   @Override
2355   public void addCallSize(final long diff) {
2356     this.callQueueSize.add(diff);
2357   }
2358 
2359   /**
2360    * Authorize the incoming client connection.
2361    *
2362    * @param user client user
2363    * @param connection incoming connection
2364    * @param addr InetAddress of incoming connection
2365    * @throws org.apache.hadoop.security.authorize.AuthorizationException
2366    *         when the client isn't authorized to talk the protocol
2367    */
2368   public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
2369       InetAddress addr)
2370   throws AuthorizationException {
2371     if (authorize) {
2372       Class<?> c = getServiceInterface(services, connection.getServiceName());
2373       this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2374     }
2375   }
2376 
2377   /**
2378    * When the read or write buffer size is larger than this limit, i/o will be
2379    * done in chunks of this size. Most RPC requests and responses would be
2380    * be smaller.
2381    */
2382   private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
2383 
2384   /**
2385    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
2386    * If the amount of data is large, it writes to channel in smaller chunks.
2387    * This is to avoid jdk from creating many direct buffers as the size of
2388    * buffer increases. This also minimizes extra copies in NIO layer
2389    * as a result of multiple write operations required to write a large
2390    * buffer.
2391    *
2392    * @param channel writable byte channel to write to
2393    * @param bufferChain Chain of buffers to write
2394    * @return number of bytes written
2395    * @throws java.io.IOException e
2396    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
2397    */
2398   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2399   throws IOException {
2400     long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
2401     if (count > 0) this.metrics.sentBytes(count);
2402     return count;
2403   }
2404 
2405   /**
2406    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
2407    * If the amount of data is large, it writes to channel in smaller chunks.
2408    * This is to avoid jdk from creating many direct buffers as the size of
2409    * ByteBuffer increases. There should not be any performance degredation.
2410    *
2411    * @param channel writable byte channel to write on
2412    * @param buffer buffer to write
2413    * @return number of bytes written
2414    * @throws java.io.IOException e
2415    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
2416    */
2417   protected int channelRead(ReadableByteChannel channel,
2418                                    ByteBuffer buffer) throws IOException {
2419 
2420     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2421            channel.read(buffer) : channelIO(channel, null, buffer);
2422     if (count > 0) {
2423       metrics.receivedBytes(count);
2424     }
2425     return count;
2426   }
2427 
2428   /**
2429    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
2430    * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
2431    * one of readCh or writeCh should be non-null.
2432    *
2433    * @param readCh read channel
2434    * @param writeCh write channel
2435    * @param buf buffer to read or write into/out of
2436    * @return bytes written
2437    * @throws java.io.IOException e
2438    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
2439    * @see #channelWrite(GatheringByteChannel, BufferChain)
2440    */
2441   private static int channelIO(ReadableByteChannel readCh,
2442                                WritableByteChannel writeCh,
2443                                ByteBuffer buf) throws IOException {
2444 
2445     int originalLimit = buf.limit();
2446     int initialRemaining = buf.remaining();
2447     int ret = 0;
2448 
2449     while (buf.remaining() > 0) {
2450       try {
2451         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2452         buf.limit(buf.position() + ioSize);
2453 
2454         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2455 
2456         if (ret < ioSize) {
2457           break;
2458         }
2459 
2460       } finally {
2461         buf.limit(originalLimit);
2462       }
2463     }
2464 
2465     int nBytes = initialRemaining - buf.remaining();
2466     return (nBytes > 0) ? nBytes : ret;
2467   }
2468 
2469   /**
2470    * Needed for features such as delayed calls.  We need to be able to store the current call
2471    * so that we can complete it later or ask questions of what is supported by the current ongoing
2472    * call.
2473    * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
2474    */
2475   public static RpcCallContext getCurrentCall() {
2476     return CurCall.get();
2477   }
2478 
2479   public static boolean isInRpcCallContext() {
2480     return CurCall.get() != null;
2481   }
2482 
2483   /**
2484    * Returns the user credentials associated with the current RPC request or
2485    * <code>null</code> if no credentials were provided.
2486    * @return A User
2487    */
2488   public static User getRequestUser() {
2489     RpcCallContext ctx = getCurrentCall();
2490     return ctx == null? null: ctx.getRequestUser();
2491   }
2492 
2493   /**
2494    * Returns the username for any user associated with the current RPC
2495    * request or <code>null</code> if no user is set.
2496    */
2497   public static String getRequestUserName() {
2498     User user = getRequestUser();
2499     return user == null? null: user.getShortName();
2500   }
2501 
2502   /**
2503    * @return Address of remote client if a request is ongoing, else null
2504    */
2505   public static InetAddress getRemoteAddress() {
2506     RpcCallContext ctx = getCurrentCall();
2507     return ctx == null? null: ctx.getRemoteAddress();
2508   }
2509 
2510   /**
2511    * @param serviceName Some arbitrary string that represents a 'service'.
2512    * @param services Available service instances
2513    * @return Matching BlockingServiceAndInterface pair
2514    */
2515   static BlockingServiceAndInterface getServiceAndInterface(
2516       final List<BlockingServiceAndInterface> services, final String serviceName) {
2517     for (BlockingServiceAndInterface bs : services) {
2518       if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2519         return bs;
2520       }
2521     }
2522     return null;
2523   }
2524 
2525   /**
2526    * @param serviceName Some arbitrary string that represents a 'service'.
2527    * @param services Available services and their service interfaces.
2528    * @return Service interface class for <code>serviceName</code>
2529    */
2530   static Class<?> getServiceInterface(
2531       final List<BlockingServiceAndInterface> services,
2532       final String serviceName) {
2533     BlockingServiceAndInterface bsasi =
2534         getServiceAndInterface(services, serviceName);
2535     return bsasi == null? null: bsasi.getServiceInterface();
2536   }
2537 
2538   /**
2539    * @param serviceName Some arbitrary string that represents a 'service'.
2540    * @param services Available services and their service interfaces.
2541    * @return BlockingService that goes with the passed <code>serviceName</code>
2542    */
2543   static BlockingService getService(
2544       final List<BlockingServiceAndInterface> services,
2545       final String serviceName) {
2546     BlockingServiceAndInterface bsasi =
2547         getServiceAndInterface(services, serviceName);
2548     return bsasi == null? null: bsasi.getBlockingService();
2549   }
2550 
2551   static MonitoredRPCHandler getStatus() {
2552     // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
2553     MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
2554     if (status != null) {
2555       return status;
2556     }
2557     status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
2558     status.pause("Waiting for a call");
2559     RpcServer.MONITORED_RPC.set(status);
2560     return status;
2561   }
2562 
2563   /** Returns the remote side ip address when invoked inside an RPC
2564    *  Returns null incase of an error.
2565    *  @return InetAddress
2566    */
2567   public static InetAddress getRemoteIp() {
2568     Call call = CurCall.get();
2569     if (call != null && call.connection != null && call.connection.socket != null) {
2570       return call.connection.socket.getInetAddress();
2571     }
2572     return null;
2573   }
2574 
2575 
2576   /**
2577    * A convenience method to bind to a given address and report
2578    * better exceptions if the address is not a valid host.
2579    * @param socket the socket to bind
2580    * @param address the address to bind to
2581    * @param backlog the number of connections allowed in the queue
2582    * @throws BindException if the address can't be bound
2583    * @throws UnknownHostException if the address isn't a valid host name
2584    * @throws IOException other random errors from bind
2585    */
2586   public static void bind(ServerSocket socket, InetSocketAddress address,
2587                           int backlog) throws IOException {
2588     try {
2589       socket.bind(address, backlog);
2590     } catch (BindException e) {
2591       BindException bindException =
2592         new BindException("Problem binding to " + address + " : " +
2593             e.getMessage());
2594       bindException.initCause(e);
2595       throw bindException;
2596     } catch (SocketException e) {
2597       // If they try to bind to a different host's address, give a better
2598       // error message.
2599       if ("Unresolved address".equals(e.getMessage())) {
2600         throw new UnknownHostException("Invalid hostname for server: " +
2601                                        address.getHostName());
2602       }
2603       throw e;
2604     }
2605   }
2606 
2607   @Override
2608   public RpcScheduler getScheduler() {
2609     return scheduler;
2610   }
2611 }