View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.ServerSocket;
31  import java.net.Socket;
32  import java.net.SocketException;
33  import java.net.UnknownHostException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.Channels;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.GatheringByteChannel;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.security.PrivilegedExceptionAction;
46  import java.util.ArrayList;
47  import java.util.Collections;
48  import java.util.HashMap;
49  import java.util.Iterator;
50  import java.util.LinkedList;
51  import java.util.List;
52  import java.util.Map;
53  import java.util.Random;
54  import java.util.concurrent.ExecutorService;
55  import java.util.concurrent.Executors;
56  import java.util.concurrent.atomic.AtomicInteger;
57  
58  import javax.security.sasl.Sasl;
59  import javax.security.sasl.SaslException;
60  import javax.security.sasl.SaslServer;
61  
62  import org.apache.commons.logging.Log;
63  import org.apache.commons.logging.LogFactory;
64  import org.apache.hadoop.classification.InterfaceAudience;
65  import org.apache.hadoop.conf.Configuration;
66  import org.apache.hadoop.hbase.CellScanner;
67  import org.apache.hadoop.hbase.DoNotRetryIOException;
68  import org.apache.hadoop.hbase.HConstants;
69  import org.apache.hadoop.hbase.HRegionInfo;
70  import org.apache.hadoop.hbase.Server;
71  import org.apache.hadoop.hbase.TableName;
72  import org.apache.hadoop.hbase.client.Operation;
73  import org.apache.hadoop.hbase.codec.Codec;
74  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
75  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
76  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
77  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
78  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
79  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
80  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
81  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
82  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
83  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
84  import org.apache.hadoop.hbase.regionserver.HRegionServer;
85  import org.apache.hadoop.hbase.security.AuthMethod;
86  import org.apache.hadoop.hbase.security.HBasePolicyProvider;
87  import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
88  import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
89  import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
90  import org.apache.hadoop.hbase.security.SaslStatus;
91  import org.apache.hadoop.hbase.security.SaslUtil;
92  import org.apache.hadoop.hbase.security.UserProvider;
93  import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
94  import org.apache.hadoop.hbase.util.Bytes;
95  import org.apache.hadoop.hbase.util.Counter;
96  import org.apache.hadoop.hbase.util.Pair;
97  import org.apache.hadoop.io.BytesWritable;
98  import org.apache.hadoop.io.IntWritable;
99  import org.apache.hadoop.io.Writable;
100 import org.apache.hadoop.io.WritableUtils;
101 import org.apache.hadoop.io.compress.CompressionCodec;
102 import org.apache.hadoop.security.AccessControlException;
103 import org.apache.hadoop.security.UserGroupInformation;
104 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
105 import org.apache.hadoop.security.authorize.AuthorizationException;
106 import org.apache.hadoop.security.authorize.PolicyProvider;
107 import org.apache.hadoop.security.authorize.ProxyUsers;
108 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
109 import org.apache.hadoop.security.token.SecretManager;
110 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
111 import org.apache.hadoop.security.token.TokenIdentifier;
112 import org.apache.hadoop.util.StringUtils;
113 import org.htrace.TraceInfo;
114 import org.codehaus.jackson.map.ObjectMapper;
115 
116 import com.google.common.util.concurrent.ThreadFactoryBuilder;
117 import com.google.protobuf.BlockingService;
118 import com.google.protobuf.CodedInputStream;
119 import com.google.protobuf.Descriptors.MethodDescriptor;
120 import com.google.protobuf.Message;
121 import com.google.protobuf.Message.Builder;
122 import com.google.protobuf.ServiceException;
123 import com.google.protobuf.TextFormat;
124 
125 /**
126  * An RPC server that hosts protobuf described Services.
127  *
128  * An RpcServer instance has a Listener that hosts the socket.  Listener has fixed number
129  * of Readers in an ExecutorPool, 10 by default.  The Listener does an accept and then
130  * round robin a Reader is chosen to do the read.  The reader is registered on Selector.  Read does
131  * total read off the channel and the parse from which it makes a Call.  The call is wrapped in a
132  * CallRunner and passed to the scheduler to be run.  Reader goes back to see if more to be done
133  * and loops till done.
134  *
135  * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
136  * has given the queues into which calls (i.e. CallRunner instances) are inserted.  Handlers run
137  * taking from the queue.  They run the CallRunner#run method on each item gotten from queue
138  * and keep taking while the server is up.
139  *
140  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
141  * queue for {@link Responder} to pull from and return result to client.
142  *
143  * @see RpcClient
144  */
145 @InterfaceAudience.Private
146 public class RpcServer implements RpcServerInterface {
147   // The logging package is deliberately outside of standard o.a.h.h package so it is not on
148   // by default.
149   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcServer");
150 
151   private final boolean authorize;
152   private boolean isSecurityEnabled;
153 
154   public static final byte CURRENT_VERSION = 0;
155 
156   /**
157    * How many calls/handler are allowed in the queue.
158    */
159   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
160 
161   /**
162    * The maximum size that we can hold in the RPC queue
163    */
164   private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
165 
166   static final int BUFFER_INITIAL_SIZE = 1024;
167 
168   private static final String WARN_DELAYED_CALLS = "hbase.ipc.warn.delayedrpc.number";
169 
170   private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;
171 
172   private final int warnDelayedCalls;
173 
174   private AtomicInteger delayedCalls;
175   private final IPCUtil ipcUtil;
176 
177   private static final String AUTH_FAILED_FOR = "Auth failed for ";
178   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
179   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
180     Server.class.getName());
181   protected SecretManager<TokenIdentifier> secretManager;
182   protected ServiceAuthorizationManager authManager;
183 
184   /** This is set to Call object before Handler invokes an RPC and ybdie
185    * after the call returns.
186    */
187   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
188 
189   /** Keeps MonitoredRPCHandler per handler thread. */
190   static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
191       = new ThreadLocal<MonitoredRPCHandler>();
192 
193   protected final InetSocketAddress isa;
194   protected int port;                             // port we listen on
195   private int readThreads;                        // number of read threads
196   protected int maxIdleTime;                      // the maximum idle time after
197                                                   // which a client may be
198                                                   // disconnected
199   protected int thresholdIdleConnections;         // the number of idle
200                                                   // connections after which we
201                                                   // will start cleaning up idle
202                                                   // connections
203   int maxConnectionsToNuke;                       // the max number of
204                                                   // connections to nuke
205                                                   // during a cleanup
206 
207   protected MetricsHBaseServer metrics;
208 
209   protected final Configuration conf;
210 
211   private int maxQueueSize;
212   protected int socketSendBufferSize;
213   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
214   protected final boolean tcpKeepAlive; // if T then use keepalives
215   protected final long purgeTimeout;    // in milliseconds
216 
217   /**
218    * This flag is used to indicate to sub threads when they should go down.  When we call
219    * {@link #startThreads()}, all threads started will consult this flag on whether they should
220    * keep going.  It is set to false when {@link #stop()} is called.
221    */
222   volatile boolean running = true;
223 
224   /**
225    * This flag is set to true after all threads are up and 'running' and the server is then opened
226    * for business by the calle to {@link #openServer()}.
227    */
228   volatile boolean started = false;
229 
230   /**
231    * This is a running count of the size of all outstanding calls by size.
232    */
233   protected final Counter callQueueSize = new Counter();
234 
235   protected final List<Connection> connectionList =
236     Collections.synchronizedList(new LinkedList<Connection>());
237   //maintain a list
238   //of client connections
239   private Listener listener = null;
240   protected Responder responder = null;
241   protected int numConnections = 0;
242 
243   protected HBaseRPCErrorHandler errorHandler = null;
244 
245   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
246   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
247 
248   /** Default value for above params */
249   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
250   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
251 
252   private static final ObjectMapper MAPPER = new ObjectMapper();
253 
254   private final int warnResponseTime;
255   private final int warnResponseSize;
256   private final Server server;
257   private final List<BlockingServiceAndInterface> services;
258 
259   private final RpcScheduler scheduler;
260 
261   private UserProvider userProvider;
262 
263   /**
264    * Datastructure that holds all necessary to a method invocation and then afterward, carries
265    * the result.
266    */
267   class Call implements RpcCallContext {
268     protected int id;                             // the client's call id
269     protected BlockingService service;
270     protected MethodDescriptor md;
271     protected RequestHeader header;
272     protected Message param;                      // the parameter passed
273     // Optional cell data passed outside of protobufs.
274     protected CellScanner cellScanner;
275     protected Connection connection;              // connection to client
276     protected long timestamp;      // the time received when response is null
277                                    // the time served when response is not null
278     /**
279      * Chain of buffers to send as response.
280      */
281     protected BufferChain response;
282     protected boolean delayResponse;
283     protected Responder responder;
284     protected boolean delayReturnValue;           // if the return value should be
285                                                   // set at call completion
286     protected long size;                          // size of current call
287     protected boolean isError;
288     protected TraceInfo tinfo;
289 
290     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
291          Message param, CellScanner cellScanner, Connection connection, Responder responder,
292          long size, TraceInfo tinfo) {
293       this.id = id;
294       this.service = service;
295       this.md = md;
296       this.header = header;
297       this.param = param;
298       this.cellScanner = cellScanner;
299       this.connection = connection;
300       this.timestamp = System.currentTimeMillis();
301       this.response = null;
302       this.delayResponse = false;
303       this.responder = responder;
304       this.isError = false;
305       this.size = size;
306       this.tinfo = tinfo;
307     }
308 
309     @Override
310     public String toString() {
311       return toShortString() + " param: " +
312         (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
313         " connection: " + connection.toString();
314     }
315 
316     protected RequestHeader getHeader() {
317       return this.header;
318     }
319 
320     /*
321      * Short string representation without param info because param itself could be huge depends on
322      * the payload of a command
323      */
324     @SuppressWarnings("deprecation")
325     String toShortString() {
326       String serviceName = this.connection.service != null?
327         this.connection.service.getDescriptorForType().getName() : "null";
328       StringBuilder sb = new StringBuilder();
329       sb.append("callId: ");
330       sb.append(this.id);
331       sb.append(" service: ");
332       sb.append(serviceName);
333       sb.append(" methodName: ");
334       sb.append((this.md != null) ? this.md.getName() : "");
335       sb.append(" size: ");
336       sb.append(StringUtils.humanReadableInt(this.size));
337       sb.append(" connection: ");
338       sb.append(connection.toString());
339       return sb.toString();
340     }
341 
342     String toTraceString() {
343       String serviceName = this.connection.service != null ?
344                            this.connection.service.getDescriptorForType().getName() : "";
345       String methodName = (this.md != null) ? this.md.getName() : "";
346       String result = serviceName + "." + methodName;
347       return result;
348     }
349 
350     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
351       this.response = new BufferChain(response);
352     }
353 
354     protected synchronized void setResponse(Object m, final CellScanner cells,
355         Throwable t, String errorMsg) {
356       if (this.isError) return;
357       if (t != null) this.isError = true;
358       BufferChain bc = null;
359       try {
360         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
361         // Presume it a pb Message.  Could be null.
362         Message result = (Message)m;
363         // Call id.
364         headerBuilder.setCallId(this.id);
365         if (t != null) {
366           ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
367           exceptionBuilder.setExceptionClassName(t.getClass().getName());
368           exceptionBuilder.setStackTrace(errorMsg);
369           exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
370           if (t instanceof RegionMovedException) {
371             // Special casing for this exception.  This is only one carrying a payload.
372             // Do this instead of build a generic system for allowing exceptions carry
373             // any kind of payload.
374             RegionMovedException rme = (RegionMovedException)t;
375             exceptionBuilder.setHostname(rme.getHostname());
376             exceptionBuilder.setPort(rme.getPort());
377           }
378           // Set the exception as the result of the method invocation.
379           headerBuilder.setException(exceptionBuilder.build());
380         }
381         ByteBuffer cellBlock =
382           ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells);
383         if (cellBlock != null) {
384           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
385           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
386           cellBlockBuilder.setLength(cellBlock.limit());
387           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
388         }
389         Message header = headerBuilder.build();
390 
391         // Organize the response as a set of bytebuffers rather than collect it all together inside
392         // one big byte array; save on allocations.
393         ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
394         ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
395         int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
396           (cellBlock == null? 0: cellBlock.limit());
397         ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
398         bc = new BufferChain(bbTotalSize, bbHeader, bbResult, cellBlock);
399         if (connection.useWrap) {
400           bc = wrapWithSasl(bc);
401         }
402       } catch (IOException e) {
403         LOG.warn("Exception while creating response " + e);
404       }
405       this.response = bc;
406     }
407 
408     private BufferChain wrapWithSasl(BufferChain bc)
409         throws IOException {
410       if (bc == null) return bc;
411       if (!this.connection.useSasl) return bc;
412       // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
413       // THIS IS A BIG UGLY COPY.
414       byte [] responseBytes = bc.getBytes();
415       byte [] token;
416       // synchronization may be needed since there can be multiple Handler
417       // threads using saslServer to wrap responses.
418       synchronized (connection.saslServer) {
419         token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
420       }
421       if (LOG.isDebugEnabled())
422         LOG.debug("Adding saslServer wrapped token of size " + token.length
423             + " as call response.");
424 
425       ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
426       ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
427       return new BufferChain(bbTokenLength, bbTokenBytes);
428     }
429 
430     @Override
431     public synchronized void endDelay(Object result) throws IOException {
432       assert this.delayResponse;
433       assert this.delayReturnValue || result == null;
434       this.delayResponse = false;
435       delayedCalls.decrementAndGet();
436       if (this.delayReturnValue) {
437         this.setResponse(result, null, null, null);
438       }
439       this.responder.doRespond(this);
440     }
441 
442     @Override
443     public synchronized void endDelay() throws IOException {
444       this.endDelay(null);
445     }
446 
447     @Override
448     public synchronized void startDelay(boolean delayReturnValue) {
449       assert !this.delayResponse;
450       this.delayResponse = true;
451       this.delayReturnValue = delayReturnValue;
452       int numDelayed = delayedCalls.incrementAndGet();
453       if (numDelayed > warnDelayedCalls) {
454         LOG.warn("Too many delayed calls: limit " + warnDelayedCalls + " current " + numDelayed);
455       }
456     }
457 
458     @Override
459     public synchronized void endDelayThrowing(Throwable t) throws IOException {
460       this.setResponse(null, null, t, StringUtils.stringifyException(t));
461       this.delayResponse = false;
462       this.sendResponseIfReady();
463     }
464 
465     @Override
466     public synchronized boolean isDelayed() {
467       return this.delayResponse;
468     }
469 
470     @Override
471     public synchronized boolean isReturnValueDelayed() {
472       return this.delayReturnValue;
473     }
474 
475     @Override
476     public boolean isClientCellBlockSupport() {
477       return this.connection != null && this.connection.codec != null;
478     }
479 
480     @Override
481     public long disconnectSince() {
482       if (!connection.channel.isOpen()) {
483         return System.currentTimeMillis() - timestamp;
484       } else {
485         return -1L;
486       }
487     }
488 
489     public long getSize() {
490       return this.size;
491     }
492 
493     /**
494      * If we have a response, and delay is not set, then respond
495      * immediately.  Otherwise, do not respond to client.  This is
496      * called by the RPC code in the context of the Handler thread.
497      */
498     public synchronized void sendResponseIfReady() throws IOException {
499       if (!this.delayResponse) {
500         this.responder.doRespond(this);
501       }
502     }
503   }
504 
505   /** Listens on the socket. Creates jobs for the handler threads*/
506   private class Listener extends Thread {
507 
508     private ServerSocketChannel acceptChannel = null; //the accept channel
509     private Selector selector = null; //the selector that we use for the server
510     private Reader[] readers = null;
511     private int currentReader = 0;
512     private Random rand = new Random();
513     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
514                                          //-tion (for idle connections) ran
515     private long cleanupInterval = 10000; //the minimum interval between
516                                           //two cleanup runs
517     private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
518 
519     private ExecutorService readPool;
520 
521     public Listener(final String name) throws IOException {
522       super(name);
523       // Create a new server socket and set to non blocking mode
524       acceptChannel = ServerSocketChannel.open();
525       acceptChannel.configureBlocking(false);
526 
527       // Bind the server socket to the local host and port
528       bind(acceptChannel.socket(), isa, backlogLength);
529       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
530       // create a selector;
531       selector= Selector.open();
532 
533       readers = new Reader[readThreads];
534       readPool = Executors.newFixedThreadPool(readThreads,
535         new ThreadFactoryBuilder().setNameFormat(
536           "RpcServer.reader=%d,port=" + port).setDaemon(true).build());
537       for (int i = 0; i < readThreads; ++i) {
538         Reader reader = new Reader();
539         readers[i] = reader;
540         readPool.execute(reader);
541       }
542       LOG.info(getName() + ": started " + readThreads + " reader(s).");
543 
544       // Register accepts on the server socket with the selector.
545       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
546       this.setName("RpcServer.listener,port=" + port);
547       this.setDaemon(true);
548     }
549 
550 
551     private class Reader implements Runnable {
552       private volatile boolean adding = false;
553       private final Selector readSelector;
554 
555       Reader() throws IOException {
556         this.readSelector = Selector.open();
557       }
558       public void run() {
559         try {
560           doRunLoop();
561         } finally {
562           try {
563             readSelector.close();
564           } catch (IOException ioe) {
565             LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
566           }
567         }
568       }
569 
570       private synchronized void doRunLoop() {
571         while (running) {
572           SelectionKey key = null;
573           try {
574             readSelector.select();
575             while (adding) {
576               this.wait(1000);
577             }
578 
579             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
580             while (iter.hasNext()) {
581               key = iter.next();
582               iter.remove();
583               if (key.isValid()) {
584                 if (key.isReadable()) {
585                   doRead(key);
586                 }
587               }
588               key = null;
589             }
590           } catch (InterruptedException e) {
591             LOG.debug("Interrupted while sleeping");
592             return;
593           } catch (IOException ex) {
594             LOG.error(getName() + ": error in Reader", ex);
595           }
596         }
597       }
598 
599       /**
600        * This gets reader into the state that waits for the new channel
601        * to be registered with readSelector. If it was waiting in select()
602        * the thread will be woken up, otherwise whenever select() is called
603        * it will return even if there is nothing to read and wait
604        * in while(adding) for finishAdd call
605        */
606       public void startAdd() {
607         adding = true;
608         readSelector.wakeup();
609       }
610 
611       public synchronized SelectionKey registerChannel(SocketChannel channel)
612         throws IOException {
613         return channel.register(readSelector, SelectionKey.OP_READ);
614       }
615 
616       public synchronized void finishAdd() {
617         adding = false;
618         this.notify();
619       }
620     }
621 
622     /** cleanup connections from connectionList. Choose a random range
623      * to scan and also have a limit on the number of the connections
624      * that will be cleanedup per run. The criteria for cleanup is the time
625      * for which the connection was idle. If 'force' is true then all
626      * connections will be looked at for the cleanup.
627      * @param force all connections will be looked at for cleanup
628      */
629     private void cleanupConnections(boolean force) {
630       if (force || numConnections > thresholdIdleConnections) {
631         long currentTime = System.currentTimeMillis();
632         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
633           return;
634         }
635         int start = 0;
636         int end = numConnections - 1;
637         if (!force) {
638           start = rand.nextInt() % numConnections;
639           end = rand.nextInt() % numConnections;
640           int temp;
641           if (end < start) {
642             temp = start;
643             start = end;
644             end = temp;
645           }
646         }
647         int i = start;
648         int numNuked = 0;
649         while (i <= end) {
650           Connection c;
651           synchronized (connectionList) {
652             try {
653               c = connectionList.get(i);
654             } catch (Exception e) {return;}
655           }
656           if (c.timedOut(currentTime)) {
657             if (LOG.isDebugEnabled())
658               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
659             closeConnection(c);
660             numNuked++;
661             end--;
662             //noinspection UnusedAssignment
663             c = null;
664             if (!force && numNuked == maxConnectionsToNuke) break;
665           }
666           else i++;
667         }
668         lastCleanupRunTime = System.currentTimeMillis();
669       }
670     }
671 
672     @Override
673     public void run() {
674       LOG.info(getName() + ": starting");
675       while (running) {
676         SelectionKey key = null;
677         try {
678           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
679           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
680           while (iter.hasNext()) {
681             key = iter.next();
682             iter.remove();
683             try {
684               if (key.isValid()) {
685                 if (key.isAcceptable())
686                   doAccept(key);
687               }
688             } catch (IOException ignored) {
689             }
690             key = null;
691           }
692         } catch (OutOfMemoryError e) {
693           if (errorHandler != null) {
694             if (errorHandler.checkOOME(e)) {
695               LOG.info(getName() + ": exiting on OutOfMemoryError");
696               closeCurrentConnection(key, e);
697               cleanupConnections(true);
698               return;
699             }
700           } else {
701             // we can run out of memory if we have too many threads
702             // log the event and sleep for a minute and give
703             // some thread(s) a chance to finish
704             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
705             closeCurrentConnection(key, e);
706             cleanupConnections(true);
707             try {
708               Thread.sleep(60000);
709             } catch (InterruptedException ex) {
710               LOG.debug("Interrupted while sleeping");
711               return;
712             }
713           }
714         } catch (Exception e) {
715           closeCurrentConnection(key, e);
716         }
717         cleanupConnections(false);
718       }
719 
720       LOG.info(getName() + ": stopping");
721 
722       synchronized (this) {
723         try {
724           acceptChannel.close();
725           selector.close();
726         } catch (IOException ignored) { }
727 
728         selector= null;
729         acceptChannel= null;
730 
731         // clean up all connections
732         while (!connectionList.isEmpty()) {
733           closeConnection(connectionList.remove(0));
734         }
735       }
736     }
737 
738     private void closeCurrentConnection(SelectionKey key, Throwable e) {
739       if (key != null) {
740         Connection c = (Connection)key.attachment();
741         if (c != null) {
742           if (LOG.isDebugEnabled()) {
743             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
744                 (e != null ? " on error " + e.getMessage() : ""));
745           }
746           closeConnection(c);
747           key.attach(null);
748         }
749       }
750     }
751 
752     InetSocketAddress getAddress() {
753       return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
754     }
755 
756     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
757       Connection c;
758       ServerSocketChannel server = (ServerSocketChannel) key.channel();
759 
760       SocketChannel channel;
761       while ((channel = server.accept()) != null) {
762         try {
763           channel.configureBlocking(false);
764           channel.socket().setTcpNoDelay(tcpNoDelay);
765           channel.socket().setKeepAlive(tcpKeepAlive);
766         } catch (IOException ioe) {
767           channel.close();
768           throw ioe;
769         }
770 
771         Reader reader = getReader();
772         try {
773           reader.startAdd();
774           SelectionKey readKey = reader.registerChannel(channel);
775           c = getConnection(channel, System.currentTimeMillis());
776           readKey.attach(c);
777           synchronized (connectionList) {
778             connectionList.add(numConnections, c);
779             numConnections++;
780           }
781           if (LOG.isDebugEnabled())
782             LOG.debug(getName() + ": connection from " + c.toString() +
783                 "; # active connections: " + numConnections);
784         } finally {
785           reader.finishAdd();
786         }
787       }
788     }
789 
790     void doRead(SelectionKey key) throws InterruptedException {
791       int count = 0;
792       Connection c = (Connection)key.attachment();
793       if (c == null) {
794         return;
795       }
796       c.setLastContact(System.currentTimeMillis());
797       try {
798         count = c.readAndProcess();
799       } catch (InterruptedException ieo) {
800         throw ieo;
801       } catch (Exception e) {
802         LOG.info(getName() + ": count of bytes read: " + count, e);
803         count = -1; //so that the (count < 0) block is executed
804       }
805       if (count < 0) {
806         if (LOG.isDebugEnabled()) {
807           LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
808             " because read count=" + count +
809             ". Number of active connections: " + numConnections);
810         }
811         closeConnection(c);
812         // c = null;
813       } else {
814         c.setLastContact(System.currentTimeMillis());
815       }
816     }
817 
818     synchronized void doStop() {
819       if (selector != null) {
820         selector.wakeup();
821         Thread.yield();
822       }
823       if (acceptChannel != null) {
824         try {
825           acceptChannel.socket().close();
826         } catch (IOException e) {
827           LOG.info(getName() + ": exception in closing listener socket. " + e);
828         }
829       }
830       readPool.shutdownNow();
831     }
832 
833     // The method that will return the next reader to work with
834     // Simplistic implementation of round robin for now
835     Reader getReader() {
836       currentReader = (currentReader + 1) % readers.length;
837       return readers[currentReader];
838     }
839   }
840 
841   // Sends responses of RPC back to clients.
842   protected class Responder extends Thread {
843     private final Selector writeSelector;
844     private int pending;         // connections waiting to register
845 
846     Responder() throws IOException {
847       this.setName("RpcServer.responder");
848       this.setDaemon(true);
849       writeSelector = Selector.open(); // create a selector
850       pending = 0;
851     }
852 
853     @Override
854     public void run() {
855       LOG.info(getName() + ": starting");
856       try {
857         doRunLoop();
858       } finally {
859         LOG.info(getName() + ": stopping");
860         try {
861           writeSelector.close();
862         } catch (IOException ioe) {
863           LOG.error(getName() + ": couldn't close write selector", ioe);
864         }
865       }
866     }
867 
868     private void doRunLoop() {
869       long lastPurgeTime = 0;   // last check for old calls.
870       while (running) {
871         try {
872           waitPending();     // If a channel is being registered, wait.
873           writeSelector.select(purgeTimeout);
874           Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
875           while (iter.hasNext()) {
876             SelectionKey key = iter.next();
877             iter.remove();
878             try {
879               if (key.isValid() && key.isWritable()) {
880                 doAsyncWrite(key);
881               }
882             } catch (IOException e) {
883               LOG.info(getName() + ": asyncWrite", e);
884             }
885           }
886           long now = System.currentTimeMillis();
887           if (now < lastPurgeTime + purgeTimeout) {
888             continue;
889           }
890           lastPurgeTime = now;
891           //
892           // If there were some calls that have not been sent out for a
893           // long time, discard them.
894           //
895           if (LOG.isDebugEnabled()) LOG.debug(getName() + ": checking for old call responses.");
896           ArrayList<Call> calls;
897 
898           // get the list of channels from list of keys.
899           synchronized (writeSelector.keys()) {
900             calls = new ArrayList<Call>(writeSelector.keys().size());
901             iter = writeSelector.keys().iterator();
902             while (iter.hasNext()) {
903               SelectionKey key = iter.next();
904               Call call = (Call)key.attachment();
905               if (call != null && key.channel() == call.connection.channel) {
906                 calls.add(call);
907               }
908             }
909           }
910 
911           for(Call call : calls) {
912             try {
913               doPurge(call, now);
914             } catch (IOException e) {
915               LOG.warn(getName() + ": error in purging old calls " + e);
916             }
917           }
918         } catch (OutOfMemoryError e) {
919           if (errorHandler != null) {
920             if (errorHandler.checkOOME(e)) {
921               LOG.info(getName() + ": exiting on OutOfMemoryError");
922               return;
923             }
924           } else {
925             //
926             // we can run out of memory if we have too many threads
927             // log the event and sleep for a minute and give
928             // some thread(s) a chance to finish
929             //
930             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
931             try {
932               Thread.sleep(60000);
933             } catch (InterruptedException ex) {
934               LOG.debug("Interrupted while sleeping");
935               return;
936             }
937           }
938         } catch (Exception e) {
939           LOG.warn(getName() + ": exception in Responder " +
940               StringUtils.stringifyException(e));
941         }
942       }
943       LOG.info(getName() + ": stopped");
944     }
945 
946     private void doAsyncWrite(SelectionKey key) throws IOException {
947       Call call = (Call)key.attachment();
948       if (call == null) {
949         return;
950       }
951       if (key.channel() != call.connection.channel) {
952         throw new IOException("doAsyncWrite: bad channel");
953       }
954 
955       synchronized(call.connection.responseQueue) {
956         if (processResponse(call.connection.responseQueue, false)) {
957           try {
958             key.interestOps(0);
959           } catch (CancelledKeyException e) {
960             /* The Listener/reader might have closed the socket.
961              * We don't explicitly cancel the key, so not sure if this will
962              * ever fire.
963              * This warning could be removed.
964              */
965             LOG.warn("Exception while changing ops : " + e);
966           }
967         }
968       }
969     }
970 
971     //
972     // Remove calls that have been pending in the responseQueue
973     // for a long time.
974     //
975     private void doPurge(Call call, long now) throws IOException {
976       synchronized (call.connection.responseQueue) {
977         Iterator<Call> iter = call.connection.responseQueue.listIterator(0);
978         while (iter.hasNext()) {
979           Call nextCall = iter.next();
980           if (now > nextCall.timestamp + purgeTimeout) {
981             closeConnection(nextCall.connection);
982             break;
983           }
984         }
985       }
986     }
987 
988     // Processes one response. Returns true if there are no more pending
989     // data for this channel.
990     //
991     private boolean processResponse(final LinkedList<Call> responseQueue, boolean inHandler)
992     throws IOException {
993       boolean error = true;
994       boolean done = false;       // there is more data for this channel.
995       int numElements;
996       Call call = null;
997       try {
998         //noinspection SynchronizationOnLocalVariableOrMethodParameter
999         synchronized (responseQueue) {
1000           //
1001           // If there are no items for this channel, then we are done
1002           //
1003           numElements = responseQueue.size();
1004           if (numElements == 0) {
1005             error = false;
1006             return true;              // no more data for this channel.
1007           }
1008           //
1009           // Extract the first call
1010           //
1011           call = responseQueue.removeFirst();
1012           SocketChannel channel = call.connection.channel;
1013           //
1014           // Send as much data as we can in the non-blocking fashion
1015           //
1016           long numBytes = channelWrite(channel, call.response);
1017           if (numBytes < 0) {
1018             return true;
1019           }
1020           if (!call.response.hasRemaining()) {
1021             call.connection.decRpcCount();
1022             //noinspection RedundantIfStatement
1023             if (numElements == 1) {    // last call fully processes.
1024               done = true;             // no more data for this channel.
1025             } else {
1026               done = false;            // more calls pending to be sent.
1027             }
1028             if (LOG.isDebugEnabled()) {
1029               LOG.debug(getName() + ": callId: " + call.id + " wrote " + numBytes + " bytes.");
1030             }
1031           } else {
1032             //
1033             // If we were unable to write the entire response out, then
1034             // insert in Selector queue.
1035             //
1036             call.connection.responseQueue.addFirst(call);
1037 
1038             if (inHandler) {
1039               // set the serve time when the response has to be sent later
1040               call.timestamp = System.currentTimeMillis();
1041               if (enqueueInSelector(call))
1042                 done = true;
1043             }
1044             if (LOG.isDebugEnabled()) {
1045               LOG.debug(getName() + call.toShortString() + " partially sent, wrote " +
1046                 numBytes + " bytes.");
1047             }
1048           }
1049           error = false;              // everything went off well
1050         }
1051       } finally {
1052         if (error && call != null) {
1053           LOG.warn(getName() + call.toShortString() + ": output error -- closing");
1054           done = true;               // error. no more data for this channel.
1055           closeConnection(call.connection);
1056         }
1057       }
1058       return done;
1059     }
1060 
1061     //
1062     // Enqueue for background thread to send responses out later.
1063     //
1064     private boolean enqueueInSelector(Call call) throws IOException {
1065       boolean done = false;
1066       incPending();
1067       try {
1068         // Wake up the thread blocked on select, only then can the call
1069         // to channel.register() complete.
1070         SocketChannel channel = call.connection.channel;
1071         writeSelector.wakeup();
1072         channel.register(writeSelector, SelectionKey.OP_WRITE, call);
1073       } catch (ClosedChannelException e) {
1074         //It's OK.  Channel might be closed else where.
1075         done = true;
1076       } finally {
1077         decPending();
1078       }
1079       return done;
1080     }
1081 
1082     //
1083     // Enqueue a response from the application.
1084     //
1085     void doRespond(Call call) throws IOException {
1086       // set the serve time when the response has to be sent later
1087       call.timestamp = System.currentTimeMillis();
1088 
1089       boolean doRegister = false;
1090       synchronized (call.connection.responseQueue) {
1091         call.connection.responseQueue.addLast(call);
1092         if (call.connection.responseQueue.size() == 1) {
1093           doRegister = !processResponse(call.connection.responseQueue, false);
1094         }
1095       }
1096       if (doRegister) {
1097         enqueueInSelector(call);
1098       }
1099     }
1100 
1101     private synchronized void incPending() {   // call waiting to be enqueued.
1102       pending++;
1103     }
1104 
1105     private synchronized void decPending() { // call done enqueueing.
1106       pending--;
1107       notify();
1108     }
1109 
1110     private synchronized void waitPending() throws InterruptedException {
1111       while (pending > 0) {
1112         wait();
1113       }
1114     }
1115   }
1116 
1117   @SuppressWarnings("serial")
1118   public static class CallQueueTooBigException extends IOException {
1119     CallQueueTooBigException() {
1120       super();
1121     }
1122   }
1123 
1124   /** Reads calls from a connection and queues them for handling. */
1125   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1126       value="VO_VOLATILE_INCREMENT",
1127       justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1128   public class Connection {
1129     // If initial preamble with version and magic has been read or not.
1130     private boolean connectionPreambleRead = false;
1131     // If the connection header has been read or not.
1132     private boolean connectionHeaderRead = false;
1133     protected SocketChannel channel;
1134     private ByteBuffer data;
1135     private ByteBuffer dataLengthBuffer;
1136     protected final LinkedList<Call> responseQueue;
1137     private Counter rpcCount = new Counter(); // number of outstanding rpcs
1138     private long lastContact;
1139     private InetAddress addr;
1140     protected Socket socket;
1141     // Cache the remote host & port info so that even if the socket is
1142     // disconnected, we can say where it used to connect to.
1143     protected String hostAddress;
1144     protected int remotePort;
1145     ConnectionHeader connectionHeader;
1146     /**
1147      * Codec the client asked use.
1148      */
1149     private Codec codec;
1150     /**
1151      * Compression codec the client asked us use.
1152      */
1153     private CompressionCodec compressionCodec;
1154     BlockingService service;
1155     protected UserGroupInformation user = null;
1156     private AuthMethod authMethod;
1157     private boolean saslContextEstablished;
1158     private boolean skipInitialSaslHandshake;
1159     private ByteBuffer unwrappedData;
1160     // When is this set?  FindBugs wants to know!  Says NP
1161     private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1162     boolean useSasl;
1163     SaslServer saslServer;
1164     private boolean useWrap = false;
1165     // Fake 'call' for failed authorization response
1166     private static final int AUTHROIZATION_FAILED_CALLID = -1;
1167     private final Call authFailedCall =
1168       new Call(AUTHROIZATION_FAILED_CALLID, null, null, null, null, null, this, null, 0, null);
1169     private ByteArrayOutputStream authFailedResponse =
1170         new ByteArrayOutputStream();
1171     // Fake 'call' for SASL context setup
1172     private static final int SASL_CALLID = -33;
1173     private final Call saslCall =
1174       new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null);
1175 
1176     public UserGroupInformation attemptingUser = null; // user name before auth
1177 
1178     public Connection(SocketChannel channel, long lastContact) {
1179       this.channel = channel;
1180       this.lastContact = lastContact;
1181       this.data = null;
1182       this.dataLengthBuffer = ByteBuffer.allocate(4);
1183       this.socket = channel.socket();
1184       this.addr = socket.getInetAddress();
1185       if (addr == null) {
1186         this.hostAddress = "*Unknown*";
1187       } else {
1188         this.hostAddress = addr.getHostAddress();
1189       }
1190       this.remotePort = socket.getPort();
1191       this.responseQueue = new LinkedList<Call>();
1192       if (socketSendBufferSize != 0) {
1193         try {
1194           socket.setSendBufferSize(socketSendBufferSize);
1195         } catch (IOException e) {
1196           LOG.warn("Connection: unable to set socket send buffer size to " +
1197                    socketSendBufferSize);
1198         }
1199       }
1200     }
1201 
1202     @Override
1203     public String toString() {
1204       return getHostAddress() + ":" + remotePort;
1205     }
1206 
1207     public String getHostAddress() {
1208       return hostAddress;
1209     }
1210 
1211     public InetAddress getHostInetAddress() {
1212       return addr;
1213     }
1214 
1215     public int getRemotePort() {
1216       return remotePort;
1217     }
1218 
1219     public void setLastContact(long lastContact) {
1220       this.lastContact = lastContact;
1221     }
1222 
1223     public long getLastContact() {
1224       return lastContact;
1225     }
1226 
1227     /* Return true if the connection has no outstanding rpc */
1228     private boolean isIdle() {
1229       return rpcCount.get() == 0;
1230     }
1231 
1232     /* Decrement the outstanding RPC count */
1233     protected void decRpcCount() {
1234       rpcCount.decrement();
1235     }
1236 
1237     /* Increment the outstanding RPC count */
1238     protected void incRpcCount() {
1239       rpcCount.increment();
1240     }
1241 
1242     protected boolean timedOut(long currentTime) {
1243       return isIdle() && currentTime - lastContact > maxIdleTime;
1244     }
1245 
1246     private UserGroupInformation getAuthorizedUgi(String authorizedId)
1247         throws IOException {
1248       if (authMethod == AuthMethod.DIGEST) {
1249         TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1250             secretManager);
1251         UserGroupInformation ugi = tokenId.getUser();
1252         if (ugi == null) {
1253           throw new AccessControlException(
1254               "Can't retrieve username from tokenIdentifier.");
1255         }
1256         ugi.addTokenIdentifier(tokenId);
1257         return ugi;
1258       } else {
1259         return UserGroupInformation.createRemoteUser(authorizedId);
1260       }
1261     }
1262 
1263     private void saslReadAndProcess(byte[] saslToken) throws IOException,
1264         InterruptedException {
1265       if (saslContextEstablished) {
1266         if (LOG.isDebugEnabled())
1267           LOG.debug("Have read input token of size " + saslToken.length
1268               + " for processing by saslServer.unwrap()");
1269 
1270         if (!useWrap) {
1271           processOneRpc(saslToken);
1272         } else {
1273           byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
1274           processUnwrappedData(plaintextData);
1275         }
1276       } else {
1277         byte[] replyToken = null;
1278         try {
1279           if (saslServer == null) {
1280             switch (authMethod) {
1281             case DIGEST:
1282               if (secretManager == null) {
1283                 throw new AccessControlException(
1284                     "Server is not configured to do DIGEST authentication.");
1285               }
1286               saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
1287                   .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
1288                   SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
1289                       secretManager, this));
1290               break;
1291             default:
1292               UserGroupInformation current = UserGroupInformation
1293               .getCurrentUser();
1294               String fullName = current.getUserName();
1295               if (LOG.isDebugEnabled()) {
1296                 LOG.debug("Kerberos principal name is " + fullName);
1297               }
1298               final String names[] = SaslUtil.splitKerberosName(fullName);
1299               if (names.length != 3) {
1300                 throw new AccessControlException(
1301                     "Kerberos principal name does NOT have the expected "
1302                         + "hostname part: " + fullName);
1303               }
1304               current.doAs(new PrivilegedExceptionAction<Object>() {
1305                 @Override
1306                 public Object run() throws SaslException {
1307                   saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
1308                       .getMechanismName(), names[0], names[1],
1309                       SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
1310                   return null;
1311                 }
1312               });
1313             }
1314             if (saslServer == null)
1315               throw new AccessControlException(
1316                   "Unable to find SASL server implementation for "
1317                       + authMethod.getMechanismName());
1318             if (LOG.isDebugEnabled()) {
1319               LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
1320             }
1321           }
1322           if (LOG.isDebugEnabled()) {
1323             LOG.debug("Have read input token of size " + saslToken.length
1324                 + " for processing by saslServer.evaluateResponse()");
1325           }
1326           replyToken = saslServer.evaluateResponse(saslToken);
1327         } catch (IOException e) {
1328           IOException sendToClient = e;
1329           Throwable cause = e;
1330           while (cause != null) {
1331             if (cause instanceof InvalidToken) {
1332               sendToClient = (InvalidToken) cause;
1333               break;
1334             }
1335             cause = cause.getCause();
1336           }
1337           doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
1338             sendToClient.getLocalizedMessage());
1339           metrics.authenticationFailure();
1340           String clientIP = this.toString();
1341           // attempting user could be null
1342           AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
1343           throw e;
1344         }
1345         if (replyToken != null) {
1346           if (LOG.isDebugEnabled()) {
1347             LOG.debug("Will send token of size " + replyToken.length
1348                 + " from saslServer.");
1349           }
1350           doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
1351               null);
1352         }
1353         if (saslServer.isComplete()) {
1354           String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
1355           useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
1356           user = getAuthorizedUgi(saslServer.getAuthorizationID());
1357           if (LOG.isDebugEnabled()) {
1358             LOG.debug("SASL server context established. Authenticated client: "
1359               + user + ". Negotiated QoP is "
1360               + saslServer.getNegotiatedProperty(Sasl.QOP));
1361           }
1362           metrics.authenticationSuccess();
1363           AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
1364           saslContextEstablished = true;
1365         }
1366       }
1367     }
1368 
1369     /**
1370      * No protobuf encoding of raw sasl messages
1371      */
1372     private void doRawSaslReply(SaslStatus status, Writable rv,
1373         String errorClass, String error) throws IOException {
1374       ByteBufferOutputStream saslResponse = null;
1375       DataOutputStream out = null;
1376       try {
1377         // In my testing, have noticed that sasl messages are usually
1378         // in the ballpark of 100-200. That's why the initial capacity is 256.
1379         saslResponse = new ByteBufferOutputStream(256);
1380         out = new DataOutputStream(saslResponse);
1381         out.writeInt(status.state); // write status
1382         if (status == SaslStatus.SUCCESS) {
1383           rv.write(out);
1384         } else {
1385           WritableUtils.writeString(out, errorClass);
1386           WritableUtils.writeString(out, error);
1387         }
1388         saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1389         saslCall.responder = responder;
1390         saslCall.sendResponseIfReady();
1391       } finally {
1392         if (saslResponse != null) {
1393           saslResponse.close();
1394         }
1395         if (out != null) {
1396           out.close();
1397         }
1398       }
1399     }
1400 
1401     private void disposeSasl() {
1402       if (saslServer != null) {
1403         try {
1404           saslServer.dispose();
1405           saslServer = null;
1406         } catch (SaslException ignored) {
1407         }
1408       }
1409     }
1410 
1411     /**
1412      * Read off the wire.
1413      * @return Returns -1 if failure (and caller will close connection) else return how many
1414      * bytes were read and processed
1415      * @throws IOException
1416      * @throws InterruptedException
1417      */
1418     public int readAndProcess() throws IOException, InterruptedException {
1419       while (true) {
1420         // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
1421         // does, read in the rest of the connection preamble, the version and the auth method.
1422         // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
1423         // length into the 4-byte this.dataLengthBuffer.
1424         int count;
1425         if (this.dataLengthBuffer.remaining() > 0) {
1426           count = channelRead(channel, this.dataLengthBuffer);
1427           if (count < 0 || this.dataLengthBuffer.remaining() > 0) {
1428             return count;
1429           }
1430         }
1431         // If we have not read the connection setup preamble, look to see if that is on the wire.
1432         if (!connectionPreambleRead) {
1433           // Check for 'HBas' magic.
1434           this.dataLengthBuffer.flip();
1435           if (!HConstants.RPC_HEADER.equals(dataLengthBuffer)) {
1436             return doBadPreambleHandling("Expected HEADER=" +
1437               Bytes.toStringBinary(HConstants.RPC_HEADER.array()) +
1438               " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
1439               " from " + toString());
1440           }
1441           // Now read the next two bytes, the version and the auth to use.
1442           ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
1443           count = channelRead(channel, versionAndAuthBytes);
1444           if (count < 0 || versionAndAuthBytes.remaining() > 0) {
1445             return count;
1446           }
1447           int version = versionAndAuthBytes.get(0);
1448           byte authbyte = versionAndAuthBytes.get(1);
1449           this.authMethod = AuthMethod.valueOf(authbyte);
1450           if (version != CURRENT_VERSION) {
1451             String msg = getFatalConnectionString(version, authbyte);
1452             return doBadPreambleHandling(msg, new WrongVersionException(msg));
1453           }
1454           if (authMethod == null) {
1455             String msg = getFatalConnectionString(version, authbyte);
1456             return doBadPreambleHandling(msg, new BadAuthException(msg));
1457           }
1458           if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
1459             AccessControlException ae = new AccessControlException("Authentication is required");
1460             setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1461             responder.doRespond(authFailedCall);
1462             throw ae;
1463           }
1464           if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
1465             doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
1466                 SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
1467             authMethod = AuthMethod.SIMPLE;
1468             // client has already sent the initial Sasl message and we
1469             // should ignore it. Both client and server should fall back
1470             // to simple auth from now on.
1471             skipInitialSaslHandshake = true;
1472           }
1473           if (authMethod != AuthMethod.SIMPLE) {
1474             useSasl = true;
1475           }
1476           connectionPreambleRead = true;
1477           // Preamble checks out. Go around again to read actual connection header.
1478           dataLengthBuffer.clear();
1479           continue;
1480         }
1481         // We have read a length and we have read the preamble.  It is either the connection header
1482         // or it is a request.
1483         if (data == null) {
1484           dataLengthBuffer.flip();
1485           int dataLength = dataLengthBuffer.getInt();
1486           if (dataLength == RpcClient.PING_CALL_ID) {
1487             if (!useWrap) { //covers the !useSasl too
1488               dataLengthBuffer.clear();
1489               return 0;  //ping message
1490             }
1491           }
1492           if (dataLength < 0) {
1493             throw new IllegalArgumentException("Unexpected data length "
1494                 + dataLength + "!! from " + getHostAddress());
1495           }
1496           data = ByteBuffer.allocate(dataLength);
1497           incRpcCount();  // Increment the rpc count
1498         }
1499         count = channelRead(channel, data);
1500         if (count < 0) {
1501           return count;
1502         } else if (data.remaining() == 0) {
1503           dataLengthBuffer.clear();
1504           data.flip();
1505           if (skipInitialSaslHandshake) {
1506             data = null;
1507             skipInitialSaslHandshake = false;
1508             continue;
1509           }
1510           boolean headerRead = connectionHeaderRead;
1511           if (useSasl) {
1512             saslReadAndProcess(data.array());
1513           } else {
1514             processOneRpc(data.array());
1515           }
1516           this.data = null;
1517           if (!headerRead) {
1518             continue;
1519           }
1520         } else if (count > 0) {
1521           // We got some data and there is more to read still; go around again.
1522           if (LOG.isTraceEnabled()) LOG.trace("Continue to read rest of data " + data.remaining());
1523           continue;
1524         }
1525         return count;
1526       }
1527     }
1528 
1529     private String getFatalConnectionString(final int version, final byte authByte) {
1530       return "serverVersion=" + CURRENT_VERSION +
1531       ", clientVersion=" + version + ", authMethod=" + authByte +
1532       ", authSupported=" + (authMethod != null) + " from " + toString();
1533     }
1534 
1535     private int doBadPreambleHandling(final String msg) throws IOException {
1536       return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1537     }
1538 
1539     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1540       LOG.warn(msg);
1541       Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null);
1542       setupResponse(null, fakeCall, e, msg);
1543       responder.doRespond(fakeCall);
1544       // Returning -1 closes out the connection.
1545       return -1;
1546     }
1547 
1548     // Reads the connection header following version
1549     private void processConnectionHeader(byte[] buf) throws IOException {
1550       this.connectionHeader = ConnectionHeader.parseFrom(buf);
1551       String serviceName = connectionHeader.getServiceName();
1552       if (serviceName == null) throw new EmptyServiceNameException();
1553       this.service = getService(services, serviceName);
1554       if (this.service == null) throw new UnknownServiceException(serviceName);
1555       setupCellBlockCodecs(this.connectionHeader);
1556       UserGroupInformation protocolUser = createUser(connectionHeader);
1557       if (!useSasl) {
1558         user = protocolUser;
1559         if (user != null) {
1560           user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1561         }
1562       } else {
1563         // user is authenticated
1564         user.setAuthenticationMethod(authMethod.authenticationMethod);
1565         //Now we check if this is a proxy user case. If the protocol user is
1566         //different from the 'user', it is a proxy user scenario. However,
1567         //this is not allowed if user authenticated with DIGEST.
1568         if ((protocolUser != null)
1569             && (!protocolUser.getUserName().equals(user.getUserName()))) {
1570           if (authMethod == AuthMethod.DIGEST) {
1571             // Not allowed to doAs if token authentication is used
1572             throw new AccessControlException("Authenticated user (" + user
1573                 + ") doesn't match what the client claims to be ("
1574                 + protocolUser + ")");
1575           } else {
1576             // Effective user can be different from authenticated user
1577             // for simple auth or kerberos auth
1578             // The user is the real user. Now we create a proxy user
1579             UserGroupInformation realUser = user;
1580             user = UserGroupInformation.createProxyUser(protocolUser
1581                 .getUserName(), realUser);
1582             // Now the user is a proxy user, set Authentication method Proxy.
1583             user.setAuthenticationMethod(AuthenticationMethod.PROXY);
1584           }
1585         }
1586       }
1587     }
1588 
1589     /**
1590      * Set up cell block codecs
1591      * @param header
1592      * @throws FatalConnectionException
1593      */
1594     private void setupCellBlockCodecs(final ConnectionHeader header)
1595     throws FatalConnectionException {
1596       // TODO: Plug in other supported decoders.
1597       if (!header.hasCellBlockCodecClass()) return;
1598       String className = header.getCellBlockCodecClass();
1599       if (className == null || className.length() == 0) return;
1600       try {
1601         this.codec = (Codec)Class.forName(className).newInstance();
1602       } catch (Exception e) {
1603         throw new UnsupportedCellCodecException(className, e);
1604       }
1605       if (!header.hasCellBlockCompressorClass()) return;
1606       className = header.getCellBlockCompressorClass();
1607       try {
1608         this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1609       } catch (Exception e) {
1610         throw new UnsupportedCompressionCodecException(className, e);
1611       }
1612     }
1613 
1614     private void processUnwrappedData(byte[] inBuf) throws IOException,
1615     InterruptedException {
1616       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1617       // Read all RPCs contained in the inBuf, even partial ones
1618       while (true) {
1619         int count = -1;
1620         if (unwrappedDataLengthBuffer.remaining() > 0) {
1621           count = channelRead(ch, unwrappedDataLengthBuffer);
1622           if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1623             return;
1624         }
1625 
1626         if (unwrappedData == null) {
1627           unwrappedDataLengthBuffer.flip();
1628           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1629 
1630           if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1631             if (LOG.isDebugEnabled())
1632               LOG.debug("Received ping message");
1633             unwrappedDataLengthBuffer.clear();
1634             continue; // ping message
1635           }
1636           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1637         }
1638 
1639         count = channelRead(ch, unwrappedData);
1640         if (count <= 0 || unwrappedData.remaining() > 0)
1641           return;
1642 
1643         if (unwrappedData.remaining() == 0) {
1644           unwrappedDataLengthBuffer.clear();
1645           unwrappedData.flip();
1646           processOneRpc(unwrappedData.array());
1647           unwrappedData = null;
1648         }
1649       }
1650     }
1651 
1652     private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1653       if (connectionHeaderRead) {
1654         processRequest(buf);
1655       } else {
1656         processConnectionHeader(buf);
1657         this.connectionHeaderRead = true;
1658         if (!authorizeConnection()) {
1659           // Throw FatalConnectionException wrapping ACE so client does right thing and closes
1660           // down the connection instead of trying to read non-existent retun.
1661           throw new AccessControlException("Connection from " + this + " for service " +
1662             connectionHeader.getServiceName() + " is unauthorized for user: " + user);
1663         }
1664       }
1665     }
1666 
1667     /**
1668      * @param buf Has the request header and the request param and optionally encoded data buffer
1669      * all in this one array.
1670      * @throws IOException
1671      * @throws InterruptedException
1672      */
1673     protected void processRequest(byte[] buf) throws IOException, InterruptedException {
1674       long totalRequestSize = buf.length;
1675       int offset = 0;
1676       // Here we read in the header.  We avoid having pb
1677       // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
1678       CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
1679       int headerSize = cis.readRawVarint32();
1680       offset = cis.getTotalBytesRead();
1681       RequestHeader header = RequestHeader.newBuilder().mergeFrom(buf, offset, headerSize).build();
1682       offset += headerSize;
1683       int id = header.getCallId();
1684       if (LOG.isTraceEnabled()) {
1685         LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1686           " totalRequestSize: " + totalRequestSize + " bytes");
1687       }
1688       // Enforcing the call queue size, this triggers a retry in the client
1689       // This is a bit late to be doing this check - we have already read in the total request.
1690       if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1691         final Call callTooBig =
1692           new Call(id, this.service, null, null, null, null, this,
1693             responder, totalRequestSize, null);
1694         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1695         setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
1696           "Call queue is full, is ipc.server.max.callqueue.size too small?");
1697         responder.doRespond(callTooBig);
1698         return;
1699       }
1700       MethodDescriptor md = null;
1701       Message param = null;
1702       CellScanner cellScanner = null;
1703       try {
1704         if (header.hasRequestParam() && header.getRequestParam()) {
1705           md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1706           if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1707           Builder builder = this.service.getRequestPrototype(md).newBuilderForType();
1708           // To read the varint, I need an inputstream; might as well be a CIS.
1709           cis = CodedInputStream.newInstance(buf, offset, buf.length);
1710           int paramSize = cis.readRawVarint32();
1711           offset += cis.getTotalBytesRead();
1712           if (builder != null) {
1713             param = builder.mergeFrom(buf, offset, paramSize).build();
1714           }
1715           offset += paramSize;
1716         }
1717         if (header.hasCellBlockMeta()) {
1718           cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
1719             buf, offset, buf.length);
1720         }
1721       } catch (Throwable t) {
1722         String msg = "Unable to read call parameter from client " + getHostAddress();
1723         LOG.warn(msg, t);
1724 
1725         // probably the hbase hadoop version does not match the running hadoop version
1726         if (t instanceof LinkageError) {
1727           t = new DoNotRetryIOException(t);
1728         }
1729         // If the method is not present on the server, do not retry.
1730         if (t instanceof UnsupportedOperationException) {
1731           t = new DoNotRetryIOException(t);
1732         }
1733 
1734         final Call readParamsFailedCall =
1735           new Call(id, this.service, null, null, null, null, this,
1736             responder, totalRequestSize, null);
1737         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1738         setupResponse(responseBuffer, readParamsFailedCall, t,
1739           msg + "; " + t.getMessage());
1740         responder.doRespond(readParamsFailedCall);
1741         return;
1742       }
1743 
1744       TraceInfo traceInfo = header.hasTraceInfo()
1745           ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1746           : null;
1747       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1748               totalRequestSize,
1749               traceInfo);
1750       scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
1751     }
1752 
1753     private boolean authorizeConnection() throws IOException {
1754       try {
1755         // If auth method is DIGEST, the token was obtained by the
1756         // real user for the effective user, therefore not required to
1757         // authorize real user. doAs is allowed only for simple or kerberos
1758         // authentication
1759         if (user != null && user.getRealUser() != null
1760             && (authMethod != AuthMethod.DIGEST)) {
1761           ProxyUsers.authorize(user, this.getHostAddress(), conf);
1762         }
1763         authorize(user, connectionHeader, getHostInetAddress());
1764         if (LOG.isDebugEnabled()) {
1765           LOG.debug("Authorized " + TextFormat.shortDebugString(connectionHeader));
1766         }
1767         metrics.authorizationSuccess();
1768       } catch (AuthorizationException ae) {
1769         LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1770         metrics.authorizationFailure();
1771         setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1772         responder.doRespond(authFailedCall);
1773         return false;
1774       }
1775       return true;
1776     }
1777 
1778     protected synchronized void close() {
1779       disposeSasl();
1780       data = null;
1781       this.dataLengthBuffer = null;
1782       if (!channel.isOpen())
1783         return;
1784       try {socket.shutdownOutput();} catch(Exception ignored) {} // FindBugs DE_MIGHT_IGNORE
1785       if (channel.isOpen()) {
1786         try {channel.close();} catch(Exception ignored) {}
1787       }
1788       try {socket.close();} catch(Exception ignored) {}
1789     }
1790 
1791     private UserGroupInformation createUser(ConnectionHeader head) {
1792       UserGroupInformation ugi = null;
1793 
1794       if (!head.hasUserInfo()) {
1795         return null;
1796       }
1797       UserInformation userInfoProto = head.getUserInfo();
1798       String effectiveUser = null;
1799       if (userInfoProto.hasEffectiveUser()) {
1800         effectiveUser = userInfoProto.getEffectiveUser();
1801       }
1802       String realUser = null;
1803       if (userInfoProto.hasRealUser()) {
1804         realUser = userInfoProto.getRealUser();
1805       }
1806       if (effectiveUser != null) {
1807         if (realUser != null) {
1808           UserGroupInformation realUserUgi =
1809               UserGroupInformation.createRemoteUser(realUser);
1810           ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1811         } else {
1812           ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1813         }
1814       }
1815       return ugi;
1816     }
1817   }
1818 
1819   /**
1820    * Datastructure for passing a {@link BlockingService} and its associated class of
1821    * protobuf service interface.  For example, a server that fielded what is defined
1822    * in the client protobuf service would pass in an implementation of the client blocking service
1823    * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
1824    */
1825   public static class BlockingServiceAndInterface {
1826     private final BlockingService service;
1827     private final Class<?> serviceInterface;
1828     public BlockingServiceAndInterface(final BlockingService service,
1829         final Class<?> serviceInterface) {
1830       this.service = service;
1831       this.serviceInterface = serviceInterface;
1832     }
1833     public Class<?> getServiceInterface() {
1834       return this.serviceInterface;
1835     }
1836     public BlockingService getBlockingService() {
1837       return this.service;
1838     }
1839   }
1840 
1841   /**
1842    * Constructs a server listening on the named port and address.
1843    * @param server hosting instance of {@link Server}. We will do authentications if an
1844    * instance else pass null for no authentication check.
1845    * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
1846    * @param services A list of services.
1847    * @param isa Where to listen
1848    * @param conf
1849    * @throws IOException
1850    */
1851   public RpcServer(final Server server, final String name,
1852       final List<BlockingServiceAndInterface> services,
1853       final InetSocketAddress isa, Configuration conf,
1854       RpcScheduler scheduler)
1855   throws IOException {
1856     this.server = server;
1857     this.services = services;
1858     this.isa = isa;
1859     this.conf = conf;
1860     this.socketSendBufferSize = 0;
1861     this.maxQueueSize =
1862       this.conf.getInt("ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
1863     this.readThreads = conf.getInt("ipc.server.read.threadpool.size", 10);
1864     this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
1865     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
1866     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
1867     this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout",
1868       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
1869     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
1870     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
1871 
1872     // Start the listener here and let it bind to the port
1873     listener = new Listener(name);
1874     this.port = listener.getAddress().getPort();
1875 
1876     this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
1877     this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", true);
1878     this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
1879 
1880     this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
1881     this.delayedCalls = new AtomicInteger(0);
1882     this.ipcUtil = new IPCUtil(conf);
1883 
1884 
1885     // Create the responder here
1886     responder = new Responder();
1887     this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
1888     this.userProvider = UserProvider.instantiate(conf);
1889     this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
1890     if (isSecurityEnabled) {
1891       HBaseSaslRpcServer.init(conf);
1892     }
1893     this.scheduler = scheduler;
1894     this.scheduler.init(new RpcSchedulerContext(this));
1895   }
1896 
1897   /**
1898    * Subclasses of HBaseServer can override this to provide their own
1899    * Connection implementations.
1900    */
1901   protected Connection getConnection(SocketChannel channel, long time) {
1902     return new Connection(channel, time);
1903   }
1904 
1905   /**
1906    * Setup response for the RPC Call.
1907    *
1908    * @param response buffer to serialize the response into
1909    * @param call {@link Call} to which we are setting up the response
1910    * @param error error message, if the call failed
1911    * @param t
1912    * @throws IOException
1913    */
1914   private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
1915   throws IOException {
1916     if (response != null) response.reset();
1917     call.setResponse(null, null, t, error);
1918   }
1919 
1920   protected void closeConnection(Connection connection) {
1921     synchronized (connectionList) {
1922       if (connectionList.remove(connection)) {
1923         numConnections--;
1924       }
1925     }
1926     connection.close();
1927   }
1928 
1929   Configuration getConf() {
1930     return conf;
1931   }
1932 
1933   /** Sets the socket buffer size used for responding to RPCs.
1934    * @param size send size
1935    */
1936   @Override
1937   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
1938 
1939   @Override
1940   public boolean isStarted() {
1941     return this.started;
1942   }
1943 
1944   /** Starts the service.  Must be called before any calls will be handled. */
1945   @Override
1946   public synchronized void start() {
1947     if (started) return;
1948     AuthenticationTokenSecretManager mgr = createSecretManager();
1949     if (mgr != null) {
1950       setSecretManager(mgr);
1951       mgr.start();
1952     }
1953     this.authManager = new ServiceAuthorizationManager();
1954     HBasePolicyProvider.init(conf, authManager);
1955     responder.start();
1956     listener.start();
1957     scheduler.start();
1958     started = true;
1959   }
1960 
1961   @Override
1962   public void refreshAuthManager(PolicyProvider pp) {
1963     // Ignore warnings that this should be accessed in a static way instead of via an instance;
1964     // it'll break if you go via static route.
1965     this.authManager.refresh(this.conf, pp);
1966   }
1967 
1968   private AuthenticationTokenSecretManager createSecretManager() {
1969     if (!isSecurityEnabled) return null;
1970     if (server == null) return null;
1971     Configuration conf = server.getConfiguration();
1972     long keyUpdateInterval =
1973         conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
1974     long maxAge =
1975         conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
1976     return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
1977         server.getServerName().toString(), keyUpdateInterval, maxAge);
1978   }
1979 
1980   public SecretManager<? extends TokenIdentifier> getSecretManager() {
1981     return this.secretManager;
1982   }
1983 
1984   @SuppressWarnings("unchecked")
1985   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
1986     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
1987   }
1988 
1989   /**
1990    * This is a server side method, which is invoked over RPC. On success
1991    * the return response has protobuf response payload. On failure, the
1992    * exception name and the stack trace are returned in the protobuf response.
1993    */
1994   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
1995       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
1996   throws IOException {
1997     try {
1998       status.setRPC(md.getName(), new Object[]{param}, receiveTime);
1999       // TODO: Review after we add in encoded data blocks.
2000       status.setRPCPacket(param);
2001       status.resume("Servicing call");
2002       //get an instance of the method arg type
2003       long startTime = System.currentTimeMillis();
2004       PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2005       Message result = service.callBlockingMethod(md, controller, param);
2006       int processingTime = (int) (System.currentTimeMillis() - startTime);
2007       int qTime = (int) (startTime - receiveTime);
2008       if (LOG.isTraceEnabled()) {
2009         LOG.trace(CurCall.get().toString() +
2010             ", response " + TextFormat.shortDebugString(result) +
2011             " queueTime: " + qTime +
2012             " processingTime: " + processingTime);
2013       }
2014       metrics.dequeuedCall(qTime);
2015       metrics.processedCall(processingTime);
2016       long responseSize = result.getSerializedSize();
2017       // log any RPC responses that are slower than the configured warn
2018       // response time or larger than configured warning size
2019       boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2020       boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2021       if (tooSlow || tooLarge) {
2022         // when tagging, we let TooLarge trump TooSmall to keep output simple
2023         // note that large responses will often also be slow.
2024         StringBuilder buffer = new StringBuilder(256);
2025         buffer.append(md.getName());
2026         buffer.append("(");
2027         buffer.append(param.getClass().getName());
2028         buffer.append(")");
2029         logResponse(new Object[]{param},
2030             md.getName(), buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
2031             status.getClient(), startTime, processingTime, qTime,
2032             responseSize);
2033       }
2034       return new Pair<Message, CellScanner>(result, controller.cellScanner());
2035     } catch (Throwable e) {
2036       // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
2037       // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
2038       // need to pass it over the wire.
2039       if (e instanceof ServiceException) e = e.getCause();
2040       if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2041       if (e instanceof IOException) throw (IOException)e;
2042       LOG.error("Unexpected throwable object ", e);
2043       throw new IOException(e.getMessage(), e);
2044     }
2045   }
2046 
2047   /**
2048    * Logs an RPC response to the LOG file, producing valid JSON objects for
2049    * client Operations.
2050    * @param params The parameters received in the call.
2051    * @param methodName The name of the method invoked
2052    * @param call The string representation of the call
2053    * @param tag  The tag that will be used to indicate this event in the log.
2054    * @param clientAddress   The address of the client who made this call.
2055    * @param startTime       The time that the call was initiated, in ms.
2056    * @param processingTime  The duration that the call took to run, in ms.
2057    * @param qTime           The duration that the call spent on the queue
2058    *                        prior to being initiated, in ms.
2059    * @param responseSize    The size in bytes of the response buffer.
2060    */
2061   void logResponse(Object[] params, String methodName, String call, String tag,
2062       String clientAddress, long startTime, int processingTime, int qTime,
2063       long responseSize)
2064           throws IOException {
2065     // base information that is reported regardless of type of call
2066     Map<String, Object> responseInfo = new HashMap<String, Object>();
2067     responseInfo.put("starttimems", startTime);
2068     responseInfo.put("processingtimems", processingTime);
2069     responseInfo.put("queuetimems", qTime);
2070     responseInfo.put("responsesize", responseSize);
2071     responseInfo.put("client", clientAddress);
2072     responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2073     responseInfo.put("method", methodName);
2074     if (params.length == 2 && server instanceof HRegionServer &&
2075         params[0] instanceof byte[] &&
2076         params[1] instanceof Operation) {
2077       // if the slow process is a query, we want to log its table as well
2078       // as its own fingerprint
2079       TableName tableName = TableName.valueOf(
2080           HRegionInfo.parseRegionName((byte[]) params[0])[0]);
2081       responseInfo.put("table", tableName.getNameAsString());
2082       // annotate the response map with operation details
2083       responseInfo.putAll(((Operation) params[1]).toMap());
2084       // report to the log file
2085       LOG.warn("(operation" + tag + "): " +
2086                MAPPER.writeValueAsString(responseInfo));
2087     } else if (params.length == 1 && server instanceof HRegionServer &&
2088         params[0] instanceof Operation) {
2089       // annotate the response map with operation details
2090       responseInfo.putAll(((Operation) params[0]).toMap());
2091       // report to the log file
2092       LOG.warn("(operation" + tag + "): " +
2093                MAPPER.writeValueAsString(responseInfo));
2094     } else {
2095       // can't get JSON details, so just report call.toString() along with
2096       // a more generic tag.
2097       responseInfo.put("call", call);
2098       LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2099     }
2100   }
2101 
2102   /** Stops the service.  No new calls will be handled after this is called. */
2103   @Override
2104   public synchronized void stop() {
2105     LOG.info("Stopping server on " + port);
2106     running = false;
2107     listener.interrupt();
2108     listener.doStop();
2109     responder.interrupt();
2110     scheduler.stop();
2111     notifyAll();
2112   }
2113 
2114   /** Wait for the server to be stopped.
2115    * Does not wait for all subthreads to finish.
2116    *  See {@link #stop()}.
2117    * @throws InterruptedException e
2118    */
2119   @Override
2120   public synchronized void join() throws InterruptedException {
2121     while (running) {
2122       wait();
2123     }
2124   }
2125 
2126   /**
2127    * Return the socket (ip+port) on which the RPC server is listening to.
2128    * @return the socket (ip+port) on which the RPC server is listening to.
2129    */
2130   @Override
2131   public synchronized InetSocketAddress getListenerAddress() {
2132     return listener.getAddress();
2133   }
2134 
2135   /**
2136    * Set the handler for calling out of RPC for error conditions.
2137    * @param handler the handler implementation
2138    */
2139   @Override
2140   public void setErrorHandler(HBaseRPCErrorHandler handler) {
2141     this.errorHandler = handler;
2142   }
2143 
2144   @Override
2145   public HBaseRPCErrorHandler getErrorHandler() {
2146     return this.errorHandler;
2147   }
2148 
2149   /**
2150    * Returns the metrics instance for reporting RPC call statistics
2151    */
2152   public MetricsHBaseServer getMetrics() {
2153     return metrics;
2154   }
2155 
2156   @Override
2157   public void addCallSize(final long diff) {
2158     this.callQueueSize.add(diff);
2159   }
2160 
2161   /**
2162    * Authorize the incoming client connection.
2163    *
2164    * @param user client user
2165    * @param connection incoming connection
2166    * @param addr InetAddress of incoming connection
2167    * @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol
2168    */
2169   public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
2170   throws AuthorizationException {
2171     if (authorize) {
2172       Class<?> c = getServiceInterface(services, connection.getServiceName());
2173       this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2174     }
2175   }
2176 
2177   /**
2178    * When the read or write buffer size is larger than this limit, i/o will be
2179    * done in chunks of this size. Most RPC requests and responses would be
2180    * be smaller.
2181    */
2182   private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
2183 
2184   /**
2185    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
2186    * If the amount of data is large, it writes to channel in smaller chunks.
2187    * This is to avoid jdk from creating many direct buffers as the size of
2188    * buffer increases. This also minimizes extra copies in NIO layer
2189    * as a result of multiple write operations required to write a large
2190    * buffer.
2191    *
2192    * @param channel writable byte channel to write to
2193    * @param bufferChain Chain of buffers to write
2194    * @return number of bytes written
2195    * @throws java.io.IOException e
2196    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
2197    */
2198   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2199   throws IOException {
2200     long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
2201     if (count > 0) this.metrics.sentBytes(count);
2202     return count;
2203   }
2204 
2205   /**
2206    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
2207    * If the amount of data is large, it writes to channel in smaller chunks.
2208    * This is to avoid jdk from creating many direct buffers as the size of
2209    * ByteBuffer increases. There should not be any performance degredation.
2210    *
2211    * @param channel writable byte channel to write on
2212    * @param buffer buffer to write
2213    * @return number of bytes written
2214    * @throws java.io.IOException e
2215    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
2216    */
2217   protected int channelRead(ReadableByteChannel channel,
2218                                    ByteBuffer buffer) throws IOException {
2219 
2220     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2221            channel.read(buffer) : channelIO(channel, null, buffer);
2222     if (count > 0) {
2223       metrics.receivedBytes(count);
2224     }
2225     return count;
2226   }
2227 
2228   /**
2229    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
2230    * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
2231    * one of readCh or writeCh should be non-null.
2232    *
2233    * @param readCh read channel
2234    * @param writeCh write channel
2235    * @param buf buffer to read or write into/out of
2236    * @return bytes written
2237    * @throws java.io.IOException e
2238    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
2239    * @see #channelWrite(GatheringByteChannel, BufferChain)
2240    */
2241   private static int channelIO(ReadableByteChannel readCh,
2242                                WritableByteChannel writeCh,
2243                                ByteBuffer buf) throws IOException {
2244 
2245     int originalLimit = buf.limit();
2246     int initialRemaining = buf.remaining();
2247     int ret = 0;
2248 
2249     while (buf.remaining() > 0) {
2250       try {
2251         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2252         buf.limit(buf.position() + ioSize);
2253 
2254         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2255 
2256         if (ret < ioSize) {
2257           break;
2258         }
2259 
2260       } finally {
2261         buf.limit(originalLimit);
2262       }
2263     }
2264 
2265     int nBytes = initialRemaining - buf.remaining();
2266     return (nBytes > 0) ? nBytes : ret;
2267   }
2268 
2269   /**
2270    * Needed for features such as delayed calls.  We need to be able to store the current call
2271    * so that we can complete it later or ask questions of what is supported by the current ongoing
2272    * call.
2273    * @return An RpcCallConext backed by the currently ongoing call (gotten from a thread local)
2274    */
2275   public static RpcCallContext getCurrentCall() {
2276     return CurCall.get();
2277   }
2278 
2279   /**
2280    * @param serviceName Some arbitrary string that represents a 'service'.
2281    * @param services Available service instances
2282    * @return Matching BlockingServiceAndInterface pair
2283    */
2284   static BlockingServiceAndInterface getServiceAndInterface(
2285       final List<BlockingServiceAndInterface> services, final String serviceName) {
2286     for (BlockingServiceAndInterface bs : services) {
2287       if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2288         return bs;
2289       }
2290     }
2291     return null;
2292   }
2293 
2294   /**
2295    * @param serviceName Some arbitrary string that represents a 'service'.
2296    * @param services Available services and their service interfaces.
2297    * @return Service interface class for <code>serviceName</code>
2298    */
2299   static Class<?> getServiceInterface(
2300       final List<BlockingServiceAndInterface> services,
2301       final String serviceName) {
2302     BlockingServiceAndInterface bsasi =
2303         getServiceAndInterface(services, serviceName);
2304     return bsasi == null? null: bsasi.getServiceInterface();
2305   }
2306 
2307   /**
2308    * @param serviceName Some arbitrary string that represents a 'service'.
2309    * @param services Available services and their service interfaces.
2310    * @return BlockingService that goes with the passed <code>serviceName</code>
2311    */
2312   static BlockingService getService(
2313       final List<BlockingServiceAndInterface> services,
2314       final String serviceName) {
2315     BlockingServiceAndInterface bsasi =
2316         getServiceAndInterface(services, serviceName);
2317     return bsasi == null? null: bsasi.getBlockingService();
2318   }
2319 
2320   /** Returns the remote side ip address when invoked inside an RPC
2321    *  Returns null incase of an error.
2322    *  @return InetAddress
2323    */
2324   public static InetAddress getRemoteIp() {
2325     Call call = CurCall.get();
2326     if (call != null && call.connection.socket != null) {
2327       return call.connection.socket.getInetAddress();
2328     }
2329     return null;
2330   }
2331 
2332   /** Returns remote address as a string when invoked inside an RPC.
2333    *  Returns null in case of an error.
2334    *  @return String
2335    */
2336   public static String getRemoteAddress() {
2337     Call call = CurCall.get();
2338     if (call != null) {
2339       return call.connection.getHostAddress();
2340     }
2341     return null;
2342   }
2343 
2344   /**
2345    * A convenience method to bind to a given address and report
2346    * better exceptions if the address is not a valid host.
2347    * @param socket the socket to bind
2348    * @param address the address to bind to
2349    * @param backlog the number of connections allowed in the queue
2350    * @throws BindException if the address can't be bound
2351    * @throws UnknownHostException if the address isn't a valid host name
2352    * @throws IOException other random errors from bind
2353    */
2354   public static void bind(ServerSocket socket, InetSocketAddress address,
2355                           int backlog) throws IOException {
2356     try {
2357       socket.bind(address, backlog);
2358     } catch (BindException e) {
2359       BindException bindException =
2360         new BindException("Problem binding to " + address + " : " +
2361             e.getMessage());
2362       bindException.initCause(e);
2363       throw bindException;
2364     } catch (SocketException e) {
2365       // If they try to bind to a different host's address, give a better
2366       // error message.
2367       if ("Unresolved address".equals(e.getMessage())) {
2368         throw new UnknownHostException("Invalid hostname for server: " +
2369                                        address.getHostName());
2370       }
2371       throw e;
2372     }
2373   }
2374 
2375   public RpcScheduler getScheduler() {
2376     return scheduler;
2377   }
2378 }