/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.compaction;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

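/**
 * Tool that runs rolling major compactions across an entire table. It maintains one compaction
 * queue per regionserver and keeps at most one request in flight per server, with overall
 * parallelism bounded by the -servers option; regions whose store files were all written after
 * -minModTime are skipped. A sample invocation (the table name, family names, and timestamp below
 * are illustrative):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.util.compaction.MajorCompactor \
 *   -table usertable -servers 5 -cf family1,family2 -minModTime 1600000000000
 * </pre>
 */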
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class MajorCompactor extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
  protected static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();

  protected ClusterCompactionQueues clusterCompactionQueues;
  private long timestamp;
  protected Set<String> storesToCompact;
  protected ExecutorService executor;
  protected long sleepForMs;
  protected Connection connection;
  protected TableName tableName;
  private int numServers = -1;
  private int numRegions = -1;
  private boolean skipWait = false;

  MajorCompactor() {
  }

  public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
    int concurrency, long timestamp, long sleepForMs) throws IOException {
    this.connection = ConnectionFactory.createConnection(conf);
    this.tableName = tableName;
    this.timestamp = timestamp;
    this.storesToCompact = storesToCompact;
    this.executor = Executors.newFixedThreadPool(concurrency);
    this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
    this.sleepForMs = sleepForMs;
  }

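  /**
   * Drains the cluster compaction queues: repeatedly picks the largest queue belonging to a
   * server that is not already compacting, re-queues any request whose region has moved, and
   * submits the rest to the executor. Returns once every queued request and in-flight future has
   * completed.
   */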
  public void compactAllRegions() throws Exception {
    List<Future<?>> futures = Lists.newArrayList();
    while (clusterCompactionQueues.hasWorkItems() || !futuresComplete(futures)) {
      while (clusterCompactionQueues.atCapacity()) {
        LOG.debug("Waiting for servers to complete Compactions");
        Thread.sleep(sleepForMs);
      }
      Optional<ServerName> serverToProcess =
        clusterCompactionQueues.getLargestQueueFromServersNotCompacting();
      if (serverToProcess.isPresent() && clusterCompactionQueues.hasWorkItems()) {
        ServerName serverName = serverToProcess.get();
        // Check whether the region has moved; if so, re-enqueue the request under the
        // correct serverName.
        MajorCompactionRequest request = clusterCompactionQueues.reserveForCompaction(serverName);

        ServerName currentServer = connection.getRegionLocator(tableName)
          .getRegionLocation(request.getRegion().getStartKey()).getServerName();

        if (!currentServer.equals(serverName)) {
          // Add it back to the queue under the correct server; it will be picked up in a
          // later iteration.
          LOG.info("Server changed for region: " + request.getRegion().getEncodedName() + " from: "
            + serverName + " to: " + currentServer + " re-queuing request");
          clusterCompactionQueues.addToCompactionQueue(currentServer, request);
          clusterCompactionQueues.releaseCompaction(serverName);
        } else {
          LOG.info("Firing off compaction request for server: " + serverName + ", " + request
            + " total queue size left: "
            + clusterCompactionQueues.getCompactionRequestsLeftToFinish());
          futures.add(executor.submit(new Compact(serverName, request)));
        }
      } else {
        // Nothing was assigned, so sleep before checking again.
        Thread.sleep(sleepForMs);
      }
    }
    LOG.info("All compactions have completed");
  }

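  /** Prunes completed futures from the list and reports whether all of them have finished. */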
  private boolean futuresComplete(List<Future<?>> futures) {
    futures.removeIf(Future::isDone);
    return futures.isEmpty();
  }

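  /**
   * Waits for all in-flight compactions to finish, logs the outcome (including any failed
   * requests collected in {@link #ERRORS}), and closes the connection.
   */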
  public void shutdown() throws Exception {
    executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    if (!ERRORS.isEmpty()) {
      StringBuilder builder = new StringBuilder().append("Major compaction failed, there were: ")
        .append(ERRORS.size()).append(" regions / stores that failed compacting\n")
        .append("Failed compaction requests\n").append("--------------------------\n")
        .append(Joiner.on("\n").join(ERRORS));
      LOG.error(builder.toString());
    } else {
      // Only claim success when no request ended up in the error set.
      LOG.info("All regions major compacted successfully");
    }
    if (connection != null) {
      connection.close();
    }
  }

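  /**
   * Builds the per-server compaction queues. Defaults to all column families when none were
   * specified, then walks each server's regions (optionally capped by numServers and numRegions)
   * and enqueues a request for every region that has stores requiring compaction.
   */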
  @InterfaceAudience.Private
  void initializeWorkQueues() throws IOException {
    if (storesToCompact.isEmpty()) {
      connection.getTable(tableName).getDescriptor().getColumnFamilyNames()
        .forEach(a -> storesToCompact.add(Bytes.toString(a)));
      LOG.info("No family specified, will execute for all families");
    }
    LOG.info(
      "Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);

    Map<ServerName, List<RegionInfo>> snRegionMap = getServerRegionsMap();
    /*
     * If numservers is specified, stop inspecting servers beyond that count. This throttles the
     * run and avoids scanning every region when only a few need compacting under the given
     * criteria.
     */
    for (ServerName sn : getServersToCompact(snRegionMap.keySet())) {
      List<RegionInfo> regions = snRegionMap.get(sn);
      LOG.debug("Table: " + tableName + " Server: " + sn + " No of regions: " + regions.size());

      /*
       * If the tool is run periodically, shuffling gives regions a random selection order across
       * runs. This helps most when numregions caps how many regions are compacted per server.
       */
      Collections.shuffle(regions);
      int regionsToCompact = numRegions;
      for (RegionInfo hri : regions) {
        if (numRegions > 0 && regionsToCompact <= 0) {
          LOG.debug("Reached region limit for server: " + sn);
          break;
        }

        Optional<MajorCompactionRequest> request = getMajorCompactionRequest(hri);
        if (request.isPresent()) {
          LOG.debug("Adding region " + hri + " to queue " + sn + " for compaction");
          clusterCompactionQueues.addToCompactionQueue(sn, request.get());
          if (numRegions > 0) {
            regionsToCompact--;
          }
        }
      }
    }
  }

  protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
    throws IOException {
    return MajorCompactionRequest.newRequest(connection, hri, storesToCompact, timestamp);
  }

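  /**
   * Returns the servers to inspect: all of them when numServers is unset, otherwise a random
   * subset of numServers entries.
   */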
  private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) {
    if (numServers < 0 || snSet.size() <= numServers) {
      return snSet;
    } else {
      List<ServerName> snList = Lists.newArrayList(snSet);
      Collections.shuffle(snList);
      return snList.subList(0, numServers);
    }
  }

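  /** Groups the table's region locations by the server currently hosting each region. */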
  private Map<ServerName, List<RegionInfo>> getServerRegionsMap() throws IOException {
    Map<ServerName, List<RegionInfo>> snRegionMap = Maps.newHashMap();
    List<HRegionLocation> regionLocations =
      connection.getRegionLocator(tableName).getAllRegionLocations();
    for (HRegionLocation regionLocation : regionLocations) {
      ServerName sn = regionLocation.getServerName();
      RegionInfo hri = regionLocation.getRegion();
      snRegionMap.computeIfAbsent(sn, key -> Lists.newArrayList()).add(hri);
    }
    return snRegionMap;
  }

  public void setNumServers(int numServers) {
    this.numServers = numServers;
  }

  public void setNumRegions(int numRegions) {
    this.numRegions = numRegions;
  }

  public void setSkipWait(boolean skipWait) {
    this.skipWait = skipWait;
  }

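  /**
   * Runnable that fires a major compaction request against a single regionserver and, unless
   * skipWait is set, blocks until the compaction finishes. Regions that have split or merged in
   * the meantime are re-resolved and re-queued.
   */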
  class Compact implements Runnable {

    private final ServerName serverName;
    private final MajorCompactionRequest request;

    Compact(ServerName serverName, MajorCompactionRequest request) {
      this.serverName = serverName;
      this.request = request;
    }

    @Override
    public void run() {
      try {
        compactAndWait(request);
      } catch (NotServingRegionException e) {
        // This region has split or merged.
        LOG.warn("Region is invalid, requesting updated regions", e);
        // Update the cluster compaction queues with the newly created regions.
        addNewRegions();
      } catch (Exception e) {
        LOG.warn("Error compacting:", e);
      } finally {
        clusterCompactionQueues.releaseCompaction(serverName);
      }
    }

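    /**
     * Triggers a major compaction for each store in the request unless one is already running,
     * then optionally waits for it (and for the CompactedFileDischarger chore) to finish. A
     * request that still has uncompacted stores afterwards is either re-queued, if the region
     * moved, or recorded in {@link MajorCompactor#ERRORS}.
     */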
    void compactAndWait(MajorCompactionRequest request) throws Exception {
      // Use try-with-resources so the Admin is closed even if the request fails.
      try (Admin admin = connection.getAdmin()) {
        // Only make the request if the region is not already major compacting.
        if (!isCompacting(request)) {
          Set<String> stores = getStoresRequiringCompaction(request);
          if (!stores.isEmpty()) {
            request.setStores(stores);
            for (String store : request.getStores()) {
              compactRegionOnServer(request, admin, store);
            }
          }
        }

        /*
         * In some scenarios, such as compacting TTLed regions, the compaction itself takes little
         * time, so we can skip the wait. An external tool triggered frequently enough will pick up
         * region movements and compact them on its next run.
         */
        if (!skipWait) {
          while (isCompacting(request)) {
            Thread.sleep(sleepForMs);
            LOG.debug("Waiting for compaction to complete for region: "
              + request.getRegion().getEncodedName());
          }
        }
      } finally {
        if (!skipWait) {
          // Make sure to wait for the CompactedFileDischarger chore to do its work.
          int waitForArchive = connection.getConfiguration()
            .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
          Thread.sleep(waitForArchive);
          // Check whether the compaction completed successfully; otherwise put the request back
          // in the proper queue.
          Set<String> storesRequiringCompaction = getStoresRequiringCompaction(request);
          if (!storesRequiringCompaction.isEmpty()) {
            // This can happen when a regionserver marked as dead flushes a store file, and the
            // new regionserver does not pick it up because it is accounted for in the WAL replay:
            // the filesystem then holds more store files than the regionserver knows about.
            boolean regionHasNotMoved = connection.getRegionLocator(tableName)
              .getRegionLocation(request.getRegion().getStartKey()).getServerName()
              .equals(serverName);
            if (regionHasNotMoved) {
              LOG.error("Not all store files were compacted; this may be because the regionserver "
                + "is not aware of all store files. Will not reattempt compacting, " + request);
              ERRORS.add(request);
            } else {
              request.setStores(storesRequiringCompaction);
              clusterCompactionQueues.addToCompactionQueue(serverName, request);
              LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
                + " region: " + request.getRegion().getEncodedName());
            }
          } else {
            LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
              + " -> cf(s): " + request.getStores());
          }
        }
      }
    }

    private void compactRegionOnServer(MajorCompactionRequest request, Admin admin, String store)
      throws IOException {
      admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(), Bytes.toBytes(store));
    }
  }

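  /** Returns true if a major compaction is currently running on the request's region. */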
  private boolean isCompacting(MajorCompactionRequest request) throws Exception {
    // Close the short-lived Admin, since this method is polled in a loop.
    try (Admin admin = connection.getAdmin()) {
      CompactionState compactionState =
        admin.getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes());
      return compactionState == CompactionState.MAJOR
        || compactionState == CompactionState.MAJOR_AND_MINOR;
    }
  }

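  /**
   * Re-resolves the table's regions and enqueues a compaction request for every region created
   * after the tool's timestamp, which picks up daughters of splits or merges that happened
   * mid-run.
   */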
  private void addNewRegions() {
    try {
      List<HRegionLocation> locations =
        connection.getRegionLocator(tableName).getAllRegionLocations();
      for (HRegionLocation location : locations) {
        if (location.getRegion().getRegionId() > timestamp) {
          Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
            .newRequest(connection, location.getRegion(), storesToCompact, timestamp);
          compactionRequest.ifPresent(request -> clusterCompactionQueues
            .addToCompactionQueue(location.getServerName(), request));
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

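  /** Returns the subset of the requested stores that still need compacting for this region. */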
  protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
    throws IOException {
    return request.getStoresRequiringCompaction(storesToCompact, timestamp);
  }

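  /** Builds the command-line options common to this tool and its subclasses. */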
  protected Options getCommonOptions() {
    Options options = new Options();

    options.addOption(
      Option.builder("servers").required().desc("Concurrent servers compacting").hasArg().build());
    options.addOption(Option.builder("minModTime")
      .desc("Compact if store files have modification time < minModTime").hasArg().build());
    options.addOption(Option.builder("zk").optionalArg(true).desc("zk quorum").hasArg().build());
    options.addOption(
      Option.builder("rootDir").optionalArg(true).desc("hbase.rootDir").hasArg().build());
    options.addOption(Option.builder("sleep")
      .desc("Time to sleep (ms) between checks of compaction status per region and available "
        + "work queues: default 30s")
      .hasArg().build());
    options.addOption(Option.builder("retries")
      .desc("Max # of retries for a compaction request, defaults to 3").hasArg().build());
    options.addOption(Option.builder("dryRun")
      .desc("Dry run, will just output a list of regions that require compaction based on "
        + "parameters passed")
      .hasArg(false).build());

    options.addOption(Option.builder("skipWait").desc("Skip waiting after triggering compaction.")
      .hasArg(false).build());

    options.addOption(Option.builder("numservers").optionalArg(true)
      .desc("Number of servers to compact in this run, defaults to all").hasArg().build());

    options.addOption(Option.builder("numregions").optionalArg(true)
      .desc("Number of regions to compact per server, defaults to all").hasArg().build());
    return options;
  }

  @Override
  public int run(String[] args) throws Exception {
    Options options = getCommonOptions();
    options.addOption(Option.builder("table").required().desc("table name").hasArg().build());
    options.addOption(Option.builder("cf").optionalArg(true)
      .desc("column families: comma separated, e.g. a,b,c").hasArg().build());

    final CommandLineParser cmdLineParser = new DefaultParser();
    CommandLine commandLine = null;
    try {
      commandLine = cmdLineParser.parse(options, args);
    } catch (ParseException parseException) {
      System.out.println("ERROR: Unable to parse command-line arguments " + Arrays.toString(args)
        + " due to: " + parseException);
      printUsage(options);
      return -1;
    }
    if (commandLine == null) {
      System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
      printUsage(options);
      return -1;
    }
    String tableName = commandLine.getOptionValue("table");
    String cf = commandLine.getOptionValue("cf");
    Set<String> families = Sets.newHashSet();
    if (cf != null) {
      Iterables.addAll(families, Splitter.on(",").split(cf));
    }

    Configuration configuration = getConf();
    int concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
    long minModTime = Long.parseLong(commandLine.getOptionValue("minModTime",
      String.valueOf(EnvironmentEdgeManager.currentTime())));
    String quorum =
      commandLine.getOptionValue("zk", configuration.get(HConstants.ZOOKEEPER_QUORUM));
    String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
    long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));

    int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
    int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));

    configuration.set(HConstants.HBASE_DIR, rootDir);
    configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);

    MajorCompactor compactor = new MajorCompactor(configuration, TableName.valueOf(tableName),
      families, concurrency, minModTime, sleep);
    compactor.setNumServers(numServers);
    compactor.setNumRegions(numRegions);
    compactor.setSkipWait(commandLine.hasOption("skipWait"));

    compactor.initializeWorkQueues();
    if (!commandLine.hasOption("dryRun")) {
      compactor.compactAllRegions();
    }
    compactor.shutdown();
    return ERRORS.size();
  }

  protected static void printUsage(final Options options) {
    String header = "\nUsage instructions\n\n";
    String footer = "\n";
    HelpFormatter formatter = new HelpFormatter();
    formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true);
  }

  public static void main(String[] args) throws Exception {
    // Propagate the tool's exit code (the number of failed compactions) to the caller.
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new MajorCompactor(), args));
  }
}