View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.ServerSocket;
31  import java.net.Socket;
32  import java.net.SocketException;
33  import java.net.UnknownHostException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.Channels;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.GatheringByteChannel;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.security.PrivilegedExceptionAction;
46  import java.util.ArrayList;
47  import java.util.Arrays;
48  import java.util.Collections;
49  import java.util.HashMap;
50  import java.util.Iterator;
51  import java.util.List;
52  import java.util.Map;
53  import java.util.Set;
54  import java.util.Timer;
55  import java.util.TimerTask;
56  import java.util.concurrent.ConcurrentHashMap;
57  import java.util.concurrent.ConcurrentLinkedDeque;
58  import java.util.concurrent.ExecutorService;
59  import java.util.concurrent.Executors;
60  import java.util.concurrent.LinkedBlockingQueue;
61  import java.util.concurrent.atomic.AtomicInteger;
62  import java.util.concurrent.locks.Lock;
63  import java.util.concurrent.locks.ReentrantLock;
64
65  import javax.security.sasl.Sasl;
66  import javax.security.sasl.SaslException;
67  import javax.security.sasl.SaslServer;
68
69  import org.apache.commons.logging.Log;
70  import org.apache.commons.logging.LogFactory;
71  import org.apache.hadoop.conf.Configuration;
72  import org.apache.hadoop.hbase.CallQueueTooBigException;
73  import org.apache.hadoop.hbase.CellScanner;
74  import org.apache.hadoop.hbase.DoNotRetryIOException;
75  import org.apache.hadoop.hbase.HBaseIOException;
76  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
77  import org.apache.hadoop.hbase.HConstants;
78  import org.apache.hadoop.hbase.Server;
79  import org.apache.hadoop.hbase.classification.InterfaceAudience;
80  import org.apache.hadoop.hbase.classification.InterfaceStability;
81  import org.apache.hadoop.hbase.client.VersionInfoUtil;
82  import org.apache.hadoop.hbase.codec.Codec;
83  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
84  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
85  import org.apache.hadoop.hbase.io.ByteBufferInputStream;
86  import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
87  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
88  import org.apache.hadoop.hbase.io.ByteBufferPool;
89  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
90  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
91  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
92  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
93  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
94  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
95  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
96  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
97  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
98  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
99  import org.apache.hadoop.hbase.security.AccessDeniedException;
100 import org.apache.hadoop.hbase.security.AuthMethod;
101 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
102 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
103 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
104 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
105 import org.apache.hadoop.hbase.security.SaslStatus;
106 import org.apache.hadoop.hbase.security.SaslUtil;
107 import org.apache.hadoop.hbase.security.User;
108 import org.apache.hadoop.hbase.security.UserProvider;
109 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
110 import org.apache.hadoop.hbase.util.Bytes;
111 import org.apache.hadoop.hbase.util.Counter;
112 import org.apache.hadoop.hbase.util.Pair;
113 import org.apache.hadoop.hbase.util.Threads;
114 import org.apache.hadoop.io.BytesWritable;
115 import org.apache.hadoop.io.IOUtils;
116 import org.apache.hadoop.io.IntWritable;
117 import org.apache.hadoop.io.Writable;
118 import org.apache.hadoop.io.WritableUtils;
119 import org.apache.hadoop.io.compress.CompressionCodec;
120 import org.apache.hadoop.security.UserGroupInformation;
121 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
122 import org.apache.hadoop.security.authorize.AuthorizationException;
123 import org.apache.hadoop.security.authorize.PolicyProvider;
124 import org.apache.hadoop.security.authorize.ProxyUsers;
125 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
126 import org.apache.hadoop.security.token.SecretManager;
127 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
128 import org.apache.hadoop.security.token.TokenIdentifier;
129 import org.apache.hadoop.util.StringUtils;
130 import org.apache.htrace.TraceInfo;
131 import org.codehaus.jackson.map.ObjectMapper;
132
133 import com.google.common.util.concurrent.ThreadFactoryBuilder;
134 import com.google.protobuf.BlockingService;
135 import com.google.protobuf.CodedInputStream;
136 import com.google.protobuf.CodedOutputStream;
137 import com.google.protobuf.Descriptors.MethodDescriptor;
138 import com.google.protobuf.Message;
139 import com.google.protobuf.ServiceException;
140 import com.google.protobuf.TextFormat;
141
142 /**
143  * An RPC server that hosts protobuf described Services.
144  *
145  * An RpcServer instance has a Listener that hosts the socket.  The Listener has a fixed number
146  * of Readers in an ExecutorPool, 10 by default.  The Listener does an accept and then a Reader
147  * is chosen round-robin to do the read.  The connection is registered on that Reader's Selector.
148  * The Reader does a total read off the channel and parses it to make a Call.  The Call is
149  * wrapped in a CallRunner and passed to the scheduler to be run.  The Reader then goes back to
150  * see if there is more to be done and loops till done.
151  *
152  * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
153  * has given the queues into which calls (i.e. CallRunner instances) are inserted.  Handlers run
154  * taking from the queue.  They run the CallRunner#run method on each item gotten from queue
155  * and keep taking while the server is up.
156  *
157  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
158  * queue for Responder to pull from and return result to client.
159  *
160  * @see BlockingRpcClient
161  */
162 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
163 @InterfaceStability.Evolving
164 public class RpcServer implements RpcServerInterface, ConfigurationObserver {
165   // LOG is being used in CallRunner and the log level is being changed in tests
166   public static final Log LOG = LogFactory.getLog(RpcServer.class);
167   private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
168       = new CallQueueTooBigException();
169
170   private final boolean authorize;
171   private boolean isSecurityEnabled;
172
173   public static final byte CURRENT_VERSION = 0;
174
175   /**
176    * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
177    */
178   public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
179           "hbase.ipc.server.fallback-to-simple-auth-allowed";
180
181   /**
182    * How many calls/handler are allowed in the queue.
183    */
184   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
185
186   private final CellBlockBuilder cellBlockBuilder;
187
188   private static final String AUTH_FAILED_FOR = "Auth failed for ";
189   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
190   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
191     Server.class.getName());
192   protected SecretManager<TokenIdentifier> secretManager;
193   protected ServiceAuthorizationManager authManager;
194
195   /** This is set to Call object before Handler invokes an RPC and reset
196    * after the call returns.
197    */
198   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
199
200   /** Keeps MonitoredRPCHandler per handler thread. */
201   static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
202       = new ThreadLocal<MonitoredRPCHandler>();
203
204   protected final InetSocketAddress bindAddress;
205   protected int port;                             // port we listen on
206   protected InetSocketAddress address;            // inet address we listen on
207   private int readThreads;                        // number of read threads
208   protected MetricsHBaseServer metrics;
209
210   protected final Configuration conf;
211
212   /**
213    * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
214    * this size, then we will reject the call (after parsing it though). It will go back to the
215    * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
216    * call queue size gets incremented after we parse a call and before we add it to the queue of
217    * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
218    * size is kept in {@link #callQueueSizeInBytes}.
219    * @see #callQueueSizeInBytes
220    * @see #DEFAULT_MAX_CALLQUEUE_SIZE
222    */
223   private final long maxQueueSizeInBytes;
224   private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
225
226   /**
227    * This is a running count of the size in bytes of all outstanding calls whether currently
228    * executing or queued waiting to be run.
229    */
230   protected final Counter callQueueSizeInBytes = new Counter();
231
232   protected int socketSendBufferSize;
233   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
234   protected final boolean tcpKeepAlive; // if T then use keepalives
235   protected final long purgeTimeout;    // in milliseconds
236
237   /**
238    * This flag is used to indicate to sub threads when they should go down.  When we call
239    * {@link #start()}, all threads started will consult this flag on whether they should
240    * keep going.  It is set to false when {@link #stop()} is called.
241    */
242   volatile boolean running = true;
243
244   /**
245    * This flag is set to true after all threads are up and 'running' and the server is then opened
246    * for business by the call to {@link #start()}.
247    */
248   volatile boolean started = false;
249
250   // maintains the set of client connections and handles idle timeouts
251   private ConnectionManager connectionManager;
252   private Listener listener = null;
253   protected Responder responder = null;
254   protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
255
256   protected HBaseRPCErrorHandler errorHandler = null;
257
258   static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
259   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
260   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
261
262   /**
263    * Minimum allowable timeout (in milliseconds) in rpc request's header. This
264    * configuration exists to prevent the rpc service regarding this request as timeout immediately.
265    */
266   private static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
267   private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
268
269   /** Default value for above params */
270   private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
271   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
272   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
273
274   private static final ObjectMapper MAPPER = new ObjectMapper();
275
276   private final int maxRequestSize;
277   private final int warnResponseTime;
278   private final int warnResponseSize;
279
280   private final int minClientRequestTimeout;
281
282   private final Server server;
283   private final List<BlockingServiceAndInterface> services;
284
285   private final RpcScheduler scheduler;
286
287   private UserProvider userProvider;
288
289   private final ByteBufferPool reservoir;
290
291   private volatile boolean allowFallbackToSimpleAuth;
292
293   /**
294    * Datastructure that holds all necessary to a method invocation and then afterward, carries
295    * the result.
296    */
297   @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
298   @InterfaceStability.Evolving
299   public class Call implements RpcCallContext {
300     protected int id;                             // the client's call id
301     protected BlockingService service;
302     protected MethodDescriptor md;
303     protected RequestHeader header;
304     protected Message param;                      // the parameter passed
305     // Optional cell data passed outside of protobufs.
306     protected CellScanner cellScanner;
307     protected Connection connection;              // connection to client
308     protected long timestamp;      // the time received when response is null
309                                    // the time served when response is not null
310     protected int timeout;
311     protected long startTime;
312     protected long deadline;// the deadline to handle this call, if exceed we can drop it.
313
314     /**
315      * Chain of buffers to send as response.
316      */
317     protected BufferChain response;
318     protected Responder responder;
319
320     protected long size;                          // size of current call
321     protected boolean isError;
322     protected TraceInfo tinfo;
323     private ByteBufferListOutputStream cellBlockStream = null;
324
325     private User user;
326     private InetAddress remoteAddress;
327     private RpcCallback callback;
328
329     private long responseCellSize = 0;
330     private long responseBlockSize = 0;
331     private boolean retryImmediatelySupported;
332
333     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
334         justification="Can't figure why this complaint is happening... see below")
335     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
336          Message param, CellScanner cellScanner, Connection connection, Responder responder,
337          long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) {
338       this.id = id;
339       this.service = service;
340       this.md = md;
341       this.header = header;
342       this.param = param;
343       this.cellScanner = cellScanner;
344       this.connection = connection;
345       this.timestamp = System.currentTimeMillis();
346       this.response = null;
347       this.responder = responder;
348       this.isError = false;
349       this.size = size;
350       this.tinfo = tinfo;
351       this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
352       this.remoteAddress = remoteAddress;
353       this.retryImmediatelySupported =
354           connection == null? null: connection.retryImmediatelySupported;
355       this.timeout = timeout;
356       this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE;
357     }
358
359     /**
360      * Call is done. Execution happened and we returned results to client. It is now safe to
361      * cleanup.
362      */
363     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
364         justification="Presume the lock on processing request held by caller is protection enough")
365     void done() {
366       if (this.cellBlockStream != null) {
367         this.cellBlockStream.releaseResources();// This will return back the BBs which we
368                                                 // got from pool.
369         this.cellBlockStream = null;
370       }
371       this.connection.decRpcCount();  // Say that we're done with this call.
372     }
373
374     @Override
375     public String toString() {
376       return toShortString() + " param: " +
377         (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
378         " connection: " + connection.toString();
379     }
380
381     protected RequestHeader getHeader() {
382       return this.header;
383     }
384
385     public boolean hasPriority() {
386       return this.header.hasPriority();
387     }
388
389     public int getPriority() {
390       return this.header.getPriority();
391     }
392
393     /*
394      * Short string representation without param info because param itself could be huge depends on
395      * the payload of a command
396      */
397     String toShortString() {
398       String serviceName = this.connection.service != null ?
399           this.connection.service.getDescriptorForType().getName() : "null";
400       return "callId: " + this.id + " service: " + serviceName +
401           " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
402           " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
403           " connection: " + connection.toString();
404     }
405
406     String toTraceString() {
407       String serviceName = this.connection.service != null ?
408                            this.connection.service.getDescriptorForType().getName() : "";
409       String methodName = (this.md != null) ? this.md.getName() : "";
410       return serviceName + "." + methodName;
411     }
412
413     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
414       this.response = new BufferChain(response);
415     }
416
417     protected synchronized void setResponse(Object m, final CellScanner cells,
418         Throwable t, String errorMsg) {
419       if (this.isError) return;
420       if (t != null) this.isError = true;
421       BufferChain bc = null;
422       try {
423         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
424         // Presume it a pb Message.  Could be null.
425         Message result = (Message)m;
426         // Call id.
427         headerBuilder.setCallId(this.id);
428         if (t != null) {
429           setExceptionResponse(t, errorMsg, headerBuilder);
430         }
431         // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
432         // reservoir when finished. This is hacky and the hack is not contained but benefits are
433         // high when we can avoid a big buffer allocation on each rpc.
434         List<ByteBuffer> cellBlock = null;
435         int cellBlockSize = 0;
436         if (reservoir != null) {
437           this.cellBlockStream = cellBlockBuilder.buildCellBlockStream(this.connection.codec,
438               this.connection.compressionCodec, cells, reservoir);
439           if (this.cellBlockStream != null) {
440             cellBlock = this.cellBlockStream.getByteBuffers();
441             cellBlockSize = this.cellBlockStream.size();
442           }
443         } else {
444           ByteBuffer b = cellBlockBuilder.buildCellBlock(this.connection.codec,
445               this.connection.compressionCodec, cells);
446           if (b != null) {
447             cellBlockSize = b.remaining();
448             cellBlock = new ArrayList<ByteBuffer>(1);
449             cellBlock.add(b);
450           }
451         }
452
453         if (cellBlockSize > 0) {
454           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
455           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
456           cellBlockBuilder.setLength(cellBlockSize);
457           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
458         }
459         Message header = headerBuilder.build();
460         byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize);
461         List<ByteBuffer> responseBufs = new ArrayList<ByteBuffer>(
462             (cellBlock == null ? 1 : cellBlock.size()) + 1);
463         responseBufs.add(ByteBuffer.wrap(b));
464         if (cellBlock != null) responseBufs.addAll(cellBlock);
465         bc = new BufferChain(responseBufs);
466         if (connection.useWrap) {
467           bc = wrapWithSasl(bc);
468         }
469       } catch (IOException e) {
470         LOG.warn("Exception while creating response " + e);
471       }
472       this.response = bc;
473       // Once a response message is created and set to this.response, this Call can be treated as
474       // done. The Responder thread will do the n/w write of this message back to client.
475       if (this.callback != null) {
476         try {
477           this.callback.run();
478         } catch (Exception e) {
479           // Don't allow any exception here to kill this handler thread.
480           LOG.warn("Exception while running the Rpc Callback.", e);
481         }
482       }
483     }
484
485     private void setExceptionResponse(Throwable t, String errorMsg,
486         ResponseHeader.Builder headerBuilder) {
487       ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
488       exceptionBuilder.setExceptionClassName(t.getClass().getName());
489       exceptionBuilder.setStackTrace(errorMsg);
490       exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
491       if (t instanceof RegionMovedException) {
492         // Special casing for this exception.  This is only one carrying a payload.
493         // Do this instead of build a generic system for allowing exceptions carry
494         // any kind of payload.
495         RegionMovedException rme = (RegionMovedException)t;
496         exceptionBuilder.setHostname(rme.getHostname());
497         exceptionBuilder.setPort(rme.getPort());
498       }
499       // Set the exception as the result of the method invocation.
500       headerBuilder.setException(exceptionBuilder.build());
501     }
502
503     private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize)
504         throws IOException {
505       // Organize the response as a set of bytebuffers rather than collect it all together inside
506       // one big byte array; save on allocations.
507       int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
508           resultVintSize = 0;
509       if (header != null) {
510         headerSerializedSize = header.getSerializedSize();
511         headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize);
512       }
513       if (result != null) {
514         resultSerializedSize = result.getSerializedSize();
515         resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize);
516       }
517       // calculate the total size
518       int totalSize = headerSerializedSize + headerVintSize
519           + (resultSerializedSize + resultVintSize)
520           + cellBlockSize;
521       // The byte[] should also hold the totalSize of the header, message and the cellblock
522       byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
523           + resultVintSize + Bytes.SIZEOF_INT];
524       // The RpcClient expects the int to be in a format that code be decoded by
525       // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int)
526       // form of writing int.
527       Bytes.putInt(b, 0, totalSize);
528       CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT,
529           b.length - Bytes.SIZEOF_INT);
530       if (header != null) {
531         cos.writeMessageNoTag(header);
532       }
533       if (result != null) {
534         cos.writeMessageNoTag(result);
535       }
536       cos.flush();
537       cos.checkNoSpaceLeft();
538       return b;
539     }
540
541     private BufferChain wrapWithSasl(BufferChain bc)
542         throws IOException {
543       if (!this.connection.useSasl) return bc;
544       // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
545       // THIS IS A BIG UGLY COPY.
546       byte [] responseBytes = bc.getBytes();
547       byte [] token;
548       // synchronization may be needed since there can be multiple Handler
549       // threads using saslServer to wrap responses.
550       synchronized (connection.saslServer) {
551         token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
552       }
553       if (LOG.isTraceEnabled()) {
554         LOG.trace("Adding saslServer wrapped token of size " + token.length
555             + " as call response.");
556       }
557
558       ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
559       ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
560       return new BufferChain(bbTokenLength, bbTokenBytes);
561     }
562
563     @Override
564     public boolean isClientCellBlockSupported() {
565       return this.connection != null && this.connection.codec != null;
566     }
567
568     @Override
569     public long disconnectSince() {
570       if (!connection.channel.isOpen()) {
571         return System.currentTimeMillis() - timestamp;
572       } else {
573         return -1L;
574       }
575     }
576
577     public long getSize() {
578       return this.size;
579     }
580
581     @Override
582     public long getResponseCellSize() {
583       return responseCellSize;
584     }
585
586     @Override
587     public void incrementResponseCellSize(long cellSize) {
588       responseCellSize += cellSize;
589     }
590
591     @Override
592     public long getResponseBlockSize() {
593       return responseBlockSize;
594     }
595
596     @Override
597     public void incrementResponseBlockSize(long blockSize) {
598       responseBlockSize += blockSize;
599     }
600
601     public synchronized void sendResponseIfReady() throws IOException {
602       this.responder.doRespond(this);
603     }
604
605     public UserGroupInformation getRemoteUser() {
606       return connection.ugi;
607     }
608
609     @Override
610     public User getRequestUser() {
611       return user;
612     }
613
614     @Override
615     public String getRequestUserName() {
616       User user = getRequestUser();
617       return user == null? null: user.getShortName();
618     }
619
620     @Override
621     public InetAddress getRemoteAddress() {
622       return remoteAddress;
623     }
624
625     @Override
626     public VersionInfo getClientVersionInfo() {
627       return connection.getVersionInfo();
628     }
629
630     @Override
631     public synchronized void setCallBack(RpcCallback callback) {
632       this.callback = callback;
633     }
634
635     @Override
636     public boolean isRetryImmediatelySupported() {
637       return retryImmediatelySupported;
638     }
639   }
640
641   /** Listens on the socket. Creates jobs for the handler threads*/
642   private class Listener extends Thread {
643
644     private ServerSocketChannel acceptChannel = null; //the accept channel
645     private Selector selector = null; //the selector that we use for the server
646     private Reader[] readers = null;
647     private int currentReader = 0;
648     private final int readerPendingConnectionQueueLength;
649
650     private ExecutorService readPool;
651
652     public Listener(final String name) throws IOException {
653       super(name);
654       // The backlog of requests that we will have the serversocket carry.
655       int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
656       readerPendingConnectionQueueLength =
657           conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);
658       // Create a new server socket and set to non blocking mode
659       acceptChannel = ServerSocketChannel.open();
660       acceptChannel.configureBlocking(false);
661
662       // Bind the server socket to the binding addrees (can be different from the default interface)
663       bind(acceptChannel.socket(), bindAddress, backlogLength);
664       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
665       address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
666       // create a selector;
667       selector = Selector.open();
668
669       readers = new Reader[readThreads];
670       // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
671       // has an advantage in that it is easy to shutdown the pool.
672       readPool = Executors.newFixedThreadPool(readThreads,
673         new ThreadFactoryBuilder().setNameFormat(
674           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
675           ",port=" + port).setDaemon(true)
676         .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
677       for (int i = 0; i < readThreads; ++i) {
678         Reader reader = new Reader();
679         readers[i] = reader;
680         readPool.execute(reader);
681       }
682       LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
683
684       // Register accepts on the server socket with the selector.
685       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
686       this.setName("RpcServer.listener,port=" + port);
687       this.setDaemon(true);
688     }
689
690
691     private class Reader implements Runnable {
692       final private LinkedBlockingQueue<Connection> pendingConnections;
693       private final Selector readSelector;
694
695       Reader() throws IOException {
696         this.pendingConnections =
697           new LinkedBlockingQueue<Connection>(readerPendingConnectionQueueLength);
698         this.readSelector = Selector.open();
699       }
700
701       @Override
702       public void run() {
703         try {
704           doRunLoop();
705         } finally {
706           try {
707             readSelector.close();
708           } catch (IOException ioe) {
709             LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
710           }
711         }
712       }
713
714       private synchronized void doRunLoop() {
715         while (running) {
716           try {
717             // Consume as many connections as currently queued to avoid
718             // unbridled acceptance of connections that starves the select
719             int size = pendingConnections.size();
720             for (int i=size; i>0; i--) {
721               Connection conn = pendingConnections.take();
722               conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
723             }
724             readSelector.select();
725             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
726             while (iter.hasNext()) {
727               SelectionKey key = iter.next();
728               iter.remove();
729               if (key.isValid()) {
730                 if (key.isReadable()) {
731                   doRead(key);
732                 }
733               }
734               key = null;
735             }
736           } catch (InterruptedException e) {
737             if (running) {                      // unexpected -- log it
738               LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
739             }
740             return;
741           } catch (IOException ex) {
742             LOG.info(getName() + ": IOException in Reader", ex);
743           }
744         }
745       }
746
747       /**
748        * Updating the readSelector while it's being used is not thread-safe,
749        * so the connection must be queued.  The reader will drain the queue
750        * and update its readSelector before performing the next select
751        */
752       public void addConnection(Connection conn) throws IOException {
753         pendingConnections.add(conn);
754         readSelector.wakeup();
755       }
756     }
757
758     @Override
759     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
760       justification="selector access is not synchronized; seems fine but concerned changing " +
761         "it will have per impact")
762     public void run() {
763       LOG.info(getName() + ": starting");
764       connectionManager.startIdleScan();
765       while (running) {
766         SelectionKey key = null;
767         try {
768           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
769           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
770           while (iter.hasNext()) {
771             key = iter.next();
772             iter.remove();
773             try {
774               if (key.isValid()) {
775                 if (key.isAcceptable())
776                   doAccept(key);
777               }
778             } catch (IOException ignored) {
779               if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
780             }
781             key = null;
782           }
783         } catch (OutOfMemoryError e) {
784           if (errorHandler != null) {
785             if (errorHandler.checkOOME(e)) {
786               LOG.info(getName() + ": exiting on OutOfMemoryError");
787               closeCurrentConnection(key, e);
788               connectionManager.closeIdle(true);
789               return;
790             }
791           } else {
792             // we can run out of memory if we have too many threads
793             // log the event and sleep for a minute and give
794             // some thread(s) a chance to finish
795             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
796             closeCurrentConnection(key, e);
797             connectionManager.closeIdle(true);
798             try {
799               Thread.sleep(60000);
800             } catch (InterruptedException ex) {
801               LOG.debug("Interrupted while sleeping");
802             }
803           }
804         } catch (Exception e) {
805           closeCurrentConnection(key, e);
806         }
807       }
808       LOG.info(getName() + ": stopping");
809       synchronized (this) {
810         try {
811           acceptChannel.close();
812           selector.close();
813         } catch (IOException ignored) {
814           if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
815         }
816
817         selector= null;
818         acceptChannel= null;
819
820         // close all connections
821         connectionManager.stopIdleScan();
822         connectionManager.closeAll();
823       }
824     }
825
826     private void closeCurrentConnection(SelectionKey key, Throwable e) {
827       if (key != null) {
828         Connection c = (Connection)key.attachment();
829         if (c != null) {
830           closeConnection(c);
831           key.attach(null);
832         }
833       }
834     }
835
836     InetSocketAddress getAddress() {
837       return address;
838     }
839
840     void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
841       ServerSocketChannel server = (ServerSocketChannel) key.channel();
842       SocketChannel channel;
843       while ((channel = server.accept()) != null) {
844         channel.configureBlocking(false);
845         channel.socket().setTcpNoDelay(tcpNoDelay);
846         channel.socket().setKeepAlive(tcpKeepAlive);
847         Reader reader = getReader();
848         Connection c = connectionManager.register(channel);
849         // If the connectionManager can't take it, close the connection.
850         if (c == null) {
851           if (channel.isOpen()) {
852             IOUtils.cleanup(null, channel);
853           }
854           continue;
855         }
856         key.attach(c);  // so closeCurrentConnection can get the object
857         reader.addConnection(c);
858       }
859     }
860
861     void doRead(SelectionKey key) throws InterruptedException {
862       int count;
863       Connection c = (Connection) key.attachment();
864       if (c == null) {
865         return;
866       }
867       c.setLastContact(System.currentTimeMillis());
868       try {
869         count = c.readAndProcess();
870       } catch (InterruptedException ieo) {
871         LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
872         throw ieo;
873       } catch (Exception e) {
874         if (LOG.isDebugEnabled()) {
875           LOG.debug(getName() + ": Caught exception while reading:", e);
876         }
877         count = -1; //so that the (count < 0) block is executed
878       }
879       if (count < 0) {
880         closeConnection(c);
881         c = null;
882       } else {
883         c.setLastContact(System.currentTimeMillis());
884       }
885     }
886
887     synchronized void doStop() {
888       if (selector != null) {
889         selector.wakeup();
890         Thread.yield();
891       }
892       if (acceptChannel != null) {
893         try {
894           acceptChannel.socket().close();
895         } catch (IOException e) {
896           LOG.info(getName() + ": exception in closing listener socket. " + e);
897         }
898       }
899       readPool.shutdownNow();
900     }
901
902     synchronized Selector getSelector() { return selector; }
903
904     // The method that will return the next reader to work with
905     // Simplistic implementation of round robin for now
906     Reader getReader() {
907       currentReader = (currentReader + 1) % readers.length;
908       return readers[currentReader];
909     }
910   }
911
912   // Sends responses of RPC back to clients.
913   protected class Responder extends Thread {
914     private final Selector writeSelector;
915     private final Set<Connection> writingCons =
916         Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
917
918     Responder() throws IOException {
919       this.setName("RpcServer.responder");
920       this.setDaemon(true);
921       this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
922       writeSelector = Selector.open(); // create a selector
923     }
924
925     @Override
926     public void run() {
927       LOG.info(getName() + ": starting");
928       try {
929         doRunLoop();
930       } finally {
931         LOG.info(getName() + ": stopping");
932         try {
933           writeSelector.close();
934         } catch (IOException ioe) {
935           LOG.error(getName() + ": couldn't close write selector", ioe);
936         }
937       }
938     }
939
940     /**
941      * Take the list of the connections that want to write, and register them
942      * in the selector.
943      */
944     private void registerWrites() {
945       Iterator<Connection> it = writingCons.iterator();
946       while (it.hasNext()) {
947         Connection c = it.next();
948         it.remove();
949         SelectionKey sk = c.channel.keyFor(writeSelector);
950         try {
951           if (sk == null) {
952             try {
953               c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
954             } catch (ClosedChannelException e) {
955               // ignore: the client went away.
956               if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
957             }
958           } else {
959             sk.interestOps(SelectionKey.OP_WRITE);
960           }
961         } catch (CancelledKeyException e) {
962           // ignore: the client went away.
963           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
964         }
965       }
966     }
967
968     /**
969      * Add a connection to the list that want to write,
970      */
971     public void registerForWrite(Connection c) {
972       if (writingCons.add(c)) {
973         writeSelector.wakeup();
974       }
975     }
976
977     private void doRunLoop() {
978       long lastPurgeTime = 0;   // last check for old calls.
979       while (running) {
980         try {
981           registerWrites();
982           int keyCt = writeSelector.select(purgeTimeout);
983           if (keyCt == 0) {
984             continue;
985           }
986
987           Set<SelectionKey> keys = writeSelector.selectedKeys();
988           Iterator<SelectionKey> iter = keys.iterator();
989           while (iter.hasNext()) {
990             SelectionKey key = iter.next();
991             iter.remove();
992             try {
993               if (key.isValid() && key.isWritable()) {
994                 doAsyncWrite(key);
995               }
996             } catch (IOException e) {
997               LOG.debug(getName() + ": asyncWrite", e);
998             }
999           }
1000
1001           lastPurgeTime = purge(lastPurgeTime);
1002
1003         } catch (OutOfMemoryError e) {
1004           if (errorHandler != null) {
1005             if (errorHandler.checkOOME(e)) {
1006               LOG.info(getName() + ": exiting on OutOfMemoryError");
1007               return;
1008             }
1009           } else {
1010             //
1011             // we can run out of memory if we have too many threads
1012             // log the event and sleep for a minute and give
1013             // some thread(s) a chance to finish
1014             //
1015             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
1016             try {
1017               Thread.sleep(60000);
1018             } catch (InterruptedException ex) {
1019               LOG.debug("Interrupted while sleeping");
1020               return;
1021             }
1022           }
1023         } catch (Exception e) {
1024           LOG.warn(getName() + ": exception in Responder " +
1025               StringUtils.stringifyException(e), e);
1026         }
1027       }
1028       LOG.info(getName() + ": stopped");
1029     }
1030
1031     /**
1032      * If there were some calls that have not been sent out for a
1033      * long time, we close the connection.
1034      * @return the time of the purge.
1035      */
1036     private long purge(long lastPurgeTime) {
1037       long now = System.currentTimeMillis();
1038       if (now < lastPurgeTime + purgeTimeout) {
1039         return lastPurgeTime;
1040       }
1041
1042       ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1043       // get the list of channels from list of keys.
1044       synchronized (writeSelector.keys()) {
1045         for (SelectionKey key : writeSelector.keys()) {
1046           Connection connection = (Connection) key.attachment();
1047           if (connection == null) {
1048             throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1049           }
1050           Call call = connection.responseQueue.peekFirst();
1051           if (call != null && now > call.timestamp + purgeTimeout) {
1052             conWithOldCalls.add(call.connection);
1053           }
1054         }
1055       }
1056
1057       // Seems safer to close the connection outside of the synchronized loop...
1058       for (Connection connection : conWithOldCalls) {
1059         closeConnection(connection);
1060       }
1061
1062       return now;
1063     }
1064
1065     private void doAsyncWrite(SelectionKey key) throws IOException {
1066       Connection connection = (Connection) key.attachment();
1067       if (connection == null) {
1068         throw new IOException("doAsyncWrite: no connection");
1069       }
1070       if (key.channel() != connection.channel) {
1071         throw new IOException("doAsyncWrite: bad channel");
1072       }
1073
1074       if (processAllResponses(connection)) {
1075         try {
1076           // We wrote everything, so we don't need to be told when the socket is ready for
1077           //  write anymore.
1078          key.interestOps(0);
1079         } catch (CancelledKeyException e) {
1080           /* The Listener/reader might have closed the socket.
1081            * We don't explicitly cancel the key, so not sure if this will
1082            * ever fire.
1083            * This warning could be removed.
1084            */
1085           LOG.warn("Exception while changing ops : " + e);
1086         }
1087       }
1088     }
1089
1090     /**
1091      * Process the response for this call. You need to have the lock on
1092      * {@link org.apache.hadoop.hbase.ipc.RpcServer.Connection#responseWriteLock}
1093      *
1094      * @param call the call
1095      * @return true if we proceed the call fully, false otherwise.
1096      * @throws IOException
1097      */
1098     private boolean processResponse(final Call call) throws IOException {
1099       boolean error = true;
1100       try {
1101         // Send as much data as we can in the non-blocking fashion
1102         long numBytes = channelWrite(call.connection.channel, call.response);
1103         if (numBytes < 0) {
1104           throw new HBaseIOException("Error writing on the socket " +
1105             "for the call:" + call.toShortString());
1106         }
1107         error = false;
1108       } finally {
1109         if (error) {
1110           LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1111           // We will be closing this connection itself. Mark this call as done so that all the
1112           // buffer(s) it got from pool can get released
1113           call.done();
1114           closeConnection(call.connection);
1115         }
1116       }
1117
1118       if (!call.response.hasRemaining()) {
1119         call.done();
1120         return true;
1121       } else {
1122         return false; // Socket can't take more, we will have to come back.
1123       }
1124     }
1125
1126     /**
1127      * Process all the responses for this connection
1128      *
1129      * @return true if all the calls were processed or that someone else is doing it.
1130      * false if there * is still some work to do. In this case, we expect the caller to
1131      * delay us.
1132      * @throws IOException
1133      */
1134     private boolean processAllResponses(final Connection connection) throws IOException {
1135       // We want only one writer on the channel for a connection at a time.
1136       connection.responseWriteLock.lock();
1137       try {
1138         for (int i = 0; i < 20; i++) {
1139           // protection if some handlers manage to need all the responder
1140           Call call = connection.responseQueue.pollFirst();
1141           if (call == null) {
1142             return true;
1143           }
1144           if (!processResponse(call)) {
1145             connection.responseQueue.addFirst(call);
1146             return false;
1147           }
1148         }
1149       } finally {
1150         connection.responseWriteLock.unlock();
1151       }
1152
1153       return connection.responseQueue.isEmpty();
1154     }
1155
1156     //
1157     // Enqueue a response from the application.
1158     //
1159     void doRespond(Call call) throws IOException {
1160       boolean added = false;
1161
1162       // If there is already a write in progress, we don't wait. This allows to free the handlers
1163       //  immediately for other tasks.
1164       if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
1165         try {
1166           if (call.connection.responseQueue.isEmpty()) {
1167             // If we're alone, we can try to do a direct call to the socket. It's
1168             //  an optimisation to save on context switches and data transfer between cores..
1169             if (processResponse(call)) {
1170               return; // we're done.
1171             }
1172             // Too big to fit, putting ahead.
1173             call.connection.responseQueue.addFirst(call);
1174             added = true; // We will register to the selector later, outside of the lock.
1175           }
1176         } finally {
1177           call.connection.responseWriteLock.unlock();
1178         }
1179       }
1180
1181       if (!added) {
1182         call.connection.responseQueue.addLast(call);
1183       }
1184       call.responder.registerForWrite(call.connection);
1185
1186       // set the serve time when the response has to be sent later
1187       call.timestamp = System.currentTimeMillis();
1188     }
1189   }
1190
1191   /** Reads calls from a connection and queues them for handling. */
1192   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1193       value="VO_VOLATILE_INCREMENT",
1194       justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1195   public class Connection {
1196     // If initial preamble with version and magic has been read or not.
1197     private boolean connectionPreambleRead = false;
1198     // If the connection header has been read or not.
1199     private boolean connectionHeaderRead = false;
1200     protected SocketChannel channel;
1201     private ByteBuffer data;
1202     private ByteBuffer dataLengthBuffer;
1203     protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
1204     private final Lock responseWriteLock = new ReentrantLock();
1205     private Counter rpcCount = new Counter(); // number of outstanding rpcs
1206     private long lastContact;
1207     private InetAddress addr;
1208     protected Socket socket;
1209     // Cache the remote host & port info so that even if the socket is
1210     // disconnected, we can say where it used to connect to.
1211     protected String hostAddress;
1212     protected int remotePort;
1213     ConnectionHeader connectionHeader;
1214
1215     /**
1216      * Codec the client asked use.
1217      */
1218     private Codec codec;
1219     /**
1220      * Compression codec the client asked us use.
1221      */
1222     private CompressionCodec compressionCodec;
1223     BlockingService service;
1224
1225     private AuthMethod authMethod;
1226     private boolean saslContextEstablished;
1227     private boolean skipInitialSaslHandshake;
1228     private ByteBuffer unwrappedData;
1229     // When is this set?  FindBugs wants to know!  Says NP
1230     private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1231     boolean useSasl;
1232     SaslServer saslServer;
1233     private boolean useWrap = false;
1234     // Fake 'call' for failed authorization response
1235     private static final int AUTHORIZATION_FAILED_CALLID = -1;
1236     private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
1237         null, null, this, null, 0, null, null, 0);
1238     private ByteArrayOutputStream authFailedResponse =
1239         new ByteArrayOutputStream();
1240     // Fake 'call' for SASL context setup
1241     private static final int SASL_CALLID = -33;
1242     private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
1243         0, null, null, 0);
1244
1245     // was authentication allowed with a fallback to simple auth
1246     private boolean authenticatedWithFallback;
1247
1248     private boolean retryImmediatelySupported = false;
1249
1250     public UserGroupInformation attemptingUser = null; // user name before auth
1251     protected User user = null;
1252     protected UserGroupInformation ugi = null;
1253
1254     public Connection(SocketChannel channel, long lastContact) {
1255       this.channel = channel;
1256       this.lastContact = lastContact;
1257       this.data = null;
1258       this.dataLengthBuffer = ByteBuffer.allocate(4);
1259       this.socket = channel.socket();
1260       this.addr = socket.getInetAddress();
1261       if (addr == null) {
1262         this.hostAddress = "*Unknown*";
1263       } else {
1264         this.hostAddress = addr.getHostAddress();
1265       }
1266       this.remotePort = socket.getPort();
1267       if (socketSendBufferSize != 0) {
1268         try {
1269           socket.setSendBufferSize(socketSendBufferSize);
1270         } catch (IOException e) {
1271           LOG.warn("Connection: unable to set socket send buffer size to " +
1272                    socketSendBufferSize);
1273         }
1274       }
1275     }
1276
1277       @Override
1278     public String toString() {
1279       return getHostAddress() + ":" + remotePort;
1280     }
1281
1282     public String getHostAddress() {
1283       return hostAddress;
1284     }
1285
1286     public InetAddress getHostInetAddress() {
1287       return addr;
1288     }
1289
1290     public int getRemotePort() {
1291       return remotePort;
1292     }
1293
1294     public void setLastContact(long lastContact) {
1295       this.lastContact = lastContact;
1296     }
1297
1298     public VersionInfo getVersionInfo() {
1299       if (connectionHeader.hasVersionInfo()) {
1300         return connectionHeader.getVersionInfo();
1301       }
1302       return null;
1303     }
1304
1305     public long getLastContact() {
1306       return lastContact;
1307     }
1308
1309     /* Return true if the connection has no outstanding rpc */
1310     private boolean isIdle() {
1311       return rpcCount.get() == 0;
1312     }
1313
1314     /* Decrement the outstanding RPC count */
1315     protected void decRpcCount() {
1316       rpcCount.decrement();
1317     }
1318
1319     /* Increment the outstanding RPC count */
1320     protected void incRpcCount() {
1321       rpcCount.increment();
1322     }
1323
1324     private UserGroupInformation getAuthorizedUgi(String authorizedId)
1325         throws IOException {
1326       UserGroupInformation authorizedUgi;
1327       if (authMethod == AuthMethod.DIGEST) {
1328         TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1329             secretManager);
1330         authorizedUgi = tokenId.getUser();
1331         if (authorizedUgi == null) {
1332           throw new AccessDeniedException(
1333               "Can't retrieve username from tokenIdentifier.");
1334         }
1335         authorizedUgi.addTokenIdentifier(tokenId);
1336       } else {
1337         authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
1338       }
1339       authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
1340       return authorizedUgi;
1341     }
1342
1343     private void saslReadAndProcess(ByteBuffer saslToken) throws IOException,
1344         InterruptedException {
1345       if (saslContextEstablished) {
1346         if (LOG.isTraceEnabled())
1347           LOG.trace("Have read input token of size " + saslToken.limit()
1348               + " for processing by saslServer.unwrap()");
1349
1350         if (!useWrap) {
1351           processOneRpc(saslToken);
1352         } else {
1353           byte[] b = saslToken.array();
1354           byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
1355           processUnwrappedData(plaintextData);
1356         }
1357       } else {
1358         byte[] replyToken;
1359         try {
1360           if (saslServer == null) {
1361             switch (authMethod) {
1362             case DIGEST:
1363               if (secretManager == null) {
1364                 throw new AccessDeniedException(
1365                     "Server is not configured to do DIGEST authentication.");
1366               }
1367               saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
1368                   .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
1369                   HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
1370                       secretManager, this));
1371               break;
1372             default:
1373               UserGroupInformation current = UserGroupInformation.getCurrentUser();
1374               String fullName = current.getUserName();
1375               if (LOG.isDebugEnabled()) {
1376                 LOG.debug("Kerberos principal name is " + fullName);
1377               }
1378               final String names[] = SaslUtil.splitKerberosName(fullName);
1379               if (names.length != 3) {
1380                 throw new AccessDeniedException(
1381                     "Kerberos principal name does NOT have the expected "
1382                         + "hostname part: " + fullName);
1383               }
1384               current.doAs(new PrivilegedExceptionAction<Object>() {
1385                 @Override
1386                 public Object run() throws SaslException {
1387                   saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
1388                       .getMechanismName(), names[0], names[1],
1389                       HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
1390                   return null;
1391                 }
1392               });
1393             }
1394             if (saslServer == null)
1395               throw new AccessDeniedException(
1396                   "Unable to find SASL server implementation for "
1397                       + authMethod.getMechanismName());
1398             if (LOG.isDebugEnabled()) {
1399               LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
1400             }
1401           }
1402           if (LOG.isDebugEnabled()) {
1403             LOG.debug("Have read input token of size " + saslToken.limit()
1404                 + " for processing by saslServer.evaluateResponse()");
1405           }
1406           replyToken = saslServer.evaluateResponse(saslToken.array());
1407         } catch (IOException e) {
1408           IOException sendToClient = e;
1409           Throwable cause = e;
1410           while (cause != null) {
1411             if (cause instanceof InvalidToken) {
1412               sendToClient = (InvalidToken) cause;
1413               break;
1414             }
1415             cause = cause.getCause();
1416           }
1417           doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
1418             sendToClient.getLocalizedMessage());
1419           metrics.authenticationFailure();
1420           String clientIP = this.toString();
1421           // attempting user could be null
1422           AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
1423           throw e;
1424         }
1425         if (replyToken != null) {
1426           if (LOG.isDebugEnabled()) {
1427             LOG.debug("Will send token of size " + replyToken.length
1428                 + " from saslServer.");
1429           }
1430           doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
1431               null);
1432         }
1433         if (saslServer.isComplete()) {
1434           String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
1435           useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
1436           ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
1437           if (LOG.isDebugEnabled()) {
1438             LOG.debug("SASL server context established. Authenticated client: "
1439               + ugi + ". Negotiated QoP is "
1440               + saslServer.getNegotiatedProperty(Sasl.QOP));
1441           }
1442           metrics.authenticationSuccess();
1443           AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1444           saslContextEstablished = true;
1445         }
1446       }
1447     }
1448
1449     /**
1450      * No protobuf encoding of raw sasl messages
1451      */
1452     private void doRawSaslReply(SaslStatus status, Writable rv,
1453         String errorClass, String error) throws IOException {
1454       ByteBufferOutputStream saslResponse = null;
1455       DataOutputStream out = null;
1456       try {
1457         // In my testing, have noticed that sasl messages are usually
1458         // in the ballpark of 100-200. That's why the initial capacity is 256.
1459         saslResponse = new ByteBufferOutputStream(256);
1460         out = new DataOutputStream(saslResponse);
1461         out.writeInt(status.state); // write status
1462         if (status == SaslStatus.SUCCESS) {
1463           rv.write(out);
1464         } else {
1465           WritableUtils.writeString(out, errorClass);
1466           WritableUtils.writeString(out, error);
1467         }
1468         saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1469         saslCall.responder = responder;
1470         saslCall.sendResponseIfReady();
1471       } finally {
1472         if (saslResponse != null) {
1473           saslResponse.close();
1474         }
1475         if (out != null) {
1476           out.close();
1477         }
1478       }
1479     }
1480
1481     private void disposeSasl() {
1482       if (saslServer != null) {
1483         try {
1484           saslServer.dispose();
1485           saslServer = null;
1486         } catch (SaslException ignored) {
1487           // Ignored. This is being disposed of anyway.
1488         }
1489       }
1490     }
1491
1492     private int readPreamble() throws IOException {
1493       int count;
1494       // Check for 'HBas' magic.
1495       this.dataLengthBuffer.flip();
1496       if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
1497         return doBadPreambleHandling("Expected HEADER=" +
1498             Bytes.toStringBinary(HConstants.RPC_HEADER) +
1499             " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
1500             " from " + toString());
1501       }
1502       // Now read the next two bytes, the version and the auth to use.
1503       ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
1504       count = channelRead(channel, versionAndAuthBytes);
1505       if (count < 0 || versionAndAuthBytes.remaining() > 0) {
1506         return count;
1507       }
1508       int version = versionAndAuthBytes.get(0);
1509       byte authbyte = versionAndAuthBytes.get(1);
1510       this.authMethod = AuthMethod.valueOf(authbyte);
1511       if (version != CURRENT_VERSION) {
1512         String msg = getFatalConnectionString(version, authbyte);
1513         return doBadPreambleHandling(msg, new WrongVersionException(msg));
1514       }
1515       if (authMethod == null) {
1516         String msg = getFatalConnectionString(version, authbyte);
1517         return doBadPreambleHandling(msg, new BadAuthException(msg));
1518       }
1519       if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
1520         if (allowFallbackToSimpleAuth) {
1521           metrics.authenticationFallback();
1522           authenticatedWithFallback = true;
1523         } else {
1524           AccessDeniedException ae = new AccessDeniedException("Authentication is required");
1525           setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1526           responder.doRespond(authFailedCall);
1527           throw ae;
1528         }
1529       }
1530       if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
1531         doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
1532             SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
1533         authMethod = AuthMethod.SIMPLE;
1534         // client has already sent the initial Sasl message and we
1535         // should ignore it. Both client and server should fall back
1536         // to simple auth from now on.
1537         skipInitialSaslHandshake = true;
1538       }
1539       if (authMethod != AuthMethod.SIMPLE) {
1540         useSasl = true;
1541       }
1542
1543       dataLengthBuffer.clear();
1544       connectionPreambleRead = true;
1545       return count;
1546     }
1547
1548     private int read4Bytes() throws IOException {
1549       if (this.dataLengthBuffer.remaining() > 0) {
1550         return channelRead(channel, this.dataLengthBuffer);
1551       } else {
1552         return 0;
1553       }
1554     }
1555
1556
1557     /**
1558      * Read off the wire. If there is not enough data to read, update the connection state with
1559      *  what we have and returns.
1560      * @return Returns -1 if failure (and caller will close connection), else zero or more.
1561      * @throws IOException
1562      * @throws InterruptedException
1563      */
1564     public int readAndProcess() throws IOException, InterruptedException {
1565       // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
1566       // does, read in the rest of the connection preamble, the version and the auth method.
1567       // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
1568       // length into the 4-byte this.dataLengthBuffer.
1569       int count = read4Bytes();
1570       if (count < 0 || dataLengthBuffer.remaining() > 0) {
1571         return count;
1572       }
1573
1574       // If we have not read the connection setup preamble, look to see if that is on the wire.
1575       if (!connectionPreambleRead) {
1576         count = readPreamble();
1577         if (!connectionPreambleRead) {
1578           return count;
1579         }
1580
1581         count = read4Bytes();
1582         if (count < 0 || dataLengthBuffer.remaining() > 0) {
1583           return count;
1584         }
1585       }
1586
1587       // We have read a length and we have read the preamble.  It is either the connection header
1588       // or it is a request.
1589       if (data == null) {
1590         dataLengthBuffer.flip();
1591         int dataLength = dataLengthBuffer.getInt();
1592         if (dataLength == RpcClient.PING_CALL_ID) {
1593           if (!useWrap) { //covers the !useSasl too
1594             dataLengthBuffer.clear();
1595             return 0;  //ping message
1596           }
1597         }
1598         if (dataLength < 0) { // A data length of zero is legal.
1599           throw new DoNotRetryIOException("Unexpected data length "
1600               + dataLength + "!! from " + getHostAddress());
1601         }
1602
1603         if (dataLength > maxRequestSize) {
1604           throw new DoNotRetryIOException("RPC data length of " + dataLength + " received from "
1605               + getHostAddress() + " is greater than max allowed " + maxRequestSize + ". Set \""
1606               + MAX_REQUEST_SIZE + "\" on server to override this limit (not recommended)");
1607         }
1608
1609         data = ByteBuffer.allocate(dataLength);
1610
1611         // Increment the rpc count. This counter will be decreased when we write
1612         //  the response.  If we want the connection to be detected as idle properly, we
1613         //  need to keep the inc / dec correct.
1614         incRpcCount();
1615       }
1616
1617       count = channelRead(channel, data);
1618
1619       if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
1620         process();
1621       }
1622
1623       return count;
1624     }
1625
1626     /**
1627      * Process the data buffer and clean the connection state for the next call.
1628      */
1629     private void process() throws IOException, InterruptedException {
1630       data.flip();
1631       try {
1632         if (skipInitialSaslHandshake) {
1633           skipInitialSaslHandshake = false;
1634           return;
1635         }
1636
1637         if (useSasl) {
1638           saslReadAndProcess(data);
1639         } else {
1640           processOneRpc(data);
1641         }
1642
1643       } finally {
1644         dataLengthBuffer.clear(); // Clean for the next call
1645         data = null; // For the GC
1646       }
1647     }
1648
1649     private String getFatalConnectionString(final int version, final byte authByte) {
1650       return "serverVersion=" + CURRENT_VERSION +
1651       ", clientVersion=" + version + ", authMethod=" + authByte +
1652       ", authSupported=" + (authMethod != null) + " from " + toString();
1653     }
1654
1655     private int doBadPreambleHandling(final String msg) throws IOException {
1656       return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1657     }
1658
1659     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1660       LOG.warn(msg);
1661       Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0);
1662       setupResponse(null, fakeCall, e, msg);
1663       responder.doRespond(fakeCall);
1664       // Returning -1 closes out the connection.
1665       return -1;
1666     }
1667
1668     // Reads the connection header following version
1669     private void processConnectionHeader(ByteBuffer buf) throws IOException {
1670       this.connectionHeader = ConnectionHeader.parseFrom(
1671         new ByteBufferInputStream(buf));
1672       String serviceName = connectionHeader.getServiceName();
1673       if (serviceName == null) throw new EmptyServiceNameException();
1674       this.service = getService(services, serviceName);
1675       if (this.service == null) throw new UnknownServiceException(serviceName);
1676       setupCellBlockCodecs(this.connectionHeader);
1677       UserGroupInformation protocolUser = createUser(connectionHeader);
1678       if (!useSasl) {
1679         ugi = protocolUser;
1680         if (ugi != null) {
1681           ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1682         }
1683         // audit logging for SASL authenticated users happens in saslReadAndProcess()
1684         if (authenticatedWithFallback) {
1685           LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
1686               + " connecting from " + getHostAddress());
1687         }
1688         AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1689       } else {
1690         // user is authenticated
1691         ugi.setAuthenticationMethod(authMethod.authenticationMethod);
1692         //Now we check if this is a proxy user case. If the protocol user is
1693         //different from the 'user', it is a proxy user scenario. However,
1694         //this is not allowed if user authenticated with DIGEST.
1695         if ((protocolUser != null)
1696             && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
1697           if (authMethod == AuthMethod.DIGEST) {
1698             // Not allowed to doAs if token authentication is used
1699             throw new AccessDeniedException("Authenticated user (" + ugi
1700                 + ") doesn't match what the client claims to be ("
1701                 + protocolUser + ")");
1702           } else {
1703             // Effective user can be different from authenticated user
1704             // for simple auth or kerberos auth
1705             // The user is the real user. Now we create a proxy user
1706             UserGroupInformation realUser = ugi;
1707             ugi = UserGroupInformation.createProxyUser(protocolUser
1708                 .getUserName(), realUser);
1709             // Now the user is a proxy user, set Authentication method Proxy.
1710             ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
1711           }
1712         }
1713       }
1714       if (connectionHeader.hasVersionInfo()) {
1715         // see if this connection will support RetryImmediatelyException
1716         retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
1717
1718         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1719             + " with version info: "
1720             + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
1721       } else {
1722         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1723             + " with unknown version info");
1724       }
1725
1726
1727     }
1728
1729     /**
1730      * Set up cell block codecs
1731      * @throws FatalConnectionException
1732      */
1733     private void setupCellBlockCodecs(final ConnectionHeader header)
1734     throws FatalConnectionException {
1735       // TODO: Plug in other supported decoders.
1736       if (!header.hasCellBlockCodecClass()) return;
1737       String className = header.getCellBlockCodecClass();
1738       if (className == null || className.length() == 0) return;
1739       try {
1740         this.codec = (Codec)Class.forName(className).newInstance();
1741       } catch (Exception e) {
1742         throw new UnsupportedCellCodecException(className, e);
1743       }
1744       if (!header.hasCellBlockCompressorClass()) return;
1745       className = header.getCellBlockCompressorClass();
1746       try {
1747         this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1748       } catch (Exception e) {
1749         throw new UnsupportedCompressionCodecException(className, e);
1750       }
1751     }
1752
1753     private void processUnwrappedData(byte[] inBuf) throws IOException,
1754     InterruptedException {
1755       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1756       // Read all RPCs contained in the inBuf, even partial ones
1757       while (true) {
1758         int count;
1759         if (unwrappedDataLengthBuffer.remaining() > 0) {
1760           count = channelRead(ch, unwrappedDataLengthBuffer);
1761           if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1762             return;
1763         }
1764
1765         if (unwrappedData == null) {
1766           unwrappedDataLengthBuffer.flip();
1767           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1768
1769           if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1770             if (LOG.isDebugEnabled())
1771               LOG.debug("Received ping message");
1772             unwrappedDataLengthBuffer.clear();
1773             continue; // ping message
1774           }
1775           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1776         }
1777
1778         count = channelRead(ch, unwrappedData);
1779         if (count <= 0 || unwrappedData.remaining() > 0)
1780           return;
1781
1782         if (unwrappedData.remaining() == 0) {
1783           unwrappedDataLengthBuffer.clear();
1784           unwrappedData.flip();
1785           processOneRpc(unwrappedData);
1786           unwrappedData = null;
1787         }
1788       }
1789     }
1790
1791
1792     private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException {
1793       if (connectionHeaderRead) {
1794         processRequest(buf);
1795       } else {
1796         processConnectionHeader(buf);
1797         this.connectionHeaderRead = true;
1798         if (!authorizeConnection()) {
1799           // Throw FatalConnectionException wrapping ACE so client does right thing and closes
1800           // down the connection instead of trying to read non-existent retun.
1801           throw new AccessDeniedException("Connection from " + this + " for service " +
1802             connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
1803         }
1804         this.user = userProvider.create(this.ugi);
1805       }
1806     }
1807
1808     /**
1809      * @param buf Has the request header and the request param and optionally encoded data buffer
1810      * all in this one array.
1811      * @throws IOException
1812      * @throws InterruptedException
1813      */
1814     protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
1815       long totalRequestSize = buf.limit();
1816       int offset = 0;
1817       // Here we read in the header.  We avoid having pb
1818       // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
1819       CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
1820       int headerSize = cis.readRawVarint32();
1821       offset = cis.getTotalBytesRead();
1822       Message.Builder builder = RequestHeader.newBuilder();
1823       ProtobufUtil.mergeFrom(builder, cis, headerSize);
1824       RequestHeader header = (RequestHeader) builder.build();
1825       offset += headerSize;
1826       int id = header.getCallId();
1827       if (LOG.isTraceEnabled()) {
1828         LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1829           " totalRequestSize: " + totalRequestSize + " bytes");
1830       }
1831       // Enforcing the call queue size, this triggers a retry in the client
1832       // This is a bit late to be doing this check - we have already read in the total request.
1833       if ((totalRequestSize + callQueueSizeInBytes.get()) > maxQueueSizeInBytes) {
1834         final Call callTooBig =
1835           new Call(id, this.service, null, null, null, null, this,
1836             responder, totalRequestSize, null, null, 0);
1837         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1838         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1839         setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
1840             "Call queue is full on " + server.getServerName() +
1841                 ", is hbase.ipc.server.max.callqueue.size too small?");
1842         responder.doRespond(callTooBig);
1843         return;
1844       }
1845       MethodDescriptor md = null;
1846       Message param = null;
1847       CellScanner cellScanner = null;
1848       try {
1849         if (header.hasRequestParam() && header.getRequestParam()) {
1850           md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1851           if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1852           builder = this.service.getRequestPrototype(md).newBuilderForType();
1853           cis.resetSizeCounter();
1854           int paramSize = cis.readRawVarint32();
1855           offset += cis.getTotalBytesRead();
1856           if (builder != null) {
1857             ProtobufUtil.mergeFrom(builder, cis, paramSize);
1858             param = builder.build();
1859           }
1860           offset += paramSize;
1861         }
1862         if (header.hasCellBlockMeta()) {
1863           buf.position(offset);
1864           cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf);
1865         }
1866       } catch (Throwable t) {
1867         InetSocketAddress address = getListenerAddress();
1868         String msg = (address != null ? address : "(channel closed)") +
1869             " is unable to read call parameter from client " + getHostAddress();
1870         LOG.warn(msg, t);
1871
1872         metrics.exception(t);
1873
1874         // probably the hbase hadoop version does not match the running hadoop version
1875         if (t instanceof LinkageError) {
1876           t = new DoNotRetryIOException(t);
1877         }
1878         // If the method is not present on the server, do not retry.
1879         if (t instanceof UnsupportedOperationException) {
1880           t = new DoNotRetryIOException(t);
1881         }
1882
1883         final Call readParamsFailedCall =
1884           new Call(id, this.service, null, null, null, null, this,
1885             responder, totalRequestSize, null, null, 0);
1886         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1887         setupResponse(responseBuffer, readParamsFailedCall, t,
1888           msg + "; " + t.getMessage());
1889         responder.doRespond(readParamsFailedCall);
1890         return;
1891       }
1892
1893       TraceInfo traceInfo = header.hasTraceInfo()
1894           ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1895           : null;
1896       int timeout = 0;
1897       if (header.hasTimeout() && header.getTimeout() > 0){
1898         timeout = Math.max(minClientRequestTimeout, header.getTimeout());
1899       }
1900       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1901               totalRequestSize, traceInfo, this.addr, timeout);
1902
1903       if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
1904         callQueueSizeInBytes.add(-1 * call.getSize());
1905
1906         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1907         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1908         setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
1909             "Call queue is full on " + server.getServerName() +
1910                 ", too many items queued ?");
1911         responder.doRespond(call);
1912       }
1913     }
1914
1915     private boolean authorizeConnection() throws IOException {
1916       try {
1917         // If auth method is DIGEST, the token was obtained by the
1918         // real user for the effective user, therefore not required to
1919         // authorize real user. doAs is allowed only for simple or kerberos
1920         // authentication
1921         if (ugi != null && ugi.getRealUser() != null
1922             && (authMethod != AuthMethod.DIGEST)) {
1923           ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
1924         }
1925         authorize(ugi, connectionHeader, getHostInetAddress());
1926         metrics.authorizationSuccess();
1927       } catch (AuthorizationException ae) {
1928         if (LOG.isDebugEnabled()) {
1929           LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1930         }
1931         metrics.authorizationFailure();
1932         setupResponse(authFailedResponse, authFailedCall,
1933           new AccessDeniedException(ae), ae.getMessage());
1934         responder.doRespond(authFailedCall);
1935         return false;
1936       }
1937       return true;
1938     }
1939
1940     protected synchronized void close() {
1941       disposeSasl();
1942       data = null;
1943       if (!channel.isOpen())
1944         return;
1945       try {socket.shutdownOutput();} catch(Exception ignored) {
1946         if (LOG.isTraceEnabled()) {
1947           LOG.trace("Ignored exception", ignored);
1948         }
1949       }
1950       if (channel.isOpen()) {
1951         try {channel.close();} catch(Exception ignored) {}
1952       }
1953       try {
1954         socket.close();
1955       } catch(Exception ignored) {
1956         if (LOG.isTraceEnabled()) {
1957           LOG.trace("Ignored exception", ignored);
1958         }
1959       }
1960       rpcCount.destroy();
1961     }
1962
1963     private UserGroupInformation createUser(ConnectionHeader head) {
1964       UserGroupInformation ugi = null;
1965
1966       if (!head.hasUserInfo()) {
1967         return null;
1968       }
1969       UserInformation userInfoProto = head.getUserInfo();
1970       String effectiveUser = null;
1971       if (userInfoProto.hasEffectiveUser()) {
1972         effectiveUser = userInfoProto.getEffectiveUser();
1973       }
1974       String realUser = null;
1975       if (userInfoProto.hasRealUser()) {
1976         realUser = userInfoProto.getRealUser();
1977       }
1978       if (effectiveUser != null) {
1979         if (realUser != null) {
1980           UserGroupInformation realUserUgi =
1981               UserGroupInformation.createRemoteUser(realUser);
1982           ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1983         } else {
1984           ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1985         }
1986       }
1987       return ugi;
1988     }
1989   }
1990
1991   /**
1992    * Datastructure for passing a {@link BlockingService} and its associated class of
1993    * protobuf service interface.  For example, a server that fielded what is defined
1994    * in the client protobuf service would pass in an implementation of the client blocking service
1995    * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
1996    */
1997   public static class BlockingServiceAndInterface {
1998     private final BlockingService service;
1999     private final Class<?> serviceInterface;
2000     public BlockingServiceAndInterface(final BlockingService service,
2001         final Class<?> serviceInterface) {
2002       this.service = service;
2003       this.serviceInterface = serviceInterface;
2004     }
2005     public Class<?> getServiceInterface() {
2006       return this.serviceInterface;
2007     }
2008     public BlockingService getBlockingService() {
2009       return this.service;
2010     }
2011   }
2012
2013   /**
2014    * Constructs a server listening on the named port and address.
2015    * @param server hosting instance of {@link Server}. We will do authentications if an
2016    * instance else pass null for no authentication check.
2017    * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
2018    * @param services A list of services.
2019    * @param bindAddress Where to listen
2020    * @param conf
2021    * @param scheduler
2022    */
2023   public RpcServer(final Server server, final String name,
2024       final List<BlockingServiceAndInterface> services,
2025       final InetSocketAddress bindAddress, Configuration conf,
2026       RpcScheduler scheduler)
2027       throws IOException {
2028     if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
2029       this.reservoir = new ByteBufferPool(
2030           conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY, ByteBufferPool.DEFAULT_BUFFER_SIZE),
2031           conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
2032               conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
2033                   HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
2034     } else {
2035       reservoir = null;
2036     }
2037     this.server = server;
2038     this.services = services;
2039     this.bindAddress = bindAddress;
2040     this.conf = conf;
2041     this.socketSendBufferSize = 0;
2042     // See declaration above for documentation on what this size is.
2043     this.maxQueueSizeInBytes =
2044       this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
2045     this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
2046     this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
2047       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
2048     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
2049     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
2050     this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
2051         DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
2052     this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
2053
2054     // Start the listener here and let it bind to the port
2055     listener = new Listener(name);
2056     this.port = listener.getAddress().getPort();
2057
2058     this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
2059     this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
2060     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
2061
2062     this.cellBlockBuilder = new CellBlockBuilder(conf);
2063
2064
2065     // Create the responder here
2066     responder = new Responder();
2067     connectionManager = new ConnectionManager();
2068     this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
2069     this.userProvider = UserProvider.instantiate(conf);
2070     this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
2071     if (isSecurityEnabled) {
2072       HBaseSaslRpcServer.init(conf);
2073     }
2074     initReconfigurable(conf);
2075
2076     this.scheduler = scheduler;
2077     this.scheduler.init(new RpcSchedulerContext(this));
2078   }
2079
2080   @Override
2081   public void onConfigurationChange(Configuration newConf) {
2082     initReconfigurable(newConf);
2083     if (scheduler instanceof ConfigurationObserver) {
2084       ((ConfigurationObserver)scheduler).onConfigurationChange(newConf);
2085     }
2086   }
2087
2088   private void initReconfigurable(Configuration confToLoad) {
2089     this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
2090     if (isSecurityEnabled && allowFallbackToSimpleAuth) {
2091       LOG.warn("********* WARNING! *********");
2092       LOG.warn("This server is configured to allow connections from INSECURE clients");
2093       LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
2094       LOG.warn("While this option is enabled, client identities cannot be secured, and user");
2095       LOG.warn("impersonation is possible!");
2096       LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
2097       LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
2098       LOG.warn("****************************");
2099     }
2100   }
2101
2102   /**
2103    * Subclasses of HBaseServer can override this to provide their own
2104    * Connection implementations.
2105    */
2106   protected Connection getConnection(SocketChannel channel, long time) {
2107     return new Connection(channel, time);
2108   }
2109
2110   /**
2111    * Setup response for the RPC Call.
2112    *
2113    * @param response buffer to serialize the response into
2114    * @param call {@link Call} to which we are setting up the response
2115    * @param error error message, if the call failed
2116    * @throws IOException
2117    */
2118   private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
2119   throws IOException {
2120     if (response != null) response.reset();
2121     call.setResponse(null, null, t, error);
2122   }
2123
2124   protected void closeConnection(Connection connection) {
2125     connectionManager.close(connection);
2126   }
2127
2128   Configuration getConf() {
2129     return conf;
2130   }
2131
2132   /** Sets the socket buffer size used for responding to RPCs.
2133    * @param size send size
2134    */
2135   @Override
2136   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2137
2138   @Override
2139   public boolean isStarted() {
2140     return this.started;
2141   }
2142
2143   /** Starts the service.  Must be called before any calls will be handled. */
2144   @Override
2145   public synchronized void start() {
2146     if (started) return;
2147     authTokenSecretMgr = createSecretManager();
2148     if (authTokenSecretMgr != null) {
2149       setSecretManager(authTokenSecretMgr);
2150       authTokenSecretMgr.start();
2151     }
2152     this.authManager = new ServiceAuthorizationManager();
2153     HBasePolicyProvider.init(conf, authManager);
2154     responder.start();
2155     listener.start();
2156     scheduler.start();
2157     started = true;
2158   }
2159
2160   @Override
2161   public synchronized void refreshAuthManager(PolicyProvider pp) {
2162     // Ignore warnings that this should be accessed in a static way instead of via an instance;
2163     // it'll break if you go via static route.
2164     this.authManager.refresh(this.conf, pp);
2165   }
2166
2167   private AuthenticationTokenSecretManager createSecretManager() {
2168     if (!isSecurityEnabled) return null;
2169     if (server == null) return null;
2170     Configuration conf = server.getConfiguration();
2171     long keyUpdateInterval =
2172         conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2173     long maxAge =
2174         conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2175     return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2176         server.getServerName().toString(), keyUpdateInterval, maxAge);
2177   }
2178
2179   public SecretManager<? extends TokenIdentifier> getSecretManager() {
2180     return this.secretManager;
2181   }
2182
2183   @SuppressWarnings("unchecked")
2184   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2185     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2186   }
2187
2188   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2189       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2190       throws IOException {
2191     return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0);
2192   }
2193
2194   /**
2195    * This is a server side method, which is invoked over RPC. On success
2196    * the return response has protobuf response payload. On failure, the
2197    * exception name and the stack trace are returned in the protobuf response.
2198    */
2199   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2200       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
2201       long startTime, int timeout)
2202   throws IOException {
2203     try {
2204       status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2205       // TODO: Review after we add in encoded data blocks.
2206       status.setRPCPacket(param);
2207       status.resume("Servicing call");
2208       //get an instance of the method arg type
2209       HBaseRpcController controller = new HBaseRpcControllerImpl(cellScanner);
2210       controller.setCallTimeout(timeout);
2211       Message result = service.callBlockingMethod(md, controller, param);
2212       long endTime = System.currentTimeMillis();
2213       int processingTime = (int) (endTime - startTime);
2214       int qTime = (int) (startTime - receiveTime);
2215       int totalTime = (int) (endTime - receiveTime);
2216       if (LOG.isTraceEnabled()) {
2217         LOG.trace(CurCall.get().toString() +
2218             ", response " + TextFormat.shortDebugString(result) +
2219             " queueTime: " + qTime +
2220             " processingTime: " + processingTime +
2221             " totalTime: " + totalTime);
2222       }
2223       long requestSize = param.getSerializedSize();
2224       long responseSize = result.getSerializedSize();
2225       metrics.dequeuedCall(qTime);
2226       metrics.processedCall(processingTime);
2227       metrics.totalCall(totalTime);
2228       metrics.receivedRequest(requestSize);
2229       metrics.sentResponse(responseSize);
2230       // log any RPC responses that are slower than the configured warn
2231       // response time or larger than configured warning size
2232       boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2233       boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2234       if (tooSlow || tooLarge) {
2235         // when tagging, we let TooLarge trump TooSmall to keep output simple
2236         // note that large responses will often also be slow.
2237         logResponse(param,
2238             md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
2239             (tooLarge ? "TooLarge" : "TooSlow"),
2240             status.getClient(), startTime, processingTime, qTime,
2241             responseSize);
2242       }
2243       return new Pair<Message, CellScanner>(result, controller.cellScanner());
2244     } catch (Throwable e) {
2245       // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
2246       // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
2247       // need to pass it over the wire.
2248       if (e instanceof ServiceException) {
2249         if (e.getCause() == null) {
2250           LOG.debug("Caught a ServiceException with null cause", e);
2251         } else {
2252           e = e.getCause();
2253         }
2254       }
2255
2256       // increment the number of requests that were exceptions.
2257       metrics.exception(e);
2258
2259       if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2260       if (e instanceof IOException) throw (IOException)e;
2261       LOG.error("Unexpected throwable object ", e);
2262       throw new IOException(e.getMessage(), e);
2263     }
2264   }
2265
2266   /**
2267    * Logs an RPC response to the LOG file, producing valid JSON objects for
2268    * client Operations.
2269    * @param param The parameters received in the call.
2270    * @param methodName The name of the method invoked
2271    * @param call The string representation of the call
2272    * @param tag  The tag that will be used to indicate this event in the log.
2273    * @param clientAddress   The address of the client who made this call.
2274    * @param startTime       The time that the call was initiated, in ms.
2275    * @param processingTime  The duration that the call took to run, in ms.
2276    * @param qTime           The duration that the call spent on the queue
2277    *                        prior to being initiated, in ms.
2278    * @param responseSize    The size in bytes of the response buffer.
2279    */
2280   void logResponse(Message param, String methodName, String call, String tag,
2281       String clientAddress, long startTime, int processingTime, int qTime,
2282       long responseSize)
2283           throws IOException {
2284     // base information that is reported regardless of type of call
2285     Map<String, Object> responseInfo = new HashMap<String, Object>();
2286     responseInfo.put("starttimems", startTime);
2287     responseInfo.put("processingtimems", processingTime);
2288     responseInfo.put("queuetimems", qTime);
2289     responseInfo.put("responsesize", responseSize);
2290     responseInfo.put("client", clientAddress);
2291     responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2292     responseInfo.put("method", methodName);
2293     responseInfo.put("call", call);
2294     responseInfo.put("param", ProtobufUtil.getShortTextFormat(param));
2295     LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2296   }
2297
  /**
   * Stops the service.  No new calls will be handled after this is called.
   * <p>
   * Clears the {@code running} flag, stops and drops the auth token secret manager if one is
   * set, interrupts and stops the listener, interrupts the responder, stops the scheduler and
   * finally wakes any thread blocked in {@link #join()}.
   */
  @Override
  public synchronized void stop() {
    LOG.info("Stopping server on " + port);
    running = false;
    if (authTokenSecretMgr != null) {
      authTokenSecretMgr.stop();
      // Null the field so a repeated stop() does not stop the manager again.
      authTokenSecretMgr = null;
    }
    listener.interrupt();
    listener.doStop();
    responder.interrupt();
    scheduler.stop();
    // join() waits on this monitor until running is false; release those waiters.
    notifyAll();
  }
