001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.net.BindException;
022import java.net.InetSocketAddress;
023import java.net.ServerSocket;
024import java.net.SocketException;
025import java.net.UnknownHostException;
026import java.nio.channels.CancelledKeyException;
027import java.nio.channels.GatheringByteChannel;
028import java.nio.channels.SelectionKey;
029import java.nio.channels.Selector;
030import java.nio.channels.ServerSocketChannel;
031import java.nio.channels.SocketChannel;
032import java.util.Collections;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Set;
036import java.util.Timer;
037import java.util.TimerTask;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ExecutorService;
040import java.util.concurrent.Executors;
041import java.util.concurrent.LinkedBlockingQueue;
042import java.util.concurrent.atomic.AtomicInteger;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.hbase.HBaseInterfaceAudience;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.Server;
047import org.apache.hadoop.hbase.security.HBasePolicyProvider;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.util.Threads;
050import org.apache.hadoop.io.IOUtils;
051import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
052import org.apache.yetus.audience.InterfaceAudience;
053
054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
055
056/**
057 * The RPC server with native java NIO implementation deriving from Hadoop to host protobuf
058 * described Services. It's the original one before HBASE-17262, and the default RPC server for now.
059 * An RpcServer instance has a Listener that hosts the socket. Listener has fixed number of Readers
060 * in an ExecutorPool, 10 by default. The Listener does an accept and then round robin a Reader is
061 * chosen to do the read. The reader is registered on Selector. Read does total read off the channel
062 * and the parse from which it makes a Call. The call is wrapped in a CallRunner and passed to the
063 * scheduler to be run. Reader goes back to see if more to be done and loops till done.
064 * <p>
065 * Scheduler can be variously implemented but default simple scheduler has handlers to which it has
066 * given the queues into which calls (i.e. CallRunner instances) are inserted. Handlers run taking
067 * from the queue. They run the CallRunner#run method on each item gotten from queue and keep taking
068 * while the server is up. CallRunner#run executes the call. When done, asks the included Call to
069 * put itself on new queue for Responder to pull from and return result to client.
070 * @see BlockingRpcClient
071 * @deprecated Since 2.4.14, 2.5.0, will be removed in 4.0.0. Use {@link NettyRpcServer} instead.
072 */
073@Deprecated
074@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
075public class SimpleRpcServer extends RpcServer {
076
077  protected int port; // port we listen on
078  protected InetSocketAddress address; // inet address we listen on
079  private int readThreads; // number of read threads
080
081  protected int socketSendBufferSize;
082  protected final long purgeTimeout; // in milliseconds
083
084  // maintains the set of client connections and handles idle timeouts
085  private ConnectionManager connectionManager;
086  private Listener listener = null;
087  protected SimpleRpcServerResponder responder = null;
088
089  /** Listens on the socket. Creates jobs for the handler threads */
090  private class Listener extends Thread {
091
092    private ServerSocketChannel acceptChannel = null; // the accept channel
093    private Selector selector = null; // the selector that we use for the server
094    private Reader[] readers = null;
095    private int currentReader = 0;
096    private final int readerPendingConnectionQueueLength;
097
098    private ExecutorService readPool;
099
100    public Listener(final String name) throws IOException {
101      super(name);
102      // The backlog of requests that we will have the serversocket carry.
103      int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
104      readerPendingConnectionQueueLength =
105        conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);
106      // Create a new server socket and set to non blocking mode
107      acceptChannel = ServerSocketChannel.open();
108      acceptChannel.configureBlocking(false);
109
110      // Bind the server socket to the binding addrees (can be different from the default interface)
111      bind(acceptChannel.socket(), bindAddress, backlogLength);
112      port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral port
113      address = (InetSocketAddress) acceptChannel.socket().getLocalSocketAddress();
114      // create a selector;
115      selector = Selector.open();
116
117      readers = new Reader[readThreads];
118      // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
119      // has an advantage in that it is easy to shutdown the pool.
120      readPool = Executors.newFixedThreadPool(readThreads,
121        new ThreadFactoryBuilder()
122          .setNameFormat("Reader=%d,bindAddress=" + bindAddress.getHostName() + ",port=" + port)
123          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
124      for (int i = 0; i < readThreads; ++i) {
125        Reader reader = new Reader();
126        readers[i] = reader;
127        readPool.execute(reader);
128      }
129      LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
130
131      // Register accepts on the server socket with the selector.
132      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
133      this.setName("Listener,port=" + port);
134      this.setDaemon(true);
135    }
136
137    private class Reader implements Runnable {
138      final private LinkedBlockingQueue<SimpleServerRpcConnection> pendingConnections;
139      private final Selector readSelector;
140
141      Reader() throws IOException {
142        this.pendingConnections = new LinkedBlockingQueue<>(readerPendingConnectionQueueLength);
143        this.readSelector = Selector.open();
144      }
145
146      @Override
147      public void run() {
148        try {
149          doRunLoop();
150        } finally {
151          try {
152            readSelector.close();
153          } catch (IOException ioe) {
154            LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
155          }
156        }
157      }
158
159      private synchronized void doRunLoop() {
160        while (running) {
161          try {
162            // Consume as many connections as currently queued to avoid
163            // unbridled acceptance of connections that starves the select
164            int size = pendingConnections.size();
165            for (int i = size; i > 0; i--) {
166              SimpleServerRpcConnection conn = pendingConnections.take();
167              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
168            }
169            readSelector.select();
170            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
171            while (iter.hasNext()) {
172              SelectionKey key = iter.next();
173              iter.remove();
174              if (key.isValid()) {
175                if (key.isReadable()) {
176                  doRead(key);
177                }
178              }
179              key = null;
180            }
181          } catch (InterruptedException e) {
182            if (running) { // unexpected -- log it
183              LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
184            }
185          } catch (CancelledKeyException e) {
186            LOG.error(getName() + ": CancelledKeyException in Reader", e);
187          } catch (IOException ex) {
188            LOG.info(getName() + ": IOException in Reader", ex);
189          }
190        }
191      }
192
193      /**
194       * Updating the readSelector while it's being used is not thread-safe, so the connection must
195       * be queued. The reader will drain the queue and update its readSelector before performing
196       * the next select
197       */
198      public void addConnection(SimpleServerRpcConnection conn) throws IOException {
199        pendingConnections.add(conn);
200        readSelector.wakeup();
201      }
202    }
203
204    @Override
205    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
206        justification = "selector access is not synchronized; seems fine but concerned changing "
207          + "it will have per impact")
208    public void run() {
209      LOG.info(getName() + ": starting");
210      connectionManager.startIdleScan();
211      while (running) {
212        SelectionKey key = null;
213        try {
214          selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
215          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
216          while (iter.hasNext()) {
217            key = iter.next();
218            iter.remove();
219            try {
220              if (key.isValid()) {
221                if (key.isAcceptable()) doAccept(key);
222              }
223            } catch (IOException ignored) {
224              if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
225            }
226            key = null;
227          }
228        } catch (OutOfMemoryError e) {
229          if (errorHandler != null) {
230            if (errorHandler.checkOOME(e)) {
231              LOG.info(getName() + ": exiting on OutOfMemoryError");
232              closeCurrentConnection(key, e);
233              connectionManager.closeIdle(true);
234              return;
235            }
236          } else {
237            // we can run out of memory if we have too many threads
238            // log the event and sleep for a minute and give
239            // some thread(s) a chance to finish
240            LOG.warn(getName() + ": OutOfMemoryError in server select", e);
241            closeCurrentConnection(key, e);
242            connectionManager.closeIdle(true);
243            try {
244              Thread.sleep(60000);
245            } catch (InterruptedException ex) {
246              LOG.debug("Interrupted while sleeping");
247            }
248          }
249        } catch (Exception e) {
250          closeCurrentConnection(key, e);
251        }
252      }
253      LOG.info(getName() + ": stopping");
254      synchronized (this) {
255        try {
256          acceptChannel.close();
257          selector.close();
258        } catch (IOException ignored) {
259          if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
260        }
261
262        selector = null;
263        acceptChannel = null;
264
265        // close all connections
266        connectionManager.stopIdleScan();
267        connectionManager.closeAll();
268      }
269    }
270
271    private void closeCurrentConnection(SelectionKey key, Throwable e) {
272      if (key != null) {
273        SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
274        if (c != null) {
275          closeConnection(c);
276          key.attach(null);
277        }
278      }
279    }
280
281    InetSocketAddress getAddress() {
282      return address;
283    }
284
285    void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
286      ServerSocketChannel server = (ServerSocketChannel) key.channel();
287      SocketChannel channel;
288      while ((channel = server.accept()) != null) {
289        channel.configureBlocking(false);
290        channel.socket().setTcpNoDelay(tcpNoDelay);
291        channel.socket().setKeepAlive(tcpKeepAlive);
292        Reader reader = getReader();
293        SimpleServerRpcConnection c = connectionManager.register(channel);
294        // If the connectionManager can't take it, close the connection.
295        if (c == null) {
296          if (channel.isOpen()) {
297            IOUtils.cleanupWithLogger(LOG, channel);
298          }
299          continue;
300        }
301        key.attach(c); // so closeCurrentConnection can get the object
302        reader.addConnection(c);
303      }
304    }
305
306    void doRead(SelectionKey key) throws InterruptedException {
307      int count;
308      SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
309      if (c == null) {
310        return;
311      }
312      c.setLastContact(EnvironmentEdgeManager.currentTime());
313      try {
314        count = c.readAndProcess();
315      } catch (InterruptedException ieo) {
316        LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException",
317          ieo);
318        throw ieo;
319      } catch (Exception e) {
320        if (LOG.isDebugEnabled()) {
321          LOG.debug("Caught exception while reading:", e);
322        }
323        count = -1; // so that the (count < 0) block is executed
324      }
325      if (count < 0) {
326        closeConnection(c);
327        c = null;
328      } else {
329        c.setLastContact(EnvironmentEdgeManager.currentTime());
330      }
331    }
332
333    synchronized void doStop() {
334      if (selector != null) {
335        selector.wakeup();
336        Thread.yield();
337      }
338      if (acceptChannel != null) {
339        try {
340          acceptChannel.socket().close();
341        } catch (IOException e) {
342          LOG.info(getName() + ": exception in closing listener socket. " + e);
343        }
344      }
345      readPool.shutdownNow();
346    }
347
348    // The method that will return the next reader to work with
349    // Simplistic implementation of round robin for now
350    Reader getReader() {
351      currentReader = (currentReader + 1) % readers.length;
352      return readers[currentReader];
353    }
354  }
355
356  /**
357   * Constructs a server listening on the named port and address.
358   * @param server           hosting instance of {@link Server}. We will do authentications if an
359   *                         instance else pass null for no authentication check.
360   * @param name             Used keying this rpc servers' metrics and for naming the Listener
361   *                         thread.
362   * @param services         A list of services.
363   * @param bindAddress      Where to listen
364   * @param reservoirEnabled Enable ByteBufferPool or not.
365   */
366  public SimpleRpcServer(final Server server, final String name,
367    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
368    Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
369    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
370    this.socketSendBufferSize = 0;
371    this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
372    this.purgeTimeout =
373      conf.getLong("hbase.ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
374
375    // Start the listener here and let it bind to the port
376    listener = new Listener(name);
377    this.port = listener.getAddress().getPort();
378
379    // Create the responder here
380    responder = new SimpleRpcServerResponder(this);
381    connectionManager = new ConnectionManager();
382    initReconfigurable(conf);
383
384    this.scheduler.init(new RpcSchedulerContext(this));
385  }
386
387  /**
388   * Subclasses of HBaseServer can override this to provide their own Connection implementations.
389   */
390  protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
391    return new SimpleServerRpcConnection(this, channel, time);
392  }
393
394  protected void closeConnection(SimpleServerRpcConnection connection) {
395    connectionManager.close(connection);
396  }
397
398  /**
399   * Sets the socket buffer size used for responding to RPCs.
400   * @param size send size
401   */
402  @Override
403  public void setSocketSendBufSize(int size) {
404    this.socketSendBufferSize = size;
405  }
406
407  /** Starts the service. Must be called before any calls will be handled. */
408  @Override
409  public synchronized void start() {
410    if (started) {
411      return;
412    }
413    authTokenSecretMgr = createSecretManager();
414    if (authTokenSecretMgr != null) {
415      // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
416      // LeaderElector start. See HBASE-25875
417      synchronized (authTokenSecretMgr) {
418        setSecretManager(authTokenSecretMgr);
419        authTokenSecretMgr.start();
420      }
421    }
422    this.authManager = new ServiceAuthorizationManager();
423    HBasePolicyProvider.init(conf, authManager);
424    responder.start();
425    listener.start();
426    scheduler.start();
427    started = true;
428  }
429
430  /** Stops the service. No new calls will be handled after this is called. */
431  @Override
432  public synchronized void stop() {
433    LOG.info("Stopping server on " + port);
434    running = false;
435    if (authTokenSecretMgr != null) {
436      authTokenSecretMgr.stop();
437      authTokenSecretMgr = null;
438    }
439    listener.interrupt();
440    listener.doStop();
441    responder.interrupt();
442    scheduler.stop();
443    notifyAll();
444  }
445
446  /**
447   * Wait for the server to be stopped. Does not wait for all subthreads to finish.
448   * @see #stop()
449   */
450  @Override
451  public synchronized void join() throws InterruptedException {
452    while (running) {
453      wait();
454    }
455  }
456
457  /**
458   * Return the socket (ip+port) on which the RPC server is listening to. May return null if the
459   * listener channel is closed.
460   * @return the socket (ip+port) on which the RPC server is listening to, or null if this
461   *         information cannot be determined
462   */
463  @Override
464  public synchronized InetSocketAddress getListenerAddress() {
465    if (listener == null) {
466      return null;
467    }
468    return listener.getAddress();
469  }
470
471  protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
472    throws IOException {
473    long count = bufferChain.write(channel);
474    if (count > 0) {
475      this.metrics.sentBytes(count);
476    }
477    return count;
478  }
479
480  /**
481   * A convenience method to bind to a given address and report better exceptions if the address is
482   * not a valid host.
483   * @param socket  the socket to bind
484   * @param address the address to bind to
485   * @param backlog the number of connections allowed in the queue
486   * @throws BindException        if the address can't be bound
487   * @throws UnknownHostException if the address isn't a valid host name
488   * @throws IOException          other random errors from bind
489   */
490  public static void bind(ServerSocket socket, InetSocketAddress address, int backlog)
491    throws IOException {
492    try {
493      socket.bind(address, backlog);
494    } catch (BindException e) {
495      BindException bindException =
496        new BindException("Problem binding to " + address + " : " + e.getMessage());
497      bindException.initCause(e);
498      throw bindException;
499    } catch (SocketException e) {
500      // If they try to bind to a different host's address, give a better
501      // error message.
502      if ("Unresolved address".equals(e.getMessage())) {
503        throw new UnknownHostException("Invalid hostname for server: " + address.getHostName());
504      }
505      throw e;
506    }
507  }
508
509  /**
510   * The number of open RPC conections
511   * @return the number of open rpc connections
512   */
513  @Override
514  public int getNumOpenConnections() {
515    return connectionManager.size();
516  }
517
518  private class ConnectionManager {
519    final private AtomicInteger count = new AtomicInteger();
520    final private Set<SimpleServerRpcConnection> connections;
521
522    final private Timer idleScanTimer;
523    final private int idleScanThreshold;
524    final private int idleScanInterval;
525    final private int maxIdleTime;
526    final private int maxIdleToClose;
527
528    ConnectionManager() {
529      this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
530      this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
531      this.idleScanInterval =
532        conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
533      this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
534      this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
535      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
536        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
537      int maxConnectionQueueSize =
538        handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
539      // create a set with concurrency -and- a thread-safe iterator, add 2
540      // for listener and idle closer threads
541      this.connections =
542        Collections.newSetFromMap(new ConcurrentHashMap<SimpleServerRpcConnection, Boolean>(
543          maxConnectionQueueSize, 0.75f, readThreads + 2));
544    }
545
546    private boolean add(SimpleServerRpcConnection connection) {
547      boolean added = connections.add(connection);
548      if (added) {
549        count.getAndIncrement();
550      }
551      return added;
552    }
553
554    private boolean remove(SimpleServerRpcConnection connection) {
555      boolean removed = connections.remove(connection);
556      if (removed) {
557        count.getAndDecrement();
558      }
559      return removed;
560    }
561
562    int size() {
563      return count.get();
564    }
565
566    SimpleServerRpcConnection[] toArray() {
567      return connections.toArray(new SimpleServerRpcConnection[0]);
568    }
569
570    SimpleServerRpcConnection register(SocketChannel channel) {
571      SimpleServerRpcConnection connection =
572        getConnection(channel, EnvironmentEdgeManager.currentTime());
573      add(connection);
574      if (LOG.isTraceEnabled()) {
575        LOG.trace("Connection from " + connection + "; connections=" + size()
576          + ", queued calls size (bytes)=" + callQueueSizeInBytes.sum() + ", general queued calls="
577          + scheduler.getGeneralQueueLength() + ", priority queued calls="
578          + scheduler.getPriorityQueueLength() + ", meta priority queued calls="
579          + scheduler.getMetaPriorityQueueLength());
580      }
581      return connection;
582    }
583
584    boolean close(SimpleServerRpcConnection connection) {
585      boolean exists = remove(connection);
586      if (exists) {
587        if (LOG.isTraceEnabled()) {
588          LOG.trace(Thread.currentThread().getName() + ": disconnecting client " + connection
589            + ". Number of active connections: " + size());
590        }
591        // only close if actually removed to avoid double-closing due
592        // to possible races
593        connection.close();
594      }
595      return exists;
596    }
597
598    // synch'ed to avoid explicit invocation upon OOM from colliding with
599    // timer task firing
600    synchronized void closeIdle(boolean scanAll) {
601      long minLastContact = EnvironmentEdgeManager.currentTime() - maxIdleTime;
602      // concurrent iterator might miss new connections added
603      // during the iteration, but that's ok because they won't
604      // be idle yet anyway and will be caught on next scan
605      int closed = 0;
606      for (SimpleServerRpcConnection connection : connections) {
607        // stop if connections dropped below threshold unless scanning all
608        if (!scanAll && size() < idleScanThreshold) {
609          break;
610        }
611        // stop if not scanning all and max connections are closed
612        if (
613          connection.isIdle() && connection.getLastContact() < minLastContact && close(connection)
614            && !scanAll && (++closed == maxIdleToClose)
615        ) {
616          break;
617        }
618      }
619    }
620
621    void closeAll() {
622      // use a copy of the connections to be absolutely sure the concurrent
623      // iterator doesn't miss a connection
624      for (SimpleServerRpcConnection connection : toArray()) {
625        close(connection);
626      }
627    }
628
629    void startIdleScan() {
630      scheduleIdleScanTask();
631    }
632
633    void stopIdleScan() {
634      idleScanTimer.cancel();
635    }
636
637    private void scheduleIdleScanTask() {
638      if (!running) {
639        return;
640      }
641      TimerTask idleScanTask = new TimerTask() {
642        @Override
643        public void run() {
644          if (!running) {
645            return;
646          }
647          if (LOG.isTraceEnabled()) {
648            LOG.trace("running");
649          }
650          try {
651            closeIdle(false);
652          } finally {
653            // explicitly reschedule so next execution occurs relative
654            // to the end of this scan, not the beginning
655            scheduleIdleScanTask();
656          }
657        }
658      };
659      idleScanTimer.schedule(idleScanTask, idleScanInterval);
660    }
661  }
662
663}