View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.ServerSocket;
31  import java.net.Socket;
32  import java.net.SocketException;
33  import java.net.UnknownHostException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.Channels;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.GatheringByteChannel;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.security.PrivilegedExceptionAction;
46  import java.util.ArrayList;
47  import java.util.Arrays;
48  import java.util.Collections;
49  import java.util.HashMap;
50  import java.util.Iterator;
51  import java.util.List;
52  import java.util.Map;
53  import java.util.Set;
54  import java.util.Timer;
55  import java.util.TimerTask;
56  import java.util.concurrent.ConcurrentHashMap;
57  import java.util.concurrent.ConcurrentLinkedDeque;
58  import java.util.concurrent.ExecutorService;
59  import java.util.concurrent.Executors;
60  import java.util.concurrent.LinkedBlockingQueue;
61  import java.util.concurrent.atomic.AtomicInteger;
62  import java.util.concurrent.locks.Lock;
63  import java.util.concurrent.locks.ReentrantLock;
64
65  import javax.security.sasl.Sasl;
66  import javax.security.sasl.SaslException;
67  import javax.security.sasl.SaslServer;
68
69  import org.apache.commons.logging.Log;
70  import org.apache.commons.logging.LogFactory;
71  import org.apache.hadoop.conf.Configuration;
72  import org.apache.hadoop.hbase.CallQueueTooBigException;
73  import org.apache.hadoop.hbase.CellScanner;
74  import org.apache.hadoop.hbase.DoNotRetryIOException;
75  import org.apache.hadoop.hbase.HBaseIOException;
76  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
77  import org.apache.hadoop.hbase.HConstants;
78  import org.apache.hadoop.hbase.HRegionInfo;
79  import org.apache.hadoop.hbase.Server;
80  import org.apache.hadoop.hbase.TableName;
81  import org.apache.hadoop.hbase.classification.InterfaceAudience;
82  import org.apache.hadoop.hbase.classification.InterfaceStability;
83  import org.apache.hadoop.hbase.client.Operation;
84  import org.apache.hadoop.hbase.client.VersionInfoUtil;
85  import org.apache.hadoop.hbase.codec.Codec;
86  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
87  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
88  import org.apache.hadoop.hbase.io.ByteBufferInputStream;
89  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
90  import org.apache.hadoop.hbase.io.ByteBufferPool;
91  import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
92  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
93  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
94  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
95  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
96  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
97  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
98  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
99  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
100 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
101 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
102 import org.apache.hadoop.hbase.regionserver.HRegionServer;
103 import org.apache.hadoop.hbase.security.AccessDeniedException;
104 import org.apache.hadoop.hbase.security.AuthMethod;
105 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
106 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
107 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
108 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
109 import org.apache.hadoop.hbase.security.SaslStatus;
110 import org.apache.hadoop.hbase.security.SaslUtil;
111 import org.apache.hadoop.hbase.security.User;
112 import org.apache.hadoop.hbase.security.UserProvider;
113 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
114 import org.apache.hadoop.hbase.util.Bytes;
115 import org.apache.hadoop.hbase.util.Counter;
116 import org.apache.hadoop.hbase.util.Pair;
117 import org.apache.hadoop.hbase.util.Threads;
118 import org.apache.hadoop.io.BytesWritable;
119 import org.apache.hadoop.io.IOUtils;
120 import org.apache.hadoop.io.IntWritable;
121 import org.apache.hadoop.io.Writable;
122 import org.apache.hadoop.io.WritableUtils;
123 import org.apache.hadoop.io.compress.CompressionCodec;
124 import org.apache.hadoop.security.UserGroupInformation;
125 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
126 import org.apache.hadoop.security.authorize.AuthorizationException;
127 import org.apache.hadoop.security.authorize.PolicyProvider;
128 import org.apache.hadoop.security.authorize.ProxyUsers;
129 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
130 import org.apache.hadoop.security.token.SecretManager;
131 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
132 import org.apache.hadoop.security.token.TokenIdentifier;
133 import org.apache.hadoop.util.StringUtils;
134 import org.apache.htrace.TraceInfo;
135 import org.codehaus.jackson.map.ObjectMapper;
136
137 import com.google.common.util.concurrent.ThreadFactoryBuilder;
138 import com.google.protobuf.BlockingService;
139 import com.google.protobuf.CodedInputStream;
140 import com.google.protobuf.CodedOutputStream;
141 import com.google.protobuf.Descriptors.MethodDescriptor;
142 import com.google.protobuf.Message;
143 import com.google.protobuf.ServiceException;
144 import com.google.protobuf.TextFormat;
145
146 /**
147  * An RPC server that hosts protobuf described Services.
148  *
 * An RpcServer instance has a Listener that hosts the socket.  The Listener has a fixed number
 * of Readers in an ExecutorPool, 10 by default.  The Listener does an accept and then a Reader,
 * chosen round robin, does the read.  The Reader is registered on a Selector.  A read takes
 * everything available off the channel and parses it to make a Call.  The Call is wrapped in a
 * CallRunner and passed to the scheduler to be run.  The Reader then goes back to see if there is
 * more to be done and loops until done.
155  *
156  * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
157  * has given the queues into which calls (i.e. CallRunner instances) are inserted.  Handlers run
158  * taking from the queue.  They run the CallRunner#run method on each item gotten from queue
159  * and keep taking while the server is up.
160  *
 * CallRunner#run executes the call.  When done, it asks the included Call to put itself on a new
 * queue for the Responder to pull from and return the result to the client.
163  *
164  * @see RpcClientImpl
165  */
166 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
167 @InterfaceStability.Evolving
168 public class RpcServer implements RpcServerInterface, ConfigurationObserver {
169   // LOG is being used in CallRunner and the log level is being changed in tests
170   public static final Log LOG = LogFactory.getLog(RpcServer.class);
171   private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
172       = new CallQueueTooBigException();
173
174   private final boolean authorize;
175   private boolean isSecurityEnabled;
176
177   public static final byte CURRENT_VERSION = 0;
178
179   /**
180    * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
181    */
182   public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
183           "hbase.ipc.server.fallback-to-simple-auth-allowed";
184
185   /**
186    * How many calls/handler are allowed in the queue.
187    */
188   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
189
190   private final IPCUtil ipcUtil;
191
192   private static final String AUTH_FAILED_FOR = "Auth failed for ";
193   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
194   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
195     Server.class.getName());
196   protected SecretManager<TokenIdentifier> secretManager;
197   protected ServiceAuthorizationManager authManager;
198
199   /** This is set to Call object before Handler invokes an RPC and ybdie
200    * after the call returns.
201    */
202   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
203
204   /** Keeps MonitoredRPCHandler per handler thread. */
205   static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
206       = new ThreadLocal<MonitoredRPCHandler>();
207
208   protected final InetSocketAddress bindAddress;
209   protected int port;                             // port we listen on
210   protected InetSocketAddress address;            // inet address we listen on
211   private int readThreads;                        // number of read threads
212   protected MetricsHBaseServer metrics;
213
214   protected final Configuration conf;
215
216   /**
217    * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
218    * this size, then we will reject the call (after parsing it though). It will go back to the
219    * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
220    * call queue size gets incremented after we parse a call and before we add it to the queue of
221    * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
222    * size is kept in {@link #callQueueSizeInBytes}.
223    * @see {@link #callQueueSizeInBytes}
224    * @see {@link #DEFAULT_MAX_CALLQUEUE_SIZE}
225    * @see {@link #callQueueSizeInBytes}
226    */
227   private final long maxQueueSizeInBytes;
228   private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
229
230   /**
231    * This is a running count of the size in bytes of all outstanding calls whether currently
232    * executing or queued waiting to be run.
233    */
234   protected final Counter callQueueSizeInBytes = new Counter();
235
236   protected int socketSendBufferSize;
237   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
238   protected final boolean tcpKeepAlive; // if T then use keepalives
239   protected final long purgeTimeout;    // in milliseconds
240
241   /**
242    * This flag is used to indicate to sub threads when they should go down.  When we call
243    * {@link #start()}, all threads started will consult this flag on whether they should
244    * keep going.  It is set to false when {@link #stop()} is called.
245    */
246   volatile boolean running = true;
247
248   /**
249    * This flag is set to true after all threads are up and 'running' and the server is then opened
250    * for business by the call to {@link #start()}.
251    */
252   volatile boolean started = false;
253
254   // maintains the set of client connections and handles idle timeouts
255   private ConnectionManager connectionManager;
256   private Listener listener = null;
257   protected Responder responder = null;
258   protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
259
260   protected HBaseRPCErrorHandler errorHandler = null;
261
262   static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
263   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
264   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
265
266   /**
267    * Minimum allowable timeout (in milliseconds) in rpc request's header. This
268    * configuration exists to prevent the rpc service regarding this request as timeout immediately.
269    */
270   private static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
271   private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
272
273   /** Default value for above params */
274   private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
275   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
276   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
277
278   private static final ObjectMapper MAPPER = new ObjectMapper();
279
280   private final int maxRequestSize;
281   private final int warnResponseTime;
282   private final int warnResponseSize;
283
284   private final int minClientRequestTimeout;
285
286   private final Server server;
287   private final List<BlockingServiceAndInterface> services;
288
289   private final RpcScheduler scheduler;
290
291   private UserProvider userProvider;
292
293   private final ByteBufferPool reservoir;
294
295   private volatile boolean allowFallbackToSimpleAuth;
296
297   /**
298    * Datastructure that holds all necessary to a method invocation and then afterward, carries
299    * the result.
300    */
301   @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
302   @InterfaceStability.Evolving
303   public class Call implements RpcCallContext {
304     protected int id;                             // the client's call id
305     protected BlockingService service;
306     protected MethodDescriptor md;
307     protected RequestHeader header;
308     protected Message param;                      // the parameter passed
309     // Optional cell data passed outside of protobufs.
310     protected CellScanner cellScanner;
311     protected Connection connection;              // connection to client
312     protected long timestamp;      // the time received when response is null
313                                    // the time served when response is not null
314     protected int timeout;
315     /**
316      * Chain of buffers to send as response.
317      */
318     protected BufferChain response;
319     protected Responder responder;
320
321     protected long size;                          // size of current call
322     protected boolean isError;
323     protected TraceInfo tinfo;
324     private ByteBufferListOutputStream cellBlockStream = null;
325
326     private User user;
327     private InetAddress remoteAddress;
328     private RpcCallback callback;
329
330     private long responseCellSize = 0;
331     private long responseBlockSize = 0;
332     private boolean retryImmediatelySupported;
333
334     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
335         justification="Can't figure why this complaint is happening... see below")
336     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
337          Message param, CellScanner cellScanner, Connection connection, Responder responder,
338          long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) {
339       this.id = id;
340       this.service = service;
341       this.md = md;
342       this.header = header;
343       this.param = param;
344       this.cellScanner = cellScanner;
345       this.connection = connection;
346       this.timestamp = System.currentTimeMillis();
347       this.response = null;
348       this.responder = responder;
349       this.isError = false;
350       this.size = size;
351       this.tinfo = tinfo;
352       this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
353       this.remoteAddress = remoteAddress;
354       this.retryImmediatelySupported =
355           connection == null? null: connection.retryImmediatelySupported;
356       this.timeout = timeout;
357     }
358
359     /**
360      * Call is done. Execution happened and we returned results to client. It is now safe to
361      * cleanup.
362      */
363     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
364         justification="Presume the lock on processing request held by caller is protection enough")
365     void done() {
366       if (this.cellBlockStream != null) {
367         this.cellBlockStream.releaseResources();// This will return back the BBs which we
368                                                 // got from pool.
369         this.cellBlockStream = null;
370       }
371       this.connection.decRpcCount();  // Say that we're done with this call.
372     }
373
374     @Override
375     public String toString() {
376       return toShortString() + " param: " +
377         (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
378         " connection: " + connection.toString();
379     }
380
381     protected RequestHeader getHeader() {
382       return this.header;
383     }
384
385     public boolean hasPriority() {
386       return this.header.hasPriority();
387     }
388
389     public int getPriority() {
390       return this.header.getPriority();
391     }
392
393     /*
394      * Short string representation without param info because param itself could be huge depends on
395      * the payload of a command
396      */
397     String toShortString() {
398       String serviceName = this.connection.service != null ?
399           this.connection.service.getDescriptorForType().getName() : "null";
400       return "callId: " + this.id + " service: " + serviceName +
401           " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
402           " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
403           " connection: " + connection.toString();
404     }
405
406     String toTraceString() {
407       String serviceName = this.connection.service != null ?
408                            this.connection.service.getDescriptorForType().getName() : "";
409       String methodName = (this.md != null) ? this.md.getName() : "";
410       return serviceName + "." + methodName;
411     }
412
413     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
414       this.response = new BufferChain(response);
415     }
416
417     protected synchronized void setResponse(Object m, final CellScanner cells,
418         Throwable t, String errorMsg) {
419       if (this.isError) return;
420       if (t != null) this.isError = true;
421       BufferChain bc = null;
422       try {
423         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
424         // Presume it a pb Message.  Could be null.
425         Message result = (Message)m;
426         // Call id.
427         headerBuilder.setCallId(this.id);
428         if (t != null) {
429           setExceptionResponse(t, errorMsg, headerBuilder);
430         }
431         // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
432         // reservoir when finished. This is hacky and the hack is not contained but benefits are
433         // high when we can avoid a big buffer allocation on each rpc.
434         List<ByteBuffer> cellBlock = null;
435         int cellBlockSize = 0;
436         if (reservoir != null) {
437           this.cellBlockStream = ipcUtil.buildCellBlockStream(this.connection.codec,
438               this.connection.compressionCodec, cells, reservoir);
439           if (this.cellBlockStream != null) {
440             cellBlock = this.cellBlockStream.getByteBuffers();
441             cellBlockSize = this.cellBlockStream.size();
442           }
443         } else {
444           ByteBuffer b = ipcUtil.buildCellBlock(this.connection.codec,
445               this.connection.compressionCodec, cells);
446           if (b != null) {
447             cellBlockSize = b.remaining();
448             cellBlock = new ArrayList<ByteBuffer>(1);
449             cellBlock.add(b);
450           }
451         }
452
453         if (cellBlockSize > 0) {
454           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
455           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
456           cellBlockBuilder.setLength(cellBlockSize);
457           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
458         }
459         Message header = headerBuilder.build();
460         byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize);
461         List<ByteBuffer> responseBufs = new ArrayList<ByteBuffer>(
462             (cellBlock == null ? 1 : cellBlock.size()) + 1);
463         responseBufs.add(ByteBuffer.wrap(b));
464         if (cellBlock != null) responseBufs.addAll(cellBlock);
465         bc = new BufferChain(responseBufs);
466         if (connection.useWrap) {
467           bc = wrapWithSasl(bc);
468         }
469       } catch (IOException e) {
470         LOG.warn("Exception while creating response " + e);
471       }
472       this.response = bc;
473       // Once a response message is created and set to this.response, this Call can be treated as
474       // done. The Responder thread will do the n/w write of this message back to client.
475       if (this.callback != null) {
476         try {
477           this.callback.run();
478         } catch (Exception e) {
479           // Don't allow any exception here to kill this handler thread.
480           LOG.warn("Exception while running the Rpc Callback.", e);
481         }
482       }
483     }
484
485     private void setExceptionResponse(Throwable t, String errorMsg,
486         ResponseHeader.Builder headerBuilder) {
487       ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
488       exceptionBuilder.setExceptionClassName(t.getClass().getName());
489       exceptionBuilder.setStackTrace(errorMsg);
490       exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
491       if (t instanceof RegionMovedException) {
492         // Special casing for this exception.  This is only one carrying a payload.
493         // Do this instead of build a generic system for allowing exceptions carry
494         // any kind of payload.
495         RegionMovedException rme = (RegionMovedException)t;
496         exceptionBuilder.setHostname(rme.getHostname());
497         exceptionBuilder.setPort(rme.getPort());
498       }
499       // Set the exception as the result of the method invocation.
500       headerBuilder.setException(exceptionBuilder.build());
501     }
502
503     private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize)
504         throws IOException {
505       // Organize the response as a set of bytebuffers rather than collect it all together inside
506       // one big byte array; save on allocations.
507       int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
508           resultVintSize = 0;
509       if (header != null) {
510         headerSerializedSize = header.getSerializedSize();
511         headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize);
512       }
513       if (result != null) {
514         resultSerializedSize = result.getSerializedSize();
515         resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize);
516       }
517       // calculate the total size
518       int totalSize = headerSerializedSize + headerVintSize
519           + (resultSerializedSize + resultVintSize)
520           + cellBlockSize;
521       // The byte[] should also hold the totalSize of the header, message and the cellblock
522       byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
523           + resultVintSize + Bytes.SIZEOF_INT];
524       // The RpcClient expects the int to be in a format that code be decoded by
525       // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int)
526       // form of writing int.
527       Bytes.putInt(b, 0, totalSize);
528       CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT,
529           b.length - Bytes.SIZEOF_INT);
530       if (header != null) {
531         cos.writeMessageNoTag(header);
532       }
533       if (result != null) {
534         cos.writeMessageNoTag(result);
535       }
536       cos.flush();
537       cos.checkNoSpaceLeft();
538       return b;
539     }
540
541     private BufferChain wrapWithSasl(BufferChain bc)
542         throws IOException {
543       if (!this.connection.useSasl) return bc;
544       // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
545       // THIS IS A BIG UGLY COPY.
546       byte [] responseBytes = bc.getBytes();
547       byte [] token;
548       // synchronization may be needed since there can be multiple Handler
549       // threads using saslServer to wrap responses.
550       synchronized (connection.saslServer) {
551         token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
552       }
553       if (LOG.isTraceEnabled()) {
554         LOG.trace("Adding saslServer wrapped token of size " + token.length
555             + " as call response.");
556       }
557
558       ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
559       ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
560       return new BufferChain(bbTokenLength, bbTokenBytes);
561     }
562
563     @Override
564     public boolean isClientCellBlockSupported() {
565       return this.connection != null && this.connection.codec != null;
566     }
567
568     @Override
569     public long disconnectSince() {
570       if (!connection.channel.isOpen()) {
571         return System.currentTimeMillis() - timestamp;
572       } else {
573         return -1L;
574       }
575     }
576
577     public long getSize() {
578       return this.size;
579     }
580
581     @Override
582     public long getResponseCellSize() {
583       return responseCellSize;
584     }
585
586     @Override
587     public void incrementResponseCellSize(long cellSize) {
588       responseCellSize += cellSize;
589     }
590
591     @Override
592     public long getResponseBlockSize() {
593       return responseBlockSize;
594     }
595
596     @Override
597     public void incrementResponseBlockSize(long blockSize) {
598       responseBlockSize += blockSize;
599     }
600
601     public synchronized void sendResponseIfReady() throws IOException {
602       this.responder.doRespond(this);
603     }
604
605     public UserGroupInformation getRemoteUser() {
606       return connection.ugi;
607     }
608
609     @Override
610     public User getRequestUser() {
611       return user;
612     }
613
614     @Override
615     public String getRequestUserName() {
616       User user = getRequestUser();
617       return user == null? null: user.getShortName();
618     }
619
620     @Override
621     public InetAddress getRemoteAddress() {
622       return remoteAddress;
623     }
624
625     @Override
626     public VersionInfo getClientVersionInfo() {
627       return connection.getVersionInfo();
628     }
629
630     @Override
631     public synchronized void setCallBack(RpcCallback callback) {
632       this.callback = callback;
633     }
634
635     @Override
636     public boolean isRetryImmediatelySupported() {
637       return retryImmediatelySupported;
638     }
639   }
640
641   /** Listens on the socket. Creates jobs for the handler threads*/
642   private class Listener extends Thread {
643
644     private ServerSocketChannel acceptChannel = null; //the accept channel
645     private Selector selector = null; //the selector that we use for the server
646     private Reader[] readers = null;
647     private int currentReader = 0;
648     private final int readerPendingConnectionQueueLength;
649
650     private ExecutorService readPool;
651
652     public Listener(final String name) throws IOException {
653       super(name);
654       // The backlog of requests that we will have the serversocket carry.
655       int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
656       readerPendingConnectionQueueLength =
657           conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);
658       // Create a new server socket and set to non blocking mode
659       acceptChannel = ServerSocketChannel.open();
660       acceptChannel.configureBlocking(false);
661
662       // Bind the server socket to the binding addrees (can be different from the default interface)
663       bind(acceptChannel.socket(), bindAddress, backlogLength);
664       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
665       address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
666       // create a selector;
667       selector = Selector.open();
668
669       readers = new Reader[readThreads];
670       // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
671       // has an advantage in that it is easy to shutdown the pool.
672       readPool = Executors.newFixedThreadPool(readThreads,
673         new ThreadFactoryBuilder().setNameFormat(
674           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
675           ",port=" + port).setDaemon(true)
676         .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
677       for (int i = 0; i < readThreads; ++i) {
678         Reader reader = new Reader();
679         readers[i] = reader;
680         readPool.execute(reader);
681       }
682       LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
683
684       // Register accepts on the server socket with the selector.
685       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
686       this.setName("RpcServer.listener,port=" + port);
687       this.setDaemon(true);
688     }
689
690
691     private class Reader implements Runnable {
692       final private LinkedBlockingQueue<Connection> pendingConnections;
693       private final Selector readSelector;
694
695       Reader() throws IOException {
696         this.pendingConnections =
697           new LinkedBlockingQueue<Connection>(readerPendingConnectionQueueLength);
698         this.readSelector = Selector.open();
699       }
700
701       @Override
702       public void run() {
703         try {
704           doRunLoop();
705         } finally {
706           try {
707             readSelector.close();
708           } catch (IOException ioe) {
709             LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
710           }
711         }
712       }
713
714       private synchronized void doRunLoop() {
715         while (running) {
716           try {
717             // Consume as many connections as currently queued to avoid
718             // unbridled acceptance of connections that starves the select
719             int size = pendingConnections.size();
720             for (int i=size; i>0; i--) {
721               Connection conn = pendingConnections.take();
722               conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
723             }
724             readSelector.select();
725             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
726             while (iter.hasNext()) {
727               SelectionKey key = iter.next();
728               iter.remove();
729               if (key.isValid()) {
730                 if (key.isReadable()) {
731                   doRead(key);
732                 }
733               }
734               key = null;
735             }
736           } catch (InterruptedException e) {
737             if (running) {                      // unexpected -- log it
738               LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
739             }
740             return;
741           } catch (IOException ex) {
742             LOG.info(getName() + ": IOException in Reader", ex);
743           }
744         }
745       }
746
747       /**
748        * Updating the readSelector while it's being used is not thread-safe,
749        * so the connection must be queued.  The reader will drain the queue
750        * and update its readSelector before performing the next select
751        */
752       public void addConnection(Connection conn) throws IOException {
753         pendingConnections.add(conn);
754         readSelector.wakeup();
755       }
756     }
757
    /**
     * Listener thread main loop. Accepts new connections until {@code running} is cleared. It then
     * closes the accept channel and selector (under this object's monitor, the same one
     * {@code doStop()} and {@code getSelector()} use), stops the idle scan and closes all
     * connections.
     *
     * <p>Error handling:
     * <ul>
     *   <li>IOExceptions from accepting are ignored (trace-logged only).</li>
     *   <li>On OutOfMemoryError with an {@code errorHandler} whose {@code checkOOME} returns true:
     *       closes the connection attached to the key in hand, calls
     *       {@code connectionManager.closeIdle(true)} and returns.</li>
     *   <li>On OutOfMemoryError without an error handler: does the same cleanup, then sleeps 60s
     *       and keeps serving.</li>
     *   <li>Any other exception: closes the connection attached to the key in hand, if any.</li>
     * </ul>
     *
     * <p>NOTE(review): this loop only processes the accept key, and {@code doAccept} attaches each
     * newly accepted Connection to that key. {@code closeCurrentConnection(key, e)} therefore
     * closes the most recently accepted connection, which is not necessarily related to the
     * failure.
     */
    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
      justification="selector access is not synchronized; seems fine but concerned changing " +
        "it will have per impact")
    public void run() {
      LOG.info(getName() + ": starting");
      connectionManager.startIdleScan();
      while (running) {
        // The key currently being handled; reset to null between keys so a failure in select()
        // itself does not close an unrelated connection.
        SelectionKey key = null;
        try {
          selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);
              }
            } catch (IOException ignored) {
              if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          if (errorHandler != null) {
            if (errorHandler.checkOOME(e)) {
              LOG.info(getName() + ": exiting on OutOfMemoryError");
              closeCurrentConnection(key, e);
              connectionManager.closeIdle(true);
              return;
            }
            // checkOOME returned false: keep looping without any cleanup.
          } else {
            // we can run out of memory if we have too many threads
            // log the event and sleep for a minute and give
            // some thread(s) a chance to finish
            LOG.warn(getName() + ": OutOfMemoryError in server select", e);
            closeCurrentConnection(key, e);
            connectionManager.closeIdle(true);
            try {
              Thread.sleep(60000);
            } catch (InterruptedException ex) {
              LOG.debug("Interrupted while sleeping");
            }
          }
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
      }
      LOG.info(getName() + ": stopping");
      // Synchronized so doStop()/getSelector() never observe a half-torn-down listener.
      synchronized (this) {
        try {
          acceptChannel.close();
          selector.close();
        } catch (IOException ignored) {
          if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
        }

        selector= null;
        acceptChannel= null;

        // close all connections
        connectionManager.stopIdleScan();
        connectionManager.closeAll();
      }
    }
