001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.net.BindException; 022import java.net.InetSocketAddress; 023import java.net.ServerSocket; 024import java.net.SocketException; 025import java.net.UnknownHostException; 026import java.nio.channels.CancelledKeyException; 027import java.nio.channels.GatheringByteChannel; 028import java.nio.channels.SelectionKey; 029import java.nio.channels.Selector; 030import java.nio.channels.ServerSocketChannel; 031import java.nio.channels.SocketChannel; 032import java.util.Collections; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Set; 036import java.util.Timer; 037import java.util.TimerTask; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ExecutorService; 040import java.util.concurrent.Executors; 041import java.util.concurrent.LinkedBlockingQueue; 042import java.util.concurrent.atomic.AtomicInteger; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.hbase.CellScanner; 045import org.apache.hadoop.hbase.HBaseInterfaceAudience; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.Server; 048import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 049import org.apache.hadoop.hbase.security.HBasePolicyProvider; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.Pair; 052import org.apache.hadoop.hbase.util.Threads; 053import org.apache.hadoop.io.IOUtils; 054import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; 055import org.apache.yetus.audience.InterfaceAudience; 056 057import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 058import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 059import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 060import org.apache.hbase.thirdparty.com.google.protobuf.Message; 061 062/** 063 * The RPC server with native java NIO implementation deriving from Hadoop to host protobuf 064 * described Services. It's the original one before HBASE-17262, and the default RPC server for now. 065 * An RpcServer instance has a Listener that hosts the socket. Listener has fixed number of Readers 066 * in an ExecutorPool, 10 by default. The Listener does an accept and then round robin a Reader is 067 * chosen to do the read. The reader is registered on Selector. Read does total read off the channel 068 * and the parse from which it makes a Call. The call is wrapped in a CallRunner and passed to the 069 * scheduler to be run. Reader goes back to see if more to be done and loops till done. 070 * <p> 071 * Scheduler can be variously implemented but default simple scheduler has handlers to which it has 072 * given the queues into which calls (i.e. CallRunner instances) are inserted. Handlers run taking 073 * from the queue. They run the CallRunner#run method on each item gotten from queue and keep taking 074 * while the server is up. CallRunner#run executes the call. When done, asks the included Call to 075 * put itself on new queue for Responder to pull from and return result to client. 076 * @see BlockingRpcClient 077 */ 078@Deprecated 079@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG }) 080public class SimpleRpcServer extends RpcServer { 081 082 protected int port; // port we listen on 083 protected InetSocketAddress address; // inet address we listen on 084 private int readThreads; // number of read threads 085 086 protected int socketSendBufferSize; 087 protected final long purgeTimeout; // in milliseconds 088 089 // maintains the set of client connections and handles idle timeouts 090 private ConnectionManager connectionManager; 091 private Listener listener = null; 092 protected SimpleRpcServerResponder responder = null; 093 094 /** Listens on the socket. Creates jobs for the handler threads */ 095 private class Listener extends Thread { 096 097 private ServerSocketChannel acceptChannel = null; // the accept channel 098 private Selector selector = null; // the selector that we use for the server 099 private Reader[] readers = null; 100 private int currentReader = 0; 101 private final int readerPendingConnectionQueueLength; 102 103 private ExecutorService readPool; 104 105 public Listener(final String name) throws IOException { 106 super(name); 107 // The backlog of requests that we will have the serversocket carry. 108 int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128); 109 readerPendingConnectionQueueLength = 110 conf.getInt("hbase.ipc.server.read.connection-queue.size", 100); 111 // Create a new server socket and set to non blocking mode 112 acceptChannel = ServerSocketChannel.open(); 113 acceptChannel.configureBlocking(false); 114 115 // Bind the server socket to the binding addrees (can be different from the default interface) 116 bind(acceptChannel.socket(), bindAddress, backlogLength); 117 port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral port 118 address = (InetSocketAddress) acceptChannel.socket().getLocalSocketAddress(); 119 // create a selector; 120 selector = Selector.open(); 121 122 readers = new Reader[readThreads]; 123 // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it 124 // has an advantage in that it is easy to shutdown the pool. 125 readPool = Executors.newFixedThreadPool(readThreads, 126 new ThreadFactoryBuilder() 127 .setNameFormat("Reader=%d,bindAddress=" + bindAddress.getHostName() + ",port=" + port) 128 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 129 for (int i = 0; i < readThreads; ++i) { 130 Reader reader = new Reader(); 131 readers[i] = reader; 132 readPool.execute(reader); 133 } 134 LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port); 135 136 // Register accepts on the server socket with the selector. 137 acceptChannel.register(selector, SelectionKey.OP_ACCEPT); 138 this.setName("Listener,port=" + port); 139 this.setDaemon(true); 140 } 141 142 private class Reader implements Runnable { 143 final private LinkedBlockingQueue<SimpleServerRpcConnection> pendingConnections; 144 private final Selector readSelector; 145 146 Reader() throws IOException { 147 this.pendingConnections = new LinkedBlockingQueue<>(readerPendingConnectionQueueLength); 148 this.readSelector = Selector.open(); 149 } 150 151 @Override 152 public void run() { 153 try { 154 doRunLoop(); 155 } finally { 156 try { 157 readSelector.close(); 158 } catch (IOException ioe) { 159 LOG.error(getName() + ": error closing read selector in " + getName(), ioe); 160 } 161 } 162 } 163 164 private synchronized void doRunLoop() { 165 while (running) { 166 try { 167 // Consume as many connections as currently queued to avoid 168 // unbridled acceptance of connections that starves the select 169 int size = pendingConnections.size(); 170 for (int i = size; i > 0; i--) { 171 SimpleServerRpcConnection conn = pendingConnections.take(); 172 conn.channel.register(readSelector, SelectionKey.OP_READ, conn); 173 } 174 readSelector.select(); 175 Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator(); 176 while (iter.hasNext()) { 177 SelectionKey key = iter.next(); 178 iter.remove(); 179 if (key.isValid()) { 180 if (key.isReadable()) { 181 doRead(key); 182 } 183 } 184 key = null; 185 } 186 } catch (InterruptedException e) { 187 if (running) { // unexpected -- log it 188 LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e); 189 } 190 } catch (CancelledKeyException e) { 191 LOG.error(getName() + ": CancelledKeyException in Reader", e); 192 } catch (IOException ex) { 193 LOG.info(getName() + ": IOException in Reader", ex); 194 } 195 } 196 } 197 198 /** 199 * Updating the readSelector while it's being used is not thread-safe, so the connection must 200 * be queued. The reader will drain the queue and update its readSelector before performing 201 * the next select 202 */ 203 public void addConnection(SimpleServerRpcConnection conn) throws IOException { 204 pendingConnections.add(conn); 205 readSelector.wakeup(); 206 } 207 } 208 209 @Override 210 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 211 justification = "selector access is not synchronized; seems fine but concerned changing " 212 + "it will have per impact") 213 public void run() { 214 LOG.info(getName() + ": starting"); 215 connectionManager.startIdleScan(); 216 while (running) { 217 SelectionKey key = null; 218 try { 219 selector.select(); // FindBugs IS2_INCONSISTENT_SYNC 220 Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 221 while (iter.hasNext()) { 222 key = iter.next(); 223 iter.remove(); 224 try { 225 if (key.isValid()) { 226 if (key.isAcceptable()) doAccept(key); 227 } 228 } catch (IOException ignored) { 229 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); 230 } 231 key = null; 232 } 233 } catch (OutOfMemoryError e) { 234 if (errorHandler != null) { 235 if (errorHandler.checkOOME(e)) { 236 LOG.info(getName() + ": exiting on OutOfMemoryError"); 237 closeCurrentConnection(key, e); 238 connectionManager.closeIdle(true); 239 return; 240 } 241 } else { 242 // we can run out of memory if we have too many threads 243 // log the event and sleep for a minute and give 244 // some thread(s) a chance to finish 245 LOG.warn(getName() + ": OutOfMemoryError in server select", e); 246 closeCurrentConnection(key, e); 247 connectionManager.closeIdle(true); 248 try { 249 Thread.sleep(60000); 250 } catch (InterruptedException ex) { 251 LOG.debug("Interrupted while sleeping"); 252 } 253 } 254 } catch (Exception e) { 255 closeCurrentConnection(key, e); 256 } 257 } 258 LOG.info(getName() + ": stopping"); 259 synchronized (this) { 260 try { 261 acceptChannel.close(); 262 selector.close(); 263 } catch (IOException ignored) { 264 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); 265 } 266 267 selector = null; 268 acceptChannel = null; 269 270 // close all connections 271 connectionManager.stopIdleScan(); 272 connectionManager.closeAll(); 273 } 274 } 275 276 private void closeCurrentConnection(SelectionKey key, Throwable e) { 277 if (key != null) { 278 SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment(); 279 if (c != null) { 280 closeConnection(c); 281 key.attach(null); 282 } 283 } 284 } 285 286 InetSocketAddress getAddress() { 287 return address; 288 } 289 290 void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError { 291 ServerSocketChannel server = (ServerSocketChannel) key.channel(); 292 SocketChannel channel; 293 while ((channel = server.accept()) != null) { 294 channel.configureBlocking(false); 295 channel.socket().setTcpNoDelay(tcpNoDelay); 296 channel.socket().setKeepAlive(tcpKeepAlive); 297 Reader reader = getReader(); 298 SimpleServerRpcConnection c = connectionManager.register(channel); 299 // If the connectionManager can't take it, close the connection. 300 if (c == null) { 301 if (channel.isOpen()) { 302 IOUtils.cleanupWithLogger(LOG, channel); 303 } 304 continue; 305 } 306 key.attach(c); // so closeCurrentConnection can get the object 307 reader.addConnection(c); 308 } 309 } 310 311 void doRead(SelectionKey key) throws InterruptedException { 312 int count; 313 SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment(); 314 if (c == null) { 315 return; 316 } 317 c.setLastContact(EnvironmentEdgeManager.currentTime()); 318 try { 319 count = c.readAndProcess(); 320 } catch (InterruptedException ieo) { 321 LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", 322 ieo); 323 throw ieo; 324 } catch (Exception e) { 325 if (LOG.isDebugEnabled()) { 326 LOG.debug("Caught exception while reading:", e); 327 } 328 count = -1; // so that the (count < 0) block is executed 329 } 330 if (count < 0) { 331 closeConnection(c); 332 c = null; 333 } else { 334 c.setLastContact(EnvironmentEdgeManager.currentTime()); 335 } 336 } 337 338 synchronized void doStop() { 339 if (selector != null) { 340 selector.wakeup(); 341 Thread.yield(); 342 } 343 if (acceptChannel != null) { 344 try { 345 acceptChannel.socket().close(); 346 } catch (IOException e) { 347 LOG.info(getName() + ": exception in closing listener socket. " + e); 348 } 349 } 350 readPool.shutdownNow(); 351 } 352 353 // The method that will return the next reader to work with 354 // Simplistic implementation of round robin for now 355 Reader getReader() { 356 currentReader = (currentReader + 1) % readers.length; 357 return readers[currentReader]; 358 } 359 } 360 361 /** 362 * Constructs a server listening on the named port and address. 363 * @param server hosting instance of {@link Server}. We will do authentications if an 364 * instance else pass null for no authentication check. 365 * @param name Used keying this rpc servers' metrics and for naming the Listener 366 * thread. 367 * @param services A list of services. 368 * @param bindAddress Where to listen 369 * @param reservoirEnabled Enable ByteBufferPool or not. 370 */ 371 public SimpleRpcServer(final Server server, final String name, 372 final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress, 373 Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { 374 super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled); 375 this.socketSendBufferSize = 0; 376 this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10); 377 this.purgeTimeout = 378 conf.getLong("hbase.ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 379 380 // Start the listener here and let it bind to the port 381 listener = new Listener(name); 382 this.port = listener.getAddress().getPort(); 383 384 // Create the responder here 385 responder = new SimpleRpcServerResponder(this); 386 connectionManager = new ConnectionManager(); 387 initReconfigurable(conf); 388 389 this.scheduler.init(new RpcSchedulerContext(this)); 390 } 391 392 /** 393 * Subclasses of HBaseServer can override this to provide their own Connection implementations. 394 */ 395 protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) { 396 return new SimpleServerRpcConnection(this, channel, time); 397 } 398 399 protected void closeConnection(SimpleServerRpcConnection connection) { 400 connectionManager.close(connection); 401 } 402 403 /** 404 * Sets the socket buffer size used for responding to RPCs. 405 * @param size send size 406 */ 407 @Override 408 public void setSocketSendBufSize(int size) { 409 this.socketSendBufferSize = size; 410 } 411 412 /** Starts the service. Must be called before any calls will be handled. */ 413 @Override 414 public synchronized void start() { 415 if (started) { 416 return; 417 } 418 authTokenSecretMgr = createSecretManager(); 419 if (authTokenSecretMgr != null) { 420 // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in 421 // LeaderElector start. See HBASE-25875 422 synchronized (authTokenSecretMgr) { 423 setSecretManager(authTokenSecretMgr); 424 authTokenSecretMgr.start(); 425 } 426 } 427 this.authManager = new ServiceAuthorizationManager(); 428 HBasePolicyProvider.init(conf, authManager); 429 responder.start(); 430 listener.start(); 431 scheduler.start(); 432 started = true; 433 } 434 435 /** Stops the service. No new calls will be handled after this is called. */ 436 @Override 437 public synchronized void stop() { 438 LOG.info("Stopping server on " + port); 439 running = false; 440 if (authTokenSecretMgr != null) { 441 authTokenSecretMgr.stop(); 442 authTokenSecretMgr = null; 443 } 444 listener.interrupt(); 445 listener.doStop(); 446 responder.interrupt(); 447 scheduler.stop(); 448 notifyAll(); 449 } 450 451 /** 452 * Wait for the server to be stopped. Does not wait for all subthreads to finish. 453 * @see #stop() 454 */ 455 @Override 456 public synchronized void join() throws InterruptedException { 457 while (running) { 458 wait(); 459 } 460 } 461 462 /** 463 * Return the socket (ip+port) on which the RPC server is listening to. May return null if the 464 * listener channel is closed. 465 * @return the socket (ip+port) on which the RPC server is listening to, or null if this 466 * information cannot be determined 467 */ 468 @Override 469 public synchronized InetSocketAddress getListenerAddress() { 470 if (listener == null) { 471 return null; 472 } 473 return listener.getAddress(); 474 } 475 476 @Override 477 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, 478 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) 479 throws IOException { 480 return call(service, md, param, cellScanner, receiveTime, status, 481 EnvironmentEdgeManager.currentTime(), 0); 482 } 483 484 @Override 485 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, 486 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, 487 long startTime, int timeout) throws IOException { 488 SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner, 489 null, -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null, null); 490 return call(fakeCall, status); 491 } 492 493 protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain) 494 throws IOException { 495 long count = bufferChain.write(channel); 496 if (count > 0) { 497 this.metrics.sentBytes(count); 498 } 499 return count; 500 } 501 502 /** 503 * A convenience method to bind to a given address and report better exceptions if the address is 504 * not a valid host. 505 * @param socket the socket to bind 506 * @param address the address to bind to 507 * @param backlog the number of connections allowed in the queue 508 * @throws BindException if the address can't be bound 509 * @throws UnknownHostException if the address isn't a valid host name 510 * @throws IOException other random errors from bind 511 */ 512 public static void bind(ServerSocket socket, InetSocketAddress address, int backlog) 513 throws IOException { 514 try { 515 socket.bind(address, backlog); 516 } catch (BindException e) { 517 BindException bindException = 518 new BindException("Problem binding to " + address + " : " + e.getMessage()); 519 bindException.initCause(e); 520 throw bindException; 521 } catch (SocketException e) { 522 // If they try to bind to a different host's address, give a better 523 // error message. 524 if ("Unresolved address".equals(e.getMessage())) { 525 throw new UnknownHostException("Invalid hostname for server: " + address.getHostName()); 526 } 527 throw e; 528 } 529 } 530 531 /** 532 * The number of open RPC conections 533 * @return the number of open rpc connections 534 */ 535 @Override 536 public int getNumOpenConnections() { 537 return connectionManager.size(); 538 } 539 540 private class ConnectionManager { 541 final private AtomicInteger count = new AtomicInteger(); 542 final private Set<SimpleServerRpcConnection> connections; 543 544 final private Timer idleScanTimer; 545 final private int idleScanThreshold; 546 final private int idleScanInterval; 547 final private int maxIdleTime; 548 final private int maxIdleToClose; 549 550 ConnectionManager() { 551 this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true); 552 this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000); 553 this.idleScanInterval = 554 conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000); 555 this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); 556 this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10); 557 int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 558 HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); 559 int maxConnectionQueueSize = 560 handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100); 561 // create a set with concurrency -and- a thread-safe iterator, add 2 562 // for listener and idle closer threads 563 this.connections = 564 Collections.newSetFromMap(new ConcurrentHashMap<SimpleServerRpcConnection, Boolean>( 565 maxConnectionQueueSize, 0.75f, readThreads + 2)); 566 } 567 568 private boolean add(SimpleServerRpcConnection connection) { 569 boolean added = connections.add(connection); 570 if (added) { 571 count.getAndIncrement(); 572 } 573 return added; 574 } 575 576 private boolean remove(SimpleServerRpcConnection connection) { 577 boolean removed = connections.remove(connection); 578 if (removed) { 579 count.getAndDecrement(); 580 } 581 return removed; 582 } 583 584 int size() { 585 return count.get(); 586 } 587 588 SimpleServerRpcConnection[] toArray() { 589 return connections.toArray(new SimpleServerRpcConnection[0]); 590 } 591 592 SimpleServerRpcConnection register(SocketChannel channel) { 593 SimpleServerRpcConnection connection = 594 getConnection(channel, EnvironmentEdgeManager.currentTime()); 595 add(connection); 596 if (LOG.isTraceEnabled()) { 597 LOG.trace("Connection from " + connection + "; connections=" + size() 598 + ", queued calls size (bytes)=" + callQueueSizeInBytes.sum() + ", general queued calls=" 599 + scheduler.getGeneralQueueLength() + ", priority queued calls=" 600 + scheduler.getPriorityQueueLength() + ", meta priority queued calls=" 601 + scheduler.getMetaPriorityQueueLength()); 602 } 603 return connection; 604 } 605 606 boolean close(SimpleServerRpcConnection connection) { 607 boolean exists = remove(connection); 608 if (exists) { 609 if (LOG.isTraceEnabled()) { 610 LOG.trace(Thread.currentThread().getName() + ": disconnecting client " + connection 611 + ". Number of active connections: " + size()); 612 } 613 // only close if actually removed to avoid double-closing due 614 // to possible races 615 connection.close(); 616 } 617 return exists; 618 } 619 620 // synch'ed to avoid explicit invocation upon OOM from colliding with 621 // timer task firing 622 synchronized void closeIdle(boolean scanAll) { 623 long minLastContact = EnvironmentEdgeManager.currentTime() - maxIdleTime; 624 // concurrent iterator might miss new connections added 625 // during the iteration, but that's ok because they won't 626 // be idle yet anyway and will be caught on next scan 627 int closed = 0; 628 for (SimpleServerRpcConnection connection : connections) { 629 // stop if connections dropped below threshold unless scanning all 630 if (!scanAll && size() < idleScanThreshold) { 631 break; 632 } 633 // stop if not scanning all and max connections are closed 634 if ( 635 connection.isIdle() && connection.getLastContact() < minLastContact && close(connection) 636 && !scanAll && (++closed == maxIdleToClose) 637 ) { 638 break; 639 } 640 } 641 } 642 643 void closeAll() { 644 // use a copy of the connections to be absolutely sure the concurrent 645 // iterator doesn't miss a connection 646 for (SimpleServerRpcConnection connection : toArray()) { 647 close(connection); 648 } 649 } 650 651 void startIdleScan() { 652 scheduleIdleScanTask(); 653 } 654 655 void stopIdleScan() { 656 idleScanTimer.cancel(); 657 } 658 659 private void scheduleIdleScanTask() { 660 if (!running) { 661 return; 662 } 663 TimerTask idleScanTask = new TimerTask() { 664 @Override 665 public void run() { 666 if (!running) { 667 return; 668 } 669 if (LOG.isTraceEnabled()) { 670 LOG.trace("running"); 671 } 672 try { 673 closeIdle(false); 674 } finally { 675 // explicitly reschedule so next execution occurs relative 676 // to the end of this scan, not the beginning 677 scheduleIdleScanTask(); 678 } 679 } 680 }; 681 idleScanTimer.schedule(idleScanTask, idleScanInterval); 682 } 683 } 684 685}