001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileStatus;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Stoppable;
033import org.apache.hadoop.hbase.conf.ConfigurationObserver;
034import org.apache.hadoop.hbase.io.HFileLink;
035import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
036import org.apache.hadoop.hbase.util.StealJobQueue;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
042/**
043 * This Chore, every time it runs, will clear the HFiles in the hfile archive
044 * folder that are deletable for each HFile cleaner in the chain.
045 */
046@InterfaceAudience.Private
047public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
048  implements ConfigurationObserver {
049
050  public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
051
052  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
053    Path directory, DirScanPool pool) {
054    this(period, stopper, conf, fs, directory, pool, null);
055  }
056
057  // Configuration key for large/small throttle point
058  public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
059      "hbase.regionserver.thread.hfilecleaner.throttle";
060  public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M
061
062  // Configuration key for large queue initial size
063  public final static String LARGE_HFILE_QUEUE_INIT_SIZE =
064      "hbase.regionserver.hfilecleaner.large.queue.size";
065  public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240;
066
067  // Configuration key for small queue initial size
068  public final static String SMALL_HFILE_QUEUE_INIT_SIZE =
069      "hbase.regionserver.hfilecleaner.small.queue.size";
070  public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
071
072  // Configuration key for large file delete thread number
073  public final static String LARGE_HFILE_DELETE_THREAD_NUMBER =
074      "hbase.regionserver.hfilecleaner.large.thread.count";
075  public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1;
076
077  // Configuration key for small file delete thread number
078  public final static String SMALL_HFILE_DELETE_THREAD_NUMBER =
079      "hbase.regionserver.hfilecleaner.small.thread.count";
080  public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;
081
082  public static final String HFILE_DELETE_THREAD_TIMEOUT_MSEC =
083      "hbase.regionserver.hfilecleaner.thread.timeout.msec";
084  @VisibleForTesting
085  static final long DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC = 60 * 1000L;
086
087  public static final String HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC =
088      "hbase.regionserver.hfilecleaner.thread.check.interval.msec";
089  @VisibleForTesting
090  static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;
091
092  private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);
093
094  StealJobQueue<HFileDeleteTask> largeFileQueue;
095  BlockingQueue<HFileDeleteTask> smallFileQueue;
096  private int throttlePoint;
097  private int largeQueueInitSize;
098  private int smallQueueInitSize;
099  private int largeFileDeleteThreadNumber;
100  private int smallFileDeleteThreadNumber;
101  private long cleanerThreadTimeoutMsec;
102  private long cleanerThreadCheckIntervalMsec;
103  private List<Thread> threads = new ArrayList<Thread>();
104  private boolean running;
105
106  private AtomicLong deletedLargeFiles = new AtomicLong();
107  private AtomicLong deletedSmallFiles = new AtomicLong();
108
109  /**
110   * @param period the period of time to sleep between each run
111   * @param stopper the stopper
112   * @param conf configuration to use
113   * @param fs handle to the FS
114   * @param directory directory to be cleaned
115   * @param pool the thread pool used to scan directories
116   * @param params params could be used in subclass of BaseHFileCleanerDelegate
117   */
118  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
119    Path directory, DirScanPool pool, Map<String, Object> params) {
120    super("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
121      params);
122    throttlePoint =
123        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
124    largeQueueInitSize =
125        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
126    smallQueueInitSize =
127        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
128    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
129    smallFileQueue = largeFileQueue.getStealFromQueue();
130    largeFileDeleteThreadNumber =
131        conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
132    smallFileDeleteThreadNumber =
133        conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
134    cleanerThreadTimeoutMsec =
135        conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
136    cleanerThreadCheckIntervalMsec =
137        conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
138            DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
139    startHFileDeleteThreads();
140  }
141
142  @Override
143  protected boolean validate(Path file) {
144    if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
145      return true;
146    }
147    return StoreFileInfo.validateStoreFileName(file.getName());
148  }
149
150  /**
151   * Exposed for TESTING!
152   */
153  public List<BaseHFileCleanerDelegate> getDelegatesForTesting() {
154    return this.cleanersChain;
155  }
156
157  @Override
158  public int deleteFiles(Iterable<FileStatus> filesToDelete) {
159    int deletedFiles = 0;
160    List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>();
161    // construct delete tasks and add into relative queue
162    for (FileStatus file : filesToDelete) {
163      HFileDeleteTask task = deleteFile(file);
164      if (task != null) {
165        tasks.add(task);
166      }
167    }
168    // wait for each submitted task to finish
169    for (HFileDeleteTask task : tasks) {
170      if (task.getResult(cleanerThreadCheckIntervalMsec)) {
171        deletedFiles++;
172      }
173    }
174    return deletedFiles;
175  }
176
177  /**
178   * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue
179   * @param file the file to delete
180   * @return HFileDeleteTask to track progress
181   */
182  private HFileDeleteTask deleteFile(FileStatus file) {
183    HFileDeleteTask task = new HFileDeleteTask(file, cleanerThreadTimeoutMsec);
184    boolean enqueued = dispatch(task);
185    return enqueued ? task : null;
186  }
187
188  private boolean dispatch(HFileDeleteTask task) {
189    if (task.fileLength >= this.throttlePoint) {
190      if (!this.largeFileQueue.offer(task)) {
191        // should never arrive here as long as we use PriorityQueue
192        LOG.trace("Large file deletion queue is full");
193        return false;
194      }
195    } else {
196      if (!this.smallFileQueue.offer(task)) {
197        // should never arrive here as long as we use PriorityQueue
198        LOG.trace("Small file deletion queue is full");
199        return false;
200      }
201    }
202    return true;
203  }
204
205  @Override
206  public synchronized void cleanup() {
207    super.cleanup();
208    stopHFileDeleteThreads();
209  }
210
211  /**
212   * Start threads for hfile deletion
213   */
214  private void startHFileDeleteThreads() {
215    final String n = Thread.currentThread().getName();
216    running = true;
217    // start thread for large file deletion
218    for (int i = 0; i < largeFileDeleteThreadNumber; i++) {
219      Thread large = new Thread() {
220        @Override
221        public void run() {
222          consumerLoop(largeFileQueue);
223        }
224      };
225      large.setDaemon(true);
226      large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis());
227      large.start();
228      LOG.debug("Starting for large file={}", large);
229      threads.add(large);
230    }
231
232    // start thread for small file deletion
233    for (int i = 0; i < smallFileDeleteThreadNumber; i++) {
234      Thread small = new Thread() {
235        @Override
236        public void run() {
237          consumerLoop(smallFileQueue);
238        }
239      };
240      small.setDaemon(true);
241      small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis());
242      small.start();
243      LOG.debug("Starting for small files={}", small);
244      threads.add(small);
245    }
246  }
247
248  protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
249    try {
250      while (running) {
251        HFileDeleteTask task = null;
252        try {
253          task = queue.take();
254        } catch (InterruptedException e) {
255          LOG.trace("Interrupted while trying to take a task from queue", e);
256          break;
257        }
258        if (task != null) {
259          LOG.trace("Removing {}", task.filePath);
260          boolean succeed;
261          try {
262            succeed = this.fs.delete(task.filePath, false);
263          } catch (IOException e) {
264            LOG.warn("Failed to delete {}", task.filePath, e);
265            succeed = false;
266          }
267          task.setResult(succeed);
268          if (succeed) {
269            countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
270          }
271        }
272      }
273    } finally {
274      LOG.debug("Exit {}", Thread.currentThread());
275    }
276  }
277
278  // Currently only for testing purpose
279  private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
280    if (isLargeFile) {
281      if (deletedLargeFiles.get() == Long.MAX_VALUE) {
282        LOG.debug("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
283        deletedLargeFiles.set(0L);
284      }
285      deletedLargeFiles.incrementAndGet();
286    } else {
287      if (deletedSmallFiles.get() == Long.MAX_VALUE) {
288        LOG.debug("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
289        deletedSmallFiles.set(0L);
290      }
291      if (fromLargeQueue) {
292        LOG.trace("Stolen a small file deletion task in large file thread");
293      }
294      deletedSmallFiles.incrementAndGet();
295    }
296  }
297
298  /**
299   * Stop threads for hfile deletion
300   */
301  private void stopHFileDeleteThreads() {
302    running = false;
303    LOG.debug("Stopping file delete threads");
304    for(Thread thread: threads){
305      thread.interrupt();
306    }
307  }
308
309  private static final Comparator<HFileDeleteTask> COMPARATOR = new Comparator<HFileDeleteTask>() {
310
311    @Override
312    public int compare(HFileDeleteTask o1, HFileDeleteTask o2) {
313      // larger file first so reverse compare
314      int cmp = Long.compare(o2.fileLength, o1.fileLength);
315      if (cmp != 0) {
316        return cmp;
317      }
318      // just use hashCode to generate a stable result.
319      return System.identityHashCode(o1) - System.identityHashCode(o2);
320    }
321  };
322
323  private static final class HFileDeleteTask {
324
325    boolean done = false;
326    boolean result;
327    final Path filePath;
328    final long fileLength;
329    final long timeoutMsec;
330
331    public HFileDeleteTask(FileStatus file, long timeoutMsec) {
332      this.filePath = file.getPath();
333      this.fileLength = file.getLen();
334      this.timeoutMsec = timeoutMsec;
335    }
336
337    public synchronized void setResult(boolean result) {
338      this.done = true;
339      this.result = result;
340      notify();
341    }
342
343    public synchronized boolean getResult(long waitIfNotFinished) {
344      long waitTimeMsec = 0;
345      try {
346        while (!done) {
347          long startTimeNanos = System.nanoTime();
348          wait(waitIfNotFinished);
349          waitTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
350              TimeUnit.NANOSECONDS);
351          if (done) {
352            return this.result;
353          }
354          if (waitTimeMsec > timeoutMsec) {
355            LOG.warn("Wait more than " + timeoutMsec + " ms for deleting " + this.filePath
356                + ", exit...");
357            return false;
358          }
359        }
360      } catch (InterruptedException e) {
361        LOG.warn("Interrupted while waiting for result of deleting " + filePath
362            + ", will return false", e);
363        return false;
364      }
365      return this.result;
366    }
367  }
368
369  @VisibleForTesting
370  public List<Thread> getCleanerThreads() {
371    return threads;
372  }
373
374  @VisibleForTesting
375  public long getNumOfDeletedLargeFiles() {
376    return deletedLargeFiles.get();
377  }
378
379  @VisibleForTesting
380  public long getNumOfDeletedSmallFiles() {
381    return deletedSmallFiles.get();
382  }
383
384  @VisibleForTesting
385  public long getLargeQueueInitSize() {
386    return largeQueueInitSize;
387  }
388
389  @VisibleForTesting
390  public long getSmallQueueInitSize() {
391    return smallQueueInitSize;
392  }
393
394  @VisibleForTesting
395  public long getThrottlePoint() {
396    return throttlePoint;
397  }
398
399  @VisibleForTesting
400  long getCleanerThreadTimeoutMsec() {
401    return cleanerThreadTimeoutMsec;
402  }
403
404  @VisibleForTesting
405  long getCleanerThreadCheckIntervalMsec() {
406    return cleanerThreadCheckIntervalMsec;
407  }
408
409  @Override
410  public void onConfigurationChange(Configuration conf) {
411    if (!checkAndUpdateConfigurations(conf)) {
412      LOG.debug("Update configuration triggered but nothing changed for this cleaner");
413      return;
414    }
415    stopHFileDeleteThreads();
416    // record the left over tasks
417    List<HFileDeleteTask> leftOverTasks =
418        new ArrayList<>(largeFileQueue.size() + smallFileQueue.size());
419    leftOverTasks.addAll(largeFileQueue);
420    leftOverTasks.addAll(smallFileQueue);
421    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
422    smallFileQueue = largeFileQueue.getStealFromQueue();
423    threads.clear();
424    startHFileDeleteThreads();
425    // re-dispatch the left over tasks
426    for (HFileDeleteTask task : leftOverTasks) {
427      dispatch(task);
428    }
429  }
430
431  /**
432   * Check new configuration and update settings if value changed
433   * @param conf The new configuration
434   * @return true if any configuration for HFileCleaner changes, false if no change
435   */
436  private boolean checkAndUpdateConfigurations(Configuration conf) {
437    boolean updated = false;
438    int throttlePoint =
439        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
440    if (throttlePoint != this.throttlePoint) {
441      LOG.debug("Updating throttle point, from {} to {}", this.throttlePoint, throttlePoint);
442      this.throttlePoint = throttlePoint;
443      updated = true;
444    }
445    int largeQueueInitSize =
446        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
447    if (largeQueueInitSize != this.largeQueueInitSize) {
448      LOG.debug("Updating largeQueueInitSize, from {} to {}", this.largeQueueInitSize,
449          largeQueueInitSize);
450      this.largeQueueInitSize = largeQueueInitSize;
451      updated = true;
452    }
453    int smallQueueInitSize =
454        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
455    if (smallQueueInitSize != this.smallQueueInitSize) {
456      LOG.debug("Updating smallQueueInitSize, from {} to {}", this.smallQueueInitSize,
457          smallQueueInitSize);
458      this.smallQueueInitSize = smallQueueInitSize;
459      updated = true;
460    }
461    int largeFileDeleteThreadNumber =
462        conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
463    if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) {
464      LOG.debug("Updating largeFileDeleteThreadNumber, from {} to {}",
465          this.largeFileDeleteThreadNumber, largeFileDeleteThreadNumber);
466      this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber;
467      updated = true;
468    }
469    int smallFileDeleteThreadNumber =
470        conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
471    if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) {
472      LOG.debug("Updating smallFileDeleteThreadNumber, from {} to {}",
473          this.smallFileDeleteThreadNumber, smallFileDeleteThreadNumber);
474      this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
475      updated = true;
476    }
477    long cleanerThreadTimeoutMsec =
478        conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
479    if (cleanerThreadTimeoutMsec != this.cleanerThreadTimeoutMsec) {
480      this.cleanerThreadTimeoutMsec = cleanerThreadTimeoutMsec;
481      updated = true;
482    }
483    long cleanerThreadCheckIntervalMsec =
484        conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
485            DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
486    if (cleanerThreadCheckIntervalMsec != this.cleanerThreadCheckIntervalMsec) {
487      this.cleanerThreadCheckIntervalMsec = cleanerThreadCheckIntervalMsec;
488      updated = true;
489    }
490    return updated;
491  }
492
493  @Override
494  public synchronized void cancel(boolean mayInterruptIfRunning) {
495    super.cancel(mayInterruptIfRunning);
496    for (Thread t: this.threads) {
497      t.interrupt();
498    }
499  }
500}