825
826     private void closeCurrentConnection(SelectionKey key, Throwable e) {
827       if (key != null) {
828         Connection c = (Connection)key.attachment();
829         if (c != null) {
830           closeConnection(c);
831           key.attach(null);
832         }
833       }
834     }
835
836     InetSocketAddress getAddress() {
837       return address;
838     }
839
840     void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
841       ServerSocketChannel server = (ServerSocketChannel) key.channel();
842       SocketChannel channel;
843       while ((channel = server.accept()) != null) {
844         channel.configureBlocking(false);
845         channel.socket().setTcpNoDelay(tcpNoDelay);
846         channel.socket().setKeepAlive(tcpKeepAlive);
847         Reader reader = getReader();
848         Connection c = connectionManager.register(channel);
849         // If the connectionManager can't take it, close the connection.
850         if (c == null) {
851           if (channel.isOpen()) {
852             IOUtils.cleanup(null, channel);
853           }
854           continue;
855         }
856         key.attach(c);  // so closeCurrentConnection can get the object
857         reader.addConnection(c);
858       }
859     }
860
861     void doRead(SelectionKey key) throws InterruptedException {
862       int count;
863       Connection c = (Connection) key.attachment();
864       if (c == null) {
865         return;
866       }
867       c.setLastContact(System.currentTimeMillis());
868       try {
869         count = c.readAndProcess();
870       } catch (InterruptedException ieo) {
871         LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
872         throw ieo;
873       } catch (Exception e) {
874         if (LOG.isDebugEnabled()) {
875           LOG.debug(getName() + ": Caught exception while reading:", e);
876         }
877         count = -1; //so that the (count < 0) block is executed
878       }
879       if (count < 0) {
880         closeConnection(c);
881         c = null;
882       } else {
883         c.setLastContact(System.currentTimeMillis());
884       }
885     }
886
887     synchronized void doStop() {
888       if (selector != null) {
889         selector.wakeup();
890         Thread.yield();
891       }
892       if (acceptChannel != null) {
893         try {
894           acceptChannel.socket().close();
895         } catch (IOException e) {
896           LOG.info(getName() + ": exception in closing listener socket. " + e);
897         }
898       }
899       readPool.shutdownNow();
900     }
901
902     synchronized Selector getSelector() { return selector; }
903
904     // The method that will return the next reader to work with
905     // Simplistic implementation of round robin for now
906     Reader getReader() {
907       currentReader = (currentReader + 1) % readers.length;
908       return readers[currentReader];
909     }
910   }
911
912   // Sends responses of RPC back to clients.
913   protected class Responder extends Thread {
914     private final Selector writeSelector;
915     private final Set<Connection> writingCons =
916         Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
917
918     Responder() throws IOException {
919       this.setName("RpcServer.responder");
920       this.setDaemon(true);
921       this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
922       writeSelector = Selector.open(); // create a selector
923     }
924
925     @Override
926     public void run() {
927       LOG.info(getName() + ": starting");
928       try {
929         doRunLoop();
930       } finally {
931         LOG.info(getName() + ": stopping");
932         try {
933           writeSelector.close();
934         } catch (IOException ioe) {
935           LOG.error(getName() + ": couldn't close write selector", ioe);
936         }
937       }
938     }
939
940     /**
941      * Take the list of the connections that want to write, and register them
942      * in the selector.
943      */
944     private void registerWrites() {
945       Iterator<Connection> it = writingCons.iterator();
946       while (it.hasNext()) {
947         Connection c = it.next();
948         it.remove();
949         SelectionKey sk = c.channel.keyFor(writeSelector);
950         try {
951           if (sk == null) {
952             try {
953               c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
954             } catch (ClosedChannelException e) {
955               // ignore: the client went away.
956               if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
957             }
958           } else {
959             sk.interestOps(SelectionKey.OP_WRITE);
960           }
961         } catch (CancelledKeyException e) {
962           // ignore: the client went away.
963           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
964         }
965       }
966     }
967
968     /**
969      * Add a connection to the list that want to write,
970      */
971     public void registerForWrite(Connection c) {
972       if (writingCons.add(c)) {
973         writeSelector.wakeup();
974       }
975     }
976
977     private void doRunLoop() {
978       long lastPurgeTime = 0;   // last check for old calls.
979       while (running) {
980         try {
981           registerWrites();
982           int keyCt = writeSelector.select(purgeTimeout);
983           if (keyCt == 0) {
984             continue;
985           }
986
987           Set<SelectionKey> keys = writeSelector.selectedKeys();
988           Iterator<SelectionKey> iter = keys.iterator();
989           while (iter.hasNext()) {
990             SelectionKey key = iter.next();
991             iter.remove();
992             try {
993               if (key.isValid() && key.isWritable()) {
994                 doAsyncWrite(key);
995               }
996             } catch (IOException e) {
997               LOG.debug(getName() + ": asyncWrite", e);
998             }
999           }
1000
1001           lastPurgeTime = purge(lastPurgeTime);
1002
1003         } catch (OutOfMemoryError e) {
1004           if (errorHandler != null) {
1005             if (errorHandler.checkOOME(e)) {
1006               LOG.info(getName() + ": exiting on OutOfMemoryError");
1007               return;
1008             }
1009           } else {
1010             //
1011             // we can run out of memory if we have too many threads
1012             // log the event and sleep for a minute and give
1013             // some thread(s) a chance to finish
1014             //
1015             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
1016             try {
1017               Thread.sleep(60000);
1018             } catch (InterruptedException ex) {
1019               LOG.debug("Interrupted while sleeping");
1020               return;
1021             }
1022           }
1023         } catch (Exception e) {
1024           LOG.warn(getName() + ": exception in Responder " +
1025               StringUtils.stringifyException(e), e);
1026         }
1027       }
1028       LOG.info(getName() + ": stopped");
1029     }
1030
1031     /**
1032      * If there were some calls that have not been sent out for a
1033      * long time, we close the connection.
1034      * @return the time of the purge.
1035      */
1036     private long purge(long lastPurgeTime) {
1037       long now = System.currentTimeMillis();
1038       if (now < lastPurgeTime + purgeTimeout) {
1039         return lastPurgeTime;
1040       }
1041
1042       ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1043       // get the list of channels from list of keys.
1044       synchronized (writeSelector.keys()) {
1045         for (SelectionKey key : writeSelector.keys()) {
1046           Connection connection = (Connection) key.attachment();
1047           if (connection == null) {
1048             throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1049           }
1050           Call call = connection.responseQueue.peekFirst();
1051           if (call != null && now > call.timestamp + purgeTimeout) {
1052             conWithOldCalls.add(call.connection);
1053           }
1054         }
1055       }
1056
1057       // Seems safer to close the connection outside of the synchronized loop...
1058       for (Connection connection : conWithOldCalls) {
1059         closeConnection(connection);
1060       }
1061
1062       return now;
1063     }
1064
1065     private void doAsyncWrite(SelectionKey key) throws IOException {
1066       Connection connection = (Connection) key.attachment();
1067       if (connection == null) {
1068         throw new IOException("doAsyncWrite: no connection");
1069       }
1070       if (key.channel() != connection.channel) {
1071         throw new IOException("doAsyncWrite: bad channel");
1072       }
1073
1074       if (processAllResponses(connection)) {
1075         try {
1076           // We wrote everything, so we don't need to be told when the socket is ready for
1077           //  write anymore.
1078          key.interestOps(0);
1079         } catch (CancelledKeyException e) {
1080           /* The Listener/reader might have closed the socket.
1081            * We don't explicitly cancel the key, so not sure if this will
1082            * ever fire.
1083            * This warning could be removed.
1084            */
1085           LOG.warn("Exception while changing ops : " + e);
1086         }
1087       }
1088     }
1089
1090     /**
1091      * Process the response for this call. You need to have the lock on
1092      * {@link org.apache.hadoop.hbase.ipc.RpcServer.Connection#responseWriteLock}
1093      *
1094      * @param call the call
1095      * @return true if we proceed the call fully, false otherwise.
1096      * @throws IOException
1097      */
1098     private boolean processResponse(final Call call) throws IOException {
1099       boolean error = true;
1100       try {
1101         // Send as much data as we can in the non-blocking fashion
1102         long numBytes = channelWrite(call.connection.channel, call.response);
1103         if (numBytes < 0) {
1104           throw new HBaseIOException("Error writing on the socket " +
1105             "for the call:" + call.toShortString());
1106         }
1107         error = false;
1108       } finally {
1109         if (error) {
1110           LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1111           // We will be closing this connection itself. Mark this call as done so that all the
1112           // buffer(s) it got from pool can get released
1113           call.done();
1114           closeConnection(call.connection);
1115         }
1116       }
1117
1118       if (!call.response.hasRemaining()) {
1119         call.done();
1120         return true;
1121       } else {
1122         return false; // Socket can't take more, we will have to come back.
1123       }
1124     }
1125
1126     /**
1127      * Process all the responses for this connection
1128      *
1129      * @return true if all the calls were processed or that someone else is doing it.
1130      * false if there * is still some work to do. In this case, we expect the caller to
1131      * delay us.
1132      * @throws IOException
1133      */
1134     private boolean processAllResponses(final Connection connection) throws IOException {
1135       // We want only one writer on the channel for a connection at a time.
1136       connection.responseWriteLock.lock();
1137       try {
1138         for (int i = 0; i < 20; i++) {
1139           // protection if some handlers manage to need all the responder
1140           Call call = connection.responseQueue.pollFirst();
1141           if (call == null) {
1142             return true;
1143           }
1144           if (!processResponse(call)) {
1145             connection.responseQueue.addFirst(call);
1146             return false;
1147           }
1148         }
1149       } finally {
1150         connection.responseWriteLock.unlock();
1151       }
1152
1153       return connection.responseQueue.isEmpty();
1154     }
1155
1156     //
1157     // Enqueue a response from the application.
1158     //
1159     void doRespond(Call call) throws IOException {
1160       boolean added = false;
1161
1162       // If there is already a write in progress, we don't wait. This allows to free the handlers
1163       //  immediately for other tasks.
1164       if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
1165         try {
1166           if (call.connection.responseQueue.isEmpty()) {
1167             // If we're alone, we can try to do a direct call to the socket. It's
1168             //  an optimisation to save on context switches and data transfer between cores..
1169             if (processResponse(call)) {
1170               return; // we're done.
1171             }
1172             // Too big to fit, putting ahead.
1173             call.connection.responseQueue.addFirst(call);
1174             added = true; // We will register to the selector later, outside of the lock.
1175           }
1176         } finally {
1177           call.connection.responseWriteLock.unlock();
1178         }
1179       }
1180
1181       if (!added) {
1182         call.connection.responseQueue.addLast(call);
1183       }
1184       call.responder.registerForWrite(call.connection);
1185
1186       // set the serve time when the response has to be sent later
1187       call.timestamp = System.currentTimeMillis();
1188     }
1189   }
1190
1191   /** Reads calls from a connection and queues them for handling. */
1192   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1193       value="VO_VOLATILE_INCREMENT",
1194       justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1195   public class Connection {
    // If initial preamble with version and magic has been read or not.
    private boolean connectionPreambleRead = false;
    // If the connection header has been read or not.
    private boolean connectionHeaderRead = false;
    protected SocketChannel channel;
    private ByteBuffer data;
    // 4-byte read buffer. readPreamble() checks it against HConstants.RPC_HEADER; presumably it
    // holds each request's length prefix afterwards (TODO confirm against readAndProcess).
    private ByteBuffer dataLengthBuffer;
    // Responses waiting to be written; drained by the Responder or directly by doRespond().
    protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
    // Held while writing responses so that only one writer uses the channel at a time.
    private final Lock responseWriteLock = new ReentrantLock();
    private Counter rpcCount = new Counter(); // number of outstanding rpcs
    private long lastContact;
    private InetAddress addr;
    protected Socket socket;
    // Cache the remote host & port info so that even if the socket is
    // disconnected, we can say where it used to connect to.
    protected String hostAddress;
    protected int remotePort;
    ConnectionHeader connectionHeader;

    /**
     * Codec the client asked us to use.
     */
    private Codec codec;
    /**
     * Compression codec the client asked us to use.
     */
    private CompressionCodec compressionCodec;
    BlockingService service;

    // Authentication method from the connection preamble; null until the preamble is read.
    private AuthMethod authMethod;
    private boolean saslContextEstablished;
    private boolean skipInitialSaslHandshake;
    private ByteBuffer unwrappedData;
    // Allocated eagerly here (4 bytes). The old note asked "When is this set?" because FindBugs
    // reported a possible NP on it.
    private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
    boolean useSasl;
    SaslServer saslServer;
    // True once SASL negotiated a QoP other than "auth", i.e. payloads must be unwrapped.
    private boolean useWrap = false;
    // Fake 'call' for failed authorization response
    private static final int AUTHORIZATION_FAILED_CALLID = -1;
    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
        null, null, this, null, 0, null, null, 0);
    private ByteArrayOutputStream authFailedResponse =
        new ByteArrayOutputStream();
    // Fake 'call' for SASL context setup
    private static final int SASL_CALLID = -33;
    private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
        0, null, null, 0);

    // was authentication allowed with a fallback to simple auth
    private boolean authenticatedWithFallback;

    private boolean retryImmediatelySupported = false;

    public UserGroupInformation attemptingUser = null; // user name before auth
    protected User user = null;
    // Authenticated caller; set once the SASL exchange completes.
    protected UserGroupInformation ugi = null;
