View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.ServerSocket;
31  import java.net.Socket;
32  import java.net.SocketException;
33  import java.net.UnknownHostException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.Channels;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.GatheringByteChannel;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.security.PrivilegedExceptionAction;
46  import java.util.ArrayList;
47  import java.util.Arrays;
48  import java.util.Collections;
49  import java.util.HashMap;
50  import java.util.Iterator;
51  import java.util.LinkedList;
52  import java.util.List;
53  import java.util.Map;
54  import java.util.Random;
55  import java.util.Set;
56  import java.util.concurrent.ConcurrentHashMap;
57  import java.util.concurrent.ConcurrentLinkedDeque;
58  import java.util.concurrent.ExecutorService;
59  import java.util.concurrent.Executors;
60  import java.util.concurrent.locks.Lock;
61  import java.util.concurrent.locks.ReentrantLock;
62  
63  import javax.security.sasl.Sasl;
64  import javax.security.sasl.SaslException;
65  import javax.security.sasl.SaslServer;
66  
67  import org.apache.commons.logging.Log;
68  import org.apache.commons.logging.LogFactory;
69  import org.apache.hadoop.conf.Configuration;
70  import org.apache.hadoop.hbase.CallQueueTooBigException;
71  import org.apache.hadoop.hbase.CellScanner;
72  import org.apache.hadoop.hbase.DoNotRetryIOException;
73  import org.apache.hadoop.hbase.HBaseIOException;
74  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
75  import org.apache.hadoop.hbase.HConstants;
76  import org.apache.hadoop.hbase.HRegionInfo;
77  import org.apache.hadoop.hbase.Server;
78  import org.apache.hadoop.hbase.TableName;
79  import org.apache.hadoop.hbase.classification.InterfaceAudience;
80  import org.apache.hadoop.hbase.classification.InterfaceStability;
81  import org.apache.hadoop.hbase.client.Operation;
82  import org.apache.hadoop.hbase.client.VersionInfoUtil;
83  import org.apache.hadoop.hbase.codec.Codec;
84  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
85  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
86  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
87  import org.apache.hadoop.hbase.io.ByteBufferInputStream;
88  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
89  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
90  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
91  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
92  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
93  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
94  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
95  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
96  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
97  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
98  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
99  import org.apache.hadoop.hbase.regionserver.HRegionServer;
100 import org.apache.hadoop.hbase.security.AccessDeniedException;
101 import org.apache.hadoop.hbase.security.AuthMethod;
102 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
103 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
104 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
105 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
106 import org.apache.hadoop.hbase.security.SaslStatus;
107 import org.apache.hadoop.hbase.security.SaslUtil;
108 import org.apache.hadoop.hbase.security.User;
109 import org.apache.hadoop.hbase.security.UserProvider;
110 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
111 import org.apache.hadoop.hbase.util.Bytes;
112 import org.apache.hadoop.hbase.util.Counter;
113 import org.apache.hadoop.hbase.util.Pair;
114 import org.apache.hadoop.hbase.util.Threads;
115 import org.apache.hadoop.io.BytesWritable;
116 import org.apache.hadoop.io.IntWritable;
117 import org.apache.hadoop.io.Writable;
118 import org.apache.hadoop.io.WritableUtils;
119 import org.apache.hadoop.io.compress.CompressionCodec;
120 import org.apache.hadoop.security.UserGroupInformation;
121 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
122 import org.apache.hadoop.security.authorize.AuthorizationException;
123 import org.apache.hadoop.security.authorize.PolicyProvider;
124 import org.apache.hadoop.security.authorize.ProxyUsers;
125 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
126 import org.apache.hadoop.security.token.SecretManager;
127 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
128 import org.apache.hadoop.security.token.TokenIdentifier;
129 import org.apache.hadoop.util.StringUtils;
130 import org.apache.htrace.TraceInfo;
131 import org.codehaus.jackson.map.ObjectMapper;
132 
133 import com.google.common.util.concurrent.ThreadFactoryBuilder;
134 import com.google.protobuf.BlockingService;
135 import com.google.protobuf.CodedInputStream;
136 import com.google.protobuf.CodedOutputStream;
137 import com.google.protobuf.Descriptors.MethodDescriptor;
138 import com.google.protobuf.Message;
139 import com.google.protobuf.ServiceException;
140 import com.google.protobuf.TextFormat;
141 
142 /**
143  * An RPC server that hosts protobuf described Services.
144  *
145  * An RpcServer instance has a Listener that hosts the socket.  Listener has fixed number
146  * of Readers in an ExecutorPool, 10 by default.  The Listener does an accept and then
147  * round robin a Reader is chosen to do the read.  The reader is registered on Selector.  Read does
148  * total read off the channel and the parse from which it makes a Call.  The call is wrapped in a
149  * CallRunner and passed to the scheduler to be run.  Reader goes back to see if more to be done
150  * and loops till done.
151  *
152  * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
153  * has given the queues into which calls (i.e. CallRunner instances) are inserted.  Handlers run
154  * taking from the queue.  They run the CallRunner#run method on each item gotten from queue
155  * and keep taking while the server is up.
156  *
157  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
158  * queue for Responder to pull from and return result to client.
159  *
160  * @see RpcClientImpl
161  */
162 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
163 @InterfaceStability.Evolving
164 public class RpcServer implements RpcServerInterface, ConfigurationObserver {
165   // LOG is being used in CallRunner and the log level is being changed in tests
166   public static final Log LOG = LogFactory.getLog(RpcServer.class);
167   private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
168       = new CallQueueTooBigException();
169 
170   private final boolean authorize;
171   private boolean isSecurityEnabled;
172 
173   public static final byte CURRENT_VERSION = 0;
174 
175   /**
176    * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
177    */
178   public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
179           "hbase.ipc.server.fallback-to-simple-auth-allowed";
180 
181   /**
182    * How many calls/handler are allowed in the queue.
183    */
184   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
185 
186   /**
187    * The maximum size that we can hold in the RPC queue
188    */
189   private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
190 
191   private final IPCUtil ipcUtil;
192 
193   private static final String AUTH_FAILED_FOR = "Auth failed for ";
194   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
195   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
196     Server.class.getName());
197   protected SecretManager<TokenIdentifier> secretManager;
198   protected ServiceAuthorizationManager authManager;
199 
200   /** This is set to Call object before Handler invokes an RPC and reset
201    * after the call returns.
202    */
203   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
204 
205   /** Keeps MonitoredRPCHandler per handler thread. */
206   static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
207       = new ThreadLocal<MonitoredRPCHandler>();
208 
209   protected final InetSocketAddress bindAddress;
210   protected int port;                             // port we listen on
211   protected InetSocketAddress address;            // inet address we listen on
212   private int readThreads;                        // number of read threads
213   protected int maxIdleTime;                      // the maximum idle time after
214                                                   // which a client may be
215                                                   // disconnected
216   protected int thresholdIdleConnections;         // the number of idle
217                                                   // connections after which we
218                                                   // will start cleaning up idle
219                                                   // connections
220   int maxConnectionsToNuke;                       // the max number of
221                                                   // connections to nuke
222                                                   // during a cleanup
223 
224   protected MetricsHBaseServer metrics;
225 
226   protected final Configuration conf;
227 
228   private int maxQueueSize;
229   protected int socketSendBufferSize;
230   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
231   protected final boolean tcpKeepAlive; // if T then use keepalives
232   protected final long purgeTimeout;    // in milliseconds
233 
234   /**
235    * This flag is used to indicate to sub threads when they should go down.  When we call
236    * {@link #start()}, all threads started will consult this flag on whether they should
237    * keep going.  It is set to false when {@link #stop()} is called.
238    */
239   volatile boolean running = true;
240 
241   /**
242    * This flag is set to true after all threads are up and 'running' and the server is then opened
243    * for business by the call to {@link #start()}.
244    */
245   volatile boolean started = false;
246 
247   /**
248    * This is a running count of the size of all outstanding calls by size.
249    */
250   protected final Counter callQueueSize = new Counter();
251 
252   protected final List<Connection> connectionList =
253     Collections.synchronizedList(new LinkedList<Connection>());
254   //maintain a list
255   //of client connections
256   private Listener listener = null;
257   protected Responder responder = null;
258   protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
259   protected int numConnections = 0;
260 
261   protected HBaseRPCErrorHandler errorHandler = null;
262 
263   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
264   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
265 
266   /** Default value for above params */
267   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
268   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
269 
270   private static final ObjectMapper MAPPER = new ObjectMapper();
271 
272   private final int warnResponseTime;
273   private final int warnResponseSize;
274   private final Server server;
275   private final List<BlockingServiceAndInterface> services;
276 
277   private final RpcScheduler scheduler;
278 
279   private UserProvider userProvider;
280 
281   private final BoundedByteBufferPool reservoir;
282 
283   private volatile boolean allowFallbackToSimpleAuth;
284 
285   /**
286    * Datastructure that holds all necessary to a method invocation and then afterward, carries
287    * the result.
288    */
289   class Call implements RpcCallContext {
290     protected int id;                             // the client's call id
291     protected BlockingService service;
292     protected MethodDescriptor md;
293     protected RequestHeader header;
294     protected Message param;                      // the parameter passed
295     // Optional cell data passed outside of protobufs.
296     protected CellScanner cellScanner;
297     protected Connection connection;              // connection to client
298     protected long timestamp;      // the time received when response is null
299                                    // the time served when response is not null
300     /**
301      * Chain of buffers to send as response.
302      */
303     protected BufferChain response;
304     protected Responder responder;
305 
306     protected long size;                          // size of current call
307     protected boolean isError;
308     protected TraceInfo tinfo;
309     private ByteBuffer cellBlock = null;
310 
311     private User user;
312     private InetAddress remoteAddress;
313     private RpcCallback callback;
314 
315     private long responseCellSize = 0;
316     private long responseBlockSize = 0;
317     private boolean retryImmediatelySupported;
318 
319     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
320         justification="Can't figure why this complaint is happening... see below")
321     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
322          Message param, CellScanner cellScanner, Connection connection, Responder responder,
323          long size, TraceInfo tinfo, final InetAddress remoteAddress) {
324       this.id = id;
325       this.service = service;
326       this.md = md;
327       this.header = header;
328       this.param = param;
329       this.cellScanner = cellScanner;
330       this.connection = connection;
331       this.timestamp = System.currentTimeMillis();
332       this.response = null;
333       this.responder = responder;
334       this.isError = false;
335       this.size = size;
336       this.tinfo = tinfo;
337       this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
338       this.remoteAddress = remoteAddress;
339       this.retryImmediatelySupported =
340           connection == null? null: connection.retryImmediatelySupported;
341     }
342 
343     /**
344      * Call is done. Execution happened and we returned results to client. It is now safe to
345      * cleanup.
346      */
347     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
348         justification="Presume the lock on processing request held by caller is protection enough")
349     void done() {
350       if (this.cellBlock != null && reservoir != null) {
351         // Return buffer to reservoir now we are done with it.
352         reservoir.putBuffer(this.cellBlock);
353         this.cellBlock = null;
354       }
355       this.connection.decRpcCount();  // Say that we're done with this call.
356     }
357 
358     @Override
359     public String toString() {
360       return toShortString() + " param: " +
361         (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
362         " connection: " + connection.toString();
363     }
364 
365     protected RequestHeader getHeader() {
366       return this.header;
367     }
368 
369     /*
370      * Short string representation without param info because param itself could be huge depends on
371      * the payload of a command
372      */
373     String toShortString() {
374       String serviceName = this.connection.service != null ?
375           this.connection.service.getDescriptorForType().getName() : "null";
376       return "callId: " + this.id + " service: " + serviceName +
377           " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
378           " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
379           " connection: " + connection.toString();
380     }
381 
382     String toTraceString() {
383       String serviceName = this.connection.service != null ?
384                            this.connection.service.getDescriptorForType().getName() : "";
385       String methodName = (this.md != null) ? this.md.getName() : "";
386       return serviceName + "." + methodName;
387     }
388 
389     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
390       this.response = new BufferChain(response);
391     }
392 
393     protected synchronized void setResponse(Object m, final CellScanner cells,
394         Throwable t, String errorMsg) {
395       if (this.isError) return;
396       if (t != null) this.isError = true;
397       BufferChain bc = null;
398       try {
399         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
400         // Presume it a pb Message.  Could be null.
401         Message result = (Message)m;
402         // Call id.
403         headerBuilder.setCallId(this.id);
404         if (t != null) {
405           ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
406           exceptionBuilder.setExceptionClassName(t.getClass().getName());
407           exceptionBuilder.setStackTrace(errorMsg);
408           exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
409           if (t instanceof RegionMovedException) {
410             // Special casing for this exception.  This is only one carrying a payload.
411             // Do this instead of build a generic system for allowing exceptions carry
412             // any kind of payload.
413             RegionMovedException rme = (RegionMovedException)t;
414             exceptionBuilder.setHostname(rme.getHostname());
415             exceptionBuilder.setPort(rme.getPort());
416           }
417           // Set the exception as the result of the method invocation.
418           headerBuilder.setException(exceptionBuilder.build());
419         }
420         // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
421         // reservoir when finished. This is hacky and the hack is not contained but benefits are
422         // high when we can avoid a big buffer allocation on each rpc.
423         this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
424           this.connection.compressionCodec, cells, reservoir);
425         if (this.cellBlock != null) {
426           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
427           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
428           cellBlockBuilder.setLength(this.cellBlock.limit());
429           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
430         }
431         Message header = headerBuilder.build();
432 
433         byte[] b = createHeaderAndMessageBytes(result, header);
434 
435         bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock);
436 
437         if (connection.useWrap) {
438           bc = wrapWithSasl(bc);
439         }
440       } catch (IOException e) {
441         LOG.warn("Exception while creating response " + e);
442       }
443       this.response = bc;
444       // Once a response message is created and set to this.response, this Call can be treated as
445       // done. The Responder thread will do the n/w write of this message back to client.
446       if (this.callback != null) {
447         try {
448           this.callback.run();
449         } catch (Exception e) {
450           // Don't allow any exception here to kill this handler thread.
451           LOG.warn("Exception while running the Rpc Callback.", e);
452         }
453       }
454     }
455 
456     private byte[] createHeaderAndMessageBytes(Message result, Message header)
457         throws IOException {
458       // Organize the response as a set of bytebuffers rather than collect it all together inside
459       // one big byte array; save on allocations.
460       int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
461           resultVintSize = 0;
462       if (header != null) {
463         headerSerializedSize = header.getSerializedSize();
464         headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize);
465       }
466       if (result != null) {
467         resultSerializedSize = result.getSerializedSize();
468         resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize);
469       }
470       // calculate the total size
471       int totalSize = headerSerializedSize + headerVintSize
472           + (resultSerializedSize + resultVintSize)
473           + (this.cellBlock == null ? 0 : this.cellBlock.limit());
474       // The byte[] should also hold the totalSize of the header, message and the cellblock
475       byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
476           + resultVintSize + Bytes.SIZEOF_INT];
477       // The RpcClient expects the int to be in a format that code be decoded by
478       // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int)
479       // form of writing int.
480       Bytes.putInt(b, 0, totalSize);
481       CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT,
482           b.length - Bytes.SIZEOF_INT);
483       if (header != null) {
484         cos.writeMessageNoTag(header);
485       }
486       if (result != null) {
487         cos.writeMessageNoTag(result);
488       }
489       cos.flush();
490       cos.checkNoSpaceLeft();
491       return b;
492     }
493 
494     private BufferChain wrapWithSasl(BufferChain bc)
495         throws IOException {
496       if (!this.connection.useSasl) return bc;
497       // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
498       // THIS IS A BIG UGLY COPY.
499       byte [] responseBytes = bc.getBytes();
500       byte [] token;
501       // synchronization may be needed since there can be multiple Handler
502       // threads using saslServer to wrap responses.
503       synchronized (connection.saslServer) {
504         token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
505       }
506       if (LOG.isTraceEnabled()) {
507         LOG.trace("Adding saslServer wrapped token of size " + token.length
508             + " as call response.");
509       }
510 
511       ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
512       ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
513       return new BufferChain(bbTokenLength, bbTokenBytes);
514     }
515 
516     @Override
517     public boolean isClientCellBlockSupported() {
518       return this.connection != null && this.connection.codec != null;
519     }
520 
521     @Override
522     public long disconnectSince() {
523       if (!connection.channel.isOpen()) {
524         return System.currentTimeMillis() - timestamp;
525       } else {
526         return -1L;
527       }
528     }
529 
530     public long getSize() {
531       return this.size;
532     }
533 
534     @Override
535     public long getResponseCellSize() {
536       return responseCellSize;
537     }
538 
539     @Override
540     public void incrementResponseCellSize(long cellSize) {
541       responseCellSize += cellSize;
542     }
543 
544     @Override
545     public long getResponseBlockSize() {
546       return responseBlockSize;
547     }
548 
549     @Override
550     public void incrementResponseBlockSize(long blockSize) {
551       responseBlockSize += blockSize;
552     }
553 
554     public synchronized void sendResponseIfReady() throws IOException {
555       this.responder.doRespond(this);
556     }
557 
558     public UserGroupInformation getRemoteUser() {
559       return connection.ugi;
560     }
561 
562     @Override
563     public User getRequestUser() {
564       return user;
565     }
566 
567     @Override
568     public String getRequestUserName() {
569       User user = getRequestUser();
570       return user == null? null: user.getShortName();
571     }
572 
573     @Override
574     public InetAddress getRemoteAddress() {
575       return remoteAddress;
576     }
577 
578     @Override
579     public VersionInfo getClientVersionInfo() {
580       return connection.getVersionInfo();
581     }
582 
583     @Override
584     public synchronized void setCallBack(RpcCallback callback) {
585       this.callback = callback;
586     }
587 
588     @Override
589     public boolean isRetryImmediatelySupported() {
590       return retryImmediatelySupported;
591     }
592   }
593 
594   /** Listens on the socket. Creates jobs for the handler threads*/
595   private class Listener extends Thread {
596 
597     private ServerSocketChannel acceptChannel = null; //the accept channel
598     private Selector selector = null; //the selector that we use for the server
599     private Reader[] readers = null;
600     private int currentReader = 0;
601     private Random rand = new Random();
602     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
603                                          //-tion (for idle connections) ran
604     private long cleanupInterval = 10000; //the minimum interval between
605                                           //two cleanup runs
606     private int backlogLength;
607 
608     private ExecutorService readPool;
609 
610     public Listener(final String name) throws IOException {
611       super(name);
612       backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
613       // Create a new server socket and set to non blocking mode
614       acceptChannel = ServerSocketChannel.open();
615       acceptChannel.configureBlocking(false);
616 
617       // Bind the server socket to the binding addrees (can be different from the default interface)
618       bind(acceptChannel.socket(), bindAddress, backlogLength);
619       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
620       address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
621       // create a selector;
622       selector= Selector.open();
623 
624       readers = new Reader[readThreads];
625       readPool = Executors.newFixedThreadPool(readThreads,
626         new ThreadFactoryBuilder().setNameFormat(
627           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
628           ",port=" + port).setDaemon(true)
629         .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
630       for (int i = 0; i < readThreads; ++i) {
631         Reader reader = new Reader();
632         readers[i] = reader;
633         readPool.execute(reader);
634       }
635       LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
636 
637       // Register accepts on the server socket with the selector.
638       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
639       this.setName("RpcServer.listener,port=" + port);
640       this.setDaemon(true);
641     }
642 
643 
644     private class Reader implements Runnable {
645       private volatile boolean adding = false;
646       private final Selector readSelector;
647 
648       Reader() throws IOException {
649         this.readSelector = Selector.open();
650       }
651       @Override
652       public void run() {
653         try {
654           doRunLoop();
655         } finally {
656           try {
657             readSelector.close();
658           } catch (IOException ioe) {
659             LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
660           }
661         }
662       }
663 
664       private synchronized void doRunLoop() {
665         while (running) {
666           try {
667             readSelector.select();
668             while (adding) {
669               this.wait(1000);
670             }
671 
672             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
673             while (iter.hasNext()) {
674               SelectionKey key = iter.next();
675               iter.remove();
676               if (key.isValid()) {
677                 if (key.isReadable()) {
678                   doRead(key);
679                 }
680               }
681             }
682           } catch (InterruptedException e) {
683             LOG.debug("Interrupted while sleeping");
684             return;
685           } catch (IOException ex) {
686             LOG.info(getName() + ": IOException in Reader", ex);
687           }
688         }
689       }
690 
691       /**
692        * This gets reader into the state that waits for the new channel
693        * to be registered with readSelector. If it was waiting in select()
694        * the thread will be woken up, otherwise whenever select() is called
695        * it will return even if there is nothing to read and wait
696        * in while(adding) for finishAdd call
697        */
698       public void startAdd() {
699         adding = true;
700         readSelector.wakeup();
701       }
702 
703       public synchronized SelectionKey registerChannel(SocketChannel channel)
704         throws IOException {
705         return channel.register(readSelector, SelectionKey.OP_READ);
706       }
707 
708       public synchronized void finishAdd() {
709         adding = false;
710         this.notify();
711       }
712     }
713 
714     /** cleanup connections from connectionList. Choose a random range
715      * to scan and also have a limit on the number of the connections
716      * that will be cleanedup per run. The criteria for cleanup is the time
717      * for which the connection was idle. If 'force' is true then all
718      * connections will be looked at for the cleanup.
719      * @param force all connections will be looked at for cleanup
720      */
721     private void cleanupConnections(boolean force) {
722       if (force || numConnections > thresholdIdleConnections) {
723         long currentTime = System.currentTimeMillis();
724         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
725           return;
726         }
727         int start = 0;
728         int end = numConnections - 1;
729         if (!force) {
730           start = rand.nextInt() % numConnections;
731           end = rand.nextInt() % numConnections;
732           int temp;
733           if (end < start) {
734             temp = start;
735             start = end;
736             end = temp;
737           }
738         }
739         int i = start;
740         int numNuked = 0;
741         while (i <= end) {
742           Connection c;
743           synchronized (connectionList) {
744             try {
745               c = connectionList.get(i);
746             } catch (Exception e) {return;}
747           }
748           if (c.timedOut(currentTime)) {
749             if (LOG.isDebugEnabled())
750               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
751             closeConnection(c);
752             numNuked++;
753             end--;
754             //noinspection UnusedAssignment
755             c = null;
756             if (!force && numNuked == maxConnectionsToNuke) break;
757           }
758           else i++;
759         }
760         lastCleanupRunTime = System.currentTimeMillis();
761       }
762     }
763 
764     @Override
765     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
766       justification="selector access is not synchronized; seems fine but concerned changing " +
767         "it will have per impact")
768     public void run() {
769       LOG.info(getName() + ": starting");
770       while (running) {
771         SelectionKey key = null;
772         try {
773           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
774           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
775           while (iter.hasNext()) {
776             key = iter.next();
777             iter.remove();
778             try {
779               if (key.isValid()) {
780                 if (key.isAcceptable())
781                   doAccept(key);
782               }
783             } catch (IOException ignored) {
784               if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
785             }
786             key = null;
787           }
788         } catch (OutOfMemoryError e) {
789           if (errorHandler != null) {
790             if (errorHandler.checkOOME(e)) {
791               LOG.info(getName() + ": exiting on OutOfMemoryError");
792               closeCurrentConnection(key, e);
793               cleanupConnections(true);
794               return;
795             }
796           } else {
797             // we can run out of memory if we have too many threads
798             // log the event and sleep for a minute and give
799             // some thread(s) a chance to finish
800             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
801             closeCurrentConnection(key, e);
802             cleanupConnections(true);
803             try {
804               Thread.sleep(60000);
805             } catch (InterruptedException ex) {
806               LOG.debug("Interrupted while sleeping");
807               return;
808             }
809           }
810         } catch (Exception e) {
811           closeCurrentConnection(key, e);
812         }
813         cleanupConnections(false);
814       }
815 
816       LOG.info(getName() + ": stopping");
817 
818       synchronized (this) {
819         try {
820           acceptChannel.close();
821           selector.close();
822         } catch (IOException ignored) {
823           if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
824         }
825 
826         selector= null;
827         acceptChannel= null;
828 
829         // clean up all connections
830         while (!connectionList.isEmpty()) {
831           closeConnection(connectionList.remove(0));
832         }
833       }
834     }
835 
836     private void closeCurrentConnection(SelectionKey key, Throwable e) {
837       if (key != null) {
838         Connection c = (Connection)key.attachment();
839         if (c != null) {
840           if (LOG.isDebugEnabled()) {
841             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
842                 (e != null ? " on error " + e.getMessage() : ""));
843           }
844           closeConnection(c);
845           key.attach(null);
846         }
847       }
848     }
849 
    /**
     * @return the value of this listener's {@code address} field
     *         (presumably the socket address the listener is bound to -- confirm)
     */
    InetSocketAddress getAddress() {
      return address;
    }
