View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.ServerSocket;
31  import java.net.Socket;
32  import java.net.SocketException;
33  import java.net.UnknownHostException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.Channels;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.GatheringByteChannel;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.security.PrivilegedExceptionAction;
46  import java.util.ArrayList;
47  import java.util.Arrays;
48  import java.util.Collections;
49  import java.util.HashMap;
50  import java.util.Iterator;
51  import java.util.LinkedList;
52  import java.util.List;
53  import java.util.Map;
54  import java.util.Random;
55  import java.util.Set;
56  import java.util.concurrent.ConcurrentHashMap;
57  import java.util.concurrent.ConcurrentLinkedDeque;
58  import java.util.concurrent.ExecutorService;
59  import java.util.concurrent.Executors;
60  import java.util.concurrent.locks.Lock;
61  import java.util.concurrent.locks.ReentrantLock;
62  
63  import javax.security.sasl.Sasl;
64  import javax.security.sasl.SaslException;
65  import javax.security.sasl.SaslServer;
66  
67  import org.apache.commons.logging.Log;
68  import org.apache.commons.logging.LogFactory;
69  import org.apache.hadoop.conf.Configuration;
70  import org.apache.hadoop.hbase.CallQueueTooBigException;
71  import org.apache.hadoop.hbase.CellScanner;
72  import org.apache.hadoop.hbase.DoNotRetryIOException;
73  import org.apache.hadoop.hbase.HBaseIOException;
74  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
75  import org.apache.hadoop.hbase.HConstants;
76  import org.apache.hadoop.hbase.HRegionInfo;
77  import org.apache.hadoop.hbase.Server;
78  import org.apache.hadoop.hbase.TableName;
79  import org.apache.hadoop.hbase.classification.InterfaceAudience;
80  import org.apache.hadoop.hbase.classification.InterfaceStability;
81  import org.apache.hadoop.hbase.client.Operation;
82  import org.apache.hadoop.hbase.client.VersionInfoUtil;
83  import org.apache.hadoop.hbase.codec.Codec;
84  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
85  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
86  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
87  import org.apache.hadoop.hbase.io.ByteBufferInputStream;
88  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
89  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
90  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
91  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
92  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
93  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
94  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
95  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
96  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
97  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
98  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
99  import org.apache.hadoop.hbase.regionserver.HRegionServer;
100 import org.apache.hadoop.hbase.security.AccessDeniedException;
101 import org.apache.hadoop.hbase.security.AuthMethod;
102 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
103 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
104 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
105 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
106 import org.apache.hadoop.hbase.security.SaslStatus;
107 import org.apache.hadoop.hbase.security.SaslUtil;
108 import org.apache.hadoop.hbase.security.User;
109 import org.apache.hadoop.hbase.security.UserProvider;
110 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
111 import org.apache.hadoop.hbase.util.Bytes;
112 import org.apache.hadoop.hbase.util.Counter;
113 import org.apache.hadoop.hbase.util.Pair;
114 import org.apache.hadoop.hbase.util.Threads;
115 import org.apache.hadoop.io.BytesWritable;
116 import org.apache.hadoop.io.IntWritable;
117 import org.apache.hadoop.io.Writable;
118 import org.apache.hadoop.io.WritableUtils;
119 import org.apache.hadoop.io.compress.CompressionCodec;
120 import org.apache.hadoop.security.UserGroupInformation;
121 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
122 import org.apache.hadoop.security.authorize.AuthorizationException;
123 import org.apache.hadoop.security.authorize.PolicyProvider;
124 import org.apache.hadoop.security.authorize.ProxyUsers;
125 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
126 import org.apache.hadoop.security.token.SecretManager;
127 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
128 import org.apache.hadoop.security.token.TokenIdentifier;
129 import org.apache.hadoop.util.StringUtils;
130 import org.apache.htrace.TraceInfo;
131 import org.codehaus.jackson.map.ObjectMapper;
132 
133 import com.google.common.util.concurrent.ThreadFactoryBuilder;
134 import com.google.protobuf.BlockingService;
135 import com.google.protobuf.CodedInputStream;
136 import com.google.protobuf.CodedOutputStream;
137 import com.google.protobuf.Descriptors.MethodDescriptor;
138 import com.google.protobuf.Message;
139 import com.google.protobuf.ServiceException;
140 import com.google.protobuf.TextFormat;
141 
142 /**
143  * An RPC server that hosts protobuf described Services.
144  *
145  * An RpcServer instance has a Listener that hosts the socket.  The Listener has a fixed number
146  * of Readers in an ExecutorPool, 10 by default.  The Listener does an accept and then chooses
147  * a Reader, round robin, to do the read.  The channel is registered on the Reader's Selector.
148  * The Reader reads the whole request off the channel and parses it into a Call.  The Call is
149  * wrapped in a CallRunner and passed to the scheduler to be run.  The Reader then goes back to
150  * see if there is more to be done and loops till done.
151  *
152  * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
153  * has given the queues into which calls (i.e. CallRunner instances) are inserted.  Handlers run
154  * taking from the queue.  They run the CallRunner#run method on each item gotten from queue
155  * and keep taking while the server is up.
156  *
157  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
158  * queue for Responder to pull from and return result to client.
159  *
160  * @see RpcClientImpl
161  */
162 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
163 @InterfaceStability.Evolving
164 public class RpcServer implements RpcServerInterface, ConfigurationObserver {
165   // LOG is being used in CallRunner and the log level is being changed in tests
166   public static final Log LOG = LogFactory.getLog(RpcServer.class);
167   private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
168       = new CallQueueTooBigException();
169 
170   private final boolean authorize;
171   private boolean isSecurityEnabled;
172 
173   public static final byte CURRENT_VERSION = 0;
174 
175   /**
176    * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
177    */
178   public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
179           "hbase.ipc.server.fallback-to-simple-auth-allowed";
180 
181   /**
182    * How many calls/handler are allowed in the queue.
183    */
184   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
185 
186   /**
187    * The maximum size that we can hold in the RPC queue
188    */
189   private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
190 
191   private final IPCUtil ipcUtil;
192 
193   private static final String AUTH_FAILED_FOR = "Auth failed for ";
194   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
195   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
196     Server.class.getName());
197   protected SecretManager<TokenIdentifier> secretManager;
198   protected ServiceAuthorizationManager authManager;
199 
200   /** This is set to Call object before Handler invokes an RPC and reset
201    * after the call returns.
202    */
203   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
204 
205   /** Keeps MonitoredRPCHandler per handler thread. */
206   static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
207       = new ThreadLocal<MonitoredRPCHandler>();
208 
209   protected final InetSocketAddress bindAddress;
210   protected int port;                             // port we listen on
211   protected InetSocketAddress address;            // inet address we listen on
212   private int readThreads;                        // number of read threads
213   protected int maxIdleTime;                      // the maximum idle time after
214                                                   // which a client may be
215                                                   // disconnected
216   protected int thresholdIdleConnections;         // the number of idle
217                                                   // connections after which we
218                                                   // will start cleaning up idle
219                                                   // connections
220   int maxConnectionsToNuke;                       // the max number of
221                                                   // connections to nuke
222                                                   // during a cleanup
223 
224   protected MetricsHBaseServer metrics;
225 
226   protected final Configuration conf;
227 
228   private int maxQueueSize;
229   protected int socketSendBufferSize;
230   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
231   protected final boolean tcpKeepAlive; // if T then use keepalives
232   protected final long purgeTimeout;    // in milliseconds
233 
234   /**
235    * This flag is used to indicate to sub threads when they should go down.  When we call
236    * {@link #start()}, all threads started will consult this flag on whether they should
237    * keep going.  It is set to false when {@link #stop()} is called.
238    */
239   volatile boolean running = true;
240 
241   /**
242    * This flag is set to true after all threads are up and 'running' and the server is then opened
243    * for business by the call to {@link #start()}.
244    */
245   volatile boolean started = false;
246 
247   /**
248    * This is a running count of the total size of all outstanding calls.
249    */
250   protected final Counter callQueueSize = new Counter();
251 
252   protected final List<Connection> connectionList =
253     Collections.synchronizedList(new LinkedList<Connection>());
254   //maintain a list
255   //of client connections
256   private Listener listener = null;
257   protected Responder responder = null;
258   protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
259   protected int numConnections = 0;
260 
261   protected HBaseRPCErrorHandler errorHandler = null;
262 
263   static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
264   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
265   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
266 
267   /** Default value for above params */
268   private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
269   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
270   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
271 
272   private static final ObjectMapper MAPPER = new ObjectMapper();
273 
274   private final int maxRequestSize;
275   private final int warnResponseTime;
276   private final int warnResponseSize;
277   private final Server server;
278   private final List<BlockingServiceAndInterface> services;
279 
280   private final RpcScheduler scheduler;
281 
282   private UserProvider userProvider;
283 
284   private final BoundedByteBufferPool reservoir;
285 
286   private volatile boolean allowFallbackToSimpleAuth;
287 
288   /**
289    * Datastructure that holds all necessary to a method invocation and then afterward, carries
290    * the result.
291    */
292   @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
293   @InterfaceStability.Evolving
294   public class Call implements RpcCallContext {
295     protected int id;                             // the client's call id
296     protected BlockingService service;
297     protected MethodDescriptor md;
298     protected RequestHeader header;
299     protected Message param;                      // the parameter passed
300     // Optional cell data passed outside of protobufs.
301     protected CellScanner cellScanner;
302     protected Connection connection;              // connection to client
303     protected long timestamp;      // the time received when response is null
304                                    // the time served when response is not null
305     /**
306      * Chain of buffers to send as response.
307      */
308     protected BufferChain response;
309     protected Responder responder;
310 
311     protected long size;                          // size of current call
312     protected boolean isError;
313     protected TraceInfo tinfo;
314     private ByteBuffer cellBlock = null;
315 
316     private User user;
317     private InetAddress remoteAddress;
318     private RpcCallback callback;
319 
320     private long responseCellSize = 0;
321     private long responseBlockSize = 0;
322     private boolean retryImmediatelySupported;
323 
324     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
325         justification="Can't figure why this complaint is happening... see below")
326     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
327          Message param, CellScanner cellScanner, Connection connection, Responder responder,
328          long size, TraceInfo tinfo, final InetAddress remoteAddress) {
329       this.id = id;
330       this.service = service;
331       this.md = md;
332       this.header = header;
333       this.param = param;
334       this.cellScanner = cellScanner;
335       this.connection = connection;
336       this.timestamp = System.currentTimeMillis();
337       this.response = null;
338       this.responder = responder;
339       this.isError = false;
340       this.size = size;
341       this.tinfo = tinfo;
342       this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
343       this.remoteAddress = remoteAddress;
344       this.retryImmediatelySupported =
345           connection == null? null: connection.retryImmediatelySupported;
346     }
347 
348     /**
349      * Call is done. Execution happened and we returned results to client. It is now safe to
350      * cleanup.
351      */
352     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
353         justification="Presume the lock on processing request held by caller is protection enough")
354     void done() {
355       if (this.cellBlock != null && reservoir != null) {
356         // Return buffer to reservoir now we are done with it.
357         reservoir.putBuffer(this.cellBlock);
358         this.cellBlock = null;
359       }
360       this.connection.decRpcCount();  // Say that we're done with this call.
361     }
362 
363     @Override
364     public String toString() {
365       return toShortString() + " param: " +
366         (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
367         " connection: " + connection.toString();
368     }
369 
370     protected RequestHeader getHeader() {
371       return this.header;
372     }
373 
374     public boolean hasPriority() {
375       return this.header.hasPriority();
376     }
377 
378     public int getPriority() {
379       return this.header.getPriority();
380     }
381 
382     /*
383      * Short string representation without param info because param itself could be huge depends on
384      * the payload of a command
385      */
386     String toShortString() {
387       String serviceName = this.connection.service != null ?
388           this.connection.service.getDescriptorForType().getName() : "null";
389       return "callId: " + this.id + " service: " + serviceName +
390           " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
391           " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
392           " connection: " + connection.toString();
393     }
394 
395     String toTraceString() {
396       String serviceName = this.connection.service != null ?
397                            this.connection.service.getDescriptorForType().getName() : "";
398       String methodName = (this.md != null) ? this.md.getName() : "";
399       return serviceName + "." + methodName;
400     }
401 
402     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
403       this.response = new BufferChain(response);
404     }
405 
406     protected synchronized void setResponse(Object m, final CellScanner cells,
407         Throwable t, String errorMsg) {
408       if (this.isError) return;
409       if (t != null) this.isError = true;
410       BufferChain bc = null;
411       try {
412         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
413         // Presume it a pb Message.  Could be null.
414         Message result = (Message)m;
415         // Call id.
416         headerBuilder.setCallId(this.id);
417         if (t != null) {
418           ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
419           exceptionBuilder.setExceptionClassName(t.getClass().getName());
420           exceptionBuilder.setStackTrace(errorMsg);
421           exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
422           if (t instanceof RegionMovedException) {
423             // Special casing for this exception.  This is only one carrying a payload.
424             // Do this instead of build a generic system for allowing exceptions carry
425             // any kind of payload.
426             RegionMovedException rme = (RegionMovedException)t;
427             exceptionBuilder.setHostname(rme.getHostname());
428             exceptionBuilder.setPort(rme.getPort());
429           }
430           // Set the exception as the result of the method invocation.
431           headerBuilder.setException(exceptionBuilder.build());
432         }
433         // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
434         // reservoir when finished. This is hacky and the hack is not contained but benefits are
435         // high when we can avoid a big buffer allocation on each rpc.
436         this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
437           this.connection.compressionCodec, cells, reservoir);
438         if (this.cellBlock != null) {
439           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
440           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
441           cellBlockBuilder.setLength(this.cellBlock.limit());
442           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
443         }
444         Message header = headerBuilder.build();
445 
446         byte[] b = createHeaderAndMessageBytes(result, header);
447 
448         bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock);
449 
450         if (connection.useWrap) {
451           bc = wrapWithSasl(bc);
452         }
453       } catch (IOException e) {
454         LOG.warn("Exception while creating response " + e);
455       }
456       this.response = bc;
457       // Once a response message is created and set to this.response, this Call can be treated as
458       // done. The Responder thread will do the n/w write of this message back to client.
459       if (this.callback != null) {
460         try {
461           this.callback.run();
462         } catch (Exception e) {
463           // Don't allow any exception here to kill this handler thread.
464           LOG.warn("Exception while running the Rpc Callback.", e);
465         }
466       }
467     }
468 
469     private byte[] createHeaderAndMessageBytes(Message result, Message header)
470         throws IOException {
471       // Organize the response as a set of bytebuffers rather than collect it all together inside
472       // one big byte array; save on allocations.
473       int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
474           resultVintSize = 0;
475       if (header != null) {
476         headerSerializedSize = header.getSerializedSize();
477         headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize);
478       }
479       if (result != null) {
480         resultSerializedSize = result.getSerializedSize();
481         resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize);
482       }
483       // calculate the total size
484       int totalSize = headerSerializedSize + headerVintSize
485           + (resultSerializedSize + resultVintSize)
486           + (this.cellBlock == null ? 0 : this.cellBlock.limit());
487       // The byte[] should also hold the totalSize of the header, message and the cellblock
488       byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
489           + resultVintSize + Bytes.SIZEOF_INT];
490       // The RpcClient expects the int to be in a format that code be decoded by
491       // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int)
492       // form of writing int.
493       Bytes.putInt(b, 0, totalSize);
494       CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT,
495           b.length - Bytes.SIZEOF_INT);
496       if (header != null) {
497         cos.writeMessageNoTag(header);
498       }
499       if (result != null) {
500         cos.writeMessageNoTag(result);
501       }
502       cos.flush();
503       cos.checkNoSpaceLeft();
504       return b;
505     }
506 
507     private BufferChain wrapWithSasl(BufferChain bc)
508         throws IOException {
509       if (!this.connection.useSasl) return bc;
510       // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
511       // THIS IS A BIG UGLY COPY.
512       byte [] responseBytes = bc.getBytes();
513       byte [] token;
514       // synchronization may be needed since there can be multiple Handler
515       // threads using saslServer to wrap responses.
516       synchronized (connection.saslServer) {
517         token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
518       }
519       if (LOG.isTraceEnabled()) {
520         LOG.trace("Adding saslServer wrapped token of size " + token.length
521             + " as call response.");
522       }
523 
524       ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
525       ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
526       return new BufferChain(bbTokenLength, bbTokenBytes);
527     }
528 
529     @Override
530     public boolean isClientCellBlockSupported() {
531       return this.connection != null && this.connection.codec != null;
532     }
533 
534     @Override
535     public long disconnectSince() {
536       if (!connection.channel.isOpen()) {
537         return System.currentTimeMillis() - timestamp;
538       } else {
539         return -1L;
540       }
541     }
542 
543     public long getSize() {
544       return this.size;
545     }
546 
547     @Override
548     public long getResponseCellSize() {
549       return responseCellSize;
550     }
551 
552     @Override
553     public void incrementResponseCellSize(long cellSize) {
554       responseCellSize += cellSize;
555     }
556 
557     @Override
558     public long getResponseBlockSize() {
559       return responseBlockSize;
560     }
561 
562     @Override
563     public void incrementResponseBlockSize(long blockSize) {
564       responseBlockSize += blockSize;
565     }
566 
567     public synchronized void sendResponseIfReady() throws IOException {
568       this.responder.doRespond(this);
569     }
570 
571     public UserGroupInformation getRemoteUser() {
572       return connection.ugi;
573     }
574 
575     @Override
576     public User getRequestUser() {
577       return user;
578     }
579 
580     @Override
581     public String getRequestUserName() {
582       User user = getRequestUser();
583       return user == null? null: user.getShortName();
584     }
585 
586     @Override
587     public InetAddress getRemoteAddress() {
588       return remoteAddress;
589     }
590 
591     @Override
592     public VersionInfo getClientVersionInfo() {
593       return connection.getVersionInfo();
594     }
595 
596     @Override
597     public synchronized void setCallBack(RpcCallback callback) {
598       this.callback = callback;
599     }
600 
601     @Override
602     public boolean isRetryImmediatelySupported() {
603       return retryImmediatelySupported;
604     }
605   }
606 
607   /** Listens on the socket. Creates jobs for the handler threads*/
608   private class Listener extends Thread {
609 
610     private ServerSocketChannel acceptChannel = null; //the accept channel
611     private Selector selector = null; //the selector that we use for the server
612     private Reader[] readers = null;
613     private int currentReader = 0;
614     private Random rand = new Random();
615     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
616                                          //-tion (for idle connections) ran
617     private long cleanupInterval = 10000; //the minimum interval between
618                                           //two cleanup runs
619     private int backlogLength;
620 
621     private ExecutorService readPool;
622 
623     public Listener(final String name) throws IOException {
624       super(name);
625       backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
626       // Create a new server socket and set to non blocking mode
627       acceptChannel = ServerSocketChannel.open();
628       acceptChannel.configureBlocking(false);
629 
630       // Bind the server socket to the binding addrees (can be different from the default interface)
631       bind(acceptChannel.socket(), bindAddress, backlogLength);
632       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
633       address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
634       // create a selector;
635       selector= Selector.open();
636 
637       readers = new Reader[readThreads];
638       readPool = Executors.newFixedThreadPool(readThreads,
639         new ThreadFactoryBuilder().setNameFormat(
640           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
641           ",port=" + port).setDaemon(true)
642         .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
643       for (int i = 0; i < readThreads; ++i) {
644         Reader reader = new Reader();
645         readers[i] = reader;
646         readPool.execute(reader);
647       }
648       LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
649 
650       // Register accepts on the server socket with the selector.
651       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
652       this.setName("RpcServer.listener,port=" + port);
653       this.setDaemon(true);
654     }
655 
656 
657     private class Reader implements Runnable {
658       private volatile boolean adding = false;
659       private final Selector readSelector;
660 
661       Reader() throws IOException {
662         this.readSelector = Selector.open();
663       }
664       @Override
665       public void run() {
666         try {
667           doRunLoop();
668         } finally {
669           try {
670             readSelector.close();
671           } catch (IOException ioe) {
672             LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
673           }
674         }
675       }
676 
677       private synchronized void doRunLoop() {
678         while (running) {
679           try {
680             readSelector.select();
681             while (adding) {
682               this.wait(1000);
683             }
684 
685             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
686             while (iter.hasNext()) {
687               SelectionKey key = iter.next();
688               iter.remove();
689               if (key.isValid()) {
690                 if (key.isReadable()) {
691                   doRead(key);
692                 }
693               }
694             }
695           } catch (InterruptedException e) {
696             LOG.debug("Interrupted while sleeping");
697             return;
698           } catch (IOException ex) {
699             LOG.info(getName() + ": IOException in Reader", ex);
700           }
701         }
702       }
703 
704       /**
705        * This gets reader into the state that waits for the new channel
706        * to be registered with readSelector. If it was waiting in select()
707        * the thread will be woken up, otherwise whenever select() is called
708        * it will return even if there is nothing to read and wait
709        * in while(adding) for finishAdd call
710        */
711       public void startAdd() {
712         adding = true;
713         readSelector.wakeup();
714       }
715 
716       public synchronized SelectionKey registerChannel(SocketChannel channel)
717         throws IOException {
718         return channel.register(readSelector, SelectionKey.OP_READ);
719       }
720 
721       public synchronized void finishAdd() {
722         adding = false;
723         this.notify();
724       }
725     }
726 
727     /** cleanup connections from connectionList. Choose a random range
728      * to scan and also have a limit on the number of the connections
729      * that will be cleanedup per run. The criteria for cleanup is the time
730      * for which the connection was idle. If 'force' is true then all
731      * connections will be looked at for the cleanup.
732      * @param force all connections will be looked at for cleanup
733      */
734     private void cleanupConnections(boolean force) {
735       if (force || numConnections > thresholdIdleConnections) {
736         long currentTime = System.currentTimeMillis();
737         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
738           return;
739         }
740         int start = 0;
741         int end = numConnections - 1;
742         if (!force) {
743           start = rand.nextInt() % numConnections;
744           end = rand.nextInt() % numConnections;
745           int temp;
746           if (end < start) {
747             temp = start;
748             start = end;
749             end = temp;
750           }
751         }
752         int i = start;
753         int numNuked = 0;
754         while (i <= end) {
755           Connection c;
756           synchronized (connectionList) {
757             try {
758               c = connectionList.get(i);
759             } catch (Exception e) {return;}
760           }
761           if (c.timedOut(currentTime)) {
762             if (LOG.isDebugEnabled())
763               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
764             closeConnection(c);
765             numNuked++;
766             end--;
767             //noinspection UnusedAssignment
768             c = null;
769             if (!force && numNuked == maxConnectionsToNuke) break;
770           }
771           else i++;
772         }
773         lastCleanupRunTime = System.currentTimeMillis();
774       }
775     }
776 
777     @Override
778     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
779       justification="selector access is not synchronized; seems fine but concerned changing " +
780         "it will have per impact")
781     public void run() {
782       LOG.info(getName() + ": starting");
783       while (running) {
784         SelectionKey key = null;
785         try {
786           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
787           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
788           while (iter.hasNext()) {
789             key = iter.next();
790             iter.remove();
791             try {
792               if (key.isValid()) {
793                 if (key.isAcceptable())
794                   doAccept(key);
795               }
796             } catch (IOException ignored) {
797               if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
798             }
799             key = null;
800           }
801         } catch (OutOfMemoryError e) {
802           if (errorHandler != null) {
803             if (errorHandler.checkOOME(e)) {
804               LOG.info(getName() + ": exiting on OutOfMemoryError");
805               closeCurrentConnection(key, e);
806               cleanupConnections(true);
807               return;
808             }
809           } else {
810             // we can run out of memory if we have too many threads
811             // log the event and sleep for a minute and give
812             // some thread(s) a chance to finish
813             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
814             closeCurrentConnection(key, e);
815             cleanupConnections(true);
816             try {
817               Thread.sleep(60000);
818             } catch (InterruptedException ex) {
819               LOG.debug("Interrupted while sleeping");
820               return;
821             }
822           }
823         } catch (Exception e) {
824           closeCurrentConnection(key, e);
825         }
826         cleanupConnections(false);
827       }
828 
829       LOG.info(getName() + ": stopping");
830 
831       synchronized (this) {
832         try {
833           acceptChannel.close();
834           selector.close();
835         } catch (IOException ignored) {
836           if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
837         }
838 
839         selector= null;
840         acceptChannel= null;
841 
842         // clean up all connections
843         while (!connectionList.isEmpty()) {
844           closeConnection(connectionList.remove(0));
845         }
846       }
847     }
848 
849     private void closeCurrentConnection(SelectionKey key, Throwable e) {
850       if (key != null) {
851         Connection c = (Connection)key.attachment();
852         if (c != null) {
853           if (LOG.isDebugEnabled()) {
854             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
855                 (e != null ? " on error " + e.getMessage() : ""));
856           }
857           closeConnection(c);
858           key.attach(null);
859         }
860       }
861     }
862 
863     InetSocketAddress getAddress() {
864       return address;
865     }
866 
867     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
868       Connection c;
869       ServerSocketChannel server = (ServerSocketChannel) key.channel();
870 
871       SocketChannel channel;
872       while ((channel = server.accept()) != null) {
873         try {
874           channel.configureBlocking(false);
875           channel.socket().setTcpNoDelay(tcpNoDelay);
876           channel.socket().setKeepAlive(tcpKeepAlive);
877         } catch (IOException ioe) {
878           channel.close();
879           throw ioe;
880         }
881 
882         Reader reader = getReader();
883         try {
884           reader.startAdd();
885           SelectionKey readKey = reader.registerChannel(channel);
886           c = getConnection(channel, System.currentTimeMillis());
887           readKey.attach(c);
888           synchronized (connectionList) {
889             connectionList.add(numConnections, c);
890             numConnections++;
891           }
892           if (LOG.isDebugEnabled())
893             LOG.debug(getName() + ": connection from " + c.toString() +
894                 "; # active connections: " + numConnections);
895         } finally {
896           reader.finishAdd();
897         }
898       }
899     }
900 
901     void doRead(SelectionKey key) throws InterruptedException {
902       int count;
903       Connection c = (Connection) key.attachment();
904       if (c == null) {
905         return;
906       }
907       c.setLastContact(System.currentTimeMillis());
908       try {
909         count = c.readAndProcess();
910 
911         if (count > 0) {
912           c.setLastContact(System.currentTimeMillis());
913         }
914 
915       } catch (InterruptedException ieo) {
916         throw ieo;
917       } catch (Exception e) {
918         if (LOG.isDebugEnabled()) {
919           LOG.debug(getName() + ": Caught exception while reading:", e);
920         }
921         count = -1; //so that the (count < 0) block is executed
922       }
923       if (count < 0) {
924         if (LOG.isDebugEnabled()) {
925           LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
926               " because read count=" + count +
927               ". Number of active connections: " + numConnections);
928         }
929         closeConnection(c);
930       }
931     }
932 
933     synchronized void doStop() {
934       if (selector != null) {
935         selector.wakeup();
936         Thread.yield();
937       }
938       if (acceptChannel != null) {
939         try {
940           acceptChannel.socket().close();
941         } catch (IOException e) {
942           LOG.info(getName() + ": exception in closing listener socket. " + e);
943         }
944       }
945       readPool.shutdownNow();
946     }
947 
948     // The method that will return the next reader to work with
949     // Simplistic implementation of round robin for now
950     Reader getReader() {
951       currentReader = (currentReader + 1) % readers.length;
952       return readers[currentReader];
953     }
954   }
955 
956   // Sends responses of RPC back to clients.
957   protected class Responder extends Thread {
958     private final Selector writeSelector;
959     private final Set<Connection> writingCons =
960         Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
961 
/**
 * Creates the responder as a named daemon thread with a logging uncaught-exception
 * handler and opens its write selector.
 *
 * @throws IOException if the selector cannot be opened
 */
Responder() throws IOException {
  setName("RpcServer.responder");
  setDaemon(true);
  setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
  this.writeSelector = Selector.open(); // create a selector
}
968 
969     @Override
970     public void run() {
971       LOG.info(getName() + ": starting");
972       try {
973         doRunLoop();
974       } finally {
975         LOG.info(getName() + ": stopping");
976         try {
977           writeSelector.close();
978         } catch (IOException ioe) {
979           LOG.error(getName() + ": couldn't close write selector", ioe);
980         }
981       }
982     }
983 
984     /**
985      * Take the list of the connections that want to write, and register them
986      * in the selector.
987      */
988     private void registerWrites() {
989       Iterator<Connection> it = writingCons.iterator();
990       while (it.hasNext()) {
991         Connection c = it.next();
992         it.remove();
993         SelectionKey sk = c.channel.keyFor(writeSelector);
994         try {
995           if (sk == null) {
996             try {
997               c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
998             } catch (ClosedChannelException e) {
999               // ignore: the client went away.
1000               if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
1001             }
1002           } else {
1003             sk.interestOps(SelectionKey.OP_WRITE);
1004           }
1005         } catch (CancelledKeyException e) {
1006           // ignore: the client went away.
1007           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
1008         }
1009       }
1010     }
1011 
1012     /**
1013      * Add a connection to the list that want to write,
1014      */
1015     public void registerForWrite(Connection c) {
1016       if (writingCons.add(c)) {
1017         writeSelector.wakeup();
1018       }
1019     }
1020 
1021     private void doRunLoop() {
1022       long lastPurgeTime = 0;   // last check for old calls.
1023       while (running) {
1024         try {
1025           registerWrites();
1026           int keyCt = writeSelector.select(purgeTimeout);
1027           if (keyCt == 0) {
1028             continue;
1029           }
1030 
1031           Set<SelectionKey> keys = writeSelector.selectedKeys();
1032           Iterator<SelectionKey> iter = keys.iterator();
1033           while (iter.hasNext()) {
1034             SelectionKey key = iter.next();
1035             iter.remove();
1036             try {
1037               if (key.isValid() && key.isWritable()) {
1038                 doAsyncWrite(key);
1039               }
1040             } catch (IOException e) {
1041               LOG.debug(getName() + ": asyncWrite", e);
1042             }
1043           }
1044 
1045           lastPurgeTime = purge(lastPurgeTime);
1046 
1047         } catch (OutOfMemoryError e) {
1048           if (errorHandler != null) {
1049             if (errorHandler.checkOOME(e)) {
1050               LOG.info(getName() + ": exiting on OutOfMemoryError");
1051               return;
1052             }
1053           } else {
1054             //
1055             // we can run out of memory if we have too many threads
1056             // log the event and sleep for a minute and give
1057             // some thread(s) a chance to finish
1058             //
1059             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
1060             try {
1061               Thread.sleep(60000);
1062             } catch (InterruptedException ex) {
1063               LOG.debug("Interrupted while sleeping");
1064               return;
1065             }
1066           }
1067         } catch (Exception e) {
1068           LOG.warn(getName() + ": exception in Responder " +
1069               StringUtils.stringifyException(e), e);
1070         }
1071       }
1072       LOG.info(getName() + ": stopped");
1073     }
1074 
1075     /**
1076      * If there were some calls that have not been sent out for a
1077      * long time, we close the connection.
1078      * @return the time of the purge.
1079      */
1080     private long purge(long lastPurgeTime) {
1081       long now = System.currentTimeMillis();
1082       if (now < lastPurgeTime + purgeTimeout) {
1083         return lastPurgeTime;
1084       }
1085 
1086       ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1087       // get the list of channels from list of keys.
1088       synchronized (writeSelector.keys()) {
1089         for (SelectionKey key : writeSelector.keys()) {
1090           Connection connection = (Connection) key.attachment();
1091           if (connection == null) {
1092             throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1093           }
1094           Call call = connection.responseQueue.peekFirst();
1095           if (call != null && now > call.timestamp + purgeTimeout) {
1096             conWithOldCalls.add(call.connection);
1097           }
1098         }
1099       }
1100 
1101       // Seems safer to close the connection outside of the synchronized loop...
1102       for (Connection connection : conWithOldCalls) {
1103         closeConnection(connection);
1104       }
1105 
1106       return now;
1107     }
1108 
1109     private void doAsyncWrite(SelectionKey key) throws IOException {
1110       Connection connection = (Connection) key.attachment();
1111       if (connection == null) {
1112         throw new IOException("doAsyncWrite: no connection");
1113       }
1114       if (key.channel() != connection.channel) {
1115         throw new IOException("doAsyncWrite: bad channel");
1116       }
1117 
1118       if (processAllResponses(connection)) {
1119         try {
1120           // We wrote everything, so we don't need to be told when the socket is ready for
1121           //  write anymore.
1122          key.interestOps(0);
1123         } catch (CancelledKeyException e) {
1124           /* The Listener/reader might have closed the socket.
1125            * We don't explicitly cancel the key, so not sure if this will
1126            * ever fire.
1127            * This warning could be removed.
1128            */
1129           LOG.warn("Exception while changing ops : " + e);
1130         }
1131       }
1132     }
1133 
1134     /**
1135      * Process the response for this call. You need to have the lock on
1136      * {@link org.apache.hadoop.hbase.ipc.RpcServer.Connection#responseWriteLock}
1137      *
1138      * @param call the call
1139      * @return true if we proceed the call fully, false otherwise.
1140      * @throws IOException
1141      */
1142     private boolean processResponse(final Call call) throws IOException {
1143       boolean error = true;
1144       try {
1145         // Send as much data as we can in the non-blocking fashion
1146         long numBytes = channelWrite(call.connection.channel, call.response);
1147         if (numBytes < 0) {
1148           throw new HBaseIOException("Error writing on the socket " +
1149             "for the call:" + call.toShortString());
1150         }
1151         error = false;
1152       } finally {
1153         if (error) {
1154           LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1155           closeConnection(call.connection);
1156         }
1157       }
1158 
1159       if (!call.response.hasRemaining()) {
1160         call.done();
1161         return true;
1162       } else {
1163         return false; // Socket can't take more, we will have to come back.
1164       }
1165     }
1166 
1167     /**
1168      * Process all the responses for this connection
1169      *
1170      * @return true if all the calls were processed or that someone else is doing it.
1171      * false if there * is still some work to do. In this case, we expect the caller to
1172      * delay us.
1173      * @throws IOException
1174      */
1175     private boolean processAllResponses(final Connection connection) throws IOException {
1176       // We want only one writer on the channel for a connection at a time.
1177       connection.responseWriteLock.lock();
1178       try {
1179         for (int i = 0; i < 20; i++) {
1180           // protection if some handlers manage to need all the responder
1181           Call call = connection.responseQueue.pollFirst();
1182           if (call == null) {
1183             return true;
1184           }
1185           if (!processResponse(call)) {
1186             connection.responseQueue.addFirst(call);
1187             return false;
1188           }
1189         }
1190       } finally {
1191         connection.responseWriteLock.unlock();
1192       }
1193 
1194       return connection.responseQueue.isEmpty();
1195     }
1196 
1197     //
1198     // Enqueue a response from the application.
1199     //
1200     void doRespond(Call call) throws IOException {
1201       boolean added = false;
1202 
1203       // If there is already a write in progress, we don't wait. This allows to free the handlers
1204       //  immediately for other tasks.
1205       if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
1206         try {
1207           if (call.connection.responseQueue.isEmpty()) {
1208             // If we're alone, we can try to do a direct call to the socket. It's
1209             //  an optimisation to save on context switches and data transfer between cores..
1210             if (processResponse(call)) {
1211               return; // we're done.
1212             }
1213             // Too big to fit, putting ahead.
1214             call.connection.responseQueue.addFirst(call);
1215             added = true; // We will register to the selector later, outside of the lock.
1216           }
1217         } finally {
1218           call.connection.responseWriteLock.unlock();
1219         }
1220       }
1221 
1222       if (!added) {
1223         call.connection.responseQueue.addLast(call);
1224       }
1225       call.responder.registerForWrite(call.connection);
1226 
1227       // set the serve time when the response has to be sent later
1228       call.timestamp = System.currentTimeMillis();
1229     }
1230   }
1231 
1232   /** Reads calls from a connection and queues them for handling. */
1233   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1234       value="VO_VOLATILE_INCREMENT",
1235       justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1236   public class Connection {
1237     // If initial preamble with version and magic has been read or not.
1238     private boolean connectionPreambleRead = false;
1239     // If the connection header has been read or not.
1240     private boolean connectionHeaderRead = false;
1241     protected SocketChannel channel;
1242     private ByteBuffer data;
1243     private ByteBuffer dataLengthBuffer;
1244     protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
1245     private final Lock responseWriteLock = new ReentrantLock();
1246     private Counter rpcCount = new Counter(); // number of outstanding rpcs
1247     private long lastContact;
1248     private InetAddress addr;
1249     protected Socket socket;
1250     // Cache the remote host & port info so that even if the socket is
1251     // disconnected, we can say where it used to connect to.
1252     protected String hostAddress;
1253     protected int remotePort;
1254     ConnectionHeader connectionHeader;
1255 
1256     /**
1257      * Codec the client asked us to use.
1258      */
1259     private Codec codec;
1260     /**
1261      * Compression codec the client asked us to use.
1262      */
1263     private CompressionCodec compressionCodec;
1264     BlockingService service;
1265 
1266     private AuthMethod authMethod;
1267     private boolean saslContextEstablished;
1268     private boolean skipInitialSaslHandshake;
1269     private ByteBuffer unwrappedData;
1270     // When is this set?  FindBugs wants to know!  Says NP
1271     private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1272     boolean useSasl;
1273     SaslServer saslServer;
1274     private boolean useWrap = false;
1275     // Fake 'call' for failed authorization response
1276     private static final int AUTHORIZATION_FAILED_CALLID = -1;
1277     private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
1278         null, null, this, null, 0, null, null);
1279     private ByteArrayOutputStream authFailedResponse =
1280         new ByteArrayOutputStream();
1281     // Fake 'call' for SASL context setup
1282     private static final int SASL_CALLID = -33;
1283     private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
1284         0, null, null);
1285 
1286     // was authentication allowed with a fallback to simple auth
1287     private boolean authenticatedWithFallback;
1288 
1289     private boolean retryImmediatelySupported = false;
1290 
1291     public UserGroupInformation attemptingUser = null; // user name before auth
1292     protected User user = null;
1293     protected UserGroupInformation ugi = null;
1294 
/**
 * Creates the server-side state for an accepted channel. The remote address and port
 * are cached here so they can still be reported after the socket is disconnected.
 * When {@code socketSendBufferSize} is non-zero it is applied to the socket; a failure
 * to do so is only logged.
 *
 * @param channel the accepted socket channel
 * @param lastContact the initial last-contact time
 */