1253
1254     public Connection(SocketChannel channel, long lastContact) {
1255       this.channel = channel;
1256       this.lastContact = lastContact;
1257       this.data = null;
1258       this.dataLengthBuffer = ByteBuffer.allocate(4);
1259       this.socket = channel.socket();
1260       this.addr = socket.getInetAddress();
1261       if (addr == null) {
1262         this.hostAddress = "*Unknown*";
1263       } else {
1264         this.hostAddress = addr.getHostAddress();
1265       }
1266       this.remotePort = socket.getPort();
1267       if (socketSendBufferSize != 0) {
1268         try {
1269           socket.setSendBufferSize(socketSendBufferSize);
1270         } catch (IOException e) {
1271           LOG.warn("Connection: unable to set socket send buffer size to " +
1272                    socketSendBufferSize);
1273         }
1274       }
1275     }
1276
1277       @Override
1278     public String toString() {
1279       return getHostAddress() + ":" + remotePort;
1280     }
1281
1282     public String getHostAddress() {
1283       return hostAddress;
1284     }
1285
1286     public InetAddress getHostInetAddress() {
1287       return addr;
1288     }
1289
1290     public int getRemotePort() {
1291       return remotePort;
1292     }
1293
1294     public void setLastContact(long lastContact) {
1295       this.lastContact = lastContact;
1296     }
1297
1298     public VersionInfo getVersionInfo() {
1299       if (connectionHeader.hasVersionInfo()) {
1300         return connectionHeader.getVersionInfo();
1301       }
1302       return null;
1303     }
1304
1305     public long getLastContact() {
1306       return lastContact;
1307     }
1308
1309     /* Return true if the connection has no outstanding rpc */
1310     private boolean isIdle() {
1311       return rpcCount.get() == 0;
1312     }
1313
1314     /* Decrement the outstanding RPC count */
1315     protected void decRpcCount() {
1316       rpcCount.decrement();
1317     }
1318
1319     /* Increment the outstanding RPC count */
1320     protected void incRpcCount() {
1321       rpcCount.increment();
1322     }
1323
1324     private UserGroupInformation getAuthorizedUgi(String authorizedId)
1325         throws IOException {
1326       UserGroupInformation authorizedUgi;
1327       if (authMethod == AuthMethod.DIGEST) {
1328         TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1329             secretManager);
1330         authorizedUgi = tokenId.getUser();
1331         if (authorizedUgi == null) {
1332           throw new AccessDeniedException(
1333               "Can't retrieve username from tokenIdentifier.");
1334         }
1335         authorizedUgi.addTokenIdentifier(tokenId);
1336       } else {
1337         authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
1338       }
1339       authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
1340       return authorizedUgi;
1341     }
1342
    /**
     * Handles one SASL-framed message from the client.
     *
     * <p>Before the SASL context is established, the token is fed to the SASL server. The server
     * is created lazily: DIGEST uses the secret manager; every other method uses Kerberos as the
     * current server user. Any reply token is sent back with {@code doRawSaslReply}. Once the
     * exchange completes, the negotiated QoP decides whether later payloads must be unwrapped,
     * and {@code ugi} is set to the authorized user.
     *
     * <p>After the context is established, the token is an RPC payload. It is processed directly,
     * or unwrapped first when a QoP other than "auth" was negotiated.
     *
     * <p>On an IOException during negotiation, an ERROR reply is sent to the client and the
     * exception is rethrown. The reply uses the first {@code InvalidToken} in the cause chain if
     * there is one, otherwise the original exception. The failure is recorded in metrics and the
     * audit log.
     *
     * @param saslToken the received token/payload
     */
    private void saslReadAndProcess(ByteBuffer saslToken) throws IOException,
        InterruptedException {
      if (saslContextEstablished) {
        if (LOG.isTraceEnabled())
          LOG.trace("Have read input token of size " + saslToken.limit()
              + " for processing by saslServer.unwrap()");

        if (!useWrap) {
          processOneRpc(saslToken);
        } else {
          byte[] b = saslToken.array();
          // NOTE(review): SaslServer.unwrap takes (bytes, offset, length). Passing limit() as the
          // length is only right when position() is 0 — presumably always true here; TODO confirm.
          byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
          processUnwrappedData(plaintextData);
        }
      } else {
        byte[] replyToken;
        try {
          if (saslServer == null) {
            switch (authMethod) {
            case DIGEST:
              if (secretManager == null) {
                throw new AccessDeniedException(
                    "Server is not configured to do DIGEST authentication.");
              }
              saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
                  .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
                  HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
                      secretManager, this));
              break;
            default:
              // Kerberos: the server principal must be service/host@REALM (three parts).
              UserGroupInformation current = UserGroupInformation.getCurrentUser();
              String fullName = current.getUserName();
              if (LOG.isDebugEnabled()) {
                LOG.debug("Kerberos principal name is " + fullName);
              }
              final String names[] = SaslUtil.splitKerberosName(fullName);
              if (names.length != 3) {
                throw new AccessDeniedException(
                    "Kerberos principal name does NOT have the expected "
                        + "hostname part: " + fullName);
              }
              // The GSS server must be created with the server's own credentials.
              current.doAs(new PrivilegedExceptionAction<Object>() {
                @Override
                public Object run() throws SaslException {
                  saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
                      .getMechanismName(), names[0], names[1],
                      HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
                  return null;
                }
              });
            }
            if (saslServer == null)
              throw new AccessDeniedException(
                  "Unable to find SASL server implementation for "
                      + authMethod.getMechanismName());
            if (LOG.isDebugEnabled()) {
              LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
            }
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug("Have read input token of size " + saslToken.limit()
                + " for processing by saslServer.evaluateResponse()");
          }
          // NOTE(review): passes the whole backing array, assuming it is exactly the token
          // (no offset/extra capacity) — TODO confirm with the caller that builds saslToken.
          replyToken = saslServer.evaluateResponse(saslToken.array());
        } catch (IOException e) {
          // Prefer reporting an InvalidToken found anywhere in the cause chain to the client.
          IOException sendToClient = e;
          Throwable cause = e;
          while (cause != null) {
            if (cause instanceof InvalidToken) {
              sendToClient = (InvalidToken) cause;
              break;
            }
            cause = cause.getCause();
          }
          doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
            sendToClient.getLocalizedMessage());
          metrics.authenticationFailure();
          String clientIP = this.toString();
          // attempting user could be null
          AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
          throw e;
        }
        if (replyToken != null) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Will send token of size " + replyToken.length
                + " from saslServer.");
          }
          doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
              null);
        }
        if (saslServer.isComplete()) {
          // Any QoP other than plain "auth" (integrity/privacy) means payloads are wrapped.
          String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
          useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
          ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
          if (LOG.isDebugEnabled()) {
            LOG.debug("SASL server context established. Authenticated client: "
              + ugi + ". Negotiated QoP is "
              + saslServer.getNegotiatedProperty(Sasl.QOP));
          }
          metrics.authenticationSuccess();
          AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
          saslContextEstablished = true;
        }
      }
    }