853 
854     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
855       Connection c;
856       ServerSocketChannel server = (ServerSocketChannel) key.channel();
857 
858       SocketChannel channel;
859       while ((channel = server.accept()) != null) {
860         try {
861           channel.configureBlocking(false);
862           channel.socket().setTcpNoDelay(tcpNoDelay);
863           channel.socket().setKeepAlive(tcpKeepAlive);
864         } catch (IOException ioe) {
865           channel.close();
866           throw ioe;
867         }
868 
869         Reader reader = getReader();
870         try {
871           reader.startAdd();
872           SelectionKey readKey = reader.registerChannel(channel);
873           c = getConnection(channel, System.currentTimeMillis());
874           readKey.attach(c);
875           synchronized (connectionList) {
876             connectionList.add(numConnections, c);
877             numConnections++;
878           }
879           if (LOG.isDebugEnabled())
880             LOG.debug(getName() + ": connection from " + c.toString() +
881                 "; # active connections: " + numConnections);
882         } finally {
883           reader.finishAdd();
884         }
885       }
886     }
887 
888     void doRead(SelectionKey key) throws InterruptedException {
889       int count;
890       Connection c = (Connection) key.attachment();
891       if (c == null) {
892         return;
893       }
894       c.setLastContact(System.currentTimeMillis());
895       try {
896         count = c.readAndProcess();
897 
898         if (count > 0) {
899           c.setLastContact(System.currentTimeMillis());
900         }
901 
902       } catch (InterruptedException ieo) {
903         throw ieo;
904       } catch (Exception e) {
905         if (LOG.isDebugEnabled()) {
906           LOG.debug(getName() + ": Caught exception while reading:", e);
907         }
908         count = -1; //so that the (count < 0) block is executed
909       }
910       if (count < 0) {
911         if (LOG.isDebugEnabled()) {
912           LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
913               " because read count=" + count +
914               ". Number of active connections: " + numConnections);
915         }
916         closeConnection(c);
917       }
918     }
919 
    /**
     * Initiates shutdown of the listener: wakes up the accept selector, closes the
     * listening socket and shuts down the reader pool. Both {@code selector} and
     * {@code acceptChannel} may already be null (the run loop nulls them when it ends),
     * in which case the corresponding step is skipped.
     */
    synchronized void doStop() {
      if (selector != null) {
        // Wake a thread blocked in select(), then yield so it gets a chance to run.
        selector.wakeup();
        Thread.yield();
      }
      if (acceptChannel != null) {
        try {
          acceptChannel.socket().close();
        } catch (IOException e) {
          // Best effort: a failure to close the socket is only logged.
          LOG.info(getName() + ": exception in closing listener socket. " + e);
        }
      }
      readPool.shutdownNow();
    }
