001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * http://www.apache.org/licenses/LICENSE-2.0
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.apache.hadoop.hbase.util.compaction;
018
019import java.io.IOException;
020import java.util.Arrays;
021import java.util.Collection;
022import java.util.Collections;
023import java.util.List;
024import java.util.Map;
025import java.util.Optional;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.Future;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.conf.Configured;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HBaseInterfaceAudience;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.HRegionLocation;
039import org.apache.hadoop.hbase.NotServingRegionException;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.client.CompactionState;
044import org.apache.hadoop.hbase.client.Connection;
045import org.apache.hadoop.hbase.client.ConnectionFactory;
046import org.apache.hadoop.hbase.client.RegionInfo;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.util.Tool;
049import org.apache.hadoop.util.ToolRunner;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
054import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
055import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
056import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
057import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
058import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
059import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
060import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
061import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
062import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
063import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
064import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
065import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
066import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
067
068@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
069public class MajorCompactor extends Configured implements Tool {
070
071  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
072  protected static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();
073
074  protected ClusterCompactionQueues clusterCompactionQueues;
075  private long timestamp;
076  protected Set<String> storesToCompact;
077  protected ExecutorService executor;
078  protected long sleepForMs;
079  protected Connection connection;
080  protected TableName tableName;
081  private int numServers = -1;
082  private int numRegions = -1;
083  private boolean skipWait = false;
084
085  MajorCompactor() {
086  }
087
088  public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
089      int concurrency, long timestamp, long sleepForMs) throws IOException {
090    this.connection = ConnectionFactory.createConnection(conf);
091    this.tableName = tableName;
092    this.timestamp = timestamp;
093    this.storesToCompact = storesToCompact;
094    this.executor = Executors.newFixedThreadPool(concurrency);
095    this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
096    this.sleepForMs = sleepForMs;
097  }
098
099  public void compactAllRegions() throws Exception {
100    List<Future<?>> futures = Lists.newArrayList();
101    while (clusterCompactionQueues.hasWorkItems() || !futuresComplete(futures)) {
102      while (clusterCompactionQueues.atCapacity()) {
103        LOG.debug("Waiting for servers to complete Compactions");
104        Thread.sleep(sleepForMs);
105      }
106      Optional<ServerName> serverToProcess =
107          clusterCompactionQueues.getLargestQueueFromServersNotCompacting();
108      if (serverToProcess.isPresent() && clusterCompactionQueues.hasWorkItems()) {
109        ServerName serverName = serverToProcess.get();
110        // check to see if the region has moved... if so we have to enqueue it again with
111        // the proper serverName
112        MajorCompactionRequest request = clusterCompactionQueues.reserveForCompaction(serverName);
113
114        ServerName currentServer = connection.getRegionLocator(tableName)
115            .getRegionLocation(request.getRegion().getStartKey()).getServerName();
116
117        if (!currentServer.equals(serverName)) {
118          // add it back to the queue with the correct server it should be picked up in the future.
119          LOG.info("Server changed for region: " + request.getRegion().getEncodedName() + " from: "
120              + serverName + " to: " + currentServer + " re-queuing request");
121          clusterCompactionQueues.addToCompactionQueue(currentServer, request);
122          clusterCompactionQueues.releaseCompaction(serverName);
123        } else {
124          LOG.info("Firing off compaction request for server: " + serverName + ", " + request
125              + " total queue size left: " + clusterCompactionQueues
126              .getCompactionRequestsLeftToFinish());
127          futures.add(executor.submit(new Compact(serverName, request)));
128        }
129      } else {
130        // haven't assigned anything so we sleep.
131        Thread.sleep(sleepForMs);
132      }
133    }
134    LOG.info("All compactions have completed");
135  }
136
137  private boolean futuresComplete(List<Future<?>> futures) {
138    futures.removeIf(Future::isDone);
139    return futures.isEmpty();
140  }
141
142  public void shutdown() throws Exception {
143    executor.shutdown();
144    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
145    if (!ERRORS.isEmpty()) {
146      StringBuilder builder =
147          new StringBuilder().append("Major compaction failed, there were: ").append(ERRORS.size())
148              .append(" regions / stores that failed compacting\n")
149              .append("Failed compaction requests\n").append("--------------------------\n")
150              .append(Joiner.on("\n").join(ERRORS));
151      LOG.error(builder.toString());
152    }
153    if (connection != null) {
154      connection.close();
155    }
156    LOG.info("All regions major compacted successfully");
157  }
158
159  @VisibleForTesting void initializeWorkQueues() throws IOException {
160    if (storesToCompact.isEmpty()) {
161      connection.getTable(tableName).getDescriptor().getColumnFamilyNames()
162          .forEach(a -> storesToCompact.add(Bytes.toString(a)));
163      LOG.info("No family specified, will execute for all families");
164    }
165    LOG.info(
166        "Initializing compaction queues for table:  " + tableName + " with cf: " + storesToCompact);
167
168    Map<ServerName, List<RegionInfo>> snRegionMap = getServerRegionsMap();
169    /*
170     * If numservers is specified, stop inspecting regions beyond the numservers, it will serve
171     * to throttle and won't end up scanning all the regions in the event there are not many
172     * regions to compact based on the criteria.
173     */
174    for (ServerName sn : getServersToCompact(snRegionMap.keySet())) {
175      List<RegionInfo> regions = snRegionMap.get(sn);
176      LOG.debug("Table: " + tableName + " Server: " + sn + " No of regions: " + regions.size());
177
178      /*
179       * If the tool is run periodically, then we could shuffle the regions and provide
180       * some random order to select regions. Helps if numregions is specified.
181       */
182      Collections.shuffle(regions);
183      int regionsToCompact = numRegions;
184      for (RegionInfo hri : regions) {
185        if (numRegions > 0 && regionsToCompact <= 0) {
186          LOG.debug("Reached region limit for server: " + sn);
187          break;
188        }
189
190        Optional<MajorCompactionRequest> request = getMajorCompactionRequest(hri);
191        if (request.isPresent()) {
192          LOG.debug("Adding region " + hri + " to queue " + sn + " for compaction");
193          clusterCompactionQueues.addToCompactionQueue(sn, request.get());
194          if (numRegions > 0) {
195            regionsToCompact--;
196          }
197        }
198      }
199    }
200  }
201
202  protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
203      throws IOException {
204    return MajorCompactionRequest.newRequest(connection.getConfiguration(), hri, storesToCompact,
205            timestamp);
206  }
207
208  private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) {
209    if(numServers < 0 || snSet.size() <= numServers) {
210      return snSet;
211
212    } else {
213      List<ServerName> snList = Lists.newArrayList(snSet);
214      Collections.shuffle(snList);
215      return snList.subList(0, numServers);
216    }
217  }
218
219  private Map<ServerName, List<RegionInfo>> getServerRegionsMap() throws IOException {
220    Map<ServerName, List<RegionInfo>> snRegionMap = Maps.newHashMap();
221    List<HRegionLocation> regionLocations =
222        connection.getRegionLocator(tableName).getAllRegionLocations();
223    for (HRegionLocation regionLocation : regionLocations) {
224      ServerName sn = regionLocation.getServerName();
225      RegionInfo hri = regionLocation.getRegion();
226      if (!snRegionMap.containsKey(sn)) {
227        snRegionMap.put(sn, Lists.newArrayList());
228      }
229      snRegionMap.get(sn).add(hri);
230    }
231    return snRegionMap;
232  }
233
234  public void setNumServers(int numServers) {
235    this.numServers = numServers;
236  }
237
238  public void setNumRegions(int numRegions) {
239    this.numRegions = numRegions;
240  }
241
242  public void setSkipWait(boolean skipWait) {
243    this.skipWait = skipWait;
244  }
245
246  class Compact implements Runnable {
247
248    private final ServerName serverName;
249    private final MajorCompactionRequest request;
250
251    Compact(ServerName serverName, MajorCompactionRequest request) {
252      this.serverName = serverName;
253      this.request = request;
254    }
255
256    @Override public void run() {
257      try {
258        compactAndWait(request);
259      } catch (NotServingRegionException e) {
260        // this region has split or merged
261        LOG.warn("Region is invalid, requesting updated regions", e);
262        // lets updated the cluster compaction queues with these newly created regions.
263        addNewRegions();
264      } catch (Exception e) {
265        LOG.warn("Error compacting:", e);
266      } finally {
267        clusterCompactionQueues.releaseCompaction(serverName);
268      }
269    }
270
271    void compactAndWait(MajorCompactionRequest request) throws Exception {
272      Admin admin = connection.getAdmin();
273      try {
274        // only make the request if the region is not already major compacting
275        if (!isCompacting(request)) {
276          Set<String> stores = getStoresRequiringCompaction(request);
277          if (!stores.isEmpty()) {
278            request.setStores(stores);
279            for (String store : request.getStores()) {
280              compactRegionOnServer(request, admin, store);
281            }
282          }
283        }
284
285        /*
286         * In some scenarios like compacting TTLed regions, the compaction itself won't take time
287         * and hence we can skip the wait. An external tool will also be triggered frequently and
288         * the next run can identify region movements and compact them.
289         */
290        if (!skipWait) {
291          while (isCompacting(request)) {
292            Thread.sleep(sleepForMs);
293            LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
294                .getEncodedName());
295          }
296        }
297      } finally {
298        if (!skipWait) {
299          // Make sure to wait for the CompactedFileDischarger chore to do its work
300          int waitForArchive = connection.getConfiguration()
301              .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
302          Thread.sleep(waitForArchive);
303          // check if compaction completed successfully, otherwise put that request back in the
304          // proper queue
305          Set<String> storesRequiringCompaction = getStoresRequiringCompaction(request);
306          if (!storesRequiringCompaction.isEmpty()) {
307            // this happens, when a region server is marked as dead, flushes a store file and
308            // the new regionserver doesn't pick it up because its accounted for in the WAL replay,
309            // thus you have more store files on the filesystem than the regionserver knows about.
310            boolean regionHasNotMoved = connection.getRegionLocator(tableName)
311                .getRegionLocation(request.getRegion().getStartKey()).getServerName()
312                .equals(serverName);
313            if (regionHasNotMoved) {
314              LOG.error(
315                  "Not all store files were compacted, this may be due to the regionserver not "
316                      + "being aware of all store files.  Will not reattempt compacting, "
317                      + request);
318              ERRORS.add(request);
319            } else {
320              request.setStores(storesRequiringCompaction);
321              clusterCompactionQueues.addToCompactionQueue(serverName, request);
322              LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
323                  + " region: " + request.getRegion().getEncodedName());
324            }
325          } else {
326            LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
327                + " -> cf(s): " + request.getStores());
328          }
329        }
330      }
331    }
332
333    private void compactRegionOnServer(MajorCompactionRequest request, Admin admin, String store)
334        throws IOException {
335      admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
336          Bytes.toBytes(store));
337    }
338  }
339
340  private boolean isCompacting(MajorCompactionRequest request) throws Exception {
341    CompactionState compactionState = connection.getAdmin()
342        .getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes());
343    return compactionState.equals(CompactionState.MAJOR) || compactionState
344        .equals(CompactionState.MAJOR_AND_MINOR);
345  }
346
347  private void addNewRegions() {
348    try {
349      List<HRegionLocation> locations =
350          connection.getRegionLocator(tableName).getAllRegionLocations();
351      for (HRegionLocation location : locations) {
352        if (location.getRegion().getRegionId() > timestamp) {
353          Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
354              .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
355                  timestamp);
356          compactionRequest.ifPresent(request -> clusterCompactionQueues
357              .addToCompactionQueue(location.getServerName(), request));
358        }
359      }
360    } catch (IOException e) {
361      throw new RuntimeException(e);
362    }
363  }
364
365  protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
366      throws IOException {
367    return request.getStoresRequiringCompaction(storesToCompact, timestamp);
368  }
369
370  protected Options getCommonOptions() {
371    Options options = new Options();
372
373    options.addOption(
374        Option.builder("servers")
375            .required()
376            .desc("Concurrent servers compacting")
377            .hasArg()
378            .build()
379    );
380    options.addOption(
381        Option.builder("minModTime").
382            desc("Compact if store files have modification time < minModTime")
383            .hasArg()
384            .build()
385    );
386    options.addOption(
387        Option.builder("zk")
388            .optionalArg(true)
389            .desc("zk quorum")
390            .hasArg()
391            .build()
392    );
393    options.addOption(
394        Option.builder("rootDir")
395            .optionalArg(true)
396            .desc("hbase.rootDir")
397            .hasArg()
398            .build()
399    );
400    options.addOption(
401        Option.builder("sleep")
402            .desc("Time to sleepForMs (ms) for checking compaction status per region and available "
403                + "work queues: default 30s")
404            .hasArg()
405            .build()
406    );
407    options.addOption(
408        Option.builder("retries")
409        .desc("Max # of retries for a compaction request," + " defaults to 3")
410            .hasArg()
411            .build()
412    );
413    options.addOption(
414        Option.builder("dryRun")
415            .desc("Dry run, will just output a list of regions that require compaction based on "
416            + "parameters passed")
417            .hasArg(false)
418            .build()
419    );
420
421    options.addOption(
422        Option.builder("skipWait")
423            .desc("Skip waiting after triggering compaction.")
424            .hasArg(false)
425            .build()
426    );
427
428    options.addOption(
429        Option.builder("numservers")
430            .optionalArg(true)
431            .desc("Number of servers to compact in this run, defaults to all")
432            .hasArg()
433            .build()
434    );
435
436    options.addOption(
437        Option.builder("numregions")
438            .optionalArg(true)
439            .desc("Number of regions to compact per server, defaults to all")
440            .hasArg()
441            .build()
442    );
443    return options;
444  }
445
446  @Override
447  public int run(String[] args) throws Exception {
448    Options options = getCommonOptions();
449    options.addOption(
450        Option.builder("table")
451            .required()
452            .desc("table name")
453            .hasArg()
454            .build()
455    );
456    options.addOption(
457        Option.builder("cf")
458            .optionalArg(true)
459            .desc("column families: comma separated eg: a,b,c")
460            .hasArg()
461            .build()
462    );
463
464    final CommandLineParser cmdLineParser = new DefaultParser();
465    CommandLine commandLine = null;
466    try {
467      commandLine = cmdLineParser.parse(options, args);
468    } catch (ParseException parseException) {
469      System.out.println(
470          "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
471              + parseException);
472      printUsage(options);
473      return -1;
474    }
475    if (commandLine == null) {
476      System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
477      printUsage(options);
478      return -1;
479    }
480    String tableName = commandLine.getOptionValue("table");
481    String cf = commandLine.getOptionValue("cf", null);
482    Set<String> families = Sets.newHashSet();
483    if (cf != null) {
484      Iterables.addAll(families, Splitter.on(",").split(cf));
485    }
486
487    Configuration configuration = getConf();
488    int concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
489    long minModTime = Long.parseLong(
490        commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis())));
491    String quorum =
492        commandLine.getOptionValue("zk", configuration.get(HConstants.ZOOKEEPER_QUORUM));
493    String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
494    long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
495
496    int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
497    int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
498
499    configuration.set(HConstants.HBASE_DIR, rootDir);
500    configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);
501
502    MajorCompactor compactor =
503        new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency,
504            minModTime, sleep);
505    compactor.setNumServers(numServers);
506    compactor.setNumRegions(numRegions);
507    compactor.setSkipWait(commandLine.hasOption("skipWait"));
508
509    compactor.initializeWorkQueues();
510    if (!commandLine.hasOption("dryRun")) {
511      compactor.compactAllRegions();
512    }
513    compactor.shutdown();
514    return ERRORS.size();
515  }
516
517  protected static void printUsage(final Options options) {
518    String header = "\nUsage instructions\n\n";
519    String footer = "\n";
520    HelpFormatter formatter = new HelpFormatter();
521    formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true);
522  }
523
524  public static void main(String[] args) throws Exception {
525    ToolRunner.run(HBaseConfiguration.create(), new MajorCompactor(), args);
526  }
527}