2313
2314   /** Wait for the server to be stopped.
2315    * Does not wait for all subthreads to finish.
2316    *  See {@link #stop()}.
2317    * @throws InterruptedException e
2318    */
2319   @Override
2320   public synchronized void join() throws InterruptedException {
2321     while (running) {
2322       wait();
2323     }
2324   }
2325
2326   /**
2327    * Return the socket (ip+port) on which the RPC server is listening to. May return null if
2328    * the listener channel is closed.
2329    * @return the socket (ip+port) on which the RPC server is listening to, or null if this
2330    * information cannot be determined
2331    */
2332   @Override
2333   public synchronized InetSocketAddress getListenerAddress() {
2334     if (listener == null) {
2335       return null;
2336     }
2337     return listener.getAddress();
2338   }
2339
2340   /**
2341    * Set the handler for calling out of RPC for error conditions.
2342    * @param handler the handler implementation
2343    */
2344   @Override
2345   public void setErrorHandler(HBaseRPCErrorHandler handler) {
2346     this.errorHandler = handler;
2347   }
2348
2349   @Override
2350   public HBaseRPCErrorHandler getErrorHandler() {
2351     return this.errorHandler;
2352   }
2353
2354   /**
2355    * Returns the metrics instance for reporting RPC call statistics
2356    */
2357   @Override
2358   public MetricsHBaseServer getMetrics() {
2359     return metrics;
2360   }
2361
2362   @Override
2363   public void addCallSize(final long diff) {
2364     this.callQueueSizeInBytes.add(diff);
2365   }
2366
2367   /**
2368    * Authorize the incoming client connection.
2369    *
2370    * @param user client user
2371    * @param connection incoming connection
2372    * @param addr InetAddress of incoming connection
2373    * @throws org.apache.hadoop.security.authorize.AuthorizationException
2374    *         when the client isn't authorized to talk the protocol
2375    */
2376   public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
2377       InetAddress addr)
2378   throws AuthorizationException {
2379     if (authorize) {
2380       Class<?> c = getServiceInterface(services, connection.getServiceName());
2381       this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2382     }
2383   }
2384
  /**
   * When the read or write buffer size is larger than this limit, i/o will be
   * done in chunks of this size. Most RPC requests and responses would be
   * smaller.
   */
  private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
