/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * The RPC server with a native Java NIO implementation, derived from Hadoop, that hosts
 * protobuf-described Services. It is the original implementation from before HBASE-17262 and,
 * for now, the default RPC server.
 *
 * <p>An RpcServer instance has a Listener that hosts the socket. The Listener owns a fixed
 * number of Readers in an executor pool, 10 by default. The Listener accepts a connection and
 * hands it to a Reader chosen round-robin. The Reader registers the connection with its
 * Selector, reads the full request off the channel, and parses it into a Call. The Call is
 * wrapped in a CallRunner and passed to the scheduler to be run. The Reader then loops back to
 * see whether more work is pending.
 *
 * <p>The scheduler can be variously implemented, but the default simple scheduler hands calls
 * (i.e. CallRunner instances) to handler threads via queues. Handlers take from the queues, run
 * the CallRunner#run method on each item, and keep taking while the server is up.
 *
 * <p>CallRunner#run executes the call. When done, it asks the included Call to put itself on a
 * queue for the Responder to pull from and return the result to the client.
 *
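 * <p>Illustrative lifecycle (a sketch only; the server, services, conf and scheduler values
 * below are hypothetical placeholders, not a complete configuration):
 * <pre>
 * SimpleRpcServer rpcServer = new SimpleRpcServer(server, "example", services,
 *     new InetSocketAddress("0.0.0.0", 0), conf, scheduler, true);
 * rpcServer.start();                   // starts the Responder, Listener and scheduler handlers
 * InetSocketAddress bound = rpcServer.getListenerAddress(); // actual, possibly ephemeral, port
 * // ... serve requests ...
 * rpcServer.stop();                    // no new calls are handled after this
 * rpcServer.join();                    // waits until the server has stopped
 * </pre>
 *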
 * @see BlockingRpcClient
 */
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.CONFIG})
public class SimpleRpcServer extends RpcServer {

  protected int port;                             // port we listen on
  protected InetSocketAddress address;            // inet address we listen on
  private int readThreads;                        // number of read threads

  protected int socketSendBufferSize;
  protected final long purgeTimeout;    // in milliseconds

  // maintains the set of client connections and handles idle timeouts
  private ConnectionManager connectionManager;
  private Listener listener = null;
  protected SimpleRpcServerResponder responder = null;

  /** Listens on the socket. Creates jobs for the handler threads. */
  private class Listener extends Thread {

    private ServerSocketChannel acceptChannel = null; //the accept channel
    private Selector selector = null; //the selector that we use for the server
    private Reader[] readers = null;
    private int currentReader = 0;
    private final int readerPendingConnectionQueueLength;

    private ExecutorService readPool;

    public Listener(final String name) throws IOException {
      super(name);
      // The backlog of requests that we will have the serversocket carry.
      int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
      readerPendingConnectionQueueLength =
          conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the configured bind address (can be different from the default interface)
      bind(acceptChannel.socket(), bindAddress, backlogLength);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
      // create a selector
      selector = Selector.open();

      readers = new Reader[readThreads];
      // Unlike Hadoop, which starts the reader threads directly, we run the Readers in an
      // executor pool; the main advantage is that the pool is easy to shut down.
      readPool = Executors.newFixedThreadPool(readThreads,
        new ThreadFactoryBuilder().setNameFormat(
          "Reader=%d,bindAddress=" + bindAddress.getHostName() +
          ",port=" + port).setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
      for (int i = 0; i < readThreads; ++i) {
        Reader reader = new Reader();
        readers[i] = reader;
        readPool.execute(reader);
      }
      LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("Listener,port=" + port);
      this.setDaemon(true);
    }


    private class Reader implements Runnable {
      final private LinkedBlockingQueue<SimpleServerRpcConnection> pendingConnections;
      private final Selector readSelector;

      Reader() throws IOException {
        this.pendingConnections = new LinkedBlockingQueue<>(readerPendingConnectionQueueLength);
        this.readSelector = Selector.open();
      }

      @Override
      public void run() {
        try {
          doRunLoop();
        } finally {
          try {
            readSelector.close();
          } catch (IOException ioe) {
            LOG.error(getName() + ": error closing read selector", ioe);
          }
        }
      }

      private synchronized void doRunLoop() {
        while (running) {
          try {
            // Consume as many connections as currently queued to avoid
            // unbridled acceptance of connections that starves the select
            int size = pendingConnections.size();
            for (int i=size; i>0; i--) {
              SimpleServerRpcConnection conn = pendingConnections.take();
              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
            }
            readSelector.select();
            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
            while (iter.hasNext()) {
              SelectionKey key = iter.next();
              iter.remove();
              if (key.isValid()) {
                if (key.isReadable()) {
                  doRead(key);
                }
              }
              key = null;
            }
          } catch (InterruptedException e) {
            if (running) {                      // unexpected -- log it
              LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
            }
          } catch (CancelledKeyException e) {
            LOG.error(getName() + ": CancelledKeyException in Reader", e);
          } catch (IOException ex) {
            LOG.info(getName() + ": IOException in Reader", ex);
          }
        }
      }

      /**
       * Updating the readSelector while it's being used is not thread-safe,
       * so the connection must be queued.  The reader will drain the queue
       * and update its readSelector before performing the next select
       */
      public void addConnection(SimpleServerRpcConnection conn) throws IOException {
        pendingConnections.add(conn);
        readSelector.wakeup();
      }
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
      justification="selector access is not synchronized; seems fine but concerned changing " +
        "it will have a perf impact")
    public void run() {
      LOG.info(getName() + ": starting");
      connectionManager.startIdleScan();
      while (running) {
        SelectionKey key = null;
        try {
          selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);
              }
            } catch (IOException ignored) {
              if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          if (errorHandler != null) {
            if (errorHandler.checkOOME(e)) {
              LOG.info(getName() + ": exiting on OutOfMemoryError");
              closeCurrentConnection(key, e);
              connectionManager.closeIdle(true);
              return;
            }
          } else {
            // we can run out of memory if we have too many threads
            // log the event and sleep for a minute and give
            // some thread(s) a chance to finish
            LOG.warn(getName() + ": OutOfMemoryError in server select", e);
            closeCurrentConnection(key, e);
            connectionManager.closeIdle(true);
            try {
              Thread.sleep(60000);
            } catch (InterruptedException ex) {
              LOG.debug("Interrupted while sleeping");
            }
          }
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
      }
      LOG.info(getName() + ": stopping");
      synchronized (this) {
        try {
          acceptChannel.close();
          selector.close();
        } catch (IOException ignored) {
          if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
        }

        selector = null;
        acceptChannel = null;

        // close all connections
        connectionManager.stopIdleScan();
        connectionManager.closeAll();
      }
    }

    private void closeCurrentConnection(SelectionKey key, Throwable e) {
      if (key != null) {
        SimpleServerRpcConnection c = (SimpleServerRpcConnection)key.attachment();
        if (c != null) {
          closeConnection(c);
          key.attach(null);
        }
      }
    }

    InetSocketAddress getAddress() {
      return address;
    }

    void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        channel.socket().setKeepAlive(tcpKeepAlive);
        Reader reader = getReader();
        SimpleServerRpcConnection c = connectionManager.register(channel);
        // If the connectionManager can't take it, close the connection.
        if (c == null) {
          if (channel.isOpen()) {
            IOUtils.cleanup(null, channel);
          }
          continue;
        }
        key.attach(c);  // so closeCurrentConnection can get the object
        reader.addConnection(c);
      }
    }

    void doRead(SelectionKey key) throws InterruptedException {
      int count;
      SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
      if (c == null) {
        return;
      }
      c.setLastContact(System.currentTimeMillis());
      try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
        throw ieo;
      } catch (Exception e) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Caught exception while reading:", e);
        }
        count = -1; //so that the (count < 0) block is executed
      }
      if (count < 0) {
        closeConnection(c);
        c = null;
      } else {
        c.setLastContact(System.currentTimeMillis());
      }
    }

    synchronized void doStop() {
      if (selector != null) {
        selector.wakeup();
        Thread.yield();
      }
      if (acceptChannel != null) {
        try {
          acceptChannel.socket().close();
        } catch (IOException e) {
          LOG.info(getName() + ": exception in closing listener socket. " + e);
        }
      }
      readPool.shutdownNow();
    }

    // The method that will return the next reader to work with
    // Simplistic implementation of round robin for now
    Reader getReader() {
      currentReader = (currentReader + 1) % readers.length;
      return readers[currentReader];
    }
  }

  /**
   * Constructs a server listening on the named port and address.
   * @param server hosting instance of {@link Server}. If an instance is passed we do
   *   authentication checks; pass null for no authentication check.
   * @param name Used to key this RPC server's metrics and to name the Listener thread.
   * @param services A list of services.
   * @param bindAddress Where to listen.
   * @param conf Configuration to read settings from.
   * @param scheduler The scheduler that dispatches calls to the handler threads.
   * @param reservoirEnabled Enable the ByteBufferPool or not.
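   *
   * <p>Configuration read by this constructor includes the reader pool size and the call purge
   * timeout; an illustrative override (hypothetical values):
   * <pre>
   * conf.setInt("hbase.ipc.server.read.threadpool.size", 20);     // default 10
   * conf.setLong("hbase.ipc.client.call.purge.timeout", 120000L); // default 2 * rpc timeout
   * </pre>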
   */
  public SimpleRpcServer(final Server server, final String name,
      final List<BlockingServiceAndInterface> services,
      final InetSocketAddress bindAddress, Configuration conf,
      RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
    this.socketSendBufferSize = 0;
    this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
    this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

    // Start the listener here and let it bind to the port
    listener = new Listener(name);
    this.port = listener.getAddress().getPort();

    // Create the responder here
    responder = new SimpleRpcServerResponder(this);
    connectionManager = new ConnectionManager();
    initReconfigurable(conf);

    this.scheduler.init(new RpcSchedulerContext(this));
  }

  /**
   * Subclasses of SimpleRpcServer can override this to provide their own
   * Connection implementations.
   */
  protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
    return new SimpleServerRpcConnection(this, channel, time);
  }

  protected void closeConnection(SimpleServerRpcConnection connection) {
    connectionManager.close(connection);
  }

  /** Sets the socket buffer size used for responding to RPCs.
   * @param size send size
   */
  @Override
  public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }

  /** Starts the service.  Must be called before any calls will be handled. */
  @Override
  public synchronized void start() {
    if (started) return;
    authTokenSecretMgr = createSecretManager();
    if (authTokenSecretMgr != null) {
      setSecretManager(authTokenSecretMgr);
      authTokenSecretMgr.start();
    }
    this.authManager = new ServiceAuthorizationManager();
    HBasePolicyProvider.init(conf, authManager);
    responder.start();
    listener.start();
    scheduler.start();
    started = true;
  }

  /** Stops the service.  No new calls will be handled after this is called. */
  @Override
  public synchronized void stop() {
    LOG.info("Stopping server on " + port);
    running = false;
    if (authTokenSecretMgr != null) {
      authTokenSecretMgr.stop();
      authTokenSecretMgr = null;
    }
    listener.interrupt();
    listener.doStop();
    responder.interrupt();
    scheduler.stop();
    notifyAll();
  }

  /** Wait for the server to be stopped.
   * Does not wait for all subthreads to finish.
   * See {@link #stop()}.
   * @throws InterruptedException if interrupted while waiting
   */
  @Override
  public synchronized void join() throws InterruptedException {
    while (running) {
      wait();
    }
  }

  /**
   * Return the socket (ip+port) on which the RPC server is listening. May return null if
   * the listener channel is closed.
   * @return the socket (ip+port) on which the RPC server is listening, or null if this
   * information cannot be determined
   */
  @Override
  public synchronized InetSocketAddress getListenerAddress() {
    if (listener == null) {
      return null;
    }
    return listener.getAddress();
  }

  @Override
  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
      throws IOException {
    return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),
      0);
  }

  @Override
  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
      long startTime, int timeout) throws IOException {
    SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner,
        null, -1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null, null);
    return call(fakeCall, status);
  }

  /**
   * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
   * If the amount of data is large, it writes to the channel in smaller chunks. This avoids the
   * JDK creating many direct buffers as the buffer size increases, and it also minimizes the
   * extra copies in the NIO layer that result from the multiple write operations required to
   * write a large buffer.
   *
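   * <p>Illustrative use (a sketch; assumes a {@code BufferChain} assembled from response
   * ByteBuffers, and the buffer and channel names are hypothetical):
   * <pre>
   * BufferChain chain = new BufferChain(headerBuffer, bodyBuffer);
   * long written = channelWrite(socketChannel, chain);
   * </pre>
   *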
   * @param channel writable byte channel to write to
   * @param bufferChain Chain of buffers to write
   * @return number of bytes written
   * @throws java.io.IOException if the write to the channel fails
   * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
   */
  protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
  throws IOException {
    long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
    if (count > 0) this.metrics.sentBytes(count);
    return count;
  }

  /**
   * A convenience method to bind to a given address and report
   * better exceptions if the address is not a valid host.
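   * <p>Illustrative call, binding an ephemeral port on the wildcard address (the values shown
   * are hypothetical):
   * <pre>
   * ServerSocket socket = ServerSocketChannel.open().socket();
   * SimpleRpcServer.bind(socket, new InetSocketAddress(0), 128);
   * </pre>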
   * @param socket the socket to bind
   * @param address the address to bind to
   * @param backlog the number of connections allowed in the queue
   * @throws BindException if the address can't be bound
   * @throws UnknownHostException if the address isn't a valid host name
   * @throws IOException other errors from the bind call
   */
  public static void bind(ServerSocket socket, InetSocketAddress address,
                          int backlog) throws IOException {
    try {
      socket.bind(address, backlog);
    } catch (BindException e) {
      BindException bindException =
        new BindException("Problem binding to " + address + " : " +
            e.getMessage());
      bindException.initCause(e);
      throw bindException;
    } catch (SocketException e) {
      // If they try to bind to a different host's address, give a better
      // error message.
      if ("Unresolved address".equals(e.getMessage())) {
        throw new UnknownHostException("Invalid hostname for server: " +
                                       address.getHostName());
      }
      throw e;
    }
  }

  /**
   * The number of open RPC connections
   * @return the number of open rpc connections
   */
  @Override
  public int getNumOpenConnections() {
    return connectionManager.size();
  }

  private class ConnectionManager {
    final private AtomicInteger count = new AtomicInteger();
    final private Set<SimpleServerRpcConnection> connections;

    final private Timer idleScanTimer;
    final private int idleScanThreshold;
    final private int idleScanInterval;
    final private int maxIdleTime;
    final private int maxIdleToClose;

    ConnectionManager() {
      this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
      this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
      this.idleScanInterval =
          conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
      this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
      this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
          HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
      int maxConnectionQueueSize =
          handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
      // create a set with concurrency -and- a thread-safe iterator, add 2
      // for listener and idle closer threads
      this.connections = Collections.newSetFromMap(
          new ConcurrentHashMap<SimpleServerRpcConnection,Boolean>(
              maxConnectionQueueSize, 0.75f, readThreads+2));
    }

    private boolean add(SimpleServerRpcConnection connection) {
      boolean added = connections.add(connection);
      if (added) {
        count.getAndIncrement();
      }
      return added;
    }

    private boolean remove(SimpleServerRpcConnection connection) {
      boolean removed = connections.remove(connection);
      if (removed) {
        count.getAndDecrement();
      }
      return removed;
    }

    int size() {
      return count.get();
    }

    SimpleServerRpcConnection[] toArray() {
      return connections.toArray(new SimpleServerRpcConnection[0]);
    }

    SimpleServerRpcConnection register(SocketChannel channel) {
      SimpleServerRpcConnection connection = getConnection(channel, System.currentTimeMillis());
      add(connection);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Connection from " + connection +
            "; connections=" + size() +
            ", queued calls size (bytes)=" + callQueueSizeInBytes.sum() +
            ", general queued calls=" + scheduler.getGeneralQueueLength() +
            ", priority queued calls=" + scheduler.getPriorityQueueLength() +
            ", meta priority queued calls=" + scheduler.getMetaPriorityQueueLength());
      }
      return connection;
    }

    boolean close(SimpleServerRpcConnection connection) {
      boolean exists = remove(connection);
      if (exists) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(Thread.currentThread().getName() +
              ": disconnecting client " + connection +
              ". Number of active connections: "+ size());
        }
        // only close if actually removed to avoid double-closing due
        // to possible races
        connection.close();
      }
      return exists;
    }

    // synch'ed to avoid explicit invocation upon OOM from colliding with
    // timer task firing
    synchronized void closeIdle(boolean scanAll) {
      long minLastContact = System.currentTimeMillis() - maxIdleTime;
      // concurrent iterator might miss new connections added
      // during the iteration, but that's ok because they won't
      // be idle yet anyway and will be caught on next scan
      int closed = 0;
      for (SimpleServerRpcConnection connection : connections) {
        // stop if connections dropped below threshold unless scanning all
        if (!scanAll && size() < idleScanThreshold) {
          break;
        }
        // stop if not scanning all and max connections are closed
        if (connection.isIdle() &&
            connection.getLastContact() < minLastContact &&
            close(connection) &&
            !scanAll && (++closed == maxIdleToClose)) {
          break;
        }
      }
    }

    void closeAll() {
      // use a copy of the connections to be absolutely sure the concurrent
      // iterator doesn't miss a connection
      for (SimpleServerRpcConnection connection : toArray()) {
        close(connection);
      }
    }

    void startIdleScan() {
      scheduleIdleScanTask();
    }

    void stopIdleScan() {
      idleScanTimer.cancel();
    }

    private void scheduleIdleScanTask() {
      if (!running) {
        return;
      }
      TimerTask idleScanTask = new TimerTask(){
        @Override
        public void run() {
          if (!running) {
            return;
          }
          if (LOG.isTraceEnabled()) {
            LOG.trace("running");
          }
          try {
            closeIdle(false);
          } finally {
            // explicitly reschedule so next execution occurs relative
            // to the end of this scan, not the beginning
            scheduleIdleScanTask();
          }
        }
      };
      idleScanTimer.schedule(idleScanTask, idleScanInterval);
    }
  }

}