View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.ServerSocket;
31  import java.net.Socket;
32  import java.net.SocketException;
33  import java.net.UnknownHostException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.Channels;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.GatheringByteChannel;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.security.PrivilegedExceptionAction;
46  import java.util.ArrayList;
47  import java.util.Arrays;
48  import java.util.Collections;
49  import java.util.HashMap;
50  import java.util.Iterator;
51  import java.util.LinkedList;
52  import java.util.List;
53  import java.util.Map;
54  import java.util.Random;
55  import java.util.Set;
56  import java.util.concurrent.ConcurrentHashMap;
57  import java.util.concurrent.ConcurrentLinkedDeque;
58  import java.util.concurrent.ExecutorService;
59  import java.util.concurrent.Executors;
60  import java.util.concurrent.atomic.AtomicInteger;
61  import java.util.concurrent.locks.Lock;
62  import java.util.concurrent.locks.ReentrantLock;
63  
64  import javax.security.sasl.Sasl;
65  import javax.security.sasl.SaslException;
66  import javax.security.sasl.SaslServer;
67  
68  import org.apache.commons.logging.Log;
69  import org.apache.commons.logging.LogFactory;
70  import org.apache.hadoop.hbase.CallQueueTooBigException;
71  import org.apache.hadoop.hbase.classification.InterfaceAudience;
72  import org.apache.hadoop.hbase.classification.InterfaceStability;
73  import org.apache.hadoop.conf.Configuration;
74  import org.apache.hadoop.hbase.CellScanner;
75  import org.apache.hadoop.hbase.DoNotRetryIOException;
76  import org.apache.hadoop.hbase.HBaseIOException;
77  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
78  import org.apache.hadoop.hbase.HConstants;
79  import org.apache.hadoop.hbase.HRegionInfo;
80  import org.apache.hadoop.hbase.Server;
81  import org.apache.hadoop.hbase.TableName;
82  import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
83  import org.apache.hadoop.hbase.client.Operation;
84  import org.apache.hadoop.hbase.client.VersionInfoUtil;
85  import org.apache.hadoop.hbase.codec.Codec;
86  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
87  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
88  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
89  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
90  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
91  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
92  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
93  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
94  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
95  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
96  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
97  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
98  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
99  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
100 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
101 import org.apache.hadoop.hbase.regionserver.HRegionServer;
102 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
103 import org.apache.hadoop.hbase.security.AccessDeniedException;
104 import org.apache.hadoop.hbase.security.AuthMethod;
105 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
106 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
107 import org.apache.hadoop.hbase.security.User;
108 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
109 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
110 import org.apache.hadoop.hbase.security.SaslStatus;
111 import org.apache.hadoop.hbase.security.SaslUtil;
112 import org.apache.hadoop.hbase.security.UserProvider;
113 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
114 import org.apache.hadoop.hbase.util.Bytes;
115 import org.apache.hadoop.hbase.util.Counter;
116 import org.apache.hadoop.hbase.util.Pair;
117 import org.apache.hadoop.io.BytesWritable;
118 import org.apache.hadoop.io.IntWritable;
119 import org.apache.hadoop.io.Writable;
120 import org.apache.hadoop.io.WritableUtils;
121 import org.apache.hadoop.io.compress.CompressionCodec;
122 import org.apache.hadoop.security.UserGroupInformation;
123 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
124 import org.apache.hadoop.security.authorize.AuthorizationException;
125 import org.apache.hadoop.security.authorize.PolicyProvider;
126 import org.apache.hadoop.security.authorize.ProxyUsers;
127 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
128 import org.apache.hadoop.security.token.SecretManager;
129 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
130 import org.apache.hadoop.security.token.TokenIdentifier;
131 import org.apache.hadoop.util.StringUtils;
132 import org.codehaus.jackson.map.ObjectMapper;
133 import org.apache.htrace.TraceInfo;
134 
135 import com.google.common.util.concurrent.ThreadFactoryBuilder;
136 import com.google.protobuf.BlockingService;
137 import com.google.protobuf.CodedInputStream;
138 import com.google.protobuf.Descriptors.MethodDescriptor;
139 import com.google.protobuf.Message;
140 import com.google.protobuf.ServiceException;
141 import com.google.protobuf.TextFormat;
142 
143 /**
144  * An RPC server that hosts protobuf described Services.
145  *
146  * An RpcServer instance has a Listener that hosts the socket.  Listener has fixed number
147  * of Readers in an ExecutorPool, 10 by default.  The Listener does an accept and then
148  * round robin a Reader is chosen to do the read.  The reader is registered on Selector.  Read does
149  * total read off the channel and the parse from which it makes a Call.  The call is wrapped in a
150  * CallRunner and passed to the scheduler to be run.  Reader goes back to see if more to be done
151  * and loops till done.
152  *
153  * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
154  * has given the queues into which calls (i.e. CallRunner instances) are inserted.  Handlers run
155  * taking from the queue.  They run the CallRunner#run method on each item gotten from queue
156  * and keep taking while the server is up.
157  *
158  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
159  * queue for Responder to pull from and return result to client.
160  *
161  * @see RpcClientImpl
162  */
163 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
164 @InterfaceStability.Evolving
165 public class RpcServer implements RpcServerInterface, ConfigurationObserver {
166   // LOG is being used in CallRunner and the log level is being changed in tests
167   public static final Log LOG = LogFactory.getLog(RpcServer.class);
168   private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
169       = new CallQueueTooBigException();
170 
171   private final boolean authorize;
172   private boolean isSecurityEnabled;
173 
174   public static final byte CURRENT_VERSION = 0;
175 
176   /**
177    * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
178    */
179   public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
180           "hbase.ipc.server.fallback-to-simple-auth-allowed";
181 
182   /**
183    * How many calls/handler are allowed in the queue.
184    */
185   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
186 
187   /**
188    * The maximum size that we can hold in the RPC queue
189    */
190   private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
191 
192   private final IPCUtil ipcUtil;
193 
194   private static final String AUTH_FAILED_FOR = "Auth failed for ";
195   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
196   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
197     Server.class.getName());
198   protected SecretManager<TokenIdentifier> secretManager;
199   protected ServiceAuthorizationManager authManager;
200 
201   /** This is set to Call object before Handler invokes an RPC and ybdie
202    * after the call returns.
203    */
204   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
205 
206   /** Keeps MonitoredRPCHandler per handler thread. */
207   static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
208       = new ThreadLocal<MonitoredRPCHandler>();
209 
210   protected final InetSocketAddress bindAddress;
211   protected int port;                             // port we listen on
212   private int readThreads;                        // number of read threads
213   protected int maxIdleTime;                      // the maximum idle time after
214                                                   // which a client may be
215                                                   // disconnected
216   protected int thresholdIdleConnections;         // the number of idle
217                                                   // connections after which we
218                                                   // will start cleaning up idle
219                                                   // connections
220   int maxConnectionsToNuke;                       // the max number of
221                                                   // connections to nuke
222                                                   // during a cleanup
223 
224   protected MetricsHBaseServer metrics;
225 
226   protected final Configuration conf;
227 
228   private int maxQueueSize;
229   protected int socketSendBufferSize;
230   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
231   protected final boolean tcpKeepAlive; // if T then use keepalives
232   protected final long purgeTimeout;    // in milliseconds
233 
234   /**
235    * This flag is used to indicate to sub threads when they should go down.  When we call
236    * {@link #start()}, all threads started will consult this flag on whether they should
237    * keep going.  It is set to false when {@link #stop()} is called.
238    */
239   volatile boolean running = true;
240 
241   /**
242    * This flag is set to true after all threads are up and 'running' and the server is then opened
243    * for business by the call to {@link #start()}.
244    */
245   volatile boolean started = false;
246 
247   /**
248    * This is a running count of the size of all outstanding calls by size.
249    */
250   protected final Counter callQueueSize = new Counter();
251 
252   protected final List<Connection> connectionList =
253     Collections.synchronizedList(new LinkedList<Connection>());
254   //maintain a list
255   //of client connections
256   private Listener listener = null;
257   protected Responder responder = null;
258   protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
259   protected int numConnections = 0;
260 
261   protected HBaseRPCErrorHandler errorHandler = null;
262 
263   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
264   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
265 
266   /** Default value for above params */
267   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
268   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
269 
270   private static final ObjectMapper MAPPER = new ObjectMapper();
271 
272   private final int warnResponseTime;
273   private final int warnResponseSize;
274   private final Server server;
275   private final List<BlockingServiceAndInterface> services;
276 
277   private final RpcScheduler scheduler;
278 
279   private UserProvider userProvider;
280 
281   private final BoundedByteBufferPool reservoir;
282 
283   private volatile boolean allowFallbackToSimpleAuth;
284 
285   /**
286    * Used to get details for scan with a scanner_id<br/>
287    * TODO try to figure out a better way and remove reference from regionserver package later.
288    */
289   private RSRpcServices rsRpcServices;
290 
291   /**
292    * Datastructure that holds all necessary to a method invocation and then afterward, carries
293    * the result.
294    */
295   @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
296   @InterfaceStability.Evolving
297   public class Call implements RpcCallContext {
298     protected int id;                             // the client's call id
299     protected BlockingService service;
300     protected MethodDescriptor md;
301     protected RequestHeader header;
302     protected Message param;                      // the parameter passed
303     // Optional cell data passed outside of protobufs.
304     protected CellScanner cellScanner;
305     protected Connection connection;              // connection to client
306     protected long timestamp;      // the time received when response is null
307                                    // the time served when response is not null
308     /**
309      * Chain of buffers to send as response.
310      */
311     protected BufferChain response;
312     protected Responder responder;
313 
314     protected long size;                          // size of current call
315     protected boolean isError;
316     protected TraceInfo tinfo;
317     private ByteBuffer cellBlock = null;
318 
319     private User user;
320     private InetAddress remoteAddress;
321 
322     private long responseCellSize = 0;
323     private long responseBlockSize = 0;
324     private boolean retryImmediatelySupported;
325 
326     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
327         justification="Can't figure why this complaint is happening... see below")
328     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
329          Message param, CellScanner cellScanner, Connection connection, Responder responder,
330          long size, TraceInfo tinfo, final InetAddress remoteAddress) {
331       this.id = id;
332       this.service = service;
333       this.md = md;
334       this.header = header;
335       this.param = param;
336       this.cellScanner = cellScanner;
337       this.connection = connection;
338       this.timestamp = System.currentTimeMillis();
339       this.response = null;
340       this.responder = responder;
341       this.isError = false;
342       this.size = size;
343       this.tinfo = tinfo;
344       this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
345       this.remoteAddress = remoteAddress;
346       this.retryImmediatelySupported =
347           connection == null? null: connection.retryImmediatelySupported;
348     }
349 
350     /**
351      * Call is done. Execution happened and we returned results to client. It is now safe to
352      * cleanup.
353      */
354     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
355         justification="Presume the lock on processing request held by caller is protection enough")
356     void done() {
357       if (this.cellBlock != null && reservoir != null) {
358         // Return buffer to reservoir now we are done with it.
359         reservoir.putBuffer(this.cellBlock);
360         this.cellBlock = null;
361       }
362       this.connection.decRpcCount();  // Say that we're done with this call.
363     }
364 
365     @Override
366     public String toString() {
367       return toShortString() + " param: " +
368         (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
369         " connection: " + connection.toString();
370     }
371 
372     protected RequestHeader getHeader() {
373       return this.header;
374     }
375 
376     public boolean hasPriority() {
377       return this.header.hasPriority();
378     }
379 
380     public int getPriority() {
381       return this.header.getPriority();
382     }
383 
384     /*
385      * Short string representation without param info because param itself could be huge depends on
386      * the payload of a command
387      */
388     String toShortString() {
389       String serviceName = this.connection.service != null ?
390           this.connection.service.getDescriptorForType().getName() : "null";
391       return "callId: " + this.id + " service: " + serviceName +
392           " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
393           " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
394           " connection: " + connection.toString();
395     }
396 
397     String toTraceString() {
398       String serviceName = this.connection.service != null ?
399                            this.connection.service.getDescriptorForType().getName() : "";
400       String methodName = (this.md != null) ? this.md.getName() : "";
401       return serviceName + "." + methodName;
402     }
403 
404     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
405       this.response = new BufferChain(response);
406     }
407 
408     protected synchronized void setResponse(Object m, final CellScanner cells,
409         Throwable t, String errorMsg) {
410       if (this.isError) return;
411       if (t != null) this.isError = true;
412       BufferChain bc = null;
413       try {
414         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
415         // Presume it a pb Message.  Could be null.
416         Message result = (Message)m;
417         // Call id.
418         headerBuilder.setCallId(this.id);
419         if (t != null) {
420           ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
421           exceptionBuilder.setExceptionClassName(t.getClass().getName());
422           exceptionBuilder.setStackTrace(errorMsg);
423           exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException ||
424             t instanceof NeedUnmanagedConnectionException);
425           if (t instanceof RegionMovedException) {
426             // Special casing for this exception.  This is only one carrying a payload.
427             // Do this instead of build a generic system for allowing exceptions carry
428             // any kind of payload.
429             RegionMovedException rme = (RegionMovedException)t;
430             exceptionBuilder.setHostname(rme.getHostname());
431             exceptionBuilder.setPort(rme.getPort());
432           }
433           // Set the exception as the result of the method invocation.
434           headerBuilder.setException(exceptionBuilder.build());
435         }
436         // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
437         // reservoir when finished. This is hacky and the hack is not contained but benefits are
438         // high when we can avoid a big buffer allocation on each rpc.
439         this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
440           this.connection.compressionCodec, cells, reservoir);
441         if (this.cellBlock != null) {
442           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
443           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
444           cellBlockBuilder.setLength(this.cellBlock.limit());
445           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
446         }
447         Message header = headerBuilder.build();
448 
449         // Organize the response as a set of bytebuffers rather than collect it all together inside
450         // one big byte array; save on allocations.
451         ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
452         ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
453         int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
454           (this.cellBlock == null? 0: this.cellBlock.limit());
455         ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
456         bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
457         if (connection.useWrap) {
458           bc = wrapWithSasl(bc);
459         }
460       } catch (IOException e) {
461         LOG.warn("Exception while creating response " + e);
462       }
463       this.response = bc;
464     }
465 
466     private BufferChain wrapWithSasl(BufferChain bc)
467         throws IOException {
468       if (!this.connection.useSasl) return bc;
469       // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
470       // THIS IS A BIG UGLY COPY.
471       byte [] responseBytes = bc.getBytes();
472       byte [] token;
473       // synchronization may be needed since there can be multiple Handler
474       // threads using saslServer to wrap responses.
475       synchronized (connection.saslServer) {
476         token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
477       }
478       if (LOG.isTraceEnabled()) {
479         LOG.trace("Adding saslServer wrapped token of size " + token.length
480             + " as call response.");
481       }
482 
483       ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
484       ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
485       return new BufferChain(bbTokenLength, bbTokenBytes);
486     }
487 
488     @Override
489     public boolean isClientCellBlockSupported() {
490       return this.connection != null && this.connection.codec != null;
491     }
492 
493     @Override
494     public long disconnectSince() {
495       if (!connection.channel.isOpen()) {
496         return System.currentTimeMillis() - timestamp;
497       } else {
498         return -1L;
499       }
500     }
501 
502     public long getSize() {
503       return this.size;
504     }
505 
506     public long getResponseCellSize() {
507       return responseCellSize;
508     }
509 
510     public void incrementResponseCellSize(long cellSize) {
511       responseCellSize += cellSize;
512     }
513 
514     @Override
515     public long getResponseBlockSize() {
516       return responseBlockSize;
517     }
518 
519     @Override
520     public void incrementResponseBlockSize(long blockSize) {
521       responseBlockSize += blockSize;
522     }
523 
524     public synchronized void sendResponseIfReady() throws IOException {
525       // set param null to reduce memory pressure
526       this.param = null;
527       this.responder.doRespond(this);
528     }
529 
530     public UserGroupInformation getRemoteUser() {
531       return connection.ugi;
532     }
533 
534     @Override
535     public User getRequestUser() {
536       return user;
537     }
538 
539     @Override
540     public String getRequestUserName() {
541       User user = getRequestUser();
542       return user == null? null: user.getShortName();
543     }
544 
545     @Override
546     public InetAddress getRemoteAddress() {
547       return remoteAddress;
548     }
549 
550     @Override
551     public VersionInfo getClientVersionInfo() {
552       return connection.getVersionInfo();
553     }
554 
555     @Override
556     public boolean isRetryImmediatelySupported() {
557       return retryImmediatelySupported;
558     }
559   }
560 
561   /** Listens on the socket. Creates jobs for the handler threads*/
562   private class Listener extends Thread {
563 
564     private ServerSocketChannel acceptChannel = null; //the accept channel
565     private Selector selector = null; //the selector that we use for the server
566     private Reader[] readers = null;
567     private int currentReader = 0;
568     private Random rand = new Random();
569     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
570                                          //-tion (for idle connections) ran
571     private long cleanupInterval = 10000; //the minimum interval between
572                                           //two cleanup runs
573     private int backlogLength;
574 
575     private ExecutorService readPool;
576 
577     public Listener(final String name) throws IOException {
578       super(name);
579       backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
580       // Create a new server socket and set to non blocking mode
581       acceptChannel = ServerSocketChannel.open();
582       acceptChannel.configureBlocking(false);
583 
584       // Bind the server socket to the binding addrees (can be different from the default interface)
585       bind(acceptChannel.socket(), bindAddress, backlogLength);
586       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
587       // create a selector;
588       selector= Selector.open();
589 
590       readers = new Reader[readThreads];
591       readPool = Executors.newFixedThreadPool(readThreads,
592         new ThreadFactoryBuilder().setNameFormat(
593           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
594           ",port=" + port).setDaemon(true).build());
595       for (int i = 0; i < readThreads; ++i) {
596         Reader reader = new Reader();
597         readers[i] = reader;
598         readPool.execute(reader);
599       }
600       LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
601 
602       // Register accepts on the server socket with the selector.
603       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
604       this.setName("RpcServer.listener,port=" + port);
605       this.setDaemon(true);
606     }
607 
608 
609     private class Reader implements Runnable {
610       private volatile boolean adding = false;
611       private final Selector readSelector;
612 
613       Reader() throws IOException {
614         this.readSelector = Selector.open();
615       }
616       @Override
617       public void run() {
618         try {
619           doRunLoop();
620         } finally {
621           try {
622             readSelector.close();
623           } catch (IOException ioe) {
624             LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
625           }
626         }
627       }
628 
629       private synchronized void doRunLoop() {
630         while (running) {
631           try {
632             readSelector.select();
633             while (adding) {
634               this.wait(1000);
635             }
636 
637             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
638             while (iter.hasNext()) {
639               SelectionKey key = iter.next();
640               iter.remove();
641               if (key.isValid()) {
642                 if (key.isReadable()) {
643                   doRead(key);
644                 }
645               }
646             }
647           } catch (InterruptedException e) {
648             LOG.debug("Interrupted while sleeping");
649             return;
650           } catch (IOException ex) {
651             LOG.info(getName() + ": IOException in Reader", ex);
652           }
653         }
654       }
655 
656       /**
657        * This gets reader into the state that waits for the new channel
658        * to be registered with readSelector. If it was waiting in select()
659        * the thread will be woken up, otherwise whenever select() is called
660        * it will return even if there is nothing to read and wait
661        * in while(adding) for finishAdd call
662        */
663       public void startAdd() {
664         adding = true;
665         readSelector.wakeup();
666       }
667 
668       public synchronized SelectionKey registerChannel(SocketChannel channel)
669         throws IOException {
670         return channel.register(readSelector, SelectionKey.OP_READ);
671       }
672 
673       public synchronized void finishAdd() {
674         adding = false;
675         this.notify();
676       }
677     }
678 
679     /** cleanup connections from connectionList. Choose a random range
680      * to scan and also have a limit on the number of the connections
681      * that will be cleanedup per run. The criteria for cleanup is the time
682      * for which the connection was idle. If 'force' is true then all
683      * connections will be looked at for the cleanup.
684      * @param force all connections will be looked at for cleanup
685      */
686     private void cleanupConnections(boolean force) {
687       if (force || numConnections > thresholdIdleConnections) {
688         long currentTime = System.currentTimeMillis();
689         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
690           return;
691         }
692         int start = 0;
693         int end = numConnections - 1;
694         if (!force) {
695           start = rand.nextInt() % numConnections;
696           end = rand.nextInt() % numConnections;
697           int temp;
698           if (end < start) {
699             temp = start;
700             start = end;
701             end = temp;
702           }
703         }
704         int i = start;
705         int numNuked = 0;
706         while (i <= end) {
707           Connection c;
708           synchronized (connectionList) {
709             try {
710               c = connectionList.get(i);
711             } catch (Exception e) {return;}
712           }
713           if (c.timedOut(currentTime)) {
714             if (LOG.isDebugEnabled())
715               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
716             closeConnection(c);
717             numNuked++;
718             end--;
719             //noinspection UnusedAssignment
720             c = null;
721             if (!force && numNuked == maxConnectionsToNuke) break;
722           }
723           else i++;
724         }
725         lastCleanupRunTime = System.currentTimeMillis();
726       }
727     }
728 
729     @Override
730     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
731       justification="selector access is not synchronized; seems fine but concerned changing " +
732         "it will have per impact")
733     public void run() {
734       LOG.info(getName() + ": starting");
735       while (running) {
736         SelectionKey key = null;
737         try {
738           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
739           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
740           while (iter.hasNext()) {
741             key = iter.next();
742             iter.remove();
743             try {
744               if (key.isValid()) {
745                 if (key.isAcceptable())
746                   doAccept(key);
747               }
748             } catch (IOException ignored) {
749               if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
750             }
751             key = null;
752           }
753         } catch (OutOfMemoryError e) {
754           if (errorHandler != null) {
755             if (errorHandler.checkOOME(e)) {
756               LOG.info(getName() + ": exiting on OutOfMemoryError");
757               closeCurrentConnection(key, e);
758               cleanupConnections(true);
759               return;
760             }
761           } else {
762             // we can run out of memory if we have too many threads
763             // log the event and sleep for a minute and give
764             // some thread(s) a chance to finish
765             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
766             closeCurrentConnection(key, e);
767             cleanupConnections(true);
768             try {
769               Thread.sleep(60000);
770             } catch (InterruptedException ex) {
771               LOG.debug("Interrupted while sleeping");
772               return;
773             }
774           }
775         } catch (Exception e) {
776           closeCurrentConnection(key, e);
777         }
778         cleanupConnections(false);
779       }
780 
781       LOG.info(getName() + ": stopping");
782 
783       synchronized (this) {
784         try {
785           acceptChannel.close();
786           selector.close();
787         } catch (IOException ignored) {
788           if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
789         }
790 
791         selector= null;
792         acceptChannel= null;
793 
794         // clean up all connections
795         while (!connectionList.isEmpty()) {
796           closeConnection(connectionList.remove(0));
797         }
798       }
799     }
800 
801     private void closeCurrentConnection(SelectionKey key, Throwable e) {
802       if (key != null) {
803         Connection c = (Connection)key.attachment();
804         if (c != null) {
805           if (LOG.isDebugEnabled()) {
806             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
807                 (e != null ? " on error " + e.getMessage() : ""));
808           }
809           closeConnection(c);
810           key.attach(null);
811         }
812       }
813     }
814 
815     InetSocketAddress getAddress() {
816       return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
817     }
818 
819     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
820       Connection c;
821       ServerSocketChannel server = (ServerSocketChannel) key.channel();
822 
823       SocketChannel channel;
824       while ((channel = server.accept()) != null) {
825         try {
826           channel.configureBlocking(false);
827           channel.socket().setTcpNoDelay(tcpNoDelay);
828           channel.socket().setKeepAlive(tcpKeepAlive);
829         } catch (IOException ioe) {
830           channel.close();
831           throw ioe;
832         }
833 
834         Reader reader = getReader();
835         try {
836           reader.startAdd();
837           SelectionKey readKey = reader.registerChannel(channel);
838           c = getConnection(channel, System.currentTimeMillis());
839           readKey.attach(c);
840           synchronized (connectionList) {
841             connectionList.add(numConnections, c);
842             numConnections++;
843           }
844           if (LOG.isDebugEnabled())
845             LOG.debug(getName() + ": connection from " + c.toString() +
846                 "; # active connections: " + numConnections);
847         } finally {
848           reader.finishAdd();
849         }
850       }
851     }
852 
853     void doRead(SelectionKey key) throws InterruptedException {
854       int count;
855       Connection c = (Connection) key.attachment();
856       if (c == null) {
857         return;
858       }
859       c.setLastContact(System.currentTimeMillis());
860       try {
861         count = c.readAndProcess();
862 
863         if (count > 0) {
864           c.setLastContact(System.currentTimeMillis());
865         }
866 
867       } catch (InterruptedException ieo) {
868         throw ieo;
869       } catch (Exception e) {
870         if (LOG.isDebugEnabled()) {
871           LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage());
872         }
873         count = -1; //so that the (count < 0) block is executed
874       }
875       if (count < 0) {
876         if (LOG.isDebugEnabled()) {
877           LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
878               " because read count=" + count +
879               ". Number of active connections: " + numConnections);
880         }
881         closeConnection(c);
882       }
883     }
884 
885     synchronized void doStop() {
886       if (selector != null) {
887         selector.wakeup();
888         Thread.yield();
889       }
890       if (acceptChannel != null) {
891         try {
892           acceptChannel.socket().close();
893         } catch (IOException e) {
894           LOG.info(getName() + ": exception in closing listener socket. " + e);
895         }
896       }
897       readPool.shutdownNow();
898     }
899 
900     // The method that will return the next reader to work with
901     // Simplistic implementation of round robin for now
902     Reader getReader() {
903       currentReader = (currentReader + 1) % readers.length;
904       return readers[currentReader];
905     }
906   }
907 
908   // Sends responses of RPC back to clients.
909   protected class Responder extends Thread {
910     private final Selector writeSelector;
911     private final Set<Connection> writingCons =
912         Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
913 
914     Responder() throws IOException {
915       this.setName("RpcServer.responder");
916       this.setDaemon(true);
917       writeSelector = Selector.open(); // create a selector
918     }
919 
920     @Override
921     public void run() {
922       LOG.info(getName() + ": starting");
923       try {
924         doRunLoop();
925       } finally {
926         LOG.info(getName() + ": stopping");
927         try {
928           writeSelector.close();
929         } catch (IOException ioe) {
930           LOG.error(getName() + ": couldn't close write selector", ioe);
931         }
932       }
933     }
934 
935     /**
936      * Take the list of the connections that want to write, and register them
937      * in the selector.
938      */
939     private void registerWrites() {
940       Iterator<Connection> it = writingCons.iterator();
941       while (it.hasNext()) {
942         Connection c = it.next();
943         it.remove();
944         SelectionKey sk = c.channel.keyFor(writeSelector);
945         try {
946           if (sk == null) {
947             try {
948               c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
949             } catch (ClosedChannelException e) {
950               // ignore: the client went away.
951               if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
952             }
953           } else {
954             sk.interestOps(SelectionKey.OP_WRITE);
955           }
956         } catch (CancelledKeyException e) {
957           // ignore: the client went away.
958           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
959         }
960       }
961     }
962 
963     /**
964      * Add a connection to the list that want to write,
965      */
966     public void registerForWrite(Connection c) {
967       if (writingCons.add(c)) {
968         writeSelector.wakeup();
969       }
970     }
971 
972     private void doRunLoop() {
973       long lastPurgeTime = 0;   // last check for old calls.
974       while (running) {
975         try {
976           registerWrites();
977           int keyCt = writeSelector.select(purgeTimeout);
978           if (keyCt == 0) {
979             continue;
980           }
981 
982           Set<SelectionKey> keys = writeSelector.selectedKeys();
983           Iterator<SelectionKey> iter = keys.iterator();
984           while (iter.hasNext()) {
985             SelectionKey key = iter.next();
986             iter.remove();
987             try {
988               if (key.isValid() && key.isWritable()) {
989                 doAsyncWrite(key);
990               }
991             } catch (IOException e) {
992               LOG.debug(getName() + ": asyncWrite", e);
993             }
994           }
995 
996           lastPurgeTime = purge(lastPurgeTime);
997 
998         } catch (OutOfMemoryError e) {
999           if (errorHandler != null) {
1000             if (errorHandler.checkOOME(e)) {
1001               LOG.info(getName() + ": exiting on OutOfMemoryError");
1002               return;
1003             }
1004           } else {
1005             //
1006             // we can run out of memory if we have too many threads
1007             // log the event and sleep for a minute and give
1008             // some thread(s) a chance to finish
1009             //
1010             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
1011             try {
1012               Thread.sleep(60000);
1013             } catch (InterruptedException ex) {
1014               LOG.debug("Interrupted while sleeping");
1015               return;
1016             }
1017           }
1018         } catch (Exception e) {
1019           LOG.warn(getName() + ": exception in Responder " +
1020               StringUtils.stringifyException(e), e);
1021         }
1022       }
1023       LOG.info(getName() + ": stopped");
1024     }
1025 
1026     /**
1027      * If there were some calls that have not been sent out for a
1028      * long time, we close the connection.
1029      * @return the time of the purge.
1030      */
1031     private long purge(long lastPurgeTime) {
1032       long now = System.currentTimeMillis();
1033       if (now < lastPurgeTime + purgeTimeout) {
1034         return lastPurgeTime;
1035       }
1036 
1037       ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1038       // get the list of channels from list of keys.
1039       synchronized (writeSelector.keys()) {
1040         for (SelectionKey key : writeSelector.keys()) {
1041           Connection connection = (Connection) key.attachment();
1042           if (connection == null) {
1043             throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1044           }
1045           Call call = connection.responseQueue.peekFirst();
1046           if (call != null && now > call.timestamp + purgeTimeout) {
1047             conWithOldCalls.add(call.connection);
1048           }
1049         }
1050       }
1051 
1052       // Seems safer to close the connection outside of the synchronized loop...
1053       for (Connection connection : conWithOldCalls) {
1054         closeConnection(connection);
1055       }
1056 
1057       return now;
1058     }
1059 
1060     private void doAsyncWrite(SelectionKey key) throws IOException {
1061       Connection connection = (Connection) key.attachment();
1062       if (connection == null) {
1063         throw new IOException("doAsyncWrite: no connection");
1064       }
1065       if (key.channel() != connection.channel) {
1066         throw new IOException("doAsyncWrite: bad channel");
1067       }
1068 
1069       if (processAllResponses(connection)) {
1070         try {
1071           // We wrote everything, so we don't need to be told when the socket is ready for
1072           //  write anymore.
1073          key.interestOps(0);
1074         } catch (CancelledKeyException e) {
1075           /* The Listener/reader might have closed the socket.
1076            * We don't explicitly cancel the key, so not sure if this will
1077            * ever fire.
1078            * This warning could be removed.
1079            */
1080           LOG.warn("Exception while changing ops : " + e);
1081         }
1082       }
1083     }
1084 
1085     /**
1086      * Process the response for this call. You need to have the lock on
1087      * {@link org.apache.hadoop.hbase.ipc.RpcServer.Connection#responseWriteLock}
1088      *
1089      * @param call the call
1090      * @return true if we proceed the call fully, false otherwise.
1091      * @throws IOException
1092      */
1093     private boolean processResponse(final Call call) throws IOException {
1094       boolean error = true;
1095       try {
1096         // Send as much data as we can in the non-blocking fashion
1097         long numBytes = channelWrite(call.connection.channel, call.response);
1098         if (numBytes < 0) {
1099           throw new HBaseIOException("Error writing on the socket " +
1100             "for the call:" + call.toShortString());
1101         }
1102         error = false;
1103       } finally {
1104         if (error) {
1105           LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1106           closeConnection(call.connection);
1107         }
1108       }
1109 
1110       if (!call.response.hasRemaining()) {
1111         call.done();
1112         return true;
1113       } else {
1114         return false; // Socket can't take more, we will have to come back.
1115       }
1116     }
1117 
1118     /**
1119      * Process all the responses for this connection
1120      *
1121      * @return true if all the calls were processed or that someone else is doing it.
1122      * false if there * is still some work to do. In this case, we expect the caller to
1123      * delay us.
1124      * @throws IOException
1125      */
1126     private boolean processAllResponses(final Connection connection) throws IOException {
1127       // We want only one writer on the channel for a connection at a time.
1128       connection.responseWriteLock.lock();
1129       try {
1130         for (int i = 0; i < 20; i++) {
1131           // protection if some handlers manage to need all the responder
1132           Call call = connection.responseQueue.pollFirst();
1133           if (call == null) {
1134             return true;
1135           }
1136           if (!processResponse(call)) {
1137             connection.responseQueue.addFirst(call);
1138             return false;
1139           }
1140         }
1141       } finally {
1142         connection.responseWriteLock.unlock();
1143       }
1144 
1145       return connection.responseQueue.isEmpty();
1146     }
1147 
1148     //
1149     // Enqueue a response from the application.
1150     //
1151     void doRespond(Call call) throws IOException {
1152       boolean added = false;
1153 
1154       // If there is already a write in progress, we don't wait. This allows to free the handlers
1155       //  immediately for other tasks.
1156       if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
1157         try {
1158           if (call.connection.responseQueue.isEmpty()) {
1159             // If we're alone, we can try to do a direct call to the socket. It's
1160             //  an optimisation to save on context switches and data transfer between cores..
1161             if (processResponse(call)) {
1162               return; // we're done.
1163             }
1164             // Too big to fit, putting ahead.
1165             call.connection.responseQueue.addFirst(call);
1166             added = true; // We will register to the selector later, outside of the lock.
1167           }
1168         } finally {
1169           call.connection.responseWriteLock.unlock();
1170         }
1171       }
1172 
1173       if (!added) {
1174         call.connection.responseQueue.addLast(call);
1175       }
1176       call.responder.registerForWrite(call.connection);
1177 
1178       // set the serve time when the response has to be sent later
1179       call.timestamp = System.currentTimeMillis();
1180     }
1181   }
1182 
1183   /** Reads calls from a connection and queues them for handling. */
1184   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1185       value="VO_VOLATILE_INCREMENT",
1186       justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1187   public class Connection {
1188     // If initial preamble with version and magic has been read or not.
1189     private boolean connectionPreambleRead = false;
1190     // If the connection header has been read or not.
1191     private boolean connectionHeaderRead = false;
1192     protected SocketChannel channel;
1193     private ByteBuffer data;
1194     private ByteBuffer dataLengthBuffer;
1195     private ByteBuffer preambleBuffer;
1196     protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
1197     private final Lock responseWriteLock = new ReentrantLock();
1198     private Counter rpcCount = new Counter(); // number of outstanding rpcs
1199     private long lastContact;
1200     private InetAddress addr;
1201     protected Socket socket;
1202     // Cache the remote host & port info so that even if the socket is
1203     // disconnected, we can say where it used to connect to.
1204     protected String hostAddress;
1205     protected int remotePort;
1206     ConnectionHeader connectionHeader;
1207     /**
1208      * Codec the client asked use.
1209      */
1210     private Codec codec;
1211     /**
1212      * Compression codec the client asked us use.
1213      */
1214     private CompressionCodec compressionCodec;
1215     BlockingService service;
1216 
1217     private AuthMethod authMethod;
1218     private boolean saslContextEstablished;
1219     private boolean skipInitialSaslHandshake;
1220     private ByteBuffer unwrappedData;
1221     // When is this set?  FindBugs wants to know!  Says NP
1222     private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1223     boolean useSasl;
1224     SaslServer saslServer;
1225     private boolean useWrap = false;
1226     // Fake 'call' for failed authorization response
1227     private static final int AUTHORIZATION_FAILED_CALLID = -1;
1228     private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
1229         null, null, this, null, 0, null, null);
1230     private ByteArrayOutputStream authFailedResponse =
1231         new ByteArrayOutputStream();
1232     // Fake 'call' for SASL context setup
1233     private static final int SASL_CALLID = -33;
1234     private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
1235         0, null, null);
1236 
1237     // was authentication allowed with a fallback to simple auth
1238     private boolean authenticatedWithFallback;
1239 
1240     private boolean retryImmediatelySupported = false;
1241 
1242     public UserGroupInformation attemptingUser = null; // user name before auth
1243     protected User user = null;
1244     protected UserGroupInformation ugi = null;
1245 
1246     public Connection(SocketChannel channel, long lastContact) {
1247       this.channel = channel;
1248       this.lastContact = lastContact;
1249       this.data = null;
1250       this.dataLengthBuffer = ByteBuffer.allocate(4);
1251       this.socket = channel.socket();
1252       this.addr = socket.getInetAddress();
1253       if (addr == null) {
1254         this.hostAddress = "*Unknown*";
1255       } else {
1256         this.hostAddress = addr.getHostAddress();
1257       }
1258       this.remotePort = socket.getPort();
1259       if (socketSendBufferSize != 0) {
1260         try {
1261           socket.setSendBufferSize(socketSendBufferSize);
1262         } catch (IOException e) {
1263           LOG.warn("Connection: unable to set socket send buffer size to " +
1264                    socketSendBufferSize);
1265         }
1266       }
1267     }
1268 
1269       @Override
1270     public String toString() {
1271       return getHostAddress() + ":" + remotePort;
1272     }
1273 
1274     public String getHostAddress() {
1275       return hostAddress;
1276     }
1277 
1278     public InetAddress getHostInetAddress() {
1279       return addr;
1280     }
1281 
1282     public int getRemotePort() {
1283       return remotePort;
1284     }
1285 
1286     public void setLastContact(long lastContact) {
1287       this.lastContact = lastContact;
1288     }
1289 
1290     public VersionInfo getVersionInfo() {
1291       if (connectionHeader.hasVersionInfo()) {
1292         return connectionHeader.getVersionInfo();
1293       }
1294       return null;
1295     }
1296 
1297     /* Return true if the connection has no outstanding rpc */
1298     private boolean isIdle() {
1299       return rpcCount.get() == 0;
1300     }
1301 
1302     /* Decrement the outstanding RPC count */
1303     protected void decRpcCount() {
1304       rpcCount.decrement();
1305     }
1306 
1307     /* Increment the outstanding RPC count */
1308     protected void incRpcCount() {
1309       rpcCount.increment();
1310     }
1311 
1312     protected boolean timedOut(long currentTime) {
1313       return isIdle() && currentTime - lastContact > maxIdleTime;
1314     }
1315 
1316     private UserGroupInformation getAuthorizedUgi(String authorizedId)
1317         throws IOException {
1318       UserGroupInformation authorizedUgi;
1319       if (authMethod == AuthMethod.DIGEST) {
1320         TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1321             secretManager);
1322         authorizedUgi = tokenId.getUser();
1323         if (authorizedUgi == null) {
1324           throw new AccessDeniedException(
1325               "Can't retrieve username from tokenIdentifier.");
1326         }
1327         authorizedUgi.addTokenIdentifier(tokenId);
1328       } else {
1329         authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
1330       }
1331       authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
1332       return authorizedUgi;
1333     }
1334 
1335     private void saslReadAndProcess(byte[] saslToken) throws IOException,
1336         InterruptedException {
1337       if (saslContextEstablished) {
1338         if (LOG.isTraceEnabled())
1339           LOG.trace("Have read input token of size " + saslToken.length
1340               + " for processing by saslServer.unwrap()");
1341 
1342         if (!useWrap) {
1343           processOneRpc(saslToken);
1344         } else {
1345           byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
1346           processUnwrappedData(plaintextData);
1347         }
1348       } else {
1349         byte[] replyToken;
1350         try {
1351           if (saslServer == null) {
1352             switch (authMethod) {
1353             case DIGEST:
1354               if (secretManager == null) {
1355                 throw new AccessDeniedException(
1356                     "Server is not configured to do DIGEST authentication.");
1357               }
1358               saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
1359                   .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
1360                   SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
1361                       secretManager, this));
1362               break;
1363             default:
1364               UserGroupInformation current = UserGroupInformation.getCurrentUser();
1365               String fullName = current.getUserName();
1366               if (LOG.isDebugEnabled()) {
1367                 LOG.debug("Kerberos principal name is " + fullName);
1368               }
1369               final String names[] = SaslUtil.splitKerberosName(fullName);
1370               if (names.length != 3) {
1371                 throw new AccessDeniedException(
1372                     "Kerberos principal name does NOT have the expected "
1373                         + "hostname part: " + fullName);
1374               }
1375               current.doAs(new PrivilegedExceptionAction<Object>() {
1376                 @Override
1377                 public Object run() throws SaslException {
1378                   saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
1379                       .getMechanismName(), names[0], names[1],
1380                       SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
1381                   return null;
1382                 }
1383               });
1384             }
1385             if (saslServer == null)
1386               throw new AccessDeniedException(
1387                   "Unable to find SASL server implementation for "
1388                       + authMethod.getMechanismName());
1389             if (LOG.isDebugEnabled()) {
1390               LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
1391             }
1392           }
1393           if (LOG.isDebugEnabled()) {
1394             LOG.debug("Have read input token of size " + saslToken.length
1395                 + " for processing by saslServer.evaluateResponse()");
1396           }
1397           replyToken = saslServer.evaluateResponse(saslToken);
1398         } catch (IOException e) {
1399           IOException sendToClient = e;
1400           Throwable cause = e;
1401           while (cause != null) {
1402             if (cause instanceof InvalidToken) {
1403               sendToClient = (InvalidToken) cause;
1404               break;
1405             }
1406             cause = cause.getCause();
1407           }
1408           doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
1409             sendToClient.getLocalizedMessage());
1410           metrics.authenticationFailure();
1411           String clientIP = this.toString();
1412           // attempting user could be null
1413           AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
1414           throw e;
1415         }
1416         if (replyToken != null) {
1417           if (LOG.isDebugEnabled()) {
1418             LOG.debug("Will send token of size " + replyToken.length
1419                 + " from saslServer.");
1420           }
1421           doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
1422               null);
1423         }
1424         if (saslServer.isComplete()) {
1425           String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
1426           useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
1427           ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
1428           if (LOG.isDebugEnabled()) {
1429             LOG.debug("SASL server context established. Authenticated client: "
1430               + ugi + ". Negotiated QoP is "
1431               + saslServer.getNegotiatedProperty(Sasl.QOP));
1432           }
1433           metrics.authenticationSuccess();
1434           AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1435           saslContextEstablished = true;
1436         }
1437       }
1438     }
1439 
1440     /**
1441      * No protobuf encoding of raw sasl messages
1442      */
1443     private void doRawSaslReply(SaslStatus status, Writable rv,
1444         String errorClass, String error) throws IOException {
1445       ByteBufferOutputStream saslResponse = null;
1446       DataOutputStream out = null;
1447       try {
1448         // In my testing, have noticed that sasl messages are usually
1449         // in the ballpark of 100-200. That's why the initial capacity is 256.
1450         saslResponse = new ByteBufferOutputStream(256);
1451         out = new DataOutputStream(saslResponse);
1452         out.writeInt(status.state); // write status
1453         if (status == SaslStatus.SUCCESS) {
1454           rv.write(out);
1455         } else {
1456           WritableUtils.writeString(out, errorClass);
1457           WritableUtils.writeString(out, error);
1458         }
1459         saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1460         saslCall.responder = responder;
1461         saslCall.sendResponseIfReady();
1462       } finally {
1463         if (saslResponse != null) {
1464           saslResponse.close();
1465         }
1466         if (out != null) {
1467           out.close();
1468         }
1469       }
1470     }
1471 
1472     private void disposeSasl() {
1473       if (saslServer != null) {
1474         try {
1475           saslServer.dispose();
1476           saslServer = null;
1477         } catch (SaslException ignored) {
1478           // Ignored. This is being disposed of anyway.
1479         }
1480       }
1481     }
1482 
1483     private int readPreamble() throws IOException {
1484       if (preambleBuffer == null) {
1485         preambleBuffer = ByteBuffer.allocate(6);
1486       }
1487       int count = channelRead(channel, preambleBuffer);
1488       if (count < 0 || preambleBuffer.remaining() > 0) {
1489         return count;
1490       }
1491       // Check for 'HBas' magic.
1492       preambleBuffer.flip();
1493       for (int i = 0; i < HConstants.RPC_HEADER.length; i++) {
1494         if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) {
1495           return doBadPreambleHandling("Expected HEADER=" +
1496               Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" +
1497               Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) +
1498               " from " + toString());
1499         }
1500       }
1501       int version = preambleBuffer.get(HConstants.RPC_HEADER.length);
1502       byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1);
1503       this.authMethod = AuthMethod.valueOf(authbyte);
1504       if (version != CURRENT_VERSION) {
1505         String msg = getFatalConnectionString(version, authbyte);
1506         return doBadPreambleHandling(msg, new WrongVersionException(msg));
1507       }
1508       if (authMethod == null) {
1509         String msg = getFatalConnectionString(version, authbyte);
1510         return doBadPreambleHandling(msg, new BadAuthException(msg));
1511       }
1512       if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
1513         if (allowFallbackToSimpleAuth) {
1514           metrics.authenticationFallback();
1515           authenticatedWithFallback = true;
1516         } else {
1517           AccessDeniedException ae = new AccessDeniedException("Authentication is required");
1518           setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1519           responder.doRespond(authFailedCall);
1520           throw ae;
1521         }
1522       }
1523       if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
1524         doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
1525             SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
1526         authMethod = AuthMethod.SIMPLE;
1527         // client has already sent the initial Sasl message and we
1528         // should ignore it. Both client and server should fall back
1529         // to simple auth from now on.
1530         skipInitialSaslHandshake = true;
1531       }
1532       if (authMethod != AuthMethod.SIMPLE) {
1533         useSasl = true;
1534       }
1535 
1536       preambleBuffer = null; // do not need it anymore
1537       connectionPreambleRead = true;
1538       return count;
1539     }
1540 
1541     private int read4Bytes() throws IOException {
1542       if (this.dataLengthBuffer.remaining() > 0) {
1543         return channelRead(channel, this.dataLengthBuffer);
1544       } else {
1545         return 0;
1546       }
1547     }
1548 
1549 
1550     /**
1551      * Read off the wire. If there is not enough data to read, update the connection state with
1552      *  what we have and returns.
1553      * @return Returns -1 if failure (and caller will close connection), else zero or more.
1554      * @throws IOException
1555      * @throws InterruptedException
1556      */
1557     public int readAndProcess() throws IOException, InterruptedException {
1558       // If we have not read the connection setup preamble, look to see if that is on the wire.
1559       if (!connectionPreambleRead) {
1560         int count = readPreamble();
1561         if (!connectionPreambleRead) {
1562           return count;
1563         }
1564       }
1565       // Try and read in an int. It will be length of the data to read (or -1 if a ping). We catch
1566       // the integer length into the 4-byte this.dataLengthBuffer.
1567       int count = read4Bytes();
1568       if (count < 0 || dataLengthBuffer.remaining() > 0) {
1569         return count;
1570       }
1571 
1572       // If we have not read the connection setup preamble, look to see if that is on the wire.
1573       if (!connectionPreambleRead) {
1574         count = readPreamble();
1575         if (!connectionPreambleRead) {
1576           return count;
1577         }
1578 
1579         count = read4Bytes();
1580         if (count < 0 || dataLengthBuffer.remaining() > 0) {
1581           return count;
1582         }
1583       }
1584 
1585       // We have read a length and we have read the preamble.  It is either the connection header
1586       // or it is a request.
1587       if (data == null) {
1588         dataLengthBuffer.flip();
1589         int dataLength = dataLengthBuffer.getInt();
1590         if (dataLength == RpcClient.PING_CALL_ID) {
1591           if (!useWrap) { //covers the !useSasl too
1592             dataLengthBuffer.clear();
1593             return 0;  //ping message
1594           }
1595         }
1596         if (dataLength < 0) { // A data length of zero is legal.
1597           throw new IllegalArgumentException("Unexpected data length "
1598               + dataLength + "!! from " + getHostAddress());
1599         }
1600         data = ByteBuffer.allocate(dataLength);
1601 
1602         // Increment the rpc count. This counter will be decreased when we write
1603         //  the response.  If we want the connection to be detected as idle properly, we
1604         //  need to keep the inc / dec correct.
1605         incRpcCount();
1606       }
1607 
1608       count = channelRead(channel, data);
1609 
1610       if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
1611         process();
1612       }
1613 
1614       return count;
1615     }
1616 
1617     /**
1618      * Process the data buffer and clean the connection state for the next call.
1619      */
1620     private void process() throws IOException, InterruptedException {
1621       data.flip();
1622       try {
1623         if (skipInitialSaslHandshake) {
1624           skipInitialSaslHandshake = false;
1625           return;
1626         }
1627 
1628         if (useSasl) {
1629           saslReadAndProcess(data.array());
1630         } else {
1631           processOneRpc(data.array());
1632         }
1633 
1634       } finally {
1635         dataLengthBuffer.clear(); // Clean for the next call
1636         data = null; // For the GC
1637       }
1638     }
1639 
1640     private String getFatalConnectionString(final int version, final byte authByte) {
1641       return "serverVersion=" + CURRENT_VERSION +
1642       ", clientVersion=" + version + ", authMethod=" + authByte +
1643       ", authSupported=" + (authMethod != null) + " from " + toString();
1644     }
1645 
1646     private int doBadPreambleHandling(final String msg) throws IOException {
1647       return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1648     }
1649 
1650     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1651       LOG.warn(msg);
1652       Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
1653       setupResponse(null, fakeCall, e, msg);
1654       responder.doRespond(fakeCall);
1655       // Returning -1 closes out the connection.
1656       return -1;
1657     }
1658 
1659     // Reads the connection header following version
1660     private void processConnectionHeader(byte[] buf) throws IOException {
1661       this.connectionHeader = ConnectionHeader.parseFrom(buf);
1662       String serviceName = connectionHeader.getServiceName();
1663       if (serviceName == null) throw new EmptyServiceNameException();
1664       this.service = getService(services, serviceName);
1665       if (this.service == null) throw new UnknownServiceException(serviceName);
1666       setupCellBlockCodecs(this.connectionHeader);
1667       UserGroupInformation protocolUser = createUser(connectionHeader);
1668       if (!useSasl) {
1669         ugi = protocolUser;
1670         if (ugi != null) {
1671           ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1672         }
1673         // audit logging for SASL authenticated users happens in saslReadAndProcess()
1674         if (authenticatedWithFallback) {
1675           LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
1676               + " connecting from " + getHostAddress());
1677         }
1678         AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1679       } else {
1680         // user is authenticated
1681         ugi.setAuthenticationMethod(authMethod.authenticationMethod);
1682         //Now we check if this is a proxy user case. If the protocol user is
1683         //different from the 'user', it is a proxy user scenario. However,
1684         //this is not allowed if user authenticated with DIGEST.
1685         if ((protocolUser != null)
1686             && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
1687           if (authMethod == AuthMethod.DIGEST) {
1688             // Not allowed to doAs if token authentication is used
1689             throw new AccessDeniedException("Authenticated user (" + ugi
1690                 + ") doesn't match what the client claims to be ("
1691                 + protocolUser + ")");
1692           } else {
1693             // Effective user can be different from authenticated user
1694             // for simple auth or kerberos auth
1695             // The user is the real user. Now we create a proxy user
1696             UserGroupInformation realUser = ugi;
1697             ugi = UserGroupInformation.createProxyUser(protocolUser
1698                 .getUserName(), realUser);
1699             // Now the user is a proxy user, set Authentication method Proxy.
1700             ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
1701           }
1702         }
1703       }
1704       if (connectionHeader.hasVersionInfo()) {
1705         // see if this connection will support RetryImmediatelyException
1706         retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
1707 
1708         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1709             + " with version info: "
1710             + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
1711       } else {
1712         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1713             + " with unknown version info");
1714       }
1715 
1716 
1717     }
1718 
1719     /**
1720      * Set up cell block codecs
1721      * @throws FatalConnectionException
1722      */
1723     private void setupCellBlockCodecs(final ConnectionHeader header)
1724     throws FatalConnectionException {
1725       // TODO: Plug in other supported decoders.
1726       if (!header.hasCellBlockCodecClass()) return;
1727       String className = header.getCellBlockCodecClass();
1728       if (className == null || className.length() == 0) return;
1729       try {
1730         this.codec = (Codec)Class.forName(className).newInstance();
1731       } catch (Exception e) {
1732         throw new UnsupportedCellCodecException(className, e);
1733       }
1734       if (!header.hasCellBlockCompressorClass()) return;
1735       className = header.getCellBlockCompressorClass();
1736       try {
1737         this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1738       } catch (Exception e) {
1739         throw new UnsupportedCompressionCodecException(className, e);
1740       }
1741     }
1742 
1743     private void processUnwrappedData(byte[] inBuf) throws IOException,
1744     InterruptedException {
1745       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1746       // Read all RPCs contained in the inBuf, even partial ones
1747       while (true) {
1748         int count;
1749         if (unwrappedDataLengthBuffer.remaining() > 0) {
1750           count = channelRead(ch, unwrappedDataLengthBuffer);
1751           if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1752             return;
1753         }
1754 
1755         if (unwrappedData == null) {
1756           unwrappedDataLengthBuffer.flip();
1757           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1758 
1759           if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1760             if (LOG.isDebugEnabled())
1761               LOG.debug("Received ping message");
1762             unwrappedDataLengthBuffer.clear();
1763             continue; // ping message
1764           }
1765           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1766         }
1767 
1768         count = channelRead(ch, unwrappedData);
1769         if (count <= 0 || unwrappedData.remaining() > 0)
1770           return;
1771 
1772         if (unwrappedData.remaining() == 0) {
1773           unwrappedDataLengthBuffer.clear();
1774           unwrappedData.flip();
1775           processOneRpc(unwrappedData.array());
1776           unwrappedData = null;
1777         }
1778       }
1779     }
1780 
1781     private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1782       if (connectionHeaderRead) {
1783         processRequest(buf);
1784       } else {
1785         processConnectionHeader(buf);
1786         this.connectionHeaderRead = true;
1787         if (!authorizeConnection()) {
1788           // Throw FatalConnectionException wrapping ACE so client does right thing and closes
1789           // down the connection instead of trying to read non-existent retun.
1790           throw new AccessDeniedException("Connection from " + this + " for service " +
1791             connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
1792         }
1793         this.user = userProvider.create(this.ugi);
1794       }
1795     }
1796 
1797     /**
1798      * @param buf Has the request header and the request param and optionally encoded data buffer
1799      * all in this one array.
1800      * @throws IOException
1801      * @throws InterruptedException
1802      */
1803     protected void processRequest(byte[] buf) throws IOException, InterruptedException {
1804       long totalRequestSize = buf.length;
1805       int offset = 0;
1806       // Here we read in the header.  We avoid having pb
1807       // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
1808       CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
1809       int headerSize = cis.readRawVarint32();
1810       offset = cis.getTotalBytesRead();
1811       Message.Builder builder = RequestHeader.newBuilder();
1812       ProtobufUtil.mergeFrom(builder, buf, offset, headerSize);
1813       RequestHeader header = (RequestHeader) builder.build();
1814       offset += headerSize;
1815       int id = header.getCallId();
1816       if (LOG.isTraceEnabled()) {
1817         LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1818           " totalRequestSize: " + totalRequestSize + " bytes");
1819       }
1820       // Enforcing the call queue size, this triggers a retry in the client
1821       // This is a bit late to be doing this check - we have already read in the total request.
1822       if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1823         final Call callTooBig =
1824           new Call(id, this.service, null, null, null, null, this,
1825             responder, totalRequestSize, null, null);
1826         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1827         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1828         setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
1829             "Call queue is full on " + server.getServerName() +
1830                 ", is hbase.ipc.server.max.callqueue.size too small?");
1831         responder.doRespond(callTooBig);
1832         return;
1833       }
1834       MethodDescriptor md = null;
1835       Message param = null;
1836       CellScanner cellScanner = null;
1837       try {
1838         if (header.hasRequestParam() && header.getRequestParam()) {
1839           md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1840           if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1841           builder = this.service.getRequestPrototype(md).newBuilderForType();
1842           // To read the varint, I need an inputstream; might as well be a CIS.
1843           cis = CodedInputStream.newInstance(buf, offset, buf.length);
1844           int paramSize = cis.readRawVarint32();
1845           offset += cis.getTotalBytesRead();
1846           if (builder != null) {
1847             ProtobufUtil.mergeFrom(builder, buf, offset, paramSize);
1848             param = builder.build();
1849           }
1850           offset += paramSize;
1851         }
1852         if (header.hasCellBlockMeta()) {
1853           cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
1854             buf, offset, buf.length);
1855         }
1856       } catch (Throwable t) {
1857         InetSocketAddress address = getListenerAddress();
1858         String msg = (address != null ? address : "(channel closed)") +
1859             " is unable to read call parameter from client " + getHostAddress();
1860         LOG.warn(msg, t);
1861 
1862         metrics.exception(t);
1863 
1864         // probably the hbase hadoop version does not match the running hadoop version
1865         if (t instanceof LinkageError) {
1866           t = new DoNotRetryIOException(t);
1867         }
1868         // If the method is not present on the server, do not retry.
1869         if (t instanceof UnsupportedOperationException) {
1870           t = new DoNotRetryIOException(t);
1871         }
1872 
1873         final Call readParamsFailedCall =
1874           new Call(id, this.service, null, null, null, null, this,
1875             responder, totalRequestSize, null, null);
1876         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1877         setupResponse(responseBuffer, readParamsFailedCall, t,
1878           msg + "; " + t.getMessage());
1879         responder.doRespond(readParamsFailedCall);
1880         return;
1881       }
1882 
1883       TraceInfo traceInfo = header.hasTraceInfo()
1884           ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1885           : null;
1886       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1887               totalRequestSize, traceInfo, this.addr);
1888 
1889       if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
1890         callQueueSize.add(-1 * call.getSize());
1891 
1892         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1893         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1894         setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
1895             "Call queue is full on " + server.getServerName() +
1896                 ", too many items queued ?");
1897         responder.doRespond(call);
1898       }
1899     }
1900 
1901     private boolean authorizeConnection() throws IOException {
1902       try {
1903         // If auth method is DIGEST, the token was obtained by the
1904         // real user for the effective user, therefore not required to
1905         // authorize real user. doAs is allowed only for simple or kerberos
1906         // authentication
1907         if (ugi != null && ugi.getRealUser() != null
1908             && (authMethod != AuthMethod.DIGEST)) {
1909           ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
1910         }
1911         authorize(ugi, connectionHeader, getHostInetAddress());
1912         metrics.authorizationSuccess();
1913       } catch (AuthorizationException ae) {
1914         if (LOG.isDebugEnabled()) {
1915           LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1916         }
1917         metrics.authorizationFailure();
1918         setupResponse(authFailedResponse, authFailedCall,
1919           new AccessDeniedException(ae), ae.getMessage());
1920         responder.doRespond(authFailedCall);
1921         return false;
1922       }
1923       return true;
1924     }
1925 
1926     protected synchronized void close() {
1927       disposeSasl();
1928       data = null;
1929       if (!channel.isOpen())
1930         return;
1931       try {socket.shutdownOutput();} catch(Exception ignored) {
1932         if (LOG.isTraceEnabled()) {
1933           LOG.trace(ignored);
1934         }
1935       }
1936       if (channel.isOpen()) {
1937         try {channel.close();} catch(Exception ignored) {
1938           if (LOG.isTraceEnabled()) {
1939             LOG.trace(ignored);
1940           }
1941         }
1942       }
1943       try {socket.close();} catch(Exception ignored) {
1944         if (LOG.isTraceEnabled()) {
1945           LOG.trace(ignored);
1946         }
1947       }
1948     }
1949 
1950     private UserGroupInformation createUser(ConnectionHeader head) {
1951       UserGroupInformation ugi = null;
1952 
1953       if (!head.hasUserInfo()) {
1954         return null;
1955       }
1956       UserInformation userInfoProto = head.getUserInfo();
1957       String effectiveUser = null;
1958       if (userInfoProto.hasEffectiveUser()) {
1959         effectiveUser = userInfoProto.getEffectiveUser();
1960       }
1961       String realUser = null;
1962       if (userInfoProto.hasRealUser()) {
1963         realUser = userInfoProto.getRealUser();
1964       }
1965       if (effectiveUser != null) {
1966         if (realUser != null) {
1967           UserGroupInformation realUserUgi =
1968               UserGroupInformation.createRemoteUser(realUser);
1969           ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1970         } else {
1971           ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1972         }
1973       }
1974       return ugi;
1975     }
1976   }
1977 
1978   /**
1979    * Datastructure for passing a {@link BlockingService} and its associated class of
1980    * protobuf service interface.  For example, a server that fielded what is defined
1981    * in the client protobuf service would pass in an implementation of the client blocking service
1982    * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
1983    */
1984   public static class BlockingServiceAndInterface {
1985     private final BlockingService service;
1986     private final Class<?> serviceInterface;
1987     public BlockingServiceAndInterface(final BlockingService service,
1988         final Class<?> serviceInterface) {
1989       this.service = service;
1990       this.serviceInterface = serviceInterface;
1991     }
1992     public Class<?> getServiceInterface() {
1993       return this.serviceInterface;
1994     }
1995     public BlockingService getBlockingService() {
1996       return this.service;
1997     }
1998   }
1999 
2000   /**
2001    * Constructs a server listening on the named port and address.
2002    * @param server hosting instance of {@link Server}. We will do authentications if an
2003    * instance else pass null for no authentication check.
2004    * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
2005    * @param services A list of services.
2006    * @param bindAddress Where to listen
2007    * @param conf
2008    * @param scheduler
2009    */
2010   public RpcServer(final Server server, final String name,
2011       final List<BlockingServiceAndInterface> services,
2012       final InetSocketAddress bindAddress, Configuration conf,
2013       RpcScheduler scheduler)
2014       throws IOException {
2015 
2016     if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
2017       this.reservoir = new BoundedByteBufferPool(
2018           conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
2019           conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
2020           // Make the max twice the number of handlers to be safe.
2021           conf.getInt("hbase.ipc.server.reservoir.initial.max",
2022               conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
2023                   HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
2024     } else {
2025       reservoir = null;
2026     }
2027     
2028     this.server = server;
2029     this.services = services;
2030     this.bindAddress = bindAddress;
2031     this.conf = conf;
2032     this.socketSendBufferSize = 0;
2033     this.maxQueueSize =
2034       this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
2035     this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
2036     this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
2037     this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
2038     this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
2039     this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
2040       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
2041     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
2042     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
2043 
2044     // Start the listener here and let it bind to the port
2045     listener = new Listener(name);
2046     this.port = listener.getAddress().getPort();
2047 
2048     this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
2049     this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
2050     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
2051 
2052     this.ipcUtil = new IPCUtil(conf);
2053 
2054 
2055     // Create the responder here
2056     responder = new Responder();
2057     this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
2058     this.userProvider = UserProvider.instantiate(conf);
2059     this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
2060     if (isSecurityEnabled) {
2061       HBaseSaslRpcServer.init(conf);
2062     }
2063     initReconfigurable(conf);
2064 
2065     this.scheduler = scheduler;
2066     this.scheduler.init(new RpcSchedulerContext(this));
2067   }
2068 
2069   @Override
2070   public void onConfigurationChange(Configuration newConf) {
2071     initReconfigurable(newConf);
2072   }
2073 
2074   private void initReconfigurable(Configuration confToLoad) {
2075     this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
2076     if (isSecurityEnabled && allowFallbackToSimpleAuth) {
2077       LOG.warn("********* WARNING! *********");
2078       LOG.warn("This server is configured to allow connections from INSECURE clients");
2079       LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
2080       LOG.warn("While this option is enabled, client identities cannot be secured, and user");
2081       LOG.warn("impersonation is possible!");
2082       LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
2083       LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
2084       LOG.warn("****************************");
2085     }
2086   }
2087 
2088   /**
2089    * Subclasses of HBaseServer can override this to provide their own
2090    * Connection implementations.
2091    */
2092   protected Connection getConnection(SocketChannel channel, long time) {
2093     return new Connection(channel, time);
2094   }
2095 
2096   /**
2097    * Setup response for the RPC Call.
2098    *
2099    * @param response buffer to serialize the response into
2100    * @param call {@link Call} to which we are setting up the response
2101    * @param error error message, if the call failed
2102    * @throws IOException
2103    */
2104   private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
2105   throws IOException {
2106     if (response != null) response.reset();
2107     call.setResponse(null, null, t, error);
2108   }
2109 
2110   protected void closeConnection(Connection connection) {
2111     synchronized (connectionList) {
2112       if (connectionList.remove(connection)) {
2113         numConnections--;
2114       }
2115     }
2116     connection.close();
2117   }
2118 
2119   Configuration getConf() {
2120     return conf;
2121   }
2122 
2123   /** Sets the socket buffer size used for responding to RPCs.
2124    * @param size send size
2125    */
2126   @Override
2127   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2128 
2129   @Override
2130   public boolean isStarted() {
2131     return this.started;
2132   }
2133 
2134   /** Starts the service.  Must be called before any calls will be handled. */
2135   @Override
2136   public synchronized void start() {
2137     if (started) return;
2138     authTokenSecretMgr = createSecretManager();
2139     if (authTokenSecretMgr != null) {
2140       setSecretManager(authTokenSecretMgr);
2141       authTokenSecretMgr.start();
2142     }
2143     this.authManager = new ServiceAuthorizationManager();
2144     HBasePolicyProvider.init(conf, authManager);
2145     responder.start();
2146     listener.start();
2147     scheduler.start();
2148     started = true;
2149   }
2150 
2151   @Override
2152   public synchronized void refreshAuthManager(PolicyProvider pp) {
2153     // Ignore warnings that this should be accessed in a static way instead of via an instance;
2154     // it'll break if you go via static route.
2155     this.authManager.refresh(this.conf, pp);
2156   }
2157 
2158   private AuthenticationTokenSecretManager createSecretManager() {
2159     if (!isSecurityEnabled) return null;
2160     if (server == null) return null;
2161     Configuration conf = server.getConfiguration();
2162     long keyUpdateInterval =
2163         conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2164     long maxAge =
2165         conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2166     return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2167         server.getServerName().toString(), keyUpdateInterval, maxAge);
2168   }
2169 
2170   public SecretManager<? extends TokenIdentifier> getSecretManager() {
2171     return this.secretManager;
2172   }
2173 
2174   @SuppressWarnings("unchecked")
2175   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2176     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2177   }
2178 
2179   /**
2180    * This is a server side method, which is invoked over RPC. On success
2181    * the return response has protobuf response payload. On failure, the
2182    * exception name and the stack trace are returned in the protobuf response.
2183    */
2184   @Override
2185   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2186       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2187   throws IOException {
2188     try {
2189       status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2190       // TODO: Review after we add in encoded data blocks.
2191       status.setRPCPacket(param);
2192       status.resume("Servicing call");
2193       //get an instance of the method arg type
2194       long startTime = System.currentTimeMillis();
2195       PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2196       Message result = service.callBlockingMethod(md, controller, param);
2197       long endTime = System.currentTimeMillis();
2198       int processingTime = (int) (endTime - startTime);
2199       int qTime = (int) (startTime - receiveTime);
2200       int totalTime = (int) (endTime - receiveTime);
2201       if (LOG.isTraceEnabled()) {
2202         LOG.trace(CurCall.get().toString() +
2203             ", response " + TextFormat.shortDebugString(result) +
2204             " queueTime: " + qTime +
2205             " processingTime: " + processingTime +
2206             " totalTime: " + totalTime);
2207       }
2208       long requestSize = param.getSerializedSize();
2209       long responseSize = result.getSerializedSize();
2210       metrics.dequeuedCall(qTime);
2211       metrics.processedCall(processingTime);
2212       metrics.totalCall(totalTime);
2213       metrics.receivedRequest(requestSize);
2214       metrics.sentResponse(responseSize);
2215       // log any RPC responses that are slower than the configured warn
2216       // response time or larger than configured warning size
2217       boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2218       boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2219       if (tooSlow || tooLarge) {
2220         // when tagging, we let TooLarge trump TooSmall to keep output simple
2221         // note that large responses will often also be slow.
2222         logResponse(param,
2223             md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
2224             (tooLarge ? "TooLarge" : "TooSlow"),
2225             status.getClient(), startTime, processingTime, qTime,
2226             responseSize);
2227       }
2228       return new Pair<Message, CellScanner>(result, controller.cellScanner());
2229     } catch (Throwable e) {
2230       // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
2231       // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
2232       // need to pass it over the wire.
2233       if (e instanceof ServiceException) {
2234         if (e.getCause() == null) {
2235           LOG.debug("Caught a ServiceException with null cause", e);
2236         } else {
2237           e = e.getCause();
2238         }
2239       }
2240 
2241       // increment the number of requests that were exceptions.
2242       metrics.exception(e);
2243 
2244       if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2245       if (e instanceof IOException) throw (IOException)e;
2246       LOG.error("Unexpected throwable object ", e);
2247       throw new IOException(e.getMessage(), e);
2248     }
2249   }
2250 
2251   /**
2252    * Logs an RPC response to the LOG file, producing valid JSON objects for
2253    * client Operations.
2254    * @param param The parameters received in the call.
2255    * @param methodName The name of the method invoked
2256    * @param call The string representation of the call
2257    * @param tag  The tag that will be used to indicate this event in the log.
2258    * @param clientAddress   The address of the client who made this call.
2259    * @param startTime       The time that the call was initiated, in ms.
2260    * @param processingTime  The duration that the call took to run, in ms.
2261    * @param qTime           The duration that the call spent on the queue
2262    *                        prior to being initiated, in ms.
2263    * @param responseSize    The size in bytes of the response buffer.
2264    */
2265   void logResponse(Message param, String methodName, String call, String tag,
2266       String clientAddress, long startTime, int processingTime, int qTime,
2267       long responseSize)
2268           throws IOException {
2269     // base information that is reported regardless of type of call
2270     Map<String, Object> responseInfo = new HashMap<String, Object>();
2271     responseInfo.put("starttimems", startTime);
2272     responseInfo.put("processingtimems", processingTime);
2273     responseInfo.put("queuetimems", qTime);
2274     responseInfo.put("responsesize", responseSize);
2275     responseInfo.put("client", clientAddress);
2276     responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2277     responseInfo.put("method", methodName);
2278     responseInfo.put("call", call);
2279     responseInfo.put("param", ProtobufUtil.getShortTextFormat(param));
2280     if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
2281       ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
2282       if (request.hasScannerId()) {
2283         long scannerId = request.getScannerId();
2284         String scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
2285         if (scanDetails != null) {
2286           responseInfo.put("scandetails", scanDetails);
2287         }
2288       }
2289     }
2290     LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2291   }
2292 
2293   /** Stops the service.  No new calls will be handled after this is called. */
2294   @Override
2295   public synchronized void stop() {
2296     LOG.info("Stopping server on " + port);
2297     running = false;
2298     if (authTokenSecretMgr != null) {
2299       authTokenSecretMgr.stop();
2300       authTokenSecretMgr = null;
2301     }
2302     listener.interrupt();
2303     listener.doStop();
2304     responder.interrupt();
2305     scheduler.stop();
2306     notifyAll();
2307   }
2308 
2309   /** Wait for the server to be stopped.
2310    * Does not wait for all subthreads to finish.
2311    *  See {@link #stop()}.
2312    * @throws InterruptedException e
2313    */
2314   @Override
2315   public synchronized void join() throws InterruptedException {
2316     while (running) {
2317       wait();
2318     }
2319   }
2320 
2321   /**
2322    * Return the socket (ip+port) on which the RPC server is listening to. May return null if
2323    * the listener channel is closed.
2324    * @return the socket (ip+port) on which the RPC server is listening to, or null if this
2325    * information cannot be determined
2326    */
2327   @Override
2328   public synchronized InetSocketAddress getListenerAddress() {
2329     if (listener == null) {
2330       return null;
2331     }
2332     return listener.getAddress();
2333   }
2334 
2335   /**
2336    * Set the handler for calling out of RPC for error conditions.
2337    * @param handler the handler implementation
2338    */
2339   @Override
2340   public void setErrorHandler(HBaseRPCErrorHandler handler) {
2341     this.errorHandler = handler;
2342   }
2343 
2344   @Override
2345   public HBaseRPCErrorHandler getErrorHandler() {
2346     return this.errorHandler;
2347   }
2348 
2349   /**
2350    * Returns the metrics instance for reporting RPC call statistics
2351    */
2352   @Override
2353   public MetricsHBaseServer getMetrics() {
2354     return metrics;
2355   }
2356 
2357   @Override
2358   public void addCallSize(final long diff) {
2359     this.callQueueSize.add(diff);
2360   }
2361 
2362   /**
2363    * Authorize the incoming client connection.
2364    *
2365    * @param user client user
2366    * @param connection incoming connection
2367    * @param addr InetAddress of incoming connection
2368    * @throws org.apache.hadoop.security.authorize.AuthorizationException
2369    *         when the client isn't authorized to talk the protocol
2370    */
2371   public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
2372       InetAddress addr)
2373   throws AuthorizationException {
2374     if (authorize) {
2375       Class<?> c = getServiceInterface(services, connection.getServiceName());
2376       this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2377     }
2378   }
2379 
2380   /**
2381    * When the read or write buffer size is larger than this limit, i/o will be
2382    * done in chunks of this size. Most RPC requests and responses would be
2383    * be smaller.
2384    */
2385   private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
2386 
2387   /**
2388    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
2389    * If the amount of data is large, it writes to channel in smaller chunks.
2390    * This is to avoid jdk from creating many direct buffers as the size of
2391    * buffer increases. This also minimizes extra copies in NIO layer
2392    * as a result of multiple write operations required to write a large
2393    * buffer.
2394    *
2395    * @param channel writable byte channel to write to
2396    * @param bufferChain Chain of buffers to write
2397    * @return number of bytes written
2398    * @throws java.io.IOException e
2399    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
2400    */
2401   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2402   throws IOException {
2403     long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
2404     if (count > 0) this.metrics.sentBytes(count);
2405     return count;
2406   }
2407 
2408   /**
2409    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
2410    * If the amount of data is large, it writes to channel in smaller chunks.
2411    * This is to avoid jdk from creating many direct buffers as the size of
2412    * ByteBuffer increases. There should not be any performance degredation.
2413    *
2414    * @param channel writable byte channel to write on
2415    * @param buffer buffer to write
2416    * @return number of bytes written
2417    * @throws java.io.IOException e
2418    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
2419    */
2420   protected int channelRead(ReadableByteChannel channel,
2421                                    ByteBuffer buffer) throws IOException {
2422 
2423     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2424            channel.read(buffer) : channelIO(channel, null, buffer);
2425     if (count > 0) {
2426       metrics.receivedBytes(count);
2427     }
2428     return count;
2429   }
2430 
2431   /**
2432    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
2433    * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
2434    * one of readCh or writeCh should be non-null.
2435    *
2436    * @param readCh read channel
2437    * @param writeCh write channel
2438    * @param buf buffer to read or write into/out of
2439    * @return bytes written
2440    * @throws java.io.IOException e
2441    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
2442    * @see #channelWrite(GatheringByteChannel, BufferChain)
2443    */
2444   private static int channelIO(ReadableByteChannel readCh,
2445                                WritableByteChannel writeCh,
2446                                ByteBuffer buf) throws IOException {
2447 
2448     int originalLimit = buf.limit();
2449     int initialRemaining = buf.remaining();
2450     int ret = 0;
2451 
2452     while (buf.remaining() > 0) {
2453       try {
2454         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2455         buf.limit(buf.position() + ioSize);
2456 
2457         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2458 
2459         if (ret < ioSize) {
2460           break;
2461         }
2462 
2463       } finally {
2464         buf.limit(originalLimit);
2465       }
2466     }
2467 
2468     int nBytes = initialRemaining - buf.remaining();
2469     return (nBytes > 0) ? nBytes : ret;
2470   }
2471 
2472   /**
2473    * Needed for features such as delayed calls.  We need to be able to store the current call
2474    * so that we can complete it later or ask questions of what is supported by the current ongoing
2475    * call.
2476    * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
2477    */
2478   public static RpcCallContext getCurrentCall() {
2479     return CurCall.get();
2480   }
2481 
2482   public static boolean isInRpcCallContext() {
2483     return CurCall.get() != null;
2484   }
2485 
2486   /**
2487    * Returns the user credentials associated with the current RPC request or
2488    * <code>null</code> if no credentials were provided.
2489    * @return A User
2490    */
2491   public static User getRequestUser() {
2492     RpcCallContext ctx = getCurrentCall();
2493     return ctx == null? null: ctx.getRequestUser();
2494   }
2495 
2496   /**
2497    * Returns the username for any user associated with the current RPC
2498    * request or <code>null</code> if no user is set.
2499    */
2500   public static String getRequestUserName() {
2501     User user = getRequestUser();
2502     return user == null? null: user.getShortName();
2503   }
2504 
2505   /**
2506    * @return Address of remote client if a request is ongoing, else null
2507    */
2508   public static InetAddress getRemoteAddress() {
2509     RpcCallContext ctx = getCurrentCall();
2510     return ctx == null? null: ctx.getRemoteAddress();
2511   }
2512 
2513   /**
2514    * @param serviceName Some arbitrary string that represents a 'service'.
2515    * @param services Available service instances
2516    * @return Matching BlockingServiceAndInterface pair
2517    */
2518   static BlockingServiceAndInterface getServiceAndInterface(
2519       final List<BlockingServiceAndInterface> services, final String serviceName) {
2520     for (BlockingServiceAndInterface bs : services) {
2521       if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2522         return bs;
2523       }
2524     }
2525     return null;
2526   }
2527 
2528   /**
2529    * @param serviceName Some arbitrary string that represents a 'service'.
2530    * @param services Available services and their service interfaces.
2531    * @return Service interface class for <code>serviceName</code>
2532    */
2533   static Class<?> getServiceInterface(
2534       final List<BlockingServiceAndInterface> services,
2535       final String serviceName) {
2536     BlockingServiceAndInterface bsasi =
2537         getServiceAndInterface(services, serviceName);
2538     return bsasi == null? null: bsasi.getServiceInterface();
2539   }
2540 
2541   /**
2542    * @param serviceName Some arbitrary string that represents a 'service'.
2543    * @param services Available services and their service interfaces.
2544    * @return BlockingService that goes with the passed <code>serviceName</code>
2545    */
2546   static BlockingService getService(
2547       final List<BlockingServiceAndInterface> services,
2548       final String serviceName) {
2549     BlockingServiceAndInterface bsasi =
2550         getServiceAndInterface(services, serviceName);
2551     return bsasi == null? null: bsasi.getBlockingService();
2552   }
2553 
2554   static MonitoredRPCHandler getStatus() {
2555     // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
2556     MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
2557     if (status != null) {
2558       return status;
2559     }
2560     status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
2561     status.pause("Waiting for a call");
2562     RpcServer.MONITORED_RPC.set(status);
2563     return status;
2564   }
2565 
2566   /** Returns the remote side ip address when invoked inside an RPC
2567    *  Returns null incase of an error.
2568    *  @return InetAddress
2569    */
2570   public static InetAddress getRemoteIp() {
2571     Call call = CurCall.get();
2572     if (call != null && call.connection != null && call.connection.socket != null) {
2573       return call.connection.socket.getInetAddress();
2574     }
2575     return null;
2576   }
2577 
2578 
2579   /**
2580    * A convenience method to bind to a given address and report
2581    * better exceptions if the address is not a valid host.
2582    * @param socket the socket to bind
2583    * @param address the address to bind to
2584    * @param backlog the number of connections allowed in the queue
2585    * @throws BindException if the address can't be bound
2586    * @throws UnknownHostException if the address isn't a valid host name
2587    * @throws IOException other random errors from bind
2588    */
2589   public static void bind(ServerSocket socket, InetSocketAddress address,
2590                           int backlog) throws IOException {
2591     try {
2592       socket.bind(address, backlog);
2593     } catch (BindException e) {
2594       BindException bindException =
2595         new BindException("Problem binding to " + address + " : " +
2596             e.getMessage());
2597       bindException.initCause(e);
2598       throw bindException;
2599     } catch (SocketException e) {
2600       // If they try to bind to a different host's address, give a better
2601       // error message.
2602       if ("Unresolved address".equals(e.getMessage())) {
2603         throw new UnknownHostException("Invalid hostname for server: " +
2604                                        address.getHostName());
2605       }
2606       throw e;
2607     }
2608   }
2609 
2610   @Override
2611   public RpcScheduler getScheduler() {
2612     return scheduler;
2613   }
2614 
2615   @Override
2616   public void setRsRpcServices(RSRpcServices rsRpcServices) {
2617     this.rsRpcServices = rsRpcServices;
2618   }
2619 }