1448
1449     /**
1450      * No protobuf encoding of raw sasl messages
1451      */
1452     private void doRawSaslReply(SaslStatus status, Writable rv,
1453         String errorClass, String error) throws IOException {
1454       ByteBufferOutputStream saslResponse = null;
1455       DataOutputStream out = null;
1456       try {
1457         // In my testing, have noticed that sasl messages are usually
1458         // in the ballpark of 100-200. That's why the initial capacity is 256.
1459         saslResponse = new ByteBufferOutputStream(256);
1460         out = new DataOutputStream(saslResponse);
1461         out.writeInt(status.state); // write status
1462         if (status == SaslStatus.SUCCESS) {
1463           rv.write(out);
1464         } else {
1465           WritableUtils.writeString(out, errorClass);
1466           WritableUtils.writeString(out, error);
1467         }
1468         saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1469         saslCall.responder = responder;
1470         saslCall.sendResponseIfReady();
1471       } finally {
1472         if (saslResponse != null) {
1473           saslResponse.close();
1474         }
1475         if (out != null) {
1476           out.close();
1477         }
1478       }
1479     }
1480
1481     private void disposeSasl() {
1482       if (saslServer != null) {
1483         try {
1484           saslServer.dispose();
1485           saslServer = null;
1486         } catch (SaslException ignored) {
1487           // Ignored. This is being disposed of anyway.
1488         }
1489       }
1490     }
1491
    /**
     * Validates the connection preamble. The 4 bytes already buffered in
     * {@code dataLengthBuffer} must equal {@code HConstants.RPC_HEADER} (the 'HBas' magic). They
     * are followed on the wire by one version byte and one auth-method byte, which are read here.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>A bad magic, version or auth byte queues an error response and returns -1.</li>
     *   <li>Security enabled, client asks for SIMPLE auth: the connection either proceeds as an
     *   allowed fallback or is refused.</li>
     *   <li>Security disabled, client asks for a non-SIMPLE method: the client is told to switch
     *   to SIMPLE, and its already-sent initial SASL message is skipped.</li>
     *   <li>Success: {@code connectionPreambleRead} is set, and {@code dataLengthBuffer} is
     *   cleared to receive the first frame length.</li>
     * </ul>
     *
     * @return -1 if the connection should be closed; otherwise the count from the last channel
     *   read. If the preamble is still incomplete, {@code connectionPreambleRead} stays false.
     * @throws IOException on read/response errors. An {@link AccessDeniedException} is thrown
     *   when a SIMPLE-auth client is refused on a secure server.
     */
    private int readPreamble() throws IOException {
      int count;
      // Check for 'HBas' magic.
      this.dataLengthBuffer.flip();
      if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
        return doBadPreambleHandling("Expected HEADER=" +
            Bytes.toStringBinary(HConstants.RPC_HEADER) +
            " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
            " from " + toString());
      }
      // Now read the next two bytes, the version and the auth to use.
      // NOTE(review): versionAndAuthBytes is a local. If these two bytes have not fully arrived
      // yet, the bytes read so far are dropped, and dataLengthBuffer is left flipped rather than
      // cleared. The next readAndProcess() then reads later wire bytes over the buffered magic,
      // so the magic check above fails. Keeping the partial preamble in a field (e.g. a single
      // 6-byte buffer) would let a slow client's split preamble succeed.
      ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
      count = channelRead(channel, versionAndAuthBytes);
      if (count < 0 || versionAndAuthBytes.remaining() > 0) {
        return count;
      }
      int version = versionAndAuthBytes.get(0);
      byte authbyte = versionAndAuthBytes.get(1);
      // valueOf is expected to return null for an unrecognized auth byte. That case is rejected
      // below, after the version check.
      this.authMethod = AuthMethod.valueOf(authbyte);
      if (version != CURRENT_VERSION) {
        String msg = getFatalConnectionString(version, authbyte);
        return doBadPreambleHandling(msg, new WrongVersionException(msg));
      }
      if (authMethod == null) {
        String msg = getFatalConnectionString(version, authbyte);
        return doBadPreambleHandling(msg, new BadAuthException(msg));
      }
      // Secure server, insecure client: permitted only when fallback is configured.
      if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
        if (allowFallbackToSimpleAuth) {
          metrics.authenticationFallback();
          authenticatedWithFallback = true;
        } else {
          AccessDeniedException ae = new AccessDeniedException("Authentication is required");
          setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
          responder.doRespond(authFailedCall);
          throw ae;
        }
      }
      // Insecure server, client wants a SASL method: tell the client to switch to SIMPLE.
      if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
        doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
            SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
        authMethod = AuthMethod.SIMPLE;
        // client has already sent the initial Sasl message and we
        // should ignore it. Both client and server should fall back
        // to simple auth from now on.
        skipInitialSaslHandshake = true;
      }
      if (authMethod != AuthMethod.SIMPLE) {
        useSasl = true;
      }

      // Preamble fully consumed: reset the buffer so it can receive the first frame length.
      dataLengthBuffer.clear();
      connectionPreambleRead = true;
      return count;
    }