2391
2392   /**
2393    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
2394    * If the amount of data is large, it writes to channel in smaller chunks.
2395    * This is to avoid jdk from creating many direct buffers as the size of
2396    * buffer increases. This also minimizes extra copies in NIO layer
2397    * as a result of multiple write operations required to write a large
2398    * buffer.
2399    *
2400    * @param channel writable byte channel to write to
2401    * @param bufferChain Chain of buffers to write
2402    * @return number of bytes written
2403    * @throws java.io.IOException e
2404    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
2405    */
2406   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2407   throws IOException {
2408     long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
2409     if (count > 0) this.metrics.sentBytes(count);
2410     return count;
2411   }
2412
2413   /**
2414    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
2415    * If the amount of data is large, it writes to channel in smaller chunks.
2416    * This is to avoid jdk from creating many direct buffers as the size of
2417    * ByteBuffer increases. There should not be any performance degredation.
2418    *
2419    * @param channel writable byte channel to write on
2420    * @param buffer buffer to write
2421    * @return number of bytes written
2422    * @throws java.io.IOException e
2423    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
2424    */
2425   protected int channelRead(ReadableByteChannel channel,
2426                                    ByteBuffer buffer) throws IOException {
2427
2428     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2429            channel.read(buffer) : channelIO(channel, null, buffer);
2430     if (count > 0) {
2431       metrics.receivedBytes(count);
2432     }
2433     return count;
2434   }
2435
2436   /**
2437    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
2438    * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
2439    * one of readCh or writeCh should be non-null.
2440    *
2441    * @param readCh read channel
2442    * @param writeCh write channel
2443    * @param buf buffer to read or write into/out of
2444    * @return bytes written
2445    * @throws java.io.IOException e
2446    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
2447    * @see #channelWrite(GatheringByteChannel, BufferChain)
2448    */
2449   private static int channelIO(ReadableByteChannel readCh,
2450                                WritableByteChannel writeCh,
2451                                ByteBuffer buf) throws IOException {
2452
2453     int originalLimit = buf.limit();
2454     int initialRemaining = buf.remaining();
2455     int ret = 0;
2456
2457     while (buf.remaining() > 0) {
2458       try {
2459         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2460         buf.limit(buf.position() + ioSize);
2461
2462         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2463
2464         if (ret < ioSize) {
2465           break;
2466         }
2467
2468       } finally {
2469         buf.limit(originalLimit);
2470       }
2471     }
2472
2473     int nBytes = initialRemaining - buf.remaining();
2474     return (nBytes > 0) ? nBytes : ret;
2475   }
2476
2477   /**
2478    * Needed for features such as delayed calls.  We need to be able to store the current call
2479    * so that we can complete it later or ask questions of what is supported by the current ongoing
2480    * call.
2481    * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
2482    */
2483   public static RpcCallContext getCurrentCall() {
2484     return CurCall.get();
2485   }
2486
2487   public static boolean isInRpcCallContext() {
2488     return CurCall.get() != null;
2489   }
2490
2491   /**
2492    * Returns the user credentials associated with the current RPC request or
2493    * <code>null</code> if no credentials were provided.
2494    * @return A User
2495    */
2496   public static User getRequestUser() {
2497     RpcCallContext ctx = getCurrentCall();
2498     return ctx == null? null: ctx.getRequestUser();
2499   }
2500
2501   /**
2502    * The number of open RPC conections
2503    * @return the number of open rpc connections
2504    */
2505   public int getNumOpenConnections() {
2506     return connectionManager.size();
2507   }
2508
2509   /**
2510    * Returns the username for any user associated with the current RPC
2511    * request or <code>null</code> if no user is set.
2512    */
2513   public static String getRequestUserName() {
2514     User user = getRequestUser();
2515     return user == null? null: user.getShortName();
2516   }
2517
2518   /**
2519    * @return Address of remote client if a request is ongoing, else null
2520    */
2521   public static InetAddress getRemoteAddress() {
2522     RpcCallContext ctx = getCurrentCall();
2523     return ctx == null? null: ctx.getRemoteAddress();
2524   }
2525
2526   /**
2527    * @param serviceName Some arbitrary string that represents a 'service'.
2528    * @param services Available service instances
2529    * @return Matching BlockingServiceAndInterface pair
2530    */
2531   static BlockingServiceAndInterface getServiceAndInterface(
2532       final List<BlockingServiceAndInterface> services, final String serviceName) {
2533     for (BlockingServiceAndInterface bs : services) {
2534       if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2535         return bs;
2536       }
2537     }
2538     return null;
2539   }
2540
2541   /**
2542    * @param serviceName Some arbitrary string that represents a 'service'.
2543    * @param services Available services and their service interfaces.
2544    * @return Service interface class for <code>serviceName</code>
2545    */
2546   static Class<?> getServiceInterface(
2547       final List<BlockingServiceAndInterface> services,
2548       final String serviceName) {
2549     BlockingServiceAndInterface bsasi =
2550         getServiceAndInterface(services, serviceName);
2551     return bsasi == null? null: bsasi.getServiceInterface();
2552   }
2553
2554   /**
2555    * @param serviceName Some arbitrary string that represents a 'service'.
2556    * @param services Available services and their service interfaces.
2557    * @return BlockingService that goes with the passed <code>serviceName</code>
2558    */
2559   static BlockingService getService(
2560       final List<BlockingServiceAndInterface> services,
2561       final String serviceName) {
2562     BlockingServiceAndInterface bsasi =
2563         getServiceAndInterface(services, serviceName);
2564     return bsasi == null? null: bsasi.getBlockingService();
2565   }
2566
2567   static MonitoredRPCHandler getStatus() {
2568     // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
2569     MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
2570     if (status != null) {
2571       return status;
2572     }
2573     status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
2574     status.pause("Waiting for a call");
2575     RpcServer.MONITORED_RPC.set(status);
2576     return status;
2577   }
2578
2579   /** Returns the remote side ip address when invoked inside an RPC
2580    *  Returns null incase of an error.
2581    *  @return InetAddress
2582    */
2583   public static InetAddress getRemoteIp() {
2584     Call call = CurCall.get();
2585     if (call != null && call.connection != null && call.connection.socket != null) {
2586       return call.connection.socket.getInetAddress();
2587     }
2588     return null;
2589   }
2590
2591
2592   /**
2593    * A convenience method to bind to a given address and report
2594    * better exceptions if the address is not a valid host.
2595    * @param socket the socket to bind
2596    * @param address the address to bind to
2597    * @param backlog the number of connections allowed in the queue
2598    * @throws BindException if the address can't be bound
2599    * @throws UnknownHostException if the address isn't a valid host name
2600    * @throws IOException other random errors from bind
2601    */
2602   public static void bind(ServerSocket socket, InetSocketAddress address,
2603                           int backlog) throws IOException {
2604     try {
2605       socket.bind(address, backlog);
2606     } catch (BindException e) {
2607       BindException bindException =
2608         new BindException("Problem binding to " + address + " : " +
2609             e.getMessage());
2610       bindException.initCause(e);
2611       throw bindException;
2612     } catch (SocketException e) {
2613       // If they try to bind to a different host's address, give a better
2614       // error message.
2615       if ("Unresolved address".equals(e.getMessage())) {
2616         throw new UnknownHostException("Invalid hostname for server: " +
2617                                        address.getHostName());
2618       }
2619       throw e;
2620     }
2621   }
2622
2623   @Override
2624   public RpcScheduler getScheduler() {
2625     return scheduler;
2626   }
2627
2628   private class ConnectionManager {
2629     final private AtomicInteger count = new AtomicInteger();
2630     final private Set<Connection> connections;
2631
2632     final private Timer idleScanTimer;
2633     final private int idleScanThreshold;
2634     final private int idleScanInterval;
2635     final private int maxIdleTime;
2636     final private int maxIdleToClose;
2637
2638     ConnectionManager() {
2639       this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
2640       this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
2641       this.idleScanInterval =
2642           conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
2643       this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
2644       this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
2645       int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
2646           HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
2647       int maxConnectionQueueSize =
2648           handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
2649       // create a set with concurrency -and- a thread-safe iterator, add 2
2650       // for listener and idle closer threads
2651       this.connections = Collections.newSetFromMap(
2652           new ConcurrentHashMap<Connection,Boolean>(
2653               maxConnectionQueueSize, 0.75f, readThreads+2));
2654     }
2655
2656     private boolean add(Connection connection) {
2657       boolean added = connections.add(connection);
2658       if (added) {
2659         count.getAndIncrement();
2660       }
2661       return added;
2662     }
2663
2664     private boolean remove(Connection connection) {
2665       boolean removed = connections.remove(connection);
2666       if (removed) {
2667         count.getAndDecrement();
2668       }
2669       return removed;
2670     }
2671
2672     int size() {
2673       return count.get();
2674     }
2675
2676     Connection[] toArray() {
2677       return connections.toArray(new Connection[0]);
2678     }
2679
2680     Connection register(SocketChannel channel) {
2681       Connection connection = getConnection(channel, System.currentTimeMillis());
2682       add(connection);
2683       if (LOG.isDebugEnabled()) {
2684         LOG.debug("Server connection from " + connection +
2685             "; connections=" + size() +
2686             ", queued calls size (bytes)=" + callQueueSizeInBytes.get() +
2687             ", general queued calls=" + scheduler.getGeneralQueueLength() +
2688             ", priority queued calls=" + scheduler.getPriorityQueueLength());
2689       }
2690       return connection;
2691     }
2692
2693     boolean close(Connection connection) {
2694       boolean exists = remove(connection);
2695       if (exists) {
2696         if (LOG.isDebugEnabled()) {
2697           LOG.debug(Thread.currentThread().getName() +
2698               ": disconnecting client " + connection +
2699               ". Number of active connections: "+ size());
2700         }
2701         // only close if actually removed to avoid double-closing due
2702         // to possible races
2703         connection.close();
2704       }
2705       return exists;
2706     }
2707
2708     // synch'ed to avoid explicit invocation upon OOM from colliding with
2709     // timer task firing
2710     synchronized void closeIdle(boolean scanAll) {
2711       long minLastContact = System.currentTimeMillis() - maxIdleTime;
2712       // concurrent iterator might miss new connections added
2713       // during the iteration, but that's ok because they won't
2714       // be idle yet anyway and will be caught on next scan
2715       int closed = 0;
2716       for (Connection connection : connections) {
2717         // stop if connections dropped below threshold unless scanning all
2718         if (!scanAll && size() < idleScanThreshold) {
2719           break;
2720         }
2721         // stop if not scanning all and max connections are closed
2722         if (connection.isIdle() &&
2723             connection.getLastContact() < minLastContact &&
2724             close(connection) &&
2725             !scanAll && (++closed == maxIdleToClose)) {
2726           break;
2727         }
2728       }
2729     }
2730
2731     void closeAll() {
2732       // use a copy of the connections to be absolutely sure the concurrent
2733       // iterator doesn't miss a connection
2734       for (Connection connection : toArray()) {
2735         close(connection);
2736       }
2737     }
2738
2739     void startIdleScan() {
2740       scheduleIdleScanTask();
2741     }
2742
2743     void stopIdleScan() {
2744       idleScanTimer.cancel();
2745     }
2746
2747     private void scheduleIdleScanTask() {
2748       if (!running) {
2749         return;
2750       }
2751       TimerTask idleScanTask = new TimerTask(){
2752         @Override
2753         public void run() {
2754           if (!running) {
2755             return;
2756           }
2757           if (LOG.isDebugEnabled()) {
2758             LOG.debug(Thread.currentThread().getName()+": task running");
2759           }
2760           try {
2761             closeIdle(false);
2762           } finally {
2763             // explicitly reschedule so next execution occurs relative
2764             // to the end of this scan, not the beginning
2765             scheduleIdleScanTask();
2766           }
2767         }
2768       };
2769       idleScanTimer.schedule(idleScanTask, idleScanInterval);
2770     }
2771   }
2772 }