View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.ServerSocket;
31  import java.net.Socket;
32  import java.net.SocketException;
33  import java.net.UnknownHostException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.Channels;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.GatheringByteChannel;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.security.PrivilegedExceptionAction;
46  import java.util.ArrayList;
47  import java.util.Arrays;
48  import java.util.Collections;
49  import java.util.HashMap;
50  import java.util.Iterator;
51  import java.util.LinkedList;
52  import java.util.List;
53  import java.util.Map;
54  import java.util.Random;
55  import java.util.Set;
56  import java.util.concurrent.ConcurrentHashMap;
57  import java.util.concurrent.ConcurrentLinkedDeque;
58  import java.util.concurrent.ExecutorService;
59  import java.util.concurrent.Executors;
60  import java.util.concurrent.locks.Lock;
61  import java.util.concurrent.locks.ReentrantLock;
62  
63  import javax.security.sasl.Sasl;
64  import javax.security.sasl.SaslException;
65  import javax.security.sasl.SaslServer;
66  
67  import org.apache.commons.logging.Log;
68  import org.apache.commons.logging.LogFactory;
69  import org.apache.hadoop.conf.Configuration;
70  import org.apache.hadoop.hbase.CallQueueTooBigException;
71  import org.apache.hadoop.hbase.CellScanner;
72  import org.apache.hadoop.hbase.DoNotRetryIOException;
73  import org.apache.hadoop.hbase.HBaseIOException;
74  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
75  import org.apache.hadoop.hbase.HConstants;
76  import org.apache.hadoop.hbase.HRegionInfo;
77  import org.apache.hadoop.hbase.Server;
78  import org.apache.hadoop.hbase.TableName;
79  import org.apache.hadoop.hbase.classification.InterfaceAudience;
80  import org.apache.hadoop.hbase.classification.InterfaceStability;
81  import org.apache.hadoop.hbase.client.Operation;
82  import org.apache.hadoop.hbase.client.VersionInfoUtil;
83  import org.apache.hadoop.hbase.codec.Codec;
84  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
85  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
86  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
87  import org.apache.hadoop.hbase.io.ByteBufferInputStream;
88  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
89  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
90  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
91  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
92  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
93  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
94  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
95  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
96  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
97  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
98  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
99  import org.apache.hadoop.hbase.regionserver.HRegionServer;
100 import org.apache.hadoop.hbase.security.AccessDeniedException;
101 import org.apache.hadoop.hbase.security.AuthMethod;
102 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
103 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
104 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
105 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
106 import org.apache.hadoop.hbase.security.SaslStatus;
107 import org.apache.hadoop.hbase.security.SaslUtil;
108 import org.apache.hadoop.hbase.security.User;
109 import org.apache.hadoop.hbase.security.UserProvider;
110 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
111 import org.apache.hadoop.hbase.util.Bytes;
112 import org.apache.hadoop.hbase.util.Counter;
113 import org.apache.hadoop.hbase.util.Pair;
114 import org.apache.hadoop.hbase.util.Threads;
115 import org.apache.hadoop.io.BytesWritable;
116 import org.apache.hadoop.io.IntWritable;
117 import org.apache.hadoop.io.Writable;
118 import org.apache.hadoop.io.WritableUtils;
119 import org.apache.hadoop.io.compress.CompressionCodec;
120 import org.apache.hadoop.security.UserGroupInformation;
121 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
122 import org.apache.hadoop.security.authorize.AuthorizationException;
123 import org.apache.hadoop.security.authorize.PolicyProvider;
124 import org.apache.hadoop.security.authorize.ProxyUsers;
125 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
126 import org.apache.hadoop.security.token.SecretManager;
127 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
128 import org.apache.hadoop.security.token.TokenIdentifier;
129 import org.apache.hadoop.util.StringUtils;
130 import org.apache.htrace.TraceInfo;
131 import org.codehaus.jackson.map.ObjectMapper;
132 
133 import com.google.common.util.concurrent.ThreadFactoryBuilder;
134 import com.google.protobuf.BlockingService;
135 import com.google.protobuf.CodedInputStream;
136 import com.google.protobuf.CodedOutputStream;
137 import com.google.protobuf.Descriptors.MethodDescriptor;
138 import com.google.protobuf.Message;
139 import com.google.protobuf.ServiceException;
140 import com.google.protobuf.TextFormat;
141 
142 /**
143  * An RPC server that hosts protobuf described Services.
144  *
145  * An RpcServer instance has a Listener that hosts the socket.  Listener has fixed number
146  * of Readers in an ExecutorPool, 10 by default.  The Listener does an accept and then
147  * round robin a Reader is chosen to do the read.  The reader is registered on Selector.  Read does
148  * total read off the channel and the parse from which it makes a Call.  The call is wrapped in a
149  * CallRunner and passed to the scheduler to be run.  Reader goes back to see if more to be done
150  * and loops till done.
151  *
152  * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
153  * has given the queues into which calls (i.e. CallRunner instances) are inserted.  Handlers run
154  * taking from the queue.  They run the CallRunner#run method on each item gotten from queue
155  * and keep taking while the server is up.
156  *
157  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
158  * queue for Responder to pull from and return result to client.
159  *
160  * @see RpcClientImpl
161  */
162 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
163 @InterfaceStability.Evolving
164 public class RpcServer implements RpcServerInterface, ConfigurationObserver {
165   // LOG is being used in CallRunner and the log level is being changed in tests
166   public static final Log LOG = LogFactory.getLog(RpcServer.class);
167   private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
168       = new CallQueueTooBigException();
169 
170   private final boolean authorize;
171   private boolean isSecurityEnabled;
172 
173   public static final byte CURRENT_VERSION = 0;
174 
175   /**
176    * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
177    */
178   public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
179           "hbase.ipc.server.fallback-to-simple-auth-allowed";
180 
181   /**
182    * How many calls/handler are allowed in the queue.
183    */
184   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
185 
186   /**
187    * The maximum size that we can hold in the RPC queue
188    */
189   private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
190 
191   private final IPCUtil ipcUtil;
192 
193   private static final String AUTH_FAILED_FOR = "Auth failed for ";
194   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
195   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
196     Server.class.getName());
197   protected SecretManager<TokenIdentifier> secretManager;
198   protected ServiceAuthorizationManager authManager;
199 
200   /** This is set to Call object before Handler invokes an RPC and reset
201    * after the call returns.
202    */
203   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
204 
205   /** Keeps MonitoredRPCHandler per handler thread. */
206   static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
207       = new ThreadLocal<MonitoredRPCHandler>();
208 
209   protected final InetSocketAddress bindAddress;
210   protected int port;                             // port we listen on
211   protected InetSocketAddress address;            // inet address we listen on
212   private int readThreads;                        // number of read threads
213   protected int maxIdleTime;                      // the maximum idle time after
214                                                   // which a client may be
215                                                   // disconnected
216   protected int thresholdIdleConnections;         // the number of idle
217                                                   // connections after which we
218                                                   // will start cleaning up idle
219                                                   // connections
220   int maxConnectionsToNuke;                       // the max number of
221                                                   // connections to nuke
222                                                   // during a cleanup
223 
224   protected MetricsHBaseServer metrics;
225 
226   protected final Configuration conf;
227 
228   private int maxQueueSize;
229   protected int socketSendBufferSize;
230   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
231   protected final boolean tcpKeepAlive; // if T then use keepalives
232   protected final long purgeTimeout;    // in milliseconds
233 
234   /**
235    * This flag is used to indicate to sub threads when they should go down.  When we call
236    * {@link #start()}, all threads started will consult this flag on whether they should
237    * keep going.  It is set to false when {@link #stop()} is called.
238    */
239   volatile boolean running = true;
240 
241   /**
242    * This flag is set to true after all threads are up and 'running' and the server is then opened
243    * for business by the call to {@link #start()}.
244    */
245   volatile boolean started = false;
246 
247   /**
248    * This is a running count of the size of all outstanding calls by size.
249    */
250   protected final Counter callQueueSize = new Counter();
251 
252   protected final List<Connection> connectionList =
253     Collections.synchronizedList(new LinkedList<Connection>());
254   //maintain a list
255   //of client connections
256   private Listener listener = null;
257   protected Responder responder = null;
258   protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
259   protected int numConnections = 0;
260 
261   protected HBaseRPCErrorHandler errorHandler = null;
262 
263   static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
264   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
265   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
266 
267   /**
268    * Minimum allowable timeout (in milliseconds) in rpc request's header. This
269    * configuration exists to prevent the rpc service regarding this request as timeout immediately.
270    */
271   private static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
272   private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
273 
274   /** Default value for above params */
275   private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
276   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
277   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
278
279   private static final ObjectMapper MAPPER = new ObjectMapper();
280
281   private final int maxRequestSize;
282   private final int warnResponseTime;
283   private final int warnResponseSize;
284
285   private final int minClientRequestTimeout;
286
287   private final Server server;
288   private final List<BlockingServiceAndInterface> services;
289
290   private final RpcScheduler scheduler;
291
292   private UserProvider userProvider;
293
294   private final BoundedByteBufferPool reservoir;
295
296   private volatile boolean allowFallbackToSimpleAuth;
297
298   /**
299    * Datastructure that holds all necessary to a method invocation and then afterward, carries
300    * the result.
301    */
302   @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
303   @InterfaceStability.Evolving
304   public class Call implements RpcCallContext {
305     protected int id;                             // the client's call id
306     protected BlockingService service;
307     protected MethodDescriptor md;
308     protected RequestHeader header;
309     protected Message param;                      // the parameter passed
310     // Optional cell data passed outside of protobufs.
311     protected CellScanner cellScanner;
312     protected Connection connection;              // connection to client
313     protected long timestamp;      // the time received when response is null
314                                    // the time served when response is not null
315     protected int timeout;
316     /**
317      * Chain of buffers to send as response.
318      */
319     protected BufferChain response;
320     protected Responder responder;
321
322     protected long size;                          // size of current call
323     protected boolean isError;
324     protected TraceInfo tinfo;
325     private ByteBuffer cellBlock = null;
326
327     private User user;
328     private InetAddress remoteAddress;
329     private RpcCallback callback;
330
331     private long responseCellSize = 0;
332     private long responseBlockSize = 0;
333     private boolean retryImmediatelySupported;
334
335     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
336         justification="Can't figure why this complaint is happening... see below")
337     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
338          Message param, CellScanner cellScanner, Connection connection, Responder responder,
339          long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) {
340       this.id = id;
341       this.service = service;
342       this.md = md;
343       this.header = header;
344       this.param = param;
345       this.cellScanner = cellScanner;
346       this.connection = connection;
347       this.timestamp = System.currentTimeMillis();
348       this.response = null;
349       this.responder = responder;
350       this.isError = false;
351       this.size = size;
352       this.tinfo = tinfo;
353       this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
354       this.remoteAddress = remoteAddress;
355       this.retryImmediatelySupported =
356           connection == null? null: connection.retryImmediatelySupported;
357       this.timeout = timeout;
358     }
359
360     /**
361      * Call is done. Execution happened and we returned results to client. It is now safe to
362      * cleanup.
363      */
364     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
365         justification="Presume the lock on processing request held by caller is protection enough")
366     void done() {
367       if (this.cellBlock != null && reservoir != null) {
368         // Return buffer to reservoir now we are done with it.
369         reservoir.putBuffer(this.cellBlock);
370         this.cellBlock = null;
371       }
372       this.connection.decRpcCount();  // Say that we're done with this call.
373     }
374
375     @Override
376     public String toString() {
377       return toShortString() + " param: " +
378         (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
379         " connection: " + connection.toString();
380     }
381 
382     protected RequestHeader getHeader() {
383       return this.header;
384     }
385
386     public boolean hasPriority() {
387       return this.header.hasPriority();
388     }
389
390     public int getPriority() {
391       return this.header.getPriority();
392     }
393
394     /*
395      * Short string representation without param info because param itself could be huge depends on
396      * the payload of a command
397      */
398     String toShortString() {
399       String serviceName = this.connection.service != null ?
400           this.connection.service.getDescriptorForType().getName() : "null";
401       return "callId: " + this.id + " service: " + serviceName +
402           " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
403           " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
404           " connection: " + connection.toString();
405     }
406
407     String toTraceString() {
408       String serviceName = this.connection.service != null ?
409                            this.connection.service.getDescriptorForType().getName() : "";
410       String methodName = (this.md != null) ? this.md.getName() : "";
411       return serviceName + "." + methodName;
412     }
413
414     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
415       this.response = new BufferChain(response);
416     }
417
418     protected synchronized void setResponse(Object m, final CellScanner cells,
419         Throwable t, String errorMsg) {
420       if (this.isError) return;
421       if (t != null) this.isError = true;
422       BufferChain bc = null;
423       try {
424         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
425         // Presume it a pb Message.  Could be null.
426         Message result = (Message)m;
427         // Call id.
428         headerBuilder.setCallId(this.id);
429         if (t != null) {
430           ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
431           exceptionBuilder.setExceptionClassName(t.getClass().getName());
432           exceptionBuilder.setStackTrace(errorMsg);
433           exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
434           if (t instanceof RegionMovedException) {
435             // Special casing for this exception.  This is only one carrying a payload.
436             // Do this instead of build a generic system for allowing exceptions carry
437             // any kind of payload.
438             RegionMovedException rme = (RegionMovedException)t;
439             exceptionBuilder.setHostname(rme.getHostname());
440             exceptionBuilder.setPort(rme.getPort());
441           }
442           // Set the exception as the result of the method invocation.
443           headerBuilder.setException(exceptionBuilder.build());
444         }
445         // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
446         // reservoir when finished. This is hacky and the hack is not contained but benefits are
447         // high when we can avoid a big buffer allocation on each rpc.
448         this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
449           this.connection.compressionCodec, cells, reservoir);
450         if (this.cellBlock != null) {
451           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
452           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
453           cellBlockBuilder.setLength(this.cellBlock.limit());
454           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
455         }
456         Message header = headerBuilder.build();
457
458         byte[] b = createHeaderAndMessageBytes(result, header);
459
460         bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock);
461
462         if (connection.useWrap) {
463           bc = wrapWithSasl(bc);
464         }
465       } catch (IOException e) {
466         LOG.warn("Exception while creating response " + e);
467       }
468       this.response = bc;
469       // Once a response message is created and set to this.response, this Call can be treated as
470       // done. The Responder thread will do the n/w write of this message back to client.
471       if (this.callback != null) {
472         try {
473           this.callback.run();
474         } catch (Exception e) {
475           // Don't allow any exception here to kill this handler thread.
476           LOG.warn("Exception while running the Rpc Callback.", e);
477         }
478       }
479     }
480
481     private byte[] createHeaderAndMessageBytes(Message result, Message header)
482         throws IOException {
483       // Organize the response as a set of bytebuffers rather than collect it all together inside
484       // one big byte array; save on allocations.
485       int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
486           resultVintSize = 0;
487       if (header != null) {
488         headerSerializedSize = header.getSerializedSize();
489         headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize);
490       }
491       if (result != null) {
492         resultSerializedSize = result.getSerializedSize();
493         resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize);
494       }
495       // calculate the total size
496       int totalSize = headerSerializedSize + headerVintSize
497           + (resultSerializedSize + resultVintSize)
498           + (this.cellBlock == null ? 0 : this.cellBlock.limit());
499       // The byte[] should also hold the totalSize of the header, message and the cellblock
500       byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
501           + resultVintSize + Bytes.SIZEOF_INT];
502       // The RpcClient expects the int to be in a format that code be decoded by
503       // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int)
504       // form of writing int.
505       Bytes.putInt(b, 0, totalSize);
506       CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT,
507           b.length - Bytes.SIZEOF_INT);
508       if (header != null) {
509         cos.writeMessageNoTag(header);
510       }
511       if (result != null) {
512         cos.writeMessageNoTag(result);
513       }
514       cos.flush();
515       cos.checkNoSpaceLeft();
516       return b;
517     }
518
519     private BufferChain wrapWithSasl(BufferChain bc)
520         throws IOException {
521       if (!this.connection.useSasl) return bc;
522       // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
523       // THIS IS A BIG UGLY COPY.
524       byte [] responseBytes = bc.getBytes();
525       byte [] token;
526       // synchronization may be needed since there can be multiple Handler
527       // threads using saslServer to wrap responses.
528       synchronized (connection.saslServer) {
529         token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
530       }
531       if (LOG.isTraceEnabled()) {
532         LOG.trace("Adding saslServer wrapped token of size " + token.length
533             + " as call response.");
534       }
535
536       ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
537       ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
538       return new BufferChain(bbTokenLength, bbTokenBytes);
539     }
540
541     @Override
542     public boolean isClientCellBlockSupported() {
543       return this.connection != null && this.connection.codec != null;
544     }
545
546     @Override
547     public long disconnectSince() {
548       if (!connection.channel.isOpen()) {
549         return System.currentTimeMillis() - timestamp;
550       } else {
551         return -1L;
552       }
553     }
554
555     public long getSize() {
556       return this.size;
557     }
558
559     @Override
560     public long getResponseCellSize() {
561       return responseCellSize;
562     }
563
564     @Override
565     public void incrementResponseCellSize(long cellSize) {
566       responseCellSize += cellSize;
567     }
568
569     @Override
570     public long getResponseBlockSize() {
571       return responseBlockSize;
572     }
573
574     @Override
575     public void incrementResponseBlockSize(long blockSize) {
576       responseBlockSize += blockSize;
577     }
578
579     public synchronized void sendResponseIfReady() throws IOException {
580       this.responder.doRespond(this);
581     }
582
583     public UserGroupInformation getRemoteUser() {
584       return connection.ugi;
585     }
586
587     @Override
588     public User getRequestUser() {
589       return user;
590     }
591
592     @Override
593     public String getRequestUserName() {
594       User user = getRequestUser();
595       return user == null? null: user.getShortName();
596     }
597
598     @Override
599     public InetAddress getRemoteAddress() {
600       return remoteAddress;
601     }
602
603     @Override
604     public VersionInfo getClientVersionInfo() {
605       return connection.getVersionInfo();
606     }
607
608     @Override
609     public synchronized void setCallBack(RpcCallback callback) {
610       this.callback = callback;
611     }
612
613     @Override
614     public boolean isRetryImmediatelySupported() {
615       return retryImmediatelySupported;
616     }
617   }
618
619   /** Listens on the socket. Creates jobs for the handler threads*/
620   private class Listener extends Thread {
621
622     private ServerSocketChannel acceptChannel = null; //the accept channel
623     private Selector selector = null; //the selector that we use for the server
624     private Reader[] readers = null;
625     private int currentReader = 0;
626     private Random rand = new Random();
627     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
628                                          //-tion (for idle connections) ran
629     private long cleanupInterval = 10000; //the minimum interval between
630                                           //two cleanup runs
631     private int backlogLength;
632
633     private ExecutorService readPool;
634
635     public Listener(final String name) throws IOException {
636       super(name);
637       backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
638       // Create a new server socket and set to non blocking mode
639       acceptChannel = ServerSocketChannel.open();
640       acceptChannel.configureBlocking(false);
641
642       // Bind the server socket to the binding addrees (can be different from the default interface)
643       bind(acceptChannel.socket(), bindAddress, backlogLength);
644       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
645       address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
646       // create a selector;
647       selector= Selector.open();
648
649       readers = new Reader[readThreads];
650       readPool = Executors.newFixedThreadPool(readThreads,
651         new ThreadFactoryBuilder().setNameFormat(
652           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
653           ",port=" + port).setDaemon(true)
654         .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
655       for (int i = 0; i < readThreads; ++i) {
656         Reader reader = new Reader();
657         readers[i] = reader;
658         readPool.execute(reader);
659       }
660       LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
661
662       // Register accepts on the server socket with the selector.
663       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
664       this.setName("RpcServer.listener,port=" + port);
665       this.setDaemon(true);
666     }
667
668
669     private class Reader implements Runnable {
670       private volatile boolean adding = false;
671       private final Selector readSelector;
672
673       Reader() throws IOException {
674         this.readSelector = Selector.open();
675       }
676       @Override
677       public void run() {
678         try {
679           doRunLoop();
680         } finally {
681           try {
682             readSelector.close();
683           } catch (IOException ioe) {
684             LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
685           }
686         }
687       }
688
689       private synchronized void doRunLoop() {
690         while (running) {
691           try {
692             readSelector.select();
693             while (adding) {
694               this.wait(1000);
695             }
696
697             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
698             while (iter.hasNext()) {
699               SelectionKey key = iter.next();
700               iter.remove();
701               if (key.isValid()) {
702                 if (key.isReadable()) {
703                   doRead(key);
704                 }
705               }
706             }
707           } catch (InterruptedException e) {
708             LOG.debug("Interrupted while sleeping");
709             return;
710           } catch (IOException ex) {
711             LOG.info(getName() + ": IOException in Reader", ex);
712           }
713         }
714       }
715 
716       /**
717        * This gets reader into the state that waits for the new channel
718        * to be registered with readSelector. If it was waiting in select()
719        * the thread will be woken up, otherwise whenever select() is called
720        * it will return even if there is nothing to read and wait
721        * in while(adding) for finishAdd call
722        */
723       public void startAdd() {
724         adding = true;
725         readSelector.wakeup();
726       }
727
728       public synchronized SelectionKey registerChannel(SocketChannel channel)
729         throws IOException {
730         return channel.register(readSelector, SelectionKey.OP_READ);
731       }
732
733       public synchronized void finishAdd() {
734         adding = false;
735         this.notify();
736       }
737     }
738
739     /** cleanup connections from connectionList. Choose a random range
740      * to scan and also have a limit on the number of the connections
741      * that will be cleanedup per run. The criteria for cleanup is the time
742      * for which the connection was idle. If 'force' is true then all
743      * connections will be looked at for the cleanup.
744      * @param force all connections will be looked at for cleanup
745      */
746     private void cleanupConnections(boolean force) {
747       if (force || numConnections > thresholdIdleConnections) {
748         long currentTime = System.currentTimeMillis();
749         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
750           return;
751         }
752         int start = 0;
753         int end = numConnections - 1;
754         if (!force) {
755           start = rand.nextInt() % numConnections;
756           end = rand.nextInt() % numConnections;
757           int temp;
758           if (end < start) {
759             temp = start;
760             start = end;
761             end = temp;
762           }
763         }
764         int i = start;
765         int numNuked = 0;
766         while (i <= end) {
767           Connection c;
768           synchronized (connectionList) {
769             try {
770               c = connectionList.get(i);
771             } catch (Exception e) {return;}
772           }
773           if (c.timedOut(currentTime)) {
774             if (LOG.isDebugEnabled())
775               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
776             closeConnection(c);
777             numNuked++;
778             end--;
779             //noinspection UnusedAssignment
780             c = null;
781             if (!force && numNuked == maxConnectionsToNuke) break;
782           }
783           else i++;
784         }
785         lastCleanupRunTime = System.currentTimeMillis();
786       }
787     }
788
789     @Override
790     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
791       justification="selector access is not synchronized; seems fine but concerned changing " +
792         "it will have per impact")
793     public void run() {
794       LOG.info(getName() + ": starting");
795       while (running) {
796         SelectionKey key = null;
797         try {
798           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
799           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
800           while (iter.hasNext()) {
801             key = iter.next();
802             iter.remove();
803             try {
804               if (key.isValid()) {
805                 if (key.isAcceptable())
806                   doAccept(key);
807               }
808             } catch (IOException ignored) {
809               if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
810             }
811             key = null;
812           }
813         } catch (OutOfMemoryError e) {
814           if (errorHandler != null) {
815             if (errorHandler.checkOOME(e)) {
816               LOG.info(getName() + ": exiting on OutOfMemoryError");
817               closeCurrentConnection(key, e);
818               cleanupConnections(true);
819               return;
820             }
821           } else {
822             // we can run out of memory if we have too many threads
823             // log the event and sleep for a minute and give
824             // some thread(s) a chance to finish
825             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
826             closeCurrentConnection(key, e);
827             cleanupConnections(true);
828             try {
829               Thread.sleep(60000);
830             } catch (InterruptedException ex) {
831               LOG.debug("Interrupted while sleeping");
832               return;
833             }
834           }
835         } catch (Exception e) {
836           closeCurrentConnection(key, e);
837         }
838         cleanupConnections(false);
839       }
840
841       LOG.info(getName() + ": stopping");
842
843       synchronized (this) {
844         try {
845           acceptChannel.close();
846           selector.close();
847         } catch (IOException ignored) {
848           if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
849         }
850
851         selector= null;
852         acceptChannel= null;
853
854         // clean up all connections
855         while (!connectionList.isEmpty()) {
856           closeConnection(connectionList.remove(0));
857         }
858       }
859     }
860
861     private void closeCurrentConnection(SelectionKey key, Throwable e) {
862       if (key != null) {
863         Connection c = (Connection)key.attachment();
864         if (c != null) {
865           if (LOG.isDebugEnabled()) {
866             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
867                 (e != null ? " on error " + e.getMessage() : ""));
868           }
869           closeConnection(c);
870           key.attach(null);
871         }
872       }
873     }
874
875     InetSocketAddress getAddress() {
876       return address;
877     }
878
879     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
880       Connection c;
881       ServerSocketChannel server = (ServerSocketChannel) key.channel();
882
883       SocketChannel channel;
884       while ((channel = server.accept()) != null) {
885         try {
886           channel.configureBlocking(false);
887           channel.socket().setTcpNoDelay(tcpNoDelay);
888           channel.socket().setKeepAlive(tcpKeepAlive);
889         } catch (IOException ioe) {
890           channel.close();
891           throw ioe;
892         }
893
894         Reader reader = getReader();
895         try {
896           reader.startAdd();
897           SelectionKey readKey = reader.registerChannel(channel);
898           c = getConnection(channel, System.currentTimeMillis());
899           readKey.attach(c);
900           synchronized (connectionList) {
901             connectionList.add(numConnections, c);
902             numConnections++;
903           }
904           if (LOG.isDebugEnabled())
905             LOG.debug(getName() + ": connection from " + c.toString() +
906                 "; # active connections: " + numConnections);
907         } finally {
908           reader.finishAdd();
909         }
910       }
911     }
912
913     void doRead(SelectionKey key) throws InterruptedException {
914       int count;
915       Connection c = (Connection) key.attachment();
916       if (c == null) {
917         return;
918       }
919       c.setLastContact(System.currentTimeMillis());
920       try {
921         count = c.readAndProcess();
922
923         if (count > 0) {
924           c.setLastContact(System.currentTimeMillis());
925         }
926
927       } catch (InterruptedException ieo) {
928         throw ieo;
929       } catch (Exception e) {
930         if (LOG.isDebugEnabled()) {
931           LOG.debug(getName() + ": Caught exception while reading:", e);
932         }
933         count = -1; //so that the (count < 0) block is executed
934       }
935       if (count < 0) {
936         if (LOG.isDebugEnabled()) {
937           LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
938               " because read count=" + count +
939               ". Number of active connections: " + numConnections);
940         }
941         closeConnection(c);
942       }
943     }
944
945     synchronized void doStop() {
946       if (selector != null) {
947         selector.wakeup();
948         Thread.yield();
949       }
950       if (acceptChannel != null) {
951         try {
952           acceptChannel.socket().close();
953         } catch (IOException e) {
954           LOG.info(getName() + ": exception in closing listener socket. " + e);
955         }
956       }
957       readPool.shutdownNow();
958     }
959
960     // The method that will return the next reader to work with
961     // Simplistic implementation of round robin for now
962     Reader getReader() {
963       currentReader = (currentReader + 1) % readers.length;
964       return readers[currentReader];
965     }
966   }
967
968   // Sends responses of RPC back to clients.
969   protected class Responder extends Thread {
970     private final Selector writeSelector;
971     private final Set<Connection> writingCons =
972         Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
973
974     Responder() throws IOException {
975       this.setName("RpcServer.responder");
976       this.setDaemon(true);
977       this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
978       writeSelector = Selector.open(); // create a selector
979     }
980
981     @Override
982     public void run() {
983       LOG.info(getName() + ": starting");
984       try {
985         doRunLoop();
986       } finally {
987         LOG.info(getName() + ": stopping");
988         try {
989           writeSelector.close();
990         } catch (IOException ioe) {
991           LOG.error(getName() + ": couldn't close write selector", ioe);
992         }
993       }
994     }
995
996     /**
997      * Take the list of the connections that want to write, and register them
998      * in the selector.
999      */
1000     private void registerWrites() {
1001       Iterator<Connection> it = writingCons.iterator();
1002       while (it.hasNext()) {
1003         Connection c = it.next();
1004         it.remove();
1005         SelectionKey sk = c.channel.keyFor(writeSelector);
1006         try {
1007           if (sk == null) {
1008             try {
1009               c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
1010             } catch (ClosedChannelException e) {
1011               // ignore: the client went away.
1012               if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
1013             }
1014           } else {
1015             sk.interestOps(SelectionKey.OP_WRITE);
1016           }
1017         } catch (CancelledKeyException e) {
1018           // ignore: the client went away.
1019           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
1020         }
1021       }
1022     }
1023
1024     /**
1025      * Add a connection to the list that want to write,
1026      */
1027     public void registerForWrite(Connection c) {
1028       if (writingCons.add(c)) {
1029         writeSelector.wakeup();
1030       }
1031     }
1032
1033     private void doRunLoop() {
1034       long lastPurgeTime = 0;   // last check for old calls.
1035       while (running) {
1036         try {
1037           registerWrites();
1038           int keyCt = writeSelector.select(purgeTimeout);
1039           if (keyCt == 0) {
1040             continue;
1041           }
1042
1043           Set<SelectionKey> keys = writeSelector.selectedKeys();
1044           Iterator<SelectionKey> iter = keys.iterator();
1045           while (iter.hasNext()) {
1046             SelectionKey key = iter.next();
1047             iter.remove();
1048             try {
1049               if (key.isValid() && key.isWritable()) {
1050                 doAsyncWrite(key);
1051               }
1052             } catch (IOException e) {
1053               LOG.debug(getName() + ": asyncWrite", e);
1054             }
1055           }
1056
1057           lastPurgeTime = purge(lastPurgeTime);
1058
1059         } catch (OutOfMemoryError e) {
1060           if (errorHandler != null) {
1061             if (errorHandler.checkOOME(e)) {
1062               LOG.info(getName() + ": exiting on OutOfMemoryError");
1063               return;
1064             }
1065           } else {
1066             //
1067             // we can run out of memory if we have too many threads
1068             // log the event and sleep for a minute and give
1069             // some thread(s) a chance to finish
1070             //
1071             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
1072             try {
1073               Thread.sleep(60000);
1074             } catch (InterruptedException ex) {
1075               LOG.debug("Interrupted while sleeping");
1076               return;
1077             }
1078           }
1079         } catch (Exception e) {
1080           LOG.warn(getName() + ": exception in Responder " +
1081               StringUtils.stringifyException(e), e);
1082         }
1083       }
1084       LOG.info(getName() + ": stopped");
1085     }
1086
1087     /**
1088      * If there were some calls that have not been sent out for a
1089      * long time, we close the connection.
1090      * @return the time of the purge.
1091      */
1092     private long purge(long lastPurgeTime) {
1093       long now = System.currentTimeMillis();
1094       if (now < lastPurgeTime + purgeTimeout) {
1095         return lastPurgeTime;
1096       }
1097
1098       ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1099       // get the list of channels from list of keys.
1100       synchronized (writeSelector.keys()) {
1101         for (SelectionKey key : writeSelector.keys()) {
1102           Connection connection = (Connection) key.attachment();
1103           if (connection == null) {
1104             throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1105           }
1106           Call call = connection.responseQueue.peekFirst();
1107           if (call != null && now > call.timestamp + purgeTimeout) {
1108             conWithOldCalls.add(call.connection);
1109           }
1110         }
1111       }
1112
1113       // Seems safer to close the connection outside of the synchronized loop...
1114       for (Connection connection : conWithOldCalls) {
1115         closeConnection(connection);
1116       }
1117 
1118       return now;
1119     }
1120
1121     private void doAsyncWrite(SelectionKey key) throws IOException {
1122       Connection connection = (Connection) key.attachment();
1123       if (connection == null) {
1124         throw new IOException("doAsyncWrite: no connection");
1125       }
1126       if (key.channel() != connection.channel) {
1127         throw new IOException("doAsyncWrite: bad channel");
1128       }
1129
1130       if (processAllResponses(connection)) {
1131         try {
1132           // We wrote everything, so we don't need to be told when the socket is ready for
1133           //  write anymore.
1134          key.interestOps(0);
1135         } catch (CancelledKeyException e) {
1136           /* The Listener/reader might have closed the socket.
1137            * We don't explicitly cancel the key, so not sure if this will
1138            * ever fire.
1139            * This warning could be removed.
1140            */
1141           LOG.warn("Exception while changing ops : " + e);
1142         }
1143       }
1144     }
1145
1146     /**
1147      * Process the response for this call. You need to have the lock on
1148      * {@link org.apache.hadoop.hbase.ipc.RpcServer.Connection#responseWriteLock}
1149      *
1150      * @param call the call
1151      * @return true if we proceed the call fully, false otherwise.
1152      * @throws IOException
1153      */
1154     private boolean processResponse(final Call call) throws IOException {
1155       boolean error = true;
1156       try {
1157         // Send as much data as we can in the non-blocking fashion
1158         long numBytes = channelWrite(call.connection.channel, call.response);
1159         if (numBytes < 0) {
1160           throw new HBaseIOException("Error writing on the socket " +
1161             "for the call:" + call.toShortString());
1162         }
1163         error = false;
1164       } finally {
1165         if (error) {
1166           LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1167           closeConnection(call.connection);
1168         }
1169       }
1170
1171       if (!call.response.hasRemaining()) {
1172         call.done();
1173         return true;
1174       } else {
1175         return false; // Socket can't take more, we will have to come back.
1176       }
1177     }
1178
1179     /**
1180      * Process all the responses for this connection
1181      *
1182      * @return true if all the calls were processed or that someone else is doing it.
1183      * false if there * is still some work to do. In this case, we expect the caller to
1184      * delay us.
1185      * @throws IOException
1186      */
1187     private boolean processAllResponses(final Connection connection) throws IOException {
1188       // We want only one writer on the channel for a connection at a time.
1189       connection.responseWriteLock.lock();
1190       try {
1191         for (int i = 0; i < 20; i++) {
1192           // protection if some handlers manage to need all the responder
1193           Call call = connection.responseQueue.pollFirst();
1194           if (call == null) {
1195             return true;
1196           }
1197           if (!processResponse(call)) {
1198             connection.responseQueue.addFirst(call);
1199             return false;
1200           }
1201         }
1202       } finally {
1203         connection.responseWriteLock.unlock();
1204       }
1205
1206       return connection.responseQueue.isEmpty();
1207     }
1208
1209     //
1210     // Enqueue a response from the application.
1211     //
1212     void doRespond(Call call) throws IOException {
1213       boolean added = false;
1214
1215       // If there is already a write in progress, we don't wait. This allows to free the handlers
1216       //  immediately for other tasks.
1217       if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
1218         try {
1219           if (call.connection.responseQueue.isEmpty()) {
1220             // If we're alone, we can try to do a direct call to the socket. It's
1221             //  an optimisation to save on context switches and data transfer between cores..
1222             if (processResponse(call)) {
1223               return; // we're done.
1224             }
1225             // Too big to fit, putting ahead.
1226             call.connection.responseQueue.addFirst(call);
1227             added = true; // We will register to the selector later, outside of the lock.
1228           }
1229         } finally {
1230           call.connection.responseWriteLock.unlock();
1231         }
1232       }
1233
1234       if (!added) {
1235         call.connection.responseQueue.addLast(call);
1236       }
1237       call.responder.registerForWrite(call.connection);
1238
1239       // set the serve time when the response has to be sent later
1240       call.timestamp = System.currentTimeMillis();
1241     }
1242   }
1243
1244   /** Reads calls from a connection and queues them for handling. */
1245   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1246       value="VO_VOLATILE_INCREMENT",
1247       justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1248   public class Connection {
1249     // If initial preamble with version and magic has been read or not.
1250     private boolean connectionPreambleRead = false;
1251     // If the connection header has been read or not.
1252     private boolean connectionHeaderRead = false;
1253     protected SocketChannel channel;
1254     private ByteBuffer data;
1255     private ByteBuffer dataLengthBuffer;
1256     protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
1257     private final Lock responseWriteLock = new ReentrantLock();
1258     private Counter rpcCount = new Counter(); // number of outstanding rpcs
1259     private long lastContact;
1260     private InetAddress addr;
1261     protected Socket socket;
1262     // Cache the remote host & port info so that even if the socket is
1263     // disconnected, we can say where it used to connect to.
1264     protected String hostAddress;
1265     protected int remotePort;
1266     ConnectionHeader connectionHeader;
1267
1268     /**
1269      * Codec the client asked use.
1270      */
1271     private Codec codec;
1272     /**
1273      * Compression codec the client asked us use.
1274      */
1275     private CompressionCodec compressionCodec;
1276     BlockingService service;
1277
1278     private AuthMethod authMethod;
1279     private boolean saslContextEstablished;
1280     private boolean skipInitialSaslHandshake;
1281     private ByteBuffer unwrappedData;
1282     // When is this set?  FindBugs wants to know!  Says NP
1283     private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1284     boolean useSasl;
1285     SaslServer saslServer;
1286     private boolean useWrap = false;
1287     // Fake 'call' for failed authorization response
1288     private static final int AUTHORIZATION_FAILED_CALLID = -1;
1289     private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
1290         null, null, this, null, 0, null, null, 0);
1291     private ByteArrayOutputStream authFailedResponse =
1292         new ByteArrayOutputStream();
1293     // Fake 'call' for SASL context setup
1294     private static final int SASL_CALLID = -33;
1295     private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
1296         0, null, null, 0);
1297
1298     // was authentication allowed with a fallback to simple auth
1299     private boolean authenticatedWithFallback;
1300
1301     private boolean retryImmediatelySupported = false;
1302
1303     public UserGroupInformation attemptingUser = null; // user name before auth
1304     protected User user = null;
1305     protected UserGroupInformation ugi = null;
1306
1307     public Connection(SocketChannel channel, long lastContact) {
1308       this.channel = channel;
1309       this.lastContact = lastContact;
1310       this.data = null;
1311       this.dataLengthBuffer = ByteBuffer.allocate(4);
1312       this.socket = channel.socket();
1313       this.addr = socket.getInetAddress();
1314       if (addr == null) {
1315         this.hostAddress = "*Unknown*";
1316       } else {
1317         this.hostAddress = addr.getHostAddress();
1318       }
1319       this.remotePort = socket.getPort();
1320       if (socketSendBufferSize != 0) {
1321         try {
1322           socket.setSendBufferSize(socketSendBufferSize);
1323         } catch (IOException e) {
1324           LOG.warn("Connection: unable to set socket send buffer size to " +
1325                    socketSendBufferSize);
1326         }
1327       }
1328     }
1329
1330       @Override
1331     public String toString() {
1332       return getHostAddress() + ":" + remotePort;
1333     }
1334 
1335     public String getHostAddress() {
1336       return hostAddress;
1337     }
1338 
1339     public InetAddress getHostInetAddress() {
1340       return addr;
1341     }
1342
1343     public int getRemotePort() {
1344       return remotePort;
1345     }
1346
1347     public void setLastContact(long lastContact) {
1348       this.lastContact = lastContact;
1349     }
1350 
1351     public VersionInfo getVersionInfo() {
1352       if (connectionHeader.hasVersionInfo()) {
1353         return connectionHeader.getVersionInfo();
1354       }
1355       return null;
1356     }
1357
1358     /* Return true if the connection has no outstanding rpc */
1359     private boolean isIdle() {
1360       return rpcCount.get() == 0;
1361     }
1362
1363     /* Decrement the outstanding RPC count */
1364     protected void decRpcCount() {
1365       rpcCount.decrement();
1366     }
1367
1368     /* Increment the outstanding RPC count */
1369     protected void incRpcCount() {
1370       rpcCount.increment();
1371     }
1372
1373     protected boolean timedOut(long currentTime) {
1374       return isIdle() && currentTime - lastContact > maxIdleTime;
1375     }
1376
1377     private UserGroupInformation getAuthorizedUgi(String authorizedId)
1378         throws IOException {
1379       UserGroupInformation authorizedUgi;
1380       if (authMethod == AuthMethod.DIGEST) {
1381         TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1382             secretManager);
1383         authorizedUgi = tokenId.getUser();
1384         if (authorizedUgi == null) {
1385           throw new AccessDeniedException(
1386               "Can't retrieve username from tokenIdentifier.");
1387         }
1388         authorizedUgi.addTokenIdentifier(tokenId);
1389       } else {
1390         authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
1391       }
1392       authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
1393       return authorizedUgi;
1394     }
1395
1396     private void saslReadAndProcess(ByteBuffer saslToken) throws IOException,
1397         InterruptedException {
1398       if (saslContextEstablished) {
1399         if (LOG.isTraceEnabled())
1400           LOG.trace("Have read input token of size " + saslToken.limit()
1401               + " for processing by saslServer.unwrap()");
1402
1403         if (!useWrap) {
1404           processOneRpc(saslToken);
1405         } else {
1406           byte[] b = saslToken.array();
1407           byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
1408           processUnwrappedData(plaintextData);
1409         }
1410       } else {
1411         byte[] replyToken;
1412         try {
1413           if (saslServer == null) {
1414             switch (authMethod) {
1415             case DIGEST:
1416               if (secretManager == null) {
1417                 throw new AccessDeniedException(
1418                     "Server is not configured to do DIGEST authentication.");
1419               }
1420               saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
1421                   .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
1422                   HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
1423                       secretManager, this));
1424               break;
1425             default:
1426               UserGroupInformation current = UserGroupInformation.getCurrentUser();
1427               String fullName = current.getUserName();
1428               if (LOG.isDebugEnabled()) {
1429                 LOG.debug("Kerberos principal name is " + fullName);
1430               }
1431               final String names[] = SaslUtil.splitKerberosName(fullName);
1432               if (names.length != 3) {
1433                 throw new AccessDeniedException(
1434                     "Kerberos principal name does NOT have the expected "
1435                         + "hostname part: " + fullName);
1436               }
1437               current.doAs(new PrivilegedExceptionAction<Object>() {
1438                 @Override
1439                 public Object run() throws SaslException {
1440                   saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
1441                       .getMechanismName(), names[0], names[1],
1442                       HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
1443                   return null;
1444                 }
1445               });
1446             }
1447             if (saslServer == null)
1448               throw new AccessDeniedException(
1449                   "Unable to find SASL server implementation for "
1450                       + authMethod.getMechanismName());
1451             if (LOG.isDebugEnabled()) {
1452               LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
1453             }
1454           }
1455           if (LOG.isDebugEnabled()) {
1456             LOG.debug("Have read input token of size " + saslToken.limit()
1457                 + " for processing by saslServer.evaluateResponse()");
1458           }
1459           replyToken = saslServer.evaluateResponse(saslToken.array());
1460         } catch (IOException e) {
1461           IOException sendToClient = e;
1462           Throwable cause = e;
1463           while (cause != null) {
1464             if (cause instanceof InvalidToken) {
1465               sendToClient = (InvalidToken) cause;
1466               break;
1467             }
1468             cause = cause.getCause();
1469           }
1470           doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
1471             sendToClient.getLocalizedMessage());
1472           metrics.authenticationFailure();
1473           String clientIP = this.toString();
1474           // attempting user could be null
1475           AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
1476           throw e;
1477         }
1478         if (replyToken != null) {
1479           if (LOG.isDebugEnabled()) {
1480             LOG.debug("Will send token of size " + replyToken.length
1481                 + " from saslServer.");
1482           }
1483           doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
1484               null);
1485         }
1486         if (saslServer.isComplete()) {
1487           String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
1488           useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
1489           ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
1490           if (LOG.isDebugEnabled()) {
1491             LOG.debug("SASL server context established. Authenticated client: "
1492               + ugi + ". Negotiated QoP is "
1493               + saslServer.getNegotiatedProperty(Sasl.QOP));
1494           }
1495           metrics.authenticationSuccess();
1496           AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1497           saslContextEstablished = true;
1498         }
1499       }
1500     }
1501
1502     /**
1503      * No protobuf encoding of raw sasl messages
1504      */
1505     private void doRawSaslReply(SaslStatus status, Writable rv,
1506         String errorClass, String error) throws IOException {
1507       ByteBufferOutputStream saslResponse = null;
1508       DataOutputStream out = null;
1509       try {
1510         // In my testing, have noticed that sasl messages are usually
1511         // in the ballpark of 100-200. That's why the initial capacity is 256.
1512         saslResponse = new ByteBufferOutputStream(256);
1513         out = new DataOutputStream(saslResponse);
1514         out.writeInt(status.state); // write status
1515         if (status == SaslStatus.SUCCESS) {
1516           rv.write(out);
1517         } else {
1518           WritableUtils.writeString(out, errorClass);
1519           WritableUtils.writeString(out, error);
1520         }
1521         saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1522         saslCall.responder = responder;
1523         saslCall.sendResponseIfReady();
1524       } finally {
1525         if (saslResponse != null) {
1526           saslResponse.close();
1527         }
1528         if (out != null) {
1529           out.close();
1530         }
1531       }
1532     }
1533
1534     private void disposeSasl() {
1535       if (saslServer != null) {
1536         try {
1537           saslServer.dispose();
1538           saslServer = null;
1539         } catch (SaslException ignored) {
1540           // Ignored. This is being disposed of anyway.
1541         }
1542       }
1543     }
1544
1545     private int readPreamble() throws IOException {
1546       int count;
1547       // Check for 'HBas' magic.
1548       this.dataLengthBuffer.flip();
1549       if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
1550         return doBadPreambleHandling("Expected HEADER=" +
1551             Bytes.toStringBinary(HConstants.RPC_HEADER) +
1552             " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
1553             " from " + toString());
1554       }
1555       // Now read the next two bytes, the version and the auth to use.
1556       ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
1557       count = channelRead(channel, versionAndAuthBytes);
1558       if (count < 0 || versionAndAuthBytes.remaining() > 0) {
1559         return count;
1560       }
1561       int version = versionAndAuthBytes.get(0);
1562       byte authbyte = versionAndAuthBytes.get(1);
1563       this.authMethod = AuthMethod.valueOf(authbyte);
1564       if (version != CURRENT_VERSION) {
1565         String msg = getFatalConnectionString(version, authbyte);
1566         return doBadPreambleHandling(msg, new WrongVersionException(msg));
1567       }
1568       if (authMethod == null) {
1569         String msg = getFatalConnectionString(version, authbyte);
1570         return doBadPreambleHandling(msg, new BadAuthException(msg));
1571       }
1572       if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
1573         if (allowFallbackToSimpleAuth) {
1574           metrics.authenticationFallback();
1575           authenticatedWithFallback = true;
1576         } else {
1577           AccessDeniedException ae = new AccessDeniedException("Authentication is required");
1578           setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1579           responder.doRespond(authFailedCall);
1580           throw ae;
1581         }
1582       }
1583       if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
1584         doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
1585             SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
1586         authMethod = AuthMethod.SIMPLE;
1587         // client has already sent the initial Sasl message and we
1588         // should ignore it. Both client and server should fall back
1589         // to simple auth from now on.
1590         skipInitialSaslHandshake = true;
1591       }
1592       if (authMethod != AuthMethod.SIMPLE) {
1593         useSasl = true;
1594       }
1595
1596       dataLengthBuffer.clear();
1597       connectionPreambleRead = true;
1598       return count;
1599     }
1600
1601     private int read4Bytes() throws IOException {
1602       if (this.dataLengthBuffer.remaining() > 0) {
1603         return channelRead(channel, this.dataLengthBuffer);
1604       } else {
1605         return 0;
1606       }
1607     }
1608
1609
1610     /**
1611      * Read off the wire. If there is not enough data to read, update the connection state with
1612      *  what we have and returns.
1613      * @return Returns -1 if failure (and caller will close connection), else zero or more.
1614      * @throws IOException
1615      * @throws InterruptedException
1616      */
1617     public int readAndProcess() throws IOException, InterruptedException {
1618       // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
1619       // does, read in the rest of the connection preamble, the version and the auth method.
1620       // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
1621       // length into the 4-byte this.dataLengthBuffer.
1622       int count = read4Bytes();
1623       if (count < 0 || dataLengthBuffer.remaining() > 0) {
1624         return count;
1625       }
1626
1627       // If we have not read the connection setup preamble, look to see if that is on the wire.
1628       if (!connectionPreambleRead) {
1629         count = readPreamble();
1630         if (!connectionPreambleRead) {
1631           return count;
1632         }
1633
1634         count = read4Bytes();
1635         if (count < 0 || dataLengthBuffer.remaining() > 0) {
1636           return count;
1637         }
1638       }
1639
1640       // We have read a length and we have read the preamble.  It is either the connection header
1641       // or it is a request.
1642       if (data == null) {
1643         dataLengthBuffer.flip();
1644         int dataLength = dataLengthBuffer.getInt();
1645         if (dataLength == RpcClient.PING_CALL_ID) {
1646           if (!useWrap) { //covers the !useSasl too
1647             dataLengthBuffer.clear();
1648             return 0;  //ping message
1649           }
1650         }
1651         if (dataLength < 0) { // A data length of zero is legal.
1652           throw new DoNotRetryIOException("Unexpected data length "
1653               + dataLength + "!! from " + getHostAddress());
1654         }
1655
1656         if (dataLength > maxRequestSize) {
1657           throw new DoNotRetryIOException("RPC data length of " + dataLength + " received from "
1658               + getHostAddress() + " is greater than max allowed " + maxRequestSize + ". Set \""
1659               + MAX_REQUEST_SIZE + "\" on server to override this limit (not recommended)");
1660         }
1661
1662         data = ByteBuffer.allocate(dataLength);
1663 
1664         // Increment the rpc count. This counter will be decreased when we write
1665         //  the response.  If we want the connection to be detected as idle properly, we
1666         //  need to keep the inc / dec correct.
1667         incRpcCount();
1668       }
1669
1670       count = channelRead(channel, data);
1671
1672       if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
1673         process();
1674       }
1675
1676       return count;
1677     }
1678
1679     /**
1680      * Process the data buffer and clean the connection state for the next call.
1681      */
1682     private void process() throws IOException, InterruptedException {
1683       data.flip();
1684       try {
1685         if (skipInitialSaslHandshake) {
1686           skipInitialSaslHandshake = false;
1687           return;
1688         }
1689 
1690         if (useSasl) {
1691           saslReadAndProcess(data);
1692         } else {
1693           processOneRpc(data);
1694         }
1695 
1696       } finally {
1697         dataLengthBuffer.clear(); // Clean for the next call
1698         data = null; // For the GC
1699       }
1700     }
1701
1702     private String getFatalConnectionString(final int version, final byte authByte) {
1703       return "serverVersion=" + CURRENT_VERSION +
1704       ", clientVersion=" + version + ", authMethod=" + authByte +
1705       ", authSupported=" + (authMethod != null) + " from " + toString();
1706     }
1707
1708     private int doBadPreambleHandling(final String msg) throws IOException {
1709       return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1710     }
1711
1712     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1713       LOG.warn(msg);
1714       Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0);
1715       setupResponse(null, fakeCall, e, msg);
1716       responder.doRespond(fakeCall);
1717       // Returning -1 closes out the connection.
1718       return -1;
1719     }
1720
1721     // Reads the connection header following version
1722     private void processConnectionHeader(ByteBuffer buf) throws IOException {
1723       this.connectionHeader = ConnectionHeader.parseFrom(
1724         new ByteBufferInputStream(buf));
1725       String serviceName = connectionHeader.getServiceName();
1726       if (serviceName == null) throw new EmptyServiceNameException();
1727       this.service = getService(services, serviceName);
1728       if (this.service == null) throw new UnknownServiceException(serviceName);
1729       setupCellBlockCodecs(this.connectionHeader);
1730       UserGroupInformation protocolUser = createUser(connectionHeader);
1731       if (!useSasl) {
1732         ugi = protocolUser;
1733         if (ugi != null) {
1734           ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1735         }
1736         // audit logging for SASL authenticated users happens in saslReadAndProcess()
1737         if (authenticatedWithFallback) {
1738           LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
1739               + " connecting from " + getHostAddress());
1740         }
1741         AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1742       } else {
1743         // user is authenticated
1744         ugi.setAuthenticationMethod(authMethod.authenticationMethod);
1745         //Now we check if this is a proxy user case. If the protocol user is
1746         //different from the 'user', it is a proxy user scenario. However,
1747         //this is not allowed if user authenticated with DIGEST.
1748         if ((protocolUser != null)
1749             && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
1750           if (authMethod == AuthMethod.DIGEST) {
1751             // Not allowed to doAs if token authentication is used
1752             throw new AccessDeniedException("Authenticated user (" + ugi
1753                 + ") doesn't match what the client claims to be ("
1754                 + protocolUser + ")");
1755           } else {
1756             // Effective user can be different from authenticated user
1757             // for simple auth or kerberos auth
1758             // The user is the real user. Now we create a proxy user
1759             UserGroupInformation realUser = ugi;
1760             ugi = UserGroupInformation.createProxyUser(protocolUser
1761                 .getUserName(), realUser);
1762             // Now the user is a proxy user, set Authentication method Proxy.
1763             ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
1764           }
1765         }
1766       }
1767       if (connectionHeader.hasVersionInfo()) {
1768         // see if this connection will support RetryImmediatelyException
1769         retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
1770
1771         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1772             + " with version info: "
1773             + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
1774       } else {
1775         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1776             + " with unknown version info");
1777       }
1778
1779
1780     }
1781
1782     /**
1783      * Set up cell block codecs
1784      * @throws FatalConnectionException
1785      */
1786     private void setupCellBlockCodecs(final ConnectionHeader header)
1787     throws FatalConnectionException {
1788       // TODO: Plug in other supported decoders.
1789       if (!header.hasCellBlockCodecClass()) return;
1790       String className = header.getCellBlockCodecClass();
1791       if (className == null || className.length() == 0) return;
1792       try {
1793         this.codec = (Codec)Class.forName(className).newInstance();
1794       } catch (Exception e) {
1795         throw new UnsupportedCellCodecException(className, e);
1796       }
1797       if (!header.hasCellBlockCompressorClass()) return;
1798       className = header.getCellBlockCompressorClass();
1799       try {
1800         this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1801       } catch (Exception e) {
1802         throw new UnsupportedCompressionCodecException(className, e);
1803       }
1804     }
1805 
1806     private void processUnwrappedData(byte[] inBuf) throws IOException,
1807     InterruptedException {
1808       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1809       // Read all RPCs contained in the inBuf, even partial ones
1810       while (true) {
1811         int count;
1812         if (unwrappedDataLengthBuffer.remaining() > 0) {
1813           count = channelRead(ch, unwrappedDataLengthBuffer);
1814           if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1815             return;
1816         }
1817
1818         if (unwrappedData == null) {
1819           unwrappedDataLengthBuffer.flip();
1820           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1821
1822           if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1823             if (LOG.isDebugEnabled())
1824               LOG.debug("Received ping message");
1825             unwrappedDataLengthBuffer.clear();
1826             continue; // ping message
1827           }
1828           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1829         }
1830
1831         count = channelRead(ch, unwrappedData);
1832         if (count <= 0 || unwrappedData.remaining() > 0)
1833           return;
1834
1835         if (unwrappedData.remaining() == 0) {
1836           unwrappedDataLengthBuffer.clear();
1837           unwrappedData.flip();
1838           processOneRpc(unwrappedData);
1839           unwrappedData = null;
1840         }
1841       }
1842     }
1843
1844
1845     private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException {
1846       if (connectionHeaderRead) {
1847         processRequest(buf);
1848       } else {
1849         processConnectionHeader(buf);
1850         this.connectionHeaderRead = true;
1851         if (!authorizeConnection()) {
1852           // Throw FatalConnectionException wrapping ACE so client does right thing and closes
1853           // down the connection instead of trying to read non-existent retun.
1854           throw new AccessDeniedException("Connection from " + this + " for service " +
1855             connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
1856         }
1857         this.user = userProvider.create(this.ugi);
1858       }
1859     }
1860
1861     /**
1862      * @param buf Has the request header and the request param and optionally encoded data buffer
1863      * all in this one array.
1864      * @throws IOException
1865      * @throws InterruptedException
1866      */
1867     protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
1868       long totalRequestSize = buf.limit();
1869       int offset = 0;
1870       // Here we read in the header.  We avoid having pb
1871       // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
1872       CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
1873       int headerSize = cis.readRawVarint32();
1874       offset = cis.getTotalBytesRead();
1875       Message.Builder builder = RequestHeader.newBuilder();
1876       ProtobufUtil.mergeFrom(builder, cis, headerSize);
1877       RequestHeader header = (RequestHeader) builder.build();
1878       offset += headerSize;
1879       int id = header.getCallId();
1880       if (LOG.isTraceEnabled()) {
1881         LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1882           " totalRequestSize: " + totalRequestSize + " bytes");
1883       }
1884       // Enforcing the call queue size, this triggers a retry in the client
1885       // This is a bit late to be doing this check - we have already read in the total request.
1886       if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1887         final Call callTooBig =
1888           new Call(id, this.service, null, null, null, null, this,
1889             responder, totalRequestSize, null, null, 0);
1890         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1891         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1892         setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
1893             "Call queue is full on " + server.getServerName() +
1894                 ", is hbase.ipc.server.max.callqueue.size too small?");
1895         responder.doRespond(callTooBig);
1896         return;
1897       }
1898       MethodDescriptor md = null;
1899       Message param = null;
1900       CellScanner cellScanner = null;
1901       try {
1902         if (header.hasRequestParam() && header.getRequestParam()) {
1903           md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1904           if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1905           builder = this.service.getRequestPrototype(md).newBuilderForType();
1906           cis.resetSizeCounter();
1907           int paramSize = cis.readRawVarint32();
1908           offset += cis.getTotalBytesRead();
1909           if (builder != null) {
1910             ProtobufUtil.mergeFrom(builder, cis, paramSize);
1911             param = builder.build();
1912           }
1913           offset += paramSize;
1914         }
1915         if (header.hasCellBlockMeta()) {
1916           buf.position(offset);
1917           cellScanner = ipcUtil.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf);
1918         }
1919       } catch (Throwable t) {
1920         InetSocketAddress address = getListenerAddress();
1921         String msg = (address != null ? address : "(channel closed)") +
1922             " is unable to read call parameter from client " + getHostAddress();
1923         LOG.warn(msg, t);
1924
1925         metrics.exception(t);
1926
1927         // probably the hbase hadoop version does not match the running hadoop version
1928         if (t instanceof LinkageError) {
1929           t = new DoNotRetryIOException(t);
1930         }
1931         // If the method is not present on the server, do not retry.
1932         if (t instanceof UnsupportedOperationException) {
1933           t = new DoNotRetryIOException(t);
1934         }
1935
1936         final Call readParamsFailedCall =
1937           new Call(id, this.service, null, null, null, null, this,
1938             responder, totalRequestSize, null, null, 0);
1939         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1940         setupResponse(responseBuffer, readParamsFailedCall, t,
1941           msg + "; " + t.getMessage());
1942         responder.doRespond(readParamsFailedCall);
1943         return;
1944       }
1945
1946       TraceInfo traceInfo = header.hasTraceInfo()
1947           ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1948           : null;
1949       int timeout = 0;
1950       if (header.hasTimeout()){
1951         timeout = Math.max(minClientRequestTimeout, header.getTimeout());
1952       }
1953       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1954               totalRequestSize, traceInfo, this.addr, timeout);
1955
1956       if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
1957         callQueueSize.add(-1 * call.getSize());
1958
1959         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1960         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1961         setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
1962             "Call queue is full on " + server.getServerName() +
1963                 ", too many items queued ?");
1964         responder.doRespond(call);
1965       }
1966     }
1967
1968     private boolean authorizeConnection() throws IOException {
1969       try {
1970         // If auth method is DIGEST, the token was obtained by the
1971         // real user for the effective user, therefore not required to
1972         // authorize real user. doAs is allowed only for simple or kerberos
1973         // authentication
1974         if (ugi != null && ugi.getRealUser() != null
1975             && (authMethod != AuthMethod.DIGEST)) {
1976           ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
1977         }
1978         authorize(ugi, connectionHeader, getHostInetAddress());
1979         metrics.authorizationSuccess();
1980       } catch (AuthorizationException ae) {
1981         if (LOG.isDebugEnabled()) {
1982           LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1983         }
1984         metrics.authorizationFailure();
1985         setupResponse(authFailedResponse, authFailedCall,
1986           new AccessDeniedException(ae), ae.getMessage());
1987         responder.doRespond(authFailedCall);
1988         return false;
1989       }
1990       return true;
1991     }
1992
1993     protected synchronized void close() {
1994       disposeSasl();
1995       data = null;
1996       if (!channel.isOpen())
1997         return;
1998       try {socket.shutdownOutput();} catch(Exception ignored) {
1999         if (LOG.isTraceEnabled()) {
2000           LOG.trace("Ignored exception", ignored);
2001         }
2002       }
2003       if (channel.isOpen()) {
2004         try {channel.close();} catch(Exception ignored) {}
2005       }
2006       try {
2007         socket.close();
2008       } catch(Exception ignored) {
2009         if (LOG.isTraceEnabled()) {
2010           LOG.trace("Ignored exception", ignored);
2011         }
2012       }
2013     }
2014
2015     private UserGroupInformation createUser(ConnectionHeader head) {
2016       UserGroupInformation ugi = null;
2017
2018       if (!head.hasUserInfo()) {
2019         return null;
2020       }
2021       UserInformation userInfoProto = head.getUserInfo();
2022       String effectiveUser = null;
2023       if (userInfoProto.hasEffectiveUser()) {
2024         effectiveUser = userInfoProto.getEffectiveUser();
2025       }
2026       String realUser = null;
2027       if (userInfoProto.hasRealUser()) {
2028         realUser = userInfoProto.getRealUser();
2029       }
2030       if (effectiveUser != null) {
2031         if (realUser != null) {
2032           UserGroupInformation realUserUgi =
2033               UserGroupInformation.createRemoteUser(realUser);
2034           ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
2035         } else {
2036           ugi = UserGroupInformation.createRemoteUser(effectiveUser);
2037         }
2038       }
2039       return ugi;
2040     }
2041   }
2042
2043   /**
2044    * Datastructure for passing a {@link BlockingService} and its associated class of
2045    * protobuf service interface.  For example, a server that fielded what is defined
2046    * in the client protobuf service would pass in an implementation of the client blocking service
2047    * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
2048    */
2049   public static class BlockingServiceAndInterface {
2050     private final BlockingService service;
2051     private final Class<?> serviceInterface;
2052     public BlockingServiceAndInterface(final BlockingService service,
2053         final Class<?> serviceInterface) {
2054       this.service = service;
2055       this.serviceInterface = serviceInterface;
2056     }
2057     public Class<?> getServiceInterface() {
2058       return this.serviceInterface;
2059     }
2060     public BlockingService getBlockingService() {
2061       return this.service;
2062     }
2063   }
2064
2065   /**
2066    * Constructs a server listening on the named port and address.
2067    * @param server hosting instance of {@link Server}. We will do authentications if an
2068    * instance else pass null for no authentication check.
2069    * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
2070    * @param services A list of services.
2071    * @param bindAddress Where to listen
2072    * @param conf
2073    * @param scheduler
2074    */
2075   public RpcServer(final Server server, final String name,
2076       final List<BlockingServiceAndInterface> services,
2077       final InetSocketAddress bindAddress, Configuration conf,
2078       RpcScheduler scheduler)
2079       throws IOException {
2080     if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
2081       this.reservoir = new BoundedByteBufferPool(
2082           conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
2083           conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
2084           // Make the max twice the number of handlers to be safe.
2085           conf.getInt("hbase.ipc.server.reservoir.initial.max",
2086               conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
2087                   HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
2088     } else {
2089       reservoir = null;
2090     }
2091     this.server = server;
2092     this.services = services;
2093     this.bindAddress = bindAddress;
2094     this.conf = conf;
2095     this.socketSendBufferSize = 0;
2096     this.maxQueueSize =
2097       this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
2098     this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
2099     this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
2100     this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
2101     this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
2102     this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
2103       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
2104     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
2105     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
2106     this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
2107         DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
2108     this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
2109
2110     // Start the listener here and let it bind to the port
2111     listener = new Listener(name);
2112     this.port = listener.getAddress().getPort();
2113
2114     this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
2115     this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
2116     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
2117
2118     this.ipcUtil = new IPCUtil(conf);
2119
2120
2121     // Create the responder here
2122     responder = new Responder();
2123     this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
2124     this.userProvider = UserProvider.instantiate(conf);
2125     this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
2126     if (isSecurityEnabled) {
2127       HBaseSaslRpcServer.init(conf);
2128     }
2129     initReconfigurable(conf);
2130
2131     this.scheduler = scheduler;
2132     this.scheduler.init(new RpcSchedulerContext(this));
2133   }
2134
2135   @Override
2136   public void onConfigurationChange(Configuration newConf) {
2137     initReconfigurable(newConf);
2138     if (scheduler instanceof ConfigurationObserver) {
2139       ((ConfigurationObserver)scheduler).onConfigurationChange(newConf);
2140     }
2141   }
2142
2143   private void initReconfigurable(Configuration confToLoad) {
2144     this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
2145     if (isSecurityEnabled && allowFallbackToSimpleAuth) {
2146       LOG.warn("********* WARNING! *********");
2147       LOG.warn("This server is configured to allow connections from INSECURE clients");
2148       LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
2149       LOG.warn("While this option is enabled, client identities cannot be secured, and user");
2150       LOG.warn("impersonation is possible!");
2151       LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
2152       LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
2153       LOG.warn("****************************");
2154     }
2155   }
2156
2157   /**
2158    * Subclasses of HBaseServer can override this to provide their own
2159    * Connection implementations.
2160    */
2161   protected Connection getConnection(SocketChannel channel, long time) {
2162     return new Connection(channel, time);
2163   }
2164
2165   /**
2166    * Setup response for the RPC Call.
2167    *
2168    * @param response buffer to serialize the response into
2169    * @param call {@link Call} to which we are setting up the response
2170    * @param error error message, if the call failed
2171    * @throws IOException
2172    */
2173   private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
2174   throws IOException {
2175     if (response != null) response.reset();
2176     call.setResponse(null, null, t, error);
2177   }
2178
2179   protected void closeConnection(Connection connection) {
2180     synchronized (connectionList) {
2181       if (connectionList.remove(connection)) {
2182         numConnections--;
2183       }
2184     }
2185     connection.close();
2186   }
2187
2188   Configuration getConf() {
2189     return conf;
2190   }
2191
2192   /** Sets the socket buffer size used for responding to RPCs.
2193    * @param size send size
2194    */
2195   @Override
2196   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2197
2198   @Override
2199   public boolean isStarted() {
2200     return this.started;
2201   }
2202
2203   /** Starts the service.  Must be called before any calls will be handled. */
2204   @Override
2205   public synchronized void start() {
2206     if (started) return;
2207     authTokenSecretMgr = createSecretManager();
2208     if (authTokenSecretMgr != null) {
2209       setSecretManager(authTokenSecretMgr);
2210       authTokenSecretMgr.start();
2211     }
2212     this.authManager = new ServiceAuthorizationManager();
2213     HBasePolicyProvider.init(conf, authManager);
2214     responder.start();
2215     listener.start();
2216     scheduler.start();
2217     started = true;
2218   }
2219
2220   @Override
2221   public synchronized void refreshAuthManager(PolicyProvider pp) {
2222     // Ignore warnings that this should be accessed in a static way instead of via an instance;
2223     // it'll break if you go via static route.
2224     this.authManager.refresh(this.conf, pp);
2225   }
2226
2227   private AuthenticationTokenSecretManager createSecretManager() {
2228     if (!isSecurityEnabled) return null;
2229     if (server == null) return null;
2230     Configuration conf = server.getConfiguration();
2231     long keyUpdateInterval =
2232         conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2233     long maxAge =
2234         conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2235     return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2236         server.getServerName().toString(), keyUpdateInterval, maxAge);
2237   }
2238
2239   public SecretManager<? extends TokenIdentifier> getSecretManager() {
2240     return this.secretManager;
2241   }
2242
2243   @SuppressWarnings("unchecked")
2244   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2245     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2246   }
2247
2248   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2249       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2250       throws IOException {
2251     return call(service, md, param, cellScanner, receiveTime, status, 0);
2252   }
2253
2254   /**
2255    * This is a server side method, which is invoked over RPC. On success
2256    * the return response has protobuf response payload. On failure, the
2257    * exception name and the stack trace are returned in the protobuf response.
2258    */
2259   @Override
2260   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2261       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
2262       int timeout)
2263   throws IOException {
2264     try {
2265       status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2266       // TODO: Review after we add in encoded data blocks.
2267       status.setRPCPacket(param);
2268       status.resume("Servicing call");
2269       //get an instance of the method arg type
2270       long startTime = System.currentTimeMillis();
2271       PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2272       controller.setCallTimeout(timeout);
2273       Message result = service.callBlockingMethod(md, controller, param);
2274       long endTime = System.currentTimeMillis();
2275       int processingTime = (int) (endTime - startTime);
2276       int qTime = (int) (startTime - receiveTime);
2277       int totalTime = (int) (endTime - receiveTime);
2278       if (LOG.isTraceEnabled()) {
2279         LOG.trace(CurCall.get().toString() +
2280             ", response " + TextFormat.shortDebugString(result) +
2281             " queueTime: " + qTime +
2282             " processingTime: " + processingTime +
2283             " totalTime: " + totalTime);
2284       }
2285       long requestSize = param.getSerializedSize();
2286       long responseSize = result.getSerializedSize();
2287       metrics.dequeuedCall(qTime);
2288       metrics.processedCall(processingTime);
2289       metrics.totalCall(totalTime);
2290       metrics.receivedRequest(requestSize);
2291       metrics.sentResponse(responseSize);
2292       // log any RPC responses that are slower than the configured warn
2293       // response time or larger than configured warning size
2294       boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2295       boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2296       if (tooSlow || tooLarge) {
2297         // when tagging, we let TooLarge trump TooSmall to keep output simple
2298         // note that large responses will often also be slow.
2299         logResponse(new Object[]{param},
2300             md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
2301             (tooLarge ? "TooLarge" : "TooSlow"),
2302             status.getClient(), startTime, processingTime, qTime,
2303             responseSize);
2304       }
2305       return new Pair<Message, CellScanner>(result, controller.cellScanner());
2306     } catch (Throwable e) {
2307       // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
2308       // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
2309       // need to pass it over the wire.
2310       if (e instanceof ServiceException) e = e.getCause();
2311
2312       // increment the number of requests that were exceptions.
2313       metrics.exception(e);
2314
2315       if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2316       if (e instanceof IOException) throw (IOException)e;
2317       LOG.error("Unexpected throwable object ", e);
2318       throw new IOException(e.getMessage(), e);
2319     }
2320   }
2321
2322   /**
2323    * Logs an RPC response to the LOG file, producing valid JSON objects for
2324    * client Operations.
2325    * @param params The parameters received in the call.
2326    * @param methodName The name of the method invoked
2327    * @param call The string representation of the call
2328    * @param tag  The tag that will be used to indicate this event in the log.
2329    * @param clientAddress   The address of the client who made this call.
2330    * @param startTime       The time that the call was initiated, in ms.
2331    * @param processingTime  The duration that the call took to run, in ms.
2332    * @param qTime           The duration that the call spent on the queue
2333    *                        prior to being initiated, in ms.
2334    * @param responseSize    The size in bytes of the response buffer.
2335    */
2336   void logResponse(Object[] params, String methodName, String call, String tag,
2337       String clientAddress, long startTime, int processingTime, int qTime,
2338       long responseSize)
2339           throws IOException {
2340     // base information that is reported regardless of type of call
2341     Map<String, Object> responseInfo = new HashMap<String, Object>();
2342     responseInfo.put("starttimems", startTime);
2343     responseInfo.put("processingtimems", processingTime);
2344     responseInfo.put("queuetimems", qTime);
2345     responseInfo.put("responsesize", responseSize);
2346     responseInfo.put("client", clientAddress);
2347     responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2348     responseInfo.put("method", methodName);
2349     if (params.length == 2 && server instanceof HRegionServer &&
2350         params[0] instanceof byte[] &&
2351         params[1] instanceof Operation) {
2352       // if the slow process is a query, we want to log its table as well
2353       // as its own fingerprint
2354       TableName tableName = TableName.valueOf(
2355           HRegionInfo.parseRegionName((byte[]) params[0])[0]);
2356       responseInfo.put("table", tableName.getNameAsString());
2357       // annotate the response map with operation details
2358       responseInfo.putAll(((Operation) params[1]).toMap());
2359       // report to the log file
2360       LOG.warn("(operation" + tag + "): " +
2361                MAPPER.writeValueAsString(responseInfo));
2362     } else if (params.length == 1 && server instanceof HRegionServer &&
2363         params[0] instanceof Operation) {
2364       // annotate the response map with operation details
2365       responseInfo.putAll(((Operation) params[0]).toMap());
2366       // report to the log file
2367       LOG.warn("(operation" + tag + "): " +
2368                MAPPER.writeValueAsString(responseInfo));
2369     } else {
2370       // can't get JSON details, so just report call.toString() along with
2371       // a more generic tag.
2372       responseInfo.put("call", call);
2373       LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2374     }
2375   }
2376
  /**
   * Stops the service.  No new calls will be handled after this is called.
   *
   * <p>Clears the running flag, stops and discards the auth token secret manager
   * (if one exists), interrupts and stops the listener, interrupts the responder,
   * stops the scheduler, and finally wakes every thread waiting on this object's
   * monitor (see {@link #join()}).
   */
  @Override
  public synchronized void stop() {
    LOG.info("Stopping server on " + port);
    running = false;
    if (authTokenSecretMgr != null) {
      authTokenSecretMgr.stop();
      // Drop the reference so a repeated stop() does not stop it twice.
      authTokenSecretMgr = null;
    }
    listener.interrupt();
    listener.doStop();
    responder.interrupt();
    scheduler.stop();
    // Release threads blocked in join(), which waits on this monitor while running is true.
    notifyAll();
  }
