/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util.compaction;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
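
/**
 * A command-line tool that major compacts every region of a table, limiting how many
 * regionservers compact concurrently. Requests are queued per server, submitted asynchronously,
 * and re-queued if a region moves before or while it is being compacted. An illustrative
 * invocation (the table, families, and server count are example values only):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.util.compaction.MajorCompactor \
 *   -table usertable -cf family1,family2 -servers 3 -dryRun
 * </pre>
 */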
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class MajorCompactor {

  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
  private static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();

  private final ClusterCompactionQueues clusterCompactionQueues;
  private final long timestamp;
  private final Set<String> storesToCompact;
  private final ExecutorService executor;
  private final long sleepForMs;
  private final Connection connection;
  private final TableName tableName;

  public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
      int concurrency, long timestamp, long sleepForMs) throws IOException {
    this.connection = ConnectionFactory.createConnection(conf);
    this.tableName = tableName;
    this.timestamp = timestamp;
    this.storesToCompact = storesToCompact;
    this.executor = Executors.newFixedThreadPool(concurrency);
    this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
    this.sleepForMs = sleepForMs;
  }
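
  /**
   * Drains the per-server compaction queues. Each iteration reserves the largest queue belonging
   * to a server that is not already compacting, verifies the region still lives on that server
   * (re-queuing it under its current server otherwise), and hands the request to the executor.
   * The loop sleeps whenever all server slots are busy or nothing could be assigned, and returns
   * once the queues are drained and every submitted future has completed.
   */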
  public void compactAllRegions() throws Exception {
    List<Future<?>> futures = Lists.newArrayList();
    while (clusterCompactionQueues.hasWorkItems() || !futuresComplete(futures)) {
      while (clusterCompactionQueues.atCapacity()) {
        LOG.debug("Waiting for servers to complete compactions");
        Thread.sleep(sleepForMs);
      }
      Optional<ServerName> serverToProcess =
          clusterCompactionQueues.getLargestQueueFromServersNotCompacting();
      if (serverToProcess.isPresent() && clusterCompactionQueues.hasWorkItems()) {
        ServerName serverName = serverToProcess.get();
        // Check whether the region has moved; if so, it has to be enqueued again under the
        // proper serverName.
        MajorCompactionRequest request = clusterCompactionQueues.reserveForCompaction(serverName);

        ServerName currentServer = connection.getRegionLocator(tableName)
            .getRegionLocation(request.getRegion().getStartKey()).getServerName();

        if (!currentServer.equals(serverName)) {
          // Add it back to the queue with the correct server; it will be picked up later.
          LOG.info("Server changed for region: " + request.getRegion().getEncodedName() + " from: "
              + serverName + " to: " + currentServer + " re-queuing request");
          clusterCompactionQueues.addToCompactionQueue(currentServer, request);
          clusterCompactionQueues.releaseCompaction(serverName);
        } else {
          LOG.info("Firing off compaction request for server: " + serverName + ", " + request
              + " total queue size left: " + clusterCompactionQueues
              .getCompactionRequestsLeftToFinish());
          futures.add(executor.submit(new Compact(serverName, request)));
        }
      } else {
        // Nothing was assigned, so sleep before checking again.
        Thread.sleep(sleepForMs);
      }
    }
    LOG.info("All compactions have completed");
  }

  private boolean futuresComplete(List<Future<?>> futures) {
    futures.removeIf(Future::isDone);
    return futures.isEmpty();
  }

  public void shutdown() throws Exception {
    executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    if (!ERRORS.isEmpty()) {
      StringBuilder builder =
          new StringBuilder().append("Major compaction failed, there were: ").append(ERRORS.size())
              .append(" regions / stores that failed compacting\n")
              .append("Failed compaction requests\n").append("--------------------------\n")
              .append(Joiner.on("\n").join(ERRORS));
      LOG.error(builder.toString());
    } else {
      LOG.info("All regions major compacted successfully");
    }
    if (connection != null) {
      connection.close();
    }
  }

  @VisibleForTesting void initializeWorkQueues() throws IOException {
    if (storesToCompact.isEmpty()) {
      connection.getTable(tableName).getDescriptor().getColumnFamilyNames()
          .forEach(a -> storesToCompact.add(Bytes.toString(a)));
      LOG.info("No family specified, will execute for all families");
    }
    LOG.info(
        "Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);
    List<HRegionLocation> regionLocations =
        connection.getRegionLocator(tableName).getAllRegionLocations();
    for (HRegionLocation location : regionLocations) {
      Optional<MajorCompactionRequest> request = MajorCompactionRequest
          .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
              timestamp);
      request.ifPresent(majorCompactionRequest -> clusterCompactionQueues
          .addToCompactionQueue(location.getServerName(), majorCompactionRequest));
    }
  }
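
  /**
   * A unit of work: fires a major compaction request for each store of a single region that
   * still needs one, then polls until the region reports it is no longer major compacting. A
   * region that split or merged mid-flight surfaces as a NotServingRegionException, in which
   * case the newly created regions are enqueued instead. The server's compaction slot is always
   * released on exit.
   */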
  class Compact implements Runnable {

    private final ServerName serverName;
    private final MajorCompactionRequest request;

    Compact(ServerName serverName, MajorCompactionRequest request) {
      this.serverName = serverName;
      this.request = request;
    }

    @Override public void run() {
      try {
        compactAndWait(request);
      } catch (NotServingRegionException e) {
        // This region has split or merged.
        LOG.warn("Region is invalid, requesting updated regions", e);
        // Update the cluster compaction queues with the newly created regions.
        addNewRegions();
      } catch (Exception e) {
        LOG.warn("Error compacting:", e);
      } finally {
        clusterCompactionQueues.releaseCompaction(serverName);
      }
    }

    void compactAndWait(MajorCompactionRequest request) throws Exception {
      try (Admin admin = connection.getAdmin()) {
        // Only make the request if the region is not already major compacting.
        if (!isCompacting(request)) {
          Set<String> stores = request.getStoresRequiringCompaction(storesToCompact);
          if (!stores.isEmpty()) {
            request.setStores(stores);
            for (String store : request.getStores()) {
              admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
                  Bytes.toBytes(store));
            }
          }
        }
        while (isCompacting(request)) {
          Thread.sleep(sleepForMs);
          LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
              .getEncodedName());
        }
      } finally {
        // Make sure to wait for the CompactedFileDischarger chore to do its work.
        int waitForArchive = connection.getConfiguration()
            .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
        Thread.sleep(waitForArchive);
        // Check whether the compaction completed successfully; otherwise put the request back
        // in the proper queue.
        Set<String> storesRequiringCompaction =
            request.getStoresRequiringCompaction(storesToCompact);
        if (!storesRequiringCompaction.isEmpty()) {
          // This happens when a regionserver is marked dead after flushing a store file, and
          // the new regionserver doesn't pick the file up because it is accounted for in the
          // WAL replay; thus there are more store files on the filesystem than the regionserver
          // knows about.
          boolean regionHasNotMoved = connection.getRegionLocator(tableName)
              .getRegionLocation(request.getRegion().getStartKey()).getServerName()
              .equals(serverName);
          if (regionHasNotMoved) {
            LOG.error("Not all store files were compacted, this may be due to the regionserver "
                + "not being aware of all store files. Will not reattempt compacting, " + request);
            ERRORS.add(request);
          } else {
            request.setStores(storesRequiringCompaction);
            clusterCompactionQueues.addToCompactionQueue(serverName, request);
            LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
                + " region: " + request.getRegion().getEncodedName());
          }
        } else {
          LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
              + " -> cf(s): " + request.getStores());
        }
      }
    }
  }

  private boolean isCompacting(MajorCompactionRequest request) throws Exception {
    try (Admin admin = connection.getAdmin()) {
      CompactionState compactionState =
          admin.getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes());
      return compactionState.equals(CompactionState.MAJOR)
          || compactionState.equals(CompactionState.MAJOR_AND_MINOR);
    }
  }

  private void addNewRegions() {
    try {
      List<HRegionLocation> locations =
          connection.getRegionLocator(tableName).getAllRegionLocations();
      for (HRegionLocation location : locations) {
        // A regionId (creation time) newer than the given timestamp indicates a region created
        // after the queues were initialized, e.g. by a split or merge.
        if (location.getRegion().getRegionId() > timestamp) {
          Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
              .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
                  timestamp);
          compactionRequest.ifPresent(request -> clusterCompactionQueues
              .addToCompactionQueue(location.getServerName(), request));
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
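
  /**
   * Command-line entry point. The -table and -servers options are required; the rest are
   * optional and default as described in each option's help text. With -dryRun the work queues
   * are initialized and logged, but no compaction requests are issued.
   */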
  public static void main(String[] args) throws Exception {
    Options options = new Options();
    options.addOption(
        Option.builder("table")
            .required()
            .desc("table name")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("cf")
            .optionalArg(true)
            .desc("column families: comma separated eg: a,b,c")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("servers")
            .required()
            .desc("Concurrent servers compacting")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("minModTime")
            .desc("Compact if store files have modification time < minModTime")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("zk")
            .optionalArg(true)
            .desc("zk quorum")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("rootDir")
            .optionalArg(true)
            .desc("hbase.rootDir")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("sleep")
            .desc("Time to sleep (in ms) between checks of compaction status per region and "
                + "available work queues: default 30s")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("retries")
            .desc("Max # of retries for a compaction request, defaults to 3")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("dryRun")
            .desc("Dry run, will just output a list of regions that require compaction based on "
                + "parameters passed")
            .hasArg(false)
            .build()
    );

    final CommandLineParser cmdLineParser = new DefaultParser();
    CommandLine commandLine = null;
    try {
      commandLine = cmdLineParser.parse(options, args);
    } catch (ParseException parseException) {
      System.out.println(
          "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
              + parseException);
      printUsage(options);
      return;
    }
    if (commandLine == null) {
      System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
      printUsage(options);
      return;
    }
    String tableName = commandLine.getOptionValue("table");
    String cf = commandLine.getOptionValue("cf", null);
    Set<String> families = Sets.newHashSet();
    if (cf != null) {
      Iterables.addAll(families, Splitter.on(",").split(cf));
    }
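
    // minModTime defaults to the current time, so by default any store file written before the
    // tool was launched qualifies; -zk and -rootDir fall back to the values already present in
    // the local HBase configuration when not supplied on the command line.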
291 desc("Compact if store files have modification time < minModTime") 292 .hasArg() 293 .build() 294 ); 295 options.addOption( 296 Option.builder("zk") 297 .optionalArg(true) 298 .desc("zk quorum") 299 .hasArg() 300 .build() 301 ); 302 options.addOption( 303 Option.builder("rootDir") 304 .optionalArg(true) 305 .desc("hbase.rootDir") 306 .hasArg() 307 .build() 308 ); 309 options.addOption( 310 Option.builder("sleep") 311 .desc("Time to sleepForMs (ms) for checking compaction status per region and available " 312 + "work queues: default 30s") 313 .hasArg() 314 .build() 315 ); 316 options.addOption( 317 Option.builder("retries") 318 .desc("Max # of retries for a compaction request," + " defaults to 3") 319 .hasArg() 320 .build() 321 ); 322 options.addOption( 323 Option.builder("dryRun") 324 .desc("Dry run, will just output a list of regions that require compaction based on " 325 + "parameters passed") 326 .hasArg(false) 327 .build() 328 ); 329 330 final CommandLineParser cmdLineParser = new DefaultParser(); 331 CommandLine commandLine = null; 332 try { 333 commandLine = cmdLineParser.parse(options, args); 334 } catch (ParseException parseException) { 335 System.out.println( 336 "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: " 337 + parseException); 338 printUsage(options); 339 return; 340 } 341 if (commandLine == null) { 342 System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args)); 343 printUsage(options); 344 return; 345 } 346 String tableName = commandLine.getOptionValue("table"); 347 String cf = commandLine.getOptionValue("cf", null); 348 Set<String> families = Sets.newHashSet(); 349 if (cf != null) { 350 Iterables.addAll(families, Splitter.on(",").split(cf)); 351 } 352 353 354 Configuration configuration = HBaseConfiguration.create(); 355 int concurrency = Integer.parseInt(commandLine.getOptionValue("servers")); 356 long minModTime = Long.parseLong( 357 commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis()))); 358 String quorum = 359 commandLine.getOptionValue("zk", configuration.get(HConstants.ZOOKEEPER_QUORUM)); 360 String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR)); 361 long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000))); 362 363 configuration.set(HConstants.HBASE_DIR, rootDir); 364 configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum); 365 366 MajorCompactor compactor = 367 new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency, 368 minModTime, sleep); 369 370 compactor.initializeWorkQueues(); 371 if (!commandLine.hasOption("dryRun")) { 372 compactor.compactAllRegions(); 373 } 374 compactor.shutdown(); 375 } 376 377 private static void printUsage(final Options options) { 378 String header = "\nUsage instructions\n\n"; 379 String footer = "\n"; 380 HelpFormatter formatter = new HelpFormatter(); 381 formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true); 382 } 383 384}