/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * The RPC server with a native Java NIO implementation, derived from Hadoop, that hosts
 * protobuf-described Services. It is the original implementation from before HBASE-17262 and, for
 * now, the default RPC server. An RpcServer instance has a Listener that hosts the socket. The
 * Listener owns a fixed number of Readers in an ExecutorPool, 10 by default. The Listener accepts a
 * connection and then picks a Reader round-robin to do the reading. The Reader registers the
 * connection with its Selector, reads everything available off the channel, and parses what it read
 * into a Call. The Call is wrapped in a CallRunner and passed to the scheduler to be run. The
 * Reader then goes back to check whether more work is pending and loops until the server shuts
 * down.
 * <p>
 * The Scheduler can be variously implemented, but the default simple scheduler hands its handlers
 * the queues into which calls (i.e. CallRunner instances) are inserted. Handlers take from their
 * queue, run CallRunner#run on each item taken, and keep taking while the server is up.
 * CallRunner#run executes the call; when done, it asks the included Call to put itself on a new
 * queue for the Responder to pull from and return the result to the client.
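 * <p>
 * For orientation, a minimal construction-and-lifecycle sketch (an illustrative assumption, not
 * code taken from this class; {@code myServices} and {@code myScheduler} stand in for
 * caller-supplied values). Passing null for the hosting Server skips authentication checks, and
 * port 0 requests an ephemeral port:
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * InetSocketAddress bindAddress = new InetSocketAddress("localhost", 0);
 * SimpleRpcServer rpcServer = new SimpleRpcServer(null, "myRpcServer", myServices, bindAddress,
 *   conf, myScheduler, true);
 * rpcServer.start(); // the bind already happened in the constructor; this starts the responder,
 *                    // listener and scheduler
 * // ... clients connect to rpcServer.getListenerAddress() ...
 * rpcServer.stop();
 * </pre>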
 * @see BlockingRpcClient
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
public class SimpleRpcServer extends RpcServer {

  protected int port; // port we listen on
  protected InetSocketAddress address; // inet address we listen on
  private int readThreads; // number of read threads

  protected int socketSendBufferSize;
  protected final long purgeTimeout; // in milliseconds

  // maintains the set of client connections and handles idle timeouts
  private ConnectionManager connectionManager;
  private Listener listener = null;
  protected SimpleRpcServerResponder responder = null;

  /** Listens on the socket. Creates jobs for the handler threads */
  private class Listener extends Thread {

    private ServerSocketChannel acceptChannel = null; // the accept channel
    private Selector selector = null; // the selector that we use for the server
    private Reader[] readers = null;
    private int currentReader = 0;
    private final int readerPendingConnectionQueueLength;

    private ExecutorService readPool;

    public Listener(final String name) throws IOException {
      super(name);
      // The backlog of requests that we will have the serversocket carry.
      int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
      readerPendingConnectionQueueLength =
        conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the binding address (can be different from the default interface)
      bind(acceptChannel.socket(), bindAddress, backlogLength);
      port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral port
      address = (InetSocketAddress) acceptChannel.socket().getLocalSocketAddress();
      // create a selector;
      selector = Selector.open();

      readers = new Reader[readThreads];
      // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
      // has an advantage in that it is easy to shutdown the pool.
      readPool = Executors.newFixedThreadPool(readThreads,
        new ThreadFactoryBuilder()
          .setNameFormat("Reader=%d,bindAddress=" + bindAddress.getHostName() + ",port=" + port)
          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
      for (int i = 0; i < readThreads; ++i) {
        Reader reader = new Reader();
        readers[i] = reader;
        readPool.execute(reader);
      }
      LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("Listener,port=" + port);
      this.setDaemon(true);
    }

    private class Reader implements Runnable {
      final private LinkedBlockingQueue<SimpleServerRpcConnection> pendingConnections;
      private final Selector readSelector;

      Reader() throws IOException {
        this.pendingConnections = new LinkedBlockingQueue<>(readerPendingConnectionQueueLength);
        this.readSelector = Selector.open();
      }

      @Override
      public void run() {
        try {
          doRunLoop();
        } finally {
          try {
            readSelector.close();
          } catch (IOException ioe) {
            LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
          }
        }
      }

      private synchronized void doRunLoop() {
        while (running) {
          try {
            // Consume as many connections as currently queued to avoid
            // unbridled acceptance of connections that starves the select
            int size = pendingConnections.size();
            for (int i = size; i > 0; i--) {
              SimpleServerRpcConnection conn = pendingConnections.take();
              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
            }
            readSelector.select();
            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
            while (iter.hasNext()) {
              SelectionKey key = iter.next();
              iter.remove();
              if (key.isValid()) {
                if (key.isReadable()) {
                  doRead(key);
                }
              }
              key = null;
            }
          } catch (InterruptedException e) {
            if (running) { // unexpected -- log it
              LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
            }
          } catch (CancelledKeyException e) {
            LOG.error(getName() + ": CancelledKeyException in Reader", e);
          } catch (IOException ex) {
            LOG.info(getName() + ": IOException in Reader", ex);
          }
        }
      }

      /**
       * Updating the readSelector while it's being used is not thread-safe, so the connection must
       * be queued. The reader will drain the queue and update its readSelector before performing
       * the next select
       */
      public void addConnection(SimpleServerRpcConnection conn) throws IOException {
        pendingConnections.add(conn);
        readSelector.wakeup();
      }
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
        justification = "selector access is not synchronized; seems fine but concerned changing "
          + "it will have a performance impact")
    public void run() {
      LOG.info(getName() + ": starting");
      connectionManager.startIdleScan();
      while (running) {
        SelectionKey key = null;
        try {
          selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable()) doAccept(key);
              }
            } catch (IOException ignored) {
              if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          if (errorHandler != null) {
            if (errorHandler.checkOOME(e)) {
              LOG.info(getName() + ": exiting on OutOfMemoryError");
              closeCurrentConnection(key, e);
              connectionManager.closeIdle(true);
              return;
            }
          } else {
            // we can run out of memory if we have too many threads
            // log the event and sleep for a minute and give
            // some thread(s) a chance to finish
            LOG.warn(getName() + ": OutOfMemoryError in server select", e);
            closeCurrentConnection(key, e);
            connectionManager.closeIdle(true);
            try {
              Thread.sleep(60000);
            } catch (InterruptedException ex) {
              LOG.debug("Interrupted while sleeping");
            }
          }
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
      }
      LOG.info(getName() + ": stopping");
      synchronized (this) {
        try {
          acceptChannel.close();
          selector.close();
        } catch (IOException ignored) {
          if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
        }

        selector = null;
        acceptChannel = null;

        // close all connections
        connectionManager.stopIdleScan();
        connectionManager.closeAll();
      }
    }

    private void closeCurrentConnection(SelectionKey key, Throwable e) {
      if (key != null) {
        SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
        if (c != null) {
          closeConnection(c);
          key.attach(null);
        }
      }
    }

    InetSocketAddress getAddress() {
      return address;
    }

    void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        channel.socket().setKeepAlive(tcpKeepAlive);
        Reader reader = getReader();
        SimpleServerRpcConnection c = connectionManager.register(channel);
        // If the connectionManager can't take it, close the connection.
        if (c == null) {
          if (channel.isOpen()) {
            IOUtils.cleanupWithLogger(LOG, channel);
          }
          continue;
        }
        key.attach(c); // so closeCurrentConnection can get the object
        reader.addConnection(c);
      }
    }

    void doRead(SelectionKey key) throws InterruptedException {
      int count;
      SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
      if (c == null) {
        return;
      }
      c.setLastContact(EnvironmentEdgeManager.currentTime());
      try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException",
          ieo);
        throw ieo;
      } catch (Exception e) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Caught exception while reading:", e);
        }
        count = -1; // so that the (count < 0) block is executed
      }
      if (count < 0) {
        closeConnection(c);
        c = null;
      } else {
        c.setLastContact(EnvironmentEdgeManager.currentTime());
      }
    }

    synchronized void doStop() {
      if (selector != null) {
        selector.wakeup();
        Thread.yield();
      }
      if (acceptChannel != null) {
        try {
          acceptChannel.socket().close();
        } catch (IOException e) {
          LOG.info(getName() + ": exception in closing listener socket. " + e);
        }
      }
      readPool.shutdownNow();
    }

    // The method that will return the next reader to work with
    // Simplistic implementation of round robin for now
    Reader getReader() {
      currentReader = (currentReader + 1) % readers.length;
      return readers[currentReader];
    }
  }

  /**
   * Constructs a server listening on the named port and address.
   * @param server           hosting instance of {@link Server}. We will do authentication checks if
   *                         an instance is passed; else pass null for no authentication check.
   * @param name             Used keying this rpc server's metrics and for naming the Listener
   *                         thread.
   * @param services         A list of services.
   * @param bindAddress      Where to listen.
   * @param conf             Configuration to use.
   * @param scheduler        The {@link RpcScheduler} that dispatches calls to handlers.
   * @param reservoirEnabled Enable ByteBufferPool or not.
   */
  public SimpleRpcServer(final Server server, final String name,
    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
    Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
    this.socketSendBufferSize = 0;
    this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
    this.purgeTimeout =
      conf.getLong("hbase.ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

    // Start the listener here and let it bind to the port
    listener = new Listener(name);
    this.port = listener.getAddress().getPort();

    // Create the responder here
    responder = new SimpleRpcServerResponder(this);
    connectionManager = new ConnectionManager();
    initReconfigurable(conf);

    this.scheduler.init(new RpcSchedulerContext(this));
  }

  /**
   * Subclasses of HBaseServer can override this to provide their own Connection implementations.
   */
  protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
    return new SimpleServerRpcConnection(this, channel, time);
  }

  protected void closeConnection(SimpleServerRpcConnection connection) {
    connectionManager.close(connection);
  }

  /**
   * Sets the socket buffer size used for responding to RPCs.
   * @param size send size
   */
  @Override
  public void setSocketSendBufSize(int size) {
    this.socketSendBufferSize = size;
  }

  /** Starts the service. Must be called before any calls will be handled. */
  @Override
  public synchronized void start() {
    if (started) {
      return;
    }
    authTokenSecretMgr = createSecretManager();
    if (authTokenSecretMgr != null) {
      // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
      // LeaderElector start. See HBASE-25875
      synchronized (authTokenSecretMgr) {
        setSecretManager(authTokenSecretMgr);
        authTokenSecretMgr.start();
      }
    }
    this.authManager = new ServiceAuthorizationManager();
    HBasePolicyProvider.init(conf, authManager);
    responder.start();
    listener.start();
    scheduler.start();
    started = true;
  }

  /** Stops the service. No new calls will be handled after this is called. */
  @Override
  public synchronized void stop() {
    LOG.info("Stopping server on " + port);
    running = false;
    if (authTokenSecretMgr != null) {
      authTokenSecretMgr.stop();
      authTokenSecretMgr = null;
    }
    listener.interrupt();
    listener.doStop();
    responder.interrupt();
    scheduler.stop();
    notifyAll();
  }

  /**
   * Wait for the server to be stopped. Does not wait for all subthreads to finish.
   * @see #stop()
   */
  @Override
  public synchronized void join() throws InterruptedException {
    while (running) {
      wait();
    }
  }

  /**
   * Return the socket (ip+port) on which the RPC server is listening. May return null if the
   * listener channel is closed.
   * @return the socket (ip+port) on which the RPC server is listening, or null if this
   *         information cannot be determined
   */
  @Override
  public synchronized InetSocketAddress getListenerAddress() {
    if (listener == null) {
      return null;
    }
    return listener.getAddress();
  }

  /**
   * This is a wrapper around
   * {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}. If the amount of data
   * is large, it writes to the channel in smaller chunks. This keeps the JDK from creating many
   * direct buffers as the buffer size grows. It also minimizes the extra copies in the NIO layer
   * that result from the multiple write operations required to write a large buffer.
   * @param channel     writable byte channel to write to
   * @param bufferChain Chain of buffers to write
   * @return number of bytes written
   * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
   */
  protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
    throws IOException {
    long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
    if (count > 0) {
      this.metrics.sentBytes(count);
    }
    return count;
  }

  /**
   * A convenience method to bind to a given address and report better exceptions if the address is
   * not a valid host.
   * @param socket  the socket to bind
   * @param address the address to bind to
   * @param backlog the number of connections allowed in the queue
   * @throws BindException        if the address can't be bound
   * @throws UnknownHostException if the address isn't a valid host name
   * @throws IOException          other random errors from bind
   */
  public static void bind(ServerSocket socket, InetSocketAddress address, int backlog)
    throws IOException {
    try {
      socket.bind(address, backlog);
    } catch (BindException e) {
      BindException bindException =
        new BindException("Problem binding to " + address + " : " + e.getMessage());
      bindException.initCause(e);
      throw bindException;
    } catch (SocketException e) {
      // If they try to bind to a different host's address, give a better
      // error message.
      if ("Unresolved address".equals(e.getMessage())) {
        throw new UnknownHostException("Invalid hostname for server: " + address.getHostName());
      }
      throw e;
    }
  }

  /**
   * The number of open RPC connections
   * @return the number of open rpc connections
   */
  @Override
  public int getNumOpenConnections() {
    return connectionManager.size();
  }

  private class ConnectionManager {
    final private AtomicInteger count = new AtomicInteger();
    final private Set<SimpleServerRpcConnection> connections;

    final private Timer idleScanTimer;
    final private int idleScanThreshold;
    final private int idleScanInterval;
    final private int maxIdleTime;
    final private int maxIdleToClose;

    ConnectionManager() {
      this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
      this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
      this.idleScanInterval =
        conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
      this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
      this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
      int maxConnectionQueueSize =
        handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
      // create a set with concurrency -and- a thread-safe iterator, add 2
      // for listener and idle closer threads
      this.connections =
        Collections.newSetFromMap(new ConcurrentHashMap<SimpleServerRpcConnection, Boolean>(
          maxConnectionQueueSize, 0.75f, readThreads + 2));
    }

    private boolean add(SimpleServerRpcConnection connection) {
      boolean added = connections.add(connection);
      if (added) {
        count.getAndIncrement();
      }
      return added;
    }

    private boolean remove(SimpleServerRpcConnection connection) {
      boolean removed = connections.remove(connection);
      if (removed) {
        count.getAndDecrement();
      }
      return removed;
    }

    int size() {
      return count.get();
    }

    SimpleServerRpcConnection[] toArray() {
      return connections.toArray(new SimpleServerRpcConnection[0]);
    }

    SimpleServerRpcConnection register(SocketChannel channel) {
      SimpleServerRpcConnection connection =
        getConnection(channel, EnvironmentEdgeManager.currentTime());
      add(connection);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Connection from " + connection + "; connections=" + size()
          + ", queued calls size (bytes)=" + callQueueSizeInBytes.sum() + ", general queued calls="
          + scheduler.getGeneralQueueLength() + ", priority queued calls="
          + scheduler.getPriorityQueueLength() + ", meta priority queued calls="
          + scheduler.getMetaPriorityQueueLength());
      }
      return connection;
    }

    boolean close(SimpleServerRpcConnection connection) {
      boolean exists = remove(connection);
      if (exists) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(Thread.currentThread().getName() + ": disconnecting client " + connection
            + ". Number of active connections: " + size());
        }
        // only close if actually removed to avoid double-closing due
        // to possible races
        connection.close();
      }
      return exists;
    }

    // synch'ed to avoid explicit invocation upon OOM from colliding with
    // timer task firing
    synchronized void closeIdle(boolean scanAll) {
      long minLastContact = EnvironmentEdgeManager.currentTime() - maxIdleTime;
      // concurrent iterator might miss new connections added
      // during the iteration, but that's ok because they won't
      // be idle yet anyway and will be caught on next scan
      int closed = 0;
      for (SimpleServerRpcConnection connection : connections) {
        // stop if connections dropped below threshold unless scanning all
        if (!scanAll && size() < idleScanThreshold) {
          break;
        }
        // stop if not scanning all and max connections are closed
        if (
          connection.isIdle() && connection.getLastContact() < minLastContact && close(connection)
            && !scanAll && (++closed == maxIdleToClose)
        ) {
          break;
        }
      }
    }

    void closeAll() {
      // use a copy of the connections to be absolutely sure the concurrent
      // iterator doesn't miss a connection
      for (SimpleServerRpcConnection connection : toArray()) {
        close(connection);
      }
    }

    void startIdleScan() {
      scheduleIdleScanTask();
    }

    void stopIdleScan() {
      idleScanTimer.cancel();
    }

    private void scheduleIdleScanTask() {
      if (!running) {
        return;
      }
      TimerTask idleScanTask = new TimerTask() {
        @Override
        public void run() {
          if (!running) {
            return;
          }
          if (LOG.isTraceEnabled()) {
            LOG.trace("running");
          }
          try {
            closeIdle(false);
          } finally {
            // explicitly reschedule so next execution occurs relative
            // to the end of this scan, not the beginning
            scheduleIdleScanTask();
          }
        }
      };
      idleScanTimer.schedule(idleScanTask, idleScanInterval);
    }
  }

}