1547
1548     private int read4Bytes() throws IOException {
1549       if (this.dataLengthBuffer.remaining() > 0) {
1550         return channelRead(channel, this.dataLengthBuffer);
1551       } else {
1552         return 0;
1553       }
1554     }
1555
1556
    /**
     * Reads from the wire and processes whatever complete unit is available. If there is not
     * enough data yet, the partial state is kept in {@code dataLengthBuffer} / {@code data}, and
     * the method returns so it can resume on the next call.
     *
     * <p>Steps:
     * <ol>
     *   <li>Fill the 4-byte {@code dataLengthBuffer}.</li>
     *   <li>On a new connection, treat those bytes as the 'HBas' magic, read the rest of the
     *   preamble, then fill the length buffer again.</li>
     *   <li>Interpret the 4 bytes as a frame length, validate it, and allocate {@code data}.</li>
     *   <li>Read the frame body and, once it is complete, hand it to {@code process()}.</li>
     * </ol>
     *
     * @return -1 on failure (the caller will close the connection), else zero or more
     * @throws IOException on channel errors. A {@link DoNotRetryIOException} is thrown for a
     *   negative frame length or one larger than {@code maxRequestSize}.
     * @throws InterruptedException propagated from {@code process()}
     */
    public int readAndProcess() throws IOException, InterruptedException {
      // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
      // does, read in the rest of the connection preamble, the version and the auth method.
      // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
      // length into the 4-byte this.dataLengthBuffer.
      int count = read4Bytes();
      if (count < 0 || dataLengthBuffer.remaining() > 0) {
        return count;
      }

      // If we have not read the connection setup preamble, look to see if that is on the wire.
      if (!connectionPreambleRead) {
        count = readPreamble();
        if (!connectionPreambleRead) {
          return count;
        }

        count = read4Bytes();
        if (count < 0 || dataLengthBuffer.remaining() > 0) {
          return count;
        }
      }

      // We have read a length and we have read the preamble.  It is either the connection header
      // or it is a request.
      // A non-null data buffer means a frame body is partially read from an earlier call; in
      // that case skip straight to resuming the body read.
      if (data == null) {
        dataLengthBuffer.flip();
        int dataLength = dataLengthBuffer.getInt();
        if (dataLength == RpcClient.PING_CALL_ID) {
          if (!useWrap) { //covers the !useSasl too
            dataLengthBuffer.clear();
            return 0;  //ping message
          }
          // With SASL wrapping active, a raw ping length is not accepted here; it falls through
          // to the negative-length check below.
        }
        if (dataLength < 0) { // A data length of zero is legal.
          throw new DoNotRetryIOException("Unexpected data length "
              + dataLength + "!! from " + getHostAddress());
        }

        if (dataLength > maxRequestSize) {
          throw new DoNotRetryIOException("RPC data length of " + dataLength + " received from "
              + getHostAddress() + " is greater than max allowed " + maxRequestSize + ". Set \""
              + MAX_REQUEST_SIZE + "\" on server to override this limit (not recommended)");
        }

        data = ByteBuffer.allocate(dataLength);

        // Increment the rpc count. This counter will be decreased when we write
        //  the response.  If we want the connection to be detected as idle properly, we
        //  need to keep the inc / dec correct.
        incRpcCount();
      }

      count = channelRead(channel, data);

      // Only dispatch once the whole frame is buffered; process() resets the state for the next
      // frame.
      if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
        process();
      }

      return count;
    }
