001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import java.util.concurrent.ThreadPoolExecutor; 021import java.util.concurrent.TimeUnit; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.conf.ConfigurationObserver; 024import org.apache.hadoop.hbase.util.Threads; 025import org.apache.yetus.audience.InterfaceAudience; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 030 031/** 032 * The thread pool used for scan directories 033 */ 034@InterfaceAudience.Private 035public class DirScanPool implements ConfigurationObserver { 036 private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class); 037 private volatile int size; 038 private final ThreadPoolExecutor pool; 039 private int cleanerLatch; 040 private boolean reconfigNotification; 041 042 public DirScanPool(Configuration conf) { 043 String poolSize = conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE); 044 size = CleanerChore.calculatePoolSize(poolSize); 045 // poolSize may be 0 or 0.0 from a careless configuration, 046 // double check to make sure. 047 size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size; 048 pool = initializePool(size); 049 LOG.info("Cleaner pool size is {}", size); 050 cleanerLatch = 0; 051 } 052 053 private static ThreadPoolExecutor initializePool(int size) { 054 return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES, 055 new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d").setDaemon(true) 056 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 057 } 058 059 /** 060 * Checks if pool can be updated. If so, mark for update later. 061 * @param conf configuration 062 */ 063 @Override 064 public synchronized void onConfigurationChange(Configuration conf) { 065 int newSize = CleanerChore.calculatePoolSize( 066 conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE)); 067 if (newSize == size) { 068 LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize); 069 return; 070 } 071 size = newSize; 072 // Chore is working, update it later. 073 reconfigNotification = true; 074 } 075 076 synchronized void latchCountUp() { 077 cleanerLatch++; 078 } 079 080 synchronized void latchCountDown() { 081 cleanerLatch--; 082 notifyAll(); 083 } 084 085 synchronized void execute(Runnable runnable) { 086 pool.execute(runnable); 087 } 088 089 public synchronized void shutdownNow() { 090 if (pool == null || pool.isShutdown()) { 091 return; 092 } 093 pool.shutdownNow(); 094 } 095 096 synchronized void tryUpdatePoolSize(long timeout) { 097 if (!reconfigNotification) { 098 return; 099 } 100 reconfigNotification = false; 101 long stopTime = System.currentTimeMillis() + timeout; 102 while (cleanerLatch != 0 && timeout > 0) { 103 try { 104 wait(timeout); 105 timeout = stopTime - System.currentTimeMillis(); 106 } catch (InterruptedException ie) { 107 Thread.currentThread().interrupt(); 108 break; 109 } 110 } 111 LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size); 112 pool.setCorePoolSize(size); 113 } 114 115 public int getSize() { 116 return size; 117 } 118}