934 
935     // The method that will return the next reader to work with
936     // Simplistic implementation of round robin for now
937     Reader getReader() {
938       currentReader = (currentReader + 1) % readers.length;
939       return readers[currentReader];
940     }
941   }
942 
943   // Sends responses of RPC back to clients.
944   protected class Responder extends Thread {
945     private final Selector writeSelector;
946     private final Set<Connection> writingCons =
947         Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
948 
949     Responder() throws IOException {
950       this.setName("RpcServer.responder");
951       this.setDaemon(true);
952       this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
953       writeSelector = Selector.open(); // create a selector
954     }
955 
956     @Override
957     public void run() {
958       LOG.info(getName() + ": starting");
959       try {
960         doRunLoop();
961       } finally {
962         LOG.info(getName() + ": stopping");
963         try {
964           writeSelector.close();
965         } catch (IOException ioe) {
966           LOG.error(getName() + ": couldn't close write selector", ioe);
967         }
968       }
969     }
970 
971     /**
972      * Take the list of the connections that want to write, and register them
973      * in the selector.
974      */
975     private void registerWrites() {
976       Iterator<Connection> it = writingCons.iterator();
977       while (it.hasNext()) {
978         Connection c = it.next();
979         it.remove();
980         SelectionKey sk = c.channel.keyFor(writeSelector);
981         try {
982           if (sk == null) {
983             try {
984               c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
985             } catch (ClosedChannelException e) {
986               // ignore: the client went away.
987               if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
988             }
989           } else {
990             sk.interestOps(SelectionKey.OP_WRITE);
991           }
992         } catch (CancelledKeyException e) {
993           // ignore: the client went away.
994           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
995         }
996       }
997     }
998 
999     /**
1000      * Add a connection to the list that want to write,
1001      */
1002     public void registerForWrite(Connection c) {
1003       if (writingCons.add(c)) {
1004         writeSelector.wakeup();
1005       }
1006     }
1007 
1008     private void doRunLoop() {
1009       long lastPurgeTime = 0;   // last check for old calls.
1010       while (running) {
1011         try {
1012           registerWrites();
1013           int keyCt = writeSelector.select(purgeTimeout);
1014           if (keyCt == 0) {
1015             continue;
1016           }
1017 
1018           Set<SelectionKey> keys = writeSelector.selectedKeys();
1019           Iterator<SelectionKey> iter = keys.iterator();
1020           while (iter.hasNext()) {
1021             SelectionKey key = iter.next();
1022             iter.remove();
1023             try {
1024               if (key.isValid() && key.isWritable()) {
1025                 doAsyncWrite(key);
1026               }
1027             } catch (IOException e) {
1028               LOG.debug(getName() + ": asyncWrite", e);
1029             }
1030           }
1031 
1032           lastPurgeTime = purge(lastPurgeTime);
1033 
1034         } catch (OutOfMemoryError e) {
1035           if (errorHandler != null) {
1036             if (errorHandler.checkOOME(e)) {
1037               LOG.info(getName() + ": exiting on OutOfMemoryError");
1038               return;
1039             }
1040           } else {
1041             //
1042             // we can run out of memory if we have too many threads
1043             // log the event and sleep for a minute and give
1044             // some thread(s) a chance to finish
1045             //
1046             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
1047             try {
1048               Thread.sleep(60000);
1049             } catch (InterruptedException ex) {
1050               LOG.debug("Interrupted while sleeping");
1051               return;
1052             }
1053           }
1054         } catch (Exception e) {
1055           LOG.warn(getName() + ": exception in Responder " +
1056               StringUtils.stringifyException(e), e);
1057         }
1058       }
1059       LOG.info(getName() + ": stopped");
1060     }
1061 
1062     /**
1063      * If there were some calls that have not been sent out for a
1064      * long time, we close the connection.
1065      * @return the time of the purge.
1066      */
1067     private long purge(long lastPurgeTime) {
1068       long now = System.currentTimeMillis();
1069       if (now < lastPurgeTime + purgeTimeout) {
1070         return lastPurgeTime;
1071       }
1072 
1073       ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1074       // get the list of channels from list of keys.
1075       synchronized (writeSelector.keys()) {
1076         for (SelectionKey key : writeSelector.keys()) {
1077           Connection connection = (Connection) key.attachment();
1078           if (connection == null) {
1079             throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1080           }
1081           Call call = connection.responseQueue.peekFirst();
1082           if (call != null && now > call.timestamp + purgeTimeout) {
1083             conWithOldCalls.add(call.connection);
1084           }
1085         }
1086       }
1087 
1088       // Seems safer to close the connection outside of the synchronized loop...
1089       for (Connection connection : conWithOldCalls) {
1090         closeConnection(connection);
1091       }
1092 
1093       return now;
1094     }
1095 
1096     private void doAsyncWrite(SelectionKey key) throws IOException {
1097       Connection connection = (Connection) key.attachment();
1098       if (connection == null) {
1099         throw new IOException("doAsyncWrite: no connection");
1100       }
1101       if (key.channel() != connection.channel) {
1102         throw new IOException("doAsyncWrite: bad channel");
1103       }
1104 
1105       if (processAllResponses(connection)) {
1106         try {
1107           // We wrote everything, so we don't need to be told when the socket is ready for
1108           //  write anymore.
1109          key.interestOps(0);
1110         } catch (CancelledKeyException e) {
1111           /* The Listener/reader might have closed the socket.
1112            * We don't explicitly cancel the key, so not sure if this will
1113            * ever fire.
1114            * This warning could be removed.
1115            */
1116           LOG.warn("Exception while changing ops : " + e);
1117         }
1118       }
1119     }
1120 
1121     /**
1122      * Process the response for this call. You need to have the lock on
1123      * {@link org.apache.hadoop.hbase.ipc.RpcServer.Connection#responseWriteLock}
1124      *
1125      * @param call the call
1126      * @return true if we proceed the call fully, false otherwise.
1127      * @throws IOException
1128      */
1129     private boolean processResponse(final Call call) throws IOException {
1130       boolean error = true;
1131       try {
1132         // Send as much data as we can in the non-blocking fashion
1133         long numBytes = channelWrite(call.connection.channel, call.response);
1134         if (numBytes < 0) {
1135           throw new HBaseIOException("Error writing on the socket " +
1136             "for the call:" + call.toShortString());
1137         }
1138         error = false;
1139       } finally {
1140         if (error) {
1141           LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1142           closeConnection(call.connection);
1143         }
1144       }
1145 
1146       if (!call.response.hasRemaining()) {
1147         call.done();
1148         return true;
1149       } else {
1150         return false; // Socket can't take more, we will have to come back.
1151       }
1152     }
1153 
1154     /**
1155      * Process all the responses for this connection
1156      *
1157      * @return true if all the calls were processed or that someone else is doing it.
1158      * false if there * is still some work to do. In this case, we expect the caller to
1159      * delay us.
1160      * @throws IOException
1161      */
1162     private boolean processAllResponses(final Connection connection) throws IOException {
1163       // We want only one writer on the channel for a connection at a time.
1164       connection.responseWriteLock.lock();
1165       try {
1166         for (int i = 0; i < 20; i++) {
1167           // protection if some handlers manage to need all the responder
1168           Call call = connection.responseQueue.pollFirst();
1169           if (call == null) {
1170             return true;
1171           }
1172           if (!processResponse(call)) {
1173             connection.responseQueue.addFirst(call);
1174             return false;
1175           }
1176         }
1177       } finally {
1178         connection.responseWriteLock.unlock();
1179       }
1180 
1181       return connection.responseQueue.isEmpty();
1182     }
1183 
1184     //
1185     // Enqueue a response from the application.
1186     //
1187     void doRespond(Call call) throws IOException {
1188       boolean added = false;
1189 
1190       // If there is already a write in progress, we don't wait. This allows to free the handlers
1191       //  immediately for other tasks.
1192       if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
1193         try {
1194           if (call.connection.responseQueue.isEmpty()) {
1195             // If we're alone, we can try to do a direct call to the socket. It's
1196             //  an optimisation to save on context switches and data transfer between cores..
1197             if (processResponse(call)) {
1198               return; // we're done.
1199             }
1200             // Too big to fit, putting ahead.
1201             call.connection.responseQueue.addFirst(call);
1202             added = true; // We will register to the selector later, outside of the lock.
1203           }
1204         } finally {
1205           call.connection.responseWriteLock.unlock();
1206         }
1207       }
1208 
1209       if (!added) {
1210         call.connection.responseQueue.addLast(call);
1211       }
1212       call.responder.registerForWrite(call.connection);
1213 
1214       // set the serve time when the response has to be sent later
1215       call.timestamp = System.currentTimeMillis();
1216     }
1217   }
1218 
1219   /** Reads calls from a connection and queues them for handling. */
1220   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1221       value="VO_VOLATILE_INCREMENT",
1222       justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1223   public class Connection {
1224     // If initial preamble with version and magic has been read or not.
1225     private boolean connectionPreambleRead = false;
1226     // If the connection header has been read or not.
1227     private boolean connectionHeaderRead = false;
1228     protected SocketChannel channel;
1229     private ByteBuffer data;
1230     private ByteBuffer dataLengthBuffer;
1231     protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
1232     private final Lock responseWriteLock = new ReentrantLock();
1233     private Counter rpcCount = new Counter(); // number of outstanding rpcs
1234     private long lastContact;
1235     private InetAddress addr;
1236     protected Socket socket;
1237     // Cache the remote host & port info so that even if the socket is
1238     // disconnected, we can say where it used to connect to.
1239     protected String hostAddress;
1240     protected int remotePort;
1241     ConnectionHeader connectionHeader;
1242     /**
     * Codec the client asked us to use.
1244      */
1245     private Codec codec;
1246     /**
     * Compression codec the client asked us to use.
1248      */
1249     private CompressionCodec compressionCodec;
1250     BlockingService service;
1251 
1252     private AuthMethod authMethod;
1253     private boolean saslContextEstablished;
1254     private boolean skipInitialSaslHandshake;
1255     private ByteBuffer unwrappedData;
1256     // When is this set?  FindBugs wants to know!  Says NP
1257     private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1258     boolean useSasl;
1259     SaslServer saslServer;
1260     private boolean useWrap = false;
1261     // Fake 'call' for failed authorization response
1262     private static final int AUTHORIZATION_FAILED_CALLID = -1;
1263     private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
1264         null, null, this, null, 0, null, null);
1265     private ByteArrayOutputStream authFailedResponse =
1266         new ByteArrayOutputStream();
1267     // Fake 'call' for SASL context setup
1268     private static final int SASL_CALLID = -33;
1269     private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
1270         0, null, null);
1271 
1272     // was authentication allowed with a fallback to simple auth
1273     private boolean authenticatedWithFallback;
1274 
1275     private boolean retryImmediatelySupported = false;
1276 
1277     public UserGroupInformation attemptingUser = null; // user name before auth
1278     protected User user = null;
1279     protected UserGroupInformation ugi = null;
1280 
1281     public Connection(SocketChannel channel, long lastContact) {
1282       this.channel = channel;
1283       this.lastContact = lastContact;
1284       this.data = null;
1285       this.dataLengthBuffer = ByteBuffer.allocate(4);
1286       this.socket = channel.socket();
1287       this.addr = socket.getInetAddress();
1288       if (addr == null) {
1289         this.hostAddress = "*Unknown*";
1290       } else {
1291         this.hostAddress = addr.getHostAddress();
1292       }
1293       this.remotePort = socket.getPort();
1294       if (socketSendBufferSize != 0) {
1295         try {
1296           socket.setSendBufferSize(socketSendBufferSize);
1297         } catch (IOException e) {
1298           LOG.warn("Connection: unable to set socket send buffer size to " +
1299                    socketSendBufferSize);
1300         }
1301       }
1302     }
1303 
1304       @Override
1305     public String toString() {
1306       return getHostAddress() + ":" + remotePort;
1307     }
1308 
1309     public String getHostAddress() {
1310       return hostAddress;
1311     }
1312 
1313     public InetAddress getHostInetAddress() {
1314       return addr;
1315     }
1316 
1317     public int getRemotePort() {
1318       return remotePort;
1319     }
1320 
    /**
     * Records the time of the most recent contact with the client; {@code timedOut}
     * compares the current time against this value.
     *
     * @param lastContact time of the last contact, in ms
     */
    public void setLastContact(long lastContact) {
      this.lastContact = lastContact;
    }