public Connection(SocketChannel channel, long lastContact) {
  this.channel = channel;
  this.lastContact = lastContact;
  this.data = null;
  this.dataLengthBuffer = ByteBuffer.allocate(4);
  this.socket = channel.socket();
  this.addr = socket.getInetAddress();
  this.hostAddress = (addr == null) ? "*Unknown*" : addr.getHostAddress();
  this.remotePort = socket.getPort();
  if (socketSendBufferSize == 0) {
    return;
  }
  try {
    socket.setSendBufferSize(socketSendBufferSize);
  } catch (IOException e) {
    LOG.warn("Connection: unable to set socket send buffer size to " +
             socketSendBufferSize);
  }
}
1317 
1318       @Override
1319     public String toString() {
1320       return getHostAddress() + ":" + remotePort;
1321     }
1322 
1323     public String getHostAddress() {
1324       return hostAddress;
1325     }
1326 
1327     public InetAddress getHostInetAddress() {
1328       return addr;
1329     }
1330 
1331     public int getRemotePort() {
1332       return remotePort;
1333     }
1334 
1335     public void setLastContact(long lastContact) {
1336       this.lastContact = lastContact;
1337     }
1338 
1339     public VersionInfo getVersionInfo() {
1340       if (connectionHeader.hasVersionInfo()) {
1341         return connectionHeader.getVersionInfo();
1342       }
1343       return null;
1344     }
1345 
1346     /* Return true if the connection has no outstanding rpc */
1347     private boolean isIdle() {
1348       return rpcCount.get() == 0;
1349     }
1350 
1351     /* Decrement the outstanding RPC count */
1352     protected void decRpcCount() {
1353       rpcCount.decrement();
1354     }
1355 
1356     /* Increment the outstanding RPC count */
1357     protected void incRpcCount() {
1358       rpcCount.increment();
1359     }
1360 
1361     protected boolean timedOut(long currentTime) {
1362       return isIdle() && currentTime - lastContact > maxIdleTime;
1363     }
1364 
1365     private UserGroupInformation getAuthorizedUgi(String authorizedId)
1366         throws IOException {
1367       UserGroupInformation authorizedUgi;
1368       if (authMethod == AuthMethod.DIGEST) {
1369         TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1370             secretManager);
1371         authorizedUgi = tokenId.getUser();
1372         if (authorizedUgi == null) {
1373           throw new AccessDeniedException(
1374               "Can't retrieve username from tokenIdentifier.");
1375         }
1376         authorizedUgi.addTokenIdentifier(tokenId);
1377       } else {
1378         authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
1379       }
1380       authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
1381       return authorizedUgi;
1382     }
1383 
1384     private void saslReadAndProcess(ByteBuffer saslToken) throws IOException,
1385         InterruptedException {
1386       if (saslContextEstablished) {
1387         if (LOG.isTraceEnabled())
1388           LOG.trace("Have read input token of size " + saslToken.limit()
1389               + " for processing by saslServer.unwrap()");
1390 
1391         if (!useWrap) {
1392           processOneRpc(saslToken);
1393         } else {
1394           byte[] b = saslToken.array();
1395           byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
1396           processUnwrappedData(plaintextData);
1397         }
1398       } else {
1399         byte[] replyToken;
1400         try {
1401           if (saslServer == null) {
1402             switch (authMethod) {
1403             case DIGEST:
1404               if (secretManager == null) {
1405                 throw new AccessDeniedException(
1406                     "Server is not configured to do DIGEST authentication.");
1407               }
1408               saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
1409                   .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
1410                   HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
1411                       secretManager, this));
1412               break;
1413             default:
1414               UserGroupInformation current = UserGroupInformation.getCurrentUser();
1415               String fullName = current.getUserName();
1416               if (LOG.isDebugEnabled()) {
1417                 LOG.debug("Kerberos principal name is " + fullName);
1418               }
1419               final String names[] = SaslUtil.splitKerberosName(fullName);
1420               if (names.length != 3) {
1421                 throw new AccessDeniedException(
1422                     "Kerberos principal name does NOT have the expected "
1423                         + "hostname part: " + fullName);
1424               }
1425               current.doAs(new PrivilegedExceptionAction<Object>() {
1426                 @Override
1427                 public Object run() throws SaslException {
1428                   saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
1429                       .getMechanismName(), names[0], names[1],
1430                       HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
1431                   return null;
1432                 }
1433               });
1434             }
1435             if (saslServer == null)
1436               throw new AccessDeniedException(
1437                   "Unable to find SASL server implementation for "
1438                       + authMethod.getMechanismName());
1439             if (LOG.isDebugEnabled()) {
1440               LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
1441             }
1442           }
1443           if (LOG.isDebugEnabled()) {
1444             LOG.debug("Have read input token of size " + saslToken.limit()
1445                 + " for processing by saslServer.evaluateResponse()");
1446           }
1447           replyToken = saslServer.evaluateResponse(saslToken.array());
1448         } catch (IOException e) {
1449           IOException sendToClient = e;
1450           Throwable cause = e;
1451           while (cause != null) {
1452             if (cause instanceof InvalidToken) {
1453               sendToClient = (InvalidToken) cause;
1454               break;
1455             }
1456             cause = cause.getCause();
1457           }
1458           doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
1459             sendToClient.getLocalizedMessage());
1460           metrics.authenticationFailure();
1461           String clientIP = this.toString();
1462           // attempting user could be null
1463           AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
1464           throw e;
1465         }
1466         if (replyToken != null) {
1467           if (LOG.isDebugEnabled()) {
1468             LOG.debug("Will send token of size " + replyToken.length
1469                 + " from saslServer.");
1470           }
1471           doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
1472               null);
1473         }
1474         if (saslServer.isComplete()) {
1475           String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
1476           useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
1477           ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
1478           if (LOG.isDebugEnabled()) {
1479             LOG.debug("SASL server context established. Authenticated client: "
1480               + ugi + ". Negotiated QoP is "
1481               + saslServer.getNegotiatedProperty(Sasl.QOP));
1482           }
1483           metrics.authenticationSuccess();
1484           AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1485           saslContextEstablished = true;
1486         }
1487       }
1488     }
1489 
1490     /**
1491      * No protobuf encoding of raw sasl messages
1492      */
1493     private void doRawSaslReply(SaslStatus status, Writable rv,
1494         String errorClass, String error) throws IOException {
1495       ByteBufferOutputStream saslResponse = null;
1496       DataOutputStream out = null;
1497       try {
1498         // In my testing, have noticed that sasl messages are usually
1499         // in the ballpark of 100-200. That's why the initial capacity is 256.
1500         saslResponse = new ByteBufferOutputStream(256);
1501         out = new DataOutputStream(saslResponse);
1502         out.writeInt(status.state); // write status
1503         if (status == SaslStatus.SUCCESS) {
1504           rv.write(out);
1505         } else {
1506           WritableUtils.writeString(out, errorClass);
1507           WritableUtils.writeString(out, error);
1508         }
1509         saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1510         saslCall.responder = responder;
1511         saslCall.sendResponseIfReady();
1512       } finally {
1513         if (saslResponse != null) {
1514           saslResponse.close();
1515         }
1516         if (out != null) {
1517           out.close();
1518         }
1519       }
1520     }
1521 
1522     private void disposeSasl() {
1523       if (saslServer != null) {
1524         try {
1525           saslServer.dispose();
1526           saslServer = null;
1527         } catch (SaslException ignored) {
1528           // Ignored. This is being disposed of anyway.
1529         }
1530       }
1531     }
1532 
1533     private int readPreamble() throws IOException {
1534       int count;
1535       // Check for 'HBas' magic.
1536       this.dataLengthBuffer.flip();
1537       if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
1538         return doBadPreambleHandling("Expected HEADER=" +
1539             Bytes.toStringBinary(HConstants.RPC_HEADER) +
1540             " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
1541             " from " + toString());
1542       }
1543       // Now read the next two bytes, the version and the auth to use.
1544       ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
1545       count = channelRead(channel, versionAndAuthBytes);
1546       if (count < 0 || versionAndAuthBytes.remaining() > 0) {
1547         return count;
1548       }
1549       int version = versionAndAuthBytes.get(0);
1550       byte authbyte = versionAndAuthBytes.get(1);
1551       this.authMethod = AuthMethod.valueOf(authbyte);
1552       if (version != CURRENT_VERSION) {
1553         String msg = getFatalConnectionString(version, authbyte);
1554         return doBadPreambleHandling(msg, new WrongVersionException(msg));
1555       }
1556       if (authMethod == null) {
1557         String msg = getFatalConnectionString(version, authbyte);
1558         return doBadPreambleHandling(msg, new BadAuthException(msg));
1559       }
1560       if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
1561         if (allowFallbackToSimpleAuth) {
1562           metrics.authenticationFallback();
1563           authenticatedWithFallback = true;
1564         } else {
1565           AccessDeniedException ae = new AccessDeniedException("Authentication is required");
1566           setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1567           responder.doRespond(authFailedCall);
1568           throw ae;
1569         }
1570       }
1571       if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
1572         doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
1573             SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
1574         authMethod = AuthMethod.SIMPLE;
1575         // client has already sent the initial Sasl message and we
1576         // should ignore it. Both client and server should fall back
1577         // to simple auth from now on.
1578         skipInitialSaslHandshake = true;
1579       }
1580       if (authMethod != AuthMethod.SIMPLE) {
1581         useSasl = true;
1582       }
1583 
1584       dataLengthBuffer.clear();
1585       connectionPreambleRead = true;
1586       return count;
1587     }
1588 
1589     private int read4Bytes() throws IOException {
1590       if (this.dataLengthBuffer.remaining() > 0) {
1591         return channelRead(channel, this.dataLengthBuffer);
1592       } else {
1593         return 0;
1594       }
1595     }
1596 
1597 
1598     /**
1599      * Read off the wire. If there is not enough data to read, update the connection state with
1600      *  what we have and returns.
1601      * @return Returns -1 if failure (and caller will close connection), else zero or more.
1602      * @throws IOException
1603      * @throws InterruptedException
1604      */
1605     public int readAndProcess() throws IOException, InterruptedException {
1606       // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
1607       // does, read in the rest of the connection preamble, the version and the auth method.
1608       // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
1609       // length into the 4-byte this.dataLengthBuffer.
1610       int count = read4Bytes();
1611       if (count < 0 || dataLengthBuffer.remaining() > 0) {
1612         return count;
1613       }
1614 
1615       // If we have not read the connection setup preamble, look to see if that is on the wire.
1616       if (!connectionPreambleRead) {
1617         count = readPreamble();
1618         if (!connectionPreambleRead) {
1619           return count;
1620         }
1621 
1622         count = read4Bytes();
1623         if (count < 0 || dataLengthBuffer.remaining() > 0) {
1624           return count;
1625         }
1626       }
1627 
1628       // We have read a length and we have read the preamble.  It is either the connection header
1629       // or it is a request.
1630       if (data == null) {
1631         dataLengthBuffer.flip();
1632         int dataLength = dataLengthBuffer.getInt();
1633         if (dataLength == RpcClient.PING_CALL_ID) {
1634           if (!useWrap) { //covers the !useSasl too
1635             dataLengthBuffer.clear();
1636             return 0;  //ping message
1637           }
1638         }
1639         if (dataLength < 0) { // A data length of zero is legal.
1640           throw new DoNotRetryIOException("Unexpected data length "
1641               + dataLength + "!! from " + getHostAddress());
1642         }
1643 
1644         if (dataLength > maxRequestSize) {
1645           throw new DoNotRetryIOException("RPC data length of " + dataLength + " received from "
1646               + getHostAddress() + " is greater than max allowed " + maxRequestSize + ". Set \""
1647               + MAX_REQUEST_SIZE + "\" on server to override this limit (not recommended)");
1648         }
1649 
1650         data = ByteBuffer.allocate(dataLength);
1651 
1652         // Increment the rpc count. This counter will be decreased when we write
1653         //  the response.  If we want the connection to be detected as idle properly, we
1654         //  need to keep the inc / dec correct.
1655         incRpcCount();
1656       }
1657 
1658       count = channelRead(channel, data);
1659 
1660       if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
1661         process();
1662       }
1663 
1664       return count;
1665     }
1666 
1667     /**
1668      * Process the data buffer and clean the connection state for the next call.
1669      */
1670     private void process() throws IOException, InterruptedException {
1671       data.flip();
1672       try {
1673         if (skipInitialSaslHandshake) {
1674           skipInitialSaslHandshake = false;
1675           return;
1676         }
1677 
1678         if (useSasl) {
1679           saslReadAndProcess(data);
1680         } else {
1681           processOneRpc(data);
1682         }
1683 
1684       } finally {
1685         dataLengthBuffer.clear(); // Clean for the next call
1686         data = null; // For the GC
1687       }
1688     }
1689 
1690     private String getFatalConnectionString(final int version, final byte authByte) {
1691       return "serverVersion=" + CURRENT_VERSION +
1692       ", clientVersion=" + version + ", authMethod=" + authByte +
1693       ", authSupported=" + (authMethod != null) + " from " + toString();
1694     }
1695 
1696     private int doBadPreambleHandling(final String msg) throws IOException {
1697       return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1698     }
1699 
1700     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1701       LOG.warn(msg);
1702       Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
1703       setupResponse(null, fakeCall, e, msg);
1704       responder.doRespond(fakeCall);
1705       // Returning -1 closes out the connection.
1706       return -1;
1707     }
1708 
1709     // Reads the connection header following version
1710     private void processConnectionHeader(ByteBuffer buf) throws IOException {
1711       this.connectionHeader = ConnectionHeader.parseFrom(
1712         new ByteBufferInputStream(buf));
1713       String serviceName = connectionHeader.getServiceName();
1714       if (serviceName == null) throw new EmptyServiceNameException();
1715       this.service = getService(services, serviceName);
1716       if (this.service == null) throw new UnknownServiceException(serviceName);
1717       setupCellBlockCodecs(this.connectionHeader);
1718       UserGroupInformation protocolUser = createUser(connectionHeader);
1719       if (!useSasl) {
1720         ugi = protocolUser;
1721         if (ugi != null) {
1722           ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1723         }
1724         // audit logging for SASL authenticated users happens in saslReadAndProcess()
1725         if (authenticatedWithFallback) {
1726           LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
1727               + " connecting from " + getHostAddress());
1728         }
1729         AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1730       } else {
1731         // user is authenticated
1732         ugi.setAuthenticationMethod(authMethod.authenticationMethod);
1733         //Now we check if this is a proxy user case. If the protocol user is
1734         //different from the 'user', it is a proxy user scenario. However,
1735         //this is not allowed if user authenticated with DIGEST.
1736         if ((protocolUser != null)
1737             && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
1738           if (authMethod == AuthMethod.DIGEST) {
1739             // Not allowed to doAs if token authentication is used
1740             throw new AccessDeniedException("Authenticated user (" + ugi
1741                 + ") doesn't match what the client claims to be ("
1742                 + protocolUser + ")");
1743           } else {
1744             // Effective user can be different from authenticated user
1745             // for simple auth or kerberos auth
1746             // The user is the real user. Now we create a proxy user
1747             UserGroupInformation realUser = ugi;
1748             ugi = UserGroupInformation.createProxyUser(protocolUser
1749                 .getUserName(), realUser);
1750             // Now the user is a proxy user, set Authentication method Proxy.
1751             ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
1752           }
1753         }
1754       }
1755       if (connectionHeader.hasVersionInfo()) {
1756         // see if this connection will support RetryImmediatelyException
1757         retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
1758 
1759         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1760             + " with version info: "
1761             + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
1762       } else {
1763         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1764             + " with unknown version info");
1765       }
1766 
1767 
1768     }
1769 
1770     /**
1771      * Set up cell block codecs
1772      * @throws FatalConnectionException
1773      */
1774     private void setupCellBlockCodecs(final ConnectionHeader header)
1775     throws FatalConnectionException {
1776       // TODO: Plug in other supported decoders.
1777       if (!header.hasCellBlockCodecClass()) return;
1778       String className = header.getCellBlockCodecClass();
1779       if (className == null || className.length() == 0) return;
1780       try {
1781         this.codec = (Codec)Class.forName(className).newInstance();
1782       } catch (Exception e) {
1783         throw new UnsupportedCellCodecException(className, e);
1784       }
1785       if (!header.hasCellBlockCompressorClass()) return;
1786       className = header.getCellBlockCompressorClass();
1787       try {
1788         this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1789       } catch (Exception e) {
1790         throw new UnsupportedCompressionCodecException(className, e);
1791       }
1792     }
1793 
1794     private void processUnwrappedData(byte[] inBuf) throws IOException,
1795     InterruptedException {
1796       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1797       // Read all RPCs contained in the inBuf, even partial ones
1798       while (true) {
1799         int count;
1800         if (unwrappedDataLengthBuffer.remaining() > 0) {
1801           count = channelRead(ch, unwrappedDataLengthBuffer);
1802           if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1803             return;
1804         }
1805 
1806         if (unwrappedData == null) {
1807           unwrappedDataLengthBuffer.flip();
1808           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1809 
1810           if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1811             if (LOG.isDebugEnabled())
1812               LOG.debug("Received ping message");
1813             unwrappedDataLengthBuffer.clear();
1814             continue; // ping message
1815           }
1816           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1817         }
1818 
1819         count = channelRead(ch, unwrappedData);
1820         if (count <= 0 || unwrappedData.remaining() > 0)
1821           return;
1822 
1823         if (unwrappedData.remaining() == 0) {
1824           unwrappedDataLengthBuffer.clear();
1825           unwrappedData.flip();
1826           processOneRpc(unwrappedData);
1827           unwrappedData = null;
1828         }
1829       }
1830     }
1831 
1832 
1833     private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException {
1834       if (connectionHeaderRead) {
1835         processRequest(buf);
1836       } else {
1837         processConnectionHeader(buf);
1838         this.connectionHeaderRead = true;
1839         if (!authorizeConnection()) {
1840           // Throw FatalConnectionException wrapping ACE so client does right thing and closes
1841           // down the connection instead of trying to read non-existent retun.
1842           throw new AccessDeniedException("Connection from " + this + " for service " +
1843             connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
1844         }
1845         this.user = userProvider.create(this.ugi);
1846       }
1847     }
1848 
    /**
     * Decodes one request from {@code buf} and hands it to the scheduler.
     * <p>
     * The buffer is read in order as: a varint-prefixed {@link RequestHeader}, then (when the
     * header says a param is present) a varint-prefixed request message, then (when the header
     * carries cell block meta) the cell block data. An error response is sent through the
     * responder, and the method returns normally, when the call queue is over its size limit,
     * when the param or cell block cannot be read, or when the scheduler refuses the call.
     *
     * @param buf Has the request header and the request param and optionally encoded data buffer
     * all in this one array.
     * @throws IOException if reading the request header fails
     * @throws InterruptedException
     */
    protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
      long totalRequestSize = buf.limit();
      // Running position within buf; used below to locate the start of the cell block.
      int offset = 0;
      // Here we read in the header.  We avoid having pb
      // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
      CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
      int headerSize = cis.readRawVarint32();
      // Bytes consumed so far are exactly the varint length prefix.
      offset = cis.getTotalBytesRead();
      Message.Builder builder = RequestHeader.newBuilder();
      ProtobufUtil.mergeFrom(builder, cis, headerSize);
      RequestHeader header = (RequestHeader) builder.build();
      offset += headerSize;
      int id = header.getCallId();
      if (LOG.isTraceEnabled()) {
        LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
          " totalRequestSize: " + totalRequestSize + " bytes");
      }
      // Enforcing the call queue size, this triggers a retry in the client
      // This is a bit late to be doing this check - we have already read in the total request.
      if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
        final Call callTooBig =
          new Call(id, this.service, null, null, null, null, this,
            responder, totalRequestSize, null, null);
        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
        metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
        InetSocketAddress address = getListenerAddress();
        setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
            "Call queue is full on " + (address != null ? address : "(channel closed)") +
                ", is hbase.ipc.server.max.callqueue.size too small?");
        responder.doRespond(callTooBig);
        return;
      }
      MethodDescriptor md = null;
      Message param = null;
      CellScanner cellScanner = null;
      try {
        if (header.hasRequestParam() && header.getRequestParam()) {
          md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
          if (md == null) throw new UnsupportedOperationException(header.getMethodName());
          builder = this.service.getRequestPrototype(md).newBuilderForType();
          // Reset the counter so getTotalBytesRead() below reports only the param's
          // varint length prefix.
          cis.resetSizeCounter();
          int paramSize = cis.readRawVarint32();
          offset += cis.getTotalBytesRead();
          if (builder != null) {
            ProtobufUtil.mergeFrom(builder, cis, paramSize);
            param = builder.build();
          }
          offset += paramSize;
        }
        if (header.hasCellBlockMeta()) {
          // offset now points just past the header and param, where the cell block starts.
          buf.position(offset);
          cellScanner = ipcUtil.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf);
        }
      } catch (Throwable t) {
        InetSocketAddress address = getListenerAddress();
        String msg = (address != null ? address : "(channel closed)") +
            " is unable to read call parameter from client " + getHostAddress();
        LOG.warn(msg, t);

        metrics.exception(t);

        // probably the hbase hadoop version does not match the running hadoop version
        if (t instanceof LinkageError) {
          t = new DoNotRetryIOException(t);
        }
        // If the method is not present on the server, do not retry.
        if (t instanceof UnsupportedOperationException) {
          t = new DoNotRetryIOException(t);
        }

        // Report the failure to the client through a call that carries no method or param.
        final Call readParamsFailedCall =
          new Call(id, this.service, null, null, null, null, this,
            responder, totalRequestSize, null, null);
        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
        setupResponse(responseBuffer, readParamsFailedCall, t,
          msg + "; " + t.getMessage());
        responder.doRespond(readParamsFailedCall);
        return;
      }

      TraceInfo traceInfo = header.hasTraceInfo()
          ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
          : null;
      Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
              totalRequestSize, traceInfo, this.addr);

      if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
        // The scheduler refused the call: take its size back off the queue-size counter
        // and tell the client the queue is full.
        callQueueSize.add(-1 * call.getSize());

        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
        metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
        InetSocketAddress address = getListenerAddress();
        setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
            "Call queue is full on " + (address != null ? address : "(channel closed)") +
                ", too many items queued ?");
        responder.doRespond(call);
      }
    }
