View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.ipc;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataInputStream;
26  import java.io.DataOutputStream;
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  import java.net.BindException;
30  import java.net.InetAddress;
31  import java.net.InetSocketAddress;
32  import java.net.ServerSocket;
33  import java.net.Socket;
34  import java.net.SocketException;
35  import java.net.UnknownHostException;
36  import java.nio.ByteBuffer;
37  import java.nio.channels.CancelledKeyException;
38  import java.nio.channels.ClosedChannelException;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.util.ArrayList;
46  import java.util.Collections;
47  import java.util.Iterator;
48  import java.util.LinkedList;
49  import java.util.List;
50  import java.util.Map;
51  import java.util.Random;
52  import java.util.concurrent.BlockingQueue;
53  import java.util.concurrent.ConcurrentHashMap;
54  import java.util.concurrent.ExecutorService;
55  import java.util.concurrent.Executors;
56  import java.util.concurrent.LinkedBlockingQueue;
57  import java.util.concurrent.atomic.AtomicInteger;
58  
59  import org.apache.commons.logging.Log;
60  import org.apache.commons.logging.LogFactory;
61  import org.apache.hadoop.conf.Configuration;
62  import org.apache.hadoop.hbase.HConstants;
63  import org.apache.hadoop.hbase.io.HbaseObjectWritable;
64  import org.apache.hadoop.hbase.io.WritableWithSize;
65  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
66  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
67  import org.apache.hadoop.hbase.security.User;
68  import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
69  import org.apache.hadoop.hbase.util.Bytes;
70  import org.apache.hadoop.hbase.util.SizeBasedThrottler;
71  import org.apache.hadoop.io.Writable;
72  import org.apache.hadoop.io.WritableUtils;
73  import org.apache.hadoop.ipc.RPC.VersionMismatch;
74  import org.apache.hadoop.util.ReflectionUtils;
75  import org.apache.hadoop.util.StringUtils;
76  import org.cliffc.high_scale_lib.Counter;
77  
78  import com.google.common.base.Function;
79  import com.google.common.util.concurrent.ThreadFactoryBuilder;
80  
81  /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
82   * parameter, and return a {@link Writable} as their value.  A service runs on
83   * a port and is defined by a parameter class and a value class.
84   *
85   *
86   * <p>Copied local so can fix HBASE-900.
87   *
88   * @see HBaseClient
89   */
90  public abstract class HBaseServer implements RpcServer {
91  
92    /**
93     * The first four bytes of Hadoop RPC connections
94     */
95    public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
96    public static final byte CURRENT_VERSION = 3;
97  
98    /**
99     * How many calls/handler are allowed in the queue.
100    */
101   private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
102 
103   /**
104    * The maximum size that we can hold in the IPC queue
105    */
106   private static final int DEFAULT_MAX_CALLQUEUE_SIZE =
107     1024 * 1024 * 1024;
108 
109   static final int BUFFER_INITIAL_SIZE = 1024;
110 
111   private static final String WARN_DELAYED_CALLS =
112       "hbase.ipc.warn.delayedrpc.number";
113 
114   private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;
115 
116   private final int warnDelayedCalls;
117 
118   private AtomicInteger delayedCalls;
119 
120   public static final Log LOG =
121     LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
122   protected static final Log TRACELOG =
123       LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer.trace");
124 
125   protected static final ThreadLocal<RpcServer> SERVER =
126     new ThreadLocal<RpcServer>();
127   private volatile boolean started = false;
128 
129   private static final Map<String, Class<? extends VersionedProtocol>>
130       PROTOCOL_CACHE =
131       new ConcurrentHashMap<String, Class<? extends VersionedProtocol>>();
132 
133   static Class<? extends VersionedProtocol> getProtocolClass(
134       String protocolName, Configuration conf)
135   throws ClassNotFoundException {
136     Class<? extends VersionedProtocol> protocol =
137         PROTOCOL_CACHE.get(protocolName);
138 
139     if (protocol == null) {
140       protocol = (Class<? extends VersionedProtocol>)
141           conf.getClassByName(protocolName);
142       PROTOCOL_CACHE.put(protocolName, protocol);
143     }
144     return protocol;
145   }
146 
147   /** Returns the server instance called under or null.  May be called under
148    * {@link #call(Class, Writable, long, MonitoredRPCHandler)} implementations,
149    * and under {@link Writable} methods of paramters and return values.
150    * Permits applications to access the server context.
151    * @return HBaseServer
152    */
153   public static RpcServer get() {
154     return SERVER.get();
155   }
156 
157   /** This is set to Call object before Handler invokes an RPC and reset
158    * after the call returns.
159    */
160   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
161 
162   /** Returns the remote side ip address when invoked inside an RPC
163    *  Returns null incase of an error.
164    *  @return InetAddress
165    */
166   public static InetAddress getRemoteIp() {
167     Call call = CurCall.get();
168     if (call != null) {
169       return call.connection.socket.getInetAddress();
170     }
171     return null;
172   }
173   /** Returns remote address as a string when invoked inside an RPC.
174    *  Returns null in case of an error.
175    *  @return String
176    */
177   public static String getRemoteAddress() {
178     Call call = CurCall.get();
179     if (call != null) {
180       return call.connection.getHostAddress();
181     }
182     return null;
183   }
184 
185   protected String bindAddress;
186   protected int port;                             // port we listen on
187   private int handlerCount;                       // number of handler threads
188   private int priorityHandlerCount;
189   private int readThreads;                        // number of read threads
190   protected Class<? extends Writable> paramClass; // class of call parameters
191   protected int maxIdleTime;                      // the maximum idle time after
192                                                   // which a client may be
193                                                   // disconnected
194   protected int thresholdIdleConnections;         // the number of idle
195                                                   // connections after which we
196                                                   // will start cleaning up idle
197                                                   // connections
198   int maxConnectionsToNuke;                       // the max number of
199                                                   // connections to nuke
200                                                   // during a cleanup
201 
202   protected HBaseRpcMetrics  rpcMetrics;
203 
204   protected Configuration conf;
205 
206   private int maxQueueLength;
207   protected int maxQueueSize;
208   protected int socketSendBufferSize;
209   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
210   protected final boolean tcpKeepAlive; // if T then use keepalives
211   protected final long purgeTimeout;    // in milliseconds
212 
213   // responseQueuesSizeThrottler is shared among all responseQueues,
214   // it bounds memory occupied by responses in all responseQueues
215   final SizeBasedThrottler responseQueuesSizeThrottler;
216 
217   // RESPONSE_QUEUES_MAX_SIZE limits the total size of responses across all response queues
218   private static final long DEFAULT_RESPONSE_QUEUES_MAX_SIZE = 1024 * 1024 * 1024; // 1G
219   private static final String RESPONSE_QUEUES_MAX_SIZE = "ipc.server.response.queue.maxsize";
220 
221   volatile protected boolean running = true;         // true while server runs
222   protected BlockingQueue<Call> callQueue; // queued calls
223   protected final Counter callQueueSize = new Counter();
224   protected BlockingQueue<Call> priorityCallQueue;
225   private final Counter activeRpcCount = new Counter();
226 
227   protected int highPriorityLevel;  // what level a high priority call is at
228 
229   protected final List<Connection> connectionList =
230     Collections.synchronizedList(new LinkedList<Connection>());
231   //maintain a list
232   //of client connections
233   private Listener listener = null;
234   protected Responder responder = null;
235   protected int numConnections = 0;
236   private Handler[] handlers = null;
237   private Handler[] priorityHandlers = null;
238   /** replication related queue; */
239   protected BlockingQueue<Call> replicationQueue;
240   private int numOfReplicationHandlers = 0;
241   private Handler[] replicationHandlers = null;
242   protected HBaseRPCErrorHandler errorHandler = null;
243 
244   /**
245    * A convenience method to bind to a given address and report
246    * better exceptions if the address is not a valid host.
247    * @param socket the socket to bind
248    * @param address the address to bind to
249    * @param backlog the number of connections allowed in the queue
250    * @throws BindException if the address can't be bound
251    * @throws UnknownHostException if the address isn't a valid host name
252    * @throws IOException other random errors from bind
253    */
254   public static void bind(ServerSocket socket, InetSocketAddress address,
255                           int backlog) throws IOException {
256     try {
257       socket.bind(address, backlog);
258     } catch (BindException e) {
259       BindException bindException =
260         new BindException("Problem binding to " + address + " : " +
261             e.getMessage());
262       bindException.initCause(e);
263       throw bindException;
264     } catch (SocketException e) {
265       // If they try to bind to a different host's address, give a better
266       // error message.
267       if ("Unresolved address".equals(e.getMessage())) {
268         throw new UnknownHostException("Invalid hostname for server: " +
269                                        address.getHostName());
270       }
271       throw e;
272     }
273   }
274 
275   /** A call queued for handling. */
276   protected class Call implements RpcCallContext {
277     protected int id;                             // the client's call id
278     protected Writable param;                     // the parameter passed
279     protected Connection connection;              // connection to client
280     protected long timestamp;      // the time received when response is null
281                                    // the time served when response is not null
282     protected ByteBuffer response;                // the response for this call
283     protected boolean delayResponse;
284     protected Responder responder;
285     protected boolean delayReturnValue;           // if the return value should be
286                                                   // set at call completion
287     protected long size;                          // size of current call
288     protected boolean isError;
289 
290     public Call(int id, Writable param, Connection connection,
291         Responder responder, long size) {
292       this.id = id;
293       this.param = param;
294       this.connection = connection;
295       this.timestamp = System.currentTimeMillis();
296       this.response = null;
297       this.delayResponse = false;
298       this.responder = responder;
299       this.isError = false;
300       this.size = size;
301     }
302 
303     @Override
304     public String toString() {
305       return param.toString() + " from " + connection.toString();
306     }
307 
308     protected synchronized void setResponse(Object value, Status status,
309         String errorClass, String error) {
310       // Avoid overwriting an error value in the response.  This can happen if
311       // endDelayThrowing is called by another thread before the actual call
312       // returning.
313       if (this.isError)
314         return;
315       if (errorClass != null) {
316         this.isError = true;
317       }
318       Writable result = null;
319       if (value instanceof Writable) {
320         result = (Writable) value;
321       } else {
322         /* We might have a null value and errors. Avoid creating a
323          * HbaseObjectWritable, because the constructor fails on null. */
324         if (value != null) {
325           result = new HbaseObjectWritable(value);
326         }
327       }
328 
329       int size = BUFFER_INITIAL_SIZE;
330       if (result instanceof WritableWithSize) {
331         // get the size hint.
332         WritableWithSize ohint = (WritableWithSize) result;
333         long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE +
334           (2 * Bytes.SIZEOF_INT);
335         if (hint > Integer.MAX_VALUE) {
336           // oops, new problem.
337           IOException ioe =
338             new IOException("Result buffer size too large: " + hint);
339           errorClass = ioe.getClass().getName();
340           error = StringUtils.stringifyException(ioe);
341         } else {
342           size = (int)hint;
343         }
344       }
345 
346       ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
347       DataOutputStream out = new DataOutputStream(buf);
348       try {
349         // Call id.
350         out.writeInt(this.id);
351         // Write flag.
352         byte flag = (error != null)?
353           ResponseFlag.getErrorAndLengthSet(): ResponseFlag.getLengthSetOnly();
354         out.writeByte(flag);
355         // Place holder for length set later below after we
356         // fill the buffer with data.
357         out.writeInt(0xdeadbeef);
358         out.writeInt(status.state);
359       } catch (IOException e) {
360         errorClass = e.getClass().getName();
361         error = StringUtils.stringifyException(e);
362       }
363 
364       try {
365         if (error == null) {
366           result.write(out);
367         } else {
368           WritableUtils.writeString(out, errorClass);
369           WritableUtils.writeString(out, error);
370         }
371       } catch (IOException e) {
372         LOG.warn("Error sending response to call: ", e);
373       }
374 
375       // Set the length into the ByteBuffer after call id and after
376       // byte flag.
377       ByteBuffer bb = buf.getByteBuffer();
378       int bufSiz = bb.remaining();
379       // Move to the size location in our ByteBuffer past call.id
380       // and past the byte flag.
381       bb.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE); 
382       bb.putInt(bufSiz);
383       bb.position(0);
384       this.response = bb;
385     }
386 
387     @Override
388     public synchronized void endDelay(Object result) throws IOException {
389       assert this.delayResponse;
390       assert this.delayReturnValue || result == null;
391       this.delayResponse = false;
392       delayedCalls.decrementAndGet();
393       if (this.delayReturnValue)
394         this.setResponse(result, Status.SUCCESS, null, null);
395       this.responder.doRespond(this);
396     }
397 
398     @Override
399     public synchronized void endDelay() throws IOException {
400       this.endDelay(null);
401     }
402 
403     @Override
404     public synchronized void startDelay(boolean delayReturnValue) {
405       assert !this.delayResponse;
406       this.delayResponse = true;
407       this.delayReturnValue = delayReturnValue;
408       int numDelayed = delayedCalls.incrementAndGet();
409       if (numDelayed > warnDelayedCalls) {
410         LOG.warn("Too many delayed calls: limit " + warnDelayedCalls +
411             " current " + numDelayed);
412       }
413     }
414 
415     @Override
416     public synchronized void endDelayThrowing(Throwable t) throws IOException {
417       this.setResponse(null, Status.ERROR, t.getClass().toString(),
418           StringUtils.stringifyException(t));
419       this.delayResponse = false;
420       this.sendResponseIfReady();
421     }
422 
423     @Override
424     public synchronized boolean isDelayed() {
425       return this.delayResponse;
426     }
427 
428     @Override
429     public synchronized boolean isReturnValueDelayed() {
430       return this.delayReturnValue;
431     }
432     
433     @Override
434     public void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException {
435       if (!connection.channel.isOpen()) {
436         long afterTime = System.currentTimeMillis() - timestamp;
437         throw new CallerDisconnectedException(
438             "Aborting call " + this + " after " + afterTime + " ms, since " +
439             "caller disconnected");
440       }
441     }
442 
443     public long getSize() {
444       return this.size;
445     }
446 
447     /**
448      * If we have a response, and delay is not set, then respond
449      * immediately.  Otherwise, do not respond to client.  This is
450      * called the by the RPC code in the context of the Handler thread.
451      */
452     public synchronized void sendResponseIfReady() throws IOException {
453       if (!this.delayResponse) {
454         this.responder.doRespond(this);
455       }
456     }
457   }
458 
459   /** Listens on the socket. Creates jobs for the handler threads*/
460   private class Listener extends Thread {
461 
462     private ServerSocketChannel acceptChannel = null; //the accept channel
463     private Selector selector = null; //the selector that we use for the server
464     private Reader[] readers = null;
465     private int currentReader = 0;
466     private InetSocketAddress address; //the address we bind at
467     private Random rand = new Random();
468     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
469                                          //-tion (for idle connections) ran
470     private long cleanupInterval = 10000; //the minimum interval between
471                                           //two cleanup runs
472     private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
473 
474     private ExecutorService readPool;
475 
476     public Listener() throws IOException {
477       address = new InetSocketAddress(bindAddress, port);
478       // Create a new server socket and set to non blocking mode
479       acceptChannel = ServerSocketChannel.open();
480       acceptChannel.configureBlocking(false);
481 
482       // Bind the server socket to the local host and port
483       bind(acceptChannel.socket(), address, backlogLength);
484       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
485       // create a selector;
486       selector= Selector.open();
487 
488       readers = new Reader[readThreads];
489       readPool = Executors.newFixedThreadPool(readThreads,
490         new ThreadFactoryBuilder().setNameFormat(
491           "IPC Reader %d on port " + port).setDaemon(true).build());
492       for (int i = 0; i < readThreads; ++i) {
493         Reader reader = new Reader();
494         readers[i] = reader;
495         readPool.execute(reader);
496       }
497 
498       // Register accepts on the server socket with the selector.
499       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
500       this.setName("IPC Server listener on " + port);
501       this.setDaemon(true);
502     }
503 
504 
505     private class Reader implements Runnable {
506       private volatile boolean adding = false;
507       private final Selector readSelector;
508 
509       Reader() throws IOException {
510         this.readSelector = Selector.open();
511       }
512       public void run() {
513         LOG.info("Starting " + getName());
514         try {
515           doRunLoop();
516         } finally {
517           try {
518             readSelector.close();
519           } catch (IOException ioe) {
520             LOG.error("Error closing read selector in " + getName(), ioe);
521           }
522         }
523       }
524 
525       private synchronized void doRunLoop() {
526         while (running) {
527           SelectionKey key = null;
528           try {
529             readSelector.select();
530             while (adding) {
531               this.wait(1000);
532             }
533 
534             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
535             while (iter.hasNext()) {
536               key = iter.next();
537               iter.remove();
538               if (key.isValid()) {
539                 if (key.isReadable()) {
540                   doRead(key);
541                 }
542               }
543               key = null;
544             }
545           } catch (InterruptedException e) {
546             if (running) {                      // unexpected -- log it
547               LOG.info(getName() + " unexpectedly interrupted: " +
548                   StringUtils.stringifyException(e));
549             }
550           } catch (IOException ex) {
551             LOG.error("Error in Reader", ex);
552           }
553         }
554       }
555 
556       /**
557        * This gets reader into the state that waits for the new channel
558        * to be registered with readSelector. If it was waiting in select()
559        * the thread will be woken up, otherwise whenever select() is called
560        * it will return even if there is nothing to read and wait
561        * in while(adding) for finishAdd call
562        */
563       public void startAdd() {
564         adding = true;
565         readSelector.wakeup();
566       }
567 
568       public synchronized SelectionKey registerChannel(SocketChannel channel)
569         throws IOException {
570         return channel.register(readSelector, SelectionKey.OP_READ);
571       }
572 
573       public synchronized void finishAdd() {
574         adding = false;
575         this.notify();
576       }
577     }
578 
579     /** cleanup connections from connectionList. Choose a random range
580      * to scan and also have a limit on the number of the connections
581      * that will be cleanedup per run. The criteria for cleanup is the time
582      * for which the connection was idle. If 'force' is true then all
583      * connections will be looked at for the cleanup.
584      * @param force all connections will be looked at for cleanup
585      */
586     private void cleanupConnections(boolean force) {
587       if (force || numConnections > thresholdIdleConnections) {
588         long currentTime = System.currentTimeMillis();
589         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
590           return;
591         }
592         int start = 0;
593         int end = numConnections - 1;
594         if (!force) {
595           start = rand.nextInt() % numConnections;
596           end = rand.nextInt() % numConnections;
597           int temp;
598           if (end < start) {
599             temp = start;
600             start = end;
601             end = temp;
602           }
603         }
604         int i = start;
605         int numNuked = 0;
606         while (i <= end) {
607           Connection c;
608           synchronized (connectionList) {
609             try {
610               c = connectionList.get(i);
611             } catch (Exception e) {return;}
612           }
613           if (c.timedOut(currentTime)) {
614             if (LOG.isDebugEnabled())
615               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
616             closeConnection(c);
617             numNuked++;
618             end--;
619             //noinspection UnusedAssignment
620             c = null;
621             if (!force && numNuked == maxConnectionsToNuke) break;
622           }
623           else i++;
624         }
625         lastCleanupRunTime = System.currentTimeMillis();
626       }
627     }
628 
629     @Override
630     public void run() {
631       LOG.info(getName() + ": starting");
632       SERVER.set(HBaseServer.this);
633 
634       while (running) {
635         SelectionKey key = null;
636         try {
637           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
638           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
639           while (iter.hasNext()) {
640             key = iter.next();
641             iter.remove();
642             try {
643               if (key.isValid()) {
644                 if (key.isAcceptable())
645                   doAccept(key);
646               }
647             } catch (IOException ignored) {
648             }
649             key = null;
650           }
651         } catch (OutOfMemoryError e) {
652           if (errorHandler != null) {
653             if (errorHandler.checkOOME(e)) {
654               LOG.info(getName() + ": exiting on OOME");
655               closeCurrentConnection(key, e);
656               cleanupConnections(true);
657               return;
658             }
659           } else {
660             // we can run out of memory if we have too many threads
661             // log the event and sleep for a minute and give
662             // some thread(s) a chance to finish
663             LOG.warn("Out of Memory in server select", e);
664             closeCurrentConnection(key, e);
665             cleanupConnections(true);
666             try { Thread.sleep(60000); } catch (Exception ignored) {}
667       }
668         } catch (Exception e) {
669           closeCurrentConnection(key, e);
670         }
671         cleanupConnections(false);
672       }
673       LOG.info("Stopping " + this.getName());
674 
675       synchronized (this) {
676         try {
677           acceptChannel.close();
678           selector.close();
679         } catch (IOException ignored) { }
680 
681         selector= null;
682         acceptChannel= null;
683 
684         // clean up all connections
685         while (!connectionList.isEmpty()) {
686           closeConnection(connectionList.remove(0));
687         }
688       }
689     }
690 
691     private void closeCurrentConnection(SelectionKey key, Throwable e) {
692       if (key != null) {
693         Connection c = (Connection)key.attachment();
694         if (c != null) {
695           if (LOG.isDebugEnabled()) {
696             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
697                 (e != null ? " on error " + e.getMessage() : ""));
698           }
699           closeConnection(c);
700           key.attach(null);
701         }
702       }
703     }
704 
705     InetSocketAddress getAddress() {
706       return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
707     }
708 
709     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
710       Connection c;
711       ServerSocketChannel server = (ServerSocketChannel) key.channel();
712 
713       SocketChannel channel;
714       while ((channel = server.accept()) != null) {
715         channel.configureBlocking(false);
716         channel.socket().setTcpNoDelay(tcpNoDelay);
717         channel.socket().setKeepAlive(tcpKeepAlive);
718 
719         Reader reader = getReader();
720         try {
721           reader.startAdd();
722           SelectionKey readKey = reader.registerChannel(channel);
723           c = getConnection(channel, System.currentTimeMillis());
724           readKey.attach(c);
725           synchronized (connectionList) {
726             connectionList.add(numConnections, c);
727             numConnections++;
728           }
729           if (LOG.isDebugEnabled())
730             LOG.debug("Server connection from " + c.toString() +
731                 "; # active connections: " + numConnections +
732                 "; # queued calls: " + callQueue.size());
733         } finally {
734           reader.finishAdd();
735         }
736       }
737       rpcMetrics.numOpenConnections.set(numConnections);
738     }
739 
740     void doRead(SelectionKey key) throws InterruptedException {
741       int count = 0;
742       Connection c = (Connection)key.attachment();
743       if (c == null) {
744         return;
745       }
746       c.setLastContact(System.currentTimeMillis());
747 
748       try {
749         count = c.readAndProcess();
750       } catch (InterruptedException ieo) {
751         throw ieo;
752       } catch (Exception e) {
753         LOG.warn(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
754         count = -1; //so that the (count < 0) block is executed
755       }
756       if (count < 0) {
757         if (LOG.isDebugEnabled())
758           LOG.debug(getName() + ": disconnecting client " +
759                     c.getHostAddress() + ". Number of active connections: "+
760                     numConnections);
761         closeConnection(c);
762         // c = null;
763       }
764       else {
765         c.setLastContact(System.currentTimeMillis());
766       }
767     }
768 
769     synchronized void doStop() {
770       if (selector != null) {
771         selector.wakeup();
772         Thread.yield();
773       }
774       if (acceptChannel != null) {
775         try {
776           acceptChannel.socket().close();
777         } catch (IOException e) {
778           LOG.info(getName() + ":Exception in closing listener socket. " + e);
779         }
780       }
781       readPool.shutdownNow();
782     }
783 
784     // The method that will return the next reader to work with
785     // Simplistic implementation of round robin for now
786     Reader getReader() {
787       currentReader = (currentReader + 1) % readers.length;
788       return readers[currentReader];
789     }
790   }
791 
792   // Sends responses of RPC back to clients.
793   protected class Responder extends Thread {
794     private final Selector writeSelector;
795     private int pending;         // connections waiting to register
796 
/**
 * Creates the responder as a daemon thread named "IPC Server Responder"
 * and opens its write selector.
 * @throws IOException if the selector cannot be opened
 */
Responder() throws IOException {
  setDaemon(true);
  setName("IPC Server Responder");
  pending = 0;
  writeSelector = Selector.open(); // create a selector
}
803 
804     @Override
805     public void run() {
806       LOG.info(getName() + ": starting");
807       SERVER.set(HBaseServer.this);
808       try {
809         doRunLoop();
810       } finally {
811         LOG.info("Stopping " + this.getName());
812         try {
813           writeSelector.close();
814         } catch (IOException ioe) {
815           LOG.error("Couldn't close write selector in " + this.getName(), ioe);
816         }
817       }
818     }
819 
820     private void doRunLoop() {
821       long lastPurgeTime = 0;   // last check for old calls.
822 
823       while (running) {
824         try {
825           waitPending();     // If a channel is being registered, wait.
826           writeSelector.select(purgeTimeout);
827           Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
828           while (iter.hasNext()) {
829             SelectionKey key = iter.next();
830             iter.remove();
831             try {
832               if (key.isValid() && key.isWritable()) {
833                   doAsyncWrite(key);
834               }
835             } catch (IOException e) {
836               LOG.info(getName() + ": doAsyncWrite threw exception " + e);
837             }
838           }
839           long now = System.currentTimeMillis();
840           if (now < lastPurgeTime + purgeTimeout) {
841             continue;
842           }
843           lastPurgeTime = now;
844           //
845           // If there were some calls that have not been sent out for a
846           // long time, discard them.
847           //
848           LOG.debug("Checking for old call responses.");
849           ArrayList<Call> calls;
850 
851           // get the list of channels from list of keys.
852           synchronized (writeSelector.keys()) {
853             calls = new ArrayList<Call>(writeSelector.keys().size());
854             iter = writeSelector.keys().iterator();
855             while (iter.hasNext()) {
856               SelectionKey key = iter.next();
857               Call call = (Call)key.attachment();
858               if (call != null && key.channel() == call.connection.channel) {
859                 calls.add(call);
860               }
861             }
862           }
863 
864           for(Call call : calls) {
865             try {
866               doPurge(call, now);
867             } catch (IOException e) {
868               LOG.warn("Error in purging old calls " + e);
869             }
870           }
871         } catch (OutOfMemoryError e) {
872           if (errorHandler != null) {
873             if (errorHandler.checkOOME(e)) {
874               LOG.info(getName() + ": exiting on OOME");
875               return;
876             }
877           } else {
878             //
879             // we can run out of memory if we have too many threads
880             // log the event and sleep for a minute and give
881             // some thread(s) a chance to finish
882             //
883             LOG.warn("Out of Memory in server select", e);
884             try { Thread.sleep(60000); } catch (Exception ignored) {}
885           }
886         } catch (Exception e) {
887           LOG.warn("Exception in Responder " +
888                    StringUtils.stringifyException(e));
889         }
890       }
891       LOG.info("Stopping " + this.getName());
892     }
893 
894     private void doAsyncWrite(SelectionKey key) throws IOException {
895       Call call = (Call)key.attachment();
896       if (call == null) {
897         return;
898       }
899       if (key.channel() != call.connection.channel) {
900         throw new IOException("doAsyncWrite: bad channel");
901       }
902 
903       synchronized(call.connection.responseQueue) {
904         if (processResponse(call.connection.responseQueue, false)) {
905           try {
906             key.interestOps(0);
907           } catch (CancelledKeyException e) {
908             /* The Listener/reader might have closed the socket.
909              * We don't explicitly cancel the key, so not sure if this will
910              * ever fire.
911              * This warning could be removed.
912              */
913             LOG.warn("Exception while changing ops : " + e);
914           }
915         }
916       }
917     }
918 
919     //
920     // Remove calls that have been pending in the responseQueue
921     // for a long time.
922     //
923     private void doPurge(Call call, long now) throws IOException {
924       synchronized (call.connection.responseQueue) {
925         Iterator<Call> iter = call.connection.responseQueue.listIterator(0);
926         while (iter.hasNext()) {
927           Call nextCall = iter.next();
928           if (now > nextCall.timestamp + purgeTimeout) {
929             closeConnection(nextCall.connection);
930             break;
931           }
932         }
933       }
934     }
935 
936     // Processes one response. Returns true if there are no more pending
937     // data for this channel.
938     //
939     private boolean processResponse(final LinkedList<Call> responseQueue,
940                                     boolean inHandler) throws IOException {
941       boolean error = true;
942       boolean done = false;       // there is more data for this channel.
943       int numElements;
944       Call call = null;
945       try {
946         //noinspection SynchronizationOnLocalVariableOrMethodParameter
947         synchronized (responseQueue) {
948           //
949           // If there are no items for this channel, then we are done
950           //
951           numElements = responseQueue.size();
952           if (numElements == 0) {
953             error = false;
954             return true;              // no more data for this channel.
955           }
956           //
957           // Extract the first call
958           //
959           call = responseQueue.peek();
960           SocketChannel channel = call.connection.channel;
961           if (LOG.isDebugEnabled()) {
962             LOG.debug(getName() + ": responding to #" + call.id + " from " +
963                       call.connection);
964           }
965           //
966           // Send as much data as we can in the non-blocking fashion
967           //
968           int numBytes = channelWrite(channel, call.response);
969           if (numBytes < 0) {
970             // Error flag is set, so returning here closes connection and
971             // clears responseQueue.                   
972             return true;
973           }
974           if (!call.response.hasRemaining()) {
975             responseQueue.poll();
976             responseQueuesSizeThrottler.decrease(call.response.limit());    
977             call.connection.decRpcCount();
978             //noinspection RedundantIfStatement
979             if (numElements == 1) {    // last call fully processes.
980               done = true;             // no more data for this channel.
981             } else {
982               done = false;            // more calls pending to be sent.
983             }
984             if (LOG.isDebugEnabled()) {
985               LOG.debug(getName() + ": responding to #" + call.id + " from " +
986                         call.connection + " Wrote " + numBytes + " bytes.");
987             }
988           } else {
989             if (inHandler) {
990               // set the serve time when the response has to be sent later
991               call.timestamp = System.currentTimeMillis();
992               if (enqueueInSelector(call))
993                 done = true;
994             }
995             if (LOG.isDebugEnabled()) {
996               LOG.debug(getName() + ": responding to #" + call.id + " from " +
997                         call.connection + " Wrote partial " + numBytes +
998                         " bytes.");
999             }
1000           }
1001           error = false;              // everything went off well
1002         }
1003       } finally {
1004         if (error && call != null) {
1005           LOG.warn(getName()+", call " + call + ": output error");
1006           done = true;               // error. no more data for this channel.
1007           closeConnection(call.connection);
1008         }
1009       }
1010       return done;
1011     }
1012 
1013     //
1014     // Enqueue for background thread to send responses out later.
1015     //
1016     private boolean enqueueInSelector(Call call) throws IOException {
1017       boolean done = false;
1018       incPending();
1019       try {
1020         // Wake up the thread blocked on select, only then can the call
1021         // to channel.register() complete.
1022         SocketChannel channel = call.connection.channel;
1023         writeSelector.wakeup();
1024         channel.register(writeSelector, SelectionKey.OP_WRITE, call);
1025       } catch (ClosedChannelException e) {
1026         //It's OK.  Channel might be closed else where.
1027         done = true;
1028       } finally {
1029         decPending();
1030       }
1031       return done;
1032     }
1033 
1034     //
1035     // Enqueue a response from the application.
1036     //
1037     void doRespond(Call call) throws IOException {
1038       // set the serve time when the response has to be sent later
1039       call.timestamp = System.currentTimeMillis();
1040 
1041       boolean doRegister = false;
1042       boolean closed;
1043       try {
1044         responseQueuesSizeThrottler.increase(call.response.remaining());
1045       } catch (InterruptedException ie) {
1046         throw new InterruptedIOException(ie.getMessage());
1047       }
1048       synchronized (call.connection.responseQueue) {
1049         closed = call.connection.closed;
1050         if (!closed) {
1051           call.connection.responseQueue.addLast(call);
1052 
1053           if (call.connection.responseQueue.size() == 1) {
1054             doRegister = !processResponse(call.connection.responseQueue, false);
1055           }
1056         }
1057       }
1058       if (doRegister) {
1059         enqueueInSelector(call);
1060       }
1061       if (closed) {
1062         // Connection was closed when we tried to submit response, but we
1063         // increased responseQueues size already. It shoud be
1064         // decreased here.
1065         responseQueuesSizeThrottler.decrease(call.response.remaining());
1066       }      
1067     }
1068 
1069     private synchronized void incPending() {   // call waiting to be enqueued.
1070       pending++;
1071     }
1072 
1073     private synchronized void decPending() { // call done enqueueing.
1074       pending--;
1075       notify();
1076     }
1077 
1078     private synchronized void waitPending() throws InterruptedException {
1079       while (pending > 0) {
1080         wait();
1081       }
1082     }
1083   }
1084 
1085   /** Reads calls from a connection and queues them for handling. */
1086   protected class Connection {
1087     private boolean versionRead = false; //if initial signature and
1088                                          //version are read
1089     private boolean headerRead = false;  //if the connection header that
1090                                          //follows version is read.
1091 
1092     protected volatile boolean closed = false;    // indicates if connection was closed
1093     protected SocketChannel channel;
1094     private ByteBuffer data;
1095     private ByteBuffer dataLengthBuffer;
1096     protected final LinkedList<Call> responseQueue;
1097     private volatile int rpcCount = 0; // number of outstanding rpcs
1098     private long lastContact;
1099     private int dataLength;
1100     protected Socket socket;
1101     // Cache the remote host & port info so that even if the socket is
1102     // disconnected, we can say where it used to connect to.
1103     protected String hostAddress;
1104     protected int remotePort;
1105     ConnectionHeader header = new ConnectionHeader();
1106     Class<? extends VersionedProtocol> protocol;
1107     protected User ticket = null;
1108 
1109     public Connection(SocketChannel channel, long lastContact) {
1110       this.channel = channel;
1111       this.lastContact = lastContact;
1112       this.data = null;
1113       this.dataLengthBuffer = ByteBuffer.allocate(4);
1114       this.socket = channel.socket();
1115       InetAddress addr = socket.getInetAddress();
1116       if (addr == null) {
1117         this.hostAddress = "*Unknown*";
1118       } else {
1119         this.hostAddress = addr.getHostAddress();
1120       }
1121       this.remotePort = socket.getPort();
1122       this.responseQueue = new LinkedList<Call>();
1123       if (socketSendBufferSize != 0) {
1124         try {
1125           socket.setSendBufferSize(socketSendBufferSize);
1126         } catch (IOException e) {
1127           LOG.warn("Connection: unable to set socket send buffer size to " +
1128                    socketSendBufferSize);
1129         }
1130       }
1131     }
1132 
1133     @Override
1134     public String toString() {
1135       return getHostAddress() + ":" + remotePort;
1136     }
1137 
1138     public String getHostAddress() {
1139       return hostAddress;
1140     }
1141 
1142     public int getRemotePort() {
1143       return remotePort;
1144     }
1145 
1146     public void setLastContact(long lastContact) {
1147       this.lastContact = lastContact;
1148     }
1149 
1150     public long getLastContact() {
1151       return lastContact;
1152     }
1153 
1154     /* Return true if the connection has no outstanding rpc */
1155     private boolean isIdle() {
1156       return rpcCount == 0;
1157     }
1158 
1159     /* Decrement the outstanding RPC count */
1160     protected void decRpcCount() {
1161       rpcCount--;
1162     }
1163 
1164     /* Increment the outstanding RPC count */
1165     protected void incRpcCount() {
1166       rpcCount++;
1167     }
1168 
1169     protected boolean timedOut(long currentTime) {
1170       return isIdle() && currentTime - lastContact > maxIdleTime;
1171     }
1172 
1173     public int readAndProcess() throws IOException, InterruptedException {
1174       while (true) {
1175         /* Read at most one RPC. If the header is not read completely yet
1176          * then iterate until we read first RPC or until there is no data left.
1177          */
1178         int count;
1179         if (dataLengthBuffer.remaining() > 0) {
1180           count = channelRead(channel, dataLengthBuffer);
1181           if (count < 0 || dataLengthBuffer.remaining() > 0)
1182             return count;
1183         }
1184 
1185         if (!versionRead) {
1186           //Every connection is expected to send the header.
1187           ByteBuffer versionBuffer = ByteBuffer.allocate(1);
1188           count = channelRead(channel, versionBuffer);
1189           if (count <= 0) {
1190             return count;
1191           }
1192           int version = versionBuffer.get(0);
1193 
1194           dataLengthBuffer.flip();
1195           if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
1196             //Warning is ok since this is not supposed to happen.
1197             LOG.warn("Incorrect header or version mismatch from " +
1198                      hostAddress + ":" + remotePort +
1199                      " got version " + version +
1200                      " expected version " + CURRENT_VERSION);
1201             setupBadVersionResponse(version);
1202             return -1;
1203           }
1204           dataLengthBuffer.clear();
1205           versionRead = true;
1206           continue;
1207         }
1208 
1209         if (data == null) {
1210           dataLengthBuffer.flip();
1211           dataLength = dataLengthBuffer.getInt();
1212 
1213           if (dataLength == HBaseClient.PING_CALL_ID) {
1214             dataLengthBuffer.clear();
1215             return 0;  //ping message
1216           }
1217           data = ByteBuffer.allocate(dataLength);
1218           incRpcCount();  // Increment the rpc count
1219         }
1220 
1221         count = channelRead(channel, data);
1222 
1223         if (data.remaining() == 0) {
1224           dataLengthBuffer.clear();
1225           data.flip();
1226           if (headerRead) {
1227             processData(data.array());
1228             data = null;
1229             return count;
1230           }
1231           processHeader();
1232           headerRead = true;
1233           data = null;
1234           continue;
1235         }
1236         return count;
1237       }
1238     }
1239 
1240     /**
1241      * Try to set up the response to indicate that the client version
1242      * is incompatible with the server. This can contain special-case
1243      * code to speak enough of past IPC protocols to pass back
1244      * an exception to the caller.
1245      * @param clientVersion the version the caller is using
1246      * @throws IOException
1247      */
1248     private void setupBadVersionResponse(int clientVersion) throws IOException {
1249       String errMsg = "Server IPC version " + CURRENT_VERSION +
1250       " cannot communicate with client version " + clientVersion;
1251       ByteArrayOutputStream buffer = new ByteArrayOutputStream();
1252 
1253       if (clientVersion >= 3) {
1254         // We used to return an id of -1 which caused server to close the
1255         // connection without telling the client what the problem was.  Now
1256         // we return 0 which will keep the socket up -- bad clients, unless
1257         // they switch to suit the running server -- will fail later doing
1258         // getProtocolVersion.
1259         Call fakeCall =  new Call(0, null, this, responder, 0);
1260         // Versions 3 and greater can interpret this exception
1261         // response in the same manner
1262         setupResponse(buffer, fakeCall, Status.FATAL,
1263             null, VersionMismatch.class.getName(), errMsg);
1264 
1265         responder.doRespond(fakeCall);
1266       }
1267     }
1268 
1269     /// Reads the connection header following version
1270     private void processHeader() throws IOException {
1271       DataInputStream in =
1272         new DataInputStream(new ByteArrayInputStream(data.array()));
1273       header.readFields(in);
1274       try {
1275         String protocolClassName = header.getProtocol();
1276         if (protocolClassName == null) {
1277           protocolClassName = "org.apache.hadoop.hbase.ipc.HRegionInterface";
1278         }
1279         protocol = getProtocolClass(protocolClassName, conf);
1280       } catch (ClassNotFoundException cnfe) {
1281         throw new IOException("Unknown protocol: " + header.getProtocol());
1282       }
1283 
1284       ticket = header.getUser();
1285     }
1286 
1287     protected void processData(byte[] buf) throws  IOException, InterruptedException {
1288       DataInputStream dis =
1289         new DataInputStream(new ByteArrayInputStream(buf));
1290       int id = dis.readInt();                    // try to read an id
1291       long callSize = buf.length;
1292 
1293       if (LOG.isDebugEnabled()) {
1294         LOG.debug(" got call #" + id + ", " + callSize + " bytes");
1295       }
1296 
1297       // Enforcing the call queue size, this triggers a retry in the client
1298       if ((callSize + callQueueSize.get()) > maxQueueSize) {
1299         final Call callTooBig =
1300           new Call(id, null, this, responder, callSize);
1301         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1302         setupResponse(responseBuffer, callTooBig, Status.FATAL, null,
1303             IOException.class.getName(),
1304             "Call queue is full, is ipc.server.max.callqueue.size too small?");
1305         responder.doRespond(callTooBig);
1306         return;
1307       }
1308 
1309       Writable param;
1310       try {
1311         param = ReflectionUtils.newInstance(paramClass, conf);//read param
1312         param.readFields(dis);
1313       } catch (Throwable t) {
1314         LOG.warn("Unable to read call parameters for client " +
1315                  getHostAddress(), t);
1316         final Call readParamsFailedCall =
1317           new Call(id, null, this, responder, callSize);
1318         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1319 
1320         setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
1321             t.getClass().getName(),
1322             "IPC server unable to read call parameters: " + t.getMessage());
1323         responder.doRespond(readParamsFailedCall);
1324         return;
1325       }
1326       Call call = new Call(id, param, this, responder, callSize);
1327       callQueueSize.add(callSize);
1328 
1329       if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
1330         priorityCallQueue.put(call);
1331         updateCallQueueLenMetrics(priorityCallQueue);
1332       } else if (replicationQueue != null && getQosLevel(param) == HConstants.REPLICATION_QOS) {
1333         replicationQueue.put(call);
1334         updateCallQueueLenMetrics(replicationQueue);
1335       } else {
1336         callQueue.put(call); // queue the call; maybe blocked here
1337         updateCallQueueLenMetrics(callQueue);
1338       }
1339     }
1340 
1341     protected synchronized void close() {
1342       closed = true;
1343       data = null;
1344       dataLengthBuffer = null;
1345       if (!channel.isOpen())
1346         return;
1347       try {socket.shutdownOutput();} catch(Exception ignored) {} // FindBugs DE_MIGHT_IGNORE
1348       if (channel.isOpen()) {
1349         try {channel.close();} catch(Exception ignored) {}
1350       }
1351       try {socket.close();} catch(Exception ignored) {}
1352     }
1353   }
1354 
1355   /**
1356    * Reports length of the call queue to HBaseRpcMetrics.
1357    * @param queue Which queue to report
1358    */
1359   protected void updateCallQueueLenMetrics(BlockingQueue<Call> queue) {
1360     if (queue == callQueue) {
1361       rpcMetrics.callQueueLen.set(callQueue.size());
1362     } else if (queue == priorityCallQueue) {
1363       rpcMetrics.priorityCallQueueLen.set(priorityCallQueue.size());
1364     } else if (queue == replicationQueue) {
1365       rpcMetrics.replicationCallQueueLen.set(replicationQueue.size());
1366     } else {
1367       LOG.warn("Unknown call queue");
1368     }
1369   }
1370 
1371   /** Handles queued calls . */
1372   private class Handler extends Thread {
1373     private final BlockingQueue<Call> myCallQueue;
1374     private MonitoredRPCHandler status;
1375 
1376     public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
1377       this.myCallQueue = cq;
1378       this.setDaemon(true);
1379 
1380       String threadName = "IPC Server handler " + instanceNumber + " on " + port;
1381       if (cq == priorityCallQueue) {
1382         // this is just an amazing hack, but it works.
1383         threadName = "PRI " + threadName;
1384       } else if (cq == replicationQueue) {
1385         threadName = "REPL " + threadName;
1386       }
1387       this.setName(threadName);
1388       this.status = TaskMonitor.get().createRPCStatus(threadName);
1389     }
1390 
1391     @Override
1392     public void run() {
1393       LOG.info(getName() + ": starting");
1394       status.setStatus("starting");
1395       SERVER.set(HBaseServer.this);
1396       while (running) {
1397         try {
1398           status.pause("Waiting for a call");
1399           Call call = myCallQueue.take(); // pop the queue; maybe blocked here
1400           updateCallQueueLenMetrics(myCallQueue);
1401           if (!call.connection.channel.isOpen()) {
1402             if (LOG.isDebugEnabled()) {
1403               LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
1404             }
1405             continue;
1406           }
1407           status.setStatus("Setting up call");
1408           status.setConnection(call.connection.getHostAddress(), 
1409               call.connection.getRemotePort());
1410 
1411           if (LOG.isDebugEnabled())
1412             LOG.debug(getName() + ": has #" + call.id + " from " +
1413                       call.connection);
1414 
1415           String errorClass = null;
1416           String error = null;
1417           Writable value = null;
1418 
1419           CurCall.set(call);
1420           try {
1421             activeRpcCount.increment();
1422             if (!started)
1423               throw new ServerNotRunningYetException("Server is not running yet");
1424 
1425             if (LOG.isDebugEnabled()) {
1426               User remoteUser = call.connection.ticket;
1427               LOG.debug(getName() + ": call #" + call.id + " executing as "
1428                   + (remoteUser == null ? "NULL principal" : remoteUser.getName()));
1429             }
1430 
1431             RequestContext.set(call.connection.ticket, getRemoteIp(),
1432                 call.connection.protocol);
1433             // make the call
1434             value = call(call.connection.protocol, call.param, call.timestamp, 
1435                 status);
1436           } catch (Throwable e) {
1437             if(LOG.isDebugEnabled()){
1438               LOG.debug(getName()+", call "+call+": error: " + e, e);
1439             }
1440             errorClass = e.getClass().getName();
1441             error = StringUtils.stringifyException(e);
1442           } finally {
1443             // Must always clear the request context to avoid leaking
1444             // credentials between requests.
1445             RequestContext.clear();
1446             activeRpcCount.decrement();
1447             rpcMetrics.activeRpcCount.set((int) activeRpcCount.get());
1448           }
1449           CurCall.set(null);
1450           callQueueSize.add(call.getSize() * -1);
1451           // Set the response for undelayed calls and delayed calls with
1452           // undelayed responses.
1453           if (!call.isDelayed() || !call.isReturnValueDelayed()) {
1454             call.setResponse(value,
1455               errorClass == null? Status.SUCCESS: Status.ERROR,
1456                 errorClass, error);
1457           }
1458           call.sendResponseIfReady();
1459           status.markComplete("Sent response");
1460         } catch (InterruptedException e) {
1461           if (running) {                          // unexpected -- log it
1462             LOG.info(getName() + " caught: " +
1463                      StringUtils.stringifyException(e));
1464           }
1465         } catch (OutOfMemoryError e) {
1466           if (errorHandler != null) {
1467             if (errorHandler.checkOOME(e)) {
1468               LOG.info(getName() + ": exiting on OOME");
1469               return;
1470             }
1471           } else {
1472             // rethrow if no handler
1473             throw e;
1474           }
1475        } catch (ClosedChannelException cce) {
1476           LOG.warn(getName() + " caught a ClosedChannelException, " +
1477             "this means that the server was processing a " +
1478             "request but the client went away. The error message was: " +
1479             cce.getMessage());
1480         } catch (Exception e) {
1481           LOG.warn(getName() + " caught: " +
1482                    StringUtils.stringifyException(e));
1483         }
1484       }
1485       LOG.info(getName() + ": exiting");
1486     }
1487 
1488   }
1489 
1490 
1491   private Function<Writable,Integer> qosFunction = null;
1492 
1493   /**
1494    * Gets the QOS level for this call.  If it is higher than the highPriorityLevel and there
1495    * are priorityHandlers available it will be processed in it's own thread set.
1496    *
1497    * @param newFunc
1498    */
1499   @Override
1500   public void setQosFunction(Function<Writable, Integer> newFunc) {
1501     qosFunction = newFunc;
1502   }
1503 
1504   protected int getQosLevel(Writable param) {
1505     if (qosFunction == null) {
1506       return 0;
1507     }
1508 
1509     Integer res = qosFunction.apply(param);
1510     if (res == null) {
1511       return 0;
1512     }
1513     return res;
1514   }
1515 
1516   /* Constructs a server listening on the named port and address.  Parameters passed must
1517    * be of the named class.  The <code>handlerCount</handlerCount> determines
1518    * the number of handler threads that will be used to process calls.
1519    *
1520    */
1521   protected HBaseServer(String bindAddress, int port,
1522                         Class<? extends Writable> paramClass, int handlerCount,
1523                         int priorityHandlerCount, Configuration conf, String serverName,
1524                         int highPriorityLevel)
1525     throws IOException {
1526     this.bindAddress = bindAddress;
1527     this.conf = conf;
1528     this.port = port;
1529     this.paramClass = paramClass;
1530     this.handlerCount = handlerCount;
1531     this.priorityHandlerCount = priorityHandlerCount;
1532     this.socketSendBufferSize = 0;
1533 
1534     // temporary backward compatibility
1535     String oldMaxQueueSize = this.conf.get("ipc.server.max.queue.size");
1536     if (oldMaxQueueSize == null) {
1537       this.maxQueueLength =
1538         this.conf.getInt("ipc.server.max.callqueue.length",
1539           handlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
1540     } else {
1541       LOG.warn("ipc.server.max.queue.size was renamed " +
1542                "ipc.server.max.callqueue.length, " +
1543                "please update your configuration");
1544       this.maxQueueLength = Integer.getInteger(oldMaxQueueSize);
1545     }
1546 
1547     this.maxQueueSize =
1548       this.conf.getInt("ipc.server.max.callqueue.size",
1549         DEFAULT_MAX_CALLQUEUE_SIZE);
1550      this.readThreads = conf.getInt(
1551         "ipc.server.read.threadpool.size",
1552         10);
1553     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueLength);
1554     if (priorityHandlerCount > 0) {
1555       this.priorityCallQueue = new LinkedBlockingQueue<Call>(maxQueueLength); // TODO hack on size
1556     } else {
1557       this.priorityCallQueue = null;
1558     }
1559     this.highPriorityLevel = highPriorityLevel;
1560     this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
1561     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
1562     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
1563     this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout",
1564                                      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
1565     this.numOfReplicationHandlers = 
1566       conf.getInt("hbase.regionserver.replication.handler.count", 3);
1567     if (numOfReplicationHandlers > 0) {
1568       this.replicationQueue = new LinkedBlockingQueue<Call>(maxQueueLength);
1569     }
1570     // Start the listener here and let it bind to the port
1571     listener = new Listener();
1572     this.port = listener.getAddress().getPort();
1573     this.rpcMetrics = new HBaseRpcMetrics(
1574         serverName, Integer.toString(this.port));
1575     this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
1576     this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
1577 
1578     this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS,
1579                                         DEFAULT_WARN_DELAYED_CALLS);
1580     this.delayedCalls = new AtomicInteger(0);
1581 
1582 
1583     this.responseQueuesSizeThrottler = new SizeBasedThrottler(
1584         conf.getLong(RESPONSE_QUEUES_MAX_SIZE, DEFAULT_RESPONSE_QUEUES_MAX_SIZE));
1585 
1586     // Create the responder here
1587     responder = new Responder();
1588   }
1589 
1590   /**
1591    * Subclasses of HBaseServer can override this to provide their own
1592    * Connection implementations.
1593    */
1594   protected Connection getConnection(SocketChannel channel, long time) {
1595     return new Connection(channel, time);
1596   }
1597 
1598   /**
1599    * Setup response for the IPC Call.
1600    *
1601    * @param response buffer to serialize the response into
1602    * @param call {@link Call} to which we are setting up the response
1603    * @param status {@link Status} of the IPC call
1604    * @param rv return value for the IPC Call, if the call was successful
1605    * @param errorClass error class, if the the call failed
1606    * @param error error message, if the call failed
1607    * @throws IOException
1608    */
1609   protected void setupResponse(ByteArrayOutputStream response,
1610                              Call call, Status status,
1611                              Writable rv, String errorClass, String error)
1612   throws IOException {
1613     response.reset();
1614     DataOutputStream out = new DataOutputStream(response);
1615 
1616     if (status == Status.SUCCESS) {
1617       try {
1618         rv.write(out);
1619         call.setResponse(rv, status, null, null);
1620       } catch (Throwable t) {
1621         LOG.warn("Error serializing call response for call " + call, t);
1622         // Call back to same function - this is OK since the
1623         // buffer is reset at the top, and since status is changed
1624         // to ERROR it won't infinite loop.
1625         call.setResponse(null, status.ERROR, t.getClass().getName(),
1626             StringUtils.stringifyException(t));
1627       }
1628     } else {
1629       call.setResponse(rv, status, errorClass, error);
1630     }
1631   }
1632 
1633   protected void closeConnection(Connection connection) {
1634     synchronized (connectionList) {
1635       if (connectionList.remove(connection)) {
1636         numConnections--;
1637       }
1638     }
1639     connection.close();
1640     long bytes = 0;
1641     synchronized (connection.responseQueue) {
1642       for (Call c : connection.responseQueue) {
1643         bytes += c.response.limit();
1644       }
1645       connection.responseQueue.clear();
1646     }
1647     responseQueuesSizeThrottler.decrease(bytes);    
1648     rpcMetrics.numOpenConnections.set(numConnections);
1649   }
1650 
1651   /** Sets the socket buffer size used for responding to RPCs.
1652    * @param size send size
1653    */
1654   @Override
1655   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
1656 
1657   /** Starts the service.  Must be called before any calls will be handled. */
1658   @Override
1659   public void start() {
1660     startThreads();
1661     openServer();
1662   }
1663 
1664   /**
1665    * Open a previously started server.
1666    */
1667   @Override
1668   public void openServer() {
1669     started = true;
1670   }
1671 
1672   /**
1673    * Starts the service threads but does not allow requests to be responded yet.
1674    * Client will get {@link ServerNotRunningYetException} instead.
1675    */
1676   @Override
1677   public synchronized void startThreads() {
1678     responder.start();
1679     listener.start();
1680     handlers = startHandlers(callQueue, handlerCount);
1681     priorityHandlers = startHandlers(priorityCallQueue, priorityHandlerCount);
1682     replicationHandlers = startHandlers(replicationQueue, numOfReplicationHandlers);
1683     }
1684 
1685   private Handler[] startHandlers(BlockingQueue<Call> queue, int numOfHandlers) {
1686     if (numOfHandlers <= 0) {
1687       return null;
1688     }
1689     Handler[] handlers = new Handler[numOfHandlers];
1690     for (int i = 0; i < numOfHandlers; i++) {
1691       handlers[i] = new Handler(queue, i);
1692       handlers[i].start();
1693     }
1694     return handlers;
1695   }
1696 
1697   /** Stops the service.  No new calls will be handled after this is called. */
1698   @Override
1699   public synchronized void stop() {
1700     LOG.info("Stopping server on " + port);
1701     running = false;
1702     stopHandlers(handlers);
1703     stopHandlers(priorityHandlers);
1704     stopHandlers(replicationHandlers);
1705     listener.interrupt();
1706     listener.doStop();
1707     responder.interrupt();
1708     notifyAll();
1709     if (this.rpcMetrics != null) {
1710       this.rpcMetrics.shutdown();
1711     }
1712   }
1713 
1714   private void stopHandlers(Handler[] handlers) {
1715     if (handlers != null) {
1716       for (Handler handler : handlers) {
1717         if (handler != null) {
1718           handler.interrupt();
1719         }
1720       }
1721     }
1722   }
1723 
1724   /** Wait for the server to be stopped.
1725    * Does not wait for all subthreads to finish.
1726    *  See {@link #stop()}.
1727    * @throws InterruptedException e
1728    */
1729   @Override
1730   public synchronized void join() throws InterruptedException {
1731     while (running) {
1732       wait();
1733     }
1734   }
1735 
1736   /**
1737    * Return the socket (ip+port) on which the RPC server is listening to.
1738    * @return the socket (ip+port) on which the RPC server is listening to.
1739    */
1740   @Override
1741   public synchronized InetSocketAddress getListenerAddress() {
1742     return listener.getAddress();
1743   }
1744 
1745   /**
1746    * Set the handler for calling out of RPC for error conditions.
1747    * @param handler the handler implementation
1748    */
1749   @Override
1750   public void setErrorHandler(HBaseRPCErrorHandler handler) {
1751     this.errorHandler = handler;
1752   }
1753 
1754   /**
1755    * Returns the metrics instance for reporting RPC call statistics
1756    */
1757   public HBaseRpcMetrics getRpcMetrics() {
1758     return rpcMetrics;
1759   }
1760 
1761   /**
1762    * When the read or write buffer size is larger than this limit, i/o will be
1763    * done in chunks of this size. Most RPC requests and responses would be
1764    * be smaller.
1765    */
1766   private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
1767 
1768   /**
1769    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
1770    * If the amount of data is large, it writes to channel in smaller chunks.
1771    * This is to avoid jdk from creating many direct buffers as the size of
1772    * buffer increases. This also minimizes extra copies in NIO layer
1773    * as a result of multiple write operations required to write a large
1774    * buffer.
1775    *
1776    * @param channel writable byte channel to write to
1777    * @param buffer buffer to write
1778    * @return number of bytes written
1779    * @throws java.io.IOException e
1780    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
1781    */
1782   protected int channelWrite(WritableByteChannel channel,
1783                                     ByteBuffer buffer) throws IOException {
1784 
1785     int count =  (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
1786            channel.write(buffer) : channelIO(null, channel, buffer);
1787     if (count > 0) {
1788       rpcMetrics.sentBytes.inc(count);
1789     }
1790     return count;
1791   }
1792 
1793   /**
1794    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
1795    * If the amount of data is large, it writes to channel in smaller chunks.
1796    * This is to avoid jdk from creating many direct buffers as the size of
1797    * ByteBuffer increases. There should not be any performance degredation.
1798    *
1799    * @param channel writable byte channel to write on
1800    * @param buffer buffer to write
1801    * @return number of bytes written
1802    * @throws java.io.IOException e
1803    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
1804    */
1805   protected int channelRead(ReadableByteChannel channel,
1806                                    ByteBuffer buffer) throws IOException {
1807 
1808     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
1809            channel.read(buffer) : channelIO(channel, null, buffer);
1810     if (count > 0) {
1811       rpcMetrics.receivedBytes.inc(count);
1812   }
1813     return count;
1814   }
1815 
1816   /**
1817    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
1818    * and {@link #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer)}. Only
1819    * one of readCh or writeCh should be non-null.
1820    *
1821    * @param readCh read channel
1822    * @param writeCh write channel
1823    * @param buf buffer to read or write into/out of
1824    * @return bytes written
1825    * @throws java.io.IOException e
1826    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
1827    * @see #channelWrite(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer)
1828    */
1829   private static int channelIO(ReadableByteChannel readCh,
1830                                WritableByteChannel writeCh,
1831                                ByteBuffer buf) throws IOException {
1832 
1833     int originalLimit = buf.limit();
1834     int initialRemaining = buf.remaining();
1835     int ret = 0;
1836 
1837     while (buf.remaining() > 0) {
1838       try {
1839         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
1840         buf.limit(buf.position() + ioSize);
1841 
1842         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
1843 
1844         if (ret < ioSize) {
1845           break;
1846         }
1847 
1848       } finally {
1849         buf.limit(originalLimit);
1850       }
1851     }
1852 
1853     int nBytes = initialRemaining - buf.remaining();
1854     return (nBytes > 0) ? nBytes : ret;
1855   }
1856 
1857   /**
1858    * Needed for delayed calls.  We need to be able to store the current call
1859    * so that we can complete it later.
1860    * @return Call the server is currently handling.
1861    */
1862   public static RpcCallContext getCurrentCall() {
1863     return CurCall.get();
1864   }
1865 
1866   public long getResponseQueueSize(){
1867     return responseQueuesSizeThrottler.getCurrentValue();
1868   }
1869 }