/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * The RPC server with native java NIO implementation deriving from Hadoop to
 * host protobuf described Services. It's the original one before HBASE-17262,
 * and the default RPC server for now.
 *
 * An RpcServer instance has a Listener that hosts the socket. Listener has fixed number
 * of Readers in an ExecutorPool, 10 by default. The Listener does an accept and then
 * round robin a Reader is chosen to do the read. The reader is registered on Selector. Read does
 * total read off the channel and the parse from which it makes a Call. The call is wrapped in a
 * CallRunner and passed to the scheduler to be run. Reader goes back to see if more to be done
 * and loops till done.
 *
 * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
 * has given the queues into which calls (i.e. CallRunner instances) are inserted. Handlers run
 * taking from the queue. They run the CallRunner#run method on each item gotten from queue
 * and keep taking while the server is up.
 *
 * CallRunner#run executes the call. When done, asks the included Call to put itself on new
 * queue for Responder to pull from and return result to client.
 *
 * @see BlockingRpcClient
 */
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.CONFIG})
public class SimpleRpcServer extends RpcServer {

  protected int port;                             // port we listen on
  protected InetSocketAddress address;            // inet address we listen on
  private int readThreads;                        // number of read threads

  protected int socketSendBufferSize;
  protected final long purgeTimeout;    // in milliseconds

  // maintains the set of client connections and handles idle timeouts
  private ConnectionManager connectionManager;
  private Listener listener = null;
  protected SimpleRpcServerResponder responder = null;

  /** Listens on the socket. Creates jobs for the handler threads*/
  private class Listener extends Thread {

    private ServerSocketChannel acceptChannel = null; //the accept channel
    private Selector selector = null; //the selector that we use for the server
    private Reader[] readers = null;
    private int currentReader = 0;
    private final int readerPendingConnectionQueueLength;

    private ExecutorService readPool;