1625
1626     /**
1627      * Process the data buffer and clean the connection state for the next call.
1628      */
1629     private void process() throws IOException, InterruptedException {
1630       data.flip();
1631       try {
1632         if (skipInitialSaslHandshake) {
1633           skipInitialSaslHandshake = false;
1634           return;
1635         }
1636
1637         if (useSasl) {
1638           saslReadAndProcess(data);
1639         } else {
1640           processOneRpc(data);
1641         }
1642
1643       } finally {
1644         dataLengthBuffer.clear(); // Clean for the next call
1645         data = null; // For the GC
1646       }
1647     }
1648
1649     private String getFatalConnectionString(final int version, final byte authByte) {
1650       return "serverVersion=" + CURRENT_VERSION +
1651       ", clientVersion=" + version + ", authMethod=" + authByte +
1652       ", authSupported=" + (authMethod != null) + " from " + toString();
1653     }
1654
1655     private int doBadPreambleHandling(final String msg) throws IOException {
1656       return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1657     }
1658
1659     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1660       LOG.warn(msg);
1661       Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0);
1662       setupResponse(null, fakeCall, e, msg);
1663       responder.doRespond(fakeCall);
1664       // Returning -1 closes out the connection.
1665       return -1;
1666     }
1667
1668     // Reads the connection header following version
1669     private void processConnectionHeader(ByteBuffer buf) throws IOException {
1670       this.connectionHeader = ConnectionHeader.parseFrom(
1671         new ByteBufferInputStream(buf));
1672       String serviceName = connectionHeader.getServiceName();
1673       if (serviceName == null) throw new EmptyServiceNameException();
1674       this.service = getService(services, serviceName);
1675       if (this.service == null) throw new UnknownServiceException(serviceName);
1676       setupCellBlockCodecs(this.connectionHeader);
1677       UserGroupInformation protocolUser = createUser(connectionHeader);
1678       if (!useSasl) {
1679         ugi = protocolUser;
1680         if (ugi != null) {
1681           ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1682         }
1683         // audit logging for SASL authenticated users happens in saslReadAndProcess()
1684         if (authenticatedWithFallback) {
1685           LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
1686               + " connecting from " + getHostAddress());
1687         }
1688         AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1689       } else {
1690         // user is authenticated
1691         ugi.setAuthenticationMethod(authMethod.authenticationMethod);
1692         //Now we check if this is a proxy user case. If the protocol user is
1693         //different from the 'user', it is a proxy user scenario. However,
1694         //this is not allowed if user authenticated with DIGEST.
1695         if ((protocolUser != null)
1696             && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
1697           if (authMethod == AuthMethod.DIGEST) {
1698             // Not allowed to doAs if token authentication is used
1699             throw new AccessDeniedException("Authenticated user (" + ugi
1700                 + ") doesn't match what the client claims to be ("
1701                 + protocolUser + ")");
1702           } else {
1703             // Effective user can be different from authenticated user
1704             // for simple auth or kerberos auth
1705             // The user is the real user. Now we create a proxy user
1706             UserGroupInformation realUser = ugi;
1707             ugi = UserGroupInformation.createProxyUser(protocolUser
1708                 .getUserName(), realUser);
1709             // Now the user is a proxy user, set Authentication method Proxy.
1710             ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
1711           }
1712         }
1713       }
1714       if (connectionHeader.hasVersionInfo()) {
1715         // see if this connection will support RetryImmediatelyException
1716         retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
1717
1718         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1719             + " with version info: "
1720             + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
1721       } else {
1722         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1723             + " with unknown version info");
1724       }
1725
1726
1727     }
1728
1729     /**
1730      * Set up cell block codecs
1731      * @throws FatalConnectionException
1732      */
1733     private void setupCellBlockCodecs(final ConnectionHeader header)
1734     throws FatalConnectionException {
1735       // TODO: Plug in other supported decoders.
1736       if (!header.hasCellBlockCodecClass()) return;
1737       String className = header.getCellBlockCodecClass();
1738       if (className == null || className.length() == 0) return;
1739       try {
1740         this.codec = (Codec)Class.forName(className).newInstance();
1741       } catch (Exception e) {
1742         throw new UnsupportedCellCodecException(className, e);
1743       }
1744       if (!header.hasCellBlockCompressorClass()) return;
1745       className = header.getCellBlockCompressorClass();
1746       try {
1747         this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1748       } catch (Exception e) {
1749         throw new UnsupportedCompressionCodecException(className, e);
1750       }
1751     }
1752
1753     private void processUnwrappedData(byte[] inBuf) throws IOException,
1754     InterruptedException {
1755       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1756       // Read all RPCs contained in the inBuf, even partial ones
1757       while (true) {
1758         int count;
1759         if (unwrappedDataLengthBuffer.remaining() > 0) {
1760           count = channelRead(ch, unwrappedDataLengthBuffer);
1761           if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1762             return;
1763         }
1764
1765         if (unwrappedData == null) {
1766           unwrappedDataLengthBuffer.flip();
1767           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1768
1769           if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1770             if (LOG.isDebugEnabled())
1771               LOG.debug("Received ping message");
1772             unwrappedDataLengthBuffer.clear();
1773             continue; // ping message
1774           }
1775           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1776         }
1777
1778         count = channelRead(ch, unwrappedData);
1779         if (count <= 0 || unwrappedData.remaining() > 0)
1780           return;
1781
1782         if (unwrappedData.remaining() == 0) {
1783           unwrappedDataLengthBuffer.clear();
1784           unwrappedData.flip();
1785           processOneRpc(unwrappedData);
1786           unwrappedData = null;
1787         }
1788       }
1789     }
1790
1791
1792     private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException {
1793       if (connectionHeaderRead) {
1794         processRequest(buf);
1795       } else {
1796         processConnectionHeader(buf);
1797         this.connectionHeaderRead = true;
1798         if (!authorizeConnection()) {
1799           // Throw FatalConnectionException wrapping ACE so client does right thing and closes
1800           // down the connection instead of trying to read non-existent retun.
1801           throw new AccessDeniedException("Connection from " + this + " for service " +
1802             connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
1803         }
1804         this.user = userProvider.create(this.ugi);
1805       }
1806     }
1807
    /**
     * Decodes one RPC request frame and hands it to the scheduler.
     *
     * <p>The frame layout is:
     * <ol>
     *   <li>a varint-delimited {@code RequestHeader};</li>
     *   <li>if the header says a request param is present, a varint-delimited param message;</li>
     *   <li>if the header carries cell block metadata, the cell block, which occupies the rest of
     *   {@code buf}.</li>
     * </ol>
     *
     * <p>Failures to decode the param or cell block, a full call queue, and a rejected dispatch
     * are all answered with an error response rather than thrown.
     *
     * @param buf Has the request header and the request param and optionally encoded data buffer
     * all in this one array. Its backing array is read directly.
     * @throws IOException if the request header itself cannot be parsed, or if queuing an error
     *   response fails
     * @throws InterruptedException NOTE(review): declared by the signature; no call in this body
     *   visibly throws it, so confirm whether it is still needed
     */
    protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
      long totalRequestSize = buf.limit();
      int offset = 0;
      // Here we read in the header.  We avoid having pb
      // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
      CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
      int headerSize = cis.readRawVarint32();
      // offset tracks how far into buf we have consumed, so the cell block can be located later.
      offset = cis.getTotalBytesRead();
      Message.Builder builder = RequestHeader.newBuilder();
      ProtobufUtil.mergeFrom(builder, cis, headerSize);
      RequestHeader header = (RequestHeader) builder.build();
      offset += headerSize;
      int id = header.getCallId();
      if (LOG.isTraceEnabled()) {
        LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
          " totalRequestSize: " + totalRequestSize + " bytes");
      }
      // Enforcing the call queue size, this triggers a retry in the client
      // This is a bit late to be doing this check - we have already read in the total request.
      if ((totalRequestSize + callQueueSizeInBytes.get()) > maxQueueSizeInBytes) {
        final Call callTooBig =
          new Call(id, this.service, null, null, null, null, this,
            responder, totalRequestSize, null, null, 0);
        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
        metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
        setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
            "Call queue is full on " + server.getServerName() +
                ", is hbase.ipc.server.max.callqueue.size too small?");
        responder.doRespond(callTooBig);
        return;
      }
      MethodDescriptor md = null;
      Message param = null;
      CellScanner cellScanner = null;
      try {
        if (header.hasRequestParam() && header.getRequestParam()) {
          md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
          if (md == null) throw new UnsupportedOperationException(header.getMethodName());
          builder = this.service.getRequestPrototype(md).newBuilderForType();
          // After resetSizeCounter(), getTotalBytesRead() counts only the param-size varint.
          cis.resetSizeCounter();
          int paramSize = cis.readRawVarint32();
          offset += cis.getTotalBytesRead();
          if (builder != null) {
            ProtobufUtil.mergeFrom(builder, cis, paramSize);
            param = builder.build();
          }
          offset += paramSize;
        }
        if (header.hasCellBlockMeta()) {
          // The cell block starts right after the header and param.
          buf.position(offset);
          cellScanner = ipcUtil.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf);
        }
      } catch (Throwable t) {
        InetSocketAddress address = getListenerAddress();
        String msg = (address != null ? address : "(channel closed)") +
            " is unable to read call parameter from client " + getHostAddress();
        LOG.warn(msg, t);

        metrics.exception(t);

        // probably the hbase hadoop version does not match the running hadoop version
        if (t instanceof LinkageError) {
          t = new DoNotRetryIOException(t);
        }
        // If the method is not present on the server, do not retry.
        if (t instanceof UnsupportedOperationException) {
          t = new DoNotRetryIOException(t);
        }

        final Call readParamsFailedCall =
          new Call(id, this.service, null, null, null, null, this,
            responder, totalRequestSize, null, null, 0);
        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
        setupResponse(responseBuffer, readParamsFailedCall, t,
          msg + "; " + t.getMessage());
        responder.doRespond(readParamsFailedCall);
        return;
      }

      TraceInfo traceInfo = header.hasTraceInfo()
          ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
          : null;
      // 0 means the client sent no timeout; a client-supplied timeout is raised to at least
      // minClientRequestTimeout.
      int timeout = 0;
      if (header.hasTimeout()){
        timeout = Math.max(minClientRequestTimeout, header.getTimeout());
      }
      Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
              totalRequestSize, traceInfo, this.addr, timeout);

      // dispatch() returning false means the scheduler did not accept the call. The size is
      // subtracted from callQueueSizeInBytes, presumably undoing an increment made when the
      // CallRunner was created (not visible here; confirm). The client gets a queue-full error.
      if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
        callQueueSizeInBytes.add(-1 * call.getSize());

        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
        metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
        setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
            "Call queue is full on " + server.getServerName() +
                ", too many items queued ?");
        responder.doRespond(call);
      }
    }
1914
1915     private boolean authorizeConnection() throws IOException {
1916       try {
1917         // If auth method is DIGEST, the token was obtained by the
1918         // real user for the effective user, therefore not required to
1919         // authorize real user. doAs is allowed only for simple or kerberos
1920         // authentication
1921         if (ugi != null && ugi.getRealUser() != null
1922             && (authMethod != AuthMethod.DIGEST)) {
1923           ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
1924         }
1925         authorize(ugi, connectionHeader, getHostInetAddress());
1926         metrics.authorizationSuccess();
1927       } catch (AuthorizationException ae) {
1928         if (LOG.isDebugEnabled()) {
1929           LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1930         }
1931         metrics.authorizationFailure();
1932         setupResponse(authFailedResponse, authFailedCall,
1933           new AccessDeniedException(ae), ae.getMessage());
1934         responder.doRespond(authFailedCall);
1935         return false;
1936       }
1937       return true;
1938     }
1939
1940     protected synchronized void close() {
1941       disposeSasl();
1942       data = null;
1943       if (!channel.isOpen())
1944         return;
1945       try {socket.shutdownOutput();} catch(Exception ignored) {
1946         if (LOG.isTraceEnabled()) {
1947           LOG.trace("Ignored exception", ignored);
1948         }
1949       }
1950       if (channel.isOpen()) {
1951         try {channel.close();} catch(Exception ignored) {}
1952       }
1953       try {
1954         socket.close();
1955       } catch(Exception ignored) {
1956         if (LOG.isTraceEnabled()) {
1957           LOG.trace("Ignored exception", ignored);
1958         }
1959       }
1960     }
1961
1962     private UserGroupInformation createUser(ConnectionHeader head) {
1963       UserGroupInformation ugi = null;
1964
1965       if (!head.hasUserInfo()) {
1966         return null;
1967       }
1968       UserInformation userInfoProto = head.getUserInfo();
1969       String effectiveUser = null;
1970       if (userInfoProto.hasEffectiveUser()) {
1971         effectiveUser = userInfoProto.getEffectiveUser();
1972       }
1973       String realUser = null;
1974       if (userInfoProto.hasRealUser()) {
1975         realUser = userInfoProto.getRealUser();
1976       }
1977       if (effectiveUser != null) {
1978         if (realUser != null) {
1979           UserGroupInformation realUserUgi =
1980               UserGroupInformation.createRemoteUser(realUser);
1981           ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1982         } else {
1983           ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1984         }
1985       }
1986       return ugi;
1987     }
1988   }
1989
1990   /**
1991    * Datastructure for passing a {@link BlockingService} and its associated class of
1992    * protobuf service interface.  For example, a server that fielded what is defined
1993    * in the client protobuf service would pass in an implementation of the client blocking service
1994    * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
1995    */
1996   public static class BlockingServiceAndInterface {
1997     private final BlockingService service;
1998     private final Class<?> serviceInterface;
1999     public BlockingServiceAndInterface(final BlockingService service,
2000         final Class<?> serviceInterface) {
2001       this.service = service;
2002       this.serviceInterface = serviceInterface;
2003     }
2004     public Class<?> getServiceInterface() {
2005       return this.serviceInterface;
2006     }
2007     public BlockingService getBlockingService() {
2008       return this.service;
2009     }
2010   }
2011
  /**
   * Constructs a server listening on the named port and address. The listener socket is bound
   * during construction; threads are only started by {@link #start()}.
   *
   * @param server hosting instance of {@link Server}. We will do authentications if an
   * instance else pass null for no authentication check (with a null server no token secret
   * manager is created).
   * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
   * @param services A list of services.
   * @param bindAddress Where to listen
   * @param conf configuration for buffer pooling, queue/request size limits, timeouts, TCP
   *   options and security settings
   * @param scheduler scheduler that receives dispatched calls; it is initialized here
   * @throws IOException if the listener cannot be created (for example, if binding fails)
   */
  public RpcServer(final Server server, final String name,
      final List<BlockingServiceAndInterface> services,
      final InetSocketAddress bindAddress, Configuration conf,
      RpcScheduler scheduler)
      throws IOException {
    // Optional pool of reusable ByteBuffers. Its max size defaults to twice the region server
    // handler count.
    if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
      this.reservoir = new ByteBufferPool(
          conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY, ByteBufferPool.DEFAULT_BUFFER_SIZE),
          conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
              conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
                  HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
    } else {
      reservoir = null;
    }
    this.server = server;
    this.services = services;
    this.bindAddress = bindAddress;
    this.conf = conf;
    this.socketSendBufferSize = 0;
    // See declaration above for documentation on what this size is.
    this.maxQueueSizeInBytes =
      this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
    this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
    this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
    this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
    this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
        DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);

    // Start the listener here and let it bind to the port
    listener = new Listener(name);
    // The actual bound port (relevant if bindAddress asked for an ephemeral port).
    this.port = listener.getAddress().getPort();

    // NOTE(review): 'this' is handed to the metrics wrapper before construction completes.
    this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);

    this.ipcUtil = new IPCUtil(conf);


    // Create the responder here
    responder = new Responder();
    connectionManager = new ConnectionManager();
    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
    this.userProvider = UserProvider.instantiate(conf);
    this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
    if (isSecurityEnabled) {
      HBaseSaslRpcServer.init(conf);
    }
    // Must come after isSecurityEnabled is set: initReconfigurable reads it.
    initReconfigurable(conf);

    this.scheduler = scheduler;
    this.scheduler.init(new RpcSchedulerContext(this));
  }