1953 
1954     private boolean authorizeConnection() throws IOException {
1955       try {
1956         // If auth method is DIGEST, the token was obtained by the
1957         // real user for the effective user, therefore not required to
1958         // authorize real user. doAs is allowed only for simple or kerberos
1959         // authentication
1960         if (ugi != null && ugi.getRealUser() != null
1961             && (authMethod != AuthMethod.DIGEST)) {
1962           ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
1963         }
1964         authorize(ugi, connectionHeader, getHostInetAddress());
1965         metrics.authorizationSuccess();
1966       } catch (AuthorizationException ae) {
1967         if (LOG.isDebugEnabled()) {
1968           LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1969         }
1970         metrics.authorizationFailure();
1971         setupResponse(authFailedResponse, authFailedCall,
1972           new AccessDeniedException(ae), ae.getMessage());
1973         responder.doRespond(authFailedCall);
1974         return false;
1975       }
1976       return true;
1977     }
1978 
1979     protected synchronized void close() {
1980       disposeSasl();
1981       data = null;
1982       if (!channel.isOpen())
1983         return;
1984       try {socket.shutdownOutput();} catch(Exception ignored) {
1985         if (LOG.isTraceEnabled()) {
1986           LOG.trace("Ignored exception", ignored);
1987         }
1988       }
1989       if (channel.isOpen()) {
1990         try {channel.close();} catch(Exception ignored) {}
1991       }
1992       try {
1993         socket.close();
1994       } catch(Exception ignored) {
1995         if (LOG.isTraceEnabled()) {
1996           LOG.trace("Ignored exception", ignored);
1997         }
1998       }
1999     }
2000 
2001     private UserGroupInformation createUser(ConnectionHeader head) {
2002       UserGroupInformation ugi = null;
2003 
2004       if (!head.hasUserInfo()) {
2005         return null;
2006       }
2007       UserInformation userInfoProto = head.getUserInfo();
2008       String effectiveUser = null;
2009       if (userInfoProto.hasEffectiveUser()) {
2010         effectiveUser = userInfoProto.getEffectiveUser();
2011       }
2012       String realUser = null;
2013       if (userInfoProto.hasRealUser()) {
2014         realUser = userInfoProto.getRealUser();
2015       }
2016       if (effectiveUser != null) {
2017         if (realUser != null) {
2018           UserGroupInformation realUserUgi =
2019               UserGroupInformation.createRemoteUser(realUser);
2020           ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
2021         } else {
2022           ugi = UserGroupInformation.createRemoteUser(effectiveUser);
2023         }
2024       }
2025       return ugi;
2026     }
2027   }
2028 
2029   /**
2030    * Datastructure for passing a {@link BlockingService} and its associated class of
2031    * protobuf service interface.  For example, a server that fielded what is defined
2032    * in the client protobuf service would pass in an implementation of the client blocking service
2033    * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
2034    */
2035   public static class BlockingServiceAndInterface {
2036     private final BlockingService service;
2037     private final Class<?> serviceInterface;
2038     public BlockingServiceAndInterface(final BlockingService service,
2039         final Class<?> serviceInterface) {
2040       this.service = service;
2041       this.serviceInterface = serviceInterface;
2042     }
2043     public Class<?> getServiceInterface() {
2044       return this.serviceInterface;
2045     }
2046     public BlockingService getBlockingService() {
2047       return this.service;
2048     }
2049   }
2050 
  /**
   * Constructs a server listening on the named port and address.
   * <p>
   * The constructor reads its tunables from {@code conf}, creates the {@code Listener}
   * (recording the port it reports), the metrics, the {@code Responder}, and initializes
   * the scheduler. Nothing is started here; see {@link #start()}.
   *
   * @param server hosting instance of {@link Server}. May be null, in which case no
   * authentication token secret manager is created when the server is started.
   * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
   * @param services A list of services.
   * @param bindAddress Where to listen
   * @param conf configuration the settings below are read from
   * @param scheduler scheduler that calls are dispatched to; initialized here with a
   * context wrapping this server
   * @throws IOException declared by the constructor; the code shown here does not make
   * clear which call raises it (presumably creating the listener - TODO confirm)
   */
  public RpcServer(final Server server, final String name,
      final List<BlockingServiceAndInterface> services,
      final InetSocketAddress bindAddress, Configuration conf,
      RpcScheduler scheduler)
      throws IOException {
    // Optional pool of reusable byte buffers; left null when disabled in the configuration.
    if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
      this.reservoir = new BoundedByteBufferPool(
          conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
          conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
          // Make the max twice the number of handlers to be safe.
          conf.getInt("hbase.ipc.server.reservoir.initial.max",
              conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
                  HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
    } else {
      reservoir = null;
    }
    this.server = server;
    this.services = services;
    this.bindAddress = bindAddress;
    this.conf = conf;
    this.socketSendBufferSize = 0;
    this.maxQueueSize =
      this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
    this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
    // Twice the configured client-side max idle time.
    this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
    this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
    this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
    this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
    this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);

    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);

    // Start the listener here and let it bind to the port
    listener = new Listener(name);
    this.port = listener.getAddress().getPort();

    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);

    this.ipcUtil = new IPCUtil(conf);


    // Create the responder here
    responder = new Responder();
    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
    this.userProvider = UserProvider.instantiate(conf);
    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
    if (isSecurityEnabled) {
      HBaseSaslRpcServer.init(conf);
    }
    // Loads the settings that can also be re-applied later via onConfigurationChange().
    initReconfigurable(conf);

    this.scheduler = scheduler;
    this.scheduler.init(new RpcSchedulerContext(this));
  }
