001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileStatus;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Stoppable;
033import org.apache.hadoop.hbase.conf.ConfigurationObserver;
034import org.apache.hadoop.hbase.io.HFileLink;
035import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
036import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
037import org.apache.hadoop.hbase.util.StealJobQueue;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
043/**
044 * This Chore, every time it runs, will clear the HFiles in the hfile archive
045 * folder that are deletable for each HFile cleaner in the chain.
046 */
047@InterfaceAudience.Private
048public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
049  implements ConfigurationObserver {
050
051  public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
052
053  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
054    Path directory, DirScanPool pool) {
055    this(period, stopper, conf, fs, directory, pool, null);
056  }
057
058  // Configuration key for large/small throttle point
059  public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
060      "hbase.regionserver.thread.hfilecleaner.throttle";
061  public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M
062
063  // Configuration key for large queue initial size
064  public final static String LARGE_HFILE_QUEUE_INIT_SIZE =
065      "hbase.regionserver.hfilecleaner.large.queue.size";
066  public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240;
067
068  // Configuration key for small queue initial size
069  public final static String SMALL_HFILE_QUEUE_INIT_SIZE =
070      "hbase.regionserver.hfilecleaner.small.queue.size";
071  public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
072
073  // Configuration key for large file delete thread number
074  public final static String LARGE_HFILE_DELETE_THREAD_NUMBER =
075      "hbase.regionserver.hfilecleaner.large.thread.count";
076  public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1;
077
078  // Configuration key for small file delete thread number
079  public final static String SMALL_HFILE_DELETE_THREAD_NUMBER =
080      "hbase.regionserver.hfilecleaner.small.thread.count";
081  public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;
082
083  public static final String HFILE_DELETE_THREAD_TIMEOUT_MSEC =
084      "hbase.regionserver.hfilecleaner.thread.timeout.msec";
085  @VisibleForTesting
086  static final long DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC = 60 * 1000L;
087
088  public static final String HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC =
089      "hbase.regionserver.hfilecleaner.thread.check.interval.msec";
090  @VisibleForTesting
091  static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;
092
093  private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);
094
095  StealJobQueue<HFileDeleteTask> largeFileQueue;
096  BlockingQueue<HFileDeleteTask> smallFileQueue;
097  private int throttlePoint;
098  private int largeQueueInitSize;
099  private int smallQueueInitSize;
100  private int largeFileDeleteThreadNumber;
101  private int smallFileDeleteThreadNumber;
102  private long cleanerThreadTimeoutMsec;
103  private long cleanerThreadCheckIntervalMsec;
104  private List<Thread> threads = new ArrayList<Thread>();
105  private volatile boolean running;
106
107  private AtomicLong deletedLargeFiles = new AtomicLong();
108  private AtomicLong deletedSmallFiles = new AtomicLong();
109
110  /**
111   * @param period the period of time to sleep between each run
112   * @param stopper the stopper
113   * @param conf configuration to use
114   * @param fs handle to the FS
115   * @param directory directory to be cleaned
116   * @param pool the thread pool used to scan directories
117   * @param params params could be used in subclass of BaseHFileCleanerDelegate
118   */
119  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
120    Path directory, DirScanPool pool, Map<String, Object> params) {
121    this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
122      params);
123
124  }
125
126  /**
127   * For creating customized HFileCleaner.
128   * @param name name of the chore being run
129   * @param period the period of time to sleep between each run
130   * @param stopper the stopper
131   * @param conf configuration to use
132   * @param fs handle to the FS
133   * @param directory directory to be cleaned
134   * @param confKey configuration key for the classes to instantiate
135   * @param pool the thread pool used to scan directories
136   * @param params params could be used in subclass of BaseHFileCleanerDelegate
137   */
138  public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
139    Path directory, String confKey, DirScanPool pool, Map<String, Object> params) {
140    super(name, period, stopper, conf, fs, directory, confKey, pool, params);
141    throttlePoint =
142      conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
143    largeQueueInitSize =
144      conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
145    smallQueueInitSize =
146      conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
147    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
148    smallFileQueue = largeFileQueue.getStealFromQueue();
149    largeFileDeleteThreadNumber =
150      conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
151    smallFileDeleteThreadNumber =
152      conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
153    cleanerThreadTimeoutMsec =
154      conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
155    cleanerThreadCheckIntervalMsec = conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
156      DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
157    startHFileDeleteThreads();
158  }
159
160  @Override
161  protected boolean validate(Path file) {
162    return HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent()) ||
163      StoreFileInfo.validateStoreFileName(file.getName()) ||
164      file.getName().endsWith(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
165  }
166
167  /**
168   * Exposed for TESTING!
169   */
170  public List<BaseHFileCleanerDelegate> getDelegatesForTesting() {
171    return this.cleanersChain;
172  }
173
174  @Override
175  public int deleteFiles(Iterable<FileStatus> filesToDelete) {
176    int deletedFiles = 0;
177    List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>();
178    // construct delete tasks and add into relative queue
179    for (FileStatus file : filesToDelete) {
180      HFileDeleteTask task = deleteFile(file);
181      if (task != null) {
182        tasks.add(task);
183      }
184    }
185    // wait for each submitted task to finish
186    for (HFileDeleteTask task : tasks) {
187      if (task.getResult(cleanerThreadCheckIntervalMsec)) {
188        deletedFiles++;
189      }
190    }
191    return deletedFiles;
192  }
193
194  /**
195   * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue
196   * @param file the file to delete
197   * @return HFileDeleteTask to track progress
198   */
199  private HFileDeleteTask deleteFile(FileStatus file) {
200    HFileDeleteTask task = new HFileDeleteTask(file, cleanerThreadTimeoutMsec);
201    boolean enqueued = dispatch(task);
202    return enqueued ? task : null;
203  }
204
205  private boolean dispatch(HFileDeleteTask task) {
206    if (task.fileLength >= this.throttlePoint) {
207      if (!this.largeFileQueue.offer(task)) {
208        // should never arrive here as long as we use PriorityQueue
209        LOG.trace("Large file deletion queue is full");
210        return false;
211      }
212    } else {
213      if (!this.smallFileQueue.offer(task)) {
214        // should never arrive here as long as we use PriorityQueue
215        LOG.trace("Small file deletion queue is full");
216        return false;
217      }
218    }
219    return true;
220  }
221
222  @Override
223  public synchronized void cleanup() {
224    super.cleanup();
225    stopHFileDeleteThreads();
226  }
227
228  /**
229   * Start threads for hfile deletion
230   */
231  private void startHFileDeleteThreads() {
232    final String n = Thread.currentThread().getName();
233    running = true;
234    // start thread for large file deletion
235    for (int i = 0; i < largeFileDeleteThreadNumber; i++) {
236      Thread large = new Thread() {
237        @Override
238        public void run() {
239          consumerLoop(largeFileQueue);
240        }
241      };
242      large.setDaemon(true);
243      large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis());
244      large.start();
245      LOG.debug("Starting for large file={}", large);
246      threads.add(large);
247    }
248
249    // start thread for small file deletion
250    for (int i = 0; i < smallFileDeleteThreadNumber; i++) {
251      Thread small = new Thread() {
252        @Override
253        public void run() {
254          consumerLoop(smallFileQueue);
255        }
256      };
257      small.setDaemon(true);
258      small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis());
259      small.start();
260      LOG.debug("Starting for small files={}", small);
261      threads.add(small);
262    }
263  }
264
265  protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
266    try {
267      while (running) {
268        HFileDeleteTask task = null;
269        try {
270          task = queue.take();
271        } catch (InterruptedException e) {
272          LOG.trace("Interrupted while trying to take a task from queue", e);
273          break;
274        }
275        if (task != null) {
276          LOG.trace("Removing {}", task.filePath);
277          boolean succeed;
278          try {
279            succeed = this.fs.delete(task.filePath, false);
280          } catch (IOException e) {
281            LOG.warn("Failed to delete {}", task.filePath, e);
282            succeed = false;
283          }
284          task.setResult(succeed);
285          if (succeed) {
286            countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
287          }
288        }
289      }
290    } finally {
291      LOG.debug("Exit {}", Thread.currentThread());
292    }
293  }
294
295  // Currently only for testing purpose
296  private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
297    if (isLargeFile) {
298      if (deletedLargeFiles.get() == Long.MAX_VALUE) {
299        LOG.debug("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
300        deletedLargeFiles.set(0L);
301      }
302      deletedLargeFiles.incrementAndGet();
303    } else {
304      if (deletedSmallFiles.get() == Long.MAX_VALUE) {
305        LOG.debug("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
306        deletedSmallFiles.set(0L);
307      }
308      if (fromLargeQueue) {
309        LOG.trace("Stolen a small file deletion task in large file thread");
310      }
311      deletedSmallFiles.incrementAndGet();
312    }
313  }
314
315  /**
316   * Stop threads for hfile deletion
317   */
318  private void stopHFileDeleteThreads() {
319    running = false;
320    LOG.debug("Stopping file delete threads");
321    for(Thread thread: threads){
322      thread.interrupt();
323    }
324  }
325
326  private static final Comparator<HFileDeleteTask> COMPARATOR = new Comparator<HFileDeleteTask>() {
327
328    @Override
329    public int compare(HFileDeleteTask o1, HFileDeleteTask o2) {
330      // larger file first so reverse compare
331      int cmp = Long.compare(o2.fileLength, o1.fileLength);
332      if (cmp != 0) {
333        return cmp;
334      }
335      // just use hashCode to generate a stable result.
336      return System.identityHashCode(o1) - System.identityHashCode(o2);
337    }
338  };
339
340  private static final class HFileDeleteTask {
341
342    boolean done = false;
343    boolean result;
344    final Path filePath;
345    final long fileLength;
346    final long timeoutMsec;
347
348    public HFileDeleteTask(FileStatus file, long timeoutMsec) {
349      this.filePath = file.getPath();
350      this.fileLength = file.getLen();
351      this.timeoutMsec = timeoutMsec;
352    }
353
354    public synchronized void setResult(boolean result) {
355      this.done = true;
356      this.result = result;
357      notify();
358    }
359
360    public synchronized boolean getResult(long waitIfNotFinished) {
361      long waitTimeMsec = 0;
362      try {
363        while (!done) {
364          long startTimeNanos = System.nanoTime();
365          wait(waitIfNotFinished);
366          waitTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
367              TimeUnit.NANOSECONDS);
368          if (done) {
369            return this.result;
370          }
371          if (waitTimeMsec > timeoutMsec) {
372            LOG.warn("Wait more than " + timeoutMsec + " ms for deleting " + this.filePath
373                + ", exit...");
374            return false;
375          }
376        }
377      } catch (InterruptedException e) {
378        LOG.warn("Interrupted while waiting for result of deleting " + filePath
379            + ", will return false", e);
380        return false;
381      }
382      return this.result;
383    }
384  }
385
386  @VisibleForTesting
387  public List<Thread> getCleanerThreads() {
388    return threads;
389  }
390
391  @VisibleForTesting
392  public long getNumOfDeletedLargeFiles() {
393    return deletedLargeFiles.get();
394  }
395
396  @VisibleForTesting
397  public long getNumOfDeletedSmallFiles() {
398    return deletedSmallFiles.get();
399  }
400
401  @VisibleForTesting
402  public long getLargeQueueInitSize() {
403    return largeQueueInitSize;
404  }
405
406  @VisibleForTesting
407  public long getSmallQueueInitSize() {
408    return smallQueueInitSize;
409  }
410
411  @VisibleForTesting
412  public long getThrottlePoint() {
413    return throttlePoint;
414  }
415
416  @VisibleForTesting
417  long getCleanerThreadTimeoutMsec() {
418    return cleanerThreadTimeoutMsec;
419  }
420
421  @VisibleForTesting
422  long getCleanerThreadCheckIntervalMsec() {
423    return cleanerThreadCheckIntervalMsec;
424  }
425
426  @Override
427  public void onConfigurationChange(Configuration conf) {
428    if (!checkAndUpdateConfigurations(conf)) {
429      LOG.debug("Update configuration triggered but nothing changed for this cleaner");
430      return;
431    }
432    stopHFileDeleteThreads();
433    // record the left over tasks
434    List<HFileDeleteTask> leftOverTasks =
435        new ArrayList<>(largeFileQueue.size() + smallFileQueue.size());
436    leftOverTasks.addAll(largeFileQueue);
437    leftOverTasks.addAll(smallFileQueue);
438    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
439    smallFileQueue = largeFileQueue.getStealFromQueue();
440    threads.clear();
441    startHFileDeleteThreads();
442    // re-dispatch the left over tasks
443    for (HFileDeleteTask task : leftOverTasks) {
444      dispatch(task);
445    }
446  }
447
448  /**
449   * Check new configuration and update settings if value changed
450   * @param conf The new configuration
451   * @return true if any configuration for HFileCleaner changes, false if no change
452   */
453  private boolean checkAndUpdateConfigurations(Configuration conf) {
454    boolean updated = false;
455    int throttlePoint =
456        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
457    if (throttlePoint != this.throttlePoint) {
458      LOG.debug("Updating throttle point, from {} to {}", this.throttlePoint, throttlePoint);
459      this.throttlePoint = throttlePoint;
460      updated = true;
461    }
462    int largeQueueInitSize =
463        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
464    if (largeQueueInitSize != this.largeQueueInitSize) {
465      LOG.debug("Updating largeQueueInitSize, from {} to {}", this.largeQueueInitSize,
466          largeQueueInitSize);
467      this.largeQueueInitSize = largeQueueInitSize;
468      updated = true;
469    }
470    int smallQueueInitSize =
471        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
472    if (smallQueueInitSize != this.smallQueueInitSize) {
473      LOG.debug("Updating smallQueueInitSize, from {} to {}", this.smallQueueInitSize,
474          smallQueueInitSize);
475      this.smallQueueInitSize = smallQueueInitSize;
476      updated = true;
477    }
478    int largeFileDeleteThreadNumber =
479        conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
480    if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) {
481      LOG.debug("Updating largeFileDeleteThreadNumber, from {} to {}",
482          this.largeFileDeleteThreadNumber, largeFileDeleteThreadNumber);
483      this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber;
484      updated = true;
485    }
486    int smallFileDeleteThreadNumber =
487        conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
488    if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) {
489      LOG.debug("Updating smallFileDeleteThreadNumber, from {} to {}",
490          this.smallFileDeleteThreadNumber, smallFileDeleteThreadNumber);
491      this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
492      updated = true;
493    }
494    long cleanerThreadTimeoutMsec =
495        conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
496    if (cleanerThreadTimeoutMsec != this.cleanerThreadTimeoutMsec) {
497      this.cleanerThreadTimeoutMsec = cleanerThreadTimeoutMsec;
498      updated = true;
499    }
500    long cleanerThreadCheckIntervalMsec =
501        conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
502            DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
503    if (cleanerThreadCheckIntervalMsec != this.cleanerThreadCheckIntervalMsec) {
504      this.cleanerThreadCheckIntervalMsec = cleanerThreadCheckIntervalMsec;
505      updated = true;
506    }
507    return updated;
508  }
509
510  @Override
511  public synchronized void cancel(boolean mayInterruptIfRunning) {
512    super.cancel(mayInterruptIfRunning);
513    for (Thread t: this.threads) {
514      t.interrupt();
515    }
516  }
517}