2078
2079   @Override
2080   public void onConfigurationChange(Configuration newConf) {
2081     initReconfigurable(newConf);
2082     if (scheduler instanceof ConfigurationObserver) {
2083       ((ConfigurationObserver)scheduler).onConfigurationChange(newConf);
2084     }
2085   }
2086
2087   private void initReconfigurable(Configuration confToLoad) {
2088     this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
2089     if (isSecurityEnabled && allowFallbackToSimpleAuth) {
2090       LOG.warn("********* WARNING! *********");
2091       LOG.warn("This server is configured to allow connections from INSECURE clients");
2092       LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
2093       LOG.warn("While this option is enabled, client identities cannot be secured, and user");
2094       LOG.warn("impersonation is possible!");
2095       LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
2096       LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
2097       LOG.warn("****************************");
2098     }
2099   }
2100
2101   /**
2102    * Subclasses of HBaseServer can override this to provide their own
2103    * Connection implementations.
2104    */
2105   protected Connection getConnection(SocketChannel channel, long time) {
2106     return new Connection(channel, time);
2107   }
2108
2109   /**
2110    * Setup response for the RPC Call.
2111    *
2112    * @param response buffer to serialize the response into
2113    * @param call {@link Call} to which we are setting up the response
2114    * @param error error message, if the call failed
2115    * @throws IOException
2116    */
2117   private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
2118   throws IOException {
2119     if (response != null) response.reset();
2120     call.setResponse(null, null, t, error);
2121   }
2122
2123   protected void closeConnection(Connection connection) {
2124     connectionManager.close(connection);
2125   }
2126
2127   Configuration getConf() {
2128     return conf;
2129   }
2130
2131   /** Sets the socket buffer size used for responding to RPCs.
2132    * @param size send size
2133    */
2134   @Override
2135   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2136
2137   @Override
2138   public boolean isStarted() {
2139     return this.started;
2140   }
2141
2142   /** Starts the service.  Must be called before any calls will be handled. */
2143   @Override
2144   public synchronized void start() {
2145     if (started) return;
2146     authTokenSecretMgr = createSecretManager();
2147     if (authTokenSecretMgr != null) {
2148       setSecretManager(authTokenSecretMgr);
2149       authTokenSecretMgr.start();
2150     }
2151     this.authManager = new ServiceAuthorizationManager();
2152     HBasePolicyProvider.init(conf, authManager);
2153     responder.start();
2154     listener.start();
2155     scheduler.start();
2156     started = true;
2157   }
2158
2159   @Override
2160   public synchronized void refreshAuthManager(PolicyProvider pp) {
2161     // Ignore warnings that this should be accessed in a static way instead of via an instance;
2162     // it'll break if you go via static route.
2163     this.authManager.refresh(this.conf, pp);
2164   }
2165
2166   private AuthenticationTokenSecretManager createSecretManager() {
2167     if (!isSecurityEnabled) return null;
2168     if (server == null) return null;
2169     Configuration conf = server.getConfiguration();
2170     long keyUpdateInterval =
2171         conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2172     long maxAge =
2173         conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2174     return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2175         server.getServerName().toString(), keyUpdateInterval, maxAge);
2176   }
2177
2178   public SecretManager<? extends TokenIdentifier> getSecretManager() {
2179     return this.secretManager;
2180   }
2181
2182   @SuppressWarnings("unchecked")
2183   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2184     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2185   }
2186
2187   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2188       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2189       throws IOException {
2190     return call(service, md, param, cellScanner, receiveTime, status, 0);
2191   }
2192
  /**
   * This is a server side method, which is invoked over RPC. On success
   * the return response has protobuf response payload. On failure, the
   * exception name and the stack trace are returned in the protobuf response.
   *
   * <p>Besides invoking the method, this records queue/processing/total time and request and
   * response sizes in the metrics. It also logs responses that exceed the configured
   * warn-response time or size; a threshold of -1 disables that check.
   *
   * @param timeout call timeout passed to the controller (0 when the client supplied none)
   * @return the response message paired with the controller's response cell scanner
   * @throws IOException the method's own IOException (unwrapped from its ServiceException).
   *   A LinkageError is wrapped in a DoNotRetryIOException; any other failure is wrapped in an
   *   IOException. NOTE(review): md, param and result are dereferenced unconditionally, so a
   *   null would surface here as a wrapped NullPointerException; confirm callers never pass
   *   null.
   */
  @Override
  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
      int timeout)
  throws IOException {
    try {
      status.setRPC(md.getName(), new Object[]{param}, receiveTime);
      // TODO: Review after we add in encoded data blocks.
      status.setRPCPacket(param);
      status.resume("Servicing call");
      // Processing time is measured from here; queue time is from receiveTime until here.
      long startTime = System.currentTimeMillis();
      PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
      controller.setCallTimeout(timeout);
      Message result = service.callBlockingMethod(md, controller, param);
      long endTime = System.currentTimeMillis();
      int processingTime = (int) (endTime - startTime);
      int qTime = (int) (startTime - receiveTime);
      int totalTime = (int) (endTime - receiveTime);
      if (LOG.isTraceEnabled()) {
        LOG.trace(CurCall.get().toString() +
            ", response " + TextFormat.shortDebugString(result) +
            " queueTime: " + qTime +
            " processingTime: " + processingTime +
            " totalTime: " + totalTime);
      }
      long requestSize = param.getSerializedSize();
      long responseSize = result.getSerializedSize();
      metrics.dequeuedCall(qTime);
      metrics.processedCall(processingTime);
      metrics.totalCall(totalTime);
      metrics.receivedRequest(requestSize);
      metrics.sentResponse(responseSize);
      // log any RPC responses that are slower than the configured warn
      // response time or larger than configured warning size
      boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
      boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
      if (tooSlow || tooLarge) {
        // when tagging, we let TooLarge trump TooSlow to keep output simple
        // note that large responses will often also be slow.
        logResponse(param,
            md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
            (tooLarge ? "TooLarge" : "TooSlow"),
            status.getClient(), startTime, processingTime, qTime,
            responseSize);
      }
      return new Pair<Message, CellScanner>(result, controller.cellScanner());
    } catch (Throwable e) {
      // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
      // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
      // need to pass it over the wire.
      if (e instanceof ServiceException) {
        if (e.getCause() == null) {
          LOG.debug("Caught a ServiceException with null cause", e);
        } else {
          e = e.getCause();
        }
      }

      // increment the number of requests that were exceptions.
      metrics.exception(e);

      if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
      if (e instanceof IOException) throw (IOException)e;
      LOG.error("Unexpected throwable object ", e);
      throw new IOException(e.getMessage(), e);
    }
  }
2266
2267   /**
2268    * Logs an RPC response to the LOG file, producing valid JSON objects for
2269    * client Operations.
2270    * @param param The parameters received in the call.
2271    * @param methodName The name of the method invoked
2272    * @param call The string representation of the call
2273    * @param tag  The tag that will be used to indicate this event in the log.
2274    * @param clientAddress   The address of the client who made this call.
2275    * @param startTime       The time that the call was initiated, in ms.
2276    * @param processingTime  The duration that the call took to run, in ms.
2277    * @param qTime           The duration that the call spent on the queue
2278    *                        prior to being initiated, in ms.
2279    * @param responseSize    The size in bytes of the response buffer.
2280    */
2281   void logResponse(Message param, String methodName, String call, String tag,
2282       String clientAddress, long startTime, int processingTime, int qTime,
2283       long responseSize)
2284           throws IOException {
2285     // base information that is reported regardless of type of call
2286     Map<String, Object> responseInfo = new HashMap<String, Object>();
2287     responseInfo.put("starttimems", startTime);
2288     responseInfo.put("processingtimems", processingTime);
2289     responseInfo.put("queuetimems", qTime);
2290     responseInfo.put("responsesize", responseSize);
2291     responseInfo.put("client", clientAddress);
2292     responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2293     responseInfo.put("method", methodName);
2294     responseInfo.put("call", call);
2295     responseInfo.put("param", ProtobufUtil.getShortTextFormat(param));
2296     LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2297   }
2298
  /**
   * Stops the service. No new calls will be handled after this is called.
   * <p>
   * Clears the running flag, stops and releases the authentication token secret manager (if
   * one is set), interrupts and stops the listener, interrupts the responder, stops the
   * scheduler, and finally wakes any threads blocked in {@link #join()}.
   */
  @Override
  public synchronized void stop() {
    LOG.info("Stopping server on " + port);
    // Clear the flag first: join() and the idle-connection scan task both poll it.
    running = false;
    if (authTokenSecretMgr != null) {
      authTokenSecretMgr.stop();
      authTokenSecretMgr = null;
    }
    listener.interrupt();
    listener.doStop();
    responder.interrupt();
    scheduler.stop();
    // join() waits on this object's monitor while running is true; wake it up.
    notifyAll();
  }
