1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.ipc;
21  
22  import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION;
23  
24  import java.io.ByteArrayInputStream;
25  import java.io.ByteArrayOutputStream;
26  import java.io.DataOutputStream;
27  import java.io.IOException;
28  import java.net.BindException;
29  import java.net.InetAddress;
30  import java.net.InetSocketAddress;
31  import java.net.ServerSocket;
32  import java.net.Socket;
33  import java.net.SocketException;
34  import java.net.UnknownHostException;
35  import java.nio.ByteBuffer;
36  import java.nio.channels.CancelledKeyException;
37  import java.nio.channels.Channels;
38  import java.nio.channels.ClosedChannelException;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.security.PrivilegedExceptionAction;
46  import java.util.ArrayList;
47  import java.util.Collections;
48  import java.util.HashMap;
49  import java.util.Iterator;
50  import java.util.LinkedList;
51  import java.util.List;
52  import java.util.Map;
53  import java.util.Random;
54  import java.util.concurrent.BlockingQueue;
55  import java.util.concurrent.ExecutorService;
56  import java.util.concurrent.Executors;
57  import java.util.concurrent.LinkedBlockingQueue;
58  import java.util.concurrent.atomic.AtomicInteger;
59  
60  import javax.security.sasl.Sasl;
61  import javax.security.sasl.SaslException;
62  import javax.security.sasl.SaslServer;
63  
64  import com.google.common.collect.Lists;
65  import org.apache.commons.logging.Log;
66  import org.apache.commons.logging.LogFactory;
67  import org.apache.hadoop.classification.InterfaceAudience;
68  import org.apache.hadoop.conf.Configuration;
69  import org.apache.hadoop.hbase.CellScanner;
70  import org.apache.hadoop.hbase.HConstants;
71  import org.apache.hadoop.hbase.HRegionInfo;
72  import org.apache.hadoop.hbase.client.Operation;
73  import org.apache.hadoop.hbase.codec.Codec;
74  import org.apache.hadoop.hbase.exceptions.CallerDisconnectedException;
75  import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
76  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
77  import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException;
78  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
79  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
80  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
81  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
82  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
83  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
84  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
85  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
86  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
87  import org.apache.hadoop.hbase.regionserver.HRegionServer;
88  import org.apache.hadoop.hbase.security.AuthMethod;
89  import org.apache.hadoop.hbase.security.HBasePolicyProvider;
90  import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
91  import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
92  import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
93  import org.apache.hadoop.hbase.security.SaslStatus;
94  import org.apache.hadoop.hbase.security.SaslUtil;
95  import org.apache.hadoop.hbase.security.User;
96  import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
97  import org.apache.hadoop.hbase.util.Bytes;
98  import org.apache.hadoop.hbase.util.Pair;
99  import org.apache.hadoop.io.BytesWritable;
100 import org.apache.hadoop.io.IntWritable;
101 import org.apache.hadoop.io.Writable;
102 import org.apache.hadoop.io.WritableUtils;
103 import org.apache.hadoop.io.compress.CompressionCodec;
104 import org.apache.hadoop.hbase.Server;
105 import org.apache.hadoop.security.AccessControlException;
106 import org.apache.hadoop.security.UserGroupInformation;
107 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
108 import org.apache.hadoop.security.authorize.AuthorizationException;
109 import org.apache.hadoop.security.authorize.PolicyProvider;
110 import org.apache.hadoop.security.authorize.ProxyUsers;
111 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
112 import org.apache.hadoop.security.token.SecretManager;
113 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
114 import org.apache.hadoop.security.token.TokenIdentifier;
115 import org.apache.hadoop.util.StringUtils;
116 import org.cliffc.high_scale_lib.Counter;
117 import org.cloudera.htrace.Sampler;
118 import org.cloudera.htrace.Span;
119 import org.cloudera.htrace.Trace;
120 import org.cloudera.htrace.TraceInfo;
121 import org.cloudera.htrace.impl.NullSpan;
122 import org.codehaus.jackson.map.ObjectMapper;
123 
124 import com.google.common.base.Function;
125 import com.google.common.util.concurrent.ThreadFactoryBuilder;
126 import com.google.protobuf.BlockingService;
127 import com.google.protobuf.CodedInputStream;
128 import com.google.protobuf.Descriptors.MethodDescriptor;
129 import com.google.protobuf.Message;
130 import com.google.protobuf.Message.Builder;
131 import com.google.protobuf.ServiceException;
132 import com.google.protobuf.TextFormat;
133 // Uses Writables doing sasl
134 
135 /**
136  * An RPC server that hosts protobuf described Services.
137  * <p>Once was copied from Hadoop to local to fix HBASE-900 but deviated long ago.
138  *
139  * @see RpcClient
140  */
141 @InterfaceAudience.Private
142 public class RpcServer implements RpcServerInterface {
  // The logging package is deliberately outside of standard o.a.h.h package so it is not on
  // by default.
  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcServer");

  // NOTE(review): presumably whether service-level authorization is enforced; the code that
  // assigns these two flags is not visible in this part of the file -- confirm there.
  private final boolean authorize;
  private boolean isSecurityEnabled;

  public static final byte CURRENT_VERSION = 0;

  /**
   * How many calls/handler are allowed in the queue.
   */
  private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;

  /**
   * The maximum size that we can hold in the IPC queue
   */
  private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;

  static final int BUFFER_INITIAL_SIZE = 1024;

  // Configuration key and default; presumably these feed warnDelayedCalls below (the
  // assignment is not visible here -- confirm in the constructor).
  private static final String WARN_DELAYED_CALLS = "hbase.ipc.warn.delayedrpc.number";

  private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;

  // Call.startDelay() logs a warning once the number of delayed calls exceeds this value.
  private final int warnDelayedCalls;

  // Number of calls currently delayed: incremented by Call.startDelay(), decremented by
  // Call.endDelay(Object).
  private AtomicInteger delayedCalls;
  // Used by Call.setResponse() to build the cell block that accompanies a response.
  private final IPCUtil ipcUtil;

  private static final String AUTH_FAILED_FOR = "Auth failed for ";
  private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
  // Separate logger, named "SecurityLogger." followed by the Server class name.
  private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
    Server.class.getName());
  protected SecretManager<TokenIdentifier> secretManager;
  protected ServiceAuthorizationManager authManager;

  // Set to the enclosing RpcServer by the Listener and Responder threads when they start.
  protected static final ThreadLocal<RpcServerInterface> SERVER = new ThreadLocal<RpcServerInterface>();
  private volatile boolean started = false;

  /** This is set to Call object before Handler invokes an RPC and reset
   * after the call returns.
   */
  protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();

  protected final InetSocketAddress isa;          // address the Listener binds to
  protected int port;                             // port we listen on; set by the
                                                  // Listener from the bound socket
  private int handlerCount;                       // number of handler threads
  private int priorityHandlerCount;
  private int readThreads;                        // number of read threads
  protected int maxIdleTime;                      // the maximum idle time after
                                                  // which a client may be
                                                  // disconnected
  protected int thresholdIdleConnections;         // the number of idle
                                                  // connections after which we
                                                  // will start cleaning up idle
                                                  // connections
  int maxConnectionsToNuke;                       // the max number of
                                                  // connections to nuke
                                                  // during a cleanup

  protected MetricsHBaseServer metrics;

  protected final Configuration conf;

  private int maxQueueLength;
  private int maxQueueSize;
  protected int socketSendBufferSize;
  protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
  protected final boolean tcpKeepAlive; // if T then use keepalives
  protected final long purgeTimeout;    // in milliseconds

  // The Listener, Reader and Responder loops all run while this is true.
  volatile protected boolean running = true;         // true while server runs
  protected BlockingQueue<Call> callQueue; // queued calls
  protected final Counter callQueueSize = new Counter();
  protected BlockingQueue<Call> priorityCallQueue;

  protected int highPriorityLevel;  // what level a high priority call is at

  // The list of client connections, wrapped in a synchronized list.
  protected final List<Connection> connectionList =
    Collections.synchronizedList(new LinkedList<Connection>());
  private Listener listener = null;
  protected Responder responder = null;
  // Incremented by Listener.doAccept() while holding the connectionList lock.
  protected int numConnections = 0;
  private Handler[] handlers = null;
  private Handler[] priorityHandlers = null;
  /** replication related queue; */
  protected BlockingQueue<Call> replicationQueue;
  private int numOfReplicationHandlers = 0;
  private Handler[] replicationHandlers = null;

  protected HBaseRPCErrorHandler errorHandler = null;

  private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
  private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";

  /** Default value for above params */
  private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
  private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;

  private final int warnResponseTime;
  private final int warnResponseSize;
  private final Object serverInstance;
  private final List<BlockingServiceAndInterface> services;
