001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import java.util.concurrent.ThreadPoolExecutor;
021import java.util.concurrent.TimeUnit;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.conf.ConfigurationObserver;
024import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
025import org.apache.hadoop.hbase.util.Threads;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
031
032/**
033 * The thread pool used for scan directories
034 */
035@InterfaceAudience.Private
036public class DirScanPool implements ConfigurationObserver {
037  private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class);
038  private volatile int size;
039  private final ThreadPoolExecutor pool;
040  private int cleanerLatch;
041  private boolean reconfigNotification;
042  private Type dirScanPoolType;
043  private final String name;
044
045  private enum Type {
046    LOG_CLEANER(CleanerChore.LOG_CLEANER_CHORE_SIZE,
047      CleanerChore.DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE),
048    HFILE_CLEANER(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE);
049
050    private final String cleanerPoolSizeConfigName;
051    private final String cleanerPoolSizeConfigDefault;
052
053    private Type(String cleanerPoolSizeConfigName, String cleanerPoolSizeConfigDefault) {
054      this.cleanerPoolSizeConfigName = cleanerPoolSizeConfigName;
055      this.cleanerPoolSizeConfigDefault = cleanerPoolSizeConfigDefault;
056    }
057  }
058
059  private DirScanPool(Configuration conf, Type dirScanPoolType) {
060    this(dirScanPoolType, conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
061      dirScanPoolType.cleanerPoolSizeConfigDefault));
062  }
063
064  private DirScanPool(Type dirScanPoolType, String poolSize) {
065    this.dirScanPoolType = dirScanPoolType;
066    this.name = dirScanPoolType.name().toLowerCase();
067    size = CleanerChore.calculatePoolSize(poolSize);
068    // poolSize may be 0 or 0.0 from a careless configuration,
069    // double check to make sure.
070    size = size == 0
071      ? CleanerChore.calculatePoolSize(dirScanPoolType.cleanerPoolSizeConfigDefault)
072      : size;
073    pool = initializePool(size, name);
074    LOG.info("{} Cleaner pool size is {}", name, size);
075    cleanerLatch = 0;
076  }
077
078  private static ThreadPoolExecutor initializePool(int size, String name) {
079    return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES,
080      new ThreadFactoryBuilder().setNameFormat(name + "-dir-scan-pool-%d").setDaemon(true)
081        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
082  }
083
084  /**
085   * Checks if pool can be updated. If so, mark for update later.
086   * @param conf configuration
087   */
088  @Override
089  public synchronized void onConfigurationChange(Configuration conf) {
090    int newSize = CleanerChore.calculatePoolSize(conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
091      dirScanPoolType.cleanerPoolSizeConfigDefault));
092    if (newSize == size) {
093      LOG.trace("{} Cleaner Size from configuration is same as previous={}, no need to update.",
094        name, newSize);
095      return;
096    }
097    size = newSize;
098    // Chore is working, update it later.
099    reconfigNotification = true;
100  }
101
102  synchronized void latchCountUp() {
103    cleanerLatch++;
104  }
105
106  synchronized void latchCountDown() {
107    cleanerLatch--;
108    notifyAll();
109  }
110
111  synchronized void execute(Runnable runnable) {
112    pool.execute(runnable);
113  }
114
115  public synchronized void shutdownNow() {
116    if (pool == null || pool.isShutdown()) {
117      return;
118    }
119    pool.shutdownNow();
120  }
121
122  synchronized void tryUpdatePoolSize(long timeout) {
123    if (!reconfigNotification) {
124      return;
125    }
126    reconfigNotification = false;
127    long stopTime = EnvironmentEdgeManager.currentTime() + timeout;
128    while (cleanerLatch != 0 && timeout > 0) {
129      try {
130        wait(timeout);
131        timeout = stopTime - EnvironmentEdgeManager.currentTime();
132      } catch (InterruptedException ie) {
133        Thread.currentThread().interrupt();
134        break;
135      }
136    }
137    LOG.info("Update {} chore's pool size from {} to {}", name, pool.getPoolSize(), size);
138    pool.setCorePoolSize(size);
139  }
140
141  public int getSize() {
142    return size;
143  }
144
145  public static DirScanPool getHFileCleanerScanPool(Configuration conf) {
146    return new DirScanPool(conf, Type.HFILE_CLEANER);
147  }
148
149  public static DirScanPool getHFileCleanerScanPool(String poolSize) {
150    return new DirScanPool(Type.HFILE_CLEANER, poolSize);
151  }
152
153  public static DirScanPool getLogCleanerScanPool(Configuration conf) {
154    return new DirScanPool(conf, Type.LOG_CLEANER);
155  }
156}