001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import static org.apache.hadoop.hbase.mob.MobConstants.DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT; 021import static org.apache.hadoop.hbase.mob.MobConstants.MOB_FILE_CLEANER_CHORE_TIME_OUT; 022 023import com.google.errorprone.annotations.RestrictedApi; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.Future; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.ThreadFactory; 032import java.util.concurrent.ThreadPoolExecutor; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.TimeoutException; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.ScheduledChore; 037import org.apache.hadoop.hbase.TableDescriptors; 038import org.apache.hadoop.hbase.client.Admin; 039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 040import org.apache.hadoop.hbase.client.TableDescriptor; 041import org.apache.hadoop.hbase.conf.ConfigurationObserver; 042import org.apache.hadoop.hbase.master.HMaster; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 048 049/** 050 * The class MobFileCleanerChore for running cleaner regularly to remove the expired and obsolete 051 * (files which have no active references to) mob files. 052 */ 053@InterfaceAudience.Private 054public class MobFileCleanerChore extends ScheduledChore implements ConfigurationObserver { 055 056 private static final Logger LOG = LoggerFactory.getLogger(MobFileCleanerChore.class); 057 058 private final HMaster master; 059 private final ExpiredMobFileCleaner cleaner; 060 private final ThreadPoolExecutor executor; 061 private final int cleanerFutureTimeout; 062 private int threadCount; 063 064 public MobFileCleanerChore(HMaster master) { 065 super(master.getServerName() + "-MobFileCleanerChore", master, 066 master.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD, 067 MobConstants.DEFAULT_MOB_CLEANER_PERIOD), 068 master.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD, 069 MobConstants.DEFAULT_MOB_CLEANER_PERIOD), 070 TimeUnit.SECONDS); 071 this.master = master; 072 cleaner = new ExpiredMobFileCleaner(); 073 cleaner.setConf(master.getConfiguration()); 074 threadCount = master.getConfiguration().getInt(MobConstants.MOB_CLEANER_THREAD_COUNT, 075 MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT); 076 if (threadCount < 1) { 077 threadCount = 1; 078 } 079 080 ThreadFactory threadFactory = 081 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("mobfile-cleaner-pool-%d").build(); 082 083 executor = new ThreadPoolExecutor(threadCount, threadCount, 60, TimeUnit.SECONDS, 084 new LinkedBlockingQueue<Runnable>(), threadFactory); 085 086 checkObsoleteConfigurations(); 087 cleanerFutureTimeout = master.getConfiguration().getInt(MOB_FILE_CLEANER_CHORE_TIME_OUT, 088 DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT); 089 } 090 091 private void checkObsoleteConfigurations() { 092 Configuration conf = master.getConfiguration(); 093 094 if (conf.get("hbase.mob.compaction.mergeable.threshold") != null) { 095 LOG.warn("'hbase.mob.compaction.mergeable.threshold' is obsolete and not used anymore."); 096 } 097 if (conf.get("hbase.mob.delfile.max.count") != null) { 098 LOG.warn("'hbase.mob.delfile.max.count' is obsolete and not used anymore."); 099 } 100 if (conf.get("hbase.mob.compaction.threads.max") != null) { 101 LOG.warn("'hbase.mob.compaction.threads.max' is obsolete and not used anymore."); 102 } 103 if (conf.get("hbase.mob.compaction.batch.size") != null) { 104 LOG.warn("'hbase.mob.compaction.batch.size' is obsolete and not used anymore."); 105 } 106 } 107 108 @Override 109 protected void chore() { 110 TableDescriptors htds = master.getTableDescriptors(); 111 112 Map<String, TableDescriptor> map = null; 113 try { 114 map = htds.getAll(); 115 } catch (IOException e) { 116 LOG.error("MobFileCleanerChore failed", e); 117 return; 118 } 119 List<Future<?>> futureList = new ArrayList<>(map.size()); 120 for (TableDescriptor htd : map.values()) { 121 Future<?> future = executor.submit(() -> handleOneTable(htd)); 122 futureList.add(future); 123 } 124 125 for (Future<?> future : futureList) { 126 try { 127 future.get(cleanerFutureTimeout, TimeUnit.SECONDS); 128 } catch (InterruptedException e) { 129 LOG.warn("MobFileCleanerChore interrupted while waiting for futures", e); 130 Thread.currentThread().interrupt(); 131 cancelAllFutures(futureList); 132 break; 133 } catch (ExecutionException e) { 134 LOG.error("Exception during execution of MobFileCleanerChore task", e); 135 } catch (TimeoutException e) { 136 LOG.error("MobFileCleanerChore timed out waiting for a task to complete", e); 137 } 138 } 139 } 140 141 private void cancelAllFutures(List<Future<?>> futureList) { 142 long pendingTaskCounter = 0; 143 for (Future<?> f : futureList) { 144 if (!f.isDone()) { 145 f.cancel(true); // interrupt running tasks 146 pendingTaskCounter++; 147 } 148 } 149 LOG.info("Cancelled {} pending mob file cleaner tasks", pendingTaskCounter); 150 } 151 152 private void handleOneTable(TableDescriptor htd) { 153 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 154 if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) { 155 try { 156 cleaner.cleanExpiredMobFiles(htd, hcd); 157 } catch (IOException e) { 158 LOG.error("Failed to clean the expired mob files table={} family={}", 159 htd.getTableName().getNameAsString(), hcd.getNameAsString(), e); 160 } 161 } 162 } 163 try { 164 // Now clean obsolete files for a table 165 LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName()); 166 try (final Admin admin = master.getConnection().getAdmin()) { 167 MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(), 168 admin); 169 } 170 LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName()); 171 } catch (IOException e) { 172 LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e); 173 } 174 } 175 176 @Override 177 public void onConfigurationChange(Configuration conf) { 178 int newThreadCount = conf.getInt(MobConstants.MOB_CLEANER_THREAD_COUNT, 179 MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT); 180 if (newThreadCount < 1) { 181 return; // invalid value , skip the config change 182 } 183 184 if (newThreadCount != threadCount) { 185 resizeThreadPool(newThreadCount, newThreadCount); 186 threadCount = newThreadCount; 187 } 188 } 189 190 private void resizeThreadPool(int newCoreSize, int newMaxSize) { 191 int currentCoreSize = executor.getCorePoolSize(); 192 if (newCoreSize > currentCoreSize) { 193 // Increasing the pool size: Set max first, then core 194 executor.setMaximumPoolSize(newMaxSize); 195 executor.setCorePoolSize(newCoreSize); 196 } else { 197 // Decreasing the pool size: Set core first, then max 198 executor.setCorePoolSize(newCoreSize); 199 executor.setMaximumPoolSize(newMaxSize); 200 } 201 } 202 203 @RestrictedApi(explanation = "Should only be called in tests", link = "", 204 allowedOnPath = ".*/src/test/.*") 205 public ThreadPoolExecutor getExecutor() { 206 return executor; 207 } 208}