View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.ServerSocket;
31  import java.net.Socket;
32  import java.net.SocketException;
33  import java.net.UnknownHostException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.Channels;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.GatheringByteChannel;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.security.PrivilegedExceptionAction;
46  import java.util.ArrayList;
47  import java.util.Arrays;
48  import java.util.Collections;
49  import java.util.HashMap;
50  import java.util.Iterator;
51  import java.util.LinkedList;
52  import java.util.List;
53  import java.util.Map;
54  import java.util.Random;
55  import java.util.Set;
56  import java.util.concurrent.ConcurrentHashMap;
57  import java.util.concurrent.ConcurrentLinkedDeque;
58  import java.util.concurrent.ExecutorService;
59  import java.util.concurrent.Executors;
60  import java.util.concurrent.atomic.AtomicInteger;
61  import java.util.concurrent.locks.Lock;
62  import java.util.concurrent.locks.ReentrantLock;
63  
64  import javax.security.sasl.Sasl;
65  import javax.security.sasl.SaslException;
66  import javax.security.sasl.SaslServer;
67  
68  import org.apache.commons.logging.Log;
69  import org.apache.commons.logging.LogFactory;
70  import org.apache.hadoop.hbase.classification.InterfaceAudience;
71  import org.apache.hadoop.hbase.classification.InterfaceStability;
72  import org.apache.hadoop.conf.Configuration;
73  import org.apache.hadoop.hbase.CellScanner;
74  import org.apache.hadoop.hbase.DoNotRetryIOException;
75  import org.apache.hadoop.hbase.HBaseIOException;
76  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
77  import org.apache.hadoop.hbase.HConstants;
78  import org.apache.hadoop.hbase.HRegionInfo;
79  import org.apache.hadoop.hbase.Server;
80  import org.apache.hadoop.hbase.TableName;
81  import org.apache.hadoop.hbase.client.Operation;
82  import org.apache.hadoop.hbase.codec.Codec;
83  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
84  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
85  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
86  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
87  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
88  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
89  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
90  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
91  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
92  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
93  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
94  import org.apache.hadoop.hbase.regionserver.HRegionServer;
95  import org.apache.hadoop.hbase.security.AccessDeniedException;
96  import org.apache.hadoop.hbase.security.AuthMethod;
97  import org.apache.hadoop.hbase.security.HBasePolicyProvider;
98  import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
99  import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
100 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
101 import org.apache.hadoop.hbase.security.SaslStatus;
102 import org.apache.hadoop.hbase.security.SaslUtil;
103 import org.apache.hadoop.hbase.security.UserProvider;
104 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
105 import org.apache.hadoop.hbase.util.Bytes;
106 import org.apache.hadoop.hbase.util.Counter;
107 import org.apache.hadoop.hbase.util.Pair;
108 import org.apache.hadoop.io.BytesWritable;
109 import org.apache.hadoop.io.IntWritable;
110 import org.apache.hadoop.io.Writable;
111 import org.apache.hadoop.io.WritableUtils;
112 import org.apache.hadoop.io.compress.CompressionCodec;
113 import org.apache.hadoop.security.UserGroupInformation;
114 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
115 import org.apache.hadoop.security.authorize.AuthorizationException;
116 import org.apache.hadoop.security.authorize.PolicyProvider;
117 import org.apache.hadoop.security.authorize.ProxyUsers;
118 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
119 import org.apache.hadoop.security.token.SecretManager;
120 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
121 import org.apache.hadoop.security.token.TokenIdentifier;
122 import org.apache.hadoop.util.StringUtils;
123 import org.codehaus.jackson.map.ObjectMapper;
124 import org.apache.htrace.TraceInfo;
125 
126 import com.google.common.util.concurrent.ThreadFactoryBuilder;
127 import com.google.protobuf.BlockingService;
128 import com.google.protobuf.CodedInputStream;
129 import com.google.protobuf.Descriptors.MethodDescriptor;
130 import com.google.protobuf.Message;
131 import com.google.protobuf.Message.Builder;
132 import com.google.protobuf.ServiceException;
133 import com.google.protobuf.TextFormat;
134 
135 /**
136  * An RPC server that hosts protobuf described Services.
137  *
138  * An RpcServer instance has a Listener that hosts the socket.  Listener has fixed number
139  * of Readers in an ExecutorPool, 10 by default.  The Listener does an accept and then
140  * round robin a Reader is chosen to do the read.  The reader is registered on Selector.  Read does
141  * total read off the channel and the parse from which it makes a Call.  The call is wrapped in a
142  * CallRunner and passed to the scheduler to be run.  Reader goes back to see if more to be done
143  * and loops till done.
144  *
145  * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
146  * has given the queues into which calls (i.e. CallRunner instances) are inserted.  Handlers run
147  * taking from the queue.  They run the CallRunner#run method on each item gotten from queue
148  * and keep taking while the server is up.
149  *
150  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
151  * queue for Responder to pull from and return result to client.
152  *
153  * @see RpcClientImpl
154  */
155 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
156 @InterfaceStability.Evolving
157 public class RpcServer implements RpcServerInterface {
158   public static final Log LOG = LogFactory.getLog(RpcServer.class);
159 
160   private final boolean authorize;
161   private boolean isSecurityEnabled;
162 
163   public static final byte CURRENT_VERSION = 0;
164 
165   /**
166    * How many calls/handler are allowed in the queue.
167    */
168   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
169 
170   /**
171    * The maximum size that we can hold in the RPC queue
172    */
173   private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
174 
175   private static final String WARN_DELAYED_CALLS = "hbase.ipc.warn.delayedrpc.number";
176 
177   private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;
178 
179   private final int warnDelayedCalls;
180 
181   private AtomicInteger delayedCalls;
182   private final IPCUtil ipcUtil;
183 
184   private static final String AUTH_FAILED_FOR = "Auth failed for ";
185   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
186   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
187     Server.class.getName());
188   protected SecretManager<TokenIdentifier> secretManager;
189   protected ServiceAuthorizationManager authManager;
190 
191   /** This is set to Call object before Handler invokes an RPC and unset
192    * after the call returns.
193    */
194   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
195 
196   /** Keeps MonitoredRPCHandler per handler thread. */
197   static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
198       = new ThreadLocal<MonitoredRPCHandler>();
199 
200   protected final InetSocketAddress bindAddress;
201   protected int port;                             // port we listen on
202   protected InetSocketAddress address;            // inet address we listen on
203   private int readThreads;                        // number of read threads
204   protected int maxIdleTime;                      // the maximum idle time after
205                                                   // which a client may be
206                                                   // disconnected
207   protected int thresholdIdleConnections;         // the number of idle
208                                                   // connections after which we
209                                                   // will start cleaning up idle
210                                                   // connections
211   int maxConnectionsToNuke;                       // the max number of
212                                                   // connections to nuke
213                                                   // during a cleanup
214 
215   protected MetricsHBaseServer metrics;
216 
217   protected final Configuration conf;
218 
219   private int maxQueueSize;
220   protected int socketSendBufferSize;
221   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
222   protected final boolean tcpKeepAlive; // if T then use keepalives
223   protected final long purgeTimeout;    // in milliseconds
224 
225   /**
226    * This flag is used to indicate to sub threads when they should go down.  When we call
227    * {@link #start()}, all threads started will consult this flag on whether they should
228    * keep going.  It is set to false when {@link #stop()} is called.
229    */
230   volatile boolean running = true;
231 
232   /**
233    * This flag is set to true after all threads are up and 'running' and the server is then opened
234    * for business by the call to {@link #start()}.
235    */
236   volatile boolean started = false;
237 
238   /**
239    * This is a running count of the size of all outstanding calls by size.
240    */
241   protected final Counter callQueueSize = new Counter();
242 
243   protected final List<Connection> connectionList =
244     Collections.synchronizedList(new LinkedList<Connection>());
245   //maintain a list
246   //of client connections
247   private Listener listener = null;
248   protected Responder responder = null;
249   protected int numConnections = 0;
250 
251   protected HBaseRPCErrorHandler errorHandler = null;
252 
253   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
254   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
255 
256   /** Default value for above params */
257   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
258   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
259 
260   private static final ObjectMapper MAPPER = new ObjectMapper();
261 
262   private final int warnResponseTime;
263   private final int warnResponseSize;
264   private final Server server;
265   private final List<BlockingServiceAndInterface> services;
266 
267   private final RpcScheduler scheduler;
268 
269   private UserProvider userProvider;
270 
271   private final BoundedByteBufferPool reservoir;
272 
273 
274   /**
275    * Datastructure that holds all necessary to a method invocation and then afterward, carries
276    * the result.
277    */
278   class Call implements RpcCallContext {
279     protected int id;                             // the client's call id
280     protected BlockingService service;
281     protected MethodDescriptor md;
282     protected RequestHeader header;
283     protected Message param;                      // the parameter passed
284     // Optional cell data passed outside of protobufs.
285     protected CellScanner cellScanner;
286     protected Connection connection;              // connection to client
287     protected long timestamp;      // the time received when response is null
288                                    // the time served when response is not null
289     /**
290      * Chain of buffers to send as response.
291      */
292     protected BufferChain response;
293     protected boolean delayResponse;
294     protected Responder responder;
295     protected boolean delayReturnValue;           // if the return value should be
296                                                   // set at call completion
297     protected long size;                          // size of current call
298     protected boolean isError;
299     protected TraceInfo tinfo;
300     private ByteBuffer cellBlock = null;
301 
302     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
303          Message param, CellScanner cellScanner, Connection connection, Responder responder,
304          long size, TraceInfo tinfo) {
305       this.id = id;
306       this.service = service;
307       this.md = md;
308       this.header = header;
309       this.param = param;
310       this.cellScanner = cellScanner;
311       this.connection = connection;
312       this.timestamp = System.currentTimeMillis();
313       this.response = null;
314       this.delayResponse = false;
315       this.responder = responder;
316       this.isError = false;
317       this.size = size;
318       this.tinfo = tinfo;
319     }
320 
321     /**
322      * Call is done. Execution happened and we returned results to client. It is now safe to
323      * cleanup.
324      */
325     void done() {
326       if (this.cellBlock != null) {
327         // Return buffer to reservoir now we are done with it.
328         reservoir.putBuffer(this.cellBlock);
329         this.cellBlock = null;
330       }
331       this.connection.decRpcCount();  // Say that we're done with this call.
332     }
333 
334     @Override
335     public String toString() {
336       return toShortString() + " param: " +
337         (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
338         " connection: " + connection.toString();
339     }
340 
341     protected RequestHeader getHeader() {
342       return this.header;
343     }
344 
345     /*
346      * Short string representation without param info because param itself could be huge depends on
347      * the payload of a command
348      */
349     String toShortString() {
350       String serviceName = this.connection.service != null ?
351           this.connection.service.getDescriptorForType().getName() : "null";
352       return "callId: " + this.id + " service: " + serviceName +
353           " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
354           " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
355           " connection: " + connection.toString();
356     }
357 
358     String toTraceString() {
359       String serviceName = this.connection.service != null ?
360                            this.connection.service.getDescriptorForType().getName() : "";
361       String methodName = (this.md != null) ? this.md.getName() : "";
362       return serviceName + "." + methodName;
363     }
364 
365     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
366       this.response = new BufferChain(response);
367     }
368 
369     protected synchronized void setResponse(Object m, final CellScanner cells,
370         Throwable t, String errorMsg) {
371       if (this.isError) return;
372       if (t != null) this.isError = true;
373       BufferChain bc = null;
374       try {
375         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
376         // Presume it a pb Message.  Could be null.
377         Message result = (Message)m;
378         // Call id.
379         headerBuilder.setCallId(this.id);
380         if (t != null) {
381           ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
382           exceptionBuilder.setExceptionClassName(t.getClass().getName());
383           exceptionBuilder.setStackTrace(errorMsg);
384           exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
385           if (t instanceof RegionMovedException) {
386             // Special casing for this exception.  This is only one carrying a payload.
387             // Do this instead of build a generic system for allowing exceptions carry
388             // any kind of payload.
389             RegionMovedException rme = (RegionMovedException)t;
390             exceptionBuilder.setHostname(rme.getHostname());
391             exceptionBuilder.setPort(rme.getPort());
392           }
393           // Set the exception as the result of the method invocation.
394           headerBuilder.setException(exceptionBuilder.build());
395         }
396         // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the 
397         // reservoir when finished. This is hacky and the hack is not contained but benefits are
398         // high when we can avoid a big buffer allocation on each rpc.
399         this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
400           this.connection.compressionCodec, cells, reservoir);
401         if (this.cellBlock != null) {
402           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
403           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
404           cellBlockBuilder.setLength(this.cellBlock.limit());
405           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
406         }
407         Message header = headerBuilder.build();
408 
409         // Organize the response as a set of bytebuffers rather than collect it all together inside
410         // one big byte array; save on allocations.
411         ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
412         ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
413         int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
414           (this.cellBlock == null? 0: this.cellBlock.limit());
415         ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
416         bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
417         if (connection.useWrap) {
418           bc = wrapWithSasl(bc);
419         }
420       } catch (IOException e) {
421         LOG.warn("Exception while creating response " + e);
422       }
423       this.response = bc;
424     }
425 
426     private BufferChain wrapWithSasl(BufferChain bc)
427         throws IOException {
428       if (!this.connection.useSasl) return bc;
429       // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
430       // THIS IS A BIG UGLY COPY.
431       byte [] responseBytes = bc.getBytes();
432       byte [] token;
433       // synchronization may be needed since there can be multiple Handler
434       // threads using saslServer to wrap responses.
435       synchronized (connection.saslServer) {
436         token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
437       }
438       if (LOG.isTraceEnabled()) {
439         LOG.trace("Adding saslServer wrapped token of size " + token.length
440             + " as call response.");
441       }
442 
443       ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
444       ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
445       return new BufferChain(bbTokenLength, bbTokenBytes);
446     }
447 
448     @Override
449     public synchronized void endDelay(Object result) throws IOException {
450       assert this.delayResponse;
451       assert this.delayReturnValue || result == null;
452       this.delayResponse = false;
453       delayedCalls.decrementAndGet();
454       if (this.delayReturnValue) {
455         this.setResponse(result, null, null, null);
456       }
457       this.responder.doRespond(this);
458     }
459 
460     @Override
461     public synchronized void endDelay() throws IOException {
462       this.endDelay(null);
463     }
464 
465     @Override
466     public synchronized void startDelay(boolean delayReturnValue) {
467       assert !this.delayResponse;
468       this.delayResponse = true;
469       this.delayReturnValue = delayReturnValue;
470       int numDelayed = delayedCalls.incrementAndGet();
471       if (numDelayed > warnDelayedCalls) {
472         LOG.warn("Too many delayed calls: limit " + warnDelayedCalls + " current " + numDelayed);
473       }
474     }
475 
476     @Override
477     public synchronized void endDelayThrowing(Throwable t) throws IOException {
478       this.setResponse(null, null, t, StringUtils.stringifyException(t));
479       this.delayResponse = false;
480       this.sendResponseIfReady();
481     }
482 
483     @Override
484     public synchronized boolean isDelayed() {
485       return this.delayResponse;
486     }
487 
488     @Override
489     public synchronized boolean isReturnValueDelayed() {
490       return this.delayReturnValue;
491     }
492 
493     @Override
494     public boolean isClientCellBlockSupport() {
495       return this.connection != null && this.connection.codec != null;
496     }
497 
498     @Override
499     public long disconnectSince() {
500       if (!connection.channel.isOpen()) {
501         return System.currentTimeMillis() - timestamp;
502       } else {
503         return -1L;
504       }
505     }
506 
507     public long getSize() {
508       return this.size;
509     }
510 
511     /**
512      * If we have a response, and delay is not set, then respond
513      * immediately.  Otherwise, do not respond to client.  This is
514      * called by the RPC code in the context of the Handler thread.
515      */
516     public synchronized void sendResponseIfReady() throws IOException {
517       if (!this.delayResponse) {
518         this.responder.doRespond(this);
519       }
520     }
521 
522     public UserGroupInformation getRemoteUser() {
523       return connection.user;
524     }
525   }
526 
527   /** Listens on the socket. Creates jobs for the handler threads*/
528   private class Listener extends Thread {
529 
530     private ServerSocketChannel acceptChannel = null; //the accept channel
531     private Selector selector = null; //the selector that we use for the server
532     private Reader[] readers = null;
533     private int currentReader = 0;
534     private Random rand = new Random();
535     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
536                                          //-tion (for idle connections) ran
537     private long cleanupInterval = 10000; //the minimum interval between
538                                           //two cleanup runs
539     private int backlogLength;
540 
541     private ExecutorService readPool;
542 
543     public Listener(final String name) throws IOException {
544       super(name);
545       backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
546       // Create a new server socket and set to non blocking mode
547       acceptChannel = ServerSocketChannel.open();
548       acceptChannel.configureBlocking(false);
549 
550       // Bind the server socket to the binding addrees (can be different from the default interface)
551       bind(acceptChannel.socket(), bindAddress, backlogLength);
552       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
553       address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
554       // create a selector;
555       selector= Selector.open();
556 
557       readers = new Reader[readThreads];
558       readPool = Executors.newFixedThreadPool(readThreads,
559         new ThreadFactoryBuilder().setNameFormat(
560           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
561           ",port=" + port).setDaemon(true).build());
562       for (int i = 0; i < readThreads; ++i) {
563         Reader reader = new Reader();
564         readers[i] = reader;
565         readPool.execute(reader);
566       }
567       LOG.info(getName() + ": started " + readThreads + " reader(s).");
568 
569       // Register accepts on the server socket with the selector.
570       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
571       this.setName("RpcServer.listener,port=" + port);
572       this.setDaemon(true);
573     }
574 
575 
576     private class Reader implements Runnable {
577       private volatile boolean adding = false;
578       private final Selector readSelector;
579 
580       Reader() throws IOException {
581         this.readSelector = Selector.open();
582       }
583       @Override
584       public void run() {
585         try {
586           doRunLoop();
587         } finally {
588           try {
589             readSelector.close();
590           } catch (IOException ioe) {
591             LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
592           }
593         }
594       }
595 
596       private synchronized void doRunLoop() {
597         while (running) {
598           try {
599             readSelector.select();
600             while (adding) {
601               this.wait(1000);
602             }
603 
604             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
605             while (iter.hasNext()) {
606               SelectionKey key = iter.next();
607               iter.remove();
608               if (key.isValid()) {
609                 if (key.isReadable()) {
610                   doRead(key);
611                 }
612               }
613             }
614           } catch (InterruptedException e) {
615             LOG.debug("Interrupted while sleeping");
616             return;
617           } catch (IOException ex) {
618             LOG.info(getName() + ": IOException in Reader", ex);
619           }
620         }
621       }
622 
623       /**
624        * This gets reader into the state that waits for the new channel
625        * to be registered with readSelector. If it was waiting in select()
626        * the thread will be woken up, otherwise whenever select() is called
627        * it will return even if there is nothing to read and wait
628        * in while(adding) for finishAdd call
629        */
630       public void startAdd() {
631         adding = true;
632         readSelector.wakeup();
633       }
634 
635       public synchronized SelectionKey registerChannel(SocketChannel channel)
636         throws IOException {
637         return channel.register(readSelector, SelectionKey.OP_READ);
638       }
639 
640       public synchronized void finishAdd() {
641         adding = false;
642         this.notify();
643       }
644     }
645 
646     /** cleanup connections from connectionList. Choose a random range
647      * to scan and also have a limit on the number of the connections
648      * that will be cleanedup per run. The criteria for cleanup is the time
649      * for which the connection was idle. If 'force' is true then all
650      * connections will be looked at for the cleanup.
651      * @param force all connections will be looked at for cleanup
652      */
653     private void cleanupConnections(boolean force) {
654       if (force || numConnections > thresholdIdleConnections) {
655         long currentTime = System.currentTimeMillis();
656         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
657           return;
658         }
659         int start = 0;
660         int end = numConnections - 1;
661         if (!force) {
662           start = rand.nextInt() % numConnections;
663           end = rand.nextInt() % numConnections;
664           int temp;
665           if (end < start) {
666             temp = start;
667             start = end;
668             end = temp;
669           }
670         }
671         int i = start;
672         int numNuked = 0;
673         while (i <= end) {
674           Connection c;
675           synchronized (connectionList) {
676             try {
677               c = connectionList.get(i);
678             } catch (Exception e) {return;}
679           }
680           if (c.timedOut(currentTime)) {
681             if (LOG.isDebugEnabled())
682               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
683             closeConnection(c);
684             numNuked++;
685             end--;
686             //noinspection UnusedAssignment
687             c = null;
688             if (!force && numNuked == maxConnectionsToNuke) break;
689           }
690           else i++;
691         }
692         lastCleanupRunTime = System.currentTimeMillis();
693       }
694     }
695 
696     @Override
697     public void run() {
698       LOG.info(getName() + ": starting");
699       while (running) {
700         SelectionKey key = null;
701         try {
702           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
703           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
704           while (iter.hasNext()) {
705             key = iter.next();
706             iter.remove();
707             try {
708               if (key.isValid()) {
709                 if (key.isAcceptable())
710                   doAccept(key);
711               }
712             } catch (IOException ignored) {
713               if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
714             }
715             key = null;
716           }
717         } catch (OutOfMemoryError e) {
718           if (errorHandler != null) {
719             if (errorHandler.checkOOME(e)) {
720               LOG.info(getName() + ": exiting on OutOfMemoryError");
721               closeCurrentConnection(key, e);
722               cleanupConnections(true);
723               return;
724             }
725           } else {
726             // we can run out of memory if we have too many threads
727             // log the event and sleep for a minute and give
728             // some thread(s) a chance to finish
729             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
730             closeCurrentConnection(key, e);
731             cleanupConnections(true);
732             try {
733               Thread.sleep(60000);
734             } catch (InterruptedException ex) {
735               LOG.debug("Interrupted while sleeping");
736               return;
737             }
738           }
739         } catch (Exception e) {
740           closeCurrentConnection(key, e);
741         }
742         cleanupConnections(false);
743       }
744 
745       LOG.info(getName() + ": stopping");
746 
747       synchronized (this) {
748         try {
749           acceptChannel.close();
750           selector.close();
751         } catch (IOException ignored) {
752           if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
753         }
754 
755         selector= null;
756         acceptChannel= null;
757 
758         // clean up all connections
759         while (!connectionList.isEmpty()) {
760           closeConnection(connectionList.remove(0));
761         }
762       }
763     }
764 
765     private void closeCurrentConnection(SelectionKey key, Throwable e) {
766       if (key != null) {
767         Connection c = (Connection)key.attachment();
768         if (c != null) {
769           if (LOG.isDebugEnabled()) {
770             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
771                 (e != null ? " on error " + e.getMessage() : ""));
772           }
773           closeConnection(c);
774           key.attach(null);
775         }
776       }
777     }
778 
779     InetSocketAddress getAddress() {
780       return address;
781     }
782 
783     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
784       Connection c;
785       ServerSocketChannel server = (ServerSocketChannel) key.channel();
786 
787       SocketChannel channel;
788       while ((channel = server.accept()) != null) {
789         try {
790           channel.configureBlocking(false);
791           channel.socket().setTcpNoDelay(tcpNoDelay);
792           channel.socket().setKeepAlive(tcpKeepAlive);
793         } catch (IOException ioe) {
794           channel.close();
795           throw ioe;
796         }
797 
798         Reader reader = getReader();
799         try {
800           reader.startAdd();
801           SelectionKey readKey = reader.registerChannel(channel);
802           c = getConnection(channel, System.currentTimeMillis());
803           readKey.attach(c);
804           synchronized (connectionList) {
805             connectionList.add(numConnections, c);
806             numConnections++;
807           }
808           if (LOG.isDebugEnabled())
809             LOG.debug(getName() + ": connection from " + c.toString() +
810                 "; # active connections: " + numConnections);
811         } finally {
812           reader.finishAdd();
813         }
814       }
815     }
816 
817     void doRead(SelectionKey key) throws InterruptedException {
818       int count;
819       Connection c = (Connection) key.attachment();
820       if (c == null) {
821         return;
822       }
823       c.setLastContact(System.currentTimeMillis());
824       try {
825         count = c.readAndProcess();
826 
827         if (count > 0) {
828           c.setLastContact(System.currentTimeMillis());
829         }
830 
831       } catch (InterruptedException ieo) {
832         throw ieo;
833       } catch (Exception e) {
834         if (LOG.isDebugEnabled()) {
835           LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage());
836         }
837         count = -1; //so that the (count < 0) block is executed
838       }
839       if (count < 0) {
840         if (LOG.isDebugEnabled()) {
841           LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
842               " because read count=" + count +
843               ". Number of active connections: " + numConnections);
844         }
845         closeConnection(c);
846       }
847     }
848 
849     synchronized void doStop() {
850       if (selector != null) {
851         selector.wakeup();
852         Thread.yield();
853       }
854       if (acceptChannel != null) {
855         try {
856           acceptChannel.socket().close();
857         } catch (IOException e) {
858           LOG.info(getName() + ": exception in closing listener socket. " + e);
859         }
860       }
861       readPool.shutdownNow();
862     }
863 
864     // The method that will return the next reader to work with
865     // Simplistic implementation of round robin for now
866     Reader getReader() {
867       currentReader = (currentReader + 1) % readers.length;
868       return readers[currentReader];
869     }
870   }
871 
872   // Sends responses of RPC back to clients.
873   protected class Responder extends Thread {
874     private final Selector writeSelector;
875     private final Set<Connection> writingCons =
876         Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
877 
/**
 * Creates the responder as a daemon thread named {@code RpcServer.responder} and
 * opens the selector used to wait for writable channels.
 *
 * @throws IOException if the selector cannot be opened
 */
Responder() throws IOException {
  setName("RpcServer.responder");
  setDaemon(true);
  this.writeSelector = Selector.open();
}
883 
884     @Override
885     public void run() {
886       LOG.info(getName() + ": starting");
887       try {
888         doRunLoop();
889       } finally {
890         LOG.info(getName() + ": stopping");
891         try {
892           writeSelector.close();
893         } catch (IOException ioe) {
894           LOG.error(getName() + ": couldn't close write selector", ioe);
895         }
896       }
897     }
898 
899     /**
900      * Take the list of the connections that want to write, and register them
901      * in the selector.
902      */
903     private void registerWrites() {
904       Iterator<Connection> it = writingCons.iterator();
905       while (it.hasNext()) {
906         Connection c = it.next();
907         it.remove();
908         SelectionKey sk = c.channel.keyFor(writeSelector);
909         try {
910           if (sk == null) {
911             try {
912               c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
913             } catch (ClosedChannelException e) {
914               // ignore: the client went away.
915               if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
916             }
917           } else {
918             sk.interestOps(SelectionKey.OP_WRITE);
919           }
920         } catch (CancelledKeyException e) {
921           // ignore: the client went away.
922           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
923         }
924       }
925     }
926 
927     /**
928      * Add a connection to the list that want to write,
929      */
930     public void registerForWrite(Connection c) {
931       if (writingCons.add(c)) {
932         writeSelector.wakeup();
933       }
934     }
935 
936     private void doRunLoop() {
937       long lastPurgeTime = 0;   // last check for old calls.
938       while (running) {
939         try {
940           registerWrites();
941           int keyCt = writeSelector.select(purgeTimeout);
942           if (keyCt == 0) {
943             continue;
944           }
945 
946           Set<SelectionKey> keys = writeSelector.selectedKeys();
947           Iterator<SelectionKey> iter = keys.iterator();
948           while (iter.hasNext()) {
949             SelectionKey key = iter.next();
950             iter.remove();
951             try {
952               if (key.isValid() && key.isWritable()) {
953                 doAsyncWrite(key);
954               }
955             } catch (IOException e) {
956               LOG.debug(getName() + ": asyncWrite", e);
957             }
958           }
959 
960           lastPurgeTime = purge(lastPurgeTime);
961 
962         } catch (OutOfMemoryError e) {
963           if (errorHandler != null) {
964             if (errorHandler.checkOOME(e)) {
965               LOG.info(getName() + ": exiting on OutOfMemoryError");
966               return;
967             }
968           } else {
969             //
970             // we can run out of memory if we have too many threads
971             // log the event and sleep for a minute and give
972             // some thread(s) a chance to finish
973             //
974             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
975             try {
976               Thread.sleep(60000);
977             } catch (InterruptedException ex) {
978               LOG.debug("Interrupted while sleeping");
979               return;
980             }
981           }
982         } catch (Exception e) {
983           LOG.warn(getName() + ": exception in Responder " +
984               StringUtils.stringifyException(e), e);
985         }
986       }
987       LOG.info(getName() + ": stopped");
988     }
989 
990     /**
991      * If there were some calls that have not been sent out for a
992      * long time, we close the connection.
993      * @return the time of the purge.
994      */
995     private long purge(long lastPurgeTime) {
996       long now = System.currentTimeMillis();
997       if (now < lastPurgeTime + purgeTimeout) {
998         return lastPurgeTime;
999       }
1000 
1001       ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1002       // get the list of channels from list of keys.
1003       synchronized (writeSelector.keys()) {
1004         for (SelectionKey key : writeSelector.keys()) {
1005           Connection connection = (Connection) key.attachment();
1006           if (connection == null) {
1007             throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1008           }
1009           Call call = connection.responseQueue.peekFirst();
1010           if (call != null && now > call.timestamp + purgeTimeout) {
1011             conWithOldCalls.add(call.connection);
1012           }
1013         }
1014       }
1015 
1016       // Seems safer to close the connection outside of the synchronized loop...
1017       for (Connection connection : conWithOldCalls) {
1018         closeConnection(connection);
1019       }
1020 
1021       return now;
1022     }
1023 
1024     private void doAsyncWrite(SelectionKey key) throws IOException {
1025       Connection connection = (Connection) key.attachment();
1026       if (connection == null) {
1027         throw new IOException("doAsyncWrite: no connection");
1028       }
1029       if (key.channel() != connection.channel) {
1030         throw new IOException("doAsyncWrite: bad channel");
1031       }
1032 
1033       if (processAllResponses(connection)) {
1034         try {
1035           // We wrote everything, so we don't need to be told when the socket is ready for
1036           //  write anymore.
1037          key.interestOps(0);
1038         } catch (CancelledKeyException e) {
1039           /* The Listener/reader might have closed the socket.
1040            * We don't explicitly cancel the key, so not sure if this will
1041            * ever fire.
1042            * This warning could be removed.
1043            */
1044           LOG.warn("Exception while changing ops : " + e);
1045         }
1046       }
1047     }
1048 
1049     /**
1050      * Process the response for this call. You need to have the lock on
1051      * {@link org.apache.hadoop.hbase.ipc.RpcServer.Connection#responseWriteLock}
1052      *
1053      * @param call the call
1054      * @return true if we proceed the call fully, false otherwise.
1055      * @throws IOException
1056      */
1057     private boolean processResponse(final Call call) throws IOException {
1058       boolean error = true;
1059       try {
1060         // Send as much data as we can in the non-blocking fashion
1061         long numBytes = channelWrite(call.connection.channel, call.response);
1062         if (numBytes < 0) {
1063           throw new HBaseIOException("Error writing on the socket " +
1064             "for the call:" + call.toShortString());
1065         }
1066         error = false;
1067       } finally {
1068         if (error) {
1069           LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1070           closeConnection(call.connection);
1071         }
1072       }
1073 
1074       if (!call.response.hasRemaining()) {
1075         call.done();
1076         return true;
1077       } else {
1078         return false; // Socket can't take more, we will have to come back.
1079       }
1080     }
1081 
1082     /**
1083      * Process all the responses for this connection
1084      *
1085      * @return true if all the calls were processed or that someone else is doing it.
1086      * false if there * is still some work to do. In this case, we expect the caller to
1087      * delay us.
1088      * @throws IOException
1089      */
1090     private boolean processAllResponses(final Connection connection) throws IOException {
1091       // We want only one writer on the channel for a connection at a time.
1092       connection.responseWriteLock.lock();
1093       try {
1094         for (int i = 0; i < 20; i++) {
1095           // protection if some handlers manage to need all the responder
1096           Call call = connection.responseQueue.pollFirst();
1097           if (call == null) {
1098             return true;
1099           }
1100           if (!processResponse(call)) {
1101             connection.responseQueue.addFirst(call);
1102             return false;
1103           }
1104         }
1105       } finally {
1106         connection.responseWriteLock.unlock();
1107       }
1108 
1109       return connection.responseQueue.isEmpty();
1110     }
1111 
1112     //
1113     // Enqueue a response from the application.
1114     //
1115     void doRespond(Call call) throws IOException {
1116       boolean added = false;
1117 
1118       // If there is already a write in progress, we don't wait. This allows to free the handlers
1119       //  immediately for other tasks.
1120       if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
1121         try {
1122           if (call.connection.responseQueue.isEmpty()) {
1123             // If we're alone, we can try to do a direct call to the socket. It's
1124             //  an optimisation to save on context switches and data transfer between cores..
1125             if (processResponse(call)) {
1126               return; // we're done.
1127             }
1128             // Too big to fit, putting ahead.
1129             call.connection.responseQueue.addFirst(call);
1130             added = true; // We will register to the selector later, outside of the lock.
1131           }
1132         } finally {
1133           call.connection.responseWriteLock.unlock();
1134         }
1135       }
1136 
1137       if (!added) {
1138         call.connection.responseQueue.addLast(call);
1139       }
1140       call.responder.registerForWrite(call.connection);
1141 
1142       // set the serve time when the response has to be sent later
1143       call.timestamp = System.currentTimeMillis();
1144     }
1145   }
1146 
1147   @SuppressWarnings("serial")
1148   public static class CallQueueTooBigException extends IOException {
1149     CallQueueTooBigException() {
1150       super();
1151     }
1152   }
1153 
1154   /** Reads calls from a connection and queues them for handling. */
1155   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1156       value="VO_VOLATILE_INCREMENT",
1157       justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1158   public class Connection {
1159     // If initial preamble with version and magic has been read or not.
1160     private boolean connectionPreambleRead = false;
1161     // If the connection header has been read or not.
1162     private boolean connectionHeaderRead = false;
1163     protected SocketChannel channel;
1164     private ByteBuffer data;
1165     private ByteBuffer dataLengthBuffer;
1166     protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
1167     private final Lock responseWriteLock = new ReentrantLock();
1168     private Counter rpcCount = new Counter(); // number of outstanding rpcs
1169     private long lastContact;
1170     private InetAddress addr;
1171     protected Socket socket;
1172     // Cache the remote host & port info so that even if the socket is
1173     // disconnected, we can say where it used to connect to.
1174     protected String hostAddress;
1175     protected int remotePort;
1176     ConnectionHeader connectionHeader;
1177     /**
1178      * Codec the client asked us to use.
1179      */
1180     private Codec codec;
1181     /**
1182      * Compression codec the client asked us to use.
1183      */
1184     private CompressionCodec compressionCodec;
1185     BlockingService service;
1186     protected UserGroupInformation user = null;
1187     private AuthMethod authMethod;
1188     private boolean saslContextEstablished;
1189     private boolean skipInitialSaslHandshake;
1190     private ByteBuffer unwrappedData;
1191     // When is this set?  FindBugs wants to know!  Says NP
1192     private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1193     boolean useSasl;
1194     SaslServer saslServer;
1195     private boolean useWrap = false;
1196     // Fake 'call' for failed authorization response
1197     private static final int AUTHORIZATION_FAILED_CALLID = -1;
1198     private final Call authFailedCall =
1199       new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, null, 0, null);
1200     private ByteArrayOutputStream authFailedResponse =
1201         new ByteArrayOutputStream();
1202     // Fake 'call' for SASL context setup
1203     private static final int SASL_CALLID = -33;
1204     private final Call saslCall =
1205       new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null);
1206 
1207     public UserGroupInformation attemptingUser = null; // user name before auth
1208 
/**
 * Wraps an accepted channel. Caches the remote address and port so they remain
 * available after the socket is disconnected, and applies
 * {@code socketSendBufferSize} to the socket when that setting is non-zero.
 *
 * @param channel     the accepted client channel
 * @param lastContact initial last-contact time, in milliseconds
 */
