/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * The RPC server with a native Java NIO implementation deriving from Hadoop, used to host protobuf
 * described Services. It is the original implementation from before HBASE-17262, and the default
 * RPC server for now. An RpcServer instance has a Listener that hosts the socket. The Listener has
 * a fixed number of Readers in an ExecutorPool, 10 by default. The Listener accepts new
 * connections and hands each one, round-robin, to a Reader, which registers the connection on its
 * own Selector. A read drains the channel and parses the request into a Call. The Call is wrapped
 * in a CallRunner and passed to the scheduler to be run. The Reader then goes back to its Selector
 * and loops, looking for more work, until the server shuts down.
 * <p>
 * The Scheduler can be variously implemented, but the default simple scheduler has handler threads
 * to which it hands the queues into which calls (i.e. CallRunner instances) are inserted. Handlers
 * take items from the queue, run CallRunner#run on each, and keep taking while the server is up.
 * CallRunner#run executes the call and, when done, asks the wrapped Call to put itself on a queue
 * for the Responder to pull from and return the result to the client.
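 * <p>
 * A rough usage sketch (the hosting {@code Server}, services and scheduler below are placeholders;
 * in practice HBase processes normally obtain the RPC server through {@code RpcServerFactory}
 * rather than instantiating this class directly):
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * // server, services and scheduler are supplied by the hosting process
 * SimpleRpcServer rpcServer = new SimpleRpcServer(server, "exampleRpcServer", services,
 *   new InetSocketAddress("localhost", 0), conf, scheduler, true);
 * rpcServer.start(); // starts the Responder, Listener and scheduler handlers
 * InetSocketAddress boundAddress = rpcServer.getListenerAddress(); // actual (ephemeral) port
 * // ... serve requests ...
 * rpcServer.stop();
 * </pre>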
 * @see BlockingRpcClient
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
public class SimpleRpcServer extends RpcServer {

  protected int port; // port we listen on
  protected InetSocketAddress address; // inet address we listen on
  private int readThreads; // number of read threads

  protected int socketSendBufferSize;
  protected final long purgeTimeout; // in milliseconds

  // maintains the set of client connections and handles idle timeouts
  private ConnectionManager connectionManager;
  private Listener listener = null;
  protected SimpleRpcServerResponder responder = null;

  /** Listens on the socket. Creates jobs for the handler threads */
  private class Listener extends Thread {

    private ServerSocketChannel acceptChannel = null; // the accept channel
    private Selector selector = null; // the selector that we use for the server
    private Reader[] readers = null;
    private int currentReader = 0;
    private final int readerPendingConnectionQueueLength;

    private ExecutorService readPool;

    public Listener(final String name) throws IOException {
      super(name);
      // The backlog of requests that we will have the serversocket carry.
      int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
      readerPendingConnectionQueueLength =
        conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the binding address (can be different from the default interface)
      bind(acceptChannel.socket(), bindAddress, backlogLength);
      port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral port
      address = (InetSocketAddress) acceptChannel.socket().getLocalSocketAddress();
      // create a selector;
      selector = Selector.open();

      readers = new Reader[readThreads];
      // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
      // has an advantage in that it is easy to shutdown the pool.
      readPool = Executors.newFixedThreadPool(readThreads,
        new ThreadFactoryBuilder()
          .setNameFormat("Reader=%d,bindAddress=" + bindAddress.getHostName() + ",port=" + port)
          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
      for (int i = 0; i < readThreads; ++i) {
        Reader reader = new Reader();
        readers[i] = reader;
        readPool.execute(reader);
      }
      LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("Listener,port=" + port);
      this.setDaemon(true);
    }

    private class Reader implements Runnable {
      final private LinkedBlockingQueue<SimpleServerRpcConnection> pendingConnections;
      private final Selector readSelector;

      Reader() throws IOException {
        this.pendingConnections = new LinkedBlockingQueue<>(readerPendingConnectionQueueLength);
        this.readSelector = Selector.open();
      }

      @Override
      public void run() {
        try {
          doRunLoop();
        } finally {
          try {
            readSelector.close();
          } catch (IOException ioe) {
            LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
          }
        }
      }

      private synchronized void doRunLoop() {
        while (running) {
          try {
            // Consume as many connections as currently queued to avoid
            // unbridled acceptance of connections that starves the select
            int size = pendingConnections.size();
            for (int i = size; i > 0; i--) {
              SimpleServerRpcConnection conn = pendingConnections.take();
              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
            }
            readSelector.select();
            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
            while (iter.hasNext()) {
              SelectionKey key = iter.next();
              iter.remove();
              if (key.isValid()) {
                if (key.isReadable()) {
                  doRead(key);
                }
              }
              key = null;
            }
          } catch (InterruptedException e) {
            if (running) { // unexpected -- log it
              LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
            }
          } catch (CancelledKeyException e) {
            LOG.error(getName() + ": CancelledKeyException in Reader", e);
          } catch (IOException ex) {
            LOG.info(getName() + ": IOException in Reader", ex);
          }
        }
      }

      /**
       * Updating the readSelector while it's being used is not thread-safe, so the connection must
       * be queued. The reader will drain the queue and update its readSelector before performing
       * the next select.
       */
      public void addConnection(SimpleServerRpcConnection conn) throws IOException {
        pendingConnections.add(conn);
        readSelector.wakeup();
      }
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
        justification = "selector access is not synchronized; seems fine but concerned changing "
          + "it will have perf impact")
    public void run() {
      LOG.info(getName() + ": starting");
      connectionManager.startIdleScan();
      while (running) {
        SelectionKey key = null;
        try {
          selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable()) doAccept(key);
              }
            } catch (IOException ignored) {
              if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          if (errorHandler != null) {
            if (errorHandler.checkOOME(e)) {
              LOG.info(getName() + ": exiting on OutOfMemoryError");
              closeCurrentConnection(key, e);
              connectionManager.closeIdle(true);
              return;
            }
          } else {
            // we can run out of memory if we have too many threads
            // log the event and sleep for a minute and give
            // some thread(s) a chance to finish
            LOG.warn(getName() + ": OutOfMemoryError in server select", e);
            closeCurrentConnection(key, e);
            connectionManager.closeIdle(true);
            try {
              Thread.sleep(60000);
            } catch (InterruptedException ex) {
              LOG.debug("Interrupted while sleeping");
            }
          }
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
      }
      LOG.info(getName() + ": stopping");
      synchronized (this) {
        try {
          acceptChannel.close();
          selector.close();
        } catch (IOException ignored) {
          if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
        }

        selector = null;
        acceptChannel = null;

        // close all connections
        connectionManager.stopIdleScan();
        connectionManager.closeAll();
      }
    }

    private void closeCurrentConnection(SelectionKey key, Throwable e) {
      if (key != null) {
        SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
        if (c != null) {
          closeConnection(c);
          key.attach(null);
        }
      }
    }

    InetSocketAddress getAddress() {
      return address;
    }

    void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        channel.socket().setKeepAlive(tcpKeepAlive);
        Reader reader = getReader();
        SimpleServerRpcConnection c = connectionManager.register(channel);
        // If the connectionManager can't take it, close the connection.
        if (c == null) {
          if (channel.isOpen()) {
            IOUtils.cleanupWithLogger(LOG, channel);
          }
          continue;
        }
        key.attach(c); // so closeCurrentConnection can get the object
        reader.addConnection(c);
      }
    }

    void doRead(SelectionKey key) throws InterruptedException {
      int count;
      SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
      if (c == null) {
        return;
      }
      c.setLastContact(EnvironmentEdgeManager.currentTime());
      try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException",
          ieo);
        throw ieo;
      } catch (Exception e) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Caught exception while reading:", e);
        }
        count = -1; // so that the (count < 0) block is executed
      }
      if (count < 0) {
        closeConnection(c);
        c = null;
      } else {
        c.setLastContact(EnvironmentEdgeManager.currentTime());
      }
    }

    synchronized void doStop() {
      if (selector != null) {
        selector.wakeup();
        Thread.yield();
      }
      if (acceptChannel != null) {
        try {
          acceptChannel.socket().close();
        } catch (IOException e) {
          LOG.info(getName() + ": exception in closing listener socket. " + e);
        }
      }
      readPool.shutdownNow();
    }

    // The method that will return the next reader to work with
    // Simplistic implementation of round robin for now
    Reader getReader() {
      currentReader = (currentReader + 1) % readers.length;
      return readers[currentReader];
    }
  }

  /**
   * Constructs a server listening on the named port and address.
   * @param server           hosting instance of {@link Server}. We will do authentications if an
   *                         instance else pass null for no authentication check.
   * @param name             Used keying this rpc servers' metrics and for naming the Listener
   *                         thread.
   * @param services         A list of services.
   * @param bindAddress      Where to listen.
   * @param reservoirEnabled Enable ByteBufferPool or not.
   */
  public SimpleRpcServer(final Server server, final String name,
    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
    Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
    this.socketSendBufferSize = 0;
    this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
    this.purgeTimeout =
      conf.getLong("hbase.ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

    // Start the listener here and let it bind to the port
    listener = new Listener(name);
    this.port = listener.getAddress().getPort();

    // Create the responder here
    responder = new SimpleRpcServerResponder(this);
    connectionManager = new ConnectionManager();
    initReconfigurable(conf);

    this.scheduler.init(new RpcSchedulerContext(this));
  }

  /**
   * Subclasses of HBaseServer can override this to provide their own Connection implementations.
   */
  protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
    return new SimpleServerRpcConnection(this, channel, time);
  }

  protected void closeConnection(SimpleServerRpcConnection connection) {
    connectionManager.close(connection);
  }

  /**
   * Sets the socket buffer size used for responding to RPCs.
   * @param size send size
   */
  @Override
  public void setSocketSendBufSize(int size) {
    this.socketSendBufferSize = size;
  }

  /** Starts the service. Must be called before any calls will be handled. */
  @Override
  public synchronized void start() {
    if (started) {
      return;
    }
    authTokenSecretMgr = createSecretManager();
    if (authTokenSecretMgr != null) {
      // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
      // LeaderElector start. See HBASE-25875
      synchronized (authTokenSecretMgr) {
        setSecretManager(authTokenSecretMgr);
        authTokenSecretMgr.start();
      }
    }
    this.authManager = new ServiceAuthorizationManager();
    HBasePolicyProvider.init(conf, authManager);
    responder.start();
    listener.start();
    scheduler.start();
    started = true;
  }

  /** Stops the service. No new calls will be handled after this is called. */
  @Override
  public synchronized void stop() {
    LOG.info("Stopping server on " + port);
    running = false;
    if (authTokenSecretMgr != null) {
      authTokenSecretMgr.stop();
      authTokenSecretMgr = null;
    }
    listener.interrupt();
    listener.doStop();
    responder.interrupt();
    scheduler.stop();
    notifyAll();
  }

  /**
   * Wait for the server to be stopped. Does not wait for all subthreads to finish.
   * @see #stop()
   */
  @Override
  public synchronized void join() throws InterruptedException {
    while (running) {
      wait();
    }
  }

  /**
   * Returns the socket (ip+port) on which the RPC server is listening. May return null if the
   * listener channel is closed.
   * @return the socket (ip+port) on which the RPC server is listening, or null if this
   *         information cannot be determined
   */
  @Override
  public synchronized InetSocketAddress getListenerAddress() {
    if (listener == null) {
      return null;
    }
    return listener.getAddress();
  }

  /**
   * This is a wrapper around
   * {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}. If the amount of
   * data is large, it writes to the channel in smaller chunks. This is to avoid the jdk creating
   * many direct buffers as the size of the buffer increases. This also minimizes extra copies in
   * the NIO layer as a result of the multiple write operations required to write a large buffer.
   * @param channel     writable byte channel to write to
   * @param bufferChain Chain of buffers to write
   * @return number of bytes written
   * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
   */
  protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
    throws IOException {
    long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
    if (count > 0) {
      this.metrics.sentBytes(count);
    }
    return count;
  }

  /**
   * A convenience method to bind to a given address and report better exceptions if the address is
   * not a valid host.
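   * <p>
   * For example (a minimal sketch mirroring how the {@code Listener} constructor uses this helper;
   * port 0 requests an ephemeral port and 128 is just an illustrative backlog):
   *
   * <pre>
   * ServerSocketChannel acceptChannel = ServerSocketChannel.open();
   * acceptChannel.configureBlocking(false);
   * SimpleRpcServer.bind(acceptChannel.socket(), new InetSocketAddress("0.0.0.0", 0), 128);
   * </pre>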
   * @param socket  the socket to bind
   * @param address the address to bind to
   * @param backlog the number of connections allowed in the queue
   * @throws BindException        if the address can't be bound
   * @throws UnknownHostException if the address isn't a valid host name
   * @throws IOException          other random errors from bind
   */
  public static void bind(ServerSocket socket, InetSocketAddress address, int backlog)
    throws IOException {
    try {
      socket.bind(address, backlog);
    } catch (BindException e) {
      BindException bindException =
        new BindException("Problem binding to " + address + " : " + e.getMessage());
      bindException.initCause(e);
      throw bindException;
    } catch (SocketException e) {
      // If they try to bind to a different host's address, give a better
      // error message.
      if ("Unresolved address".equals(e.getMessage())) {
        throw new UnknownHostException("Invalid hostname for server: " + address.getHostName());
      }
      throw e;
    }
  }

  /**
   * The number of open RPC connections
   * @return the number of open rpc connections
   */
  @Override
  public int getNumOpenConnections() {
    return connectionManager.size();
  }

  private class ConnectionManager {
    final private AtomicInteger count = new AtomicInteger();
    final private Set<SimpleServerRpcConnection> connections;

    final private Timer idleScanTimer;
    final private int idleScanThreshold;
    final private int idleScanInterval;
    final private int maxIdleTime;
    final private int maxIdleToClose;

    ConnectionManager() {
      this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
      this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
      this.idleScanInterval =
        conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
      this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
      this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
      int maxConnectionQueueSize =
        handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
      // create a set with concurrency -and- a thread-safe iterator, add 2
      // for listener and idle closer threads
      this.connections =
        Collections.newSetFromMap(new ConcurrentHashMap<SimpleServerRpcConnection, Boolean>(
          maxConnectionQueueSize, 0.75f, readThreads + 2));
    }

    private boolean add(SimpleServerRpcConnection connection) {
      boolean added = connections.add(connection);
      if (added) {
        count.getAndIncrement();
      }
      return added;
    }

    private boolean remove(SimpleServerRpcConnection connection) {
      boolean removed = connections.remove(connection);
      if (removed) {
        count.getAndDecrement();
      }
      return removed;
    }

    int size() {
      return count.get();
    }

    SimpleServerRpcConnection[] toArray() {
      return connections.toArray(new SimpleServerRpcConnection[0]);
    }

    SimpleServerRpcConnection register(SocketChannel channel) {
      SimpleServerRpcConnection connection =
        getConnection(channel, EnvironmentEdgeManager.currentTime());
      add(connection);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Connection from " + connection + "; connections=" + size()
          + ", queued calls size (bytes)=" + callQueueSizeInBytes.sum() + ", general queued calls="
          + scheduler.getGeneralQueueLength() + ", priority queued calls="
          + scheduler.getPriorityQueueLength() + ", meta priority queued calls="
          + scheduler.getMetaPriorityQueueLength());
      }
      return connection;
    }

    boolean close(SimpleServerRpcConnection connection) {
      boolean exists = remove(connection);
      if (exists) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(Thread.currentThread().getName() + ": disconnecting client " + connection
            + ". Number of active connections: " + size());
        }
        // only close if actually removed to avoid double-closing due
        // to possible races
        connection.close();
      }
      return exists;
    }

    // synch'ed to avoid explicit invocation upon OOM from colliding with
    // timer task firing
    synchronized void closeIdle(boolean scanAll) {
      long minLastContact = EnvironmentEdgeManager.currentTime() - maxIdleTime;
      // concurrent iterator might miss new connections added
      // during the iteration, but that's ok because they won't
      // be idle yet anyway and will be caught on next scan
      int closed = 0;
      for (SimpleServerRpcConnection connection : connections) {
        // stop if connections dropped below threshold unless scanning all
        if (!scanAll && size() < idleScanThreshold) {
          break;
        }
        // stop if not scanning all and max connections are closed
        if (
          connection.isIdle() && connection.getLastContact() < minLastContact && close(connection)
            && !scanAll && (++closed == maxIdleToClose)
        ) {
          break;
        }
      }
    }

    void closeAll() {
      // use a copy of the connections to be absolutely sure the concurrent
      // iterator doesn't miss a connection
      for (SimpleServerRpcConnection connection : toArray()) {
        close(connection);
      }
    }

    void startIdleScan() {
      scheduleIdleScanTask();
    }

    void stopIdleScan() {
      idleScanTimer.cancel();
    }

    private void scheduleIdleScanTask() {
      if (!running) {
        return;
      }
      TimerTask idleScanTask = new TimerTask() {
        @Override
        public void run() {
          if (!running) {
            return;
          }
          if (LOG.isTraceEnabled()) {
            LOG.trace("running");
          }
          try {
            closeIdle(false);
          } finally {
            // explicitly reschedule so next execution occurs relative
            // to the end of this scan, not the beginning
            scheduleIdleScanTask();
          }
        }
      };
      idleScanTimer.schedule(idleScanTask, idleScanInterval);
    }
  }

}