2314
2315   /** Wait for the server to be stopped.
2316    * Does not wait for all subthreads to finish.
2317    *  See {@link #stop()}.
2318    * @throws InterruptedException e
2319    */
2320   @Override
2321   public synchronized void join() throws InterruptedException {
2322     while (running) {
2323       wait();
2324     }
2325   }
2326
2327   /**
2328    * Return the socket (ip+port) on which the RPC server is listening to. May return null if
2329    * the listener channel is closed.
2330    * @return the socket (ip+port) on which the RPC server is listening to, or null if this
2331    * information cannot be determined
2332    */
2333   @Override
2334   public synchronized InetSocketAddress getListenerAddress() {
2335     if (listener == null) {
2336       return null;
2337     }
2338     return listener.getAddress();
2339   }
2340
2341   /**
2342    * Set the handler for calling out of RPC for error conditions.
2343    * @param handler the handler implementation
2344    */
2345   @Override
2346   public void setErrorHandler(HBaseRPCErrorHandler handler) {
2347     this.errorHandler = handler;
2348   }
2349
2350   @Override
2351   public HBaseRPCErrorHandler getErrorHandler() {
2352     return this.errorHandler;
2353   }
2354
2355   /**
2356    * Returns the metrics instance for reporting RPC call statistics
2357    */
2358   @Override
2359   public MetricsHBaseServer getMetrics() {
2360     return metrics;
2361   }
2362
2363   @Override
2364   public void addCallSize(final long diff) {
2365     this.callQueueSizeInBytes.add(diff);
2366   }
2367
2368   /**
2369    * Authorize the incoming client connection.
2370    *
2371    * @param user client user
2372    * @param connection incoming connection
2373    * @param addr InetAddress of incoming connection
2374    * @throws org.apache.hadoop.security.authorize.AuthorizationException
2375    *         when the client isn't authorized to talk the protocol
2376    */
2377   public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
2378       InetAddress addr)
2379   throws AuthorizationException {
2380     if (authorize) {
2381       Class<?> c = getServiceInterface(services, connection.getServiceName());
2382       this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2383     }
2384   }
2385
2386   /**
2387    * When the read or write buffer size is larger than this limit, i/o will be
2388    * done in chunks of this size. Most RPC requests and responses would be
2389    * be smaller.
2390    */
2391   private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
2392
2393   /**
2394    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
2395    * If the amount of data is large, it writes to channel in smaller chunks.
2396    * This is to avoid jdk from creating many direct buffers as the size of
2397    * buffer increases. This also minimizes extra copies in NIO layer
2398    * as a result of multiple write operations required to write a large
2399    * buffer.
2400    *
2401    * @param channel writable byte channel to write to
2402    * @param bufferChain Chain of buffers to write
2403    * @return number of bytes written
2404    * @throws java.io.IOException e
2405    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
2406    */
2407   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2408   throws IOException {
2409     long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
2410     if (count > 0) this.metrics.sentBytes(count);
2411     return count;
2412   }
2413
2414   /**
2415    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
2416    * If the amount of data is large, it writes to channel in smaller chunks.
2417    * This is to avoid jdk from creating many direct buffers as the size of
2418    * ByteBuffer increases. There should not be any performance degredation.
2419    *
2420    * @param channel writable byte channel to write on
2421    * @param buffer buffer to write
2422    * @return number of bytes written
2423    * @throws java.io.IOException e
2424    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
2425    */
2426   protected int channelRead(ReadableByteChannel channel,
2427                                    ByteBuffer buffer) throws IOException {
2428
2429     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2430            channel.read(buffer) : channelIO(channel, null, buffer);
2431     if (count > 0) {
2432       metrics.receivedBytes(count);
2433     }
2434     return count;
2435   }
2436
2437   /**
2438    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
2439    * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
2440    * one of readCh or writeCh should be non-null.
2441    *
2442    * @param readCh read channel
2443    * @param writeCh write channel
2444    * @param buf buffer to read or write into/out of
2445    * @return bytes written
2446    * @throws java.io.IOException e
2447    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
2448    * @see #channelWrite(GatheringByteChannel, BufferChain)
2449    */
2450   private static int channelIO(ReadableByteChannel readCh,
2451                                WritableByteChannel writeCh,
2452                                ByteBuffer buf) throws IOException {
2453
2454     int originalLimit = buf.limit();
2455     int initialRemaining = buf.remaining();
2456     int ret = 0;
2457
2458     while (buf.remaining() > 0) {
2459       try {
2460         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2461         buf.limit(buf.position() + ioSize);
2462
2463         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2464
2465         if (ret < ioSize) {
2466           break;
2467         }
2468
2469       } finally {
2470         buf.limit(originalLimit);
2471       }
2472     }
2473
2474     int nBytes = initialRemaining - buf.remaining();
2475     return (nBytes > 0) ? nBytes : ret;
2476   }
2477
2478   /**
2479    * Needed for features such as delayed calls.  We need to be able to store the current call
2480    * so that we can complete it later or ask questions of what is supported by the current ongoing
2481    * call.
2482    * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
2483    */
2484   public static RpcCallContext getCurrentCall() {
2485     return CurCall.get();
2486   }
2487
2488   public static boolean isInRpcCallContext() {
2489     return CurCall.get() != null;
2490   }
2491
2492   /**
2493    * Returns the user credentials associated with the current RPC request or
2494    * <code>null</code> if no credentials were provided.
2495    * @return A User
2496    */
2497   public static User getRequestUser() {
2498     RpcCallContext ctx = getCurrentCall();
2499     return ctx == null? null: ctx.getRequestUser();
2500   }
2501
2502   /**
2503    * The number of open RPC conections
2504    * @return the number of open rpc connections
2505    */
2506   public int getNumOpenConnections() {
2507     return connectionManager.size();
2508   }
2509
2510   /**
2511    * Returns the username for any user associated with the current RPC
2512    * request or <code>null</code> if no user is set.
2513    */
2514   public static String getRequestUserName() {
2515     User user = getRequestUser();
2516     return user == null? null: user.getShortName();
2517   }
2518
2519   /**
2520    * @return Address of remote client if a request is ongoing, else null
2521    */
2522   public static InetAddress getRemoteAddress() {
2523     RpcCallContext ctx = getCurrentCall();
2524     return ctx == null? null: ctx.getRemoteAddress();
2525   }
2526
2527   /**
2528    * @param serviceName Some arbitrary string that represents a 'service'.
2529    * @param services Available service instances
2530    * @return Matching BlockingServiceAndInterface pair
2531    */
2532   static BlockingServiceAndInterface getServiceAndInterface(
2533       final List<BlockingServiceAndInterface> services, final String serviceName) {
2534     for (BlockingServiceAndInterface bs : services) {
2535       if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2536         return bs;
2537       }
2538     }
2539     return null;
2540   }
2541
2542   /**
2543    * @param serviceName Some arbitrary string that represents a 'service'.
2544    * @param services Available services and their service interfaces.
2545    * @return Service interface class for <code>serviceName</code>
2546    */
2547   static Class<?> getServiceInterface(
2548       final List<BlockingServiceAndInterface> services,
2549       final String serviceName) {
2550     BlockingServiceAndInterface bsasi =
2551         getServiceAndInterface(services, serviceName);
2552     return bsasi == null? null: bsasi.getServiceInterface();
2553   }
2554
2555   /**
2556    * @param serviceName Some arbitrary string that represents a 'service'.
2557    * @param services Available services and their service interfaces.
2558    * @return BlockingService that goes with the passed <code>serviceName</code>
2559    */
2560   static BlockingService getService(
2561       final List<BlockingServiceAndInterface> services,
2562       final String serviceName) {
2563     BlockingServiceAndInterface bsasi =
2564         getServiceAndInterface(services, serviceName);
2565     return bsasi == null? null: bsasi.getBlockingService();
2566   }
2567
2568   static MonitoredRPCHandler getStatus() {
2569     // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
2570     MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
2571     if (status != null) {
2572       return status;
2573     }
2574     status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
2575     status.pause("Waiting for a call");
2576     RpcServer.MONITORED_RPC.set(status);
2577     return status;
2578   }
2579
2580   /** Returns the remote side ip address when invoked inside an RPC
2581    *  Returns null incase of an error.
2582    *  @return InetAddress
2583    */
2584   public static InetAddress getRemoteIp() {
2585     Call call = CurCall.get();
2586     if (call != null && call.connection != null && call.connection.socket != null) {
2587       return call.connection.socket.getInetAddress();
2588     }
2589     return null;
2590   }
2591
2592
2593   /**
2594    * A convenience method to bind to a given address and report
2595    * better exceptions if the address is not a valid host.
2596    * @param socket the socket to bind
2597    * @param address the address to bind to
2598    * @param backlog the number of connections allowed in the queue
2599    * @throws BindException if the address can't be bound
2600    * @throws UnknownHostException if the address isn't a valid host name
2601    * @throws IOException other random errors from bind
2602    */
2603   public static void bind(ServerSocket socket, InetSocketAddress address,
2604                           int backlog) throws IOException {
2605     try {
2606       socket.bind(address, backlog);
2607     } catch (BindException e) {
2608       BindException bindException =
2609         new BindException("Problem binding to " + address + " : " +
2610             e.getMessage());
2611       bindException.initCause(e);
2612       throw bindException;
2613     } catch (SocketException e) {
2614       // If they try to bind to a different host's address, give a better
2615       // error message.
2616       if ("Unresolved address".equals(e.getMessage())) {
2617         throw new UnknownHostException("Invalid hostname for server: " +
2618                                        address.getHostName());
2619       }
2620       throw e;
2621     }
2622   }
2623
2624   @Override
2625   public RpcScheduler getScheduler() {
2626     return scheduler;
2627   }
2628
  /**
   * Tracks the server's open client connections and periodically closes idle ones from a
   * daemon {@link Timer}. Idle scanning is driven by these settings:
   * "hbase.ipc.client.idlethreshold", "hbase.ipc.client.connection.idle-scan-interval.ms",
   * "hbase.ipc.client.connection.maxidletime" and "hbase.ipc.client.kill.max".
   */
  private class ConnectionManager {
    // Mirrors connections.size(); only adjusted when add/remove actually change the set.
    final private AtomicInteger count = new AtomicInteger();
    final private Set<Connection> connections;

    final private Timer idleScanTimer;
    // A non-exhaustive scan stops once fewer than this many connections remain.
    final private int idleScanThreshold;
    // Delay between the end of one idle scan and the start of the next.
    final private int idleScanInterval;
    // Twice the configured max idle time (see constructor).
    final private int maxIdleTime;
    // A non-exhaustive scan closes at most this many connections.
    final private int maxIdleToClose;

    ConnectionManager() {
      this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
      this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
      this.idleScanInterval =
          conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
      this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
      this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
          HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
      int maxConnectionQueueSize =
          handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
      // create a set with concurrency -and- a thread-safe iterator; the concurrency level is
      // the reader threads plus 2 for the listener and idle closer threads
      this.connections = Collections.newSetFromMap(
          new ConcurrentHashMap<Connection,Boolean>(
              maxConnectionQueueSize, 0.75f, readThreads+2));
    }

    /** Adds {@code connection}; bumps the count only if it was not already present. */
    private boolean add(Connection connection) {
      boolean added = connections.add(connection);
      if (added) {
        count.getAndIncrement();
      }
      return added;
    }

    /** Removes {@code connection}; drops the count only if it was actually present. */
    private boolean remove(Connection connection) {
      boolean removed = connections.remove(connection);
      if (removed) {
        count.getAndDecrement();
      }
      return removed;
    }

    /** @return the number of tracked connections */
    int size() {
      return count.get();
    }

    /** @return a snapshot copy of the tracked connections */
    Connection[] toArray() {
      return connections.toArray(new Connection[0]);
    }

    /**
     * Wraps {@code channel} in a new Connection stamped with the current time, starts
     * tracking it, and returns it.
     */
    Connection register(SocketChannel channel) {
      Connection connection = new Connection(channel, System.currentTimeMillis());
      add(connection);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Server connection from " + connection +
            "; connections=" + size() +
            ", queued calls size (bytes)=" + callQueueSizeInBytes.get() +
            ", general queued calls=" + scheduler.getGeneralQueueLength() +
            ", priority queued calls=" + scheduler.getPriorityQueueLength());
      }
      return connection;
    }

    /**
     * Stops tracking {@code connection} and closes it.
     *
     * @return true if the connection was tracked (and has now been closed), false otherwise
     */
    boolean close(Connection connection) {
      boolean exists = remove(connection);
      if (exists) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() +
              ": disconnecting client " + connection +
              ". Number of active connections: "+ size());
        }
        // only close if actually removed to avoid double-closing due
        // to possible races
        connection.close();
      }
      return exists;
    }

    /**
     * Closes connections that are idle and whose last contact is older than maxIdleTime.
     *
     * @param scanAll when true, examine every connection and close all idle ones; when false,
     *        stop as soon as fewer than idleScanThreshold connections remain or after
     *        maxIdleToClose connections have been closed
     */
    // synch'ed to avoid explicit invocation upon OOM from colliding with
    // timer task firing
    synchronized void closeIdle(boolean scanAll) {
      long minLastContact = System.currentTimeMillis() - maxIdleTime;
      // concurrent iterator might miss new connections added
      // during the iteration, but that's ok because they won't
      // be idle yet anyway and will be caught on next scan
      int closed = 0;
      for (Connection connection : connections) {
        // stop if connections dropped below threshold unless scanning all
        if (!scanAll && size() < idleScanThreshold) {
          break;
        }
        // Relies on short-circuit evaluation: close() only runs for idle, stale connections,
        // and `closed` only counts successful closes when not scanning all. Stop once
        // maxIdleToClose connections have been closed.
        if (connection.isIdle() &&
            connection.getLastContact() < minLastContact &&
            close(connection) &&
            !scanAll && (++closed == maxIdleToClose)) {
          break;
        }
      }
    }

    /** Closes every tracked connection. */
    void closeAll() {
      // use a copy of the connections to be absolutely sure the concurrent
      // iterator doesn't miss a connection
      for (Connection connection : toArray()) {
        close(connection);
      }
    }

    /** Schedules the first idle scan (a no-op if the server is not running). */
    void startIdleScan() {
      scheduleIdleScanTask();
    }

    /** Cancels the idle-scan timer; no further scans will run. */
    void stopIdleScan() {
      idleScanTimer.cancel();
    }

    /**
     * Schedules a one-shot idle scan idleScanInterval ms from now. Each scan reschedules the
     * next one when it finishes, for as long as the server is running.
     */
    // NOTE(review): if stopIdleScan() cancels the timer after the running check below but
    // before schedule(), Timer.schedule throws IllegalStateException on the timer thread.
    // This looks benign because the timer is already cancelled; confirm.
    private void scheduleIdleScanTask() {
      if (!running) {
        return;
      }
      TimerTask idleScanTask = new TimerTask(){
        @Override
        public void run() {
          if (!running) {
            return;
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug(Thread.currentThread().getName()+": task running");
          }
          try {
            closeIdle(false);
          } finally {
            // explicitly reschedule so next execution occurs relative
            // to the end of this scan, not the beginning
            scheduleIdleScanTask();
          }
        }
      };
      idleScanTimer.schedule(idleScanTask, idleScanInterval);
    }
  }
2773 }