1324 
1325     public VersionInfo getVersionInfo() {
1326       if (connectionHeader.hasVersionInfo()) {
1327         return connectionHeader.getVersionInfo();
1328       }
1329       return null;
1330     }
1331 
1332     /* Return true if the connection has no outstanding rpc */
1333     private boolean isIdle() {
1334       return rpcCount.get() == 0;
1335     }
1336 
1337     /* Decrement the outstanding RPC count */
1338     protected void decRpcCount() {
1339       rpcCount.decrement();
1340     }
1341 
1342     /* Increment the outstanding RPC count */
1343     protected void incRpcCount() {
1344       rpcCount.increment();
1345     }
1346 
1347     protected boolean timedOut(long currentTime) {
1348       return isIdle() && currentTime - lastContact > maxIdleTime;
1349     }
1350 
1351     private UserGroupInformation getAuthorizedUgi(String authorizedId)
1352         throws IOException {
1353       UserGroupInformation authorizedUgi;
1354       if (authMethod == AuthMethod.DIGEST) {
1355         TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1356             secretManager);
1357         authorizedUgi = tokenId.getUser();
1358         if (authorizedUgi == null) {
1359           throw new AccessDeniedException(
1360               "Can't retrieve username from tokenIdentifier.");
1361         }
1362         authorizedUgi.addTokenIdentifier(tokenId);
1363       } else {
1364         authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
1365       }
1366       authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
1367       return authorizedUgi;
1368     }
1369 
1370     private void saslReadAndProcess(ByteBuffer saslToken) throws IOException,
1371         InterruptedException {
1372       if (saslContextEstablished) {
1373         if (LOG.isTraceEnabled())
1374           LOG.trace("Have read input token of size " + saslToken.limit()
1375               + " for processing by saslServer.unwrap()");
1376 
1377         if (!useWrap) {
1378           processOneRpc(saslToken);
1379         } else {
1380           byte[] b = saslToken.array();
1381           byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
1382           processUnwrappedData(plaintextData);
1383         }
1384       } else {
1385         byte[] replyToken;
1386         try {
1387           if (saslServer == null) {
1388             switch (authMethod) {
1389             case DIGEST:
1390               if (secretManager == null) {
1391                 throw new AccessDeniedException(
1392                     "Server is not configured to do DIGEST authentication.");
1393               }
1394               saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
1395                   .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
1396                   HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
1397                       secretManager, this));
1398               break;
1399             default:
1400               UserGroupInformation current = UserGroupInformation.getCurrentUser();
1401               String fullName = current.getUserName();
1402               if (LOG.isDebugEnabled()) {
1403                 LOG.debug("Kerberos principal name is " + fullName);
1404               }
1405               final String names[] = SaslUtil.splitKerberosName(fullName);
1406               if (names.length != 3) {
1407                 throw new AccessDeniedException(
1408                     "Kerberos principal name does NOT have the expected "
1409                         + "hostname part: " + fullName);
1410               }
1411               current.doAs(new PrivilegedExceptionAction<Object>() {
1412                 @Override
1413                 public Object run() throws SaslException {
1414                   saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
1415                       .getMechanismName(), names[0], names[1],
1416                       HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
1417                   return null;
1418                 }
1419               });
1420             }
1421             if (saslServer == null)
1422               throw new AccessDeniedException(
1423                   "Unable to find SASL server implementation for "
1424                       + authMethod.getMechanismName());
1425             if (LOG.isDebugEnabled()) {
1426               LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
1427             }
1428           }
1429           if (LOG.isDebugEnabled()) {
1430             LOG.debug("Have read input token of size " + saslToken.limit()
1431                 + " for processing by saslServer.evaluateResponse()");
1432           }
1433           replyToken = saslServer.evaluateResponse(saslToken.array());
1434         } catch (IOException e) {
1435           IOException sendToClient = e;
1436           Throwable cause = e;
1437           while (cause != null) {
1438             if (cause instanceof InvalidToken) {
1439               sendToClient = (InvalidToken) cause;
1440               break;
1441             }
1442             cause = cause.getCause();
1443           }
1444           doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
1445             sendToClient.getLocalizedMessage());
1446           metrics.authenticationFailure();
1447           String clientIP = this.toString();
1448           // attempting user could be null
1449           AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
1450           throw e;
1451         }
1452         if (replyToken != null) {
1453           if (LOG.isDebugEnabled()) {
1454             LOG.debug("Will send token of size " + replyToken.length
1455                 + " from saslServer.");
1456           }
1457           doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
1458               null);
1459         }
1460         if (saslServer.isComplete()) {
1461           String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
1462           useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
1463           ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
1464           if (LOG.isDebugEnabled()) {
1465             LOG.debug("SASL server context established. Authenticated client: "
1466               + ugi + ". Negotiated QoP is "
1467               + saslServer.getNegotiatedProperty(Sasl.QOP));
1468           }
1469           metrics.authenticationSuccess();
1470           AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1471           saslContextEstablished = true;
1472         }
1473       }
1474     }
1475 
1476     /**
1477      * No protobuf encoding of raw sasl messages
1478      */
1479     private void doRawSaslReply(SaslStatus status, Writable rv,
1480         String errorClass, String error) throws IOException {
1481       ByteBufferOutputStream saslResponse = null;
1482       DataOutputStream out = null;
1483       try {
1484         // In my testing, have noticed that sasl messages are usually
1485         // in the ballpark of 100-200. That's why the initial capacity is 256.
1486         saslResponse = new ByteBufferOutputStream(256);
1487         out = new DataOutputStream(saslResponse);
1488         out.writeInt(status.state); // write status
1489         if (status == SaslStatus.SUCCESS) {
1490           rv.write(out);
1491         } else {
1492           WritableUtils.writeString(out, errorClass);
1493           WritableUtils.writeString(out, error);
1494         }
1495         saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1496         saslCall.responder = responder;
1497         saslCall.sendResponseIfReady();
1498       } finally {
1499         if (saslResponse != null) {
1500           saslResponse.close();
1501         }
1502         if (out != null) {
1503           out.close();
1504         }
1505       }
1506     }
1507 
1508     private void disposeSasl() {
1509       if (saslServer != null) {
1510         try {
1511           saslServer.dispose();
1512           saslServer = null;
1513         } catch (SaslException ignored) {
1514           // Ignored. This is being disposed of anyway.
1515         }
1516       }
1517     }
1518 
1519     private int readPreamble() throws IOException {
           // Validate the connection preamble: the four magic bytes already held in
           // dataLengthBuffer, then a version byte and an auth-method byte read from the channel.
           // Returns the result of that two-byte read, or -1 (via doBadPreambleHandling) when the
           // preamble is rejected. connectionPreambleRead is set only after every check passes.
1520       int count;
1521       // Check for 'HBas' magic.
1522       this.dataLengthBuffer.flip();
1523       if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
1524         return doBadPreambleHandling("Expected HEADER=" +
1525             Bytes.toStringBinary(HConstants.RPC_HEADER) +
1526             " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
1527             " from " + toString());
1528       }
1529       // Now read the next two bytes, the version and the auth to use.
1530       ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
1531       count = channelRead(channel, versionAndAuthBytes);
           // NOTE(review): on a short read the partially filled local buffer is discarded and
           // dataLengthBuffer stays flipped, so it looks like the preamble cannot be resumed
           // cleanly by the next call -- confirm.
