001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Comparator; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.BlockingQueue; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicLong; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileStatus; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Stoppable; 033import org.apache.hadoop.hbase.conf.ConfigurationObserver; 034import org.apache.hadoop.hbase.io.HFileLink; 035import org.apache.hadoop.hbase.master.region.MasterRegionFactory; 036import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 037import org.apache.hadoop.hbase.util.StealJobQueue; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 043/** 044 * This Chore, every time it runs, will clear the HFiles in the hfile archive 045 * folder that are deletable for each HFile cleaner in the chain. 046 */ 047@InterfaceAudience.Private 048public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> 049 implements ConfigurationObserver { 050 051 public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins"; 052 053 public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, 054 Path directory, DirScanPool pool) { 055 this(period, stopper, conf, fs, directory, pool, null); 056 } 057 058 // Configuration key for large/small throttle point 059 public final static String HFILE_DELETE_THROTTLE_THRESHOLD = 060 "hbase.regionserver.thread.hfilecleaner.throttle"; 061 public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M 062 063 // Configuration key for large queue initial size 064 public final static String LARGE_HFILE_QUEUE_INIT_SIZE = 065 "hbase.regionserver.hfilecleaner.large.queue.size"; 066 public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240; 067 068 // Configuration key for small queue initial size 069 public final static String SMALL_HFILE_QUEUE_INIT_SIZE = 070 "hbase.regionserver.hfilecleaner.small.queue.size"; 071 public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240; 072 073 // Configuration key for large file delete thread number 074 public final static String LARGE_HFILE_DELETE_THREAD_NUMBER = 075 "hbase.regionserver.hfilecleaner.large.thread.count"; 076 public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1; 077 078 // Configuration key for small file delete thread number 079 public final static String SMALL_HFILE_DELETE_THREAD_NUMBER = 080 "hbase.regionserver.hfilecleaner.small.thread.count"; 081 public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1; 082 083 public static final String HFILE_DELETE_THREAD_TIMEOUT_MSEC = 084 "hbase.regionserver.hfilecleaner.thread.timeout.msec"; 085 @VisibleForTesting 086 static final long DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC = 60 * 1000L; 087 088 public static final String HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 089 "hbase.regionserver.hfilecleaner.thread.check.interval.msec"; 090 @VisibleForTesting 091 static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L; 092 093 private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class); 094 095 StealJobQueue<HFileDeleteTask> largeFileQueue; 096 BlockingQueue<HFileDeleteTask> smallFileQueue; 097 private int throttlePoint; 098 private int largeQueueInitSize; 099 private int smallQueueInitSize; 100 private int largeFileDeleteThreadNumber; 101 private int smallFileDeleteThreadNumber; 102 private long cleanerThreadTimeoutMsec; 103 private long cleanerThreadCheckIntervalMsec; 104 private List<Thread> threads = new ArrayList<Thread>(); 105 private volatile boolean running; 106 107 private AtomicLong deletedLargeFiles = new AtomicLong(); 108 private AtomicLong deletedSmallFiles = new AtomicLong(); 109 110 /** 111 * @param period the period of time to sleep between each run 112 * @param stopper the stopper 113 * @param conf configuration to use 114 * @param fs handle to the FS 115 * @param directory directory to be cleaned 116 * @param pool the thread pool used to scan directories 117 * @param params params could be used in subclass of BaseHFileCleanerDelegate 118 */ 119 public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, 120 Path directory, DirScanPool pool, Map<String, Object> params) { 121 this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool, 122 params); 123 124 } 125 126 /** 127 * For creating customized HFileCleaner. 128 * @param name name of the chore being run 129 * @param period the period of time to sleep between each run 130 * @param stopper the stopper 131 * @param conf configuration to use 132 * @param fs handle to the FS 133 * @param directory directory to be cleaned 134 * @param confKey configuration key for the classes to instantiate 135 * @param pool the thread pool used to scan directories 136 * @param params params could be used in subclass of BaseHFileCleanerDelegate 137 */ 138 public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs, 139 Path directory, String confKey, DirScanPool pool, Map<String, Object> params) { 140 super(name, period, stopper, conf, fs, directory, confKey, pool, params); 141 throttlePoint = 142 conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); 143 largeQueueInitSize = 144 conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE); 145 smallQueueInitSize = 146 conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); 147 largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR); 148 smallFileQueue = largeFileQueue.getStealFromQueue(); 149 largeFileDeleteThreadNumber = 150 conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER); 151 smallFileDeleteThreadNumber = 152 conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER); 153 cleanerThreadTimeoutMsec = 154 conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC); 155 cleanerThreadCheckIntervalMsec = conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC, 156 DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC); 157 startHFileDeleteThreads(); 158 } 159 160 @Override 161 protected boolean validate(Path file) { 162 return HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent()) || 163 StoreFileInfo.validateStoreFileName(file.getName()) || 164 file.getName().endsWith(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX); 165 } 166 167 /** 168 * Exposed for TESTING! 169 */ 170 public List<BaseHFileCleanerDelegate> getDelegatesForTesting() { 171 return this.cleanersChain; 172 } 173 174 @Override 175 public int deleteFiles(Iterable<FileStatus> filesToDelete) { 176 int deletedFiles = 0; 177 List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>(); 178 // construct delete tasks and add into relative queue 179 for (FileStatus file : filesToDelete) { 180 HFileDeleteTask task = deleteFile(file); 181 if (task != null) { 182 tasks.add(task); 183 } 184 } 185 // wait for each submitted task to finish 186 for (HFileDeleteTask task : tasks) { 187 if (task.getResult(cleanerThreadCheckIntervalMsec)) { 188 deletedFiles++; 189 } 190 } 191 return deletedFiles; 192 } 193 194 /** 195 * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue 196 * @param file the file to delete 197 * @return HFileDeleteTask to track progress 198 */ 199 private HFileDeleteTask deleteFile(FileStatus file) { 200 HFileDeleteTask task = new HFileDeleteTask(file, cleanerThreadTimeoutMsec); 201 boolean enqueued = dispatch(task); 202 return enqueued ? task : null; 203 } 204 205 private boolean dispatch(HFileDeleteTask task) { 206 if (task.fileLength >= this.throttlePoint) { 207 if (!this.largeFileQueue.offer(task)) { 208 // should never arrive here as long as we use PriorityQueue 209 LOG.trace("Large file deletion queue is full"); 210 return false; 211 } 212 } else { 213 if (!this.smallFileQueue.offer(task)) { 214 // should never arrive here as long as we use PriorityQueue 215 LOG.trace("Small file deletion queue is full"); 216 return false; 217 } 218 } 219 return true; 220 } 221 222 @Override 223 public synchronized void cleanup() { 224 super.cleanup(); 225 stopHFileDeleteThreads(); 226 } 227 228 /** 229 * Start threads for hfile deletion 230 */ 231 private void startHFileDeleteThreads() { 232 final String n = Thread.currentThread().getName(); 233 running = true; 234 // start thread for large file deletion 235 for (int i = 0; i < largeFileDeleteThreadNumber; i++) { 236 Thread large = new Thread() { 237 @Override 238 public void run() { 239 consumerLoop(largeFileQueue); 240 } 241 }; 242 large.setDaemon(true); 243 large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis()); 244 large.start(); 245 LOG.debug("Starting for large file={}", large); 246 threads.add(large); 247 } 248 249 // start thread for small file deletion 250 for (int i = 0; i < smallFileDeleteThreadNumber; i++) { 251 Thread small = new Thread() { 252 @Override 253 public void run() { 254 consumerLoop(smallFileQueue); 255 } 256 }; 257 small.setDaemon(true); 258 small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis()); 259 small.start(); 260 LOG.debug("Starting for small files={}", small); 261 threads.add(small); 262 } 263 } 264 265 protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) { 266 try { 267 while (running) { 268 HFileDeleteTask task = null; 269 try { 270 task = queue.take(); 271 } catch (InterruptedException e) { 272 LOG.trace("Interrupted while trying to take a task from queue", e); 273 break; 274 } 275 if (task != null) { 276 LOG.trace("Removing {}", task.filePath); 277 boolean succeed; 278 try { 279 succeed = this.fs.delete(task.filePath, false); 280 } catch (IOException e) { 281 LOG.warn("Failed to delete {}", task.filePath, e); 282 succeed = false; 283 } 284 task.setResult(succeed); 285 if (succeed) { 286 countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue); 287 } 288 } 289 } 290 } finally { 291 LOG.debug("Exit {}", Thread.currentThread()); 292 } 293 } 294 295 // Currently only for testing purpose 296 private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) { 297 if (isLargeFile) { 298 if (deletedLargeFiles.get() == Long.MAX_VALUE) { 299 LOG.debug("Deleted more than Long.MAX_VALUE large files, reset counter to 0"); 300 deletedLargeFiles.set(0L); 301 } 302 deletedLargeFiles.incrementAndGet(); 303 } else { 304 if (deletedSmallFiles.get() == Long.MAX_VALUE) { 305 LOG.debug("Deleted more than Long.MAX_VALUE small files, reset counter to 0"); 306 deletedSmallFiles.set(0L); 307 } 308 if (fromLargeQueue) { 309 LOG.trace("Stolen a small file deletion task in large file thread"); 310 } 311 deletedSmallFiles.incrementAndGet(); 312 } 313 } 314 315 /** 316 * Stop threads for hfile deletion 317 */ 318 private void stopHFileDeleteThreads() { 319 running = false; 320 LOG.debug("Stopping file delete threads"); 321 for(Thread thread: threads){ 322 thread.interrupt(); 323 } 324 } 325 326 private static final Comparator<HFileDeleteTask> COMPARATOR = new Comparator<HFileDeleteTask>() { 327 328 @Override 329 public int compare(HFileDeleteTask o1, HFileDeleteTask o2) { 330 // larger file first so reverse compare 331 int cmp = Long.compare(o2.fileLength, o1.fileLength); 332 if (cmp != 0) { 333 return cmp; 334 } 335 // just use hashCode to generate a stable result. 336 return System.identityHashCode(o1) - System.identityHashCode(o2); 337 } 338 }; 339 340 private static final class HFileDeleteTask { 341 342 boolean done = false; 343 boolean result; 344 final Path filePath; 345 final long fileLength; 346 final long timeoutMsec; 347 348 public HFileDeleteTask(FileStatus file, long timeoutMsec) { 349 this.filePath = file.getPath(); 350 this.fileLength = file.getLen(); 351 this.timeoutMsec = timeoutMsec; 352 } 353 354 public synchronized void setResult(boolean result) { 355 this.done = true; 356 this.result = result; 357 notify(); 358 } 359 360 public synchronized boolean getResult(long waitIfNotFinished) { 361 long waitTimeMsec = 0; 362 try { 363 while (!done) { 364 long startTimeNanos = System.nanoTime(); 365 wait(waitIfNotFinished); 366 waitTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos, 367 TimeUnit.NANOSECONDS); 368 if (done) { 369 return this.result; 370 } 371 if (waitTimeMsec > timeoutMsec) { 372 LOG.warn("Wait more than " + timeoutMsec + " ms for deleting " + this.filePath 373 + ", exit..."); 374 return false; 375 } 376 } 377 } catch (InterruptedException e) { 378 LOG.warn("Interrupted while waiting for result of deleting " + filePath 379 + ", will return false", e); 380 return false; 381 } 382 return this.result; 383 } 384 } 385 386 @VisibleForTesting 387 public List<Thread> getCleanerThreads() { 388 return threads; 389 } 390 391 @VisibleForTesting 392 public long getNumOfDeletedLargeFiles() { 393 return deletedLargeFiles.get(); 394 } 395 396 @VisibleForTesting 397 public long getNumOfDeletedSmallFiles() { 398 return deletedSmallFiles.get(); 399 } 400 401 @VisibleForTesting 402 public long getLargeQueueInitSize() { 403 return largeQueueInitSize; 404 } 405 406 @VisibleForTesting 407 public long getSmallQueueInitSize() { 408 return smallQueueInitSize; 409 } 410 411 @VisibleForTesting 412 public long getThrottlePoint() { 413 return throttlePoint; 414 } 415 416 @VisibleForTesting 417 long getCleanerThreadTimeoutMsec() { 418 return cleanerThreadTimeoutMsec; 419 } 420 421 @VisibleForTesting 422 long getCleanerThreadCheckIntervalMsec() { 423 return cleanerThreadCheckIntervalMsec; 424 } 425 426 @Override 427 public void onConfigurationChange(Configuration conf) { 428 if (!checkAndUpdateConfigurations(conf)) { 429 LOG.debug("Update configuration triggered but nothing changed for this cleaner"); 430 return; 431 } 432 stopHFileDeleteThreads(); 433 // record the left over tasks 434 List<HFileDeleteTask> leftOverTasks = 435 new ArrayList<>(largeFileQueue.size() + smallFileQueue.size()); 436 leftOverTasks.addAll(largeFileQueue); 437 leftOverTasks.addAll(smallFileQueue); 438 largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR); 439 smallFileQueue = largeFileQueue.getStealFromQueue(); 440 threads.clear(); 441 startHFileDeleteThreads(); 442 // re-dispatch the left over tasks 443 for (HFileDeleteTask task : leftOverTasks) { 444 dispatch(task); 445 } 446 } 447 448 /** 449 * Check new configuration and update settings if value changed 450 * @param conf The new configuration 451 * @return true if any configuration for HFileCleaner changes, false if no change 452 */ 453 private boolean checkAndUpdateConfigurations(Configuration conf) { 454 boolean updated = false; 455 int throttlePoint = 456 conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); 457 if (throttlePoint != this.throttlePoint) { 458 LOG.debug("Updating throttle point, from {} to {}", this.throttlePoint, throttlePoint); 459 this.throttlePoint = throttlePoint; 460 updated = true; 461 } 462 int largeQueueInitSize = 463 conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE); 464 if (largeQueueInitSize != this.largeQueueInitSize) { 465 LOG.debug("Updating largeQueueInitSize, from {} to {}", this.largeQueueInitSize, 466 largeQueueInitSize); 467 this.largeQueueInitSize = largeQueueInitSize; 468 updated = true; 469 } 470 int smallQueueInitSize = 471 conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); 472 if (smallQueueInitSize != this.smallQueueInitSize) { 473 LOG.debug("Updating smallQueueInitSize, from {} to {}", this.smallQueueInitSize, 474 smallQueueInitSize); 475 this.smallQueueInitSize = smallQueueInitSize; 476 updated = true; 477 } 478 int largeFileDeleteThreadNumber = 479 conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER); 480 if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) { 481 LOG.debug("Updating largeFileDeleteThreadNumber, from {} to {}", 482 this.largeFileDeleteThreadNumber, largeFileDeleteThreadNumber); 483 this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber; 484 updated = true; 485 } 486 int smallFileDeleteThreadNumber = 487 conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER); 488 if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) { 489 LOG.debug("Updating smallFileDeleteThreadNumber, from {} to {}", 490 this.smallFileDeleteThreadNumber, smallFileDeleteThreadNumber); 491 this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber; 492 updated = true; 493 } 494 long cleanerThreadTimeoutMsec = 495 conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC); 496 if (cleanerThreadTimeoutMsec != this.cleanerThreadTimeoutMsec) { 497 this.cleanerThreadTimeoutMsec = cleanerThreadTimeoutMsec; 498 updated = true; 499 } 500 long cleanerThreadCheckIntervalMsec = 501 conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC, 502 DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC); 503 if (cleanerThreadCheckIntervalMsec != this.cleanerThreadCheckIntervalMsec) { 504 this.cleanerThreadCheckIntervalMsec = cleanerThreadCheckIntervalMsec; 505 updated = true; 506 } 507 return updated; 508 } 509 510 @Override 511 public synchronized void cancel(boolean mayInterruptIfRunning) { 512 super.cancel(mayInterruptIfRunning); 513 for (Thread t: this.threads) { 514 t.interrupt(); 515 } 516 } 517}