249 
250   /**
251    * Datastructure that holds all necessary to a method invocation and then afterward, carries
252    * the result.
253    */
254   class Call implements RpcCallContext {
255     protected int id;                             // the client's call id
256     protected BlockingService service;
257     protected MethodDescriptor md;
258     protected Message param;                      // the parameter passed
259     // Optional cell data passed outside of protobufs.
260     protected CellScanner cellScanner;
261     protected Connection connection;              // connection to client
262     protected long timestamp;      // the time received when response is null
263                                    // the time served when response is not null
264     protected ByteBuffer response;                // the response for this call
265     protected boolean delayResponse;
266     protected Responder responder;
267     protected boolean delayReturnValue;           // if the return value should be
268                                                   // set at call completion
269     protected long size;                          // size of current call
270     protected boolean isError;
271     protected TraceInfo tinfo;
272 
273     Call(int id, final BlockingService service, final MethodDescriptor md, Message param,
274         CellScanner cellScanner, Connection connection, Responder responder, long size,
275         TraceInfo tinfo) {
276       this.id = id;
277       this.service = service;
278       this.md = md;
279       this.param = param;
280       this.cellScanner = cellScanner;
281       this.connection = connection;
282       this.timestamp = System.currentTimeMillis();
283       this.response = null;
284       this.delayResponse = false;
285       this.responder = responder;
286       this.isError = false;
287       this.size = size;
288       this.tinfo = tinfo;
289     }
290 
291     @Override
292     public String toString() {
293       String serviceName = this.connection.service != null?
294         this.connection.service.getDescriptorForType().getName(): "null";
295       return "callId: " + this.id + " service: " + serviceName + " methodName: " +
296         ((this.md != null)? this.md.getName(): null) + " param: " +
297         (this.param != null? IPCUtil.getRequestShortTextFormat(this.param): "") +
298         " connection: " + connection.toString();
299     }
300 
301     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
302       this.response = response;
303     }
304 
305     protected synchronized void setResponse(Object m, final CellScanner cells,
306         Throwable t, String errorMsg) {
307       if (this.isError) return;
308       if (t != null) this.isError = true;
309       ByteBufferOutputStream bbos = null;
310       try {
311         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
312         // Presume it a pb Message.  Could be null.
313         Message result = (Message)m;
314         // Call id.
315         headerBuilder.setCallId(this.id);
316         if (t != null) {
317           ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
318           exceptionBuilder.setExceptionClassName(t.getClass().getName());
319           exceptionBuilder.setStackTrace(errorMsg);
320           exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
321           if (t instanceof RegionMovedException) {
322             // Special casing for this exception.  This is only one carrying a payload.
323             // Do this instead of build a generic system for allowing exceptions carry
324             // any kind of payload.
325             RegionMovedException rme = (RegionMovedException)t;
326             exceptionBuilder.setHostname(rme.getHostname());
327             exceptionBuilder.setPort(rme.getPort());
328           }
329           // Set the exception as the result of the method invocation.
330           headerBuilder.setException(exceptionBuilder.build());
331         }
332         ByteBuffer cellBlock =
333           ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells);
334         if (cellBlock != null) {
335           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
336           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
337           cellBlockBuilder.setLength(cellBlock.limit());
338           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
339         }
340         Message header = headerBuilder.build();
341         bbos = IPCUtil.write(header, result, cellBlock);
342         if (connection.useWrap) {
343           wrapWithSasl(bbos);
344         }
345       } catch (IOException e) {
346         LOG.warn("Exception while creating response " + e);
347       }
348       ByteBuffer bb = null;
349       if (bbos != null) {
350         // TODO: If SASL, maybe buffer already been flipped and written?
351         bb = bbos.getByteBuffer();
352         bb.position(0);
353       }
354       this.response = bb;
355     }
356 
357     private void wrapWithSasl(ByteBufferOutputStream response)
358     throws IOException {
359       if (connection.useSasl) {
360         // getByteBuffer calls flip()
361         ByteBuffer buf = response.getByteBuffer();
362         byte[] token;
363         // synchronization may be needed since there can be multiple Handler
364         // threads using saslServer to wrap responses.
365         synchronized (connection.saslServer) {
366           token = connection.saslServer.wrap(buf.array(),
367               buf.arrayOffset(), buf.remaining());
368         }
369         if (LOG.isDebugEnabled())
370           LOG.debug("Adding saslServer wrapped token of size " + token.length
371               + " as call response.");
372         buf.clear();
373         DataOutputStream saslOut = new DataOutputStream(response);
374         saslOut.writeInt(token.length);
375         saslOut.write(token, 0, token.length);
376       }
377     }
378 
379     @Override
380     public synchronized void endDelay(Object result) throws IOException {
381       assert this.delayResponse;
382       assert this.delayReturnValue || result == null;
383       this.delayResponse = false;
384       delayedCalls.decrementAndGet();
385       if (this.delayReturnValue) {
386         this.setResponse(result, null, null, null);
387       }
388       this.responder.doRespond(this);
389     }
390 
391     @Override
392     public synchronized void endDelay() throws IOException {
393       this.endDelay(null);
394     }
395 
396     @Override
397     public synchronized void startDelay(boolean delayReturnValue) {
398       assert !this.delayResponse;
399       this.delayResponse = true;
400       this.delayReturnValue = delayReturnValue;
401       int numDelayed = delayedCalls.incrementAndGet();
402       if (numDelayed > warnDelayedCalls) {
403         LOG.warn("Too many delayed calls: limit " + warnDelayedCalls + " current " + numDelayed);
404       }
405     }
406 
407     @Override
408     public synchronized void endDelayThrowing(Throwable t) throws IOException {
409       this.setResponse(null, null, t, StringUtils.stringifyException(t));
410       this.delayResponse = false;
411       this.sendResponseIfReady();
412     }
413 
414     @Override
415     public synchronized boolean isDelayed() {
416       return this.delayResponse;
417     }
418 
419     @Override
420     public synchronized boolean isReturnValueDelayed() {
421       return this.delayReturnValue;
422     }
423 
424     @Override
425     public void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException {
426       if (!connection.channel.isOpen()) {
427         long afterTime = System.currentTimeMillis() - timestamp;
428         throw new CallerDisconnectedException(
429             "Aborting call " + this + " after " + afterTime + " ms, since " +
430             "caller disconnected");
431       }
432     }
433 
434     public long getSize() {
435       return this.size;
436     }
437 
438     /**
439      * If we have a response, and delay is not set, then respond
440      * immediately.  Otherwise, do not respond to client.  This is
441      * called the by the RPC code in the context of the Handler thread.
442      */
443     public synchronized void sendResponseIfReady() throws IOException {
444       if (!this.delayResponse) {
445         this.responder.doRespond(this);
446       }
447     }
448   }
449 
450   /** Listens on the socket. Creates jobs for the handler threads*/
451   private class Listener extends Thread {
452 
453     private ServerSocketChannel acceptChannel = null; //the accept channel
454     private Selector selector = null; //the selector that we use for the server
455     private Reader[] readers = null;
456     private int currentReader = 0;
457     private Random rand = new Random();
458     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
459                                          //-tion (for idle connections) ran
460     private long cleanupInterval = 10000; //the minimum interval between
461                                           //two cleanup runs
462     private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
463 
464     private ExecutorService readPool;
465 
466     public Listener(final String name) throws IOException {
467       super(name);
468       // Create a new server socket and set to non blocking mode
469       acceptChannel = ServerSocketChannel.open();
470       acceptChannel.configureBlocking(false);
471 
472       // Bind the server socket to the local host and port
473       bind(acceptChannel.socket(), isa, backlogLength);
474       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
475       // create a selector;
476       selector= Selector.open();
477 
478       readers = new Reader[readThreads];
479       readPool = Executors.newFixedThreadPool(readThreads,
480         new ThreadFactoryBuilder().setNameFormat(
481           "IPC Reader %d on port " + port).setDaemon(true).build());
482       for (int i = 0; i < readThreads; ++i) {
483         Reader reader = new Reader();
484         readers[i] = reader;
485         readPool.execute(reader);
486       }
487       LOG.info(getName() + ": started " + readThreads + " reader(s).");
488 
489       // Register accepts on the server socket with the selector.
490       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
491       this.setName("IPC Server listener on " + port);
492       this.setDaemon(true);
493     }
494 
495 
496     private class Reader implements Runnable {
497       private volatile boolean adding = false;
498       private final Selector readSelector;
499 
500       Reader() throws IOException {
501         this.readSelector = Selector.open();
502       }
503       public void run() {
504         try {
505           doRunLoop();
506         } finally {
507           try {
508             readSelector.close();
509           } catch (IOException ioe) {
510             LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
511           }
512         }
513       }
514 
515       private synchronized void doRunLoop() {
516         while (running) {
517           SelectionKey key = null;
518           try {
519             readSelector.select();
520             while (adding) {
521               this.wait(1000);
522             }
523 
524             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
525             while (iter.hasNext()) {
526               key = iter.next();
527               iter.remove();
528               if (key.isValid()) {
529                 if (key.isReadable()) {
530                   doRead(key);
531                 }
532               }
533               key = null;
534             }
535           } catch (InterruptedException e) {
536             if (running) {                      // unexpected -- log it
537               LOG.info(getName() + ": unexpectedly interrupted: " +
538                 StringUtils.stringifyException(e));
539             }
540           } catch (IOException ex) {
541             LOG.error(getName() + ": error in Reader", ex);
542           }
543         }
544       }
545 
546       /**
547        * This gets reader into the state that waits for the new channel
548        * to be registered with readSelector. If it was waiting in select()
549        * the thread will be woken up, otherwise whenever select() is called
550        * it will return even if there is nothing to read and wait
551        * in while(adding) for finishAdd call
552        */
553       public void startAdd() {
554         adding = true;
555         readSelector.wakeup();
556       }
557 
558       public synchronized SelectionKey registerChannel(SocketChannel channel)
559         throws IOException {
560         return channel.register(readSelector, SelectionKey.OP_READ);
561       }
562 
563       public synchronized void finishAdd() {
564         adding = false;
565         this.notify();
566       }
567     }
568 
569     /** cleanup connections from connectionList. Choose a random range
570      * to scan and also have a limit on the number of the connections
571      * that will be cleanedup per run. The criteria for cleanup is the time
572      * for which the connection was idle. If 'force' is true then all
573      * connections will be looked at for the cleanup.
574      * @param force all connections will be looked at for cleanup
575      */
576     private void cleanupConnections(boolean force) {
577       if (force || numConnections > thresholdIdleConnections) {
578         long currentTime = System.currentTimeMillis();
579         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
580           return;
581         }
582         int start = 0;
583         int end = numConnections - 1;
584         if (!force) {
585           start = rand.nextInt() % numConnections;
586           end = rand.nextInt() % numConnections;
587           int temp;
588           if (end < start) {
589             temp = start;
590             start = end;
591             end = temp;
592           }
593         }
594         int i = start;
595         int numNuked = 0;
596         while (i <= end) {
597           Connection c;
598           synchronized (connectionList) {
599             try {
600               c = connectionList.get(i);
601             } catch (Exception e) {return;}
602           }
603           if (c.timedOut(currentTime)) {
604             if (LOG.isDebugEnabled())
605               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
606             closeConnection(c);
607             numNuked++;
608             end--;
609             //noinspection UnusedAssignment
610             c = null;
611             if (!force && numNuked == maxConnectionsToNuke) break;
612           }
613           else i++;
614         }
615         lastCleanupRunTime = System.currentTimeMillis();
616       }
617     }
618 
619     @Override
620     public void run() {
621       LOG.info(getName() + ": starting");
622       SERVER.set(RpcServer.this);
623 
624       while (running) {
625         SelectionKey key = null;
626         try {
627           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
628           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
629           while (iter.hasNext()) {
630             key = iter.next();
631             iter.remove();
632             try {
633               if (key.isValid()) {
634                 if (key.isAcceptable())
635                   doAccept(key);
636               }
637             } catch (IOException ignored) {
638             }
639             key = null;
640           }
641         } catch (OutOfMemoryError e) {
642           if (errorHandler != null) {
643             if (errorHandler.checkOOME(e)) {
644               LOG.info(getName() + ": exiting on OutOfMemoryError");
645               closeCurrentConnection(key, e);
646               cleanupConnections(true);
647               return;
648             }
649           } else {
650             // we can run out of memory if we have too many threads
651             // log the event and sleep for a minute and give
652             // some thread(s) a chance to finish
653             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
654             closeCurrentConnection(key, e);
655             cleanupConnections(true);
656             try { Thread.sleep(60000); } catch (Exception ignored) {}
657           }
658         } catch (Exception e) {
659           closeCurrentConnection(key, e);
660         }
661         cleanupConnections(false);
662       }
663       LOG.info(getName() + ": stopping");
664 
665       synchronized (this) {
666         try {
667           acceptChannel.close();
668           selector.close();
669         } catch (IOException ignored) { }
670 
671         selector= null;
672         acceptChannel= null;
673 
674         // clean up all connections
675         while (!connectionList.isEmpty()) {
676           closeConnection(connectionList.remove(0));
677         }
678       }
679     }
680 
681     private void closeCurrentConnection(SelectionKey key, Throwable e) {
682       if (key != null) {
683         Connection c = (Connection)key.attachment();
684         if (c != null) {
685           if (LOG.isDebugEnabled()) {
686             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
687                 (e != null ? " on error " + e.getMessage() : ""));
688           }
689           closeConnection(c);
690           key.attach(null);
691         }
692       }
693     }
694 
695     InetSocketAddress getAddress() {
696       return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
697     }
698 
699     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
700       Connection c;
701       ServerSocketChannel server = (ServerSocketChannel) key.channel();
702 
703       SocketChannel channel;
704       while ((channel = server.accept()) != null) {
705         channel.configureBlocking(false);
706         channel.socket().setTcpNoDelay(tcpNoDelay);
707         channel.socket().setKeepAlive(tcpKeepAlive);
708 
709         Reader reader = getReader();
710         try {
711           reader.startAdd();
712           SelectionKey readKey = reader.registerChannel(channel);
713           c = getConnection(channel, System.currentTimeMillis());
714           readKey.attach(c);
715           synchronized (connectionList) {
716             connectionList.add(numConnections, c);
717             numConnections++;
718           }
719           if (LOG.isDebugEnabled())
720             LOG.debug(getName() + ": connection from " + c.toString() +
721                 "; # active connections: " + numConnections +
722                 "; # queued calls: " + callQueue.size());
723         } finally {
724           reader.finishAdd();
725         }
726       }
727     }
728 
729     void doRead(SelectionKey key) throws InterruptedException {
730       int count = 0;
731       Connection c = (Connection)key.attachment();
732       if (c == null) {
733         return;
734       }
735       c.setLastContact(System.currentTimeMillis());
736       try {
737         count = c.readAndProcess();
738       } catch (InterruptedException ieo) {
739         throw ieo;
740       } catch (Exception e) {
741         LOG.warn(getName() + ": count of bytes read: " + count, e);
742         count = -1; //so that the (count < 0) block is executed
743       }
744       if (count < 0) {
745         if (LOG.isDebugEnabled()) {
746           LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
747             " because read count=" + count +
748             ". Number of active connections: " + numConnections);
749         }
750         closeConnection(c);
751         // c = null;
752       } else {
753         c.setLastContact(System.currentTimeMillis());
754       }
755     }
756 
757     synchronized void doStop() {
758       if (selector != null) {
759         selector.wakeup();
760         Thread.yield();
761       }
762       if (acceptChannel != null) {
763         try {
764           acceptChannel.socket().close();
765         } catch (IOException e) {
766           LOG.info(getName() + ": exception in closing listener socket. " + e);
767         }
768       }
769       readPool.shutdownNow();
770     }
771 
772     // The method that will return the next reader to work with
773     // Simplistic implementation of round robin for now
774     Reader getReader() {
775       currentReader = (currentReader + 1) % readers.length;
776       return readers[currentReader];
777     }
778   }
779 
780   // Sends responses of RPC back to clients.
781   protected class Responder extends Thread {
782     private final Selector writeSelector;
783     private int pending;         // connections waiting to register
784 
785     Responder() throws IOException {
786       this.setName("IPC Server Responder");
787       this.setDaemon(true);
788       writeSelector = Selector.open(); // create a selector
789       pending = 0;
790     }
791 
792     @Override
793     public void run() {
794       LOG.info(getName() + ": starting");
795       SERVER.set(RpcServer.this);
796       try {
797         doRunLoop();
798       } finally {
799         LOG.info(getName() + ": stopping");
800         try {
801           writeSelector.close();
802         } catch (IOException ioe) {
803           LOG.error(getName() + ": couldn't close write selector", ioe);
804         }
805       }
806     }
807 
    /**
     * Main loop of the responder thread; runs until {@code running} becomes false or an
     * OutOfMemoryError is judged fatal by {@code errorHandler}.
     *
     * Each iteration:
     * 1. waits until no registration with the write selector is in flight (waitPending),
     * 2. selects on the write selector for at most {@code purgeTimeout} milliseconds,
     * 3. calls doAsyncWrite for every selected key that is valid and writable,
     * 4. at most once per {@code purgeTimeout}, collects the calls attached to the selector's
     *    keys and hands each to doPurge.
     */
    private void doRunLoop() {
      long lastPurgeTime = 0;   // last check for old calls.

      while (running) {
        try {
          waitPending();     // If a channel is being registered, wait.
          writeSelector.select(purgeTimeout);
          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
          while (iter.hasNext()) {
            SelectionKey key = iter.next();
            // Remove the key from the selected set so it is not seen again on the next pass.
            iter.remove();
            try {
              if (key.isValid() && key.isWritable()) {
                  doAsyncWrite(key);
              }
            } catch (IOException e) {
              // A write failure on one key is logged; the remaining keys are still serviced.
              LOG.info(getName() + ": asyncWrite", e);
            }
          }
          long now = System.currentTimeMillis();
          // Skip the purge pass until a full purgeTimeout has elapsed since the previous one.
          if (now < lastPurgeTime + purgeTimeout) {
            continue;
          }
          lastPurgeTime = now;
          //
          // If there were some calls that have not been sent out for a
          // long time, discard them.
          //
          if (LOG.isDebugEnabled()) LOG.debug(getName() + ": checking for old call responses.");
          ArrayList<Call> calls;

          // Snapshot, under the key set's monitor, the calls attached to registered keys whose
          // channel is still the call's connection channel; purging happens after the monitor
          // is released.
          synchronized (writeSelector.keys()) {
            calls = new ArrayList<Call>(writeSelector.keys().size());
            iter = writeSelector.keys().iterator();
            while (iter.hasNext()) {
              SelectionKey key = iter.next();
              Call call = (Call)key.attachment();
              if (call != null && key.channel() == call.connection.channel) {
                calls.add(call);
              }
            }
          }

          for(Call call : calls) {
            try {
              doPurge(call, now);
            } catch (IOException e) {
              LOG.warn(getName() + ": error in purging old calls " + e);
            }
          }
        } catch (OutOfMemoryError e) {
          if (errorHandler != null) {
            // With a handler installed: exit the loop if it says so, otherwise keep looping
            // immediately (no sleep on this path).
            if (errorHandler.checkOOME(e)) {
              LOG.info(getName() + ": exiting on OutOfMemoryError");
              return;
            }
          } else {
            //
            // we can run out of memory if we have too many threads
            // log the event and sleep for a minute and give
            // some thread(s) a chance to finish
            //
            LOG.warn(getName() + ": OutOfMemoryError in server select", e);
            try { Thread.sleep(60000); } catch (Exception ignored) {}
          }
        } catch (Exception e) {
          // NOTE(review): this also catches an InterruptedException thrown by waitPending();
          // it is logged and the loop continues without restoring the interrupt status.
          LOG.warn(getName() + ": exception in Responder " +
                   StringUtils.stringifyException(e));
        }
      }
      LOG.info(getName() + ": stopped");
    }
881 
882     private void doAsyncWrite(SelectionKey key) throws IOException {
883       Call call = (Call)key.attachment();
884       if (call == null) {
885         return;
886       }
887       if (key.channel() != call.connection.channel) {
888         throw new IOException("doAsyncWrite: bad channel");
889       }
890 
891       synchronized(call.connection.responseQueue) {
892         if (processResponse(call.connection.responseQueue, false)) {
893           try {
894             key.interestOps(0);
895           } catch (CancelledKeyException e) {
896             /* The Listener/reader might have closed the socket.
897              * We don't explicitly cancel the key, so not sure if this will
898              * ever fire.
899              * This warning could be removed.
900              */
901             LOG.warn("Exception while changing ops : " + e);
902           }
903         }
904       }
905     }
906 
907     //
908     // Remove calls that have been pending in the responseQueue
909     // for a long time.
910     //
911     private void doPurge(Call call, long now) throws IOException {
912       synchronized (call.connection.responseQueue) {
913         Iterator<Call> iter = call.connection.responseQueue.listIterator(0);
914         while (iter.hasNext()) {
915           Call nextCall = iter.next();
916           if (now > nextCall.timestamp + purgeTimeout) {
917             closeConnection(nextCall.connection);
918             break;
919           }
920         }
921       }
922     }
923 
    //
    // Processes one response: takes the first call off responseQueue and writes as much of
    // its response as the channel accepts without blocking.
    //
    // Returns true if there is no more pending data for this channel (queue empty, last
    // response fully written, negative write count, channel found closed while registering,
    // or an error); returns false if more remains to be sent.
    //
    // If the method leaves the synchronized block without reaching "error = false" and a
    // call had been dequeued, the finally block logs and closes the call's connection. This
    // covers exceptions and also the early return taken on a negative write count.
    //
    // inHandler: when true and the response is only partially written, the call's timestamp
    // is refreshed and the channel is registered with the write selector from here.
    //
    private boolean processResponse(final LinkedList<Call> responseQueue, boolean inHandler)
    throws IOException {
      boolean error = true;
      boolean done = false;       // there is more data for this channel.
      int numElements;
      Call call = null;
      try {
        //noinspection SynchronizationOnLocalVariableOrMethodParameter
        synchronized (responseQueue) {
          //
          // If there are no items for this channel, then we are done
          //
          numElements = responseQueue.size();
          if (numElements == 0) {
            error = false;
            return true;              // no more data for this channel.
          }
          //
          // Extract the first call
          //
          call = responseQueue.removeFirst();
          SocketChannel channel = call.connection.channel;
          //
          // Send as much data as we can in the non-blocking fashion
          //
          int numBytes = channelWrite(channel, call.response);
          if (numBytes < 0) {
            // error is still true here, so the finally block closes the connection.
            return true;
          }
          if (!call.response.hasRemaining()) {
            // Response fully written: this rpc is no longer outstanding on the connection.
            call.connection.decRpcCount();
            //noinspection RedundantIfStatement
            if (numElements == 1) {    // last call fully processes.
              done = true;             // no more data for this channel.
            } else {
              done = false;            // more calls pending to be sent.
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug(getName() + ": callId: " + call.id + " wrote " + numBytes + " bytes.");
            }
          } else {
            //
            // If we were unable to write the entire response out, then
            // insert in Selector queue.
            //
            // Put the call back at the head so the remainder is sent before later responses.
            call.connection.responseQueue.addFirst(call);

            if (inHandler) {
              // set the serve time when the response has to be sent later
              call.timestamp = System.currentTimeMillis();
              // enqueueInSelector returns true when the channel turned out to be closed.
              if (enqueueInSelector(call))
                done = true;
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug(getName() + call.toString() + " partially sent, wrote " +
                numBytes + " bytes.");
            }
          }
          error = false;              // everything went off well
        }
      } finally {
        if (error && call != null) {
          LOG.warn(getName() + call.toString() + ": output error");
          done = true;               // error. no more data for this channel.
          closeConnection(call.connection);
        }
      }
      return done;
    }
996 
997     //
998     // Enqueue for background thread to send responses out later.
999     //
1000     private boolean enqueueInSelector(Call call) throws IOException {
1001       boolean done = false;
1002       incPending();
1003       try {
1004         // Wake up the thread blocked on select, only then can the call
1005         // to channel.register() complete.
1006         SocketChannel channel = call.connection.channel;
1007         writeSelector.wakeup();
1008         channel.register(writeSelector, SelectionKey.OP_WRITE, call);
1009       } catch (ClosedChannelException e) {
1010         //It's OK.  Channel might be closed else where.
1011         done = true;
1012       } finally {
1013         decPending();
1014       }
1015       return done;
1016     }
1017 
1018     //
1019     // Enqueue a response from the application.
1020     //
1021     void doRespond(Call call) throws IOException {
1022       // set the serve time when the response has to be sent later
1023       call.timestamp = System.currentTimeMillis();
1024 
1025       boolean doRegister = false;
1026       synchronized (call.connection.responseQueue) {
1027         call.connection.responseQueue.addLast(call);
1028         if (call.connection.responseQueue.size() == 1) {
1029           doRegister = !processResponse(call.connection.responseQueue, false);
1030         }
1031       }
1032       if (doRegister) {
1033         enqueueInSelector(call);
1034       }
1035     }
1036 
1037     private synchronized void incPending() {   // call waiting to be enqueued.
1038       pending++;
1039     }
1040 
1041     private synchronized void decPending() { // call done enqueueing.
1042       pending--;
1043       notify();
1044     }
1045 
1046     private synchronized void waitPending() throws InterruptedException {
1047       while (pending > 0) {
1048         wait();
1049       }
1050     }
1051   }
1052 
1053   @SuppressWarnings("serial")
1054   public static class CallQueueTooBigException extends IOException {
1055     CallQueueTooBigException() {
1056       super();
1057     }
1058   }
1059 
1060   private Function<Pair<RequestHeader, Message>, Integer> qosFunction = null;
1061 
1062   /**
1063    * Gets the QOS level for this call.  If it is higher than the highPriorityLevel and there
1064    * are priorityHandlers available it will be processed in it's own thread set.
1065    *
1066    * @param newFunc
1067    */
1068   @Override
1069   public void setQosFunction(Function<Pair<RequestHeader, Message>, Integer> newFunc) {
1070     qosFunction = newFunc;
1071   }
1072 
1073   protected int getQosLevel(Pair<RequestHeader, Message> headerAndParam) {
1074     if (qosFunction == null) return 0;
1075     Integer res = qosFunction.apply(headerAndParam);
1076     return res == null? 0: res;
1077   }
1078 
1079   /** Reads calls from a connection and queues them for handling. */
1080   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1081       value="VO_VOLATILE_INCREMENT",
1082       justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1083   public class Connection {
1084     // If initial preamble with version and magic has been read or not.
1085     private boolean connectionPreambleRead = false;
1086     // If the connection header has been read or not.
1087     private boolean connectionHeaderRead = false;
1088     protected SocketChannel channel;
1089     private ByteBuffer data;
1090     private ByteBuffer dataLengthBuffer;
1091     protected final LinkedList<Call> responseQueue;
1092     private volatile int rpcCount = 0; // number of outstanding rpcs
1093     private long lastContact;
1094     private InetAddress addr;
1095     protected Socket socket;
1096     // Cache the remote host & port info so that even if the socket is
1097     // disconnected, we can say where it used to connect to.
1098     protected String hostAddress;
1099     protected int remotePort;
1100     ConnectionHeader connectionHeader;
1101     /**
     * Codec the client asked us to use.
1103      */
1104     private Codec codec;
1105     /**
     * Compression codec the client asked us to use.
1107      */
1108     private CompressionCodec compressionCodec;
1109     BlockingService service;
1110     protected UserGroupInformation user = null;
1111     private AuthMethod authMethod;
1112     private boolean saslContextEstablished;
1113     private boolean skipInitialSaslHandshake;
1114     private ByteBuffer unwrappedData;
1115     // When is this set?  FindBugs wants to know!  Says NP
1116     private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1117     boolean useSasl;
1118     SaslServer saslServer;
1119     private boolean useWrap = false;
1120     // Fake 'call' for failed authorization response
1121     private static final int AUTHROIZATION_FAILED_CALLID = -1;
1122     private final Call authFailedCall =
1123       new Call(AUTHROIZATION_FAILED_CALLID, this.service, null, null, null, this, null, 0, null);
1124     private ByteArrayOutputStream authFailedResponse =
1125         new ByteArrayOutputStream();
1126     // Fake 'call' for SASL context setup
1127     private static final int SASL_CALLID = -33;
1128     private final Call saslCall =
1129       new Call(SASL_CALLID, this.service, null, null, null, this, null, 0, null);
1130 
1131     public UserGroupInformation attemptingUser = null; // user name before auth
1132 
1133     public Connection(SocketChannel channel, long lastContact) {
1134       this.channel = channel;
1135       this.lastContact = lastContact;
1136       this.data = null;
1137       this.dataLengthBuffer = ByteBuffer.allocate(4);
1138       this.socket = channel.socket();
1139       this.addr = socket.getInetAddress();
1140       if (addr == null) {
1141         this.hostAddress = "*Unknown*";
1142       } else {
1143         this.hostAddress = addr.getHostAddress();
1144       }
1145       this.remotePort = socket.getPort();
1146       this.responseQueue = new LinkedList<Call>();
1147       if (socketSendBufferSize != 0) {
1148         try {
1149           socket.setSendBufferSize(socketSendBufferSize);
1150         } catch (IOException e) {
1151           LOG.warn("Connection: unable to set socket send buffer size to " +
1152                    socketSendBufferSize);
1153         }
1154       }
1155     }
1156 
1157     @Override
1158     public String toString() {
1159       return getHostAddress() + ":" + remotePort;
1160     }
1161 
1162     public String getHostAddress() {
1163       return hostAddress;
1164     }
1165 
1166     public InetAddress getHostInetAddress() {
1167       return addr;
1168     }
1169 
1170     public int getRemotePort() {
1171       return remotePort;
1172     }
1173 
1174     public void setLastContact(long lastContact) {
1175       this.lastContact = lastContact;
1176     }
1177 
1178     public long getLastContact() {
1179       return lastContact;
1180     }
1181 
1182     /* Return true if the connection has no outstanding rpc */
1183     private boolean isIdle() {
1184       return rpcCount == 0;
1185     }
1186 
1187     /* Decrement the outstanding RPC count */
1188     protected void decRpcCount() {
1189       rpcCount--;
1190     }
1191 
1192     /* Increment the outstanding RPC count */
1193     protected void incRpcCount() {
1194       rpcCount++;
1195     }
1196 
1197     protected boolean timedOut(long currentTime) {
1198       return isIdle() && currentTime - lastContact > maxIdleTime;
1199     }
1200 
1201     private UserGroupInformation getAuthorizedUgi(String authorizedId)
1202         throws IOException {
1203       if (authMethod == AuthMethod.DIGEST) {
1204         TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1205             secretManager);
1206         UserGroupInformation ugi = tokenId.getUser();
1207         if (ugi == null) {
1208           throw new AccessControlException(
1209               "Can't retrieve username from tokenIdentifier.");
1210         }
1211         ugi.addTokenIdentifier(tokenId);
1212         return ugi;
1213       } else {
1214         return UserGroupInformation.createRemoteUser(authorizedId);
1215       }
1216     }
1217 
    /**
     * Handles one length-delimited message received on a SASL connection.
     *
     * Once the SASL context is established the bytes are an rpc: passed straight to
     * processOneRpc when wrapping is off, otherwise unwrapped first and handed to
     * processUnwrappedData. Before that, the bytes are a SASL negotiation token: the SASL
     * server is created on first use, the token is evaluated, any reply token is sent back,
     * and when negotiation completes the authenticated user is recorded.
     *
     * @param saslToken raw bytes of the message read off the wire
     * @throws IOException on negotiation failure (after an error reply has been sent to the
     *                     client) or on failure while processing an rpc
     * @throws InterruptedException declared by the rpc-processing and doAs calls made here
     */
    private void saslReadAndProcess(byte[] saslToken) throws IOException,
        InterruptedException {
      if (saslContextEstablished) {
        if (LOG.isDebugEnabled())
          LOG.debug("Have read input token of size " + saslToken.length
              + " for processing by saslServer.unwrap()");

        if (!useWrap) {
          // No wrapping negotiated: the bytes are a plain rpc.
          processOneRpc(saslToken);
        } else {
          byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
          processUnwrappedData(plaintextData);
        }
      } else {
        byte[] replyToken = null;
        try {
          // Lazily create the SASL server on the first negotiation token.
          if (saslServer == null) {
            switch (authMethod) {
            case DIGEST:
              if (secretManager == null) {
                throw new AccessControlException(
                    "Server is not configured to do DIGEST authentication.");
              }
              saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
                  .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
                  SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
                      secretManager, this));
              break;
            default:
              // Every other auth method reaching this point is set up as Kerberos, using the
              // current user's principal split into its parts.
              UserGroupInformation current = UserGroupInformation
              .getCurrentUser();
              String fullName = current.getUserName();
              if (LOG.isDebugEnabled()) {
                LOG.debug("Kerberos principal name is " + fullName);
              }
              final String names[] = SaslUtil.splitKerberosName(fullName);
              if (names.length != 3) {
                throw new AccessControlException(
                    "Kerberos principal name does NOT have the expected "
                        + "hostname part: " + fullName);
              }
              // Create the server while running as the current user.
              current.doAs(new PrivilegedExceptionAction<Object>() {
                @Override
                public Object run() throws SaslException {
                  saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
                      .getMechanismName(), names[0], names[1],
                      SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
                  return null;
                }
              });
            }
            if (saslServer == null)
              throw new AccessControlException(
                  "Unable to find SASL server implementation for "
                      + authMethod.getMechanismName());
            if (LOG.isDebugEnabled()) {
              LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
            }
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug("Have read input token of size " + saslToken.length
                + " for processing by saslServer.evaluateResponse()");
          }
          replyToken = saslServer.evaluateResponse(saslToken);
        } catch (IOException e) {
          // Report the failure to the client. If an InvalidToken is found anywhere in the
          // cause chain, its class and message are reported instead of the outer exception's.
          IOException sendToClient = e;
          Throwable cause = e;
          while (cause != null) {
            if (cause instanceof InvalidToken) {
              sendToClient = (InvalidToken) cause;
              break;
            }
            cause = cause.getCause();
          }
          doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
            sendToClient.getLocalizedMessage());
          metrics.authenticationFailure();
          String clientIP = this.toString();
          // attempting user could be null
          AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
          // The original exception (not the unwrapped one) is rethrown to the caller.
          throw e;
        }
        if (replyToken != null) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Will send token of size " + replyToken.length
                + " from saslServer.");
          }
          doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
              null);
        }
        if (saslServer.isComplete()) {
          // Negotiation finished. Wrapping is used for any negotiated QOP other than "auth".
          String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
          useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
          user = getAuthorizedUgi(saslServer.getAuthorizationID());
          if (LOG.isDebugEnabled()) {
            LOG.debug("SASL server context established. Authenticated client: "
              + user + ". Negotiated QoP is "
              + saslServer.getNegotiatedProperty(Sasl.QOP));
          }
          metrics.authenticationSuccess();
          AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
          saslContextEstablished = true;
        }
      }
    }