2392
2393   /** Wait for the server to be stopped.
2394    * Does not wait for all subthreads to finish.
2395    *  See {@link #stop()}.
2396    * @throws InterruptedException e
2397    */
2398   @Override
2399   public synchronized void join() throws InterruptedException {
2400     while (running) {
2401       wait();
2402     }
2403   }
2404
2405   /**
2406    * Return the socket (ip+port) on which the RPC server is listening to. May return null if
2407    * the listener channel is closed.
2408    * @return the socket (ip+port) on which the RPC server is listening to, or null if this
2409    * information cannot be determined
2410    */
2411   @Override
2412   public synchronized InetSocketAddress getListenerAddress() {
2413     if (listener == null) {
2414       return null;
2415     }
2416     return listener.getAddress();
2417   }
2418
2419   /**
2420    * Set the handler for calling out of RPC for error conditions.
2421    * @param handler the handler implementation
2422    */
2423   @Override
2424   public void setErrorHandler(HBaseRPCErrorHandler handler) {
2425     this.errorHandler = handler;
2426   }
2427
2428   @Override
2429   public HBaseRPCErrorHandler getErrorHandler() {
2430     return this.errorHandler;
2431   }
2432
2433   /**
2434    * Returns the metrics instance for reporting RPC call statistics
2435    */
2436   @Override
2437   public MetricsHBaseServer getMetrics() {
2438     return metrics;
2439   }
2440
2441   @Override
2442   public void addCallSize(final long diff) {
2443     this.callQueueSize.add(diff);
2444   }
2445
2446   /**
2447    * Authorize the incoming client connection.
2448    *
2449    * @param user client user
2450    * @param connection incoming connection
2451    * @param addr InetAddress of incoming connection
2452    * @throws org.apache.hadoop.security.authorize.AuthorizationException
2453    *         when the client isn't authorized to talk the protocol
2454    */
2455   public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
2456       InetAddress addr)
2457   throws AuthorizationException {
2458     if (authorize) {
2459       Class<?> c = getServiceInterface(services, connection.getServiceName());
2460       this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2461     }
2462   }
2463
  /**
   * When the read or write buffer size is larger than this limit, i/o will be
   * done in chunks of this size. Most RPC requests and responses would
   * be smaller.
   */
  private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
