
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;

/**
 * Compact passed set of files. Create an instance and then call
 * {@link #compact(CompactionRequest, ThroughputController, User)}.
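 * <p>
 * A minimal usage sketch; {@code conf}, {@code store}, and {@code request} are assumed to be
 * supplied by the caller, and {@code null} is an acceptable user, as in
 * {@link #compactForTesting(Collection, boolean)}:
 * <pre>
 *   DefaultCompactor compactor = new DefaultCompactor(conf, store);
 *   List&lt;Path&gt; newFiles =
 *       compactor.compact(request, NoLimitThroughputController.INSTANCE, null);
 * </pre>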
 */
@InterfaceAudience.Private
public class DefaultCompactor extends Compactor {
  private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);

  public DefaultCompactor(final Configuration conf, final Store store) {
    super(conf, store);
  }

  /**
   * Do a minor/major compaction on an explicit set of storefiles from a Store.
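   * @param request the compaction request holding the files to compact
   * @param throughputController used to throttle the compaction's I/O
   * @param user the effective user for the coprocessor calls; may be null
   * @return the paths of the newly written store files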
   */
  public List<Path> compact(final CompactionRequest request,
      ThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
    this.progress = new CompactionProgress(fd.maxKeyCount);

    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = getSmallestReadPoint();

    List<StoreFileScanner> scanners;
    Collection<StoreFile> readersToClose;
    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
      // Clone all StoreFiles so the compaction runs on an independent copy of the StoreFiles,
      // their HFiles, and their readers.
      readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
      for (StoreFile f : request.getFiles()) {
        readersToClose.add(new StoreFile(f));
      }
      scanners = createFileScanners(readersToClose, smallestReadPoint,
          store.throttleCompaction(request.getSize()));
    } else {
      readersToClose = Collections.emptyList();
      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
          store.throttleCompaction(request.getSize()));
    }

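    // Track the writer and the first IOException so the finally block below can decide whether
    // to commit the new file (append metadata and record its path) or close and abandon it.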
    StoreFile.Writer writer = null;
    List<Path> newFiles = new ArrayList<Path>();
    boolean cleanSeqId = false;
    IOException e = null;
    try {
      InternalScanner scanner = null;
      try {
        /* Include deletes, unless we are doing a compaction of all files */
        ScanType scanType = request.isRetainDeleteMarkers() ?
            ScanType.COMPACT_RETAIN_DELETES : ScanType.COMPACT_DROP_DELETES;
        scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user);
        if (scanner == null) {
          scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
        }
        scanner = postCreateCoprocScanner(request, scanType, scanner, user);
        if (scanner == null) {
          // A null scanner returned from the coprocessor hooks means skip normal processing.
          return newFiles;
        }
        // Create the writer even if there are no KVs (an empty store file is also OK),
        // because we need to record the max seq id for the store file; see HBASE-6059.
        if (fd.minSeqIdToKeep > 0) {
          smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
          cleanSeqId = true;
        }

        writer = createTmpWriter(fd, store.throttleCompaction(request.getSize()));
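        // performCompaction returns false when the compaction did not run to completion (for
        // example, it was interrupted); in that case the partial file is discarded below.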
        boolean finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
            throughputController, request.isAllFiles());

        if (!finished) {
          writer.close();
          store.getFileSystem().delete(writer.getPath(), false);
          writer = null;
          throw new InterruptedIOException("Aborting compaction of store " + store +
              " in region " + store.getRegionInfo().getRegionNameAsString() +
              " because it was interrupted.");
        }
      } finally {
        if (scanner != null) {
          scanner.close();
        }
      }
    } catch (IOException ioe) {
      e = ioe;
      // Rethrow; the finally block below consults e to decide how to close the writer.
      throw ioe;
    } finally {
      try {
        if (writer != null) {
          if (e != null) {
            writer.close();
          } else {
            writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
            writer.close();
            newFiles.add(writer.getPath());
          }
        }
      } finally {
        for (StoreFile f : readersToClose) {
          try {
            f.closeReader(true);
          } catch (IOException ioe) {
            LOG.warn("Exception closing " + f, ioe);
          }
        }
      }
    }
    return newFiles;
  }

  /**
   * Creates a writer for a new file in a temporary directory.
   * @param fd The file details.
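   * @param shouldDropBehind Whether the written data should be dropped from the OS page cache
   *          behind the write; passed through to {@code store.createWriterInTmp}.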
   * @return Writer for a new StoreFile in the tmp dir.
   * @throws IOException if the writer cannot be created
   */
  protected StoreFile.Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind)
      throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
        /* isCompaction = */ true,
        /* includeMVCCReadpoint = */ fd.maxMVCCReadpoint > 0,
        /* includesTags = */ fd.maxTagsLength > 0,
        /* shouldDropBehind = */ shouldDropBehind);
  }

  /**
   * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
   * {@link #compact(CompactionRequest, ThroughputController, User)}.
   * @param filesToCompact the files to compact. These are used as the compactionSelection for
   *          the generated {@link CompactionRequest}.
   * @param isMajor true to major compact (prune all deletes, max versions, etc)
   * @return Product of the compaction, or an empty list if all cells were expired or deleted
   *         and nothing made it through the compaction.
   * @throws IOException if the compaction fails
   */
  public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
      throws IOException {
    CompactionRequest cr = new CompactionRequest(filesToCompact);
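    // setIsMajor(isMajor, isAllFiles): a major compaction here also covers all files.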
    cr.setIsMajor(isMajor, isMajor);
    return this.compact(cr, NoLimitThroughputController.INSTANCE, null);
  }
}