001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileStatus;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Stoppable;
033import org.apache.hadoop.hbase.conf.ConfigurationObserver;
034import org.apache.hadoop.hbase.io.HFileLink;
035import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
036import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.StealJobQueue;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * This Chore, every time it runs, will clear the HFiles in the hfile archive folder that are
045 * deletable for each HFile cleaner in the chain.
046 */
047@InterfaceAudience.Private
048public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
049  implements ConfigurationObserver {
050
051  public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
052
053  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
054    Path directory, DirScanPool pool) {
055    this(period, stopper, conf, fs, directory, pool, null);
056  }
057
058  // Configuration key for large/small throttle point
059  public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
060    "hbase.regionserver.thread.hfilecleaner.throttle";
061  public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M
062
063  // Configuration key for large queue initial size
064  public final static String LARGE_HFILE_QUEUE_INIT_SIZE =
065    "hbase.regionserver.hfilecleaner.large.queue.size";
066  public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240;
067
068  // Configuration key for small queue initial size
069  public final static String SMALL_HFILE_QUEUE_INIT_SIZE =
070    "hbase.regionserver.hfilecleaner.small.queue.size";
071  public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
072
073  // Configuration key for large file delete thread number
074  public final static String LARGE_HFILE_DELETE_THREAD_NUMBER =
075    "hbase.regionserver.hfilecleaner.large.thread.count";
076  public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1;
077
078  // Configuration key for small file delete thread number
079  public final static String SMALL_HFILE_DELETE_THREAD_NUMBER =
080    "hbase.regionserver.hfilecleaner.small.thread.count";
081  public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;
082
083  public static final String HFILE_DELETE_THREAD_TIMEOUT_MSEC =
084    "hbase.regionserver.hfilecleaner.thread.timeout.msec";
085  static final long DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC = 60 * 1000L;
086
087  public static final String HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC =
088    "hbase.regionserver.hfilecleaner.thread.check.interval.msec";
089  static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;
090
091  /**
092   * The custom paths for hfile cleaner, subdirectories of archive, e.g.
093   * data/default/testTable1,data/default/testTable2
094   */
095  public static final String HFILE_CLEANER_CUSTOM_PATHS = "hbase.master.hfile.cleaner.custom.paths";
096
097  /** Configure hfile cleaner classes for the custom paths */
098  public static final String HFILE_CLEANER_CUSTOM_PATHS_PLUGINS =
099    "hbase.master.hfilecleaner.custom.paths.plugins";
100  public static final String CUSTOM_POOL_SIZE = "hbase.cleaner.custom.hfiles.pool.size";
101
102  private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);
103
104  StealJobQueue<HFileDeleteTask> largeFileQueue;
105  BlockingQueue<HFileDeleteTask> smallFileQueue;
106  private int throttlePoint;
107  private int largeQueueInitSize;
108  private int smallQueueInitSize;
109  private int largeFileDeleteThreadNumber;
110  private int smallFileDeleteThreadNumber;
111  private long cleanerThreadTimeoutMsec;
112  private long cleanerThreadCheckIntervalMsec;
113  private List<Thread> threads = new ArrayList<Thread>();
114  private volatile boolean running;
115
116  private AtomicLong deletedLargeFiles = new AtomicLong();
117  private AtomicLong deletedSmallFiles = new AtomicLong();
118
119  /**
120   * @param period    the period of time to sleep between each run
121   * @param stopper   the stopper
122   * @param conf      configuration to use
123   * @param fs        handle to the FS
124   * @param directory directory to be cleaned
125   * @param pool      the thread pool used to scan directories
126   * @param params    params could be used in subclass of BaseHFileCleanerDelegate
127   */
128  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
129    Path directory, DirScanPool pool, Map<String, Object> params) {
130    this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
131      params, null);
132  }
133
134  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
135    Path directory, DirScanPool pool, Map<String, Object> params, List<Path> excludePaths) {
136    this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
137      params, excludePaths);
138  }
139
140  /**
141   * For creating customized HFileCleaner.
142   * @param name      name of the chore being run
143   * @param period    the period of time to sleep between each run
144   * @param stopper   the stopper
145   * @param conf      configuration to use
146   * @param fs        handle to the FS
147   * @param directory directory to be cleaned
148   * @param confKey   configuration key for the classes to instantiate
149   * @param pool      the thread pool used to scan directories
150   * @param params    params could be used in subclass of BaseHFileCleanerDelegate
151   */
152  public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
153    Path directory, String confKey, DirScanPool pool, Map<String, Object> params,
154    List<Path> excludePaths) {
155    super(name, period, stopper, conf, fs, directory, confKey, pool, params, excludePaths);
156    throttlePoint =
157      conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
158    largeQueueInitSize =
159      conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
160    smallQueueInitSize =
161      conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
162    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
163    smallFileQueue = largeFileQueue.getStealFromQueue();
164    largeFileDeleteThreadNumber =
165      conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
166    smallFileDeleteThreadNumber =
167      conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
168    cleanerThreadTimeoutMsec =
169      conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
170    cleanerThreadCheckIntervalMsec = conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
171      DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
172    startHFileDeleteThreads();
173  }
174
175  @Override
176  protected boolean validate(Path file) {
177    return HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())
178      || StoreFileInfo.validateStoreFileName(file.getName())
179      || file.getName().endsWith(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
180  }
181
182  /**
183   * Exposed for TESTING!
184   */
185  public List<BaseHFileCleanerDelegate> getDelegatesForTesting() {
186    return this.cleanersChain;
187  }
188
189  @Override
190  public int deleteFiles(Iterable<FileStatus> filesToDelete) {
191    int deletedFiles = 0;
192    List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>();
193    // construct delete tasks and add into relative queue
194    for (FileStatus file : filesToDelete) {
195      HFileDeleteTask task = deleteFile(file);
196      if (task != null) {
197        tasks.add(task);
198      }
199    }
200    // wait for each submitted task to finish
201    for (HFileDeleteTask task : tasks) {
202      if (task.getResult(cleanerThreadCheckIntervalMsec)) {
203        deletedFiles++;
204      }
205    }
206    return deletedFiles;
207  }
208
209  /**
210   * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue
211   * @param file the file to delete
212   * @return HFileDeleteTask to track progress
213   */
214  private HFileDeleteTask deleteFile(FileStatus file) {
215    HFileDeleteTask task = new HFileDeleteTask(file, cleanerThreadTimeoutMsec);
216    boolean enqueued = dispatch(task);
217    return enqueued ? task : null;
218  }
219
220  private boolean dispatch(HFileDeleteTask task) {
221    if (task.fileLength >= this.throttlePoint) {
222      if (!this.largeFileQueue.offer(task)) {
223        // should never arrive here as long as we use PriorityQueue
224        LOG.trace("Large file deletion queue is full");
225        return false;
226      }
227    } else {
228      if (!this.smallFileQueue.offer(task)) {
229        // should never arrive here as long as we use PriorityQueue
230        LOG.trace("Small file deletion queue is full");
231        return false;
232      }
233    }
234    return true;
235  }
236
237  @Override
238  public synchronized void cleanup() {
239    super.cleanup();
240    stopHFileDeleteThreads();
241  }
242
243  /**
244   * Start threads for hfile deletion
245   */
246  private void startHFileDeleteThreads() {
247    final String n = Thread.currentThread().getName();
248    running = true;
249    // start thread for large file deletion
250    for (int i = 0; i < largeFileDeleteThreadNumber; i++) {
251      Thread large = new Thread() {
252        @Override
253        public void run() {
254          consumerLoop(largeFileQueue);
255        }
256      };
257      large.setDaemon(true);
258      large.setName(n + "-HFileCleaner.large." + i + "-" + EnvironmentEdgeManager.currentTime());
259      large.start();
260      LOG.debug("Starting for large file={}", large);
261      threads.add(large);
262    }
263
264    // start thread for small file deletion
265    for (int i = 0; i < smallFileDeleteThreadNumber; i++) {
266      Thread small = new Thread() {
267        @Override
268        public void run() {
269          consumerLoop(smallFileQueue);
270        }
271      };
272      small.setDaemon(true);
273      small.setName(n + "-HFileCleaner.small." + i + "-" + EnvironmentEdgeManager.currentTime());
274      small.start();
275      LOG.debug("Starting for small files={}", small);
276      threads.add(small);
277    }
278  }
279
280  protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
281    try {
282      while (running) {
283        HFileDeleteTask task = null;
284        try {
285          task = queue.take();
286        } catch (InterruptedException e) {
287          LOG.trace("Interrupted while trying to take a task from queue", e);
288          break;
289        }
290        if (task != null) {
291          LOG.trace("Removing {}", task.filePath);
292          boolean succeed;
293          try {
294            succeed = this.fs.delete(task.filePath, false);
295          } catch (IOException e) {
296            LOG.warn("Failed to delete {}", task.filePath, e);
297            succeed = false;
298          }
299          task.setResult(succeed);
300          if (succeed) {
301            countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
302          }
303        }
304      }
305    } finally {
306      LOG.debug("Exit {}", Thread.currentThread());
307    }
308  }
309
310  // Currently only for testing purpose
311  private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
312    if (isLargeFile) {
313      if (deletedLargeFiles.get() == Long.MAX_VALUE) {
314        LOG.debug("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
315        deletedLargeFiles.set(0L);
316      }
317      deletedLargeFiles.incrementAndGet();
318    } else {
319      if (deletedSmallFiles.get() == Long.MAX_VALUE) {
320        LOG.debug("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
321        deletedSmallFiles.set(0L);
322      }
323      if (fromLargeQueue) {
324        LOG.trace("Stolen a small file deletion task in large file thread");
325      }
326      deletedSmallFiles.incrementAndGet();
327    }
328  }
329
330  /**
331   * Stop threads for hfile deletion
332   */
333  private void stopHFileDeleteThreads() {
334    running = false;
335    LOG.debug("Stopping file delete threads");
336    for (Thread thread : threads) {
337      thread.interrupt();
338    }
339  }
340
341  private static final Comparator<HFileDeleteTask> COMPARATOR = new Comparator<HFileDeleteTask>() {
342
343    @Override
344    public int compare(HFileDeleteTask o1, HFileDeleteTask o2) {
345      // larger file first so reverse compare
346      int cmp = Long.compare(o2.fileLength, o1.fileLength);
347      if (cmp != 0) {
348        return cmp;
349      }
350      // just use hashCode to generate a stable result.
351      return System.identityHashCode(o1) - System.identityHashCode(o2);
352    }
353  };
354
355  private static final class HFileDeleteTask {
356
357    boolean done = false;
358    boolean result;
359    final Path filePath;
360    final long fileLength;
361    final long timeoutMsec;
362
363    public HFileDeleteTask(FileStatus file, long timeoutMsec) {
364      this.filePath = file.getPath();
365      this.fileLength = file.getLen();
366      this.timeoutMsec = timeoutMsec;
367    }
368
369    public synchronized void setResult(boolean result) {
370      this.done = true;
371      this.result = result;
372      notify();
373    }
374
375    public synchronized boolean getResult(long waitIfNotFinished) {
376      long waitTimeMsec = 0;
377      try {
378        while (!done) {
379          long startTimeNanos = System.nanoTime();
380          wait(waitIfNotFinished);
381          waitTimeMsec +=
382            TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos, TimeUnit.NANOSECONDS);
383          if (done) {
384            return this.result;
385          }
386          if (waitTimeMsec > timeoutMsec) {
387            LOG.warn(
388              "Wait more than " + timeoutMsec + " ms for deleting " + this.filePath + ", exit...");
389            return false;
390          }
391        }
392      } catch (InterruptedException e) {
393        LOG.warn(
394          "Interrupted while waiting for result of deleting " + filePath + ", will return false",
395          e);
396        return false;
397      }
398      return this.result;
399    }
400  }
401
402  public List<Thread> getCleanerThreads() {
403    return threads;
404  }
405
406  public long getNumOfDeletedLargeFiles() {
407    return deletedLargeFiles.get();
408  }
409
410  public long getNumOfDeletedSmallFiles() {
411    return deletedSmallFiles.get();
412  }
413
414  public long getLargeQueueInitSize() {
415    return largeQueueInitSize;
416  }
417
418  public long getSmallQueueInitSize() {
419    return smallQueueInitSize;
420  }
421
422  public long getThrottlePoint() {
423    return throttlePoint;
424  }
425
426  long getCleanerThreadTimeoutMsec() {
427    return cleanerThreadTimeoutMsec;
428  }
429
430  long getCleanerThreadCheckIntervalMsec() {
431    return cleanerThreadCheckIntervalMsec;
432  }
433
434  @Override
435  public void onConfigurationChange(Configuration conf) {
436    if (!checkAndUpdateConfigurations(conf)) {
437      LOG.debug("Update configuration triggered but nothing changed for this cleaner");
438      return;
439    }
440    stopHFileDeleteThreads();
441    // record the left over tasks
442    List<HFileDeleteTask> leftOverTasks =
443      new ArrayList<>(largeFileQueue.size() + smallFileQueue.size());
444    leftOverTasks.addAll(largeFileQueue);
445    leftOverTasks.addAll(smallFileQueue);
446    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
447    smallFileQueue = largeFileQueue.getStealFromQueue();
448    threads.clear();
449    startHFileDeleteThreads();
450    // re-dispatch the left over tasks
451    for (HFileDeleteTask task : leftOverTasks) {
452      dispatch(task);
453    }
454  }
455
456  /**
457   * Check new configuration and update settings if value changed
458   * @param conf The new configuration
459   * @return true if any configuration for HFileCleaner changes, false if no change
460   */
461  private boolean checkAndUpdateConfigurations(Configuration conf) {
462    boolean updated = false;
463    int throttlePoint =
464      conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
465    if (throttlePoint != this.throttlePoint) {
466      LOG.debug("Updating throttle point, from {} to {}", this.throttlePoint, throttlePoint);
467      this.throttlePoint = throttlePoint;
468      updated = true;
469    }
470    int largeQueueInitSize =
471      conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
472    if (largeQueueInitSize != this.largeQueueInitSize) {
473      LOG.debug("Updating largeQueueInitSize, from {} to {}", this.largeQueueInitSize,
474        largeQueueInitSize);
475      this.largeQueueInitSize = largeQueueInitSize;
476      updated = true;
477    }
478    int smallQueueInitSize =
479      conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
480    if (smallQueueInitSize != this.smallQueueInitSize) {
481      LOG.debug("Updating smallQueueInitSize, from {} to {}", this.smallQueueInitSize,
482        smallQueueInitSize);
483      this.smallQueueInitSize = smallQueueInitSize;
484      updated = true;
485    }
486    int largeFileDeleteThreadNumber =
487      conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
488    if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) {
489      LOG.debug("Updating largeFileDeleteThreadNumber, from {} to {}",
490        this.largeFileDeleteThreadNumber, largeFileDeleteThreadNumber);
491      this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber;
492      updated = true;
493    }
494    int smallFileDeleteThreadNumber =
495      conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
496    if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) {
497      LOG.debug("Updating smallFileDeleteThreadNumber, from {} to {}",
498        this.smallFileDeleteThreadNumber, smallFileDeleteThreadNumber);
499      this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
500      updated = true;
501    }
502    long cleanerThreadTimeoutMsec =
503      conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
504    if (cleanerThreadTimeoutMsec != this.cleanerThreadTimeoutMsec) {
505      this.cleanerThreadTimeoutMsec = cleanerThreadTimeoutMsec;
506      updated = true;
507    }
508    long cleanerThreadCheckIntervalMsec = conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
509      DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
510    if (cleanerThreadCheckIntervalMsec != this.cleanerThreadCheckIntervalMsec) {
511      this.cleanerThreadCheckIntervalMsec = cleanerThreadCheckIntervalMsec;
512      updated = true;
513    }
514    return updated;
515  }
516
517  @Override
518  public synchronized void cancel(boolean mayInterruptIfRunning) {
519    super.cancel(mayInterruptIfRunning);
520    for (Thread t : this.threads) {
521      t.interrupt();
522    }
523  }
524}