2470
2471   /**
2472    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
2473    * If the amount of data is large, it writes to channel in smaller chunks.
2474    * This is to avoid jdk from creating many direct buffers as the size of
2475    * buffer increases. This also minimizes extra copies in NIO layer
2476    * as a result of multiple write operations required to write a large
2477    * buffer.
2478    *
2479    * @param channel writable byte channel to write to
2480    * @param bufferChain Chain of buffers to write
2481    * @return number of bytes written
2482    * @throws java.io.IOException e
2483    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
2484    */
2485   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2486   throws IOException {
2487     long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
2488     if (count > 0) this.metrics.sentBytes(count);
2489     return count;
2490   }
2491
2492   /**
2493    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
2494    * If the amount of data is large, it writes to channel in smaller chunks.
2495    * This is to avoid jdk from creating many direct buffers as the size of
2496    * ByteBuffer increases. There should not be any performance degredation.
2497    *
2498    * @param channel writable byte channel to write on
2499    * @param buffer buffer to write
2500    * @return number of bytes written
2501    * @throws java.io.IOException e
2502    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
2503    */
2504   protected int channelRead(ReadableByteChannel channel,
2505                                    ByteBuffer buffer) throws IOException {
2506
2507     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2508            channel.read(buffer) : channelIO(channel, null, buffer);
2509     if (count > 0) {
2510       metrics.receivedBytes(count);
2511     }
2512     return count;
2513   }
2514
2515   /**
2516    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
2517    * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
2518    * one of readCh or writeCh should be non-null.
2519    *
2520    * @param readCh read channel
2521    * @param writeCh write channel
2522    * @param buf buffer to read or write into/out of
2523    * @return bytes written
2524    * @throws java.io.IOException e
2525    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
2526    * @see #channelWrite(GatheringByteChannel, BufferChain)
2527    */
2528   private static int channelIO(ReadableByteChannel readCh,
2529                                WritableByteChannel writeCh,
2530                                ByteBuffer buf) throws IOException {
2531
2532     int originalLimit = buf.limit();
2533     int initialRemaining = buf.remaining();
2534     int ret = 0;
2535
2536     while (buf.remaining() > 0) {
2537       try {
2538         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2539         buf.limit(buf.position() + ioSize);
2540
2541         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2542
2543         if (ret < ioSize) {
2544           break;
2545         }
2546
2547       } finally {
2548         buf.limit(originalLimit);
2549       }
2550     }
2551
2552     int nBytes = initialRemaining - buf.remaining();
2553     return (nBytes > 0) ? nBytes : ret;
2554   }
2555
2556   /**
2557    * Needed for features such as delayed calls.  We need to be able to store the current call
2558    * so that we can complete it later or ask questions of what is supported by the current ongoing
2559    * call.
2560    * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
2561    */
2562   public static RpcCallContext getCurrentCall() {
2563     return CurCall.get();
2564   }
2565
2566   public static boolean isInRpcCallContext() {
2567     return CurCall.get() != null;
2568   }
2569
2570   /**
2571    * Returns the user credentials associated with the current RPC request or
2572    * <code>null</code> if no credentials were provided.
2573    * @return A User
2574    */
2575   public static User getRequestUser() {
2576     RpcCallContext ctx = getCurrentCall();
2577     return ctx == null? null: ctx.getRequestUser();
2578   }
2579
2580   /**
2581    * Returns the username for any user associated with the current RPC
2582    * request or <code>null</code> if no user is set.
2583    */
2584   public static String getRequestUserName() {
2585     User user = getRequestUser();
2586     return user == null? null: user.getShortName();
2587   }
2588
2589   /**
2590    * @return Address of remote client if a request is ongoing, else null
2591    */
2592   public static InetAddress getRemoteAddress() {
2593     RpcCallContext ctx = getCurrentCall();
2594     return ctx == null? null: ctx.getRemoteAddress();
2595   }
2596
2597   /**
2598    * @param serviceName Some arbitrary string that represents a 'service'.
2599    * @param services Available service instances
2600    * @return Matching BlockingServiceAndInterface pair
2601    */
2602   static BlockingServiceAndInterface getServiceAndInterface(
2603       final List<BlockingServiceAndInterface> services, final String serviceName) {
2604     for (BlockingServiceAndInterface bs : services) {
2605       if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2606         return bs;
2607       }
2608     }
2609     return null;
2610   }
2611
2612   /**
2613    * @param serviceName Some arbitrary string that represents a 'service'.
2614    * @param services Available services and their service interfaces.
2615    * @return Service interface class for <code>serviceName</code>
2616    */
2617   static Class<?> getServiceInterface(
2618       final List<BlockingServiceAndInterface> services,
2619       final String serviceName) {
2620     BlockingServiceAndInterface bsasi =
2621         getServiceAndInterface(services, serviceName);
2622     return bsasi == null? null: bsasi.getServiceInterface();
2623   }
2624
2625   /**
2626    * @param serviceName Some arbitrary string that represents a 'service'.
2627    * @param services Available services and their service interfaces.
2628    * @return BlockingService that goes with the passed <code>serviceName</code>
2629    */
2630   static BlockingService getService(
2631       final List<BlockingServiceAndInterface> services,
2632       final String serviceName) {
2633     BlockingServiceAndInterface bsasi =
2634         getServiceAndInterface(services, serviceName);
2635     return bsasi == null? null: bsasi.getBlockingService();
2636   }
2637
2638   static MonitoredRPCHandler getStatus() {
2639     // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
2640     MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
2641     if (status != null) {
2642       return status;
2643     }
2644     status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
2645     status.pause("Waiting for a call");
2646     RpcServer.MONITORED_RPC.set(status);
2647     return status;
2648   }
2649
2650   /** Returns the remote side ip address when invoked inside an RPC
2651    *  Returns null incase of an error.
2652    *  @return InetAddress
2653    */
2654   public static InetAddress getRemoteIp() {
2655     Call call = CurCall.get();
2656     if (call != null && call.connection != null && call.connection.socket != null) {
2657       return call.connection.socket.getInetAddress();
2658     }
2659     return null;
2660   }
2661
2662
2663   /**
2664    * A convenience method to bind to a given address and report
2665    * better exceptions if the address is not a valid host.
2666    * @param socket the socket to bind
2667    * @param address the address to bind to
2668    * @param backlog the number of connections allowed in the queue
2669    * @throws BindException if the address can't be bound
2670    * @throws UnknownHostException if the address isn't a valid host name
2671    * @throws IOException other random errors from bind
2672    */
2673   public static void bind(ServerSocket socket, InetSocketAddress address,
2674                           int backlog) throws IOException {
2675     try {
2676       socket.bind(address, backlog);
2677     } catch (BindException e) {
2678       BindException bindException =
2679         new BindException("Problem binding to " + address + " : " +
2680             e.getMessage());
2681       bindException.initCause(e);
2682       throw bindException;
2683     } catch (SocketException e) {
2684       // If they try to bind to a different host's address, give a better
2685       // error message.
2686       if ("Unresolved address".equals(e.getMessage())) {
2687         throw new UnknownHostException("Invalid hostname for server: " +
2688                                        address.getHostName());
2689       }
2690       throw e;
2691     }
2692   }
2693
2694   @Override
2695   public RpcScheduler getScheduler() {
2696     return scheduler;
2697   }
2698 }