001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.net.BindException;
022import java.net.InetSocketAddress;
023import java.net.ServerSocket;
024import java.net.SocketException;
025import java.net.UnknownHostException;
026import java.nio.channels.CancelledKeyException;
027import java.nio.channels.GatheringByteChannel;
028import java.nio.channels.SelectionKey;
029import java.nio.channels.Selector;
030import java.nio.channels.ServerSocketChannel;
031import java.nio.channels.SocketChannel;
032import java.util.Collections;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Set;
036import java.util.Timer;
037import java.util.TimerTask;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ExecutorService;
040import java.util.concurrent.Executors;
041import java.util.concurrent.LinkedBlockingQueue;
042import java.util.concurrent.atomic.AtomicInteger;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.hbase.HBaseInterfaceAudience;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.Server;
047import org.apache.hadoop.hbase.security.HBasePolicyProvider;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.util.Threads;
050import org.apache.hadoop.io.IOUtils;
051import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
052import org.apache.yetus.audience.InterfaceAudience;
053
054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
055
056/**
057 * The RPC server with native java NIO implementation deriving from Hadoop to host protobuf
058 * described Services. It's the original one before HBASE-17262, and the default RPC server for now.
059 * An RpcServer instance has a Listener that hosts the socket. Listener has fixed number of Readers
060 * in an ExecutorPool, 10 by default. The Listener does an accept and then round robin a Reader is
061 * chosen to do the read. The reader is registered on Selector. Read does total read off the channel
062 * and the parse from which it makes a Call. The call is wrapped in a CallRunner and passed to the
063 * scheduler to be run. Reader goes back to see if more to be done and loops till done.
064 * <p>
065 * Scheduler can be variously implemented but default simple scheduler has handlers to which it has
066 * given the queues into which calls (i.e. CallRunner instances) are inserted. Handlers run taking
067 * from the queue. They run the CallRunner#run method on each item gotten from queue and keep taking
068 * while the server is up. CallRunner#run executes the call. When done, asks the included Call to
069 * put itself on new queue for Responder to pull from and return result to client.
070 * @see BlockingRpcClient
071 */
072@Deprecated()
073@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
074public class SimpleRpcServer extends RpcServer {
075
076  protected int port; // port we listen on
077  protected InetSocketAddress address; // inet address we listen on
078  private int readThreads; // number of read threads
079
080  protected int socketSendBufferSize;
081  protected final long purgeTimeout; // in milliseconds
082
083  // maintains the set of client connections and handles idle timeouts
084  private ConnectionManager connectionManager;
085  private Listener listener = null;
086  protected SimpleRpcServerResponder responder = null;
087
088  /** Listens on the socket. Creates jobs for the handler threads */
089  private class Listener extends Thread {
090
091    private ServerSocketChannel acceptChannel = null; // the accept channel
092    private Selector selector = null; // the selector that we use for the server
093    private Reader[] readers = null;
094    private int currentReader = 0;
095    private final int readerPendingConnectionQueueLength;
096
097    private ExecutorService readPool;
098
099    public Listener(final String name) throws IOException {
100      super(name);
101      // The backlog of requests that we will have the serversocket carry.
102      int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
103      readerPendingConnectionQueueLength =
104        conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);
105      // Create a new server socket and set to non blocking mode
106      acceptChannel = ServerSocketChannel.open();
107      acceptChannel.configureBlocking(false);
108
109      // Bind the server socket to the binding addrees (can be different from the default interface)
110      bind(acceptChannel.socket(), bindAddress, backlogLength);
111      port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral port
112      address = (InetSocketAddress) acceptChannel.socket().getLocalSocketAddress();
113      // create a selector;
114      selector = Selector.open();
115
116      readers = new Reader[readThreads];
117      // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
118      // has an advantage in that it is easy to shutdown the pool.
119      readPool = Executors.newFixedThreadPool(readThreads,
120        new ThreadFactoryBuilder()
121          .setNameFormat("Reader=%d,bindAddress=" + bindAddress.getHostName() + ",port=" + port)
122          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
123      for (int i = 0; i < readThreads; ++i) {
124        Reader reader = new Reader();
125        readers[i] = reader;
126        readPool.execute(reader);
127      }
128      LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
129
130      // Register accepts on the server socket with the selector.
131      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
132      this.setName("Listener,port=" + port);
133      this.setDaemon(true);
134    }
135
136    private class Reader implements Runnable {
137      final private LinkedBlockingQueue<SimpleServerRpcConnection> pendingConnections;
138      private final Selector readSelector;
139
140      Reader() throws IOException {
141        this.pendingConnections = new LinkedBlockingQueue<>(readerPendingConnectionQueueLength);
142        this.readSelector = Selector.open();
143      }
144
145      @Override
146      public void run() {
147        try {
148          doRunLoop();
149        } finally {
150          try {
151            readSelector.close();
152          } catch (IOException ioe) {
153            LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
154          }
155        }
156      }
157
158      private synchronized void doRunLoop() {
159        while (running) {
160          try {
161            // Consume as many connections as currently queued to avoid
162            // unbridled acceptance of connections that starves the select
163            int size = pendingConnections.size();
164            for (int i = size; i > 0; i--) {
165              SimpleServerRpcConnection conn = pendingConnections.take();
166              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
167            }
168            readSelector.select();
169            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
170            while (iter.hasNext()) {
171              SelectionKey key = iter.next();
172              iter.remove();
173              if (key.isValid()) {
174                if (key.isReadable()) {
175                  doRead(key);
176                }
177              }
178              key = null;
179            }
180          } catch (InterruptedException e) {
181            if (running) { // unexpected -- log it
182              LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
183            }
184          } catch (CancelledKeyException e) {
185            LOG.error(getName() + ": CancelledKeyException in Reader", e);
186          } catch (IOException ex) {
187            LOG.info(getName() + ": IOException in Reader", ex);
188          }
189        }
190      }
191
192      /**
193       * Updating the readSelector while it's being used is not thread-safe, so the connection must
194       * be queued. The reader will drain the queue and update its readSelector before performing
195       * the next select
196       */
197      public void addConnection(SimpleServerRpcConnection conn) throws IOException {
198        pendingConnections.add(conn);
199        readSelector.wakeup();
200      }
201    }
202
203    @Override
204    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
205        justification = "selector access is not synchronized; seems fine but concerned changing "
206          + "it will have per impact")
207    public void run() {
208      LOG.info(getName() + ": starting");
209      connectionManager.startIdleScan();
210      while (running) {
211        SelectionKey key = null;
212        try {
213          selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
214          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
215          while (iter.hasNext()) {
216            key = iter.next();
217            iter.remove();
218            try {
219              if (key.isValid()) {
220                if (key.isAcceptable()) doAccept(key);
221              }
222            } catch (IOException ignored) {
223              if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
224            }
225            key = null;
226          }
227        } catch (OutOfMemoryError e) {
228          if (errorHandler != null) {
229            if (errorHandler.checkOOME(e)) {
230              LOG.info(getName() + ": exiting on OutOfMemoryError");
231              closeCurrentConnection(key, e);
232              connectionManager.closeIdle(true);
233              return;
234            }
235          } else {
236            // we can run out of memory if we have too many threads
237            // log the event and sleep for a minute and give
238            // some thread(s) a chance to finish
239            LOG.warn(getName() + ": OutOfMemoryError in server select", e);
240            closeCurrentConnection(key, e);
241            connectionManager.closeIdle(true);
242            try {
243              Thread.sleep(60000);
244            } catch (InterruptedException ex) {
245              LOG.debug("Interrupted while sleeping");
246            }
247          }
248        } catch (Exception e) {
249          closeCurrentConnection(key, e);
250        }
251      }
252      LOG.info(getName() + ": stopping");
253      synchronized (this) {
254        try {
255          acceptChannel.close();
256          selector.close();
257        } catch (IOException ignored) {
258          if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
259        }
260
261        selector = null;
262        acceptChannel = null;
263
264        // close all connections
265        connectionManager.stopIdleScan();
266        connectionManager.closeAll();
267      }
268    }
269
270    private void closeCurrentConnection(SelectionKey key, Throwable e) {
271      if (key != null) {
272        SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
273        if (c != null) {
274          closeConnection(c);
275          key.attach(null);
276        }
277      }
278    }
279
280    InetSocketAddress getAddress() {
281      return address;
282    }
283
284    void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
285      ServerSocketChannel server = (ServerSocketChannel) key.channel();
286      SocketChannel channel;
287      while ((channel = server.accept()) != null) {
288        channel.configureBlocking(false);
289        channel.socket().setTcpNoDelay(tcpNoDelay);
290        channel.socket().setKeepAlive(tcpKeepAlive);
291        Reader reader = getReader();
292        SimpleServerRpcConnection c = connectionManager.register(channel);
293        // If the connectionManager can't take it, close the connection.
294        if (c == null) {
295          if (channel.isOpen()) {
296            IOUtils.cleanupWithLogger(LOG, channel);
297          }
298          continue;
299        }
300        key.attach(c); // so closeCurrentConnection can get the object
301        reader.addConnection(c);
302      }
303    }
304
305    void doRead(SelectionKey key) throws InterruptedException {
306      int count;
307      SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
308      if (c == null) {
309        return;
310      }
311      c.setLastContact(EnvironmentEdgeManager.currentTime());
312      try {
313        count = c.readAndProcess();
314      } catch (InterruptedException ieo) {
315        LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException",
316          ieo);
317        throw ieo;
318      } catch (Exception e) {
319        if (LOG.isDebugEnabled()) {
320          LOG.debug("Caught exception while reading:", e);
321        }
322        count = -1; // so that the (count < 0) block is executed
323      }
324      if (count < 0) {
325        closeConnection(c);
326        c = null;
327      } else {
328        c.setLastContact(EnvironmentEdgeManager.currentTime());
329      }
330    }
331
332    synchronized void doStop() {
333      if (selector != null) {
334        selector.wakeup();
335        Thread.yield();
336      }
337      if (acceptChannel != null) {
338        try {
339          acceptChannel.socket().close();
340        } catch (IOException e) {
341          LOG.info(getName() + ": exception in closing listener socket. " + e);
342        }
343      }
344      readPool.shutdownNow();
345    }
346
347    // The method that will return the next reader to work with
348    // Simplistic implementation of round robin for now
349    Reader getReader() {
350      currentReader = (currentReader + 1) % readers.length;
351      return readers[currentReader];
352    }
353  }
354
355  /**
356   * Constructs a server listening on the named port and address.
357   * @param server           hosting instance of {@link Server}. We will do authentications if an
358   *                         instance else pass null for no authentication check.
359   * @param name             Used keying this rpc servers' metrics and for naming the Listener
360   *                         thread.
361   * @param services         A list of services.
362   * @param bindAddress      Where to listen
363   * @param reservoirEnabled Enable ByteBufferPool or not.
364   */
365  public SimpleRpcServer(final Server server, final String name,
366    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
367    Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
368    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
369    this.socketSendBufferSize = 0;
370    this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
371    this.purgeTimeout =
372      conf.getLong("hbase.ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
373
374    // Start the listener here and let it bind to the port
375    listener = new Listener(name);
376    this.port = listener.getAddress().getPort();
377
378    // Create the responder here
379    responder = new SimpleRpcServerResponder(this);
380    connectionManager = new ConnectionManager();
381    initReconfigurable(conf);
382
383    this.scheduler.init(new RpcSchedulerContext(this));
384  }
385
386  /**
387   * Subclasses of HBaseServer can override this to provide their own Connection implementations.
388   */
389  protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
390    return new SimpleServerRpcConnection(this, channel, time);
391  }
392
393  protected void closeConnection(SimpleServerRpcConnection connection) {
394    connectionManager.close(connection);
395  }
396
397  /**
398   * Sets the socket buffer size used for responding to RPCs.
399   * @param size send size
400   */
401  @Override
402  public void setSocketSendBufSize(int size) {
403    this.socketSendBufferSize = size;
404  }
405
406  /** Starts the service. Must be called before any calls will be handled. */
407  @Override
408  public synchronized void start() {
409    if (started) {
410      return;
411    }
412    authTokenSecretMgr = createSecretManager();
413    if (authTokenSecretMgr != null) {
414      // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
415      // LeaderElector start. See HBASE-25875
416      synchronized (authTokenSecretMgr) {
417        setSecretManager(authTokenSecretMgr);
418        authTokenSecretMgr.start();
419      }
420    }
421    this.authManager = new ServiceAuthorizationManager();
422    HBasePolicyProvider.init(conf, authManager);
423    responder.start();
424    listener.start();
425    scheduler.start();
426    started = true;
427  }
428
429  /** Stops the service. No new calls will be handled after this is called. */
430  @Override
431  public synchronized void stop() {
432    LOG.info("Stopping server on " + port);
433    running = false;
434    if (authTokenSecretMgr != null) {
435      authTokenSecretMgr.stop();
436      authTokenSecretMgr = null;
437    }
438    listener.interrupt();
439    listener.doStop();
440    responder.interrupt();
441    scheduler.stop();
442    notifyAll();
443  }
444
445  /**
446   * Wait for the server to be stopped. Does not wait for all subthreads to finish.
447   * @see #stop()
448   */
449  @Override
450  public synchronized void join() throws InterruptedException {
451    while (running) {
452      wait();
453    }
454  }
455
456  /**
457   * Return the socket (ip+port) on which the RPC server is listening to. May return null if the
458   * listener channel is closed.
459   * @return the socket (ip+port) on which the RPC server is listening to, or null if this
460   *         information cannot be determined
461   */
462  @Override
463  public synchronized InetSocketAddress getListenerAddress() {
464    if (listener == null) {
465      return null;
466    }
467    return listener.getAddress();
468  }
469
470  protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
471    throws IOException {
472    long count = bufferChain.write(channel);
473    if (count > 0) {
474      this.metrics.sentBytes(count);
475    }
476    return count;
477  }
478
479  /**
480   * A convenience method to bind to a given address and report better exceptions if the address is
481   * not a valid host.
482   * @param socket  the socket to bind
483   * @param address the address to bind to
484   * @param backlog the number of connections allowed in the queue
485   * @throws BindException        if the address can't be bound
486   * @throws UnknownHostException if the address isn't a valid host name
487   * @throws IOException          other random errors from bind
488   */
489  public static void bind(ServerSocket socket, InetSocketAddress address, int backlog)
490    throws IOException {
491    try {
492      socket.bind(address, backlog);
493    } catch (BindException e) {
494      BindException bindException =
495        new BindException("Problem binding to " + address + " : " + e.getMessage());
496      bindException.initCause(e);
497      throw bindException;
498    } catch (SocketException e) {
499      // If they try to bind to a different host's address, give a better
500      // error message.
501      if ("Unresolved address".equals(e.getMessage())) {
502        throw new UnknownHostException("Invalid hostname for server: " + address.getHostName());
503      }
504      throw e;
505    }
506  }
507
508  /**
509   * The number of open RPC conections
510   * @return the number of open rpc connections
511   */
512  @Override
513  public int getNumOpenConnections() {
514    return connectionManager.size();
515  }
516
517  private class ConnectionManager {
518    final private AtomicInteger count = new AtomicInteger();
519    final private Set<SimpleServerRpcConnection> connections;
520
521    final private Timer idleScanTimer;
522    final private int idleScanThreshold;
523    final private int idleScanInterval;
524    final private int maxIdleTime;
525    final private int maxIdleToClose;
526
527    ConnectionManager() {
528      this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
529      this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
530      this.idleScanInterval =
531        conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
532      this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
533      this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
534      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
535        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
536      int maxConnectionQueueSize =
537        handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
538      // create a set with concurrency -and- a thread-safe iterator, add 2
539      // for listener and idle closer threads
540      this.connections =
541        Collections.newSetFromMap(new ConcurrentHashMap<SimpleServerRpcConnection, Boolean>(
542          maxConnectionQueueSize, 0.75f, readThreads + 2));
543    }
544
545    private boolean add(SimpleServerRpcConnection connection) {
546      boolean added = connections.add(connection);
547      if (added) {
548        count.getAndIncrement();
549      }
550      return added;
551    }
552
553    private boolean remove(SimpleServerRpcConnection connection) {
554      boolean removed = connections.remove(connection);
555      if (removed) {
556        count.getAndDecrement();
557      }
558      return removed;
559    }
560
561    int size() {
562      return count.get();
563    }
564
565    SimpleServerRpcConnection[] toArray() {
566      return connections.toArray(new SimpleServerRpcConnection[0]);
567    }
568
569    SimpleServerRpcConnection register(SocketChannel channel) {
570      SimpleServerRpcConnection connection =
571        getConnection(channel, EnvironmentEdgeManager.currentTime());
572      add(connection);
573      if (LOG.isTraceEnabled()) {
574        LOG.trace("Connection from " + connection + "; connections=" + size()
575          + ", queued calls size (bytes)=" + callQueueSizeInBytes.sum() + ", general queued calls="
576          + scheduler.getGeneralQueueLength() + ", priority queued calls="
577          + scheduler.getPriorityQueueLength() + ", meta priority queued calls="
578          + scheduler.getMetaPriorityQueueLength());
579      }
580      return connection;
581    }
582
583    boolean close(SimpleServerRpcConnection connection) {
584      boolean exists = remove(connection);
585      if (exists) {
586        if (LOG.isTraceEnabled()) {
587          LOG.trace(Thread.currentThread().getName() + ": disconnecting client " + connection
588            + ". Number of active connections: " + size());
589        }
590        // only close if actually removed to avoid double-closing due
591        // to possible races
592        connection.close();
593      }
594      return exists;
595    }
596
597    // synch'ed to avoid explicit invocation upon OOM from colliding with
598    // timer task firing
599    synchronized void closeIdle(boolean scanAll) {
600      long minLastContact = EnvironmentEdgeManager.currentTime() - maxIdleTime;
601      // concurrent iterator might miss new connections added
602      // during the iteration, but that's ok because they won't
603      // be idle yet anyway and will be caught on next scan
604      int closed = 0;
605      for (SimpleServerRpcConnection connection : connections) {
606        // stop if connections dropped below threshold unless scanning all
607        if (!scanAll && size() < idleScanThreshold) {
608          break;
609        }
610        // stop if not scanning all and max connections are closed
611        if (
612          connection.isIdle() && connection.getLastContact() < minLastContact && close(connection)
613            && !scanAll && (++closed == maxIdleToClose)
614        ) {
615          break;
616        }
617      }
618    }
619
620    void closeAll() {
621      // use a copy of the connections to be absolutely sure the concurrent
622      // iterator doesn't miss a connection
623      for (SimpleServerRpcConnection connection : toArray()) {
624        close(connection);
625      }
626    }
627
628    void startIdleScan() {
629      scheduleIdleScanTask();
630    }
631
632    void stopIdleScan() {
633      idleScanTimer.cancel();
634    }
635
636    private void scheduleIdleScanTask() {
637      if (!running) {
638        return;
639      }
640      TimerTask idleScanTask = new TimerTask() {
641        @Override
642        public void run() {
643          if (!running) {
644            return;
645          }
646          if (LOG.isTraceEnabled()) {
647            LOG.trace("running");
648          }
649          try {
650            closeIdle(false);
651          } finally {
652            // explicitly reschedule so next execution occurs relative
653            // to the end of this scan, not the beginning
654            scheduleIdleScanTask();
655          }
656        }
657      };
658      idleScanTimer.schedule(idleScanTask, idleScanInterval);
659    }
660  }
661
662}