public Connection(SocketChannel channel, long lastContact) {
  this.channel = channel;
  this.socket = channel.socket();
  this.lastContact = lastContact;
  this.data = null;
  this.dataLengthBuffer = ByteBuffer.allocate(4);

  this.addr = socket.getInetAddress();
  this.hostAddress = (addr == null) ? "*Unknown*" : addr.getHostAddress();
  this.remotePort = socket.getPort();

  if (socketSendBufferSize == 0) {
    return;
  }
  try {
    socket.setSendBufferSize(socketSendBufferSize);
  } catch (IOException e) {
    LOG.warn("Connection: unable to set socket send buffer size to " +
             socketSendBufferSize);
  }
}
1231 
1232       @Override
1233     public String toString() {
1234       return getHostAddress() + ":" + remotePort;
1235     }
1236 
1237     public String getHostAddress() {
1238       return hostAddress;
1239     }
1240 
1241     public InetAddress getHostInetAddress() {
1242       return addr;
1243     }
1244 
1245     public int getRemotePort() {
1246       return remotePort;
1247     }
1248 
1249     public void setLastContact(long lastContact) {
1250       this.lastContact = lastContact;
1251     }
1252 
1253     /* Return true if the connection has no outstanding rpc */
1254     private boolean isIdle() {
1255       return rpcCount.get() == 0;
1256     }
1257 
1258     /* Decrement the outstanding RPC count */
1259     protected void decRpcCount() {
1260       rpcCount.decrement();
1261     }
1262 
1263     /* Increment the outstanding RPC count */
1264     protected void incRpcCount() {
1265       rpcCount.increment();
1266     }
1267 
1268     protected boolean timedOut(long currentTime) {
1269       return isIdle() && currentTime - lastContact > maxIdleTime;
1270     }
1271 
1272     private UserGroupInformation getAuthorizedUgi(String authorizedId)
1273         throws IOException {
1274       if (authMethod == AuthMethod.DIGEST) {
1275         TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1276             secretManager);
1277         UserGroupInformation ugi = tokenId.getUser();
1278         if (ugi == null) {
1279           throw new AccessDeniedException(
1280               "Can't retrieve username from tokenIdentifier.");
1281         }
1282         ugi.addTokenIdentifier(tokenId);
1283         return ugi;
1284       } else {
1285         return UserGroupInformation.createRemoteUser(authorizedId);
1286       }
1287     }
1288 
1289     private void saslReadAndProcess(byte[] saslToken) throws IOException,
1290         InterruptedException {
1291       if (saslContextEstablished) {
1292         if (LOG.isTraceEnabled())
1293           LOG.trace("Have read input token of size " + saslToken.length
1294               + " for processing by saslServer.unwrap()");
1295 
1296         if (!useWrap) {
1297           processOneRpc(saslToken);
1298         } else {
1299           byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
1300           processUnwrappedData(plaintextData);
1301         }
1302       } else {
1303         byte[] replyToken;
1304         try {
1305           if (saslServer == null) {
1306             switch (authMethod) {
1307             case DIGEST:
1308               if (secretManager == null) {
1309                 throw new AccessDeniedException(
1310                     "Server is not configured to do DIGEST authentication.");
1311               }
1312               saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
1313                   .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
1314                   SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
1315                       secretManager, this));
1316               break;
1317             default:
1318               UserGroupInformation current = UserGroupInformation.getCurrentUser();
1319               String fullName = current.getUserName();
1320               if (LOG.isDebugEnabled()) {
1321                 LOG.debug("Kerberos principal name is " + fullName);
1322               }
1323               final String names[] = SaslUtil.splitKerberosName(fullName);
1324               if (names.length != 3) {
1325                 throw new AccessDeniedException(
1326                     "Kerberos principal name does NOT have the expected "
1327                         + "hostname part: " + fullName);
1328               }
1329               current.doAs(new PrivilegedExceptionAction<Object>() {
1330                 @Override
1331                 public Object run() throws SaslException {
1332                   saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
1333                       .getMechanismName(), names[0], names[1],
1334                       SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
1335                   return null;
1336                 }
1337               });
1338             }
1339             if (saslServer == null)
1340               throw new AccessDeniedException(
1341                   "Unable to find SASL server implementation for "
1342                       + authMethod.getMechanismName());
1343             if (LOG.isDebugEnabled()) {
1344               LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
1345             }
1346           }
1347           if (LOG.isDebugEnabled()) {
1348             LOG.debug("Have read input token of size " + saslToken.length
1349                 + " for processing by saslServer.evaluateResponse()");
1350           }
1351           replyToken = saslServer.evaluateResponse(saslToken);
1352         } catch (IOException e) {
1353           IOException sendToClient = e;
1354           Throwable cause = e;
1355           while (cause != null) {
1356             if (cause instanceof InvalidToken) {
1357               sendToClient = (InvalidToken) cause;
1358               break;
1359             }
1360             cause = cause.getCause();
1361           }
1362           doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
1363             sendToClient.getLocalizedMessage());
1364           metrics.authenticationFailure();
1365           String clientIP = this.toString();
1366           // attempting user could be null
1367           AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
1368           throw e;
1369         }
1370         if (replyToken != null) {
1371           if (LOG.isDebugEnabled()) {
1372             LOG.debug("Will send token of size " + replyToken.length
1373                 + " from saslServer.");
1374           }
1375           doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
1376               null);
1377         }
1378         if (saslServer.isComplete()) {
1379           String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
1380           useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
1381           user = getAuthorizedUgi(saslServer.getAuthorizationID());
1382           if (LOG.isDebugEnabled()) {
1383             LOG.debug("SASL server context established. Authenticated client: "
1384               + user + ". Negotiated QoP is "
1385               + saslServer.getNegotiatedProperty(Sasl.QOP));
1386           }
1387           metrics.authenticationSuccess();
1388           AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
1389           saslContextEstablished = true;
1390         }
1391       }
1392     }
1393 
1394     /**
1395      * No protobuf encoding of raw sasl messages
1396      */
1397     private void doRawSaslReply(SaslStatus status, Writable rv,
1398         String errorClass, String error) throws IOException {
1399       ByteBufferOutputStream saslResponse = null;
1400       DataOutputStream out = null;
1401       try {
1402         // In my testing, have noticed that sasl messages are usually
1403         // in the ballpark of 100-200. That's why the initial capacity is 256.
1404         saslResponse = new ByteBufferOutputStream(256);
1405         out = new DataOutputStream(saslResponse);
1406         out.writeInt(status.state); // write status
1407         if (status == SaslStatus.SUCCESS) {
1408           rv.write(out);
1409         } else {
1410           WritableUtils.writeString(out, errorClass);
1411           WritableUtils.writeString(out, error);
1412         }
1413         saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1414         saslCall.responder = responder;
1415         saslCall.sendResponseIfReady();
1416       } finally {
1417         if (saslResponse != null) {
1418           saslResponse.close();
1419         }
1420         if (out != null) {
1421           out.close();
1422         }
1423       }
1424     }
1425 
1426     private void disposeSasl() {
1427       if (saslServer != null) {
1428         try {
1429           saslServer.dispose();
1430           saslServer = null;
1431         } catch (SaslException ignored) {
1432         }
1433       }
1434     }
1435 
1436     private int readPreamble() throws IOException {
1437       int count;
1438       // Check for 'HBas' magic.
1439       this.dataLengthBuffer.flip();
1440       if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
1441         return doBadPreambleHandling("Expected HEADER=" +
1442             Bytes.toStringBinary(HConstants.RPC_HEADER) +
1443             " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
1444             " from " + toString());
1445       }
1446       // Now read the next two bytes, the version and the auth to use.
1447       ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
1448       count = channelRead(channel, versionAndAuthBytes);
1449       if (count < 0 || versionAndAuthBytes.remaining() > 0) {
1450         return count;
1451       }
1452       int version = versionAndAuthBytes.get(0);
1453       byte authbyte = versionAndAuthBytes.get(1);
1454       this.authMethod = AuthMethod.valueOf(authbyte);
1455       if (version != CURRENT_VERSION) {
1456         String msg = getFatalConnectionString(version, authbyte);
1457         return doBadPreambleHandling(msg, new WrongVersionException(msg));
1458       }
1459       if (authMethod == null) {
1460         String msg = getFatalConnectionString(version, authbyte);
1461         return doBadPreambleHandling(msg, new BadAuthException(msg));
1462       }
1463       if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
1464         AccessDeniedException ae = new AccessDeniedException("Authentication is required");
1465         setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1466         responder.doRespond(authFailedCall);
1467         throw ae;
1468       }
1469       if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
1470         doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
1471             SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
1472         authMethod = AuthMethod.SIMPLE;
1473         // client has already sent the initial Sasl message and we
1474         // should ignore it. Both client and server should fall back
1475         // to simple auth from now on.
1476         skipInitialSaslHandshake = true;
1477       }
1478       if (authMethod != AuthMethod.SIMPLE) {
1479         useSasl = true;
1480       }
1481 
1482       dataLengthBuffer.clear();
1483       connectionPreambleRead = true;
1484       return count;
1485     }
1486 
1487     private int read4Bytes() throws IOException {
1488       if (this.dataLengthBuffer.remaining() > 0) {
1489         return channelRead(channel, this.dataLengthBuffer);
1490       } else {
1491         return 0;
1492       }
1493     }
1494 
1495 
1496     /**
1497      * Read off the wire. If there is not enough data to read, update the connection state with
1498      *  what we have and returns.
1499      * @return Returns -1 if failure (and caller will close connection), else zero or more.
1500      * @throws IOException
1501      * @throws InterruptedException
1502      */
1503     public int readAndProcess() throws IOException, InterruptedException {
1504       // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
1505       // does, read in the rest of the connection preamble, the version and the auth method.
1506       // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
1507       // length into the 4-byte this.dataLengthBuffer.
1508       int count = read4Bytes();
1509       if (count < 0 || dataLengthBuffer.remaining() > 0 ){
1510         return count;
1511       }
1512 
1513       // If we have not read the connection setup preamble, look to see if that is on the wire.
1514       if (!connectionPreambleRead) {
1515         count = readPreamble();
1516         if (!connectionPreambleRead) {
1517           return count;
1518         }
1519 
1520         count = read4Bytes();
1521         if (count < 0 || dataLengthBuffer.remaining() > 0) {
1522           return count;
1523         }
1524       }
1525 
1526       // We have read a length and we have read the preamble.  It is either the connection header
1527       // or it is a request.
1528       if (data == null) {
1529         dataLengthBuffer.flip();
1530         int dataLength = dataLengthBuffer.getInt();
1531         if (dataLength == RpcClient.PING_CALL_ID) {
1532           if (!useWrap) { //covers the !useSasl too
1533             dataLengthBuffer.clear();
1534             return 0;  //ping message
1535           }
1536         }
1537         if (dataLength < 0) { // A data length of zero is legal.
1538           throw new IllegalArgumentException("Unexpected data length "
1539               + dataLength + "!! from " + getHostAddress());
1540         }
1541         data = ByteBuffer.allocate(dataLength);
1542 
1543         // Increment the rpc count. This counter will be decreased when we write
1544         //  the response.  If we want the connection to be detected as idle properly, we
1545         //  need to keep the inc / dec correct.
1546         incRpcCount();
1547       }
1548 
1549       count = channelRead(channel, data);
1550 
1551       if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
1552         process();
1553       }
1554 
1555       return count;
1556     }
1557 
1558     /**
1559      * Process the data buffer and clean the connection state for the next call.
1560      */
1561     private void process() throws IOException, InterruptedException {
1562       data.flip();
1563       try {
1564         if (skipInitialSaslHandshake) {
1565           skipInitialSaslHandshake = false;
1566           return;
1567         }
1568 
1569         if (useSasl) {
1570           saslReadAndProcess(data.array());
1571         } else {
1572           processOneRpc(data.array());
1573         }
1574 
1575       } finally {
1576         dataLengthBuffer.clear(); // Clean for the next call
1577         data = null; // For the GC
1578       }
1579     }
1580 
1581     private String getFatalConnectionString(final int version, final byte authByte) {
1582       return "serverVersion=" + CURRENT_VERSION +
1583       ", clientVersion=" + version + ", authMethod=" + authByte +
1584       ", authSupported=" + (authMethod != null) + " from " + toString();
1585     }
1586 
1587     private int doBadPreambleHandling(final String msg) throws IOException {
1588       return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1589     }
1590 
1591     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1592       LOG.warn(msg);
1593       Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null);
1594       setupResponse(null, fakeCall, e, msg);
1595       responder.doRespond(fakeCall);
1596       // Returning -1 closes out the connection.
1597       return -1;
1598     }
1599 
1600     // Reads the connection header following version
1601     private void processConnectionHeader(byte[] buf) throws IOException {
1602       this.connectionHeader = ConnectionHeader.parseFrom(buf);
1603       String serviceName = connectionHeader.getServiceName();
1604       if (serviceName == null) throw new EmptyServiceNameException();
1605       this.service = getService(services, serviceName);
1606       if (this.service == null) throw new UnknownServiceException(serviceName);
1607       setupCellBlockCodecs(this.connectionHeader);
1608       UserGroupInformation protocolUser = createUser(connectionHeader);
1609       if (!useSasl) {
1610         user = protocolUser;
1611         if (user != null) {
1612           user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1613         }
1614       } else {
1615         // user is authenticated
1616         user.setAuthenticationMethod(authMethod.authenticationMethod);
1617         //Now we check if this is a proxy user case. If the protocol user is
1618         //different from the 'user', it is a proxy user scenario. However,
1619         //this is not allowed if user authenticated with DIGEST.
1620         if ((protocolUser != null)
1621             && (!protocolUser.getUserName().equals(user.getUserName()))) {
1622           if (authMethod == AuthMethod.DIGEST) {
1623             // Not allowed to doAs if token authentication is used
1624             throw new AccessDeniedException("Authenticated user (" + user
1625                 + ") doesn't match what the client claims to be ("
1626                 + protocolUser + ")");
1627           } else {
1628             // Effective user can be different from authenticated user
1629             // for simple auth or kerberos auth
1630             // The user is the real user. Now we create a proxy user
1631             UserGroupInformation realUser = user;
1632             user = UserGroupInformation.createProxyUser(protocolUser
1633                 .getUserName(), realUser);
1634             // Now the user is a proxy user, set Authentication method Proxy.
1635             user.setAuthenticationMethod(AuthenticationMethod.PROXY);
1636           }
1637         }
1638       }
1639     }
1640 
1641     /**
1642      * Set up cell block codecs
1643      * @throws FatalConnectionException
1644      */
1645     private void setupCellBlockCodecs(final ConnectionHeader header)
1646     throws FatalConnectionException {
1647       // TODO: Plug in other supported decoders.
1648       if (!header.hasCellBlockCodecClass()) return;
1649       String className = header.getCellBlockCodecClass();
1650       if (className == null || className.length() == 0) return;
1651       try {
1652         this.codec = (Codec)Class.forName(className).newInstance();
1653       } catch (Exception e) {
1654         throw new UnsupportedCellCodecException(className, e);
1655       }
1656       if (!header.hasCellBlockCompressorClass()) return;
1657       className = header.getCellBlockCompressorClass();
1658       try {
1659         this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1660       } catch (Exception e) {
1661         throw new UnsupportedCompressionCodecException(className, e);
1662       }
1663     }
1664 
1665     private void processUnwrappedData(byte[] inBuf) throws IOException,
1666     InterruptedException {
1667       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1668       // Read all RPCs contained in the inBuf, even partial ones
1669       while (true) {
1670         int count;
1671         if (unwrappedDataLengthBuffer.remaining() > 0) {
1672           count = channelRead(ch, unwrappedDataLengthBuffer);
1673           if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1674             return;
1675         }
1676 
1677         if (unwrappedData == null) {
1678           unwrappedDataLengthBuffer.flip();
1679           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1680 
1681           if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1682             if (LOG.isDebugEnabled())
1683               LOG.debug("Received ping message");
1684             unwrappedDataLengthBuffer.clear();
1685             continue; // ping message
1686           }
1687           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1688         }
1689 
1690         count = channelRead(ch, unwrappedData);
1691         if (count <= 0 || unwrappedData.remaining() > 0)
1692           return;
1693 
1694         if (unwrappedData.remaining() == 0) {
1695           unwrappedDataLengthBuffer.clear();
1696           unwrappedData.flip();
1697           processOneRpc(unwrappedData.array());
1698           unwrappedData = null;
1699         }
1700       }
1701     }
1702 
1703     private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1704       if (connectionHeaderRead) {
1705         processRequest(buf);
1706       } else {
1707         processConnectionHeader(buf);
1708         this.connectionHeaderRead = true;
1709         if (!authorizeConnection()) {
1710           // Throw FatalConnectionException wrapping ACE so client does right thing and closes
1711           // down the connection instead of trying to read non-existent retun.
1712           throw new AccessDeniedException("Connection from " + this + " for service " +
1713             connectionHeader.getServiceName() + " is unauthorized for user: " + user);
1714         }
1715       }
1716     }
1717 
1718     /**
1719      * @param buf Has the request header and the request param and optionally encoded data buffer
1720      * all in this one array.
1721      * @throws IOException
1722      * @throws InterruptedException
1723      */
1724     protected void processRequest(byte[] buf) throws IOException, InterruptedException {
1725       long totalRequestSize = buf.length;
1726       int offset = 0;
1727       // Here we read in the header.  We avoid having pb
1728       // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
1729       CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
1730       int headerSize = cis.readRawVarint32();
1731       offset = cis.getTotalBytesRead();
1732       RequestHeader header = RequestHeader.newBuilder().mergeFrom(buf, offset, headerSize).build();
1733       offset += headerSize;
1734       int id = header.getCallId();
1735       if (LOG.isTraceEnabled()) {
1736         LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1737           " totalRequestSize: " + totalRequestSize + " bytes");
1738       }
1739       // Enforcing the call queue size, this triggers a retry in the client
1740       // This is a bit late to be doing this check - we have already read in the total request.
1741       if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1742         final Call callTooBig =
1743           new Call(id, this.service, null, null, null, null, this,
1744             responder, totalRequestSize, null);
1745         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1746         setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
1747           "Call queue is full on " + getListenerAddress() +
1748           ", is hbase.ipc.server.max.callqueue.size too small?");
1749         responder.doRespond(callTooBig);
1750         return;
1751       }
1752       MethodDescriptor md = null;
1753       Message param = null;
1754       CellScanner cellScanner = null;
1755       try {
1756         if (header.hasRequestParam() && header.getRequestParam()) {
1757           md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1758           if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1759           Builder builder = this.service.getRequestPrototype(md).newBuilderForType();
1760           // To read the varint, I need an inputstream; might as well be a CIS.
1761           cis = CodedInputStream.newInstance(buf, offset, buf.length);
1762           int paramSize = cis.readRawVarint32();
1763           offset += cis.getTotalBytesRead();
1764           if (builder != null) {
1765             param = builder.mergeFrom(buf, offset, paramSize).build();
1766           }
1767           offset += paramSize;
1768         }
1769         if (header.hasCellBlockMeta()) {
1770           cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
1771             buf, offset, buf.length);
1772         }
1773       } catch (Throwable t) {
1774         String msg = getListenerAddress() + " is unable to read call parameter from client " +
1775             getHostAddress();
1776         LOG.warn(msg, t);
1777 
1778         // probably the hbase hadoop version does not match the running hadoop version
1779         if (t instanceof LinkageError) {
1780           t = new DoNotRetryIOException(t);
1781         }
1782         // If the method is not present on the server, do not retry.
1783         if (t instanceof UnsupportedOperationException) {
1784           t = new DoNotRetryIOException(t);
1785         }
1786 
1787         final Call readParamsFailedCall =
1788           new Call(id, this.service, null, null, null, null, this,
1789             responder, totalRequestSize, null);
1790         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1791         setupResponse(responseBuffer, readParamsFailedCall, t,
1792           msg + "; " + t.getMessage());
1793         responder.doRespond(readParamsFailedCall);
1794         return;
1795       }
1796 
1797       TraceInfo traceInfo = header.hasTraceInfo()
1798           ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1799           : null;
1800       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1801               totalRequestSize,
1802               traceInfo);
1803       scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
1804     }
1805 
1806     private boolean authorizeConnection() throws IOException {
1807       try {
1808         // If auth method is DIGEST, the token was obtained by the
1809         // real user for the effective user, therefore not required to
1810         // authorize real user. doAs is allowed only for simple or kerberos
1811         // authentication
1812         if (user != null && user.getRealUser() != null
1813             && (authMethod != AuthMethod.DIGEST)) {
1814           ProxyUsers.authorize(user, this.getHostAddress(), conf);
1815         }
1816         authorize(user, connectionHeader, getHostInetAddress());
1817         metrics.authorizationSuccess();
1818       } catch (AuthorizationException ae) {
1819         if (LOG.isDebugEnabled()) {
1820           LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1821         }
1822         metrics.authorizationFailure();
1823         setupResponse(authFailedResponse, authFailedCall,
1824           new AccessDeniedException(ae), ae.getMessage());
1825         responder.doRespond(authFailedCall);
1826         return false;
1827       }
1828       return true;
1829     }
1830 
1831     protected synchronized void close() {
1832       disposeSasl();
1833       data = null;
1834       this.dataLengthBuffer = null;
1835       if (!channel.isOpen())
1836         return;
1837       try {socket.shutdownOutput();} catch(Exception ignored) {} // FindBugs DE_MIGHT_IGNORE
1838       if (channel.isOpen()) {
1839         try {channel.close();} catch(Exception ignored) {}
1840       }
1841       try {socket.close();} catch(Exception ignored) {}
1842     }
1843 
1844     private UserGroupInformation createUser(ConnectionHeader head) {
1845       UserGroupInformation ugi = null;
1846 
1847       if (!head.hasUserInfo()) {
1848         return null;
1849       }
1850       UserInformation userInfoProto = head.getUserInfo();
1851       String effectiveUser = null;
1852       if (userInfoProto.hasEffectiveUser()) {
1853         effectiveUser = userInfoProto.getEffectiveUser();
1854       }
1855       String realUser = null;
1856       if (userInfoProto.hasRealUser()) {
1857         realUser = userInfoProto.getRealUser();
1858       }
1859       if (effectiveUser != null) {
1860         if (realUser != null) {
1861           UserGroupInformation realUserUgi =
1862               UserGroupInformation.createRemoteUser(realUser);
1863           ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1864         } else {
1865           ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1866         }
1867       }
1868       return ugi;
1869     }
1870   }
1871 
1872   /**
1873    * Datastructure for passing a {@link BlockingService} and its associated class of
1874    * protobuf service interface.  For example, a server that fielded what is defined
1875    * in the client protobuf service would pass in an implementation of the client blocking service
1876    * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
1877    */
1878   public static class BlockingServiceAndInterface {
1879     private final BlockingService service;
1880     private final Class<?> serviceInterface;
1881     public BlockingServiceAndInterface(final BlockingService service,
1882         final Class<?> serviceInterface) {
1883       this.service = service;
1884       this.serviceInterface = serviceInterface;
1885     }
1886     public Class<?> getServiceInterface() {
1887       return this.serviceInterface;
1888     }
1889     public BlockingService getBlockingService() {
1890       return this.service;
1891     }
1892   }
1893 
1894   /**
1895    * Constructs a server listening on the named port and address.
1896    * @param server hosting instance of {@link Server}. We will do authentications if an
1897    * instance else pass null for no authentication check.
1898    * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
1899    * @param services A list of services.
1900    * @param bindAddress Where to listen
1901    * @param conf
1902    * @param scheduler
1903    */
1904   public RpcServer(final Server server, final String name,
1905       final List<BlockingServiceAndInterface> services,
1906       final InetSocketAddress bindAddress, Configuration conf,
1907       RpcScheduler scheduler)
1908       throws IOException {
1909     this.reservoir = new BoundedByteBufferPool(
1910       conf.getInt("hbase.ipc.server.reservoir.max.buffer.size",  1024 * 1024),
1911       conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
1912       // Make the max twice the number of handlers to be safe.
1913       conf.getInt("hbase.ipc.server.reservoir.initial.max",
1914         conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
1915           HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
1916     this.server = server;
1917     this.services = services;
1918     this.bindAddress = bindAddress;
1919     this.conf = conf;
1920     this.socketSendBufferSize = 0;
1921     this.maxQueueSize =
1922       this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
1923     this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
1924     this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
1925     this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
1926     this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
1927     this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
1928       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
1929     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
1930     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
1931 
1932     // Start the listener here and let it bind to the port
1933     listener = new Listener(name);
1934     this.port = listener.getAddress().getPort();
1935 
1936     this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
1937     this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
1938     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
1939 
1940     this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
1941     this.delayedCalls = new AtomicInteger(0);
1942     this.ipcUtil = new IPCUtil(conf);
1943 
1944 
1945     // Create the responder here
1946     responder = new Responder();
1947     this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
1948     this.userProvider = UserProvider.instantiate(conf);
1949     this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
1950     if (isSecurityEnabled) {
1951       HBaseSaslRpcServer.init(conf);
1952     }
1953     this.scheduler = scheduler;
1954     this.scheduler.init(new RpcSchedulerContext(this));
1955   }
1956 
1957   /**
1958    * Subclasses of HBaseServer can override this to provide their own
1959    * Connection implementations.
1960    */
1961   protected Connection getConnection(SocketChannel channel, long time) {
1962     return new Connection(channel, time);
1963   }
1964 
1965   /**
1966    * Setup response for the RPC Call.
1967    *
1968    * @param response buffer to serialize the response into
1969    * @param call {@link Call} to which we are setting up the response
1970    * @param error error message, if the call failed
1971    * @throws IOException
1972    */
1973   private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
1974   throws IOException {
1975     if (response != null) response.reset();
1976     call.setResponse(null, null, t, error);
1977   }
1978 
1979   protected void closeConnection(Connection connection) {
1980     synchronized (connectionList) {
1981       if (connectionList.remove(connection)) {
1982         numConnections--;
1983       }
1984     }
1985     connection.close();
1986   }
1987 
1988   Configuration getConf() {
1989     return conf;
1990   }
1991 
1992   /** Sets the socket buffer size used for responding to RPCs.
1993    * @param size send size
1994    */
1995   @Override
1996   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
1997 
1998   @Override
1999   public boolean isStarted() {
2000     return this.started;
2001   }
2002 
2003   /** Starts the service.  Must be called before any calls will be handled. */
2004   @Override
2005   public synchronized void start() {
2006     if (started) return;
2007     AuthenticationTokenSecretManager mgr = createSecretManager();
2008     if (mgr != null) {
2009       setSecretManager(mgr);
2010       mgr.start();
2011     }
2012     this.authManager = new ServiceAuthorizationManager();
2013     HBasePolicyProvider.init(conf, authManager);
2014     responder.start();
2015     listener.start();
2016     scheduler.start();
2017     started = true;
2018   }
2019 
2020   @Override
2021   public void refreshAuthManager(PolicyProvider pp) {
2022     // Ignore warnings that this should be accessed in a static way instead of via an instance;
2023     // it'll break if you go via static route.
2024     this.authManager.refresh(this.conf, pp);
2025   }
2026 
2027   private AuthenticationTokenSecretManager createSecretManager() {
2028     if (!isSecurityEnabled) return null;
2029     if (server == null) return null;
2030     Configuration conf = server.getConfiguration();
2031     long keyUpdateInterval =
2032         conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2033     long maxAge =
2034         conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2035     return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2036         server.getServerName().toString(), keyUpdateInterval, maxAge);
2037   }
2038 
2039   public SecretManager<? extends TokenIdentifier> getSecretManager() {
2040     return this.secretManager;
2041   }
2042 
2043   @SuppressWarnings("unchecked")
2044   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2045     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2046   }
2047 
2048   /**
2049    * This is a server side method, which is invoked over RPC. On success
2050    * the return response has protobuf response payload. On failure, the
2051    * exception name and the stack trace are returned in the protobuf response.
2052    */
2053   @Override
2054   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2055       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2056   throws IOException {
2057     try {
2058       status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2059       // TODO: Review after we add in encoded data blocks.
2060       status.setRPCPacket(param);
2061       status.resume("Servicing call");
2062       //get an instance of the method arg type
2063       long startTime = System.currentTimeMillis();
2064       PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2065       Message result = service.callBlockingMethod(md, controller, param);
2066       int processingTime = (int) (System.currentTimeMillis() - startTime);
2067       int qTime = (int) (startTime - receiveTime);
2068       if (LOG.isTraceEnabled()) {
2069         LOG.trace(CurCall.get().toString() +
2070             ", response " + TextFormat.shortDebugString(result) +
2071             " queueTime: " + qTime +
2072             " processingTime: " + processingTime);
2073       }
2074       metrics.dequeuedCall(qTime);
2075       metrics.processedCall(processingTime);
2076       long responseSize = result.getSerializedSize();
2077       // log any RPC responses that are slower than the configured warn
2078       // response time or larger than configured warning size
2079       boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2080       boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2081       if (tooSlow || tooLarge) {
2082         // when tagging, we let TooLarge trump TooSmall to keep output simple
2083         // note that large responses will often also be slow.
2084         logResponse(new Object[]{param},
2085             md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
2086             (tooLarge ? "TooLarge" : "TooSlow"),
2087             status.getClient(), startTime, processingTime, qTime,
2088             responseSize);
2089       }
2090       return new Pair<Message, CellScanner>(result, controller.cellScanner());
2091     } catch (Throwable e) {
2092       // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
2093       // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
2094       // need to pass it over the wire.
2095       if (e instanceof ServiceException) e = e.getCause();
2096       if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2097       if (e instanceof IOException) throw (IOException)e;
2098       LOG.error("Unexpected throwable object ", e);
2099       throw new IOException(e.getMessage(), e);
2100     }
2101   }
2102 
2103   /**
2104    * Logs an RPC response to the LOG file, producing valid JSON objects for
2105    * client Operations.
2106    * @param params The parameters received in the call.
2107    * @param methodName The name of the method invoked
2108    * @param call The string representation of the call
2109    * @param tag  The tag that will be used to indicate this event in the log.
2110    * @param clientAddress   The address of the client who made this call.
2111    * @param startTime       The time that the call was initiated, in ms.
2112    * @param processingTime  The duration that the call took to run, in ms.
2113    * @param qTime           The duration that the call spent on the queue
2114    *                        prior to being initiated, in ms.
2115    * @param responseSize    The size in bytes of the response buffer.
2116    */
2117   void logResponse(Object[] params, String methodName, String call, String tag,
2118       String clientAddress, long startTime, int processingTime, int qTime,
2119       long responseSize)
2120           throws IOException {
2121     // base information that is reported regardless of type of call
2122     Map<String, Object> responseInfo = new HashMap<String, Object>();
2123     responseInfo.put("starttimems", startTime);
2124     responseInfo.put("processingtimems", processingTime);
2125     responseInfo.put("queuetimems", qTime);
2126     responseInfo.put("responsesize", responseSize);
2127     responseInfo.put("client", clientAddress);
2128     responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2129     responseInfo.put("method", methodName);
2130     if (params.length == 2 && server instanceof HRegionServer &&
2131         params[0] instanceof byte[] &&
2132         params[1] instanceof Operation) {
2133       // if the slow process is a query, we want to log its table as well
2134       // as its own fingerprint
2135       TableName tableName = TableName.valueOf(
2136           HRegionInfo.parseRegionName((byte[]) params[0])[0]);
2137       responseInfo.put("table", tableName.getNameAsString());
2138       // annotate the response map with operation details
2139       responseInfo.putAll(((Operation) params[1]).toMap());
2140       // report to the log file
2141       LOG.warn("(operation" + tag + "): " +
2142                MAPPER.writeValueAsString(responseInfo));
2143     } else if (params.length == 1 && server instanceof HRegionServer &&
2144         params[0] instanceof Operation) {
2145       // annotate the response map with operation details
2146       responseInfo.putAll(((Operation) params[0]).toMap());
2147       // report to the log file
2148       LOG.warn("(operation" + tag + "): " +
2149                MAPPER.writeValueAsString(responseInfo));
2150     } else {
2151       // can't get JSON details, so just report call.toString() along with
2152       // a more generic tag.
2153       responseInfo.put("call", call);
2154       LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2155     }
2156   }
2157 
2158   /** Stops the service.  No new calls will be handled after this is called. */
2159   @Override
2160   public synchronized void stop() {
2161     LOG.info("Stopping server on " + port);
2162     running = false;
2163     listener.interrupt();
2164     listener.doStop();
2165     responder.interrupt();
2166     scheduler.stop();
2167     notifyAll();
2168   }
2169 
2170   /** Wait for the server to be stopped.
2171    * Does not wait for all subthreads to finish.
2172    *  See {@link #stop()}.
2173    * @throws InterruptedException e
2174    */
2175   @Override
2176   public synchronized void join() throws InterruptedException {
2177     while (running) {
2178       wait();
2179     }
2180   }
2181 
2182   /**
2183    * Return the socket (ip+port) on which the RPC server is listening to.
2184    * @return the socket (ip+port) on which the RPC server is listening to.
2185    */
2186   @Override
2187   public synchronized InetSocketAddress getListenerAddress() {
2188     return listener.getAddress();
2189   }
2190 
2191   /**
2192    * Set the handler for calling out of RPC for error conditions.
2193    * @param handler the handler implementation
2194    */
2195   @Override
2196   public void setErrorHandler(HBaseRPCErrorHandler handler) {
2197     this.errorHandler = handler;
2198   }
2199 
2200   @Override
2201   public HBaseRPCErrorHandler getErrorHandler() {
2202     return this.errorHandler;
2203   }
2204 
2205   /**
2206    * Returns the metrics instance for reporting RPC call statistics
2207    */
2208   @Override
2209   public MetricsHBaseServer getMetrics() {
2210     return metrics;
2211   }
2212 
2213   @Override
2214   public void addCallSize(final long diff) {
2215     this.callQueueSize.add(diff);
2216   }
2217 
2218   /**
2219    * Authorize the incoming client connection.
2220    *
2221    * @param user client user
2222    * @param connection incoming connection
2223    * @param addr InetAddress of incoming connection
2224    * @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol
2225    */
2226   public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
2227   throws AuthorizationException {
2228     if (authorize) {
2229       Class<?> c = getServiceInterface(services, connection.getServiceName());
2230       this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2231     }
2232   }
2233 
2234   /**
2235    * When the read or write buffer size is larger than this limit, i/o will be
2236    * done in chunks of this size. Most RPC requests and responses would be
2237    * be smaller.
2238    */
2239   private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
2240 
2241   /**
2242    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
2243    * If the amount of data is large, it writes to channel in smaller chunks.
2244    * This is to avoid jdk from creating many direct buffers as the size of
2245    * buffer increases. This also minimizes extra copies in NIO layer
2246    * as a result of multiple write operations required to write a large
2247    * buffer.
2248    *
2249    * @param channel writable byte channel to write to
2250    * @param bufferChain Chain of buffers to write
2251    * @return number of bytes written
2252    * @throws java.io.IOException e
2253    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
2254    */
2255   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2256   throws IOException {
2257     long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
2258     if (count > 0) this.metrics.sentBytes(count);
2259     return count;
2260   }
2261 
2262   /**
2263    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
2264    * If the amount of data is large, it writes to channel in smaller chunks.
2265    * This is to avoid jdk from creating many direct buffers as the size of
2266    * ByteBuffer increases. There should not be any performance degredation.
2267    *
2268    * @param channel writable byte channel to write on
2269    * @param buffer buffer to write
2270    * @return number of bytes written
2271    * @throws java.io.IOException e
2272    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
2273    */
2274   protected int channelRead(ReadableByteChannel channel,
2275                                    ByteBuffer buffer) throws IOException {
2276 
2277     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2278            channel.read(buffer) : channelIO(channel, null, buffer);
2279     if (count > 0) {
2280       metrics.receivedBytes(count);
2281     }
2282     return count;
2283   }
2284 
2285   /**
2286    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
2287    * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
2288    * one of readCh or writeCh should be non-null.
2289    *
2290    * @param readCh read channel
2291    * @param writeCh write channel
2292    * @param buf buffer to read or write into/out of
2293    * @return bytes written
2294    * @throws java.io.IOException e
2295    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
2296    * @see #channelWrite(GatheringByteChannel, BufferChain)
2297    */
2298   private static int channelIO(ReadableByteChannel readCh,
2299                                WritableByteChannel writeCh,
2300                                ByteBuffer buf) throws IOException {
2301 
2302     int originalLimit = buf.limit();
2303     int initialRemaining = buf.remaining();
2304     int ret = 0;
2305 
2306     while (buf.remaining() > 0) {
2307       try {
2308         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2309         buf.limit(buf.position() + ioSize);
2310 
2311         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2312 
2313         if (ret < ioSize) {
2314           break;
2315         }
2316 
2317       } finally {
2318         buf.limit(originalLimit);
2319       }
2320     }
2321 
2322     int nBytes = initialRemaining - buf.remaining();
2323     return (nBytes > 0) ? nBytes : ret;
2324   }
2325 
2326   /**
2327    * Needed for features such as delayed calls.  We need to be able to store the current call
2328    * so that we can complete it later or ask questions of what is supported by the current ongoing
2329    * call.
2330    * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
2331    */
2332   public static RpcCallContext getCurrentCall() {
2333     return CurCall.get();
2334   }
2335 
2336   /**
2337    * @param serviceName Some arbitrary string that represents a 'service'.
2338    * @param services Available service instances
2339    * @return Matching BlockingServiceAndInterface pair
2340    */
2341   static BlockingServiceAndInterface getServiceAndInterface(
2342       final List<BlockingServiceAndInterface> services, final String serviceName) {
2343     for (BlockingServiceAndInterface bs : services) {
2344       if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2345         return bs;
2346       }
2347     }
2348     return null;
2349   }
2350 
2351   /**
2352    * @param serviceName Some arbitrary string that represents a 'service'.
2353    * @param services Available services and their service interfaces.
2354    * @return Service interface class for <code>serviceName</code>
2355    */
2356   static Class<?> getServiceInterface(
2357       final List<BlockingServiceAndInterface> services,
2358       final String serviceName) {
2359     BlockingServiceAndInterface bsasi =
2360         getServiceAndInterface(services, serviceName);
2361     return bsasi == null? null: bsasi.getServiceInterface();
2362   }
2363 
2364   /**
2365    * @param serviceName Some arbitrary string that represents a 'service'.
2366    * @param services Available services and their service interfaces.
2367    * @return BlockingService that goes with the passed <code>serviceName</code>
2368    */
2369   static BlockingService getService(
2370       final List<BlockingServiceAndInterface> services,
2371       final String serviceName) {
2372     BlockingServiceAndInterface bsasi =
2373         getServiceAndInterface(services, serviceName);
2374     return bsasi == null? null: bsasi.getBlockingService();
2375   }
2376 
2377   /** Returns the remote side ip address when invoked inside an RPC
2378    *  Returns null incase of an error.
2379    *  @return InetAddress
2380    */
2381   public static InetAddress getRemoteIp() {
2382     Call call = CurCall.get();
2383     if (call != null && call.connection.socket != null) {
2384       return call.connection.socket.getInetAddress();
2385     }
2386     return null;
2387   }
2388 
2389 
2390   /**
2391    * A convenience method to bind to a given address and report
2392    * better exceptions if the address is not a valid host.
2393    * @param socket the socket to bind
2394    * @param address the address to bind to
2395    * @param backlog the number of connections allowed in the queue
2396    * @throws BindException if the address can't be bound
2397    * @throws UnknownHostException if the address isn't a valid host name
2398    * @throws IOException other random errors from bind
2399    */
2400   public static void bind(ServerSocket socket, InetSocketAddress address,
2401                           int backlog) throws IOException {
2402     try {
2403       socket.bind(address, backlog);
2404     } catch (BindException e) {
2405       BindException bindException =
2406         new BindException("Problem binding to " + address + " : " +
2407             e.getMessage());
2408       bindException.initCause(e);
2409       throw bindException;
2410     } catch (SocketException e) {
2411       // If they try to bind to a different host's address, give a better
2412       // error message.
2413       if ("Unresolved address".equals(e.getMessage())) {
2414         throw new UnknownHostException("Invalid hostname for server: " +
2415                                        address.getHostName());
2416       }
2417       throw e;
2418     }
2419   }
2420 
2421   @Override
2422   public RpcScheduler getScheduler() {
2423     return scheduler;
2424   }
2425 }