2119 
2120   @Override
2121   public void onConfigurationChange(Configuration newConf) {
2122     initReconfigurable(newConf);
2123     if (scheduler instanceof ConfigurationObserver) {
2124       ((ConfigurationObserver)scheduler).onConfigurationChange(newConf);
2125     }
2126   }
2127 
2128   private void initReconfigurable(Configuration confToLoad) {
2129     this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
2130     if (isSecurityEnabled && allowFallbackToSimpleAuth) {
2131       LOG.warn("********* WARNING! *********");
2132       LOG.warn("This server is configured to allow connections from INSECURE clients");
2133       LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
2134       LOG.warn("While this option is enabled, client identities cannot be secured, and user");
2135       LOG.warn("impersonation is possible!");
2136       LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
2137       LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
2138       LOG.warn("****************************");
2139     }
2140   }
2141 
2142   /**
2143    * Subclasses of HBaseServer can override this to provide their own
2144    * Connection implementations.
2145    */
2146   protected Connection getConnection(SocketChannel channel, long time) {
2147     return new Connection(channel, time);
2148   }
2149 
2150   /**
2151    * Setup response for the RPC Call.
2152    *
2153    * @param response buffer to serialize the response into
2154    * @param call {@link Call} to which we are setting up the response
2155    * @param error error message, if the call failed
2156    * @throws IOException
2157    */
2158   private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
2159   throws IOException {
2160     if (response != null) response.reset();
2161     call.setResponse(null, null, t, error);
2162   }
2163 
2164   protected void closeConnection(Connection connection) {
2165     synchronized (connectionList) {
2166       if (connectionList.remove(connection)) {
2167         numConnections--;
2168       }
2169     }
2170     connection.close();
2171   }
2172 
2173   Configuration getConf() {
2174     return conf;
2175   }
2176 
2177   /** Sets the socket buffer size used for responding to RPCs.
2178    * @param size send size
2179    */
2180   @Override
2181   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2182 
2183   @Override
2184   public boolean isStarted() {
2185     return this.started;
2186   }
2187 
2188   /** Starts the service.  Must be called before any calls will be handled. */
2189   @Override
2190   public synchronized void start() {
2191     if (started) return;
2192     authTokenSecretMgr = createSecretManager();
2193     if (authTokenSecretMgr != null) {
2194       setSecretManager(authTokenSecretMgr);
2195       authTokenSecretMgr.start();
2196     }
2197     this.authManager = new ServiceAuthorizationManager();
2198     HBasePolicyProvider.init(conf, authManager);
2199     responder.start();
2200     listener.start();
2201     scheduler.start();
2202     started = true;
2203   }
2204 
2205   @Override
2206   public synchronized void refreshAuthManager(PolicyProvider pp) {
2207     // Ignore warnings that this should be accessed in a static way instead of via an instance;
2208     // it'll break if you go via static route.
2209     this.authManager.refresh(this.conf, pp);
2210   }
2211 
2212   private AuthenticationTokenSecretManager createSecretManager() {
2213     if (!isSecurityEnabled) return null;
2214     if (server == null) return null;
2215     Configuration conf = server.getConfiguration();
2216     long keyUpdateInterval =
2217         conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2218     long maxAge =
2219         conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2220     return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2221         server.getServerName().toString(), keyUpdateInterval, maxAge);
2222   }
2223 
2224   public SecretManager<? extends TokenIdentifier> getSecretManager() {
2225     return this.secretManager;
2226   }
2227 
2228   @SuppressWarnings("unchecked")
2229   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2230     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2231   }
2232 
2233   /**
2234    * This is a server side method, which is invoked over RPC. On success
2235    * the return response has protobuf response payload. On failure, the
2236    * exception name and the stack trace are returned in the protobuf response.
2237    */
2238   @Override
2239   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2240       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2241   throws IOException {
2242     try {
2243       status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2244       // TODO: Review after we add in encoded data blocks.
2245       status.setRPCPacket(param);
2246       status.resume("Servicing call");
2247       //get an instance of the method arg type
2248       long startTime = System.currentTimeMillis();
2249       PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2250       Message result = service.callBlockingMethod(md, controller, param);
2251       long endTime = System.currentTimeMillis();
2252       int processingTime = (int) (endTime - startTime);
2253       int qTime = (int) (startTime - receiveTime);
2254       int totalTime = (int) (endTime - receiveTime);
2255       if (LOG.isTraceEnabled()) {
2256         LOG.trace(CurCall.get().toString() +
2257             ", response " + TextFormat.shortDebugString(result) +
2258             " queueTime: " + qTime +
2259             " processingTime: " + processingTime +
2260             " totalTime: " + totalTime);
2261       }
2262       long requestSize = param.getSerializedSize();
2263       long responseSize = result.getSerializedSize();
2264       metrics.dequeuedCall(qTime);
2265       metrics.processedCall(processingTime);
2266       metrics.totalCall(totalTime);
2267       metrics.receivedRequest(requestSize);
2268       metrics.sentResponse(responseSize);
2269       // log any RPC responses that are slower than the configured warn
2270       // response time or larger than configured warning size
2271       boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2272       boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2273       if (tooSlow || tooLarge) {
2274         // when tagging, we let TooLarge trump TooSmall to keep output simple
2275         // note that large responses will often also be slow.
2276         logResponse(new Object[]{param},
2277             md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
2278             (tooLarge ? "TooLarge" : "TooSlow"),
2279             status.getClient(), startTime, processingTime, qTime,
2280             responseSize);
2281       }
2282       return new Pair<Message, CellScanner>(result, controller.cellScanner());
2283     } catch (Throwable e) {
2284       // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
2285       // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
2286       // need to pass it over the wire.
2287       if (e instanceof ServiceException) e = e.getCause();
2288 
2289       // increment the number of requests that were exceptions.
2290       metrics.exception(e);
2291 
2292       if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2293       if (e instanceof IOException) throw (IOException)e;
2294       LOG.error("Unexpected throwable object ", e);
2295       throw new IOException(e.getMessage(), e);
2296     }
2297   }
2298 
2299   /**
2300    * Logs an RPC response to the LOG file, producing valid JSON objects for
2301    * client Operations.
2302    * @param params The parameters received in the call.
2303    * @param methodName The name of the method invoked
2304    * @param call The string representation of the call
2305    * @param tag  The tag that will be used to indicate this event in the log.
2306    * @param clientAddress   The address of the client who made this call.
2307    * @param startTime       The time that the call was initiated, in ms.
2308    * @param processingTime  The duration that the call took to run, in ms.
2309    * @param qTime           The duration that the call spent on the queue
2310    *                        prior to being initiated, in ms.
2311    * @param responseSize    The size in bytes of the response buffer.
2312    */
2313   void logResponse(Object[] params, String methodName, String call, String tag,
2314       String clientAddress, long startTime, int processingTime, int qTime,
2315       long responseSize)
2316           throws IOException {
2317     // base information that is reported regardless of type of call
2318     Map<String, Object> responseInfo = new HashMap<String, Object>();
2319     responseInfo.put("starttimems", startTime);
2320     responseInfo.put("processingtimems", processingTime);
2321     responseInfo.put("queuetimems", qTime);
2322     responseInfo.put("responsesize", responseSize);
2323     responseInfo.put("client", clientAddress);
2324     responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2325     responseInfo.put("method", methodName);
2326     if (params.length == 2 && server instanceof HRegionServer &&
2327         params[0] instanceof byte[] &&
2328         params[1] instanceof Operation) {
2329       // if the slow process is a query, we want to log its table as well
2330       // as its own fingerprint
2331       TableName tableName = TableName.valueOf(
2332           HRegionInfo.parseRegionName((byte[]) params[0])[0]);
2333       responseInfo.put("table", tableName.getNameAsString());
2334       // annotate the response map with operation details
2335       responseInfo.putAll(((Operation) params[1]).toMap());
2336       // report to the log file
2337       LOG.warn("(operation" + tag + "): " +
2338                MAPPER.writeValueAsString(responseInfo));
2339     } else if (params.length == 1 && server instanceof HRegionServer &&
2340         params[0] instanceof Operation) {
2341       // annotate the response map with operation details
2342       responseInfo.putAll(((Operation) params[0]).toMap());
2343       // report to the log file
2344       LOG.warn("(operation" + tag + "): " +
2345                MAPPER.writeValueAsString(responseInfo));
2346     } else {
2347       // can't get JSON details, so just report call.toString() along with
2348       // a more generic tag.
2349       responseInfo.put("call", call);
2350       LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2351     }
2352   }
2353 
2354   /** Stops the service.  No new calls will be handled after this is called. */
2355   @Override
2356   public synchronized void stop() {
2357     LOG.info("Stopping server on " + port);
2358     running = false;
2359     if (authTokenSecretMgr != null) {
2360       authTokenSecretMgr.stop();
2361       authTokenSecretMgr = null;
2362     }
2363     listener.interrupt();
2364     listener.doStop();
2365     responder.interrupt();
2366     scheduler.stop();
2367     notifyAll();
2368   }
2369 
2370   /** Wait for the server to be stopped.
2371    * Does not wait for all subthreads to finish.
2372    *  See {@link #stop()}.
2373    * @throws InterruptedException e
2374    */
2375   @Override
2376   public synchronized void join() throws InterruptedException {
2377     while (running) {
2378       wait();
2379     }
2380   }
2381 
2382   /**
2383    * Return the socket (ip+port) on which the RPC server is listening to. May return null if
2384    * the listener channel is closed.
2385    * @return the socket (ip+port) on which the RPC server is listening to, or null if this
2386    * information cannot be determined
2387    */
2388   @Override
2389   public synchronized InetSocketAddress getListenerAddress() {
2390     if (listener == null) {
2391       return null;
2392     }
2393     return listener.getAddress();
2394   }
2395 
2396   /**
2397    * Set the handler for calling out of RPC for error conditions.
2398    * @param handler the handler implementation
2399    */
2400   @Override
2401   public void setErrorHandler(HBaseRPCErrorHandler handler) {
2402     this.errorHandler = handler;
2403   }
2404 
2405   @Override
2406   public HBaseRPCErrorHandler getErrorHandler() {
2407     return this.errorHandler;
2408   }
2409 
2410   /**
2411    * Returns the metrics instance for reporting RPC call statistics
2412    */
2413   @Override
2414   public MetricsHBaseServer getMetrics() {
2415     return metrics;
2416   }
2417 
2418   @Override
2419   public void addCallSize(final long diff) {
2420     this.callQueueSize.add(diff);
2421   }
2422 
2423   /**
2424    * Authorize the incoming client connection.
2425    *
2426    * @param user client user
2427    * @param connection incoming connection
2428    * @param addr InetAddress of incoming connection
2429    * @throws org.apache.hadoop.security.authorize.AuthorizationException
2430    *         when the client isn't authorized to talk the protocol
2431    */
2432   public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
2433       InetAddress addr)
2434   throws AuthorizationException {
2435     if (authorize) {
2436       Class<?> c = getServiceInterface(services, connection.getServiceName());
2437       this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2438     }
2439   }
2440 
2441   /**
2442    * When the read or write buffer size is larger than this limit, i/o will be
2443    * done in chunks of this size. Most RPC requests and responses would be
2444    * smaller.
2445    */
2446   private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
2447 
2448   /**
2449    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
2450    * If the amount of data is large, it writes to channel in smaller chunks.
2451    * This is to avoid jdk from creating many direct buffers as the size of
2452    * buffer increases. This also minimizes extra copies in NIO layer
2453    * as a result of multiple write operations required to write a large
2454    * buffer.
2455    *
2456    * @param channel writable byte channel to write to
2457    * @param bufferChain Chain of buffers to write
2458    * @return number of bytes written
2459    * @throws java.io.IOException e
2460    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
2461    */
2462   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2463   throws IOException {
2464     long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
2465     if (count > 0) this.metrics.sentBytes(count);
2466     return count;
2467   }
2468 
2469   /**
2470    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
2471    * If the amount of data is large, it writes to channel in smaller chunks.
2472    * This is to avoid jdk from creating many direct buffers as the size of
2473    * ByteBuffer increases. There should not be any performance degredation.
2474    *
2475    * @param channel writable byte channel to write on
2476    * @param buffer buffer to write
2477    * @return number of bytes written
2478    * @throws java.io.IOException e
2479    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
2480    */
2481   protected int channelRead(ReadableByteChannel channel,
2482                                    ByteBuffer buffer) throws IOException {
2483 
2484     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2485            channel.read(buffer) : channelIO(channel, null, buffer);
2486     if (count > 0) {
2487       metrics.receivedBytes(count);
2488     }
2489     return count;
2490   }
2491 
2492   /**
2493    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
2494    * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
2495    * one of readCh or writeCh should be non-null.
2496    *
2497    * @param readCh read channel
2498    * @param writeCh write channel
2499    * @param buf buffer to read or write into/out of
2500    * @return bytes written
2501    * @throws java.io.IOException e
2502    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
2503    * @see #channelWrite(GatheringByteChannel, BufferChain)
2504    */
2505   private static int channelIO(ReadableByteChannel readCh,
2506                                WritableByteChannel writeCh,
2507                                ByteBuffer buf) throws IOException {
2508 
2509     int originalLimit = buf.limit();
2510     int initialRemaining = buf.remaining();
2511     int ret = 0;
2512 
2513     while (buf.remaining() > 0) {
2514       try {
2515         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2516         buf.limit(buf.position() + ioSize);
2517 
2518         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2519 
2520         if (ret < ioSize) {
2521           break;
2522         }
2523 
2524       } finally {
2525         buf.limit(originalLimit);
2526       }
2527     }
2528 
2529     int nBytes = initialRemaining - buf.remaining();
2530     return (nBytes > 0) ? nBytes : ret;
2531   }
2532 
2533   /**
2534    * Needed for features such as delayed calls.  We need to be able to store the current call
2535    * so that we can complete it later or ask questions of what is supported by the current ongoing
2536    * call.
2537    * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
2538    */
2539   public static RpcCallContext getCurrentCall() {
2540     return CurCall.get();
2541   }
2542 
2543   public static boolean isInRpcCallContext() {
2544     return CurCall.get() != null;
2545   }
2546 
2547   /**
2548    * Returns the user credentials associated with the current RPC request or
2549    * <code>null</code> if no credentials were provided.
2550    * @return A User
2551    */
2552   public static User getRequestUser() {
2553     RpcCallContext ctx = getCurrentCall();
2554     return ctx == null? null: ctx.getRequestUser();
2555   }
2556 
2557   /**
2558    * Returns the username for any user associated with the current RPC
2559    * request or <code>null</code> if no user is set.
2560    */
2561   public static String getRequestUserName() {
2562     User user = getRequestUser();
2563     return user == null? null: user.getShortName();
2564   }
2565 
2566   /**
2567    * @return Address of remote client if a request is ongoing, else null
2568    */
2569   public static InetAddress getRemoteAddress() {
2570     RpcCallContext ctx = getCurrentCall();
2571     return ctx == null? null: ctx.getRemoteAddress();
2572   }
2573 
2574   /**
2575    * @param serviceName Some arbitrary string that represents a 'service'.
2576    * @param services Available service instances
2577    * @return Matching BlockingServiceAndInterface pair
2578    */
2579   static BlockingServiceAndInterface getServiceAndInterface(
2580       final List<BlockingServiceAndInterface> services, final String serviceName) {
2581     for (BlockingServiceAndInterface bs : services) {
2582       if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2583         return bs;
2584       }
2585     }
2586     return null;
2587   }
2588 
2589   /**
2590    * @param serviceName Some arbitrary string that represents a 'service'.
2591    * @param services Available services and their service interfaces.
2592    * @return Service interface class for <code>serviceName</code>
2593    */
2594   static Class<?> getServiceInterface(
2595       final List<BlockingServiceAndInterface> services,
2596       final String serviceName) {
2597     BlockingServiceAndInterface bsasi =
2598         getServiceAndInterface(services, serviceName);
2599     return bsasi == null? null: bsasi.getServiceInterface();
2600   }
2601 
2602   /**
2603    * @param serviceName Some arbitrary string that represents a 'service'.
2604    * @param services Available services and their service interfaces.
2605    * @return BlockingService that goes with the passed <code>serviceName</code>
2606    */
2607   static BlockingService getService(
2608       final List<BlockingServiceAndInterface> services,
2609       final String serviceName) {
2610     BlockingServiceAndInterface bsasi =
2611         getServiceAndInterface(services, serviceName);
2612     return bsasi == null? null: bsasi.getBlockingService();
2613   }
2614 
2615   static MonitoredRPCHandler getStatus() {
2616     // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
2617     MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
2618     if (status != null) {
2619       return status;
2620     }
2621     status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
2622     status.pause("Waiting for a call");
2623     RpcServer.MONITORED_RPC.set(status);
2624     return status;
2625   }
2626 
2627   /** Returns the remote side ip address when invoked inside an RPC
2628    *  Returns null incase of an error.
2629    *  @return InetAddress
2630    */
2631   public static InetAddress getRemoteIp() {
2632     Call call = CurCall.get();
2633     if (call != null && call.connection != null && call.connection.socket != null) {
2634       return call.connection.socket.getInetAddress();
2635     }
2636     return null;
2637   }
2638 
2639 
2640   /**
2641    * A convenience method to bind to a given address and report
2642    * better exceptions if the address is not a valid host.
2643    * @param socket the socket to bind
2644    * @param address the address to bind to
2645    * @param backlog the number of connections allowed in the queue
2646    * @throws BindException if the address can't be bound
2647    * @throws UnknownHostException if the address isn't a valid host name
2648    * @throws IOException other random errors from bind
2649    */
2650   public static void bind(ServerSocket socket, InetSocketAddress address,
2651                           int backlog) throws IOException {
2652     try {
2653       socket.bind(address, backlog);
2654     } catch (BindException e) {
2655       BindException bindException =
2656         new BindException("Problem binding to " + address + " : " +
2657             e.getMessage());
2658       bindException.initCause(e);
2659       throw bindException;
2660     } catch (SocketException e) {
2661       // If they try to bind to a different host's address, give a better
2662       // error message.
2663       if ("Unresolved address".equals(e.getMessage())) {
2664         throw new UnknownHostException("Invalid hostname for server: " +
2665                                        address.getHostName());
2666       }
2667       throw e;
2668     }
2669   }
2670 
2671   @Override
2672   public RpcScheduler getScheduler() {
2673     return scheduler;
2674   }
2675 }