1532       if (count < 0 || versionAndAuthBytes.remaining() > 0) {
1533         return count;
1534       }
1535       int version = versionAndAuthBytes.get(0);
1536       byte authbyte = versionAndAuthBytes.get(1);
1537       this.authMethod = AuthMethod.valueOf(authbyte);
1538       if (version != CURRENT_VERSION) {
1539         String msg = getFatalConnectionString(version, authbyte);
1540         return doBadPreambleHandling(msg, new WrongVersionException(msg));
1541       }
           // A null authMethod is treated as an unsupported auth byte.
1542       if (authMethod == null) {
1543         String msg = getFatalConnectionString(version, authbyte);
1544         return doBadPreambleHandling(msg, new BadAuthException(msg));
1545       }
           // Secure server, client asked for SIMPLE: either record the fallback or answer with the
           // shared auth-failed call and abort by throwing.
1546       if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
1547         if (allowFallbackToSimpleAuth) {
1548           metrics.authenticationFallback();
1549           authenticatedWithFallback = true;
1550         } else {
1551           AccessDeniedException ae = new AccessDeniedException("Authentication is required");
1552           setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1553           responder.doRespond(authFailedCall);
1554           throw ae;
1555         }
1556       }
           // Insecure server, client asked for something other than SIMPLE: reply with
           // SWITCH_TO_SIMPLE_AUTH and carry on as SIMPLE.
1557       if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
1558         doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
1559             SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
1560         authMethod = AuthMethod.SIMPLE;
1561         // client has already sent the initial Sasl message and we
1562         // should ignore it. Both client and server should fall back
1563         // to simple auth from now on.
1564         skipInitialSaslHandshake = true;
1565       }
1566       if (authMethod != AuthMethod.SIMPLE) {
1567         useSasl = true;
1568       }
1569 
           // Make room for the next 4-byte length word and mark the preamble as consumed.
1570       dataLengthBuffer.clear();
1571       connectionPreambleRead = true;
1572       return count;
1573     }
1574 
1575     private int read4Bytes() throws IOException {
           // Top up the 4-byte dataLengthBuffer from the channel. When the buffer has no room left
           // nothing is read and 0 is reported; otherwise the channelRead result is passed through.
           if (this.dataLengthBuffer.remaining() <= 0) {
             return 0;
           }
           return channelRead(channel, this.dataLengthBuffer);
1581     }
1582 
1583 
1584     /**
1585      * Read off the wire. If there is not enough data to read, update the connection state with
1586      *  what we have and returns.
          * <p>Each call advances as far as the available bytes allow: first the 4-byte word in
          * dataLengthBuffer, then (on a new connection) the preamble, then the payload of the
          * announced length, which is handed to process() once it is complete.
1587      * @return Returns -1 if failure (and caller will close connection), else zero or more.
          * @throws IOException if reading from the channel or handling the payload fails
          * @throws InterruptedException propagated from process()
          * @throws IllegalArgumentException if the announced data length is negative and is not a
          *   ping received on a connection that does not use wrapping
1590      */
1591     public int readAndProcess() throws IOException, InterruptedException {
1592       // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
1593       // does, read in the rest of the connection preamble, the version and the auth method.
1594       // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
1595       // length into the 4-byte this.dataLengthBuffer.
1596       int count = read4Bytes();
1597       if (count < 0 || dataLengthBuffer.remaining() > 0) {
1598         return count;
1599       }
1600 
1601       // If we have not read the connection setup preamble, look to see if that is on the wire.
1602       if (!connectionPreambleRead) {
1603         count = readPreamble();
1604         if (!connectionPreambleRead) {
1605           return count;
1606         }
1607 
             // readPreamble() cleared dataLengthBuffer, so this reads the first real length word.
1608         count = read4Bytes();
1609         if (count < 0 || dataLengthBuffer.remaining() > 0) {
1610           return count;
1611         }
1612       }
1613 
1614       // We have read a length and we have read the preamble.  It is either the connection header
1615       // or it is a request.
           // data == null means no payload is in flight yet: decode the length and allocate.
1616       if (data == null) {
1617         dataLengthBuffer.flip();
1618         int dataLength = dataLengthBuffer.getInt();
1619         if (dataLength == RpcClient.PING_CALL_ID) {
1620           if (!useWrap) { //covers the !useSasl too
1621             dataLengthBuffer.clear();
1622             return 0;  //ping message
1623           }
1624         }
1625         if (dataLength < 0) { // A data length of zero is legal.
1626           throw new IllegalArgumentException("Unexpected data length "
1627               + dataLength + "!! from " + getHostAddress());
1628         }
1629 
1630        // TODO: check dataLength against some limit so that the client cannot OOM the server
1631         data = ByteBuffer.allocate(dataLength);
1632 
1633         // Increment the rpc count. This counter will be decreased when we write
1634         //  the response.  If we want the connection to be detected as idle properly, we
1635         //  need to keep the inc / dec correct.
1636         incRpcCount();
1637       }
1638 
1639       count = channelRead(channel, data);
1640 
1641       if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
1642         process();
1643       }
1644 
1645       return count;
1646     }
1647 
1648     /**
1649      * Process the data buffer and clean the connection state for the next call.
          * <p>The buffer is dropped unprocessed once when skipInitialSaslHandshake is set;
          * otherwise it goes to saslReadAndProcess when SASL is in use, else to processOneRpc.
          * @throws IOException propagated from the SASL or RPC processing
          * @throws InterruptedException propagated from the SASL or RPC processing
1650      */
1651     private void process() throws IOException, InterruptedException {
1652       data.flip();
1653       try {
1654         if (skipInitialSaslHandshake) {
               // One-shot flag: only the first buffer after the switch to SIMPLE is discarded.
1655           skipInitialSaslHandshake = false;
1656           return;
1657         }
1658 
1659         if (useSasl) {
1660           saslReadAndProcess(data);
1661         } else {
1662           processOneRpc(data);
1663         }
1664 
1665       } finally {
             // Runs on every path, including the early return and any exception above.
1666         dataLengthBuffer.clear(); // Clean for the next call
1667         data = null; // For the GC
1668       }
1669     }
1670 
1671     private String getFatalConnectionString(final int version, final byte authByte) {
           // Diagnostic text for a rejected preamble: the server and client versions, the raw auth
           // byte, whether the authMethod field is currently non-null, and this connection's
           // toString().
1672       return "serverVersion=" + CURRENT_VERSION +
1673       ", clientVersion=" + version + ", authMethod=" + authByte +
1674       ", authSupported=" + (authMethod != null) + " from " + toString();
1675     }
1676 
1677     private int doBadPreambleHandling(final String msg) throws IOException {
           // Convenience overload: reports msg to the client as a FatalConnectionException.
1678       return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1679     }
1680 
1681     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
           // Logs msg, sends e back to the client on a placeholder call and returns -1.
1682       LOG.warn(msg);
           // No real request exists yet, so the response rides on a stand-in Call (id -1, most
           // arguments null).
1683       Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
1684       setupResponse(null, fakeCall, e, msg);
1685       responder.doRespond(fakeCall);
1686       // Returning -1 closes out the connection.
1687       return -1;
1688     }
1689 
1690     // Reads the connection header following version
         // Parses the ConnectionHeader from buf, resolves the requested service, sets up the cell
         // block codecs and settles which user (ugi) this connection acts as. Throws
         // EmptyServiceNameException / UnknownServiceException when the service cannot be resolved
         // and AccessDeniedException when a DIGEST-authenticated client claims a different user.
1691     private void processConnectionHeader(ByteBuffer buf) throws IOException {
1692       this.connectionHeader = ConnectionHeader.parseFrom(
1693         new ByteBufferInputStream(buf));
1694       String serviceName = connectionHeader.getServiceName();
1695       if (serviceName == null) throw new EmptyServiceNameException();
1696       this.service = getService(services, serviceName);
1697       if (this.service == null) throw new UnknownServiceException(serviceName);
1698       setupCellBlockCodecs(this.connectionHeader);
           // The user the client claims to be; may be null (see createUser).
1699       UserGroupInformation protocolUser = createUser(connectionHeader);
1700       if (!useSasl) {
             // Without SASL the claimed user is adopted as is.
1701         ugi = protocolUser;
1702         if (ugi != null) {
1703           ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1704         }
1705         // audit logging for SASL authenticated users happens in saslReadAndProcess()
1706         if (authenticatedWithFallback) {
1707           LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
1708               + " connecting from " + getHostAddress());
1709         }
1710         AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1711       } else {
1712         // user is authenticated
             // NOTE(review): ugi is dereferenced without a null check here; presumably it was set
             // during the SASL exchange -- confirm.
1713         ugi.setAuthenticationMethod(authMethod.authenticationMethod);
1714         //Now we check if this is a proxy user case. If the protocol user is
1715         //different from the 'user', it is a proxy user scenario. However,
1716         //this is not allowed if user authenticated with DIGEST.
1717         if ((protocolUser != null)
1718             && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
1719           if (authMethod == AuthMethod.DIGEST) {
1720             // Not allowed to doAs if token authentication is used
1721             throw new AccessDeniedException("Authenticated user (" + ugi
1722                 + ") doesn't match what the client claims to be ("
1723                 + protocolUser + ")");
1724           } else {
1725             // Effective user can be different from authenticated user
1726             // for simple auth or kerberos auth
1727             // The user is the real user. Now we create a proxy user
1728             UserGroupInformation realUser = ugi;
1729             ugi = UserGroupInformation.createProxyUser(protocolUser
1730                 .getUserName(), realUser);
1731             // Now the user is a proxy user, set Authentication method Proxy.
1732             ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
1733           }
1734         }
1735       }
           // Audit-log the peer, with its version info when the header carries one.
