/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * The RPC server with a native Java NIO implementation, derived from Hadoop, that hosts
 * protobuf-described Services. It is the original implementation from before HBASE-17262 and has
 * been superseded by {@link NettyRpcServer} (see the deprecation note below). An RpcServer
 * instance has a Listener that hosts the socket. The Listener has a fixed number of Readers in an
 * ExecutorPool, 10 by default. The Listener accepts a connection and picks a Reader round-robin to
 * do the read. The Reader is registered on a Selector; a read drains the channel and parses the
 * bytes into a Call. The Call is wrapped in a CallRunner and handed to the scheduler to be run.
 * The Reader then checks whether more work is pending and loops until done.
 * <p>
 * The Scheduler can be variously implemented, but the default simple scheduler has handlers to
 * which it has given the queues into which calls (i.e. CallRunner instances) are inserted.
 * Handlers take from the queue, run CallRunner#run on each item, and keep taking while the server
 * is up. CallRunner#run executes the call; when done, it asks the included Call to put itself on a
 * new queue for the Responder to pull from and return the result to the client.
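 * <p>
 * A minimal, illustrative usage sketch. The {@code server}, {@code services}, {@code conf} and
 * {@code scheduler} values are assumed to be supplied by the caller and are not shown here:
 *
 * <pre>
 * SimpleRpcServer rpcServer = new SimpleRpcServer(server, "exampleRpcServer", services,
 *   new InetSocketAddress("0.0.0.0", 0), conf, scheduler, true);
 * rpcServer.start();
 * // Actual bound ip+port; the port may be ephemeral when 0 was requested.
 * InetSocketAddress boundAddress = rpcServer.getListenerAddress();
 * // ... serve requests ...
 * rpcServer.stop();
 * </pre>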
 * @see BlockingRpcClient
 * @deprecated Since 2.4.14, 2.5.0, will be removed in 4.0.0. Use {@link NettyRpcServer} instead.
 */
@Deprecated
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
public class SimpleRpcServer extends RpcServer {

  protected int port; // port we listen on
  protected InetSocketAddress address; // inet address we listen on
  private int readThreads; // number of read threads

  protected int socketSendBufferSize;
  protected final long purgeTimeout; // in milliseconds

  // maintains the set of client connections and handles idle timeouts
  private ConnectionManager connectionManager;
  private Listener listener = null;
  protected SimpleRpcServerResponder responder = null;

  /** Listens on the socket. Creates jobs for the handler threads */
  private class Listener extends Thread {

    private ServerSocketChannel acceptChannel = null; // the accept channel
    private Selector selector = null; // the selector that we use for the server
    private Reader[] readers = null;
    private int currentReader = 0;
    private final int readerPendingConnectionQueueLength;

    private ExecutorService readPool;

    public Listener(final String name) throws IOException {
      super(name);
      // The backlog of requests that we will have the serversocket carry.
      int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
      readerPendingConnectionQueueLength =
        conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the binding address (can be different from the default interface)
      bind(acceptChannel.socket(), bindAddress, backlogLength);
      port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral port
      address = (InetSocketAddress) acceptChannel.socket().getLocalSocketAddress();
      // create a selector;
      selector = Selector.open();

      readers = new Reader[readThreads];
      // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
      // has an advantage in that it is easy to shutdown the pool.
      readPool = Executors.newFixedThreadPool(readThreads,
        new ThreadFactoryBuilder()
          .setNameFormat("Reader=%d,bindAddress=" + bindAddress.getHostName() + ",port=" + port)
          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
      for (int i = 0; i < readThreads; ++i) {
        Reader reader = new Reader();
        readers[i] = reader;
        readPool.execute(reader);
      }
      LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("Listener,port=" + port);
      this.setDaemon(true);
    }

    private class Reader implements Runnable {
      final private LinkedBlockingQueue<SimpleServerRpcConnection> pendingConnections;
      private final Selector readSelector;

      Reader() throws IOException {
        this.pendingConnections = new LinkedBlockingQueue<>(readerPendingConnectionQueueLength);
        this.readSelector = Selector.open();
      }

      @Override
      public void run() {
        try {
          doRunLoop();
        } finally {
          try {
            readSelector.close();
          } catch (IOException ioe) {
            LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
          }
        }
      }

      private synchronized void doRunLoop() {
        while (running) {
          try {
            // Consume as many connections as currently queued to avoid
            // unbridled acceptance of connections that starves the select
            int size = pendingConnections.size();
            for (int i = size; i > 0; i--) {
              SimpleServerRpcConnection conn = pendingConnections.take();
              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
            }
            readSelector.select();
            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
            while (iter.hasNext()) {
              SelectionKey key = iter.next();
              iter.remove();
              if (key.isValid()) {
                if (key.isReadable()) {
                  doRead(key);
                }
              }
              key = null;
            }
          } catch (InterruptedException e) {
            if (running) { // unexpected -- log it
              LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
            }
          } catch (CancelledKeyException e) {
            LOG.error(getName() + ": CancelledKeyException in Reader", e);
          } catch (IOException ex) {
            LOG.info(getName() + ": IOException in Reader", ex);
          }
        }
      }
      /**
       * Updating the readSelector while it's being used is not thread-safe, so the connection must
       * be queued. The reader will drain the queue and update its readSelector before performing
       * the next select.
       */
      public void addConnection(SimpleServerRpcConnection conn) throws IOException {
        pendingConnections.add(conn);
        readSelector.wakeup();
      }
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
        justification = "selector access is not synchronized; seems fine but concerned changing "
          + "it will have perf impact")
    public void run() {
      LOG.info(getName() + ": starting");
      connectionManager.startIdleScan();
      while (running) {
        SelectionKey key = null;
        try {
          selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable()) doAccept(key);
              }
            } catch (IOException ignored) {
              if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          if (errorHandler != null) {
            if (errorHandler.checkOOME(e)) {
              LOG.info(getName() + ": exiting on OutOfMemoryError");
              closeCurrentConnection(key, e);
              connectionManager.closeIdle(true);
              return;
            }
          } else {
            // we can run out of memory if we have too many threads
            // log the event and sleep for a minute and give
            // some thread(s) a chance to finish
            LOG.warn(getName() + ": OutOfMemoryError in server select", e);
            closeCurrentConnection(key, e);
            connectionManager.closeIdle(true);
            try {
              Thread.sleep(60000);
            } catch (InterruptedException ex) {
              LOG.debug("Interrupted while sleeping");
            }
          }
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
      }
      LOG.info(getName() + ": stopping");
      synchronized (this) {
        try {
          acceptChannel.close();
          selector.close();
        } catch (IOException ignored) {
          if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
        }

        selector = null;
        acceptChannel = null;

        // close all connections
        connectionManager.stopIdleScan();
        connectionManager.closeAll();
      }
    }

    private void closeCurrentConnection(SelectionKey key, Throwable e) {
      if (key != null) {
        SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
        if (c != null) {
          closeConnection(c);
          key.attach(null);
        }
      }
    }

    InetSocketAddress getAddress() {
      return address;
    }

    void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        channel.socket().setKeepAlive(tcpKeepAlive);
        Reader reader = getReader();
        SimpleServerRpcConnection c = connectionManager.register(channel);
        // If the connectionManager can't take it, close the connection.
        if (c == null) {
          if (channel.isOpen()) {
            IOUtils.cleanupWithLogger(LOG, channel);
          }
          continue;
        }
        key.attach(c); // so closeCurrentConnection can get the object
        reader.addConnection(c);
      }
    }

    void doRead(SelectionKey key) throws InterruptedException {
      int count;
      SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
      if (c == null) {
        return;
      }
      c.setLastContact(EnvironmentEdgeManager.currentTime());
      try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException",
          ieo);
        throw ieo;
      } catch (Exception e) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Caught exception while reading:", e);
        }
        count = -1; // so that the (count < 0) block is executed
      }
      if (count < 0) {
        closeConnection(c);
        c = null;
      } else {
        c.setLastContact(EnvironmentEdgeManager.currentTime());
      }
    }

    synchronized void doStop() {
      if (selector != null) {
        selector.wakeup();
        Thread.yield();
      }
      if (acceptChannel != null) {
        try {
          acceptChannel.socket().close();
        } catch (IOException e) {
          LOG.info(getName() + ": exception in closing listener socket. " + e);
        }
      }
      readPool.shutdownNow();
    }

    // The method that will return the next reader to work with
    // Simplistic implementation of round robin for now
    Reader getReader() {
      currentReader = (currentReader + 1) % readers.length;
      return readers[currentReader];
    }
  }

  /**
   * Constructs a server listening on the named port and address.
   * @param server           hosting instance of {@link Server}. We will do authentication checks
   *                         if an instance is passed; pass null for no authentication check.
   * @param name             Used for keying this rpc server's metrics and for naming the Listener
   *                         thread.
   * @param services         A list of services.
   * @param bindAddress      Where to listen
   * @param reservoirEnabled Enable ByteBufferPool or not.
   */
  public SimpleRpcServer(final Server server, final String name,
    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
    Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
    this.socketSendBufferSize = 0;
    this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
    this.purgeTimeout =
      conf.getLong("hbase.ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

    // Start the listener here and let it bind to the port
    listener = new Listener(name);
    this.port = listener.getAddress().getPort();

    // Create the responder here
    responder = new SimpleRpcServerResponder(this);
    connectionManager = new ConnectionManager();
    initReconfigurable(conf);

    this.scheduler.init(new RpcSchedulerContext(this));
  }

  /**
   * Subclasses of HBaseServer can override this to provide their own Connection implementations.
   */
  protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
    return new SimpleServerRpcConnection(this, channel, time);
  }

  protected void closeConnection(SimpleServerRpcConnection connection) {
    connectionManager.close(connection);
  }

  /**
   * Sets the socket buffer size used for responding to RPCs.
   * @param size send size
   */
  @Override
  public void setSocketSendBufSize(int size) {
    this.socketSendBufferSize = size;
  }

  /** Starts the service. Must be called before any calls will be handled. */
  @Override
  public synchronized void start() {
    if (started) {
      return;
    }
    authTokenSecretMgr = createSecretManager();
    if (authTokenSecretMgr != null) {
      // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
      // LeaderElector start. See HBASE-25875
      synchronized (authTokenSecretMgr) {
        setSecretManager(authTokenSecretMgr);
        authTokenSecretMgr.start();
      }
    }
    this.authManager = new ServiceAuthorizationManager();
    HBasePolicyProvider.init(conf, authManager);
    responder.start();
    listener.start();
    scheduler.start();
    started = true;
  }

  /** Stops the service. No new calls will be handled after this is called. */
  @Override
  public synchronized void stop() {
    LOG.info("Stopping server on " + port);
    running = false;
    if (authTokenSecretMgr != null) {
      authTokenSecretMgr.stop();
      authTokenSecretMgr = null;
    }
    listener.interrupt();
    listener.doStop();
    responder.interrupt();
    scheduler.stop();
    notifyAll();
  }

  /**
   * Wait for the server to be stopped. Does not wait for all subthreads to finish.
   * @see #stop()
   */
  @Override
  public synchronized void join() throws InterruptedException {
    while (running) {
      wait();
    }
  }

  /**
   * Return the socket (ip+port) on which the RPC server is listening. May return null if the
   * listener channel is closed.
   * @return the socket (ip+port) on which the RPC server is listening, or null if this information
   *         cannot be determined
   */
  @Override
  public synchronized InetSocketAddress getListenerAddress() {
    if (listener == null) {
      return null;
    }
    return listener.getAddress();
  }

  protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
    throws IOException {
    long count = bufferChain.write(channel);
    if (count > 0) {
      this.metrics.sentBytes(count);
    }
    return count;
  }

  /**
   * A convenience method to bind to a given address and report better exceptions if the address is
   * not a valid host.
   * @param socket  the socket to bind
   * @param address the address to bind to
   * @param backlog the number of connections allowed in the queue
   * @throws BindException        if the address can't be bound
   * @throws UnknownHostException if the address isn't a valid host name
   * @throws IOException          other random errors from bind
   */
  public static void bind(ServerSocket socket, InetSocketAddress address, int backlog)
    throws IOException {
    try {
      socket.bind(address, backlog);
    } catch (BindException e) {
      BindException bindException =
        new BindException("Problem binding to " + address + " : " + e.getMessage());
      bindException.initCause(e);
      throw bindException;
    } catch (SocketException e) {
      // If they try to bind to a different host's address, give a better
      // error message.
      if ("Unresolved address".equals(e.getMessage())) {
        throw new UnknownHostException("Invalid hostname for server: " + address.getHostName());
      }
      throw e;
    }
  }

  /**
   * The number of open RPC connections
   * @return the number of open rpc connections
   */
  @Override
  public int getNumOpenConnections() {
    return connectionManager.size();
  }

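  /**
   * Maintains the set of client connections and closes those that have gone idle. The idle scan
   * behaviour is driven by the "hbase.ipc.client.idlethreshold",
   * "hbase.ipc.client.connection.idle-scan-interval.ms", "hbase.ipc.client.connection.maxidletime"
   * and "hbase.ipc.client.kill.max" settings read in the constructor below.
   */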
  private class ConnectionManager {
    final private AtomicInteger count = new AtomicInteger();
    final private Set<SimpleServerRpcConnection> connections;

    final private Timer idleScanTimer;
    final private int idleScanThreshold;
    final private int idleScanInterval;
    final private int maxIdleTime;
    final private int maxIdleToClose;

    ConnectionManager() {
      this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
      this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
      this.idleScanInterval =
        conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
      this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
      this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
      int maxConnectionQueueSize =
        handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
      // create a set with concurrency -and- a thread-safe iterator, add 2
      // for listener and idle closer threads
      this.connections =
        Collections.newSetFromMap(new ConcurrentHashMap<SimpleServerRpcConnection, Boolean>(
          maxConnectionQueueSize, 0.75f, readThreads + 2));
    }

    private boolean add(SimpleServerRpcConnection connection) {
      boolean added = connections.add(connection);
      if (added) {
        count.getAndIncrement();
      }
      return added;
    }

    private boolean remove(SimpleServerRpcConnection connection) {
      boolean removed = connections.remove(connection);
      if (removed) {
        count.getAndDecrement();
      }
      return removed;
    }

    int size() {
      return count.get();
    }

    SimpleServerRpcConnection[] toArray() {
      return connections.toArray(new SimpleServerRpcConnection[0]);
    }

    SimpleServerRpcConnection register(SocketChannel channel) {
      SimpleServerRpcConnection connection =
        getConnection(channel, EnvironmentEdgeManager.currentTime());
      add(connection);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Connection from " + connection + "; connections=" + size()
          + ", queued calls size (bytes)=" + callQueueSizeInBytes.sum() + ", general queued calls="
          + scheduler.getGeneralQueueLength() + ", priority queued calls="
          + scheduler.getPriorityQueueLength() + ", meta priority queued calls="
          + scheduler.getMetaPriorityQueueLength());
      }
      return connection;
    }

    boolean close(SimpleServerRpcConnection connection) {
      boolean exists = remove(connection);
      if (exists) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(Thread.currentThread().getName() + ": disconnecting client " + connection
            + ". Number of active connections: " + size());
        }
        // only close if actually removed to avoid double-closing due
        // to possible races
        connection.close();
      }
      return exists;
    }

    // synch'ed to avoid explicit invocation upon OOM from colliding with
    // timer task firing
    synchronized void closeIdle(boolean scanAll) {
      long minLastContact = EnvironmentEdgeManager.currentTime() - maxIdleTime;
      // concurrent iterator might miss new connections added
      // during the iteration, but that's ok because they won't
      // be idle yet anyway and will be caught on next scan
      int closed = 0;
      for (SimpleServerRpcConnection connection : connections) {
        // stop if connections dropped below threshold unless scanning all
        if (!scanAll && size() < idleScanThreshold) {
          break;
        }
        // stop if not scanning all and max connections are closed
        if (
          connection.isIdle() && connection.getLastContact() < minLastContact && close(connection)
            && !scanAll && (++closed == maxIdleToClose)
        ) {
          break;
        }
      }
    }

    void closeAll() {
      // use a copy of the connections to be absolutely sure the concurrent
      // iterator doesn't miss a connection
      for (SimpleServerRpcConnection connection : toArray()) {
        close(connection);
      }
    }

    void startIdleScan() {
      scheduleIdleScanTask();
    }

    void stopIdleScan() {
      idleScanTimer.cancel();
    }

    private void scheduleIdleScanTask() {
      if (!running) {
        return;
      }
      TimerTask idleScanTask = new TimerTask() {
        @Override
        public void run() {
          if (!running) {
            return;
          }
          if (LOG.isTraceEnabled()) {
            LOG.trace("running");
          }
          try {
            closeIdle(false);
          } finally {
            // explicitly reschedule so next execution occurs relative
            // to the end of this scan, not the beginning
            scheduleIdleScanTask();
          }
        }
      };
      idleScanTimer.schedule(idleScanTask, idleScanInterval);
    }
  }

}