001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import static org.apache.hadoop.hbase.mob.MobConstants.DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT;
021import static org.apache.hadoop.hbase.mob.MobConstants.MOB_FILE_CLEANER_CHORE_TIME_OUT;
022
023import com.google.errorprone.annotations.RestrictedApi;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.Future;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.ThreadFactory;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.TimeoutException;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.ScheduledChore;
037import org.apache.hadoop.hbase.TableDescriptors;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
040import org.apache.hadoop.hbase.client.TableDescriptor;
041import org.apache.hadoop.hbase.conf.ConfigurationObserver;
042import org.apache.hadoop.hbase.master.HMaster;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
048
049/**
050 * The class MobFileCleanerChore for running cleaner regularly to remove the expired and obsolete
051 * (files which have no active references to) mob files.
052 */
053@InterfaceAudience.Private
054public class MobFileCleanerChore extends ScheduledChore implements ConfigurationObserver {
055
056  private static final Logger LOG = LoggerFactory.getLogger(MobFileCleanerChore.class);
057
058  private final HMaster master;
059  private final ExpiredMobFileCleaner cleaner;
060  private final ThreadPoolExecutor executor;
061  private final int cleanerFutureTimeout;
062  private int threadCount;
063
064  public MobFileCleanerChore(HMaster master) {
065    super(master.getServerName() + "-MobFileCleanerChore", master,
066      master.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
067        MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
068      master.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
069        MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
070      TimeUnit.SECONDS);
071    this.master = master;
072    cleaner = new ExpiredMobFileCleaner();
073    cleaner.setConf(master.getConfiguration());
074    threadCount = master.getConfiguration().getInt(MobConstants.MOB_CLEANER_THREAD_COUNT,
075      MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT);
076    if (threadCount < 1) {
077      threadCount = 1;
078    }
079
080    ThreadFactory threadFactory =
081      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("mobfile-cleaner-pool-%d").build();
082
083    executor = new ThreadPoolExecutor(threadCount, threadCount, 60, TimeUnit.SECONDS,
084      new LinkedBlockingQueue<Runnable>(), threadFactory);
085
086    checkObsoleteConfigurations();
087    cleanerFutureTimeout = master.getConfiguration().getInt(MOB_FILE_CLEANER_CHORE_TIME_OUT,
088      DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT);
089  }
090
091  private void checkObsoleteConfigurations() {
092    Configuration conf = master.getConfiguration();
093
094    if (conf.get("hbase.mob.compaction.mergeable.threshold") != null) {
095      LOG.warn("'hbase.mob.compaction.mergeable.threshold' is obsolete and not used anymore.");
096    }
097    if (conf.get("hbase.mob.delfile.max.count") != null) {
098      LOG.warn("'hbase.mob.delfile.max.count' is obsolete and not used anymore.");
099    }
100    if (conf.get("hbase.mob.compaction.threads.max") != null) {
101      LOG.warn("'hbase.mob.compaction.threads.max' is obsolete and not used anymore.");
102    }
103    if (conf.get("hbase.mob.compaction.batch.size") != null) {
104      LOG.warn("'hbase.mob.compaction.batch.size' is obsolete and not used anymore.");
105    }
106  }
107
108  @Override
109  protected void chore() {
110    TableDescriptors htds = master.getTableDescriptors();
111
112    Map<String, TableDescriptor> map = null;
113    try {
114      map = htds.getAll();
115    } catch (IOException e) {
116      LOG.error("MobFileCleanerChore failed", e);
117      return;
118    }
119    List<Future<?>> futureList = new ArrayList<>(map.size());
120    for (TableDescriptor htd : map.values()) {
121      Future<?> future = executor.submit(() -> handleOneTable(htd));
122      futureList.add(future);
123    }
124
125    for (Future<?> future : futureList) {
126      try {
127        future.get(cleanerFutureTimeout, TimeUnit.SECONDS);
128      } catch (InterruptedException e) {
129        LOG.warn("MobFileCleanerChore interrupted while waiting for futures", e);
130        Thread.currentThread().interrupt();
131        cancelAllFutures(futureList);
132        break;
133      } catch (ExecutionException e) {
134        LOG.error("Exception during execution of MobFileCleanerChore task", e);
135      } catch (TimeoutException e) {
136        LOG.error("MobFileCleanerChore timed out waiting for a task to complete", e);
137      }
138    }
139  }
140
141  private void cancelAllFutures(List<Future<?>> futureList) {
142    long pendingTaskCounter = 0;
143    for (Future<?> f : futureList) {
144      if (!f.isDone()) {
145        f.cancel(true); // interrupt running tasks
146        pendingTaskCounter++;
147      }
148    }
149    LOG.info("Cancelled {} pending mob file cleaner tasks", pendingTaskCounter);
150  }
151
152  private void handleOneTable(TableDescriptor htd) {
153    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
154      if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
155        try {
156          cleaner.cleanExpiredMobFiles(htd, hcd);
157        } catch (IOException e) {
158          LOG.error("Failed to clean the expired mob files table={} family={}",
159            htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
160        }
161      }
162    }
163    try {
164      // Now clean obsolete files for a table
165      LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
166      try (final Admin admin = master.getConnection().getAdmin()) {
167        MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(),
168          admin);
169      }
170      LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
171    } catch (IOException e) {
172      LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e);
173    }
174  }
175
176  @Override
177  public void onConfigurationChange(Configuration conf) {
178    int newThreadCount = conf.getInt(MobConstants.MOB_CLEANER_THREAD_COUNT,
179      MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT);
180    if (newThreadCount < 1) {
181      return; // invalid value , skip the config change
182    }
183
184    if (newThreadCount != threadCount) {
185      resizeThreadPool(newThreadCount, newThreadCount);
186      threadCount = newThreadCount;
187    }
188  }
189
190  private void resizeThreadPool(int newCoreSize, int newMaxSize) {
191    int currentCoreSize = executor.getCorePoolSize();
192    if (newCoreSize > currentCoreSize) {
193      // Increasing the pool size: Set max first, then core
194      executor.setMaximumPoolSize(newMaxSize);
195      executor.setCorePoolSize(newCoreSize);
196    } else {
197      // Decreasing the pool size: Set core first, then max
198      executor.setCorePoolSize(newCoreSize);
199      executor.setMaximumPoolSize(newMaxSize);
200    }
201  }
202
203  @RestrictedApi(explanation = "Should only be called in tests", link = "",
204      allowedOnPath = ".*/src/test/.*")
205  public ThreadPoolExecutor getExecutor() {
206    return executor;
207  }
208}