1736       if (connectionHeader.hasVersionInfo()) {
1737         // see if this connection will support RetryImmediatelyException
             // (the arguments 1, 2 are presumably a major/minor version -- confirm)
1738         retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
1739 
1740         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1741             + " with version info: "
1742             + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
1743       } else {
1744         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1745             + " with unknown version info");
1746       }
1747 
1748 
1749     }
1750 
1751     /**
1752      * Set up cell block codecs
1753      * @throws FatalConnectionException
1754      */
1755     private void setupCellBlockCodecs(final ConnectionHeader header)
1756     throws FatalConnectionException {
1757       // TODO: Plug in other supported decoders.
1758       if (!header.hasCellBlockCodecClass()) return;
1759       String className = header.getCellBlockCodecClass();
1760       if (className == null || className.length() == 0) return;
1761       try {
1762         this.codec = (Codec)Class.forName(className).newInstance();
1763       } catch (Exception e) {
1764         throw new UnsupportedCellCodecException(className, e);
1765       }
1766       if (!header.hasCellBlockCompressorClass()) return;
1767       className = header.getCellBlockCompressorClass();
1768       try {
1769         this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1770       } catch (Exception e) {
1771         throw new UnsupportedCompressionCodecException(className, e);
1772       }
1773     }
1774 
1775     private void processUnwrappedData(byte[] inBuf) throws IOException,
1776     InterruptedException {
           // Splits inBuf into length-prefixed RPCs and hands each complete one to processOneRpc.
           // Progress is kept in the unwrappedDataLengthBuffer / unwrappedData fields, so an RPC
           // that is cut off at the end of inBuf is continued by the next call.
1777       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1778       // Read all RPCs contained in the inBuf, even partial ones
1779       while (true) {
1780         int count;
1781         if (unwrappedDataLengthBuffer.remaining() > 0) {
1782           count = channelRead(ch, unwrappedDataLengthBuffer);
1783           if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1784             return;
1785         }
1786 
1787         if (unwrappedData == null) {
1788           unwrappedDataLengthBuffer.flip();
1789           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1790 
1791           if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1792             if (LOG.isDebugEnabled())
1793               LOG.debug("Received ping message");
1794             unwrappedDataLengthBuffer.clear();
1795             continue; // ping message
1796           }
               // NOTE(review): unlike readAndProcess(), the length is not checked for being
               // negative before it is passed to allocate().
1797           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1798         }
1799 
1800         count = channelRead(ch, unwrappedData);
1801         if (count <= 0 || unwrappedData.remaining() > 0)
1802           return;
1803 
             // Always true at this point: the return above already covered remaining() > 0.
1804         if (unwrappedData.remaining() == 0) {
1805           unwrappedDataLengthBuffer.clear();
1806           unwrappedData.flip();
1807           processOneRpc(unwrappedData);
1808           unwrappedData = null;
1809         }
1810       }
1811     }
1812 
1813 
1814     private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException {
           // The first buffer on a connection is its connection header; every later one is a
           // request.
           if (connectionHeaderRead) {
             processRequest(buf);
             return;
           }
           processConnectionHeader(buf);
           this.connectionHeaderRead = true;
           if (authorizeConnection()) {
             this.user = userProvider.create(this.ugi);
             return;
           }
           // Authorization failed: throw so the connection is torn down and the client does not
           // wait for a response that will never come.
           throw new AccessDeniedException("Connection from " + this + " for service " +
             connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
1828     }
1829 
1830     /**
          * Decodes one request (header, optional param, optional cell block) from buf and hands
          * the resulting Call to the scheduler. If the call queue budget would be exceeded, the
          * request cannot be decoded, or the scheduler refuses the call, an error response is
          * queued on the responder and the method returns normally.
1831      * @param buf Has the request header and the request param and optionally encoded data buffer
1832      * all in this one array.
          * @throws IOException if reading the request header fails, or propagated from the
          *   response and dispatch calls
          * @throws InterruptedException declared for the dispatch to the scheduler
1835      */
1836     protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
1837       long totalRequestSize = buf.limit();
           // Running position in buf's backing array of the next section still to be decoded.
1838       int offset = 0;
1839       // Here we read in the header.  We avoid having pb
1840       // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
           // (buf.array() requires an array-backed buffer.)
1841       CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
1842       int headerSize = cis.readRawVarint32();
1843       offset = cis.getTotalBytesRead();
1844       Message.Builder builder = RequestHeader.newBuilder();
1845       ProtobufUtil.mergeFrom(builder, cis, headerSize);
1846       RequestHeader header = (RequestHeader) builder.build();
1847       offset += headerSize;
1848       int id = header.getCallId();
1849       if (LOG.isTraceEnabled()) {
1850         LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1851           " totalRequestSize: " + totalRequestSize + " bytes");
1852       }
1853       // Enforcing the call queue size, this triggers a retry in the client
1854       // This is a bit late to be doing this check - we have already read in the total request.
1855       if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1856         final Call callTooBig =
1857           new Call(id, this.service, null, null, null, null, this,
1858             responder, totalRequestSize, null, null);
1859         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1860         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1861         InetSocketAddress address = getListenerAddress();
1862         setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
1863             "Call queue is full on " + (address != null ? address : "(channel closed)") +
1864                 ", is hbase.ipc.server.max.callqueue.size too small?");
1865         responder.doRespond(callTooBig);
1866         return;
1867       }
1868       MethodDescriptor md = null;
1869       Message param = null;
1870       CellScanner cellScanner = null;
1871       try {
1872         if (header.hasRequestParam() && header.getRequestParam()) {
1873           md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1874           if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1875           builder = this.service.getRequestPrototype(md).newBuilderForType();
               // Reset the counter so getTotalBytesRead() below covers only the size varint.
1876           cis.resetSizeCounter();
1877           int paramSize = cis.readRawVarint32();
1878           offset += cis.getTotalBytesRead();
1879           if (builder != null) {
1880             ProtobufUtil.mergeFrom(builder, cis, paramSize);
1881             param = builder.build();
1882           }
1883           offset += paramSize;
1884         }
1885         if (header.hasCellBlockMeta()) {
               // Whatever follows the header and param is handed over as the cell block.
1886           buf.position(offset);
1887           cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, buf);
1888         }
1889       } catch (Throwable t) {
             // Any decoding failure is logged, counted and reported back on a param-less Call.
1890         InetSocketAddress address = getListenerAddress();
1891         String msg = (address != null ? address : "(channel closed)") +
1892             " is unable to read call parameter from client " + getHostAddress();
1893         LOG.warn(msg, t);
1894 
1895         metrics.exception(t);
1896 
1897         // probably the hbase hadoop version does not match the running hadoop version
1898         if (t instanceof LinkageError) {
1899           t = new DoNotRetryIOException(t);
1900         }
1901         // If the method is not present on the server, do not retry.
1902         if (t instanceof UnsupportedOperationException) {
1903           t = new DoNotRetryIOException(t);
1904         }
1905 
1906         final Call readParamsFailedCall =
1907           new Call(id, this.service, null, null, null, null, this,
1908             responder, totalRequestSize, null, null);
1909         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1910         setupResponse(responseBuffer, readParamsFailedCall, t,
1911           msg + "; " + t.getMessage());
1912         responder.doRespond(readParamsFailedCall);
1913         return;
1914       }
1915 
1916       TraceInfo traceInfo = header.hasTraceInfo()
1917           ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1918           : null;
1919       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1920               totalRequestSize, traceInfo, this.addr);
1921 
1922       if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
             // Dispatch was refused: take the call's size back off callQueueSize (presumably it
             // was added when the CallRunner was created -- confirm) and answer with the
             // queue-full error.
