/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileStatus;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Stoppable;
033import org.apache.hadoop.hbase.conf.ConfigurationObserver;
034import org.apache.hadoop.hbase.io.HFileLink;
035import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
036import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
037import org.apache.hadoop.hbase.util.StealJobQueue;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * This Chore, every time it runs, will clear the HFiles in the hfile archive
044 * folder that are deletable for each HFile cleaner in the chain.
045 */
046@InterfaceAudience.Private
047public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
048  implements ConfigurationObserver {
049
050  public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
051
052  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
053    Path directory, DirScanPool pool) {
054    this(period, stopper, conf, fs, directory, pool, null);
055  }
056
057  // Configuration key for large/small throttle point
058  public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
059      "hbase.regionserver.thread.hfilecleaner.throttle";
060  public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M
061
062  // Configuration key for large queue initial size
063  public final static String LARGE_HFILE_QUEUE_INIT_SIZE =
064      "hbase.regionserver.hfilecleaner.large.queue.size";
065  public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240;
066
067  // Configuration key for small queue initial size
068  public final static String SMALL_HFILE_QUEUE_INIT_SIZE =
069      "hbase.regionserver.hfilecleaner.small.queue.size";
070  public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
071
072  // Configuration key for large file delete thread number
073  public final static String LARGE_HFILE_DELETE_THREAD_NUMBER =
074      "hbase.regionserver.hfilecleaner.large.thread.count";
075  public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1;
076
077  // Configuration key for small file delete thread number
078  public final static String SMALL_HFILE_DELETE_THREAD_NUMBER =
079      "hbase.regionserver.hfilecleaner.small.thread.count";
080  public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;
081
082  public static final String HFILE_DELETE_THREAD_TIMEOUT_MSEC =
083      "hbase.regionserver.hfilecleaner.thread.timeout.msec";
084  static final long DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC = 60 * 1000L;
085
086  public static final String HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC =
087      "hbase.regionserver.hfilecleaner.thread.check.interval.msec";
088  static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;
089
090  private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);
091
092  StealJobQueue<HFileDeleteTask> largeFileQueue;
093  BlockingQueue<HFileDeleteTask> smallFileQueue;
094  private int throttlePoint;
095  private int largeQueueInitSize;
096  private int smallQueueInitSize;
097  private int largeFileDeleteThreadNumber;
098  private int smallFileDeleteThreadNumber;
099  private long cleanerThreadTimeoutMsec;
100  private long cleanerThreadCheckIntervalMsec;
101  private List<Thread> threads = new ArrayList<Thread>();
102  private volatile boolean running;
103
104  private AtomicLong deletedLargeFiles = new AtomicLong();
105  private AtomicLong deletedSmallFiles = new AtomicLong();
106
107  /**
108   * @param period the period of time to sleep between each run
109   * @param stopper the stopper
110   * @param conf configuration to use
111   * @param fs handle to the FS
112   * @param directory directory to be cleaned
113   * @param pool the thread pool used to scan directories
114   * @param params params could be used in subclass of BaseHFileCleanerDelegate
115   */
116  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
117    Path directory, DirScanPool pool, Map<String, Object> params) {
118    this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
119      params);
120
121  }
122
123  /**
124   * For creating customized HFileCleaner.
125   * @param name name of the chore being run
126   * @param period the period of time to sleep between each run
127   * @param stopper the stopper
128   * @param conf configuration to use
129   * @param fs handle to the FS
130   * @param directory directory to be cleaned
131   * @param confKey configuration key for the classes to instantiate
132   * @param pool the thread pool used to scan directories
133   * @param params params could be used in subclass of BaseHFileCleanerDelegate
134   */
135  public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
136    Path directory, String confKey, DirScanPool pool, Map<String, Object> params) {
137    super(name, period, stopper, conf, fs, directory, confKey, pool, params);
138    throttlePoint =
139      conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
140    largeQueueInitSize =
141      conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
142    smallQueueInitSize =
143      conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
144    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
145    smallFileQueue = largeFileQueue.getStealFromQueue();
146    largeFileDeleteThreadNumber =
147      conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
148    smallFileDeleteThreadNumber =
149      conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
150    cleanerThreadTimeoutMsec =
151      conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
152    cleanerThreadCheckIntervalMsec = conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
153      DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
154    startHFileDeleteThreads();
155  }
156
157  @Override
158  protected boolean validate(Path file) {
159    return HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent()) ||
160      StoreFileInfo.validateStoreFileName(file.getName()) ||
161      file.getName().endsWith(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
162  }
163
164  /**
165   * Exposed for TESTING!
166   */
167  public List<BaseHFileCleanerDelegate> getDelegatesForTesting() {
168    return this.cleanersChain;
169  }
170
171  @Override
172  public int deleteFiles(Iterable<FileStatus> filesToDelete) {
173    int deletedFiles = 0;
174    List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>();
175    // construct delete tasks and add into relative queue
176    for (FileStatus file : filesToDelete) {
177      HFileDeleteTask task = deleteFile(file);
178      if (task != null) {
179        tasks.add(task);
180      }
181    }
182    // wait for each submitted task to finish
183    for (HFileDeleteTask task : tasks) {
184      if (task.getResult(cleanerThreadCheckIntervalMsec)) {
185        deletedFiles++;
186      }
187    }
188    return deletedFiles;
189  }
190
191  /**
192   * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue
193   * @param file the file to delete
194   * @return HFileDeleteTask to track progress
195   */
196  private HFileDeleteTask deleteFile(FileStatus file) {
197    HFileDeleteTask task = new HFileDeleteTask(file, cleanerThreadTimeoutMsec);
198    boolean enqueued = dispatch(task);
199    return enqueued ? task : null;
200  }
201
202  private boolean dispatch(HFileDeleteTask task) {
203    if (task.fileLength >= this.throttlePoint) {
204      if (!this.largeFileQueue.offer(task)) {
205        // should never arrive here as long as we use PriorityQueue
206        LOG.trace("Large file deletion queue is full");
207        return false;
208      }
209    } else {
210      if (!this.smallFileQueue.offer(task)) {
211        // should never arrive here as long as we use PriorityQueue
212        LOG.trace("Small file deletion queue is full");
213        return false;
214      }
215    }
216    return true;
217  }
218
219  @Override
220  public synchronized void cleanup() {
221    super.cleanup();
222    stopHFileDeleteThreads();
223  }
224
225  /**
226   * Start threads for hfile deletion
227   */
228  private void startHFileDeleteThreads() {
229    final String n = Thread.currentThread().getName();
230    running = true;
231    // start thread for large file deletion
232    for (int i = 0; i < largeFileDeleteThreadNumber; i++) {
233      Thread large = new Thread() {
234        @Override
235        public void run() {
236          consumerLoop(largeFileQueue);
237        }
238      };
239      large.setDaemon(true);
240      large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis());
241      large.start();
242      LOG.debug("Starting for large file={}", large);
243      threads.add(large);
244    }
245
246    // start thread for small file deletion
247    for (int i = 0; i < smallFileDeleteThreadNumber; i++) {
248      Thread small = new Thread() {
249        @Override
250        public void run() {
251          consumerLoop(smallFileQueue);
252        }
253      };
254      small.setDaemon(true);
255      small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis());
256      small.start();
257      LOG.debug("Starting for small files={}", small);
258      threads.add(small);
259    }
260  }
261
262  protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
263    try {
264      while (running) {
265        HFileDeleteTask task = null;
266        try {
267          task = queue.take();
268        } catch (InterruptedException e) {
269          LOG.trace("Interrupted while trying to take a task from queue", e);
270          break;
271        }
272        if (task != null) {
273          LOG.trace("Removing {}", task.filePath);
274          boolean succeed;
275          try {
276            succeed = this.fs.delete(task.filePath, false);
277          } catch (IOException e) {
278            LOG.warn("Failed to delete {}", task.filePath, e);
279            succeed = false;
280          }
281          task.setResult(succeed);
282          if (succeed) {
283            countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
284          }
285        }
286      }
287    } finally {
288      LOG.debug("Exit {}", Thread.currentThread());
289    }
290  }
291
292  // Currently only for testing purpose
293  private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
294    if (isLargeFile) {
295      if (deletedLargeFiles.get() == Long.MAX_VALUE) {
296        LOG.debug("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
297        deletedLargeFiles.set(0L);
298      }
299      deletedLargeFiles.incrementAndGet();
300    } else {
301      if (deletedSmallFiles.get() == Long.MAX_VALUE) {
302        LOG.debug("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
303        deletedSmallFiles.set(0L);
304      }
305      if (fromLargeQueue) {
306        LOG.trace("Stolen a small file deletion task in large file thread");
307      }
308      deletedSmallFiles.incrementAndGet();
309    }
310  }
311
312  /**
313   * Stop threads for hfile deletion
314   */
315  private void stopHFileDeleteThreads() {
316    running = false;
317    LOG.debug("Stopping file delete threads");
318    for(Thread thread: threads){
319      thread.interrupt();
320    }
321  }
322
323  private static final Comparator<HFileDeleteTask> COMPARATOR = new Comparator<HFileDeleteTask>() {
324
325    @Override
326    public int compare(HFileDeleteTask o1, HFileDeleteTask o2) {
327      // larger file first so reverse compare
328      int cmp = Long.compare(o2.fileLength, o1.fileLength);
329      if (cmp != 0) {
330        return cmp;
331      }
332      // just use hashCode to generate a stable result.
333      return System.identityHashCode(o1) - System.identityHashCode(o2);
334    }
335  };
336
337  private static final class HFileDeleteTask {
338
339    boolean done = false;
340    boolean result;
341    final Path filePath;
342    final long fileLength;
343    final long timeoutMsec;
344
345    public HFileDeleteTask(FileStatus file, long timeoutMsec) {
346      this.filePath = file.getPath();
347      this.fileLength = file.getLen();
348      this.timeoutMsec = timeoutMsec;
349    }
350
351    public synchronized void setResult(boolean result) {
352      this.done = true;
353      this.result = result;
354      notify();
355    }
356
357    public synchronized boolean getResult(long waitIfNotFinished) {
358      long waitTimeMsec = 0;
359      try {
360        while (!done) {
361          long startTimeNanos = System.nanoTime();
362          wait(waitIfNotFinished);
363          waitTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
364              TimeUnit.NANOSECONDS);
365          if (done) {
366            return this.result;
367          }
368          if (waitTimeMsec > timeoutMsec) {
369            LOG.warn("Wait more than " + timeoutMsec + " ms for deleting " + this.filePath
370                + ", exit...");
371            return false;
372          }
373        }
374      } catch (InterruptedException e) {
375        LOG.warn("Interrupted while waiting for result of deleting " + filePath
376            + ", will return false", e);
377        return false;
378      }
379      return this.result;
380    }
381  }
382
383  public List<Thread> getCleanerThreads() {
384    return threads;
385  }
386
387  public long getNumOfDeletedLargeFiles() {
388    return deletedLargeFiles.get();
389  }
390
391  public long getNumOfDeletedSmallFiles() {
392    return deletedSmallFiles.get();
393  }
394
395  public long getLargeQueueInitSize() {
396    return largeQueueInitSize;
397  }
398
399  public long getSmallQueueInitSize() {
400    return smallQueueInitSize;
401  }
402
403  public long getThrottlePoint() {
404    return throttlePoint;
405  }
406
407  long getCleanerThreadTimeoutMsec() {
408    return cleanerThreadTimeoutMsec;
409  }
410
411  long getCleanerThreadCheckIntervalMsec() {
412    return cleanerThreadCheckIntervalMsec;
413  }
414
415  @Override
416  public void onConfigurationChange(Configuration conf) {
417    if (!checkAndUpdateConfigurations(conf)) {
418      LOG.debug("Update configuration triggered but nothing changed for this cleaner");
419      return;
420    }
421    stopHFileDeleteThreads();
422    // record the left over tasks
423    List<HFileDeleteTask> leftOverTasks =
424        new ArrayList<>(largeFileQueue.size() + smallFileQueue.size());
425    leftOverTasks.addAll(largeFileQueue);
426    leftOverTasks.addAll(smallFileQueue);
427    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
428    smallFileQueue = largeFileQueue.getStealFromQueue();
429    threads.clear();
430    startHFileDeleteThreads();
431    // re-dispatch the left over tasks
432    for (HFileDeleteTask task : leftOverTasks) {
433      dispatch(task);
434    }
435  }
436
437  /**
438   * Check new configuration and update settings if value changed
439   * @param conf The new configuration
440   * @return true if any configuration for HFileCleaner changes, false if no change
441   */
442  private boolean checkAndUpdateConfigurations(Configuration conf) {
443    boolean updated = false;
444    int throttlePoint =
445        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
446    if (throttlePoint != this.throttlePoint) {
447      LOG.debug("Updating throttle point, from {} to {}", this.throttlePoint, throttlePoint);
448      this.throttlePoint = throttlePoint;
449      updated = true;
450    }
451    int largeQueueInitSize =
452        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
453    if (largeQueueInitSize != this.largeQueueInitSize) {
454      LOG.debug("Updating largeQueueInitSize, from {} to {}", this.largeQueueInitSize,
455          largeQueueInitSize);
456      this.largeQueueInitSize = largeQueueInitSize;
457      updated = true;
458    }
459    int smallQueueInitSize =
460        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
461    if (smallQueueInitSize != this.smallQueueInitSize) {
462      LOG.debug("Updating smallQueueInitSize, from {} to {}", this.smallQueueInitSize,
463          smallQueueInitSize);
464      this.smallQueueInitSize = smallQueueInitSize;
465      updated = true;
466    }
467    int largeFileDeleteThreadNumber =
468        conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
469    if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) {
470      LOG.debug("Updating largeFileDeleteThreadNumber, from {} to {}",
471          this.largeFileDeleteThreadNumber, largeFileDeleteThreadNumber);
472      this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber;
473      updated = true;
474    }
475    int smallFileDeleteThreadNumber =
476        conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
477    if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) {
478      LOG.debug("Updating smallFileDeleteThreadNumber, from {} to {}",
479          this.smallFileDeleteThreadNumber, smallFileDeleteThreadNumber);
480      this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
481      updated = true;
482    }
483    long cleanerThreadTimeoutMsec =
484        conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
485    if (cleanerThreadTimeoutMsec != this.cleanerThreadTimeoutMsec) {
486      this.cleanerThreadTimeoutMsec = cleanerThreadTimeoutMsec;
487      updated = true;
488    }
489    long cleanerThreadCheckIntervalMsec =
490        conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
491            DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
492    if (cleanerThreadCheckIntervalMsec != this.cleanerThreadCheckIntervalMsec) {
493      this.cleanerThreadCheckIntervalMsec = cleanerThreadCheckIntervalMsec;
494      updated = true;
495    }
496    return updated;
497  }
498
499  @Override
500  public synchronized void cancel(boolean mayInterruptIfRunning) {
501    super.cancel(mayInterruptIfRunning);
502    for (Thread t: this.threads) {
503      t.interrupt();
504    }
505  }
506}