/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.compaction;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

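/**
 * Tool that runs a major compaction over every region of a table, driving at most one
 * compaction per region server at a time and re-queuing regions that move, split, or merge
 * while the tool is running. An illustrative command-line invocation (table name, families,
 * and timestamp below are example values only):
 *
 * <pre>
 *   hbase org.apache.hadoop.hbase.util.compaction.MajorCompactor \
 *       -table mytable -cf family1,family2 -servers 5 -minModTime 1539114402000
 * </pre>
 */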
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class MajorCompactor {

  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
  private static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();

  private final ClusterCompactionQueues clusterCompactionQueues;
  private final long timestamp;
  private final Set<String> storesToCompact;
  private final ExecutorService executor;
  private final long sleepForMs;
  private final Connection connection;
  private final TableName tableName;

  public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
      int concurrency, long timestamp, long sleepForMs) throws IOException {
    this.connection = ConnectionFactory.createConnection(conf);
    this.tableName = tableName;
    this.timestamp = timestamp;
    this.storesToCompact = storesToCompact;
    this.executor = Executors.newFixedThreadPool(concurrency);
    this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
    this.sleepForMs = sleepForMs;
  }

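  /**
   * Drains the per-server queues: repeatedly reserves the busiest idle server, re-queues the
   * request if its region has since moved, and otherwise submits the compaction to the
   * executor, until every queued request and in-flight future has completed.
   */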
  public void compactAllRegions() throws Exception {
    List<Future<?>> futures = Lists.newArrayList();
    while (clusterCompactionQueues.hasWorkItems() || !futuresComplete(futures)) {
      while (clusterCompactionQueues.atCapacity()) {
        LOG.debug("Waiting for servers to complete compactions");
        Thread.sleep(sleepForMs);
      }
      Optional<ServerName> serverToProcess =
          clusterCompactionQueues.getLargestQueueFromServersNotCompacting();
      if (serverToProcess.isPresent() && clusterCompactionQueues.hasWorkItems()) {
        ServerName serverName = serverToProcess.get();
        // Check to see if the region has moved; if so, we have to enqueue it again with
        // the proper server name.
        MajorCompactionRequest request = clusterCompactionQueues.reserveForCompaction(serverName);

        ServerName currentServer = connection.getRegionLocator(tableName)
            .getRegionLocation(request.getRegion().getStartKey()).getServerName();

        if (!currentServer.equals(serverName)) {
          // Add it back to the queue under the correct server; it will be picked up later.
          LOG.info("Server changed for region: " + request.getRegion().getEncodedName() + " from: "
              + serverName + " to: " + currentServer + " re-queuing request");
          clusterCompactionQueues.addToCompactionQueue(currentServer, request);
          clusterCompactionQueues.releaseCompaction(serverName);
        } else {
          LOG.info("Firing off compaction request for server: " + serverName + ", " + request
              + " total queue size left: " + clusterCompactionQueues
              .getCompactionRequestsLeftToFinish());
          futures.add(executor.submit(new Compact(serverName, request)));
        }
      } else {
        // Nothing was assigned, so sleep before checking again.
        Thread.sleep(sleepForMs);
      }
    }
    LOG.info("All compactions have completed");
  }

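  /** Prunes completed futures from the list; returns true once no submitted work remains. */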
  private boolean futuresComplete(List<Future<?>> futures) {
    futures.removeIf(Future::isDone);
    return futures.isEmpty();
  }

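  /**
   * Waits for all submitted compactions to finish, reports any requests that failed, and
   * closes the connection.
   */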
  public void shutdown() throws Exception {
    executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    if (!ERRORS.isEmpty()) {
      StringBuilder builder =
          new StringBuilder().append("Major compaction failed, there were: ").append(ERRORS.size())
              .append(" regions / stores that failed compacting\n")
              .append("Failed compaction requests\n").append("--------------------------\n")
              .append(Joiner.on("\n").join(ERRORS));
      LOG.error(builder.toString());
    } else {
      LOG.info("All regions major compacted successfully");
    }
    if (connection != null) {
      connection.close();
    }
  }

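  /**
   * Builds one compaction request per region and queues it under the server currently hosting
   * that region. If no column families were specified, all families of the table are compacted.
   */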
  @VisibleForTesting
  void initializeWorkQueues() throws IOException {
    if (storesToCompact.isEmpty()) {
      connection.getTable(tableName).getDescriptor().getColumnFamilyNames()
          .forEach(a -> storesToCompact.add(Bytes.toString(a)));
      LOG.info("No family specified, will execute for all families");
    }
    LOG.info(
        "Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);
    List<HRegionLocation> regionLocations =
        connection.getRegionLocator(tableName).getAllRegionLocations();
    for (HRegionLocation location : regionLocations) {
      Optional<MajorCompactionRequest> request = MajorCompactionRequest
          .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
              timestamp);
      request.ifPresent(majorCompactionRequest -> clusterCompactionQueues
          .addToCompactionQueue(location.getServerName(), majorCompactionRequest));
    }
  }

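  /**
   * Runnable that issues the major compaction for a single region on its current server and
   * blocks until the compaction (and store file archival) has finished.
   */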
  class Compact implements Runnable {

    private final ServerName serverName;
    private final MajorCompactionRequest request;

    Compact(ServerName serverName, MajorCompactionRequest request) {
      this.serverName = serverName;
      this.request = request;
    }

    @Override
    public void run() {
      try {
        compactAndWait(request);
      } catch (NotServingRegionException e) {
        // This region has split or merged.
        LOG.warn("Region is invalid, requesting updated regions", e);
        // Update the cluster compaction queues with the newly created regions.
        addNewRegions();
      } catch (Exception e) {
        LOG.warn("Error compacting:", e);
      } finally {
        clusterCompactionQueues.releaseCompaction(serverName);
      }
    }

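    /**
     * Requests a major compaction for each store that still needs one, polls until the region
     * is no longer compacting, waits for the compacted files to be archived, and then either
     * records a failure, re-queues the request (if the region moved), or declares success.
     */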
    void compactAndWait(MajorCompactionRequest request) throws Exception {
      try (Admin admin = connection.getAdmin()) {
        // Only make the request if the region is not already major compacting.
        if (!isCompacting(request)) {
          Set<String> stores = request.getStoresRequiringCompaction(storesToCompact);
          if (!stores.isEmpty()) {
            request.setStores(stores);
            for (String store : request.getStores()) {
              admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
                  Bytes.toBytes(store));
            }
          }
        }
        while (isCompacting(request)) {
          Thread.sleep(sleepForMs);
          LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
              .getEncodedName());
        }
      } finally {
        // Make sure to wait for the CompactedFileDischarger chore to do its work.
        int waitForArchive = connection.getConfiguration()
            .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
        Thread.sleep(waitForArchive);
        // Check whether the compaction completed successfully; otherwise put the request back
        // in the proper queue.
        Set<String> storesRequiringCompaction =
            request.getStoresRequiringCompaction(storesToCompact);
        if (!storesRequiringCompaction.isEmpty()) {
          // This can happen when a region server is marked as dead after flushing a store file:
          // the new region server does not pick the file up because it is already accounted for
          // in the WAL replay, so there are more store files on the filesystem than the region
          // server knows about.
          boolean regionHasNotMoved = connection.getRegionLocator(tableName)
              .getRegionLocation(request.getRegion().getStartKey()).getServerName()
              .equals(serverName);
          if (regionHasNotMoved) {
            LOG.error("Not all store files were compacted, this may be due to the regionserver "
                + "not being aware of all store files. Will not reattempt compacting, " + request);
            ERRORS.add(request);
          } else {
            request.setStores(storesRequiringCompaction);
            clusterCompactionQueues.addToCompactionQueue(serverName, request);
            LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
                + " region: " + request.getRegion().getEncodedName());
          }
        } else {
          LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
              + " -> cf(s): " + request.getStores());
        }
      }
    }
  }

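  /** Returns true if the region is currently in a major (or major and minor) compaction. */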
  private boolean isCompacting(MajorCompactionRequest request) throws Exception {
    try (Admin admin = connection.getAdmin()) {
      CompactionState compactionState =
          admin.getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes());
      return compactionState.equals(CompactionState.MAJOR)
          || compactionState.equals(CompactionState.MAJOR_AND_MINOR);
    }
  }

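  /**
   * Re-fetches the table's region locations and queues any region created after the tool
   * started (a region id is its creation timestamp), i.e. the daughters of a split or the
   * result of a merge.
   */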
  private void addNewRegions() {
    try {
      List<HRegionLocation> locations =
          connection.getRegionLocator(tableName).getAllRegionLocations();
      for (HRegionLocation location : locations) {
        if (location.getRegion().getRegionId() > timestamp) {
          Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
              .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
                  timestamp);
          compactionRequest.ifPresent(request -> clusterCompactionQueues
              .addToCompactionQueue(location.getServerName(), request));
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

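  /**
   * Command-line entry point: parses the options shown by printUsage, initializes the work
   * queues, and compacts all regions unless -dryRun is specified.
   */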
  public static void main(String[] args) throws Exception {
    Options options = new Options();
    options.addOption(
        Option.builder("table")
            .required()
            .desc("table name")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("cf")
            .optionalArg(true)
            .desc("column families: comma separated eg: a,b,c")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("servers")
            .required()
            .desc("Concurrent servers compacting")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("minModTime")
            .desc("Compact if store files have modification time < minModTime")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("zk")
            .optionalArg(true)
            .desc("zk quorum")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("rootDir")
            .optionalArg(true)
            .desc("hbase.rootDir")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("sleep")
            .desc("Time to sleep (ms) between checks of compaction status per region and "
                + "available work queues: default 30s")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("retries")
            .desc("Max # of retries for a compaction request, defaults to 3")
            .hasArg()
            .build()
    );
    options.addOption(
        Option.builder("dryRun")
            .desc("Dry run, will just output a list of regions that require compaction based on "
                + "parameters passed")
            .hasArg(false)
            .build()
    );

    final CommandLineParser cmdLineParser = new DefaultParser();
    CommandLine commandLine = null;
    try {
      commandLine = cmdLineParser.parse(options, args);
    } catch (ParseException parseException) {
      System.out.println(
          "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
              + parseException);
      printUsage(options);
      return;
    }
    if (commandLine == null) {
      System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
      printUsage(options);
      return;
    }
    String tableName = commandLine.getOptionValue("table");
    String cf = commandLine.getOptionValue("cf", null);
    Set<String> families = Sets.newHashSet();
    if (cf != null) {
      Iterables.addAll(families, Splitter.on(",").split(cf));
    }

    Configuration configuration = HBaseConfiguration.create();
    int concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
    long minModTime = Long.parseLong(
        commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis())));
    String quorum =
        commandLine.getOptionValue("zk", configuration.get(HConstants.ZOOKEEPER_QUORUM));
    String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
    long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));

    configuration.set(HConstants.HBASE_DIR, rootDir);
    configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);

    MajorCompactor compactor =
        new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency,
            minModTime, sleep);

    try {
      compactor.initializeWorkQueues();
      if (!commandLine.hasOption("dryRun")) {
        compactor.compactAllRegions();
      }
    } finally {
      // Always release the executor and close the connection, even if compaction failed.
      compactor.shutdown();
    }
  }

  private static void printUsage(final Options options) {
    String header = "\nUsage instructions\n\n";
    String footer = "\n";
    HelpFormatter formatter = new HelpFormatter();
    formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true);
  }

}