/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.StealJobQueue;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This Chore, every time it runs, will clear the HFiles in the hfile archive folder that are
 * deletable for each HFile cleaner in the chain.
 * <p>
 * Deletions are not performed by the caller of {@link #deleteFiles(Iterable)}. Each file is
 * wrapped in a task, placed on a "large" or "small" queue depending on its length relative to the
 * throttle point, and removed by daemon threads started by this class. The caller then waits
 * (bounded by a timeout) for each task to report its result.
 */
@InterfaceAudience.Private
public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
  implements ConfigurationObserver {

  public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";

  /**
   * Creates a cleaner without extra delegate parameters; delegates to the constructor taking
   * {@code params} with {@code null}.
   * @param period    the period of time to sleep between each run
   * @param stopper   the stopper
   * @param conf      configuration to use
   * @param fs        handle to the FS
   * @param directory directory to be cleaned
   * @param pool      the thread pool used to scan directories
   */
  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
    Path directory, DirScanPool pool) {
    this(period, stopper, conf, fs, directory, pool, null);
  }

  // Configuration key for large/small throttle point
  public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
    "hbase.regionserver.thread.hfilecleaner.throttle";
  public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M

  // Configuration key for large queue initial size
  public final static String LARGE_HFILE_QUEUE_INIT_SIZE =
    "hbase.regionserver.hfilecleaner.large.queue.size";
  public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240;

  // Configuration key for small queue initial size
  public final static String SMALL_HFILE_QUEUE_INIT_SIZE =
    "hbase.regionserver.hfilecleaner.small.queue.size";
  public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;

  // Configuration key for large file delete thread number
  public final static String LARGE_HFILE_DELETE_THREAD_NUMBER =
    "hbase.regionserver.hfilecleaner.large.thread.count";
  public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1;

  // Configuration key for small file delete thread number
  public final static String SMALL_HFILE_DELETE_THREAD_NUMBER =
    "hbase.regionserver.hfilecleaner.small.thread.count";
  public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;

  // Configuration key for the maximum time a caller waits for one delete task to finish
  public static final String HFILE_DELETE_THREAD_TIMEOUT_MSEC =
    "hbase.regionserver.hfilecleaner.thread.timeout.msec";
  static final long DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC = 60 * 1000L;

  // Configuration key for the length of each individual wait while polling a task for its result
  public static final String HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC =
    "hbase.regionserver.hfilecleaner.thread.check.interval.msec";
  static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;

  /**
   * The custom paths for hfile cleaner, subdirectories of archive, e.g.
   * data/default/testTable1,data/default/testTable2
   */
  public static final String HFILE_CLEANER_CUSTOM_PATHS = "hbase.master.hfile.cleaner.custom.paths";

  /** Configure hfile cleaner classes for the custom paths */
  public static final String HFILE_CLEANER_CUSTOM_PATHS_PLUGINS =
    "hbase.master.hfilecleaner.custom.paths.plugins";
  public static final String CUSTOM_POOL_SIZE = "hbase.cleaner.custom.hfiles.pool.size";

  private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);

  // Queue for files whose length is >= throttlePoint (see dispatch()).
  StealJobQueue<HFileDeleteTask> largeFileQueue;
  // Queue for files below the throttle point; obtained from largeFileQueue.getStealFromQueue().
  BlockingQueue<HFileDeleteTask> smallFileQueue;
  private int throttlePoint;
  private int largeQueueInitSize;
  private int smallQueueInitSize;
  private int largeFileDeleteThreadNumber;
  private int smallFileDeleteThreadNumber;
  private long cleanerThreadTimeoutMsec;
  private long cleanerThreadCheckIntervalMsec;
  // Worker threads currently started by this cleaner; cleared and refilled on config change.
  private final List<Thread> threads = new ArrayList<>();
  // Loop condition of the worker threads; volatile because it is written by other threads.
  private volatile boolean running;

  private final AtomicLong deletedLargeFiles = new AtomicLong();
  private final AtomicLong deletedSmallFiles = new AtomicLong();

  /**
   * @param period    the period of time to sleep between each run
   * @param stopper   the stopper
   * @param conf      configuration to use
   * @param fs        handle to the FS
   * @param directory directory to be cleaned
   * @param pool      the thread pool used to scan directories
   * @param params    params could be used in subclass of BaseHFileCleanerDelegate
   */
  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
    Path directory, DirScanPool pool, Map<String, Object> params) {
    this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
      params, null);
  }

  /**
   * Same as the constructor taking {@code params}, with an additional list of paths that is passed
   * through to the superclass as {@code excludePaths}.
   * @param period       the period of time to sleep between each run
   * @param stopper      the stopper
   * @param conf         configuration to use
   * @param fs           handle to the FS
   * @param directory    directory to be cleaned
   * @param pool         the thread pool used to scan directories
   * @param params       params could be used in subclass of BaseHFileCleanerDelegate
   * @param excludePaths paths handed to the superclass; presumably skipped by this cleaner
   *                     (handling lives in CleanerChore, not visible here)
   */
  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
    Path directory, DirScanPool pool, Map<String, Object> params, List<Path> excludePaths) {
    this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
      params, excludePaths);
  }

  /**
   * For creating customized HFileCleaner. Reads all tunables from {@code conf}, creates the two
   * task queues and starts the deletion threads.
   * @param name         name of the chore being run
   * @param period       the period of time to sleep between each run
   * @param stopper      the stopper
   * @param conf         configuration to use
   * @param fs           handle to the FS
   * @param directory    directory to be cleaned
   * @param confKey      configuration key for the classes to instantiate
   * @param pool         the thread pool used to scan directories
   * @param params       params could be used in subclass of BaseHFileCleanerDelegate
   * @param excludePaths paths handed to the superclass; presumably skipped by this cleaner
   *                     (handling lives in CleanerChore, not visible here)
   */
  public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
    Path directory, String confKey, DirScanPool pool, Map<String, Object> params,
    List<Path> excludePaths) {
    super(name, period, stopper, conf, fs, directory, confKey, pool, params, excludePaths);
    throttlePoint =
      conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
    largeQueueInitSize =
      conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
    smallQueueInitSize =
      conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
    smallFileQueue = largeFileQueue.getStealFromQueue();
    largeFileDeleteThreadNumber =
      conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
    smallFileDeleteThreadNumber =
      conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
    cleanerThreadTimeoutMsec =
      conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
    cleanerThreadCheckIntervalMsec = conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
      DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
    startHFileDeleteThreads();
  }

  /**
   * Accepts a path if it is a back-reference directory, lives directly inside one, has a valid
   * store file name, or ends with the archived-hfile suffix of the master region.
   * @param file the path to check
   * @return true if any of the above conditions holds
   */
  @Override
  protected boolean validate(Path file) {
    return HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())
      || StoreFileInfo.validateStoreFileName(file.getName())
      || file.getName().endsWith(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
  }

  /**
   * Exposed for TESTING!
   */
  public List<BaseHFileCleanerDelegate> getDelegatesForTesting() {
    return this.cleanersChain;
  }

  /**
   * Enqueues a delete task for every given file, then waits for each enqueued task to report its
   * result.
   * @param filesToDelete the files to remove
   * @return number of tasks that reported a successful deletion before timing out or being
   *         interrupted
   */
  @Override
  public int deleteFiles(Iterable<FileStatus> filesToDelete) {
    int deletedFiles = 0;
    List<HFileDeleteTask> tasks = new ArrayList<>();
    // construct delete tasks and add into relative queue
    for (FileStatus file : filesToDelete) {
      HFileDeleteTask task = deleteFile(file);
      if (task != null) {
        tasks.add(task);
      }
    }
    // wait for each submitted task to finish
    for (HFileDeleteTask task : tasks) {
      if (task.getResult(cleanerThreadCheckIntervalMsec)) {
        deletedFiles++;
      }
    }
    return deletedFiles;
  }

  /**
   * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue
   * @param file the file to delete
   * @return HFileDeleteTask to track progress, or null if the task could not be enqueued
   */
  private HFileDeleteTask deleteFile(FileStatus file) {
    HFileDeleteTask task = new HFileDeleteTask(file, cleanerThreadTimeoutMsec);
    boolean enqueued = dispatch(task);
    return enqueued ? task : null;
  }

  /**
   * Offers the task to the large queue when its file length is at or above the throttle point,
   * otherwise to the small queue.
   * @param task the task to enqueue
   * @return true if the chosen queue accepted the task
   */
  private boolean dispatch(HFileDeleteTask task) {
    if (task.fileLength >= this.throttlePoint) {
      if (!this.largeFileQueue.offer(task)) {
        // should never arrive here as long as we use PriorityQueue
        LOG.trace("Large file deletion queue is full");
        return false;
      }
    } else {
      if (!this.smallFileQueue.offer(task)) {
        // should never arrive here as long as we use PriorityQueue
        LOG.trace("Small file deletion queue is full");
        return false;
      }
    }
    return true;
  }

  /**
   * Runs the superclass cleanup and then stops the deletion threads.
   */
  @Override
  public synchronized void cleanup() {
    super.cleanup();
    stopHFileDeleteThreads();
  }

  /**
   * Start threads for hfile deletion
   */
  private void startHFileDeleteThreads() {
    final String n = Thread.currentThread().getName();
    running = true;
    // start thread for large file deletion
    for (int i = 0; i < largeFileDeleteThreadNumber; i++) {
      Thread large = new Thread() {
        @Override
        public void run() {
          consumerLoop(largeFileQueue);
        }
      };
      large.setDaemon(true);
      large.setName(n + "-HFileCleaner.large." + i + "-" + EnvironmentEdgeManager.currentTime());
      large.start();
      LOG.debug("Starting for large file={}", large);
      threads.add(large);
    }

    // start thread for small file deletion
    for (int i = 0; i < smallFileDeleteThreadNumber; i++) {
      Thread small = new Thread() {
        @Override
        public void run() {
          consumerLoop(smallFileQueue);
        }
      };
      small.setDaemon(true);
      small.setName(n + "-HFileCleaner.small." + i + "-" + EnvironmentEdgeManager.currentTime());
      small.start();
      LOG.debug("Starting for small files={}", small);
      threads.add(small);
    }
  }

  /**
   * Body of every deletion thread: repeatedly takes a task from {@code queue}, deletes its file
   * (non-recursively) and publishes the outcome on the task. The loop ends when {@code running}
   * becomes false or the thread is interrupted while waiting on the queue.
   * @param queue the queue this thread consumes from
   */
  protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
    try {
      while (running) {
        HFileDeleteTask task = null;
        try {
          task = queue.take();
        } catch (InterruptedException e) {
          // Interruption is the stop signal for this thread (see stopHFileDeleteThreads()).
          LOG.trace("Interrupted while trying to take a task from queue", e);
          break;
        }
        if (task != null) {
          LOG.trace("Removing {}", task.filePath);
          boolean succeed;
          try {
            succeed = this.fs.delete(task.filePath, false);
          } catch (IOException e) {
            LOG.warn("Failed to delete {}", task.filePath, e);
            succeed = false;
          }
          // Always publish a result, even on failure, so the waiting caller is released.
          task.setResult(succeed);
          if (succeed) {
            countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
          }
        }
      }
    } finally {
      LOG.debug("Exit {}", Thread.currentThread());
    }
  }

  // Currently only for testing purpose
  private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
    if (isLargeFile) {
      if (deletedLargeFiles.get() == Long.MAX_VALUE) {
        LOG.debug("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
        deletedLargeFiles.set(0L);
      }
      deletedLargeFiles.incrementAndGet();
    } else {
      if (deletedSmallFiles.get() == Long.MAX_VALUE) {
        LOG.debug("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
        deletedSmallFiles.set(0L);
      }
      if (fromLargeQueue) {
        // A small file was handled by a thread consuming the large queue.
        LOG.trace("Stolen a small file deletion task in large file thread");
      }
      deletedSmallFiles.incrementAndGet();
    }
  }

  /**
   * Stop threads for hfile deletion
   */
  private void stopHFileDeleteThreads() {
    running = false;
    LOG.debug("Stopping file delete threads");
    for (Thread thread : threads) {
      thread.interrupt();
    }
  }

  /**
   * Orders tasks by descending file length; ties are broken by identity hash code so that two
   * distinct tasks of equal length do not normally compare as equal.
   */
  private static final Comparator<HFileDeleteTask> COMPARATOR = new Comparator<HFileDeleteTask>() {

    @Override
    public int compare(HFileDeleteTask o1, HFileDeleteTask o2) {
      // larger file first so reverse compare
      int cmp = Long.compare(o2.fileLength, o1.fileLength);
      if (cmp != 0) {
        return cmp;
      }
      // just use hashCode to generate a stable result. Integer.compare is used instead of
      // subtraction because the difference of two ints can overflow and flip the sign.
      return Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
    }
  };

  /**
   * A single pending file deletion. A deletion thread publishes the outcome through
   * {@link #setResult(boolean)}; the submitting thread waits for it in {@link #getResult(long)}.
   * Both methods synchronize on the task itself.
   */
  private static final class HFileDeleteTask {

    boolean done = false;
    boolean result;
    final Path filePath;
    final long fileLength;
    final long timeoutMsec;

    /**
     * @param file        the file to delete; its path and length are captured here
     * @param timeoutMsec upper bound on the total time {@link #getResult(long)} keeps waiting
     */
    public HFileDeleteTask(FileStatus file, long timeoutMsec) {
      this.filePath = file.getPath();
      this.fileLength = file.getLen();
      this.timeoutMsec = timeoutMsec;
    }

    /**
     * Records the outcome of the deletion and wakes up a thread waiting in
     * {@link #getResult(long)}.
     * @param result true if the file was deleted
     */
    public synchronized void setResult(boolean result) {
      this.done = true;
      this.result = result;
      notify();
    }

    /**
     * Waits until the task is done, the accumulated wait exceeds {@code timeoutMsec}, or the
     * calling thread is interrupted.
     * @param waitIfNotFinished length in milliseconds of each individual wait
     * @return the recorded result if the task finished in time; false on timeout or interruption
     */
    public synchronized boolean getResult(long waitIfNotFinished) {
      long waitTimeMsec = 0;
      try {
        while (!done) {
          long startTimeNanos = System.nanoTime();
          wait(waitIfNotFinished);
          waitTimeMsec +=
            TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos, TimeUnit.NANOSECONDS);
          if (done) {
            return this.result;
          }
          if (waitTimeMsec > timeoutMsec) {
            LOG.warn(
              "Wait more than " + timeoutMsec + " ms for deleting " + this.filePath + ", exit...");
            return false;
          }
        }
      } catch (InterruptedException e) {
        LOG.warn(
          "Interrupted while waiting for result of deleting " + filePath + ", will return false",
          e);
        // Restore the interrupt status so the caller (and any further waits it performs) can
        // observe the interruption instead of blocking for the full timeout on every task.
        Thread.currentThread().interrupt();
        return false;
      }
      return this.result;
    }
  }

  /**
   * @return the live list of deletion threads held by this cleaner (not a copy)
   */
  public List<Thread> getCleanerThreads() {
    return threads;
  }

  /**
   * @return number of successfully deleted files whose length was at or above the throttle point
   */
  public long getNumOfDeletedLargeFiles() {
    return deletedLargeFiles.get();
  }

  /**
   * @return number of successfully deleted files whose length was below the throttle point
   */
  public long getNumOfDeletedSmallFiles() {
    return deletedSmallFiles.get();
  }

  public long getLargeQueueInitSize() {
    return largeQueueInitSize;
  }

  public long getSmallQueueInitSize() {
    return smallQueueInitSize;
  }

  public long getThrottlePoint() {
    return throttlePoint;
  }

  long getCleanerThreadTimeoutMsec() {
    return cleanerThreadTimeoutMsec;
  }

  long getCleanerThreadCheckIntervalMsec() {
    return cleanerThreadCheckIntervalMsec;
  }

  /**
   * Applies a new configuration. If any setting of this cleaner changed, the deletion threads are
   * stopped, the queues are recreated with the new sizes, new threads are started and the tasks
   * that were still queued are dispatched again.
   * @param conf the new configuration
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    if (!checkAndUpdateConfigurations(conf)) {
      LOG.debug("Update configuration triggered but nothing changed for this cleaner");
      return;
    }
    stopHFileDeleteThreads();
    // record the left over tasks
    List<HFileDeleteTask> leftOverTasks =
      new ArrayList<>(largeFileQueue.size() + smallFileQueue.size());
    leftOverTasks.addAll(largeFileQueue);
    leftOverTasks.addAll(smallFileQueue);
    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
    smallFileQueue = largeFileQueue.getStealFromQueue();
    threads.clear();
    startHFileDeleteThreads();
    // re-dispatch the left over tasks
    for (HFileDeleteTask task : leftOverTasks) {
      dispatch(task);
    }
  }

  /**
   * Check new configuration and update settings if value changed
   * @param conf The new configuration
   * @return true if any configuration for HFileCleaner changes, false if no change
   */
  private boolean checkAndUpdateConfigurations(Configuration conf) {
    boolean updated = false;
    int throttlePoint =
      conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
    if (throttlePoint != this.throttlePoint) {
      LOG.debug("Updating throttle point, from {} to {}", this.throttlePoint, throttlePoint);
      this.throttlePoint = throttlePoint;
      updated = true;
    }
    int largeQueueInitSize =
      conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
    if (largeQueueInitSize != this.largeQueueInitSize) {
      LOG.debug("Updating largeQueueInitSize, from {} to {}", this.largeQueueInitSize,
        largeQueueInitSize);
      this.largeQueueInitSize = largeQueueInitSize;
      updated = true;
    }
    int smallQueueInitSize =
      conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
    if (smallQueueInitSize != this.smallQueueInitSize) {
      LOG.debug("Updating smallQueueInitSize, from {} to {}", this.smallQueueInitSize,
        smallQueueInitSize);
      this.smallQueueInitSize = smallQueueInitSize;
      updated = true;
    }
    int largeFileDeleteThreadNumber =
      conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
    if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) {
      LOG.debug("Updating largeFileDeleteThreadNumber, from {} to {}",
        this.largeFileDeleteThreadNumber, largeFileDeleteThreadNumber);
      this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber;
      updated = true;
    }
    int smallFileDeleteThreadNumber =
      conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
    if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) {
      LOG.debug("Updating smallFileDeleteThreadNumber, from {} to {}",
        this.smallFileDeleteThreadNumber, smallFileDeleteThreadNumber);
      this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
      updated = true;
    }
    long cleanerThreadTimeoutMsec =
      conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
    if (cleanerThreadTimeoutMsec != this.cleanerThreadTimeoutMsec) {
      LOG.debug("Updating cleanerThreadTimeoutMsec, from {} to {}", this.cleanerThreadTimeoutMsec,
        cleanerThreadTimeoutMsec);
      this.cleanerThreadTimeoutMsec = cleanerThreadTimeoutMsec;
      updated = true;
    }
    long cleanerThreadCheckIntervalMsec = conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
      DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
    if (cleanerThreadCheckIntervalMsec != this.cleanerThreadCheckIntervalMsec) {
      LOG.debug("Updating cleanerThreadCheckIntervalMsec, from {} to {}",
        this.cleanerThreadCheckIntervalMsec, cleanerThreadCheckIntervalMsec);
      this.cleanerThreadCheckIntervalMsec = cleanerThreadCheckIntervalMsec;
      updated = true;
    }
    return updated;
  }

  /**
   * Cancels the chore via the superclass and interrupts every deletion thread.
   * @param mayInterruptIfRunning passed through to the superclass
   */
  @Override
  public synchronized void cancel(boolean mayInterruptIfRunning) {
    super.cancel(mayInterruptIfRunning);
    for (Thread t : this.threads) {
      t.interrupt();
    }
  }
}