1923         callQueueSize.add(-1 * call.getSize());
1924 
1925         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1926         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1927         InetSocketAddress address = getListenerAddress();
1928         setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
1929             "Call queue is full on " + (address != null ? address : "(channel closed)") +
1930                 ", too many items queued ?");
1931         responder.doRespond(call);
1932       }
1933     }
1934 
1935     private boolean authorizeConnection() throws IOException {
1936       try {
1937         // If auth method is DIGEST, the token was obtained by the
1938         // real user for the effective user, therefore not required to
1939         // authorize real user. doAs is allowed only for simple or kerberos
1940         // authentication
1941         if (ugi != null && ugi.getRealUser() != null
1942             && (authMethod != AuthMethod.DIGEST)) {
1943           ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
1944         }
1945         authorize(ugi, connectionHeader, getHostInetAddress());
1946         metrics.authorizationSuccess();
1947       } catch (AuthorizationException ae) {
1948         if (LOG.isDebugEnabled()) {
1949           LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1950         }
1951         metrics.authorizationFailure();
1952         setupResponse(authFailedResponse, authFailedCall,
1953           new AccessDeniedException(ae), ae.getMessage());
1954         responder.doRespond(authFailedCall);
1955         return false;
1956       }
1957       return true;
1958     }
1959 
1960     protected synchronized void close() {
           // Best-effort teardown: dispose of SASL state, drop any pending read buffer, then shut
           // down and close the channel and socket. Failures along the way are only trace-logged
           // so that the remaining steps still run.
1961       disposeSasl();
1962       data = null;
1963       if (!channel.isOpen())
1964         return;
1965       try {socket.shutdownOutput();} catch(Exception ignored) {
1966         if (LOG.isTraceEnabled()) {
1967           LOG.trace("Ignored exception", ignored);
1968         }
1969       }
1970       if (channel.isOpen()) {
             try {channel.close();} catch(Exception ignored) {
               // Still ignored, but traced like the socket failures around it instead of
               // vanishing silently.
               if (LOG.isTraceEnabled()) {
                 LOG.trace("Ignored exception", ignored);
               }
             }
1972       }
1973       try {
1974         socket.close();
1975       } catch(Exception ignored) {
1976         if (LOG.isTraceEnabled()) {
1977           LOG.trace("Ignored exception", ignored);
1978         }
1979       }
1980     }
1981 
1982     private UserGroupInformation createUser(ConnectionHeader head) {
1983       UserGroupInformation ugi = null;
1984 
1985       if (!head.hasUserInfo()) {
1986         return null;
1987       }
1988       UserInformation userInfoProto = head.getUserInfo();
1989       String effectiveUser = null;
1990       if (userInfoProto.hasEffectiveUser()) {
1991         effectiveUser = userInfoProto.getEffectiveUser();
1992       }
1993       String realUser = null;
1994       if (userInfoProto.hasRealUser()) {
1995         realUser = userInfoProto.getRealUser();
1996       }
1997       if (effectiveUser != null) {
1998         if (realUser != null) {
1999           UserGroupInformation realUserUgi =
2000               UserGroupInformation.createRemoteUser(realUser);
2001           ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
2002         } else {
2003           ugi = UserGroupInformation.createRemoteUser(effectiveUser);
2004         }
2005       }
2006       return ugi;
2007     }
2008   }
2009 
2010   /**
2011    * Datastructure for passing a {@link BlockingService} and its associated class of
2012    * protobuf service interface.  For example, a server that fielded what is defined
2013    * in the client protobuf service would pass in an implementation of the client blocking service
2014    * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
        * <p>Both values are fixed at construction time and are only exposed through getters.
2015    */
2016   public static class BlockingServiceAndInterface {
         // The blocking service implementation.
2017     private final BlockingService service;
         // The protobuf service interface class that goes with the service.
2018     private final Class<?> serviceInterface;
         /**
          * @param service the blocking service implementation
          * @param serviceInterface the service interface class associated with it
          */
2019     public BlockingServiceAndInterface(final BlockingService service,
2020         final Class<?> serviceInterface) {
2021       this.service = service;
2022       this.serviceInterface = serviceInterface;
2023     }
         /** @return the service interface class given at construction */
2024     public Class<?> getServiceInterface() {
2025       return this.serviceInterface;
2026     }
         /** @return the blocking service given at construction */
2027     public BlockingService getBlockingService() {
2028       return this.service;
2029     }
2030   }
2031 
2032   /**
2033    * Constructs a server listening on the named port and address.
        * <p>The Listener is created here (and binds, per the comment in the body), the Responder
        * is created and the scheduler is initialised; none of them is started until
        * {@link #start()}.
2034    * @param server hosting instance of {@link Server}. We will do authentications if an
2035    * instance else pass null for no authentication check.
2036    * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
2037    * @param services A list of services.
2038    * @param bindAddress Where to listen
        * @param conf configuration the server settings (buffer reservoir, queue size, reader
        *   threads, idle handling, TCP options, security) are read from
        * @param scheduler scheduler that calls are dispatched to; initialised here with a
        *   context wrapping this server
        * @throws IOException declared; presumably raised while setting up the listener --
        *   confirm
2041    */
2042   public RpcServer(final Server server, final String name,
2043       final List<BlockingServiceAndInterface> services,
2044       final InetSocketAddress bindAddress, Configuration conf,
2045       RpcScheduler scheduler)
2046       throws IOException {
         // Optional pool of reusable byte buffers; null when the reservoir is disabled.
2047     if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
2048       this.reservoir = new BoundedByteBufferPool(
2049           conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
2050           conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
2051           // Make the max twice the number of handlers to be safe.
2052           conf.getInt("hbase.ipc.server.reservoir.initial.max",
2053               conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
2054                   HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
2055     } else {
2056       reservoir = null;
2057     }
2058     this.server = server;
2059     this.services = services;
2060     this.bindAddress = bindAddress;
2061     this.conf = conf;
2062     this.socketSendBufferSize = 0;
2063     this.maxQueueSize =
2064       this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
2065     this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
         // Note the factor of two applied to the configured client max idle time.
2066     this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
2067     this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
2068     this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
2069     this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
2070       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
2071     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
2072     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
2073 
2074     // Start the listener here and let it bind to the port
2075     listener = new Listener(name);
         // Take the port from the listener's address rather than from bindAddress.
2076     this.port = listener.getAddress().getPort();
2077 
2078     this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
2079     this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
2080     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
2081 
2082     this.ipcUtil = new IPCUtil(conf);
2083 
2084 
2085     // Create the responder here
2086     responder = new Responder();
2087     this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
2088     this.userProvider = UserProvider.instantiate(conf);
2089     this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
2090     if (isSecurityEnabled) {
2091       HBaseSaslRpcServer.init(conf);
2092     }
         // Must come after isSecurityEnabled is set: initReconfigurable() reads it.
2093     initReconfigurable(conf);
2094 
2095     this.scheduler = scheduler;
2096     this.scheduler.init(new RpcSchedulerContext(this));
2097   }
2098 
2099   @Override
2100   public void onConfigurationChange(Configuration newConf) {
         // Re-reads only the settings handled by initReconfigurable(); everything else keeps the
         // value it was given in the constructor.
2101     initReconfigurable(newConf);
2102   }
2103 
2104   private void initReconfigurable(Configuration confToLoad) {
2105     this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
2106     if (isSecurityEnabled && allowFallbackToSimpleAuth) {
2107       LOG.warn("********* WARNING! *********");
2108       LOG.warn("This server is configured to allow connections from INSECURE clients");
2109       LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
2110       LOG.warn("While this option is enabled, client identities cannot be secured, and user");
2111       LOG.warn("impersonation is possible!");
2112       LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
2113       LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
2114       LOG.warn("****************************");
2115     }
2116   }
2117 
2118   /**
        * Subclasses of RpcServer can override this to provide their own
2120    * Connection implementations.
        * @param channel the socket channel of the new connection
        * @param time passed straight to the Connection constructor (presumably a timestamp --
        *   confirm)
        * @return a new Connection for the channel
2121    */
2122   protected Connection getConnection(SocketChannel channel, long time) {
2123     return new Connection(channel, time);
2124   }
2125 
2126   /**
2127    * Setup response for the RPC Call.
        * <p>This variant only ever sets an error outcome: the call's response is set with a null
        * result and null cell data together with the given throwable and message.
2128    *
        * @param response buffer that is reset when non-null; nothing is written to it here
2130    * @param call {@link Call} to which we are setting up the response
        * @param t the throwable to report on the call
2131    * @param error error message, if the call failed
2132    * @throws IOException
2133    */
2134   private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
2135   throws IOException {
2136     if (response != null) response.reset();
2137     call.setResponse(null, null, t, error);
2138   }
2139 
2140   protected void closeConnection(Connection connection) {
         // Unregisters the connection and then closes it. numConnections is only decremented
         // when the connection was still in connectionList; close() runs outside the lock.
2141     synchronized (connectionList) {
2142       if (connectionList.remove(connection)) {
2143         numConnections--;
2144       }
2145     }
2146     connection.close();
2147   }
2148 
2149   Configuration getConf() {
         // The configuration stored in the conf field by the constructor.
2150     return conf;
2151   }
2152 
2153   /** Sets the socket buffer size used for responding to RPCs.
        * The constructor initialises this value to 0.
2154    * @param size send size
2155    */
2156   @Override
2157   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2158 
2159   @Override
2160   public boolean isStarted() {
         // Set to true at the end of start().
2161     return this.started;
2162   }
2163 
2164   /** Starts the service.  Must be called before any calls will be handled. */
2165   @Override
2166   public synchronized void start() {
         // A second call is a no-op.
2167     if (started) return;
         // createSecretManager() returns null when security is off or there is no hosting server.
2168     authTokenSecretMgr = createSecretManager();
2169     if (authTokenSecretMgr != null) {
2170       setSecretManager(authTokenSecretMgr);
2171       authTokenSecretMgr.start();
2172     }
2173     this.authManager = new ServiceAuthorizationManager();
2174     HBasePolicyProvider.init(conf, authManager);
         // Start order: responder, then listener, then scheduler.
2175     responder.start();
2176     listener.start();
2177     scheduler.start();
2178     started = true;
2179   }
2180 
2181   @Override
2182   public synchronized void refreshAuthManager(PolicyProvider pp) {
         // Refreshes the authorization manager from this server's configuration and the given
         // policy provider. authManager is assigned in start().
2183     // Ignore warnings that this should be accessed in a static way instead of via an instance;
2184     // it'll break if you go via static route.
2185     this.authManager.refresh(this.conf, pp);
2186   }
2187 
2188   private AuthenticationTokenSecretManager createSecretManager() {
2189     if (!isSecurityEnabled) return null;
2190     if (server == null) return null;
2191     Configuration conf = server.getConfiguration();
2192     long keyUpdateInterval =
2193         conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2194     long maxAge =
2195         conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2196     return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2197         server.getServerName().toString(), keyUpdateInterval, maxAge);
2198   }
2199 
2200   public SecretManager<? extends TokenIdentifier> getSecretManager() {
         // Whatever was last passed to setSecretManager().
2201     return this.secretManager;
2202   }
2203 
2204   @SuppressWarnings("unchecked")
2205   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
         // The field is typed SecretManager<TokenIdentifier>, hence the unchecked cast from the
         // wildcard type.
