001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.net.BindException; 022import java.net.InetSocketAddress; 023import java.net.ServerSocket; 024import java.net.SocketException; 025import java.net.UnknownHostException; 026import java.nio.channels.CancelledKeyException; 027import java.nio.channels.GatheringByteChannel; 028import java.nio.channels.SelectionKey; 029import java.nio.channels.Selector; 030import java.nio.channels.ServerSocketChannel; 031import java.nio.channels.SocketChannel; 032import java.util.Collections; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Set; 036import java.util.Timer; 037import java.util.TimerTask; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ExecutorService; 040import java.util.concurrent.Executors; 041import java.util.concurrent.LinkedBlockingQueue; 042import java.util.concurrent.atomic.AtomicInteger; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.hbase.HBaseInterfaceAudience; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.Server; 047import org.apache.hadoop.hbase.security.HBasePolicyProvider; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.apache.hadoop.hbase.util.Threads; 050import org.apache.hadoop.io.IOUtils; 051import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; 052import org.apache.yetus.audience.InterfaceAudience; 053 054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 055 056/** 057 * The RPC server with native java NIO implementation deriving from Hadoop to host protobuf 058 * described Services. It's the original one before HBASE-17262, and the default RPC server for now. 059 * An RpcServer instance has a Listener that hosts the socket. Listener has fixed number of Readers 060 * in an ExecutorPool, 10 by default. The Listener does an accept and then round robin a Reader is 061 * chosen to do the read. The reader is registered on Selector. Read does total read off the channel 062 * and the parse from which it makes a Call. The call is wrapped in a CallRunner and passed to the 063 * scheduler to be run. Reader goes back to see if more to be done and loops till done. 064 * <p> 065 * Scheduler can be variously implemented but default simple scheduler has handlers to which it has 066 * given the queues into which calls (i.e. CallRunner instances) are inserted. Handlers run taking 067 * from the queue. They run the CallRunner#run method on each item gotten from queue and keep taking 068 * while the server is up. CallRunner#run executes the call. When done, asks the included Call to 069 * put itself on new queue for Responder to pull from and return result to client. 070 * @see BlockingRpcClient 071 */ 072@Deprecated() 073@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG }) 074public class SimpleRpcServer extends RpcServer { 075 076 protected int port; // port we listen on 077 protected InetSocketAddress address; // inet address we listen on 078 private int readThreads; // number of read threads 079 080 protected int socketSendBufferSize; 081 protected final long purgeTimeout; // in milliseconds 082 083 // maintains the set of client connections and handles idle timeouts 084 private ConnectionManager connectionManager; 085 private Listener listener = null; 086 protected SimpleRpcServerResponder responder = null; 087 088 /** Listens on the socket. Creates jobs for the handler threads */ 089 private class Listener extends Thread { 090 091 private ServerSocketChannel acceptChannel = null; // the accept channel 092 private Selector selector = null; // the selector that we use for the server 093 private Reader[] readers = null; 094 private int currentReader = 0; 095 private final int readerPendingConnectionQueueLength; 096 097 private ExecutorService readPool; 098 099 public Listener(final String name) throws IOException { 100 super(name); 101 // The backlog of requests that we will have the serversocket carry. 102 int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128); 103 readerPendingConnectionQueueLength = 104 conf.getInt("hbase.ipc.server.read.connection-queue.size", 100); 105 // Create a new server socket and set to non blocking mode 106 acceptChannel = ServerSocketChannel.open(); 107 acceptChannel.configureBlocking(false); 108 109 // Bind the server socket to the binding addrees (can be different from the default interface) 110 bind(acceptChannel.socket(), bindAddress, backlogLength); 111 port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral port 112 address = (InetSocketAddress) acceptChannel.socket().getLocalSocketAddress(); 113 // create a selector; 114 selector = Selector.open(); 115 116 readers = new Reader[readThreads]; 117 // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it 118 // has an advantage in that it is easy to shutdown the pool. 119 readPool = Executors.newFixedThreadPool(readThreads, 120 new ThreadFactoryBuilder() 121 .setNameFormat("Reader=%d,bindAddress=" + bindAddress.getHostName() + ",port=" + port) 122 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 123 for (int i = 0; i < readThreads; ++i) { 124 Reader reader = new Reader(); 125 readers[i] = reader; 126 readPool.execute(reader); 127 } 128 LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port); 129 130 // Register accepts on the server socket with the selector. 131 acceptChannel.register(selector, SelectionKey.OP_ACCEPT); 132 this.setName("Listener,port=" + port); 133 this.setDaemon(true); 134 } 135 136 private class Reader implements Runnable { 137 final private LinkedBlockingQueue<SimpleServerRpcConnection> pendingConnections; 138 private final Selector readSelector; 139 140 Reader() throws IOException { 141 this.pendingConnections = new LinkedBlockingQueue<>(readerPendingConnectionQueueLength); 142 this.readSelector = Selector.open(); 143 } 144 145 @Override 146 public void run() { 147 try { 148 doRunLoop(); 149 } finally { 150 try { 151 readSelector.close(); 152 } catch (IOException ioe) { 153 LOG.error(getName() + ": error closing read selector in " + getName(), ioe); 154 } 155 } 156 } 157 158 private synchronized void doRunLoop() { 159 while (running) { 160 try { 161 // Consume as many connections as currently queued to avoid 162 // unbridled acceptance of connections that starves the select 163 int size = pendingConnections.size(); 164 for (int i = size; i > 0; i--) { 165 SimpleServerRpcConnection conn = pendingConnections.take(); 166 conn.channel.register(readSelector, SelectionKey.OP_READ, conn); 167 } 168 readSelector.select(); 169 Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator(); 170 while (iter.hasNext()) { 171 SelectionKey key = iter.next(); 172 iter.remove(); 173 if (key.isValid()) { 174 if (key.isReadable()) { 175 doRead(key); 176 } 177 } 178 key = null; 179 } 180 } catch (InterruptedException e) { 181 if (running) { // unexpected -- log it 182 LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e); 183 } 184 } catch (CancelledKeyException e) { 185 LOG.error(getName() + ": CancelledKeyException in Reader", e); 186 } catch (IOException ex) { 187 LOG.info(getName() + ": IOException in Reader", ex); 188 } 189 } 190 } 191 192 /** 193 * Updating the readSelector while it's being used is not thread-safe, so the connection must 194 * be queued. The reader will drain the queue and update its readSelector before performing 195 * the next select 196 */ 197 public void addConnection(SimpleServerRpcConnection conn) throws IOException { 198 pendingConnections.add(conn); 199 readSelector.wakeup(); 200 } 201 } 202 203 @Override 204 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 205 justification = "selector access is not synchronized; seems fine but concerned changing " 206 + "it will have per impact") 207 public void run() { 208 LOG.info(getName() + ": starting"); 209 connectionManager.startIdleScan(); 210 while (running) { 211 SelectionKey key = null; 212 try { 213 selector.select(); // FindBugs IS2_INCONSISTENT_SYNC 214 Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 215 while (iter.hasNext()) { 216 key = iter.next(); 217 iter.remove(); 218 try { 219 if (key.isValid()) { 220 if (key.isAcceptable()) doAccept(key); 221 } 222 } catch (IOException ignored) { 223 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); 224 } 225 key = null; 226 } 227 } catch (OutOfMemoryError e) { 228 if (errorHandler != null) { 229 if (errorHandler.checkOOME(e)) { 230 LOG.info(getName() + ": exiting on OutOfMemoryError"); 231 closeCurrentConnection(key, e); 232 connectionManager.closeIdle(true); 233 return; 234 } 235 } else { 236 // we can run out of memory if we have too many threads 237 // log the event and sleep for a minute and give 238 // some thread(s) a chance to finish 239 LOG.warn(getName() + ": OutOfMemoryError in server select", e); 240 closeCurrentConnection(key, e); 241 connectionManager.closeIdle(true); 242 try { 243 Thread.sleep(60000); 244 } catch (InterruptedException ex) { 245 LOG.debug("Interrupted while sleeping"); 246 } 247 } 248 } catch (Exception e) { 249 closeCurrentConnection(key, e); 250 } 251 } 252 LOG.info(getName() + ": stopping"); 253 synchronized (this) { 254 try { 255 acceptChannel.close(); 256 selector.close(); 257 } catch (IOException ignored) { 258 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); 259 } 260 261 selector = null; 262 acceptChannel = null; 263 264 // close all connections 265 connectionManager.stopIdleScan(); 266 connectionManager.closeAll(); 267 } 268 } 269 270 private void closeCurrentConnection(SelectionKey key, Throwable e) { 271 if (key != null) { 272 SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment(); 273 if (c != null) { 274 closeConnection(c); 275 key.attach(null); 276 } 277 } 278 } 279 280 InetSocketAddress getAddress() { 281 return address; 282 } 283 284 void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError { 285 ServerSocketChannel server = (ServerSocketChannel) key.channel(); 286 SocketChannel channel; 287 while ((channel = server.accept()) != null) { 288 channel.configureBlocking(false); 289 channel.socket().setTcpNoDelay(tcpNoDelay); 290 channel.socket().setKeepAlive(tcpKeepAlive); 291 Reader reader = getReader(); 292 SimpleServerRpcConnection c = connectionManager.register(channel); 293 // If the connectionManager can't take it, close the connection. 294 if (c == null) { 295 if (channel.isOpen()) { 296 IOUtils.cleanupWithLogger(LOG, channel); 297 } 298 continue; 299 } 300 key.attach(c); // so closeCurrentConnection can get the object 301 reader.addConnection(c); 302 } 303 } 304 305 void doRead(SelectionKey key) throws InterruptedException { 306 int count; 307 SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment(); 308 if (c == null) { 309 return; 310 } 311 c.setLastContact(EnvironmentEdgeManager.currentTime()); 312 try { 313 count = c.readAndProcess(); 314 } catch (InterruptedException ieo) { 315 LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", 316 ieo); 317 throw ieo; 318 } catch (Exception e) { 319 if (LOG.isDebugEnabled()) { 320 LOG.debug("Caught exception while reading:", e); 321 } 322 count = -1; // so that the (count < 0) block is executed 323 } 324 if (count < 0) { 325 closeConnection(c); 326 c = null; 327 } else { 328 c.setLastContact(EnvironmentEdgeManager.currentTime()); 329 } 330 } 331 332 synchronized void doStop() { 333 if (selector != null) { 334 selector.wakeup(); 335 Thread.yield(); 336 } 337 if (acceptChannel != null) { 338 try { 339 acceptChannel.socket().close(); 340 } catch (IOException e) { 341 LOG.info(getName() + ": exception in closing listener socket. " + e); 342 } 343 } 344 readPool.shutdownNow(); 345 } 346 347 // The method that will return the next reader to work with 348 // Simplistic implementation of round robin for now 349 Reader getReader() { 350 currentReader = (currentReader + 1) % readers.length; 351 return readers[currentReader]; 352 } 353 } 354 355 /** 356 * Constructs a server listening on the named port and address. 357 * @param server hosting instance of {@link Server}. We will do authentications if an 358 * instance else pass null for no authentication check. 359 * @param name Used keying this rpc servers' metrics and for naming the Listener 360 * thread. 361 * @param services A list of services. 362 * @param bindAddress Where to listen 363 * @param reservoirEnabled Enable ByteBufferPool or not. 364 */ 365 public SimpleRpcServer(final Server server, final String name, 366 final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress, 367 Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { 368 super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled); 369 this.socketSendBufferSize = 0; 370 this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10); 371 this.purgeTimeout = 372 conf.getLong("hbase.ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 373 374 // Start the listener here and let it bind to the port 375 listener = new Listener(name); 376 this.port = listener.getAddress().getPort(); 377 378 // Create the responder here 379 responder = new SimpleRpcServerResponder(this); 380 connectionManager = new ConnectionManager(); 381 initReconfigurable(conf); 382 383 this.scheduler.init(new RpcSchedulerContext(this)); 384 } 385 386 /** 387 * Subclasses of HBaseServer can override this to provide their own Connection implementations. 388 */ 389 protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) { 390 return new SimpleServerRpcConnection(this, channel, time); 391 } 392 393 protected void closeConnection(SimpleServerRpcConnection connection) { 394 connectionManager.close(connection); 395 } 396 397 /** 398 * Sets the socket buffer size used for responding to RPCs. 399 * @param size send size 400 */ 401 @Override 402 public void setSocketSendBufSize(int size) { 403 this.socketSendBufferSize = size; 404 } 405 406 /** Starts the service. Must be called before any calls will be handled. */ 407 @Override 408 public synchronized void start() { 409 if (started) { 410 return; 411 } 412 authTokenSecretMgr = createSecretManager(); 413 if (authTokenSecretMgr != null) { 414 // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in 415 // LeaderElector start. See HBASE-25875 416 synchronized (authTokenSecretMgr) { 417 setSecretManager(authTokenSecretMgr); 418 authTokenSecretMgr.start(); 419 } 420 } 421 this.authManager = new ServiceAuthorizationManager(); 422 HBasePolicyProvider.init(conf, authManager); 423 responder.start(); 424 listener.start(); 425 scheduler.start(); 426 started = true; 427 } 428 429 /** Stops the service. No new calls will be handled after this is called. */ 430 @Override 431 public synchronized void stop() { 432 LOG.info("Stopping server on " + port); 433 running = false; 434 if (authTokenSecretMgr != null) { 435 authTokenSecretMgr.stop(); 436 authTokenSecretMgr = null; 437 } 438 listener.interrupt(); 439 listener.doStop(); 440 responder.interrupt(); 441 scheduler.stop(); 442 notifyAll(); 443 } 444 445 /** 446 * Wait for the server to be stopped. Does not wait for all subthreads to finish. 447 * @see #stop() 448 */ 449 @Override 450 public synchronized void join() throws InterruptedException { 451 while (running) { 452 wait(); 453 } 454 } 455 456 /** 457 * Return the socket (ip+port) on which the RPC server is listening to. May return null if the 458 * listener channel is closed. 459 * @return the socket (ip+port) on which the RPC server is listening to, or null if this 460 * information cannot be determined 461 */ 462 @Override 463 public synchronized InetSocketAddress getListenerAddress() { 464 if (listener == null) { 465 return null; 466 } 467 return listener.getAddress(); 468 } 469 470 protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain) 471 throws IOException { 472 long count = bufferChain.write(channel); 473 if (count > 0) { 474 this.metrics.sentBytes(count); 475 } 476 return count; 477 } 478 479 /** 480 * A convenience method to bind to a given address and report better exceptions if the address is 481 * not a valid host. 482 * @param socket the socket to bind 483 * @param address the address to bind to 484 * @param backlog the number of connections allowed in the queue 485 * @throws BindException if the address can't be bound 486 * @throws UnknownHostException if the address isn't a valid host name 487 * @throws IOException other random errors from bind 488 */ 489 public static void bind(ServerSocket socket, InetSocketAddress address, int backlog) 490 throws IOException { 491 try { 492 socket.bind(address, backlog); 493 } catch (BindException e) { 494 BindException bindException = 495 new BindException("Problem binding to " + address + " : " + e.getMessage()); 496 bindException.initCause(e); 497 throw bindException; 498 } catch (SocketException e) { 499 // If they try to bind to a different host's address, give a better 500 // error message. 501 if ("Unresolved address".equals(e.getMessage())) { 502 throw new UnknownHostException("Invalid hostname for server: " + address.getHostName()); 503 } 504 throw e; 505 } 506 } 507 508 /** 509 * The number of open RPC conections 510 * @return the number of open rpc connections 511 */ 512 @Override 513 public int getNumOpenConnections() { 514 return connectionManager.size(); 515 } 516 517 private class ConnectionManager { 518 final private AtomicInteger count = new AtomicInteger(); 519 final private Set<SimpleServerRpcConnection> connections; 520 521 final private Timer idleScanTimer; 522 final private int idleScanThreshold; 523 final private int idleScanInterval; 524 final private int maxIdleTime; 525 final private int maxIdleToClose; 526 527 ConnectionManager() { 528 this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true); 529 this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000); 530 this.idleScanInterval = 531 conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000); 532 this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); 533 this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10); 534 int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 535 HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); 536 int maxConnectionQueueSize = 537 handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100); 538 // create a set with concurrency -and- a thread-safe iterator, add 2 539 // for listener and idle closer threads 540 this.connections = 541 Collections.newSetFromMap(new ConcurrentHashMap<SimpleServerRpcConnection, Boolean>( 542 maxConnectionQueueSize, 0.75f, readThreads + 2)); 543 } 544 545 private boolean add(SimpleServerRpcConnection connection) { 546 boolean added = connections.add(connection); 547 if (added) { 548 count.getAndIncrement(); 549 } 550 return added; 551 } 552 553 private boolean remove(SimpleServerRpcConnection connection) { 554 boolean removed = connections.remove(connection); 555 if (removed) { 556 count.getAndDecrement(); 557 } 558 return removed; 559 } 560 561 int size() { 562 return count.get(); 563 } 564 565 SimpleServerRpcConnection[] toArray() { 566 return connections.toArray(new SimpleServerRpcConnection[0]); 567 } 568 569 SimpleServerRpcConnection register(SocketChannel channel) { 570 SimpleServerRpcConnection connection = 571 getConnection(channel, EnvironmentEdgeManager.currentTime()); 572 add(connection); 573 if (LOG.isTraceEnabled()) { 574 LOG.trace("Connection from " + connection + "; connections=" + size() 575 + ", queued calls size (bytes)=" + callQueueSizeInBytes.sum() + ", general queued calls=" 576 + scheduler.getGeneralQueueLength() + ", priority queued calls=" 577 + scheduler.getPriorityQueueLength() + ", meta priority queued calls=" 578 + scheduler.getMetaPriorityQueueLength()); 579 } 580 return connection; 581 } 582 583 boolean close(SimpleServerRpcConnection connection) { 584 boolean exists = remove(connection); 585 if (exists) { 586 if (LOG.isTraceEnabled()) { 587 LOG.trace(Thread.currentThread().getName() + ": disconnecting client " + connection 588 + ". Number of active connections: " + size()); 589 } 590 // only close if actually removed to avoid double-closing due 591 // to possible races 592 connection.close(); 593 } 594 return exists; 595 } 596 597 // synch'ed to avoid explicit invocation upon OOM from colliding with 598 // timer task firing 599 synchronized void closeIdle(boolean scanAll) { 600 long minLastContact = EnvironmentEdgeManager.currentTime() - maxIdleTime; 601 // concurrent iterator might miss new connections added 602 // during the iteration, but that's ok because they won't 603 // be idle yet anyway and will be caught on next scan 604 int closed = 0; 605 for (SimpleServerRpcConnection connection : connections) { 606 // stop if connections dropped below threshold unless scanning all 607 if (!scanAll && size() < idleScanThreshold) { 608 break; 609 } 610 // stop if not scanning all and max connections are closed 611 if ( 612 connection.isIdle() && connection.getLastContact() < minLastContact && close(connection) 613 && !scanAll && (++closed == maxIdleToClose) 614 ) { 615 break; 616 } 617 } 618 } 619 620 void closeAll() { 621 // use a copy of the connections to be absolutely sure the concurrent 622 // iterator doesn't miss a connection 623 for (SimpleServerRpcConnection connection : toArray()) { 624 close(connection); 625 } 626 } 627 628 void startIdleScan() { 629 scheduleIdleScanTask(); 630 } 631 632 void stopIdleScan() { 633 idleScanTimer.cancel(); 634 } 635 636 private void scheduleIdleScanTask() { 637 if (!running) { 638 return; 639 } 640 TimerTask idleScanTask = new TimerTask() { 641 @Override 642 public void run() { 643 if (!running) { 644 return; 645 } 646 if (LOG.isTraceEnabled()) { 647 LOG.trace("running"); 648 } 649 try { 650 closeIdle(false); 651 } finally { 652 // explicitly reschedule so next execution occurs relative 653 // to the end of this scan, not the beginning 654 scheduleIdleScanTask(); 655 } 656 } 657 }; 658 idleScanTimer.schedule(idleScanTask, idleScanInterval); 659 } 660 } 661 662}