001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util.compaction; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.List; 025import java.util.Map; 026import java.util.Optional; 027import java.util.Set; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.Executors; 031import java.util.concurrent.Future; 032import java.util.concurrent.TimeUnit; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.conf.Configured; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HBaseInterfaceAudience; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.HRegionLocation; 039import org.apache.hadoop.hbase.NotServingRegionException; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.Admin; 043import org.apache.hadoop.hbase.client.CompactionState; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.ConnectionFactory; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.apache.hadoop.util.Tool; 050import org.apache.hadoop.util.ToolRunner; 051import org.apache.yetus.audience.InterfaceAudience; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 056import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 057import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 058import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 059import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 060import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 061import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 062import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser; 063import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser; 064import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 065import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 066import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 067import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; 068 069@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 070public class MajorCompactor extends Configured implements Tool { 071 072 private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class); 073 protected static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet(); 074 075 protected ClusterCompactionQueues clusterCompactionQueues; 076 private long timestamp; 077 protected Set<String> storesToCompact; 078 protected ExecutorService executor; 079 protected long sleepForMs; 080 protected Connection connection; 081 protected TableName tableName; 082 private int numServers = -1; 083 private int numRegions = -1; 084 private boolean skipWait = false; 085 086 MajorCompactor() { 087 } 088 089 public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact, 090 int concurrency, long timestamp, long sleepForMs) throws IOException { 091 this.connection = ConnectionFactory.createConnection(conf); 092 this.tableName = tableName; 093 this.timestamp = timestamp; 094 this.storesToCompact = storesToCompact; 095 this.executor = Executors.newFixedThreadPool(concurrency); 096 this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency); 097 this.sleepForMs = sleepForMs; 098 } 099 100 public void compactAllRegions() throws Exception { 101 List<Future<?>> futures = Lists.newArrayList(); 102 while (clusterCompactionQueues.hasWorkItems() || !futuresComplete(futures)) { 103 while (clusterCompactionQueues.atCapacity()) { 104 LOG.debug("Waiting for servers to complete Compactions"); 105 Thread.sleep(sleepForMs); 106 } 107 Optional<ServerName> serverToProcess = 108 clusterCompactionQueues.getLargestQueueFromServersNotCompacting(); 109 if (serverToProcess.isPresent() && clusterCompactionQueues.hasWorkItems()) { 110 ServerName serverName = serverToProcess.get(); 111 // check to see if the region has moved... if so we have to enqueue it again with 112 // the proper serverName 113 MajorCompactionRequest request = clusterCompactionQueues.reserveForCompaction(serverName); 114 115 ServerName currentServer = connection.getRegionLocator(tableName) 116 .getRegionLocation(request.getRegion().getStartKey()).getServerName(); 117 118 if (!currentServer.equals(serverName)) { 119 // add it back to the queue with the correct server it should be picked up in the future. 120 LOG.info("Server changed for region: " + request.getRegion().getEncodedName() + " from: " 121 + serverName + " to: " + currentServer + " re-queuing request"); 122 clusterCompactionQueues.addToCompactionQueue(currentServer, request); 123 clusterCompactionQueues.releaseCompaction(serverName); 124 } else { 125 LOG.info("Firing off compaction request for server: " + serverName + ", " + request 126 + " total queue size left: " 127 + clusterCompactionQueues.getCompactionRequestsLeftToFinish()); 128 futures.add(executor.submit(new Compact(serverName, request))); 129 } 130 } else { 131 // haven't assigned anything so we sleep. 132 Thread.sleep(sleepForMs); 133 } 134 } 135 LOG.info("All compactions have completed"); 136 } 137 138 private boolean futuresComplete(List<Future<?>> futures) { 139 futures.removeIf(Future::isDone); 140 return futures.isEmpty(); 141 } 142 143 public void shutdown() throws Exception { 144 executor.shutdown(); 145 executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); 146 if (!ERRORS.isEmpty()) { 147 StringBuilder builder = new StringBuilder().append("Major compaction failed, there were: ") 148 .append(ERRORS.size()).append(" regions / stores that failed compacting\n") 149 .append("Failed compaction requests\n").append("--------------------------\n") 150 .append(Joiner.on("\n").join(ERRORS)); 151 LOG.error(builder.toString()); 152 } 153 if (connection != null) { 154 connection.close(); 155 } 156 LOG.info("All regions major compacted successfully"); 157 } 158 159 @InterfaceAudience.Private 160 void initializeWorkQueues() throws IOException { 161 if (storesToCompact.isEmpty()) { 162 connection.getTable(tableName).getDescriptor().getColumnFamilyNames() 163 .forEach(a -> storesToCompact.add(Bytes.toString(a))); 164 LOG.info("No family specified, will execute for all families"); 165 } 166 LOG.info( 167 "Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact); 168 169 Map<ServerName, List<RegionInfo>> snRegionMap = getServerRegionsMap(); 170 /* 171 * If numservers is specified, stop inspecting regions beyond the numservers, it will serve to 172 * throttle and won't end up scanning all the regions in the event there are not many regions to 173 * compact based on the criteria. 174 */ 175 for (ServerName sn : getServersToCompact(snRegionMap.keySet())) { 176 List<RegionInfo> regions = snRegionMap.get(sn); 177 LOG.debug("Table: " + tableName + " Server: " + sn + " No of regions: " + regions.size()); 178 179 /* 180 * If the tool is run periodically, then we could shuffle the regions and provide some random 181 * order to select regions. Helps if numregions is specified. 182 */ 183 Collections.shuffle(regions); 184 int regionsToCompact = numRegions; 185 for (RegionInfo hri : regions) { 186 if (numRegions > 0 && regionsToCompact <= 0) { 187 LOG.debug("Reached region limit for server: " + sn); 188 break; 189 } 190 191 Optional<MajorCompactionRequest> request = getMajorCompactionRequest(hri); 192 if (request.isPresent()) { 193 LOG.debug("Adding region " + hri + " to queue " + sn + " for compaction"); 194 clusterCompactionQueues.addToCompactionQueue(sn, request.get()); 195 if (numRegions > 0) { 196 regionsToCompact--; 197 } 198 } 199 } 200 } 201 } 202 203 protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri) 204 throws IOException { 205 return MajorCompactionRequest.newRequest(connection, hri, storesToCompact, timestamp); 206 } 207 208 private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) { 209 if (numServers < 0 || snSet.size() <= numServers) { 210 return snSet; 211 212 } else { 213 List<ServerName> snList = Lists.newArrayList(snSet); 214 Collections.shuffle(snList); 215 return snList.subList(0, numServers); 216 } 217 } 218 219 private Map<ServerName, List<RegionInfo>> getServerRegionsMap() throws IOException { 220 Map<ServerName, List<RegionInfo>> snRegionMap = Maps.newHashMap(); 221 List<HRegionLocation> regionLocations = 222 connection.getRegionLocator(tableName).getAllRegionLocations(); 223 for (HRegionLocation regionLocation : regionLocations) { 224 ServerName sn = regionLocation.getServerName(); 225 RegionInfo hri = regionLocation.getRegion(); 226 if (!snRegionMap.containsKey(sn)) { 227 snRegionMap.put(sn, Lists.newArrayList()); 228 } 229 snRegionMap.get(sn).add(hri); 230 } 231 return snRegionMap; 232 } 233 234 public void setNumServers(int numServers) { 235 this.numServers = numServers; 236 } 237 238 public void setNumRegions(int numRegions) { 239 this.numRegions = numRegions; 240 } 241 242 public void setSkipWait(boolean skipWait) { 243 this.skipWait = skipWait; 244 } 245 246 class Compact implements Runnable { 247 248 private final ServerName serverName; 249 private final MajorCompactionRequest request; 250 251 Compact(ServerName serverName, MajorCompactionRequest request) { 252 this.serverName = serverName; 253 this.request = request; 254 } 255 256 @Override 257 public void run() { 258 try { 259 compactAndWait(request); 260 } catch (NotServingRegionException e) { 261 // this region has split or merged 262 LOG.warn("Region is invalid, requesting updated regions", e); 263 // lets updated the cluster compaction queues with these newly created regions. 264 addNewRegions(); 265 } catch (Exception e) { 266 LOG.warn("Error compacting:", e); 267 } finally { 268 clusterCompactionQueues.releaseCompaction(serverName); 269 } 270 } 271 272 void compactAndWait(MajorCompactionRequest request) throws Exception { 273 Admin admin = connection.getAdmin(); 274 try { 275 // only make the request if the region is not already major compacting 276 if (!isCompacting(request)) { 277 Set<String> stores = getStoresRequiringCompaction(request); 278 if (!stores.isEmpty()) { 279 request.setStores(stores); 280 for (String store : request.getStores()) { 281 compactRegionOnServer(request, admin, store); 282 } 283 } 284 } 285 286 /* 287 * In some scenarios like compacting TTLed regions, the compaction itself won't take time 288 * and hence we can skip the wait. An external tool will also be triggered frequently and 289 * the next run can identify region movements and compact them. 290 */ 291 if (!skipWait) { 292 while (isCompacting(request)) { 293 Thread.sleep(sleepForMs); 294 LOG.debug("Waiting for compaction to complete for region: " 295 + request.getRegion().getEncodedName()); 296 } 297 } 298 } finally { 299 if (!skipWait) { 300 // Make sure to wait for the CompactedFileDischarger chore to do its work 301 int waitForArchive = connection.getConfiguration() 302 .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); 303 Thread.sleep(waitForArchive); 304 // check if compaction completed successfully, otherwise put that request back in the 305 // proper queue 306 Set<String> storesRequiringCompaction = getStoresRequiringCompaction(request); 307 if (!storesRequiringCompaction.isEmpty()) { 308 // this happens, when a region server is marked as dead, flushes a store file and 309 // the new regionserver doesn't pick it up because its accounted for in the WAL replay, 310 // thus you have more store files on the filesystem than the regionserver knows about. 311 boolean regionHasNotMoved = connection.getRegionLocator(tableName) 312 .getRegionLocation(request.getRegion().getStartKey()).getServerName() 313 .equals(serverName); 314 if (regionHasNotMoved) { 315 LOG.error( 316 "Not all store files were compacted, this may be due to the regionserver not " 317 + "being aware of all store files. Will not reattempt compacting, " + request); 318 ERRORS.add(request); 319 } else { 320 request.setStores(storesRequiringCompaction); 321 clusterCompactionQueues.addToCompactionQueue(serverName, request); 322 LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction 323 + " region: " + request.getRegion().getEncodedName()); 324 } 325 } else { 326 LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName() 327 + " -> cf(s): " + request.getStores()); 328 } 329 } 330 } 331 } 332 333 private void compactRegionOnServer(MajorCompactionRequest request, Admin admin, String store) 334 throws IOException { 335 admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(), Bytes.toBytes(store)); 336 } 337 } 338 339 private boolean isCompacting(MajorCompactionRequest request) throws Exception { 340 CompactionState compactionState = connection.getAdmin() 341 .getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes()); 342 return compactionState.equals(CompactionState.MAJOR) 343 || compactionState.equals(CompactionState.MAJOR_AND_MINOR); 344 } 345 346 private void addNewRegions() { 347 try { 348 List<HRegionLocation> locations = 349 connection.getRegionLocator(tableName).getAllRegionLocations(); 350 for (HRegionLocation location : locations) { 351 if (location.getRegion().getRegionId() > timestamp) { 352 Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest 353 .newRequest(connection, location.getRegion(), storesToCompact, timestamp); 354 compactionRequest.ifPresent(request -> clusterCompactionQueues 355 .addToCompactionQueue(location.getServerName(), request)); 356 } 357 } 358 } catch (IOException e) { 359 throw new RuntimeException(e); 360 } 361 } 362 363 protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request) 364 throws IOException { 365 return request.getStoresRequiringCompaction(storesToCompact, timestamp); 366 } 367 368 protected Options getCommonOptions() { 369 Options options = new Options(); 370 371 options.addOption( 372 Option.builder("servers").required().desc("Concurrent servers compacting").hasArg().build()); 373 options.addOption(Option.builder("minModTime") 374 .desc("Compact if store files have modification time < minModTime").hasArg().build()); 375 options.addOption(Option.builder("zk").optionalArg(true).desc("zk quorum").hasArg().build()); 376 options.addOption( 377 Option.builder("rootDir").optionalArg(true).desc("hbase.rootDir").hasArg().build()); 378 options.addOption(Option.builder("sleep") 379 .desc("Time to sleepForMs (ms) for checking compaction status per region and available " 380 + "work queues: default 30s") 381 .hasArg().build()); 382 options.addOption(Option.builder("retries") 383 .desc("Max # of retries for a compaction request," + " defaults to 3").hasArg().build()); 384 options.addOption(Option.builder("dryRun") 385 .desc("Dry run, will just output a list of regions that require compaction based on " 386 + "parameters passed") 387 .hasArg(false).build()); 388 389 options.addOption(Option.builder("skipWait").desc("Skip waiting after triggering compaction.") 390 .hasArg(false).build()); 391 392 options.addOption(Option.builder("numservers").optionalArg(true) 393 .desc("Number of servers to compact in this run, defaults to all").hasArg().build()); 394 395 options.addOption(Option.builder("numregions").optionalArg(true) 396 .desc("Number of regions to compact per server, defaults to all").hasArg().build()); 397 return options; 398 } 399 400 @Override 401 public int run(String[] args) throws Exception { 402 Options options = getCommonOptions(); 403 options.addOption(Option.builder("table").required().desc("table name").hasArg().build()); 404 options.addOption(Option.builder("cf").optionalArg(true) 405 .desc("column families: comma separated eg: a,b,c").hasArg().build()); 406 407 final CommandLineParser cmdLineParser = new DefaultParser(); 408 CommandLine commandLine = null; 409 try { 410 commandLine = cmdLineParser.parse(options, args); 411 } catch (ParseException parseException) { 412 System.out.println("ERROR: Unable to parse command-line arguments " + Arrays.toString(args) 413 + " due to: " + parseException); 414 printUsage(options); 415 return -1; 416 } 417 if (commandLine == null) { 418 System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args)); 419 printUsage(options); 420 return -1; 421 } 422 String tableName = commandLine.getOptionValue("table"); 423 String cf = commandLine.getOptionValue("cf"); 424 Set<String> families = Sets.newHashSet(); 425 if (cf != null) { 426 Iterables.addAll(families, Splitter.on(",").split(cf)); 427 } 428 429 Configuration configuration = getConf(); 430 int concurrency = Integer.parseInt(commandLine.getOptionValue("servers")); 431 long minModTime = Long.parseLong(commandLine.getOptionValue("minModTime", 432 String.valueOf(EnvironmentEdgeManager.currentTime()))); 433 String quorum = 434 commandLine.getOptionValue("zk", configuration.get(HConstants.ZOOKEEPER_QUORUM)); 435 String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR)); 436 long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000))); 437 438 int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1")); 439 int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1")); 440 441 configuration.set(HConstants.HBASE_DIR, rootDir); 442 configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum); 443 444 MajorCompactor compactor = new MajorCompactor(configuration, TableName.valueOf(tableName), 445 families, concurrency, minModTime, sleep); 446 compactor.setNumServers(numServers); 447 compactor.setNumRegions(numRegions); 448 compactor.setSkipWait(commandLine.hasOption("skipWait")); 449 450 compactor.initializeWorkQueues(); 451 if (!commandLine.hasOption("dryRun")) { 452 compactor.compactAllRegions(); 453 } 454 compactor.shutdown(); 455 return ERRORS.size(); 456 } 457 458 protected static void printUsage(final Options options) { 459 String header = "\nUsage instructions\n\n"; 460 String footer = "\n"; 461 HelpFormatter formatter = new HelpFormatter(); 462 formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true); 463 } 464 465 public static void main(String[] args) throws Exception { 466 ToolRunner.run(HBaseConfiguration.create(), new MajorCompactor(), args); 467 } 468}