/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicLong;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Stoppable;
034import org.apache.hadoop.hbase.io.HFileLink;
035import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
036import org.apache.hadoop.hbase.util.StealJobQueue;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
041/**
042 * This Chore, every time it runs, will clear the HFiles in the hfile archive
043 * folder that are deletable for each HFile cleaner in the chain.
044 */
045@InterfaceAudience.Private
046public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
047
048  public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
049
050  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
051      Path directory) {
052    this(period, stopper, conf, fs, directory, null);
053  }
054
055  // Configuration key for large/small throttle point
056  public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
057      "hbase.regionserver.thread.hfilecleaner.throttle";
058  public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M
059
060  // Configuration key for large queue initial size
061  public final static String LARGE_HFILE_QUEUE_INIT_SIZE =
062      "hbase.regionserver.hfilecleaner.large.queue.size";
063  public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240;
064
065  // Configuration key for small queue initial size
066  public final static String SMALL_HFILE_QUEUE_INIT_SIZE =
067      "hbase.regionserver.hfilecleaner.small.queue.size";
068  public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
069
070  // Configuration key for large file delete thread number
071  public final static String LARGE_HFILE_DELETE_THREAD_NUMBER =
072      "hbase.regionserver.hfilecleaner.large.thread.count";
073  public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1;
074
075  // Configuration key for small file delete thread number
076  public final static String SMALL_HFILE_DELETE_THREAD_NUMBER =
077      "hbase.regionserver.hfilecleaner.small.thread.count";
078  public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;
079
080  public static final String HFILE_DELETE_THREAD_TIMEOUT_MSEC =
081      "hbase.regionserver.hfilecleaner.thread.timeout.msec";
082  @VisibleForTesting
083  static final long DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC = 60 * 1000L;
084
085  public static final String HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC =
086      "hbase.regionserver.hfilecleaner.thread.check.interval.msec";
087  @VisibleForTesting
088  static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;
089
090  private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);
091
092  StealJobQueue<HFileDeleteTask> largeFileQueue;
093  BlockingQueue<HFileDeleteTask> smallFileQueue;
094  private int throttlePoint;
095  private int largeQueueInitSize;
096  private int smallQueueInitSize;
097  private int largeFileDeleteThreadNumber;
098  private int smallFileDeleteThreadNumber;
099  private long cleanerThreadTimeoutMsec;
100  private long cleanerThreadCheckIntervalMsec;
101  private List<Thread> threads = new ArrayList<Thread>();
102  private boolean running;
103
104  private AtomicLong deletedLargeFiles = new AtomicLong();
105  private AtomicLong deletedSmallFiles = new AtomicLong();
106
107  /**
108   * @param period the period of time to sleep between each run
109   * @param stopper the stopper
110   * @param conf configuration to use
111   * @param fs handle to the FS
112   * @param directory directory to be cleaned
113   * @param params params could be used in subclass of BaseHFileCleanerDelegate
114   */
115  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
116                      Path directory, Map<String, Object> params) {
117    super("HFileCleaner", period, stopper, conf, fs,
118      directory, MASTER_HFILE_CLEANER_PLUGINS, params);
119    throttlePoint =
120        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
121    largeQueueInitSize =
122        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
123    smallQueueInitSize =
124        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
125    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
126    smallFileQueue = largeFileQueue.getStealFromQueue();
127    largeFileDeleteThreadNumber =
128        conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
129    smallFileDeleteThreadNumber =
130        conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
131    cleanerThreadTimeoutMsec =
132        conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
133    cleanerThreadCheckIntervalMsec =
134        conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
135            DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
136    startHFileDeleteThreads();
137  }
138
139  @Override
140  protected boolean validate(Path file) {
141    if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
142      return true;
143    }
144    return StoreFileInfo.validateStoreFileName(file.getName());
145  }
146
147  /**
148   * Exposed for TESTING!
149   */
150  public List<BaseHFileCleanerDelegate> getDelegatesForTesting() {
151    return this.cleanersChain;
152  }
153
154  @Override
155  public int deleteFiles(Iterable<FileStatus> filesToDelete) {
156    int deletedFiles = 0;
157    List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>();
158    // construct delete tasks and add into relative queue
159    for (FileStatus file : filesToDelete) {
160      HFileDeleteTask task = deleteFile(file);
161      if (task != null) {
162        tasks.add(task);
163      }
164    }
165    // wait for each submitted task to finish
166    for (HFileDeleteTask task : tasks) {
167      if (task.getResult(cleanerThreadCheckIntervalMsec)) {
168        deletedFiles++;
169      }
170    }
171    return deletedFiles;
172  }
173
174  /**
175   * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue
176   * @param file the file to delete
177   * @return HFileDeleteTask to track progress
178   */
179  private HFileDeleteTask deleteFile(FileStatus file) {
180    HFileDeleteTask task = new HFileDeleteTask(file, cleanerThreadTimeoutMsec);
181    boolean enqueued = dispatch(task);
182    return enqueued ? task : null;
183  }
184
185  private boolean dispatch(HFileDeleteTask task) {
186    if (task.fileLength >= this.throttlePoint) {
187      if (!this.largeFileQueue.offer(task)) {
188        // should never arrive here as long as we use PriorityQueue
189        LOG.trace("Large file deletion queue is full");
190        return false;
191      }
192    } else {
193      if (!this.smallFileQueue.offer(task)) {
194        // should never arrive here as long as we use PriorityQueue
195        LOG.trace("Small file deletion queue is full");
196        return false;
197      }
198    }
199    return true;
200  }
201
202  @Override
203  public synchronized void cleanup() {
204    super.cleanup();
205    stopHFileDeleteThreads();
206  }
207
208  /**
209   * Start threads for hfile deletion
210   */
211  private void startHFileDeleteThreads() {
212    final String n = Thread.currentThread().getName();
213    running = true;
214    // start thread for large file deletion
215    for (int i = 0; i < largeFileDeleteThreadNumber; i++) {
216      Thread large = new Thread() {
217        @Override
218        public void run() {
219          consumerLoop(largeFileQueue);
220        }
221      };
222      large.setDaemon(true);
223      large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis());
224      large.start();
225      LOG.debug("Starting for large file={}", large);
226      threads.add(large);
227    }
228
229    // start thread for small file deletion
230    for (int i = 0; i < smallFileDeleteThreadNumber; i++) {
231      Thread small = new Thread() {
232        @Override
233        public void run() {
234          consumerLoop(smallFileQueue);
235        }
236      };
237      small.setDaemon(true);
238      small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis());
239      small.start();
240      LOG.debug("Starting for small files={}", small);
241      threads.add(small);
242    }
243  }
244
245  protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
246    try {
247      while (running) {
248        HFileDeleteTask task = null;
249        try {
250          task = queue.take();
251        } catch (InterruptedException e) {
252          LOG.trace("Interrupted while trying to take a task from queue", e);
253          break;
254        }
255        if (task != null) {
256          LOG.trace("Removing {}", task.filePath);
257          boolean succeed;
258          try {
259            succeed = this.fs.delete(task.filePath, false);
260          } catch (IOException e) {
261            LOG.warn("Failed to delete {}", task.filePath, e);
262            succeed = false;
263          }
264          task.setResult(succeed);
265          if (succeed) {
266            countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
267          }
268        }
269      }
270    } finally {
271      LOG.debug("Exit {}", Thread.currentThread());
272    }
273  }
274
275  // Currently only for testing purpose
276  private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
277    if (isLargeFile) {
278      if (deletedLargeFiles.get() == Long.MAX_VALUE) {
279        LOG.debug("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
280        deletedLargeFiles.set(0L);
281      }
282      deletedLargeFiles.incrementAndGet();
283    } else {
284      if (deletedSmallFiles.get() == Long.MAX_VALUE) {
285        LOG.debug("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
286        deletedSmallFiles.set(0L);
287      }
288      if (fromLargeQueue) {
289        LOG.trace("Stolen a small file deletion task in large file thread");
290      }
291      deletedSmallFiles.incrementAndGet();
292    }
293  }
294
295  /**
296   * Stop threads for hfile deletion
297   */
298  private void stopHFileDeleteThreads() {
299    running = false;
300    LOG.debug("Stopping file delete threads");
301    for(Thread thread: threads){
302      thread.interrupt();
303    }
304  }
305
306  private static final Comparator<HFileDeleteTask> COMPARATOR = new Comparator<HFileDeleteTask>() {
307
308    @Override
309    public int compare(HFileDeleteTask o1, HFileDeleteTask o2) {
310      // larger file first so reverse compare
311      int cmp = Long.compare(o2.fileLength, o1.fileLength);
312      if (cmp != 0) {
313        return cmp;
314      }
315      // just use hashCode to generate a stable result.
316      return System.identityHashCode(o1) - System.identityHashCode(o2);
317    }
318  };
319
320  private static final class HFileDeleteTask {
321
322    boolean done = false;
323    boolean result;
324    final Path filePath;
325    final long fileLength;
326    final long timeoutMsec;
327
328    public HFileDeleteTask(FileStatus file, long timeoutMsec) {
329      this.filePath = file.getPath();
330      this.fileLength = file.getLen();
331      this.timeoutMsec = timeoutMsec;
332    }
333
334    public synchronized void setResult(boolean result) {
335      this.done = true;
336      this.result = result;
337      notify();
338    }
339
340    public synchronized boolean getResult(long waitIfNotFinished) {
341      long waitTimeMsec = 0;
342      try {
343        while (!done) {
344          long startTimeNanos = System.nanoTime();
345          wait(waitIfNotFinished);
346          waitTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
347              TimeUnit.NANOSECONDS);
348          if (done) {
349            return this.result;
350          }
351          if (waitTimeMsec > timeoutMsec) {
352            LOG.warn("Wait more than " + timeoutMsec + " ms for deleting " + this.filePath
353                + ", exit...");
354            return false;
355          }
356        }
357      } catch (InterruptedException e) {
358        LOG.warn("Interrupted while waiting for result of deleting " + filePath
359            + ", will return false", e);
360        return false;
361      }
362      return this.result;
363    }
364  }
365
366  @VisibleForTesting
367  public List<Thread> getCleanerThreads() {
368    return threads;
369  }
370
371  @VisibleForTesting
372  public long getNumOfDeletedLargeFiles() {
373    return deletedLargeFiles.get();
374  }
375
376  @VisibleForTesting
377  public long getNumOfDeletedSmallFiles() {
378    return deletedSmallFiles.get();
379  }
380
381  @VisibleForTesting
382  public long getLargeQueueInitSize() {
383    return largeQueueInitSize;
384  }
385
386  @VisibleForTesting
387  public long getSmallQueueInitSize() {
388    return smallQueueInitSize;
389  }
390
391  @VisibleForTesting
392  public long getThrottlePoint() {
393    return throttlePoint;
394  }
395
396  @VisibleForTesting
397  long getCleanerThreadTimeoutMsec() {
398    return cleanerThreadTimeoutMsec;
399  }
400
401  @VisibleForTesting
402  long getCleanerThreadCheckIntervalMsec() {
403    return cleanerThreadCheckIntervalMsec;
404  }
405
406  @Override
407  public void onConfigurationChange(Configuration conf) {
408    super.onConfigurationChange(conf);
409
410    if (!checkAndUpdateConfigurations(conf)) {
411      LOG.debug("Update configuration triggered but nothing changed for this cleaner");
412      return;
413    }
414    stopHFileDeleteThreads();
415    // record the left over tasks
416    List<HFileDeleteTask> leftOverTasks =
417        new ArrayList<>(largeFileQueue.size() + smallFileQueue.size());
418    leftOverTasks.addAll(largeFileQueue);
419    leftOverTasks.addAll(smallFileQueue);
420    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
421    smallFileQueue = largeFileQueue.getStealFromQueue();
422    threads.clear();
423    startHFileDeleteThreads();
424    // re-dispatch the left over tasks
425    for (HFileDeleteTask task : leftOverTasks) {
426      dispatch(task);
427    }
428  }
429
430  /**
431   * Check new configuration and update settings if value changed
432   * @param conf The new configuration
433   * @return true if any configuration for HFileCleaner changes, false if no change
434   */
435  private boolean checkAndUpdateConfigurations(Configuration conf) {
436    boolean updated = false;
437    int throttlePoint =
438        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
439    if (throttlePoint != this.throttlePoint) {
440      LOG.debug("Updating throttle point, from {} to {}", this.throttlePoint, throttlePoint);
441      this.throttlePoint = throttlePoint;
442      updated = true;
443    }
444    int largeQueueInitSize =
445        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
446    if (largeQueueInitSize != this.largeQueueInitSize) {
447      LOG.debug("Updating largeQueueInitSize, from {} to {}", this.largeQueueInitSize,
448          largeQueueInitSize);
449      this.largeQueueInitSize = largeQueueInitSize;
450      updated = true;
451    }
452    int smallQueueInitSize =
453        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
454    if (smallQueueInitSize != this.smallQueueInitSize) {
455      LOG.debug("Updating smallQueueInitSize, from {} to {}", this.smallQueueInitSize,
456          smallQueueInitSize);
457      this.smallQueueInitSize = smallQueueInitSize;
458      updated = true;
459    }
460    int largeFileDeleteThreadNumber =
461        conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
462    if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) {
463      LOG.debug("Updating largeFileDeleteThreadNumber, from {} to {}",
464          this.largeFileDeleteThreadNumber, largeFileDeleteThreadNumber);
465      this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber;
466      updated = true;
467    }
468    int smallFileDeleteThreadNumber =
469        conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
470    if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) {
471      LOG.debug("Updating smallFileDeleteThreadNumber, from {} to {}",
472          this.smallFileDeleteThreadNumber, smallFileDeleteThreadNumber);
473      this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
474      updated = true;
475    }
476    long cleanerThreadTimeoutMsec =
477        conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
478    if (cleanerThreadTimeoutMsec != this.cleanerThreadTimeoutMsec) {
479      this.cleanerThreadTimeoutMsec = cleanerThreadTimeoutMsec;
480      updated = true;
481    }
482    long cleanerThreadCheckIntervalMsec =
483        conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
484            DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
485    if (cleanerThreadCheckIntervalMsec != this.cleanerThreadCheckIntervalMsec) {
486      this.cleanerThreadCheckIntervalMsec = cleanerThreadCheckIntervalMsec;
487      updated = true;
488    }
489    return updated;
490  }
491
492  @Override
493  public synchronized void cancel(boolean mayInterruptIfRunning) {
494    super.cancel(mayInterruptIfRunning);
495    for (Thread t: this.threads) {
496      t.interrupt();
497    }
498  }
499}