2206     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2207   }
2208 
2209   /**
2210    * This is a server side method, which is invoked over RPC. On success
2211    * the return response has protobuf response payload. On failure, the
2212    * exception name and the stack trace are returned in the protobuf response.
        * <p>Also records queue, processing and total time plus request and response sizes in the
        * metrics, and logs responses that exceed the configured time or size thresholds.
        * @param service the service to invoke the method on
        * @param md descriptor of the method to invoke
        * @param param the request message
        * @param cellScanner cell data accompanying the request, handed to the controller
        * @param receiveTime when the call was received, in the same clock as
        *   System.currentTimeMillis()
        * @param status monitor updated with the RPC being serviced
        * @return the result message paired with the controller's cell scanner
        * @throws IOException the failure of the invoked method: rethrown as is when it already is
        *   an IOException, as DoNotRetryIOException for a LinkageError, otherwise wrapped
2213    */
2214   @Override
2215   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2216       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2217   throws IOException {
2218     try {
2219       status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2220       // TODO: Review after we add in encoded data blocks.
2221       status.setRPCPacket(param);
2222       status.resume("Servicing call");
2223       //get an instance of the method arg type
2224       long startTime = System.currentTimeMillis();
2225       PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2226       Message result = service.callBlockingMethod(md, controller, param);
2227       long endTime = System.currentTimeMillis();
2228       int processingTime = (int) (endTime - startTime);
2229       int qTime = (int) (startTime - receiveTime);
2230       int totalTime = (int) (endTime - receiveTime);
2231       if (LOG.isTraceEnabled()) {
2232         LOG.trace(CurCall.get().toString() +
2233             ", response " + TextFormat.shortDebugString(result) +
2234             " queueTime: " + qTime +
2235             " processingTime: " + processingTime +
2236             " totalTime: " + totalTime);
2237       }
2238       long requestSize = param.getSerializedSize();
2239       long responseSize = result.getSerializedSize();
2240       metrics.dequeuedCall(qTime);
2241       metrics.processedCall(processingTime);
2242       metrics.totalCall(totalTime);
2243       metrics.receivedRequest(requestSize);
2244       metrics.sentResponse(responseSize);
2245       // log any RPC responses that are slower than the configured warn
2246       // response time or larger than configured warning size
2247       boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2248       boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2249       if (tooSlow || tooLarge) {
           // when tagging, we let TooLarge trump TooSlow to keep output simple
2251         // note that large responses will often also be slow.
2252         logResponse(new Object[]{param},
2253             md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
2254             (tooLarge ? "TooLarge" : "TooSlow"),
2255             status.getClient(), startTime, processingTime, qTime,
2256             responseSize);
2257       }
2258       return new Pair<Message, CellScanner>(result, controller.cellScanner());
2259     } catch (Throwable e) {
2260       // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
2261       // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
2262       // need to pass it over the wire.
           // Only unwrap when there is a cause: a ServiceException built from a message alone
           // would otherwise turn e into null and fail in the calls below.
           if (e instanceof ServiceException && e.getCause() != null) e = e.getCause();
2264 
2265       // increment the number of requests that were exceptions.
2266       metrics.exception(e);
2267 
2268       if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2269       if (e instanceof IOException) throw (IOException)e;
2270       LOG.error("Unexpected throwable object ", e);
2271       throw new IOException(e.getMessage(), e);
2272     }
2273   }
2274 
2275   /**
2276    * Logs an RPC response to the LOG file, producing valid JSON objects for
2277    * client Operations.
2278    * @param params The parameters received in the call.
2279    * @param methodName The name of the method invoked
2280    * @param call The string representation of the call
2281    * @param tag  The tag that will be used to indicate this event in the log.
2282    * @param clientAddress   The address of the client who made this call.
2283    * @param startTime       The time that the call was initiated, in ms.
2284    * @param processingTime  The duration that the call took to run, in ms.
2285    * @param qTime           The duration that the call spent on the queue
2286    *                        prior to being initiated, in ms.
2287    * @param responseSize    The size in bytes of the response buffer.
2288    */
2289   void logResponse(Object[] params, String methodName, String call, String tag,
2290       String clientAddress, long startTime, int processingTime, int qTime,
2291       long responseSize)
2292           throws IOException {
2293     // base information that is reported regardless of type of call
2294     Map<String, Object> responseInfo = new HashMap<String, Object>();
2295     responseInfo.put("starttimems", startTime);
2296     responseInfo.put("processingtimems", processingTime);
2297     responseInfo.put("queuetimems", qTime);
2298     responseInfo.put("responsesize", responseSize);
2299     responseInfo.put("client", clientAddress);
2300     responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2301     responseInfo.put("method", methodName);
2302     if (params.length == 2 && server instanceof HRegionServer &&
2303         params[0] instanceof byte[] &&
2304         params[1] instanceof Operation) {
2305       // if the slow process is a query, we want to log its table as well
2306       // as its own fingerprint
2307       TableName tableName = TableName.valueOf(
2308           HRegionInfo.parseRegionName((byte[]) params[0])[0]);
2309       responseInfo.put("table", tableName.getNameAsString());
2310       // annotate the response map with operation details
2311       responseInfo.putAll(((Operation) params[1]).toMap());
2312       // report to the log file
2313       LOG.warn("(operation" + tag + "): " +
2314                MAPPER.writeValueAsString(responseInfo));
2315     } else if (params.length == 1 && server instanceof HRegionServer &&
2316         params[0] instanceof Operation) {
2317       // annotate the response map with operation details
2318       responseInfo.putAll(((Operation) params[0]).toMap());
2319       // report to the log file
2320       LOG.warn("(operation" + tag + "): " +
2321                MAPPER.writeValueAsString(responseInfo));
2322     } else {
2323       // can't get JSON details, so just report call.toString() along with
2324       // a more generic tag.
2325       responseInfo.put("call", call);
2326       LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2327     }
2328   }
2329 
2330   /** Stops the service.  No new calls will be handled after this is called. */
2331   @Override
2332   public synchronized void stop() {
2333     LOG.info("Stopping server on " + port);
2334     running = false;
2335     if (authTokenSecretMgr != null) {
2336       authTokenSecretMgr.stop();
2337       authTokenSecretMgr = null;
2338     }
2339     listener.interrupt();
2340     listener.doStop();
2341     responder.interrupt();
2342     scheduler.stop();
2343     notifyAll();
2344   }
2345 
2346   /** Wait for the server to be stopped.
2347    * Does not wait for all subthreads to finish.
2348    *  See {@link #stop()}.
2349    * @throws InterruptedException e
2350    */
2351   @Override
2352   public synchronized void join() throws InterruptedException {
2353     while (running) {
2354       wait();
2355     }
2356   }
2357 
2358   /**
2359    * Return the socket (ip+port) on which the RPC server is listening to. May return null if
2360    * the listener channel is closed.
2361    * @return the socket (ip+port) on which the RPC server is listening to, or null if this
2362    * information cannot be determined
2363    */
2364   @Override
2365   public synchronized InetSocketAddress getListenerAddress() {
2366     if (listener == null) {
2367       return null;
2368     }
2369     return listener.getAddress();
2370   }
2371 
2372   /**
2373    * Set the handler for calling out of RPC for error conditions.
2374    * @param handler the handler implementation
2375    */
2376   @Override
2377   public void setErrorHandler(HBaseRPCErrorHandler handler) {
2378     this.errorHandler = handler;
2379   }
2380 
2381   @Override
2382   public HBaseRPCErrorHandler getErrorHandler() {
2383     return this.errorHandler;
2384   }
2385 
2386   /**
2387    * Returns the metrics instance for reporting RPC call statistics
2388    */
2389   @Override
2390   public MetricsHBaseServer getMetrics() {
2391     return metrics;
2392   }
2393 
2394   @Override
2395   public void addCallSize(final long diff) {
2396     this.callQueueSize.add(diff);
2397   }
2398 
2399   /**
2400    * Authorize the incoming client connection.
2401    *
2402    * @param user client user
2403    * @param connection incoming connection
2404    * @param addr InetAddress of incoming connection
2405    * @throws org.apache.hadoop.security.authorize.AuthorizationException
2406    *         when the client isn't authorized to talk the protocol
2407    */
2408   public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
2409       InetAddress addr)
2410   throws AuthorizationException {
2411     if (authorize) {
2412       Class<?> c = getServiceInterface(services, connection.getServiceName());
2413       this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2414     }
2415   }
2416 
2417   /**
2418    * When the read or write buffer size is larger than this limit, i/o will be
2419    * done in chunks of this size. Most RPC requests and responses would be
2420    * be smaller.
2421    */
2422   private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
2423 
2424   /**
2425    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
2426    * If the amount of data is large, it writes to channel in smaller chunks.
2427    * This is to avoid jdk from creating many direct buffers as the size of
2428    * buffer increases. This also minimizes extra copies in NIO layer
2429    * as a result of multiple write operations required to write a large
2430    * buffer.
2431    *
2432    * @param channel writable byte channel to write to
2433    * @param bufferChain Chain of buffers to write
2434    * @return number of bytes written
2435    * @throws java.io.IOException e
2436    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
2437    */
2438   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2439   throws IOException {
2440     long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
2441     if (count > 0) this.metrics.sentBytes(count);
2442     return count;
2443   }
2444 
2445   /**
2446    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
2447    * If the amount of data is large, it writes to channel in smaller chunks.
2448    * This is to avoid jdk from creating many direct buffers as the size of
2449    * ByteBuffer increases. There should not be any performance degredation.
2450    *
2451    * @param channel writable byte channel to write on
2452    * @param buffer buffer to write
2453    * @return number of bytes written
2454    * @throws java.io.IOException e
2455    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
2456    */
2457   protected int channelRead(ReadableByteChannel channel,
2458                                    ByteBuffer buffer) throws IOException {
2459 
2460     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2461            channel.read(buffer) : channelIO(channel, null, buffer);
2462     if (count > 0) {
2463       metrics.receivedBytes(count);
2464     }
2465     return count;
2466   }
2467 
2468   /**
2469    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
2470    * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
2471    * one of readCh or writeCh should be non-null.
2472    *
2473    * @param readCh read channel
2474    * @param writeCh write channel
2475    * @param buf buffer to read or write into/out of
2476    * @return bytes written
2477    * @throws java.io.IOException e
2478    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
2479    * @see #channelWrite(GatheringByteChannel, BufferChain)
2480    */
2481   private static int channelIO(ReadableByteChannel readCh,
2482                                WritableByteChannel writeCh,
2483                                ByteBuffer buf) throws IOException {
2484 
2485     int originalLimit = buf.limit();
2486     int initialRemaining = buf.remaining();
2487     int ret = 0;
2488 
2489     while (buf.remaining() > 0) {
2490       try {
2491         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2492         buf.limit(buf.position() + ioSize);
2493 
2494         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2495 
2496         if (ret < ioSize) {
2497           break;
2498         }
2499 
2500       } finally {
2501         buf.limit(originalLimit);
2502       }
2503     }
2504 
2505     int nBytes = initialRemaining - buf.remaining();
2506     return (nBytes > 0) ? nBytes : ret;
2507   }
2508 
2509   /**
2510    * Needed for features such as delayed calls.  We need to be able to store the current call
2511    * so that we can complete it later or ask questions of what is supported by the current ongoing
2512    * call.
2513    * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
2514    */
2515   public static RpcCallContext getCurrentCall() {
2516     return CurCall.get();
2517   }
2518 
2519   public static boolean isInRpcCallContext() {
2520     return CurCall.get() != null;
2521   }
2522 
2523   /**
2524    * Returns the user credentials associated with the current RPC request or
2525    * <code>null</code> if no credentials were provided.
2526    * @return A User
2527    */
2528   public static User getRequestUser() {
2529     RpcCallContext ctx = getCurrentCall();
2530     return ctx == null? null: ctx.getRequestUser();
2531   }
2532 
2533   /**
2534    * Returns the username for any user associated with the current RPC
2535    * request or <code>null</code> if no user is set.
2536    */
2537   public static String getRequestUserName() {
2538     User user = getRequestUser();
2539     return user == null? null: user.getShortName();
2540   }
2541 
2542   /**
2543    * @return Address of remote client if a request is ongoing, else null
2544    */
2545   public static InetAddress getRemoteAddress() {
2546     RpcCallContext ctx = getCurrentCall();
2547     return ctx == null? null: ctx.getRemoteAddress();
2548   }
2549 
2550   /**
2551    * @param serviceName Some arbitrary string that represents a 'service'.
2552    * @param services Available service instances
2553    * @return Matching BlockingServiceAndInterface pair
2554    */
2555   static BlockingServiceAndInterface getServiceAndInterface(
2556       final List<BlockingServiceAndInterface> services, final String serviceName) {
2557     for (BlockingServiceAndInterface bs : services) {
2558       if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2559         return bs;
2560       }
2561     }
2562     return null;
2563   }
2564 
2565   /**
2566    * @param serviceName Some arbitrary string that represents a 'service'.
2567    * @param services Available services and their service interfaces.
2568    * @return Service interface class for <code>serviceName</code>
2569    */
2570   static Class<?> getServiceInterface(
2571       final List<BlockingServiceAndInterface> services,
2572       final String serviceName) {
2573     BlockingServiceAndInterface bsasi =
2574         getServiceAndInterface(services, serviceName);
2575     return bsasi == null? null: bsasi.getServiceInterface();
2576   }
2577 
2578   /**
2579    * @param serviceName Some arbitrary string that represents a 'service'.
2580    * @param services Available services and their service interfaces.
2581    * @return BlockingService that goes with the passed <code>serviceName</code>
2582    */
2583   static BlockingService getService(
2584       final List<BlockingServiceAndInterface> services,
2585       final String serviceName) {
2586     BlockingServiceAndInterface bsasi =
2587         getServiceAndInterface(services, serviceName);
2588     return bsasi == null? null: bsasi.getBlockingService();
2589   }
2590 
2591   static MonitoredRPCHandler getStatus() {
2592     // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
2593     MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
2594     if (status != null) {
2595       return status;
2596     }
2597     status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
2598     status.pause("Waiting for a call");
2599     RpcServer.MONITORED_RPC.set(status);
2600     return status;
2601   }
2602 
2603   /** Returns the remote side ip address when invoked inside an RPC
2604    *  Returns null incase of an error.
2605    *  @return InetAddress
2606    */
2607   public static InetAddress getRemoteIp() {
2608     Call call = CurCall.get();
2609     if (call != null && call.connection != null && call.connection.socket != null) {
2610       return call.connection.socket.getInetAddress();
2611     }
2612     return null;
2613   }
2614 
2615 
2616   /**
2617    * A convenience method to bind to a given address and report
2618    * better exceptions if the address is not a valid host.
2619    * @param socket the socket to bind
2620    * @param address the address to bind to
2621    * @param backlog the number of connections allowed in the queue
2622    * @throws BindException if the address can't be bound
2623    * @throws UnknownHostException if the address isn't a valid host name
2624    * @throws IOException other random errors from bind
2625    */
2626   public static void bind(ServerSocket socket, InetSocketAddress address,
2627                           int backlog) throws IOException {
2628     try {
2629       socket.bind(address, backlog);
2630     } catch (BindException e) {
2631       BindException bindException =
2632         new BindException("Problem binding to " + address + " : " +
2633             e.getMessage());
2634       bindException.initCause(e);
2635       throw bindException;
2636     } catch (SocketException e) {
2637       // If they try to bind to a different host's address, give a better
2638       // error message.
2639       if ("Unresolved address".equals(e.getMessage())) {
2640         throw new UnknownHostException("Invalid hostname for server: " +
2641                                        address.getHostName());
2642       }
2643       throw e;
2644     }
2645   }
2646 
2647   @Override
2648   public RpcScheduler getScheduler() {
2649     return scheduler;
2650   }
2651 }