/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Compacts the passed set of files. Create an instance and then call
 * {@link #compact(CompactionRequestImpl, ThroughputController, User)}.
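 * <p>
 * A minimal usage sketch (assumes an initialized {@code conf} and {@code store}, an existing
 * {@link CompactionRequestImpl} named {@code request}, no throughput limiting, and no user):
 *
 * <pre>
 * DefaultCompactor compactor = new DefaultCompactor(conf, store);
 * List&lt;Path&gt; newFiles =
 *     compactor.compact(request, NoLimitThroughputController.INSTANCE, null);
 * </pre>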
 */
@InterfaceAudience.Private
public class DefaultCompactor extends Compactor<StoreFileWriter> {
  private static final Logger LOG = LoggerFactory.getLogger(DefaultCompactor.class);

  public DefaultCompactor(Configuration conf, HStore store) {
    super(conf, store);
  }

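  // Sink factory handed to the template compact() method below: the default compactor writes
  // all output cells through a single writer created in the store's temp directory.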
  private final CellSinkFactory<StoreFileWriter> writerFactory =
      new CellSinkFactory<StoreFileWriter>() {
        @Override
        public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd,
            boolean shouldDropBehind) throws IOException {
          return createTmpWriter(fd, shouldDropBehind);
        }
      };

  /**
   * Do a minor/major compaction on an explicit set of storefiles from a Store.
   */
  public List<Path> compact(final CompactionRequestImpl request,
      ThroughputController throughputController, User user) throws IOException {
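    // Delegate to the template method in Compactor, pairing the default scanner factory with
    // the single-file writer factory declared above.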
    return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
  }

  /**
   * Compact a list of files for testing. Creates a fake {@link CompactionRequestImpl} to pass to
   * {@link #compact(CompactionRequestImpl, ThroughputController, User)}.
   * @param filesToCompact the files to compact. These are used as the compactionSelection for the
   *          generated {@link CompactionRequestImpl}.
   * @param isMajor true to major compact (prune all deletes, max versions, etc.)
   * @return the product of the compaction, or an empty list if all cells were expired or deleted
   *         and nothing made it through the compaction.
   * @throws IOException if the compaction fails
   */
  public List<Path> compactForTesting(Collection<HStoreFile> filesToCompact, boolean isMajor)
      throws IOException {
    CompactionRequestImpl cr = new CompactionRequestImpl(filesToCompact);
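    // setIsMajor takes (isMajor, isAllFiles); a major request here is also an all-files request.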
    cr.setIsMajor(isMajor, isMajor);
    return compact(cr, NoLimitThroughputController.INSTANCE, null);
  }

  @Override
  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
      CompactionRequestImpl request) throws IOException {
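    // Stamp the file metadata with the max sequence id and whether this compaction covered all
    // of the store's files (i.e. was effectively major), then finalize the new file.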
    List<Path> newFiles = Lists.newArrayList(writer.getPath());
    writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
    writer.close();
    return newFiles;
  }

  @Override
  protected void abortWriter(StoreFileWriter writer) throws IOException {
    Path leftoverFile = writer.getPath();
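    // Best-effort cleanup: close the half-written file and delete it, logging (rather than
    // rethrowing) failures so the original compaction error is not masked.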
    try {
      writer.close();
    } catch (IOException e) {
      LOG.warn("Failed to close the writer after an unfinished compaction.", e);
    }
    try {
      store.getFileSystem().delete(leftoverFile, false);
    } catch (IOException e) {
      LOG.warn(
        "Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.",
        e);
    }
  }
}