    /**
     * Opens and binds the non-blocking accept channel, starts {@code readThreads} readers on a
     * fixed-size daemon thread pool and registers the accept channel with the selector.
     * Also records the bound {@code port} and {@code address} on the enclosing server.
     *
     * @param name initial thread name; replaced by {@code "Listener,port=" + port} once bound
     * @throws IOException if the channel or a selector cannot be opened, or the bind fails
     */
    public Listener(final String name) throws IOException {
      super(name);
      // The backlog of requests that we will have the serversocket carry.
      int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
      readerPendingConnectionQueueLength =
          conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the binding address (can be different from the default interface)
      bind(acceptChannel.socket(), bindAddress, backlogLength);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
      // create a selector;
      selector = Selector.open();

      readers = new Reader[readThreads];
      // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
      // has an advantage in that it is easy to shutdown the pool.
      readPool = Executors.newFixedThreadPool(readThreads,
        new ThreadFactoryBuilder().setNameFormat(
          "Reader=%d,bindAddress=" + bindAddress.getHostName() +
          ",port=" + port).setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
      for (int i = 0; i < readThreads; ++i) {
        Reader reader = new Reader();
        readers[i] = reader;
        readPool.execute(reader);
      }
      LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("Listener,port=" + port);
      this.setDaemon(true);
    }


    /**
     * Owns one read selector. Connections handed over by the Listener are queued in
     * {@code pendingConnections} and registered for OP_READ by the reader itself before
     * its next select.
     */
    private class Reader implements Runnable {
      final private LinkedBlockingQueue<SimpleServerRpcConnection> pendingConnections;
      private final Selector readSelector;

      Reader() throws IOException {
        this.pendingConnections = new LinkedBlockingQueue<>(readerPendingConnectionQueueLength);
        this.readSelector = Selector.open();
      }

      /** Runs the select loop and closes the read selector when the loop ends. */
      @Override
      public void run() {
        try {
          doRunLoop();
        } finally {
          try {
            readSelector.close();
          } catch (IOException ioe) {
            LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
          }
        }
      }

      /**
       * Loops while the server is running: registers queued connections with the read
       * selector, selects, and calls {@code doRead} for every valid readable key.
       */
      private synchronized void doRunLoop() {
        while (running) {
          try {
            // Consume as many connections as currently queued to avoid
            // unbridled acceptance of connections that starves the select
            int size = pendingConnections.size();
            for (int i=size; i>0; i--) {
              SimpleServerRpcConnection conn = pendingConnections.take();
              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
            }
            readSelector.select();
            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
            while (iter.hasNext()) {
              SelectionKey key = iter.next();
              iter.remove();
              if (key.isValid()) {
                if (key.isReadable()) {
                  doRead(key);
                }
              }
              key = null;
            }
          } catch (InterruptedException e) {
            if (running) {                      // unexpected -- log it
              LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
            }
          } catch (CancelledKeyException e) {
            LOG.error(getName() + ": CancelledKeyException in Reader", e);
          } catch (IOException ex) {
            LOG.info(getName() + ": IOException in Reader", ex);
          }
        }
      }

      /**
       * Updating the readSelector while it's being used is not thread-safe,
       * so the connection must be queued.  The reader will drain the queue
       * and update its readSelector before performing the next select
       */
      public void addConnection(SimpleServerRpcConnection conn) throws IOException {
        // add() (not put()) is used: on a full bounded queue it throws IllegalStateException
        // instead of blocking the caller.
        pendingConnections.add(conn);
        readSelector.wakeup();
      }
    }

    /**
     * Accept loop. Starts the idle-connection scan, then selects on the accept channel and
     * hands acceptable keys to {@code doAccept} until the server stops running. On exit it
     * closes the accept channel and selector, stops the idle scan and closes all connections.
     */
    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
      justification="selector access is not synchronized; seems fine but concerned changing " +
        "it will have per impact")
    public void run() {
      LOG.info(getName() + ": starting");
      connectionManager.startIdleScan();
      while (running) {
        SelectionKey key = null;
        try {
          selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable()) {
                  doAccept(key);
                }
              }
            } catch (IOException ignored) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("ignored", ignored);
              }
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          if (errorHandler != null) {
            if (errorHandler.checkOOME(e)) {
              LOG.info(getName() + ": exiting on OutOfMemoryError");
              closeCurrentConnection(key, e);
              connectionManager.closeIdle(true);
              return;
            }
          } else {
            // we can run out of memory if we have too many threads
            // log the event and sleep for a minute and give
            // some thread(s) a chance to finish
            LOG.warn(getName() + ": OutOfMemoryError in server select", e);
            closeCurrentConnection(key, e);
            connectionManager.closeIdle(true);
            try {
              Thread.sleep(60000);
            } catch (InterruptedException ex) {
              LOG.debug("Interrupted while sleeping");
            }
          }
        } catch (Exception e) {
          // Do not swallow silently: closeCurrentConnection does not report the cause itself.
          if (LOG.isDebugEnabled()) {
            LOG.debug(getName() + ": exception in listener loop; closing current connection", e);
          }
          closeCurrentConnection(key, e);
        }
      }
      LOG.info(getName() + ": stopping");
      synchronized (this) {
        // Close the two resources independently so that a failure closing the accept
        // channel cannot leave the selector open.
        try {
          acceptChannel.close();
        } catch (IOException ignored) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("ignored", ignored);
          }
        }
        try {
          selector.close();
        } catch (IOException ignored) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("ignored", ignored);
          }
        }

        selector= null;
        acceptChannel= null;

        // close all connections
        connectionManager.stopIdleScan();
        connectionManager.closeAll();
      }
    }

    /**
     * Closes the connection attached to {@code key}, if any, and detaches it from the key.
     *
     * @param key selection key being processed when the failure happened; may be null
     * @param e the failure that triggered the close (currently not used by this method)
     */
    private void closeCurrentConnection(SelectionKey key, Throwable e) {
      if (key != null) {
        SimpleServerRpcConnection c = (SimpleServerRpcConnection)key.attachment();
        if (c != null) {
          closeConnection(c);
          key.attach(null);
        }
      }
    }

    InetSocketAddress getAddress() {
      return address;
    }

    /**
     * Accepts every connection currently pending on the server channel, configures each
     * socket (non-blocking, TCP no-delay, keep-alive), registers it with the connection
     * manager and hands it to the next reader.
     *
     * @param key the acceptable key of the server socket channel
     */
    void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        channel.socket().setKeepAlive(tcpKeepAlive);
        Reader reader = getReader();
        SimpleServerRpcConnection c = connectionManager.register(channel);
        // If the connectionManager can't take it, close the connection.
        if (c == null) {
          if (channel.isOpen()) {
            IOUtils.cleanup(null, channel);
          }
          continue;
        }
        key.attach(c);  // so closeCurrentConnection can get the object
        reader.addConnection(c);
        // Hand-off succeeded: drop the attachment so that a later failure on this accept key
        // (before the next attach) does not close this already-delivered connection.
        key.attach(null);
      }
    }

    /**
     * Reads from the connection attached to {@code key} and closes the connection when the
     * read reports a negative count or fails with an exception other than
     * InterruptedException.
     *
     * @param key readable key whose attachment is the connection; ignored if nothing attached
     * @throws InterruptedException rethrown from {@code readAndProcess}
     */
    void doRead(SelectionKey key) throws InterruptedException {
      int count;
      SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
      if (c == null) {
        return;
      }
      c.setLastContact(System.currentTimeMillis());
      try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
        throw ieo;
      } catch (Exception e) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Caught exception while reading:", e);
        }
        count = -1; //so that the (count < 0) block is executed
      }
      if (count < 0) {
        closeConnection(c);
        c = null;
      } else {
        c.setLastContact(System.currentTimeMillis());
      }
    }

    /**
     * Wakes up the accept selector, closes the listening socket and shuts down the reader
     * pool. Null checks are needed because {@code run()} nulls both fields when it exits.
     */
    synchronized void doStop() {
      if (selector != null) {
        selector.wakeup();
        Thread.yield();
      }
      if (acceptChannel != null) {
        try {
          acceptChannel.socket().close();
        } catch (IOException e) {
          LOG.info(getName() + ": exception in closing listener socket. " + e);
        }
      }
      readPool.shutdownNow();
    }

    // The method that will return the next reader to work with
    // Simplistic implementation of round robin for now
    Reader getReader() {
      currentReader = (currentReader + 1) % readers.length;
      return readers[currentReader];
    }
  }

  /**
   * Constructs a server listening on the named port and address.
   * @param server hosting instance of {@link Server}. We will do authentications if an
   * instance else pass null for no authentication check.
   * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
   * @param services A list of services.
   * @param bindAddress Where to listen
   * @param conf configuration; read here for the reader thread count
   * ("hbase.ipc.server.read.threadpool.size") and the call purge timeout
   * ("hbase.ipc.client.call.purge.timeout")
   * @param scheduler scheduler passed to the superclass; initialized here with an
   * {@link RpcSchedulerContext} wrapping this server
   * @param reservoirEnabled Enable ByteBufferPool or not.
   * @throws IOException if the listener cannot be created or bound
   */
  public SimpleRpcServer(final Server server, final String name,
      final List<BlockingServiceAndInterface> services,
      final InetSocketAddress bindAddress, Configuration conf,
      RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
    this.socketSendBufferSize = 0;
    this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
    this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

    // Start the listener here and let it bind to the port
    listener = new Listener(name);
    this.port = listener.getAddress().getPort();

    // Create the responder here
    responder = new SimpleRpcServerResponder(this);
    connectionManager = new ConnectionManager();
    initReconfigurable(conf);

    this.scheduler.init(new RpcSchedulerContext(this));
  }

  /**
   * Subclasses of HBaseServer can override this to provide their own
   * Connection implementations.
   */
  protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
    return new SimpleServerRpcConnection(this, channel, time);
  }

  /** Closes the given connection through the connection manager. */
  protected void closeConnection(SimpleServerRpcConnection connection) {
    connectionManager.close(connection);
  }

  /** Sets the socket buffer size used for responding to RPCs.
   * @param size send size
   */
  @Override
  public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }

  /** Starts the service.  Must be called before any calls will be handled. */
  @Override
  public synchronized void start() {
    if (started) {
      return;
    }
    authTokenSecretMgr = createSecretManager();
    if (authTokenSecretMgr != null) {
      setSecretManager(authTokenSecretMgr);
      authTokenSecretMgr.start();
    }
    this.authManager = new ServiceAuthorizationManager();
    HBasePolicyProvider.init(conf, authManager);
    responder.start();
    listener.start();
    scheduler.start();
    started = true;
  }

  /** Stops the service.  No new calls will be handled after this is called. */
  @Override
  public synchronized void stop() {
    LOG.info("Stopping server on " + port);
    running = false;
    if (authTokenSecretMgr != null) {
      authTokenSecretMgr.stop();
      authTokenSecretMgr = null;
    }
    listener.interrupt();
    listener.doStop();
    responder.interrupt();
    scheduler.stop();
    // wake up any thread blocked in join()
    notifyAll();
  }

  /** Wait for the server to be stopped.
   * Does not wait for all subthreads to finish.
   *  See {@link #stop()}.
   * @throws InterruptedException e
   */
  @Override
  public synchronized void join() throws InterruptedException {
    while (running) {
      wait();
    }
  }

  /**
   * Return the socket (ip+port) on which the RPC server is listening to. May return null if
   * the listener channel is closed.
   * @return the socket (ip+port) on which the RPC server is listening to, or null if this
   * information cannot be determined
   */
  @Override
  public synchronized InetSocketAddress getListenerAddress() {
    if (listener == null) {
      return null;
    }
    return listener.getAddress();
  }

  /**
   * Executes a call with the current time as start time and a timeout of 0; delegates to
   * {@link #call(BlockingService, MethodDescriptor, Message, CellScanner, long,
   * MonitoredRPCHandler, long, int)}.
   */
  @Override
  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
      throws IOException {
    return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),
        0);
  }

  /**
   * Wraps the arguments in a {@link SimpleServerCall} that has no connection (id -1) and
   * runs it. Note that {@code startTime} is not passed on to the created call.
   */
  @Override
  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
      long startTime, int timeout) throws IOException {
    SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner,
        null, -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null, null);
    return call(fakeCall, status);
  }

  /**
   * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
   * If the amount of data is large, it writes to channel in smaller chunks.
   * This is to avoid jdk from creating many direct buffers as the size of
   * buffer increases. This also minimizes extra copies in NIO layer
   * as a result of multiple write operations required to write a large
   * buffer.
   *
   * @param channel writable byte channel to write to
   * @param bufferChain Chain of buffers to write
   * @return number of bytes written
   * @throws java.io.IOException e
   * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
   */
  protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
      throws IOException {
    long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
    if (count > 0) {
      this.metrics.sentBytes(count);
    }
    return count;
  }

  /**
   * A convenience method to bind to a given address and report
   * better exceptions if the address is not a valid host.
   * @param socket the socket to bind
   * @param address the address to bind to
   * @param backlog the number of connections allowed in the queue
   * @throws BindException if the address can't be bound
   * @throws UnknownHostException if the address isn't a valid host name
   * @throws IOException other random errors from bind
   */
  public static void bind(ServerSocket socket, InetSocketAddress address,
                          int backlog) throws IOException {
    try {
      socket.bind(address, backlog);
    } catch (BindException e) {
      // Re-throw with the target address in the message, keeping the original as cause.
      BindException bindException =
        new BindException("Problem binding to " + address + " : " +
            e.getMessage());
      bindException.initCause(e);
      throw bindException;
    } catch (SocketException e) {
      // If they try to bind to a different host's address, give a better
      // error message.
      if ("Unresolved address".equals(e.getMessage())) {
        throw new UnknownHostException("Invalid hostname for server: " +
                                       address.getHostName());
      }
      throw e;
    }
  }

  /**
   * The number of open RPC connections
   * @return the number of open rpc connections
   */
  @Override
  public int getNumOpenConnections() {
    return connectionManager.size();
  }

  /**
   * Tracks the set of open connections together with a counter, and periodically closes
   * idle connections from a daemon timer.
   */
  private class ConnectionManager {
    final private AtomicInteger count = new AtomicInteger();
    final private Set<SimpleServerRpcConnection> connections;

    final private Timer idleScanTimer;
    final private int idleScanThreshold;
    final private int idleScanInterval;
    final private int maxIdleTime;
    final private int maxIdleToClose;

    ConnectionManager() {
      this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
      this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
      this.idleScanInterval =
          conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
      this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
      this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
          HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
      int maxConnectionQueueSize =
          handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
      // create a set with concurrency -and- a thread-safe iterator, add 2
      // for listener and idle closer threads
      this.connections = Collections.newSetFromMap(
          new ConcurrentHashMap<SimpleServerRpcConnection,Boolean>(
              maxConnectionQueueSize, 0.75f, readThreads+2));
    }

    /** Adds the connection to the set and bumps the counter only if it was not present. */
    private boolean add(SimpleServerRpcConnection connection) {
      boolean added = connections.add(connection);
      if (added) {
        count.getAndIncrement();
      }
      return added;
    }

    /** Removes the connection from the set and decrements the counter only if it was present. */
    private boolean remove(SimpleServerRpcConnection connection) {
      boolean removed = connections.remove(connection);
      if (removed) {
        count.getAndDecrement();
      }
      return removed;
    }

    int size() {
      return count.get();
    }

    SimpleServerRpcConnection[] toArray() {
      return connections.toArray(new SimpleServerRpcConnection[0]);
    }

    /**
     * Creates a connection for the channel via {@code getConnection} and starts tracking it.
     *
     * @param channel the accepted socket channel
     * @return the newly created connection
     */
    SimpleServerRpcConnection register(SocketChannel channel) {
      SimpleServerRpcConnection connection = getConnection(channel, System.currentTimeMillis());
      add(connection);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Connection from " + connection +
            "; connections=" + size() +
            ", queued calls size (bytes)=" + callQueueSizeInBytes.sum() +
            ", general queued calls=" + scheduler.getGeneralQueueLength() +
            ", priority queued calls=" + scheduler.getPriorityQueueLength() +
            ", meta priority queued calls=" + scheduler.getMetaPriorityQueueLength());
      }
      return connection;
    }

    /**
     * Stops tracking the connection and closes it.
     *
     * @return true if the connection was tracked (and therefore closed by this call)
     */
    boolean close(SimpleServerRpcConnection connection) {
      boolean exists = remove(connection);
      if (exists) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(Thread.currentThread().getName() +
              ": disconnecting client " + connection +
              ". Number of active connections: "+ size());
        }
        // only close if actually removed to avoid double-closing due
        // to possible races
        connection.close();
      }
      return exists;
    }

    // synch'ed to avoid explicit invocation upon OOM from colliding with
    // timer task firing
    synchronized void closeIdle(boolean scanAll) {
      long minLastContact = System.currentTimeMillis() - maxIdleTime;
      // concurrent iterator might miss new connections added
      // during the iteration, but that's ok because they won't
      // be idle yet anyway and will be caught on next scan
      int closed = 0;
      for (SimpleServerRpcConnection connection : connections) {
        // stop if connections dropped below threshold unless scanning all
        if (!scanAll && size() < idleScanThreshold) {
          break;
        }
        // stop if not scanning all and max connections are closed
        if (connection.isIdle() &&
            connection.getLastContact() < minLastContact &&
            close(connection) &&
            !scanAll && (++closed == maxIdleToClose)) {
          break;
        }
      }
    }

    void closeAll() {
      // use a copy of the connections to be absolutely sure the concurrent
      // iterator doesn't miss a connection
      for (SimpleServerRpcConnection connection : toArray()) {
        close(connection);
      }
    }

    void startIdleScan() {
      scheduleIdleScanTask();
    }

    void stopIdleScan() {
      idleScanTimer.cancel();
    }

    /**
     * Schedules a one-shot idle scan after {@code idleScanInterval} ms; the task reschedules
     * itself when it finishes. Does nothing once the server is no longer running.
     */
    private void scheduleIdleScanTask() {
      if (!running) {
        return;
      }
      TimerTask idleScanTask = new TimerTask(){
        @Override
        public void run() {
          if (!running) {
            return;
          }
          if (LOG.isTraceEnabled()) {
            LOG.trace("running");
          }
          try {
            closeIdle(false);
          } finally {
            // explicitly reschedule so next execution occurs relative
            // to the end of this scan, not the beginning
            scheduleIdleScanTask();
          }
        }
      };
      idleScanTimer.schedule(idleScanTask, idleScanInterval);
    }
  }

}