/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.compaction;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
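
/**
 * Tool that major compacts every region of a table, queueing work per region server and bounding
 * the number of concurrent compactions. A region is only compacted when
 * {@link MajorCompactionRequest} reports stores whose files are older than the requested
 * modification time.
 *
 * <p>Example invocation (illustrative only; the table name and families are placeholders, the
 * option names are the ones registered in {@link #getCommonOptions()} and {@link #run(String[])}):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.util.compaction.MajorCompactor \
 *     -table usertable -cf a,b -servers 5 -sleep 30000 -dryRun
 * </pre>
 */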
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class MajorCompactor extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
  protected static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();

  protected ClusterCompactionQueues clusterCompactionQueues;
  private long timestamp;
  protected Set<String> storesToCompact;
  protected ExecutorService executor;
  protected long sleepForMs;
  protected Connection connection;
  protected TableName tableName;
  private int numServers = -1;
  private int numRegions = -1;
  private boolean skipWait = false;

  MajorCompactor() {
  }

  public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
      int concurrency, long timestamp, long sleepForMs) throws IOException {
    this.connection = ConnectionFactory.createConnection(conf);
    this.tableName = tableName;
    this.timestamp = timestamp;
    this.storesToCompact = storesToCompact;
    this.executor = Executors.newFixedThreadPool(concurrency);
    this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
    this.sleepForMs = sleepForMs;
  }
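
  // A minimal sketch of programmatic use (the table name, family, and numbers are hypothetical);
  // the CLI path in run() performs the same sequence after parsing options:
  //
  //   MajorCompactor compactor = new MajorCompactor(HBaseConfiguration.create(),
  //       TableName.valueOf("usertable"), Sets.newHashSet("a"), 5,
  //       System.currentTimeMillis(), 30000);
  //   compactor.initializeWorkQueues();
  //   compactor.compactAllRegions();
  //   compactor.shutdown();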

  public void compactAllRegions() throws Exception {
    List<Future<?>> futures = Lists.newArrayList();
    while (clusterCompactionQueues.hasWorkItems() || !futuresComplete(futures)) {
      while (clusterCompactionQueues.atCapacity()) {
        LOG.debug("Waiting for servers to complete compactions");
        Thread.sleep(sleepForMs);
      }
      Optional<ServerName> serverToProcess =
          clusterCompactionQueues.getLargestQueueFromServersNotCompacting();
      if (serverToProcess.isPresent() && clusterCompactionQueues.hasWorkItems()) {
        ServerName serverName = serverToProcess.get();
        // check to see if the region has moved... if so we have to enqueue it again with
        // the proper serverName
        MajorCompactionRequest request = clusterCompactionQueues.reserveForCompaction(serverName);

        ServerName currentServer = connection.getRegionLocator(tableName)
            .getRegionLocation(request.getRegion().getStartKey()).getServerName();

        if (!currentServer.equals(serverName)) {
          // add it back to the queue with the correct server; it will be picked up later
          LOG.info("Server changed for region: " + request.getRegion().getEncodedName() + " from: "
              + serverName + " to: " + currentServer + ", re-queuing request");
          clusterCompactionQueues.addToCompactionQueue(currentServer, request);
          clusterCompactionQueues.releaseCompaction(serverName);
        } else {
          LOG.info("Firing off compaction request for server: " + serverName + ", " + request
              + " total queue size left: " + clusterCompactionQueues
              .getCompactionRequestsLeftToFinish());
          futures.add(executor.submit(new Compact(serverName, request)));
        }
      } else {
        // nothing was assigned this round, so sleep before checking again
        Thread.sleep(sleepForMs);
      }
    }
    LOG.info("All compactions have completed");
  }

  private boolean futuresComplete(List<Future<?>> futures) {
    // prune finished futures; the scheduling loop keeps running while any remain
    futures.removeIf(Future::isDone);
    return futures.isEmpty();
  }

  public void shutdown() throws Exception {
    executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    if (!ERRORS.isEmpty()) {
      StringBuilder builder =
          new StringBuilder().append("Major compaction failed, there were: ").append(ERRORS.size())
              .append(" regions / stores that failed compacting\n")
              .append("Failed compaction requests\n").append("--------------------------\n")
              .append(Joiner.on("\n").join(ERRORS));
      LOG.error(builder.toString());
    } else {
      // only claim success if no request ended up on the error list
      LOG.info("All regions major compacted successfully");
    }
    if (connection != null) {
      connection.close();
    }
  }

  @VisibleForTesting void initializeWorkQueues() throws IOException {
    if (storesToCompact.isEmpty()) {
      connection.getTable(tableName).getDescriptor().getColumnFamilyNames()
          .forEach(a -> storesToCompact.add(Bytes.toString(a)));
      LOG.info("No family specified, will execute for all families");
    }
    LOG.info(
        "Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);

    Map<ServerName, List<RegionInfo>> snRegionMap = getServerRegionsMap();
    /*
     * If numservers is specified, stop inspecting regions beyond that many servers. This
     * throttles the run and avoids scanning every region when only a few match the compaction
     * criteria.
     */
    for (ServerName sn : getServersToCompact(snRegionMap.keySet())) {
      List<RegionInfo> regions = snRegionMap.get(sn);
      LOG.debug("Table: " + tableName + " Server: " + sn + " No of regions: " + regions.size());

      /*
       * If the tool is run periodically, shuffling gives each run a random order in which to
       * select regions. This helps when numregions is specified.
       */
      Collections.shuffle(regions);
      int regionsToCompact = numRegions;
      for (RegionInfo hri : regions) {
        if (numRegions > 0 && regionsToCompact <= 0) {
          LOG.debug("Reached region limit for server: " + sn);
          break;
        }

        Optional<MajorCompactionRequest> request = getMajorCompactionRequest(hri);
        if (request.isPresent()) {
          LOG.debug("Adding region " + hri + " to queue " + sn + " for compaction");
          clusterCompactionQueues.addToCompactionQueue(sn, request.get());
          if (numRegions > 0) {
            regionsToCompact--;
          }
        }
      }
    }
  }

  protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
      throws IOException {
    return MajorCompactionRequest.newRequest(connection.getConfiguration(), hri, storesToCompact,
        timestamp);
  }

  private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) {
    if (numServers < 0 || snSet.size() <= numServers) {
      return snSet;
    } else {
      List<ServerName> snList = Lists.newArrayList(snSet);
      Collections.shuffle(snList);
      return snList.subList(0, numServers);
    }
  }

  private Map<ServerName, List<RegionInfo>> getServerRegionsMap() throws IOException {
    Map<ServerName, List<RegionInfo>> snRegionMap = Maps.newHashMap();
    List<HRegionLocation> regionLocations =
        connection.getRegionLocator(tableName).getAllRegionLocations();
    for (HRegionLocation regionLocation : regionLocations) {
      ServerName sn = regionLocation.getServerName();
      RegionInfo hri = regionLocation.getRegion();
      snRegionMap.computeIfAbsent(sn, k -> Lists.newArrayList()).add(hri);
    }
    return snRegionMap;
  }

  public void setNumServers(int numServers) {
    this.numServers = numServers;
  }

  public void setNumRegions(int numRegions) {
    this.numRegions = numRegions;
  }

  public void setSkipWait(boolean skipWait) {
    this.skipWait = skipWait;
  }

  class Compact implements Runnable {

    private final ServerName serverName;
    private final MajorCompactionRequest request;

    Compact(ServerName serverName, MajorCompactionRequest request) {
      this.serverName = serverName;
      this.request = request;
    }

    @Override public void run() {
      try {
        compactAndWait(request);
      } catch (NotServingRegionException e) {
        // this region has split or merged
        LOG.warn("Region is invalid, requesting updated regions", e);
        // update the cluster compaction queues with the newly created regions
        addNewRegions();
      } catch (Exception e) {
        LOG.warn("Error compacting:", e);
      } finally {
        clusterCompactionQueues.releaseCompaction(serverName);
      }
    }

    void compactAndWait(MajorCompactionRequest request) throws Exception {
      try (Admin admin = connection.getAdmin()) {
        // only make the request if the region is not already major compacting
        if (!isCompacting(request)) {
          Set<String> stores = getStoresRequiringCompaction(request);
          if (!stores.isEmpty()) {
            request.setStores(stores);
            for (String store : request.getStores()) {
              compactRegionOnServer(request, admin, store);
            }
          }
        }

        /*
         * In some scenarios, e.g. compacting TTLed regions, the compaction itself takes no time,
         * so we can skip the wait. When an external tool triggers this frequently, the next run
         * will identify region movements and compact them.
         */
        if (!skipWait) {
          while (isCompacting(request)) {
            Thread.sleep(sleepForMs);
            LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
                .getEncodedName());
          }
        }
      } finally {
        if (!skipWait) {
          // make sure to wait for the CompactedFileDischarger chore to do its work
          int waitForArchive = connection.getConfiguration()
              .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
          Thread.sleep(waitForArchive);
          // check if the compaction completed successfully, otherwise put the request back in
          // the proper queue
          Set<String> storesRequiringCompaction = getStoresRequiringCompaction(request);
          if (!storesRequiringCompaction.isEmpty()) {
            // This happens when a region server is marked as dead and flushes a store file, but
            // the new region server doesn't pick the file up because it is accounted for in the
            // WAL replay; thus there are more store files on the filesystem than the region
            // server knows about.
            boolean regionHasNotMoved = connection.getRegionLocator(tableName)
                .getRegionLocation(request.getRegion().getStartKey()).getServerName()
                .equals(serverName);
            if (regionHasNotMoved) {
              LOG.error(
                  "Not all store files were compacted, this may be due to the regionserver not "
                      + "being aware of all store files. Will not reattempt compacting, "
                      + request);
              ERRORS.add(request);
            } else {
              request.setStores(storesRequiringCompaction);
              clusterCompactionQueues.addToCompactionQueue(serverName, request);
              LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
                  + " region: " + request.getRegion().getEncodedName());
            }
          } else {
            LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
                + " -> cf(s): " + request.getStores());
          }
        }
      }
    }

    private void compactRegionOnServer(MajorCompactionRequest request, Admin admin, String store)
        throws IOException {
      admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
          Bytes.toBytes(store));
    }
  }

  private boolean isCompacting(MajorCompactionRequest request) throws Exception {
    try (Admin admin = connection.getAdmin()) {
      CompactionState compactionState =
          admin.getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes());
      return compactionState.equals(CompactionState.MAJOR)
          || compactionState.equals(CompactionState.MAJOR_AND_MINOR);
    }
  }

  private void addNewRegions() {
    try {
      List<HRegionLocation> locations =
          connection.getRegionLocator(tableName).getAllRegionLocations();
      for (HRegionLocation location : locations) {
        if (location.getRegion().getRegionId() > timestamp) {
          Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
              .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
                  timestamp);
          compactionRequest.ifPresent(request -> clusterCompactionQueues
              .addToCompactionQueue(location.getServerName(), request));
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
      throws IOException {
    return request.getStoresRequiringCompaction(storesToCompact, timestamp);
  }

  protected Options getCommonOptions() {
    Options options = new Options();

    options.addOption(
        Option.builder("servers")
            .required()
            .desc("Concurrent servers compacting")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("minModTime")
            .desc("Compact if store files have modification time < minModTime")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("zk")
            .optionalArg(true)
            .desc("zk quorum")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("rootDir")
            .optionalArg(true)
            .desc("hbase.rootDir")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("sleep")
            .desc("Time to sleep (ms) between checks of compaction status per region and "
                + "available work queues: default 30s")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("retries")
            .desc("Max # of retries for a compaction request, defaults to 3")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("dryRun")
            .desc("Dry run, will just output a list of regions that require compaction based on "
                + "parameters passed")
            .hasArg(false)
            .build()
    );
    options.addOption(
        Option.builder("skipWait")
            .desc("Skip waiting after triggering compaction.")
            .hasArg(false)
            .build()
    );
    options.addOption(
        Option.builder("numservers")
            .optionalArg(true)
            .desc("Number of servers to compact in this run, defaults to all")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("numregions")
            .optionalArg(true)
            .desc("Number of regions to compact per server, defaults to all")
            .hasArg()
            .build()
    );
    return options;
  }

  @Override
  public int run(String[] args) throws Exception {
    Options options = getCommonOptions();
    options.addOption(
        Option.builder("table")
            .required()
            .desc("table name")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("cf")
            .optionalArg(true)
            .desc("column families: comma separated eg: a,b,c")
            .hasArg()
            .build()
    );

    final CommandLineParser cmdLineParser = new DefaultParser();
    CommandLine commandLine = null;
    try {
      commandLine = cmdLineParser.parse(options, args);
    } catch (ParseException parseException) {
      System.out.println(
          "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
              + parseException);
      printUsage(options);
      return -1;
    }
    if (commandLine == null) {
      System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
      printUsage(options);
      return -1;
    }
    String tableName = commandLine.getOptionValue("table");
    String cf = commandLine.getOptionValue("cf", null);
    Set<String> families = Sets.newHashSet();
    if (cf != null) {
      Iterables.addAll(families, Splitter.on(",").split(cf));
    }

    Configuration configuration = getConf();
    int concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
    long minModTime = Long.parseLong(
        commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis())));
    String quorum =
        commandLine.getOptionValue("zk", configuration.get(HConstants.ZOOKEEPER_QUORUM));
    String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
    long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));

    int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
    int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));

    configuration.set(HConstants.HBASE_DIR, rootDir);
    configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);

    MajorCompactor compactor =
        new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency,
            minModTime, sleep);
    compactor.setNumServers(numServers);
    compactor.setNumRegions(numRegions);
    compactor.setSkipWait(commandLine.hasOption("skipWait"));

    compactor.initializeWorkQueues();
    if (!commandLine.hasOption("dryRun")) {
      compactor.compactAllRegions();
    }
    compactor.shutdown();
    return ERRORS.size();
  }

  protected static void printUsage(final Options options) {
    String header = "\nUsage instructions\n\n";
    String footer = "\n";
    HelpFormatter formatter = new HelpFormatter();
    formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true);
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(HBaseConfiguration.create(), new MajorCompactor(), args);
  }
}