/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The thread pool used for scanning directories.
 * <p>
 * The pool size is read from {@link CleanerChore#CHORE_POOL_SIZE}. A configuration change only
 * records the new size; the executor itself is resized later by {@link #tryUpdatePoolSize(long)},
 * which first waits (up to a timeout) for the latch counter maintained by
 * {@link #latchCountUp()} / {@link #latchCountDown()} to return to zero.
 * <p>
 * All state transitions are guarded by this object's monitor; {@code size} is additionally
 * {@code volatile} so that {@link #getSize()} can be read without locking.
 */
@InterfaceAudience.Private
public class DirScanPool implements ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class);

  /** Most recently configured pool size; may be ahead of the executor until it is resized. */
  private volatile int size;
  private final ThreadPoolExecutor pool;
  /** Counter raised by latchCountUp() and lowered by latchCountDown(); guarded by "this". */
  private int cleanerLatch;
  /** True when a new size has been recorded but not yet applied to the executor. */
  private boolean reconfigNotification;

  /**
   * Creates the pool with the size derived from the given configuration.
   * @param conf configuration to read {@link CleanerChore#CHORE_POOL_SIZE} from
   */
  public DirScanPool(Configuration conf) {
    size = readPoolSize(conf);
    pool = initializePool(size);
    LOG.info("Cleaner pool size is {}", size);
    cleanerLatch = 0;
  }

  /**
   * Resolves the pool size from the configuration, falling back to the default setting when the
   * configured value does not yield a positive size.
   * @param conf configuration to read from
   * @return the pool size computed by {@link CleanerChore#calculatePoolSize(String)}
   */
  private static int readPoolSize(Configuration conf) {
    String poolSize = conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE);
    int computed = CleanerChore.calculatePoolSize(poolSize);
    // poolSize may be 0 or 0.0 from a careless configuration,
    // double check to make sure.
    return computed > 0 ? computed
      : CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE);
  }

  /**
   * Builds a fixed-size executor (core == max) backed by an unbounded queue, whose idle core
   * threads are allowed to time out after one minute.
   * @param size number of core (and maximum) threads
   * @return the new executor
   */
  private static ThreadPoolExecutor initializePool(int size) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES,
      new LinkedBlockingQueue<>(), new DaemonThreadFactory("dir-scan-pool"));
    executor.allowCoreThreadTimeOut(true);
    return executor;
  }

  /**
   * Checks if pool can be updated. If so, mark for update later.
   * @param conf configuration
   */
  @Override
  public synchronized void onConfigurationChange(Configuration conf) {
    // Use the same resolution as the constructor so a zero size never reaches the executor.
    int newSize = readPoolSize(conf);
    if (newSize == size) {
      LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize);
      return;
    }
    size = newSize;
    // Chore is working, update it later.
    reconfigNotification = true;
  }

  /** Increments the latch counter that {@link #tryUpdatePoolSize(long)} waits on. */
  synchronized void latchCountUp() {
    cleanerLatch++;
  }

  /** Decrements the latch counter and wakes any thread waiting in tryUpdatePoolSize. */
  synchronized void latchCountDown() {
    cleanerLatch--;
    notifyAll();
  }

  /**
   * Submits a task to the underlying executor.
   * @param runnable task to run
   */
  synchronized void execute(Runnable runnable) {
    pool.execute(runnable);
  }

  /** Shuts the executor down immediately unless it has already been shut down. */
  public synchronized void shutdownNow() {
    if (!pool.isShutdown()) {
      pool.shutdownNow();
    }
  }

  /**
   * Applies a pending pool size change, if any. Waits until the latch counter reaches zero, the
   * timeout elapses, or the calling thread is interrupted, and then resizes the executor
   * regardless of which of those ended the wait.
   * @param timeout maximum time to wait for the latch counter to reach zero, in milliseconds
   */
  synchronized void tryUpdatePoolSize(long timeout) {
    if (!reconfigNotification) {
      return;
    }
    reconfigNotification = false;
    long stopTime = System.currentTimeMillis() + timeout;
    // wait(0) would block indefinitely, hence the explicit "timeout > 0" guard.
    while (cleanerLatch != 0 && timeout > 0) {
      try {
        wait(timeout);
        timeout = stopTime - System.currentTimeMillis();
      } catch (InterruptedException ie) {
        // Preserve the interrupt status for the caller and stop waiting.
        Thread.currentThread().interrupt();
        break;
      }
    }
    // Report the previously configured size; getPoolSize() is the live thread count, which can
    // be anything down to 0 because core threads are allowed to time out.
    LOG.info("Update chore's pool size from {} to {}", pool.getCorePoolSize(), size);
    resizePool(size);
  }

  /**
   * Moves both the core and the maximum pool size to {@code newSize}, ordering the two updates so
   * that the core size never exceeds the maximum. On Java 9+ ThreadPoolExecutor#setCorePoolSize
   * throws IllegalArgumentException when asked to go above the current maximum.
   * @param newSize target number of threads
   */
  private void resizePool(int newSize) {
    if (newSize > pool.getMaximumPoolSize()) {
      // Growing: raise the ceiling first.
      pool.setMaximumPoolSize(newSize);
      pool.setCorePoolSize(newSize);
    } else {
      // Shrinking (or unchanged): lower the core first.
      pool.setCorePoolSize(newSize);
      // setMaximumPoolSize rejects non-positive values, so leave the maximum alone in that case.
      if (newSize > 0) {
        pool.setMaximumPoolSize(newSize);
      }
    }
  }

  /**
   * @return the most recently configured pool size
   */
  public int getSize() {
    return size;
  }
}