1323     /**
1324      * No protobuf encoding of raw sasl messages
1325      */
1326     private void doRawSaslReply(SaslStatus status, Writable rv,
1327         String errorClass, String error) throws IOException {
1328       //In my testing, have noticed that sasl messages are usually
1329       //in the ballpark of 100-200. That's why the initialcapacity is 256.
1330       ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
1331       DataOutputStream out = new DataOutputStream(saslResponse);
1332       out.writeInt(status.state); // write status
1333       if (status == SaslStatus.SUCCESS) {
1334         rv.write(out);
1335       } else {
1336         WritableUtils.writeString(out, errorClass);
1337         WritableUtils.writeString(out, error);
1338       }
1339       saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1340       saslCall.responder = responder;
1341       saslCall.sendResponseIfReady();
1342     }
1343 
1344     private void disposeSasl() {
1345       if (saslServer != null) {
1346         try {
1347           saslServer.dispose();
1348           saslServer = null;
1349         } catch (SaslException ignored) {
1350         }
1351       }
1352     }
1353 
1354     /**
1355      * Read off the wire.
1356      * @return Returns -1 if failure (and caller will close connection) else return how many
1357      * bytes were read and processed
1358      * @throws IOException
1359      * @throws InterruptedException
1360      */
1361     public int readAndProcess() throws IOException, InterruptedException {
1362       while (true) {
1363         // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
1364         // does, read in the rest of the connection preamble, the version and the auth method.
1365         // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
1366         // length into the 4-byte this.dataLengthBuffer.
1367         int count;
1368         if (this.dataLengthBuffer.remaining() > 0) {
1369           count = channelRead(channel, this.dataLengthBuffer);
1370           if (count < 0 || this.dataLengthBuffer.remaining() > 0) {
1371             return count;
1372           }
1373         }
1374         // If we have not read the connection setup preamble, look to see if that is on the wire.
1375         if (!connectionPreambleRead) {
1376           // Check for 'HBas' magic.
1377           this.dataLengthBuffer.flip();
1378           if (!HConstants.RPC_HEADER.equals(dataLengthBuffer)) {
1379             return doBadPreambleHandling("Expected HEADER=" +
1380               Bytes.toStringBinary(HConstants.RPC_HEADER.array()) +
1381               " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
1382               " from " + toString());
1383           }
1384           // Now read the next two bytes, the version and the auth to use.
1385           ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
1386           count = channelRead(channel, versionAndAuthBytes);
1387           if (count < 0 || versionAndAuthBytes.remaining() > 0) {
1388             return count;
1389           }
1390           int version = versionAndAuthBytes.get(0);
1391           byte authbyte = versionAndAuthBytes.get(1);
1392           this.authMethod = AuthMethod.valueOf(authbyte);
1393           if (version != CURRENT_VERSION) {
1394             String msg = getFatalConnectionString(version, authbyte);
1395             return doBadPreambleHandling(msg, new WrongVersionException(msg));
1396           }
1397           if (authMethod == null) {
1398             String msg = getFatalConnectionString(version, authbyte);
1399             return doBadPreambleHandling(msg, new BadAuthException(msg));
1400           }
1401           if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
1402             AccessControlException ae = new AccessControlException("Authentication is required");
1403             setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1404             responder.doRespond(authFailedCall);
1405             throw ae;
1406           }
1407           if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
1408             doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
1409                 SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
1410             authMethod = AuthMethod.SIMPLE;
1411             // client has already sent the initial Sasl message and we
1412             // should ignore it. Both client and server should fall back
1413             // to simple auth from now on.
1414             skipInitialSaslHandshake = true;
1415           }
1416           if (authMethod != AuthMethod.SIMPLE) {
1417             useSasl = true;
1418           }
1419           connectionPreambleRead = true;
1420           // Preamble checks out. Go around again to read actual connection header.
1421           dataLengthBuffer.clear();
1422           continue;
1423         }
1424         // We have read a length and we have read the preamble.  It is either the connection header
1425         // or it is a request.
1426         if (data == null) {
1427           dataLengthBuffer.flip();
1428           int dataLength = dataLengthBuffer.getInt();
1429           if (dataLength == RpcClient.PING_CALL_ID) {
1430             if (!useWrap) { //covers the !useSasl too
1431               dataLengthBuffer.clear();
1432               return 0;  //ping message
1433             }
1434           }
1435           if (dataLength < 0) {
1436             throw new IllegalArgumentException("Unexpected data length "
1437                 + dataLength + "!! from " + getHostAddress());
1438           }
1439           data = ByteBuffer.allocate(dataLength);
1440           incRpcCount();  // Increment the rpc count
1441         }
1442         count = channelRead(channel, data);
1443         if (data.remaining() == 0) {
1444           dataLengthBuffer.clear();
1445           data.flip();
1446           if (skipInitialSaslHandshake) {
1447             data = null;
1448             skipInitialSaslHandshake = false;
1449             continue;
1450           }
1451           boolean headerRead = connectionHeaderRead;
1452           if (useSasl) {
1453             saslReadAndProcess(data.array());
1454           } else {
1455             processOneRpc(data.array());
1456           }
1457           this.data = null;
1458           if (!headerRead) {
1459             continue;
1460           }
1461         } else {
1462           // More to read still; go around again.
1463           if (LOG.isTraceEnabled()) LOG.trace("Continue to read rest of data " + data.remaining());
1464           continue;
1465         }
1466         return count;
1467       }
1468     }
1469 
1470     private String getFatalConnectionString(final int version, final byte authByte) {
1471       return "serverVersion=" + CURRENT_VERSION +
1472       ", clientVersion=" + version + ", authMethod=" + authByte +
1473       ", authSupported=" + (authMethod != null) + " from " + toString();
1474     }
1475 
1476     private int doBadPreambleHandling(final String msg) throws IOException {
1477       return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1478     }
1479 
1480     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1481       LOG.warn(msg);
1482       Call fakeCall = new Call(-1, null, null, null, null, this, responder, -1, null);
1483       setupResponse(null, fakeCall, e, msg);
1484       responder.doRespond(fakeCall);
1485       // Returning -1 closes out the connection.
1486       return -1;
1487     }
1488 
1489     // Reads the connection header following version
1490     private void processConnectionHeader(byte[] buf) throws IOException {
1491       this.connectionHeader = ConnectionHeader.parseFrom(buf);
1492       String serviceName = connectionHeader.getServiceName();
1493       if (serviceName == null) throw new EmptyServiceNameException();
1494       this.service = getService(services, serviceName);
1495       if (this.service == null) throw new UnknownServiceException(serviceName);
1496       setupCellBlockCodecs(this.connectionHeader);
1497       UserGroupInformation protocolUser = createUser(connectionHeader);
1498       if (!useSasl) {
1499         user = protocolUser;
1500         if (user != null) {
1501           user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1502         }
1503       } else {
1504         // user is authenticated
1505         user.setAuthenticationMethod(authMethod.authenticationMethod);
1506         //Now we check if this is a proxy user case. If the protocol user is
1507         //different from the 'user', it is a proxy user scenario. However,
1508         //this is not allowed if user authenticated with DIGEST.
1509         if ((protocolUser != null)
1510             && (!protocolUser.getUserName().equals(user.getUserName()))) {
1511           if (authMethod == AuthMethod.DIGEST) {
1512             // Not allowed to doAs if token authentication is used
1513             throw new AccessControlException("Authenticated user (" + user
1514                 + ") doesn't match what the client claims to be ("
1515                 + protocolUser + ")");
1516           } else {
1517             // Effective user can be different from authenticated user
1518             // for simple auth or kerberos auth
1519             // The user is the real user. Now we create a proxy user
1520             UserGroupInformation realUser = user;
1521             user = UserGroupInformation.createProxyUser(protocolUser
1522                 .getUserName(), realUser);
1523             // Now the user is a proxy user, set Authentication method Proxy.
1524             user.setAuthenticationMethod(AuthenticationMethod.PROXY);
1525           }
1526         }
1527       }
1528     }
1529 
1530     /**
1531      * Set up cell block codecs
1532      * @param header
1533      * @throws FatalConnectionException
1534      */
1535     private void setupCellBlockCodecs(final ConnectionHeader header)
1536     throws FatalConnectionException {
1537       // TODO: Plug in other supported decoders.
1538       if (!header.hasCellBlockCodecClass()) throw new FatalConnectionException("No codec");
1539       String className = header.getCellBlockCodecClass();
1540       try {
1541         this.codec = (Codec)Class.forName(className).newInstance();
1542       } catch (Exception e) {
1543         throw new UnsupportedCellCodecException(className, e);
1544       }
1545       if (!header.hasCellBlockCompressorClass()) return;
1546       className = header.getCellBlockCompressorClass();
1547       try {
1548         this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1549       } catch (Exception e) {
1550         throw new UnsupportedCompressionCodecException(className, e);
1551       }
1552     }
1553 
1554     private void processUnwrappedData(byte[] inBuf) throws IOException,
1555     InterruptedException {
1556       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1557       // Read all RPCs contained in the inBuf, even partial ones
1558       while (true) {
1559         int count = -1;
1560         if (unwrappedDataLengthBuffer.remaining() > 0) {
1561           count = channelRead(ch, unwrappedDataLengthBuffer);
1562           if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1563             return;
1564         }
1565 
1566         if (unwrappedData == null) {
1567           unwrappedDataLengthBuffer.flip();
1568           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1569 
1570           if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1571             if (LOG.isDebugEnabled())
1572               LOG.debug("Received ping message");
1573             unwrappedDataLengthBuffer.clear();
1574             continue; // ping message
1575           }
1576           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1577         }
1578 
1579         count = channelRead(ch, unwrappedData);
1580         if (count <= 0 || unwrappedData.remaining() > 0)
1581           return;
1582 
1583         if (unwrappedData.remaining() == 0) {
1584           unwrappedDataLengthBuffer.clear();
1585           unwrappedData.flip();
1586           processOneRpc(unwrappedData.array());
1587           unwrappedData = null;
1588         }
1589       }
1590     }
1591 
1592     private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1593       if (connectionHeaderRead) {
1594         processRequest(buf);
1595       } else {
1596         processConnectionHeader(buf);
1597         this.connectionHeaderRead = true;
1598         if (!authorizeConnection()) {
1599           // Throw FatalConnectionException wrapping ACE so client does right thing and closes
1600           // down the connection instead of trying to read non-existent retun.
1601           throw new AccessControlException("Connection from " + this + " for service " +
1602             connectionHeader.getServiceName() + " is unauthorized for user: " + user);
1603         }
1604       }
1605     }
1606 
1607     /**
1608      * @param buf Has the request header and the request param and optionally encoded data buffer
1609      * all in this one array.
1610      * @throws IOException
1611      * @throws InterruptedException
1612      */
1613     protected void processRequest(byte[] buf) throws IOException, InterruptedException {
1614       long totalRequestSize = buf.length;
1615       int offset = 0;
1616       // Here we read in the header.  We avoid having pb
1617       // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
1618       CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
1619       int headerSize = cis.readRawVarint32();
1620       offset = cis.getTotalBytesRead();
1621       RequestHeader header = RequestHeader.newBuilder().mergeFrom(buf, offset, headerSize).build();
1622       offset += headerSize;
1623       int id = header.getCallId();
1624       if (LOG.isTraceEnabled()) {
1625         LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1626           " totalRequestSize: " + totalRequestSize + " bytes");
1627       }
1628       // Enforcing the call queue size, this triggers a retry in the client
1629       // This is a bit late to be doing this check - we have already read in the total request.
1630       if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1631         final Call callTooBig =
1632           new Call(id, this.service, null, null, null, this, responder, totalRequestSize, null);
1633         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1634         setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
1635           "Call queue is full, is ipc.server.max.callqueue.size too small?");
1636         responder.doRespond(callTooBig);
1637         return;
1638       }
1639       MethodDescriptor md = null;
1640       Message param = null;
1641       CellScanner cellScanner = null;
1642       try {
1643         if (header.hasRequestParam() && header.getRequestParam()) {
1644           md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1645           Builder builder = this.service.getRequestPrototype(md).newBuilderForType();
1646           // To read the varint, I need an inputstream; might as well be a CIS.
1647           cis = CodedInputStream.newInstance(buf, offset, buf.length);
1648           int paramSize = cis.readRawVarint32();
1649           offset += cis.getTotalBytesRead();
1650           if (builder != null) {
1651             param = builder.mergeFrom(buf, offset, paramSize).build();
1652           }
1653           offset += paramSize;
1654         }
1655         if (header.hasCellBlockMeta()) {
1656           cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
1657             buf, offset, buf.length);
1658         }
1659       } catch (Throwable t) {
1660         String msg = "Unable to read call parameter from client " + getHostAddress();
1661         LOG.warn(msg, t);
1662         final Call readParamsFailedCall =
1663           new Call(id, this.service, null, null, null, this, responder, totalRequestSize, null);
1664         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1665         setupResponse(responseBuffer, readParamsFailedCall, t,
1666           msg + "; " + t.getMessage());
1667         responder.doRespond(readParamsFailedCall);
1668         return;
1669       }
1670 
1671       Call call = null;
1672       if (header.hasTraceInfo()) {
1673         call = new Call(id, this.service, md, param, cellScanner, this, responder, totalRequestSize,
1674           new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()));
1675       } else {
1676         call = new Call(id, this.service, md, param, cellScanner, this, responder,
1677           totalRequestSize, null);
1678       }
1679       callQueueSize.add(totalRequestSize);
1680       Pair<RequestHeader, Message> headerAndParam = new Pair<RequestHeader, Message>(header, param);
1681       if (priorityCallQueue != null && getQosLevel(headerAndParam) > highPriorityLevel) {
1682         priorityCallQueue.put(call);
1683       } else if (replicationQueue != null &&
1684           getQosLevel(headerAndParam) == HConstants.REPLICATION_QOS) {
1685         replicationQueue.put(call);
1686       } else {
1687         callQueue.put(call);              // queue the call; maybe blocked here
1688       }
1689     }
1690 
1691     private boolean authorizeConnection() throws IOException {
1692       try {
1693         // If auth method is DIGEST, the token was obtained by the
1694         // real user for the effective user, therefore not required to
1695         // authorize real user. doAs is allowed only for simple or kerberos
1696         // authentication
1697         if (user != null && user.getRealUser() != null
1698             && (authMethod != AuthMethod.DIGEST)) {
1699           ProxyUsers.authorize(user, this.getHostAddress(), conf);
1700         }
1701         authorize(user, connectionHeader, getHostInetAddress());
1702         if (LOG.isDebugEnabled()) {
1703           LOG.debug("Authorized " + TextFormat.shortDebugString(connectionHeader));
1704         }
1705         metrics.authorizationSuccess();
1706       } catch (AuthorizationException ae) {
1707         LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1708         metrics.authorizationFailure();
1709         setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1710         responder.doRespond(authFailedCall);
1711         return false;
1712       }
1713       return true;
1714     }
1715 
1716     protected synchronized void close() {
1717       disposeSasl();
1718       data = null;
1719       this.dataLengthBuffer = null;
1720       if (!channel.isOpen())
1721         return;
1722       try {socket.shutdownOutput();} catch(Exception ignored) {} // FindBugs DE_MIGHT_IGNORE
1723       if (channel.isOpen()) {
1724         try {channel.close();} catch(Exception ignored) {}
1725       }
1726       try {socket.close();} catch(Exception ignored) {}
1727     }
1728 
1729     private UserGroupInformation createUser(ConnectionHeader head) {
1730       UserGroupInformation ugi = null;
1731 
1732       if (!head.hasUserInfo()) {
1733         return null;
1734       }
1735       UserInformation userInfoProto = head.getUserInfo();
1736       String effectiveUser = null;
1737       if (userInfoProto.hasEffectiveUser()) {
1738         effectiveUser = userInfoProto.getEffectiveUser();
1739       }
1740       String realUser = null;
1741       if (userInfoProto.hasRealUser()) {
1742         realUser = userInfoProto.getRealUser();
1743       }
1744       if (effectiveUser != null) {
1745         if (realUser != null) {
1746           UserGroupInformation realUserUgi =
1747               UserGroupInformation.createRemoteUser(realUser);
1748           ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1749         } else {
1750           ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1751         }
1752       }
1753       return ugi;
1754     }
1755   }
1756 
1757   /** Handles queued calls . */
1758   private class Handler extends Thread {
1759     private final BlockingQueue<Call> myCallQueue;
1760     private MonitoredRPCHandler status;
1761 
1762     public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
1763       this.myCallQueue = cq;
1764       this.setDaemon(true);
1765 
1766       String threadName = "IPC Server handler " + instanceNumber + " on " + port;
1767       if (cq == priorityCallQueue) {
1768         // this is just an amazing hack, but it works.
1769         threadName = "PRI " + threadName;
1770       } else if (cq == replicationQueue) {
1771         threadName = "REPL " + threadName;
1772       }
1773       this.setName(threadName);
1774       this.status = TaskMonitor.get().createRPCStatus(threadName);
1775     }
1776 
1777     @Override
1778     public void run() {
1779       LOG.info(getName() + ": starting");
1780       status.setStatus("starting");
1781       SERVER.set(RpcServer.this);
1782       while (running) {
1783         try {
1784           status.pause("Waiting for a call");
1785           Call call = myCallQueue.take(); // pop the queue; maybe blocked here
1786           status.setStatus("Setting up call");
1787           status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
1788           if (LOG.isDebugEnabled()) {
1789             UserGroupInformation remoteUser = call.connection.user;
1790             LOG.debug(call.toString() + " executing as " +
1791               ((remoteUser == null)? "NULL principal": remoteUser.getUserName()));
1792           }
1793           Throwable errorThrowable = null;
1794           String error = null;
1795           Pair<Message, CellScanner> resultPair = null;
1796           CurCall.set(call);
1797           Span currentRequestSpan = NullSpan.getInstance();
1798           try {
1799             if (!started) {
1800               throw new ServerNotRunningYetException("Server is not running yet");
1801             }
1802             if (call.tinfo != null) {
1803               currentRequestSpan = Trace.startSpan(
1804                   "handling " + call.toString(), call.tinfo, Sampler.ALWAYS);
1805             }
1806             RequestContext.set(User.create(call.connection.user), getRemoteIp(),
1807               call.connection.service);
1808 
1809             // make the call
1810             resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp,
1811               status);
1812           } catch (Throwable e) {
1813             LOG.debug(getName() + ": " + call.toString(), e);
1814             errorThrowable = e;
1815             error = StringUtils.stringifyException(e);
1816           } finally {
1817             currentRequestSpan.stop();
1818             // Must always clear the request context to avoid leaking
1819             // credentials between requests.
1820             RequestContext.clear();
1821           }
1822           CurCall.set(null);
1823           callQueueSize.add(call.getSize() * -1);
1824           // Set the response for undelayed calls and delayed calls with
1825           // undelayed responses.
1826           if (!call.isDelayed() || !call.isReturnValueDelayed()) {
1827             Message param = resultPair != null? resultPair.getFirst(): null;
1828             CellScanner cells = resultPair != null? resultPair.getSecond(): null;
1829             call.setResponse(param, cells, errorThrowable, error);
1830           }
1831           call.sendResponseIfReady();
1832           status.markComplete("Sent response");
1833         } catch (InterruptedException e) {
1834           if (running) {                          // unexpected -- log it
1835             LOG.info(getName() + ": caught: " + StringUtils.stringifyException(e));
1836           }
1837         } catch (OutOfMemoryError e) {
1838           if (errorHandler != null) {
1839             if (errorHandler.checkOOME(e)) {
1840               LOG.info(getName() + ": exiting on OutOfMemoryError");
1841               return;
1842             }
1843           } else {
1844             // rethrow if no handler
1845             throw e;
1846           }
1847        } catch (ClosedChannelException cce) {
1848           LOG.warn(getName() + ": caught a ClosedChannelException, " +
1849             "this means that the server was processing a " +
1850             "request but the client went away. The error message was: " +
1851             cce.getMessage());
1852         } catch (Exception e) {
1853           LOG.warn(getName() + ": caught: " + StringUtils.stringifyException(e));
1854         }
1855       }
1856       LOG.info(getName() + ": exiting");
1857     }
1858   }
1859 
1860   /**
1861    * Datastructure for passing a {@link BlockingService} and its associated class of
1862    * protobuf service interface.  For example, a server that fielded what is defined
1863    * in the client protobuf service would pass in an implementation of the client blocking service
1864    * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
1865    */
1866   public static class BlockingServiceAndInterface {
1867     private final BlockingService service;
1868     private final Class<?> serviceInterface;
1869     public BlockingServiceAndInterface(final BlockingService service,
1870         final Class<?> serviceInterface) {
1871       this.service = service;
1872       this.serviceInterface = serviceInterface;
1873     }
1874     public Class<?> getServiceInterface() {
1875       return this.serviceInterface;
1876     }
1877     public BlockingService getBlockingService() {
1878       return this.service;
1879     }
1880   }
1881 
1882 
1883   /**
1884    * Minimal setup.  Used by tests mostly.
1885    * @param service
1886    * @param isa
1887    * @param conf
1888    * @throws IOException
1889    */
1890   public RpcServer(final BlockingService service, final InetSocketAddress isa,
1891       final Configuration conf)
1892   throws IOException {
1893     this(null, "generic", Lists.newArrayList(new BlockingServiceAndInterface(service, null)),
1894         isa, 3, 3, conf,
1895       HConstants.QOS_THRESHOLD);
1896   }
1897 
1898   /**
1899    * Constructs a server listening on the named port and address.
1900    * @param serverInstance hosting instance of {@link Server}. We will do authentications if an
1901    * instance else pass null for no authentication check.
1902    * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
1903    * @param services A list of services.
1904    * @param isa Where to listen
1905    * @param handlerCount the number of handler threads that will be used to process calls
1906    * @param priorityHandlerCount How many threads for priority handling.
1907    * @param conf
1908    * @param highPriorityLevel
1909    * @throws IOException
1910    */
1911   public RpcServer(final Server serverInstance, final String name,
1912       final List<BlockingServiceAndInterface> services,
1913       final InetSocketAddress isa, int handlerCount, int priorityHandlerCount, Configuration conf,
1914       int highPriorityLevel)
1915   throws IOException {
1916     this.serverInstance = serverInstance;
1917     this.services = services;
1918     this.isa = isa;
1919     this.conf = conf;
1920     this.handlerCount = handlerCount;
1921     this.priorityHandlerCount = priorityHandlerCount;
1922     this.socketSendBufferSize = 0;
1923     this.maxQueueLength = this.conf.getInt("ipc.server.max.callqueue.length",
1924       handlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
1925     this.maxQueueSize =
1926       this.conf.getInt("ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
1927     this.readThreads = conf.getInt("ipc.server.read.threadpool.size", 10);
1928     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueLength);
1929     if (priorityHandlerCount > 0) {
1930       this.priorityCallQueue = new LinkedBlockingQueue<Call>(maxQueueLength); // TODO hack on size
1931     } else {
1932       this.priorityCallQueue = null;
1933     }
1934     this.highPriorityLevel = highPriorityLevel;
1935     this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
1936     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
1937     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
1938     this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout",
1939       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
1940     this.numOfReplicationHandlers = conf.getInt("hbase.regionserver.replication.handler.count", 3);
1941     if (numOfReplicationHandlers > 0) {
1942       this.replicationQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
1943     }
1944 
1945     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
1946     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
1947 
1948     // Start the listener here and let it bind to the port
1949     listener = new Listener(name);
1950     this.port = listener.getAddress().getPort();
1951 
1952     this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
1953     this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", true);
1954     this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
1955 
1956     this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
1957     this.delayedCalls = new AtomicInteger(0);
1958     this.ipcUtil = new IPCUtil(conf);
1959 
1960 
1961     // Create the responder here
1962     responder = new Responder();
1963     this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
1964     this.isSecurityEnabled = User.isHBaseSecurityEnabled(this.conf);
1965     if (isSecurityEnabled) {
1966       HBaseSaslRpcServer.init(conf);
1967     }
1968   }
1969 
1970   /**
1971    * Subclasses of HBaseServer can override this to provide their own
1972    * Connection implementations.
1973    */
1974   protected Connection getConnection(SocketChannel channel, long time) {
1975     return new Connection(channel, time);
1976   }
1977 
1978   /**
1979    * Setup response for the IPC Call.
1980    *
1981    * @param response buffer to serialize the response into
1982    * @param call {@link Call} to which we are setting up the response
1983    * @param error error message, if the call failed
1984    * @param t
1985    * @throws IOException
1986    */
1987   private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
1988   throws IOException {
1989     if (response != null) response.reset();
1990     call.setResponse(null, null, t, error);
1991   }
1992 
1993   protected void closeConnection(Connection connection) {
1994     synchronized (connectionList) {
1995       if (connectionList.remove(connection)) {
1996         numConnections--;
1997       }
1998     }
1999     connection.close();
2000   }
2001 
2002   Configuration getConf() {
2003     return conf;
2004   }
2005 
2006   /** Sets the socket buffer size used for responding to RPCs.
2007    * @param size send size
2008    */
2009   @Override
2010   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2011 
2012   /** Starts the service.  Must be called before any calls will be handled. */
2013   @Override
2014   public void start() {
2015     startThreads();
2016     openServer();
2017   }
2018 
2019   /**
2020    * Open a previously started server.
2021    */
2022   @Override
2023   public void openServer() {
2024     started = true;
2025   }
2026 
2027   /**
2028    * Starts the service threads but does not allow requests to be responded yet.
2029    * Client will get {@link ServerNotRunningYetException} instead.
2030    */
2031   @Override
2032   public synchronized void startThreads() {
2033     AuthenticationTokenSecretManager mgr = createSecretManager();
2034     if (mgr != null) {
2035       setSecretManager(mgr);
2036       mgr.start();
2037     }
2038     this.authManager = new ServiceAuthorizationManager();
2039     HBasePolicyProvider.init(conf, authManager);
2040     responder.start();
2041     listener.start();
2042     handlers = startHandlers(callQueue, handlerCount);
2043     priorityHandlers = startHandlers(priorityCallQueue, priorityHandlerCount);
2044     replicationHandlers = startHandlers(replicationQueue, numOfReplicationHandlers);
2045   }
2046 
2047   @Override
2048   public void refreshAuthManager(PolicyProvider pp) {
2049     this.authManager.refresh(this.conf, pp);
2050   }
2051 
2052   private Handler[] startHandlers(BlockingQueue<Call> queue, int numOfHandlers) {
2053     if (numOfHandlers <= 0) {
2054       return null;
2055     }
2056     Handler[] handlers = new Handler[numOfHandlers];
2057     for (int i = 0; i < numOfHandlers; i++) {
2058       handlers[i] = new Handler(queue, i);
2059       handlers[i].start();
2060     }
2061     return handlers;
2062   }
2063 
2064   private AuthenticationTokenSecretManager createSecretManager() {
2065     if (!isSecurityEnabled) return null;
2066     if (serverInstance == null) return null;
2067     if (!(serverInstance instanceof org.apache.hadoop.hbase.Server)) return null;
2068     org.apache.hadoop.hbase.Server server = (org.apache.hadoop.hbase.Server)serverInstance;
2069     Configuration conf = server.getConfiguration();
2070     long keyUpdateInterval =
2071         conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2072     long maxAge =
2073         conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2074     return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2075         server.getServerName().toString(), keyUpdateInterval, maxAge);
2076   }
2077 
2078   public SecretManager<? extends TokenIdentifier> getSecretManager() {
2079     return this.secretManager;
2080   }
2081 
2082   @SuppressWarnings("unchecked")
2083   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2084     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2085   }
2086 
2087   /**
2088    * This is a server side method, which is invoked over RPC. On success
2089    * the return response has protobuf response payload. On failure, the
2090    * exception name and the stack trace are returned in the protobuf response.
2091    */
2092   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2093       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2094   throws IOException {
2095     try {
2096       status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2097       // TODO: Review after we add in encoded data blocks.
2098       status.setRPCPacket(param);
2099       status.resume("Servicing call");
2100       //get an instance of the method arg type
2101       long startTime = System.currentTimeMillis();
2102       PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2103       Message result = service.callBlockingMethod(md, controller, param);
2104       int processingTime = (int) (System.currentTimeMillis() - startTime);
2105       int qTime = (int) (startTime - receiveTime);
2106       if (LOG.isTraceEnabled()) {
2107         LOG.trace(CurCall.get().toString() +
2108             ", response " + TextFormat.shortDebugString(result) +
2109             " queueTime: " + qTime +
2110             " processingTime: " + processingTime);
2111       }
2112       metrics.dequeuedCall(qTime);
2113       metrics.processedCall(processingTime);
2114       long responseSize = result.getSerializedSize();
2115       // log any RPC responses that are slower than the configured warn
2116       // response time or larger than configured warning size
2117       boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2118       boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2119       if (tooSlow || tooLarge) {
2120         // when tagging, we let TooLarge trump TooSmall to keep output simple
2121         // note that large responses will often also be slow.
2122         StringBuilder buffer = new StringBuilder(256);
2123         buffer.append(md.getName());
2124         buffer.append("(");
2125         buffer.append(param.getClass().getName());
2126         buffer.append(")");
2127         logResponse(new Object[]{param},
2128             md.getName(), buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
2129             status.getClient(), startTime, processingTime, qTime,
2130             responseSize);
2131       }
2132       return new Pair<Message, CellScanner>(result,
2133         controller != null? controller.cellScanner(): null);
2134     } catch (Throwable e) {
2135       // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
2136       // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
2137       // need to pass it over the wire.
2138       if (e instanceof ServiceException) e = e.getCause();
2139       if (e instanceof IOException) throw (IOException)e;
2140       LOG.error("Unexpected throwable object ", e);
2141       throw new IOException(e.getMessage(), e);
2142     }
2143   }
2144 
2145   /**
2146    * Logs an RPC response to the LOG file, producing valid JSON objects for
2147    * client Operations.
2148    * @param params The parameters received in the call.
2149    * @param methodName The name of the method invoked
2150    * @param call The string representation of the call
2151    * @param tag  The tag that will be used to indicate this event in the log.
2152    * @param clientAddress   The address of the client who made this call.
2153    * @param startTime       The time that the call was initiated, in ms.
2154    * @param processingTime  The duration that the call took to run, in ms.
2155    * @param qTime           The duration that the call spent on the queue
2156    *                        prior to being initiated, in ms.
2157    * @param responseSize    The size in bytes of the response buffer.
2158    */
2159   void logResponse(Object[] params, String methodName, String call, String tag,
2160       String clientAddress, long startTime, int processingTime, int qTime,
2161       long responseSize)
2162           throws IOException {
2163     // for JSON encoding
2164     ObjectMapper mapper = new ObjectMapper();
2165     // base information that is reported regardless of type of call
2166     Map<String, Object> responseInfo = new HashMap<String, Object>();
2167     responseInfo.put("starttimems", startTime);
2168     responseInfo.put("processingtimems", processingTime);
2169     responseInfo.put("queuetimems", qTime);
2170     responseInfo.put("responsesize", responseSize);
2171     responseInfo.put("client", clientAddress);
2172     responseInfo.put("class", serverInstance == null? "": serverInstance.getClass().getSimpleName());
2173     responseInfo.put("method", methodName);
2174     if (params.length == 2 && serverInstance instanceof HRegionServer &&
2175         params[0] instanceof byte[] &&
2176         params[1] instanceof Operation) {
2177       // if the slow process is a query, we want to log its table as well
2178       // as its own fingerprint
2179       byte [] tableName =
2180           HRegionInfo.parseRegionName((byte[]) params[0])[0];
2181       responseInfo.put("table", Bytes.toStringBinary(tableName));
2182       // annotate the response map with operation details
2183       responseInfo.putAll(((Operation) params[1]).toMap());
2184       // report to the log file
2185       LOG.warn("(operation" + tag + "): " +
2186           mapper.writeValueAsString(responseInfo));
2187     } else if (params.length == 1 && serverInstance instanceof HRegionServer &&
2188         params[0] instanceof Operation) {
2189       // annotate the response map with operation details
2190       responseInfo.putAll(((Operation) params[0]).toMap());
2191       // report to the log file
2192       LOG.warn("(operation" + tag + "): " +
2193           mapper.writeValueAsString(responseInfo));
2194     } else {
2195       // can't get JSON details, so just report call.toString() along with
2196       // a more generic tag.
2197       responseInfo.put("call", call);
2198       LOG.warn("(response" + tag + "): " + mapper.writeValueAsString(responseInfo));
2199     }
2200   }
2201 
2202   /** Stops the service.  No new calls will be handled after this is called. */
2203   @Override
2204   public synchronized void stop() {
2205     LOG.info("Stopping server on " + port);
2206     running = false;
2207     stopHandlers(handlers);
2208     stopHandlers(priorityHandlers);
2209     stopHandlers(replicationHandlers);
2210     listener.interrupt();
2211     listener.doStop();
2212     responder.interrupt();
2213     notifyAll();
2214   }
2215 
2216   private void stopHandlers(Handler[] handlers) {
2217     if (handlers != null) {
2218       for (Handler handler : handlers) {
2219         if (handler != null) {
2220           handler.interrupt();
2221         }
2222       }
2223     }
2224   }
2225 
2226   /** Wait for the server to be stopped.
2227    * Does not wait for all subthreads to finish.
2228    *  See {@link #stop()}.
2229    * @throws InterruptedException e
2230    */
2231   @Override
2232   public synchronized void join() throws InterruptedException {
2233     while (running) {
2234       wait();
2235     }
2236   }
2237 
2238   /**
2239    * Return the socket (ip+port) on which the RPC server is listening to.
2240    * @return the socket (ip+port) on which the RPC server is listening to.
2241    */
2242   @Override
2243   public synchronized InetSocketAddress getListenerAddress() {
2244     return listener.getAddress();
2245   }
2246 
2247   /**
2248    * Set the handler for calling out of RPC for error conditions.
2249    * @param handler the handler implementation
2250    */
2251   @Override
2252   public void setErrorHandler(HBaseRPCErrorHandler handler) {
2253     this.errorHandler = handler;
2254   }
2255 
2256   /**
2257    * Returns the metrics instance for reporting RPC call statistics
2258    */
2259   public MetricsHBaseServer getMetrics() {
2260     return metrics;
2261   }
2262 
2263   /**
2264    * Authorize the incoming client connection.
2265    *
2266    * @param user client user
2267    * @param connection incoming connection
2268    * @param addr InetAddress of incoming connection
2269    * @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol
2270    */
2271   @SuppressWarnings("static-access")
2272   public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
2273   throws AuthorizationException {
2274     if (authorize) {
2275       Class<?> c = getServiceInterface(services, connection.getServiceName());
2276       this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2277     }
2278   }
2279 
2280   /**
2281    * When the read or write buffer size is larger than this limit, i/o will be
2282    * done in chunks of this size. Most RPC requests and responses would be
2283    * be smaller.
2284    */
2285   private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
2286 
2287   /**
2288    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
2289    * If the amount of data is large, it writes to channel in smaller chunks.
2290    * This is to avoid jdk from creating many direct buffers as the size of
2291    * buffer increases. This also minimizes extra copies in NIO layer
2292    * as a result of multiple write operations required to write a large
2293    * buffer.
2294    *
2295    * @param channel writable byte channel to write to
2296    * @param buffer buffer to write
2297    * @return number of bytes written
2298    * @throws java.io.IOException e
2299    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
2300    */
2301   protected int channelWrite(WritableByteChannel channel,
2302                                     ByteBuffer buffer) throws IOException {
2303 
2304     int count =  (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2305            channel.write(buffer) : channelIO(null, channel, buffer);
2306     if (count > 0) {
2307       metrics.sentBytes(count);
2308     }
2309     return count;
2310   }
2311 
2312   /**
2313    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
2314    * If the amount of data is large, it writes to channel in smaller chunks.
2315    * This is to avoid jdk from creating many direct buffers as the size of
2316    * ByteBuffer increases. There should not be any performance degredation.
2317    *
2318    * @param channel writable byte channel to write on
2319    * @param buffer buffer to write
2320    * @return number of bytes written
2321    * @throws java.io.IOException e
2322    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
2323    */
2324   protected int channelRead(ReadableByteChannel channel,
2325                                    ByteBuffer buffer) throws IOException {
2326 
2327     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2328            channel.read(buffer) : channelIO(channel, null, buffer);
2329     if (count > 0) {
2330       metrics.receivedBytes(count);
2331     }
2332     return count;
2333   }
2334 
2335   /**
2336    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
2337    * and {@link #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer)}. Only
2338    * one of readCh or writeCh should be non-null.
2339    *
2340    * @param readCh read channel
2341    * @param writeCh write channel
2342    * @param buf buffer to read or write into/out of
2343    * @return bytes written
2344    * @throws java.io.IOException e
2345    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
2346    * @see #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer)
2347    */
2348   private static int channelIO(ReadableByteChannel readCh,
2349                                WritableByteChannel writeCh,
2350                                ByteBuffer buf) throws IOException {
2351 
2352     int originalLimit = buf.limit();
2353     int initialRemaining = buf.remaining();
2354     int ret = 0;
2355 
2356     while (buf.remaining() > 0) {
2357       try {
2358         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2359         buf.limit(buf.position() + ioSize);
2360 
2361         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2362 
2363         if (ret < ioSize) {
2364           break;
2365         }
2366 
2367       } finally {
2368         buf.limit(originalLimit);
2369       }
2370     }
2371 
2372     int nBytes = initialRemaining - buf.remaining();
2373     return (nBytes > 0) ? nBytes : ret;
2374   }
2375 
2376   /**
2377    * Needed for delayed calls.  We need to be able to store the current call
2378    * so that we can complete it later.
2379    * @return Call the server is currently handling.
2380    */
2381   public static RpcCallContext getCurrentCall() {
2382     return CurCall.get();
2383   }
2384 
2385   /**
2386    * @param serviceName Some arbitrary string that represents a 'service'.
2387    * @param services Available service instances
2388    * @return Matching BlockingServiceAndInterface pair
2389    */
2390   static BlockingServiceAndInterface getServiceAndInterface(
2391       final List<BlockingServiceAndInterface> services, final String serviceName) {
2392     for (BlockingServiceAndInterface bs : services) {
2393       if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2394         return bs;
2395       }
2396     }
2397     return null;
2398   }
2399 
2400   /**
2401    * @param serviceName Some arbitrary string that represents a 'service'.
2402    * @param services Available services and their service interfaces.
2403    * @return Service interface class for <code>serviceName</code>
2404    */
2405   static Class<?> getServiceInterface(
2406       final List<BlockingServiceAndInterface> services,
2407       final String serviceName) {
2408     BlockingServiceAndInterface bsasi =
2409         getServiceAndInterface(services, serviceName);
2410     return bsasi == null? null: bsasi.getServiceInterface();
2411   }
2412 
2413   /**
2414    * @param serviceName Some arbitrary string that represents a 'service'.
2415    * @param services Available services and their service interfaces.
2416    * @return BlockingService that goes with the passed <code>serviceName</code>
2417    */
2418   static BlockingService getService(
2419       final List<BlockingServiceAndInterface> services,
2420       final String serviceName) {
2421     BlockingServiceAndInterface bsasi =
2422         getServiceAndInterface(services, serviceName);
2423     return bsasi == null? null: bsasi.getBlockingService();
2424   }
2425 
2426   /** Returns the remote side ip address when invoked inside an RPC
2427    *  Returns null incase of an error.
2428    *  @return InetAddress
2429    */
2430   public static InetAddress getRemoteIp() {
2431     Call call = CurCall.get();
2432     if (call != null) {
2433       return call.connection.socket.getInetAddress();
2434     }
2435     return null;
2436   }
2437 
2438   /** Returns remote address as a string when invoked inside an RPC.
2439    *  Returns null in case of an error.
2440    *  @return String
2441    */
2442   public static String getRemoteAddress() {
2443     Call call = CurCall.get();
2444     if (call != null) {
2445       return call.connection.getHostAddress();
2446     }
2447     return null;
2448   }
2449 
2450   /**
2451    * May be called under
2452    * {@code #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations,
2453    * and under protobuf methods of parameters and return values.
2454    * Permits applications to access the server context.
2455    * @return the server instance called under or null
2456    */
2457   public static RpcServerInterface get() {
2458     return SERVER.get();
2459   }
2460 
2461   /**
2462    * A convenience method to bind to a given address and report
2463    * better exceptions if the address is not a valid host.
2464    * @param socket the socket to bind
2465    * @param address the address to bind to
2466    * @param backlog the number of connections allowed in the queue
2467    * @throws BindException if the address can't be bound
2468    * @throws UnknownHostException if the address isn't a valid host name
2469    * @throws IOException other random errors from bind
2470    */
2471   public static void bind(ServerSocket socket, InetSocketAddress address,
2472                           int backlog) throws IOException {
2473     try {
2474       socket.bind(address, backlog);
2475     } catch (BindException e) {
2476       BindException bindException =
2477         new BindException("Problem binding to " + address + " : " +
2478             e.getMessage());
2479       bindException.initCause(e);
2480       throw bindException;
2481     } catch (SocketException e) {
2482       // If they try to bind to a different host's address, give a better
2483       // error message.
2484       if ("Unresolved address".equals(e.getMessage())) {
2485         throw new UnknownHostException("Invalid hostname for server: " +
2486                                        address.getHostName());
2487       }
2488       throw e;
2489     }
2490   }
2491 }