/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;

/**
 * Compact the passed set of files.
 * Create an instance and then call {@link #compact(CompactionRequest, long)}.
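 * <p>
 * A minimal usage sketch (illustrative only; assumes a populated
 * {@link CompactionRequest} and the store's maximum sequence id are available):
 * <pre>
 *   Compactor compactor = new Compactor(conf);
 *   StoreFile.Writer product = compactor.compact(request, maxSequenceId);
 *   if (product == null) {
 *     // all cells expired or were deleted; compaction produced no output file
 *   }
 * </pre>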
 */
@InterfaceAudience.Private
class Compactor extends Configured {
  private static final Log LOG = LogFactory.getLog(Compactor.class);
  private CompactionProgress progress;

  Compactor(final Configuration c) {
    super(c);
  }

  /**
   * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
   * the actual compaction method.
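   * <p>
   * Illustrative call only ({@code store}, {@code conf}, {@code files}, and
   * {@code maxSequenceId} are assumed to exist in the calling test):
   * <pre>
   *   StoreFile.Writer product =
   *       compactor.compactForTesting(store, conf, files, true, maxSequenceId);
   * </pre>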
   * @param store store which should be compacted
   * @param conf configuration to use when generating the compaction selection
   * @param filesToCompact the files to compact. They are used as the compaction selection for
   *          the generated {@link CompactionRequest}
   * @param isMajor <tt>true</tt> to initiate a major compaction (prune all deletes, max
   *          versions, etc.)
   * @param maxId maximum sequence id == the last key of all files in the compaction
   * @return product of the compaction, or null if all cells expired or were deleted and
   *         nothing made it through the compaction
   * @throws IOException
   */
  public StoreFile.Writer compactForTesting(final Store store, Configuration conf,
      final Collection<StoreFile> filesToCompact,
      boolean isMajor, long maxId) throws IOException {
    return compact(CompactionRequest.getRequestForTesting(store, conf, filesToCompact, isMajor),
      maxId);
  }

  /**
   * Do a minor/major compaction on an explicit set of storefiles from a Store.
   * @param request the requested compaction that contains all necessary information to
   *          complete the compaction (i.e. the store, the files, etc.)
   * @param maxId maximum sequence id == the last key of all files in the compaction
   * @return product of the compaction, or null if all cells expired or were deleted and
   *         nothing made it through the compaction
   * @throws IOException
   */
  StoreFile.Writer compact(CompactionRequest request, long maxId) throws IOException {
    // Calculate maximum key count after compaction (for blooms)
    // Also calculate earliest put timestamp if major compaction
    int maxKeyCount = 0;
    long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    long maxMVCCReadpoint = 0;

    // pull out the interesting things from the CR for ease later
    final Store store = request.getStore();
    final boolean majorCompaction = request.isMajor();
    final List<StoreFile> filesToCompact = request.getFiles();

    for (StoreFile file : filesToCompact) {
      StoreFile.Reader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: getFilterEntries could cause under-sized blooms if the user
      //       switches bloom type (e.g. from ROW to ROWCOL)
      long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())
          ? r.getFilterEntries() : r.getEntries();
      maxKeyCount += keyCount;
      // Calculate the maximum MVCC readpoint used in any of the involved files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
      byte[] tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
      if (tmp != null) {
        maxMVCCReadpoint = Math.max(maxMVCCReadpoint, Bytes.toLong(tmp));
      }
      // For major compactions, calculate the earliest put timestamp of all
      // involved storefiles. This is used to remove family delete markers
      // during the compaction.
      if (majorCompaction) {
        tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information; it must be an old one, so
          // assume we have very old puts.
          earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Compacting " + file +
          ", keycount=" + keyCount +
          ", bloomtype=" + r.getBloomFilterType().toString() +
          ", size=" + StringUtils.humanReadableInt(r.length()) +
          ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
          (majorCompaction ? ", earliestPutTs=" + earliestPutTs : ""));
      }
    }

    // keep track of compaction progress
    this.progress = new CompactionProgress(maxKeyCount);
    // Get some configs
    int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
    Compression.Algorithm compression = store.getFamily().getCompression();
    // Avoid overriding the compression setting for major compactions if the
    // user has not specified it separately
    Compression.Algorithm compactionCompression =
      (store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE)
        ? store.getFamily().getCompactionCompression() : compression;

    // For each file, obtain a scanner:
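    // Flag meanings, in order: don't cache blocks, don't use positional
    // reads, and open these as compaction scanners.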
    List<StoreFileScanner> scanners = StoreFileScanner
      .getScannersForStoreFiles(filesToCompact, false, false, true);

    // Make the instantiation lazy in case compaction produces no product; i.e.
    // where all source cells are expired or deleted.
    StoreFile.Writer writer = null;
    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
    MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
    try {
      InternalScanner scanner = null;
      try {
        if (store.getHRegion().getCoprocessorHost() != null) {
          scanner = store.getHRegion()
              .getCoprocessorHost()
              .preCompactScannerOpen(store, scanners,
                majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs,
                request);
        }
        if (scanner == null) {
          Scan scan = new Scan();
          scan.setMaxVersions(store.getFamily().getMaxVersions());
          /* Include deletes, unless we are doing a major compaction */
          scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
            majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
            smallestReadPoint, earliestPutTs);
        }
        if (store.getHRegion().getCoprocessorHost() != null) {
          InternalScanner cpScanner =
            store.getHRegion().getCoprocessorHost().preCompact(store, scanner, request);
          // A null scanner returned from coprocessor hooks means skip normal processing
          if (cpScanner == null) {
            return null;
          }
          scanner = cpScanner;
        }

        int bytesWritten = 0;
        // since scanner.next() can return 'false' but still be delivering data,
        // we have to use a do/while loop.
        List<KeyValue> kvs = new ArrayList<KeyValue>();
        // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
        boolean hasMore;
        do {
          hasMore = scanner.next(kvs, compactionKVMax);
          // Create the writer even if there are no kvs (an empty store file
          // is also ok), because we need to record the max seq id for the
          // store file; see HBASE-6059
          if (writer == null) {
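            // The last argument asks the writer to persist per-KV memstore
            // timestamps; this is needed when any input file carries entries
            // above the smallest read point, since those timestamps cannot be
            // zeroed out (see the check in the write loop below).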
            writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true,
                maxMVCCReadpoint >= smallestReadPoint);
          }
          if (writer != null) {
            // output to writer:
            for (KeyValue kv : kvs) {
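              // A KV at or below the smallest read point is already visible
              // to every ongoing scanner, so its memstore timestamp can
              // safely be reset to zero.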
              if (kv.getMemstoreTS() <= smallestReadPoint) {
                kv.setMemstoreTS(0);
              }
              writer.append(kv);
              // update progress per key
              ++progress.currentCompactedKVs;

              // check periodically to see if a system stop is requested
              if (Store.closeCheckInterval > 0) {
                bytesWritten += kv.getLength();
                if (bytesWritten > Store.closeCheckInterval) {
                  bytesWritten = 0;
                  isInterrupted(store, writer);
                }
              }
            }
          }
          kvs.clear();
        } while (hasMore);
      } finally {
        if (scanner != null) {
          scanner.close();
        }
      }
    } finally {
      if (writer != null) {
        writer.appendMetadata(maxId, majorCompaction);
        writer.close();
      }
    }
    return writer;
  }

  /**
   * Check whether the user has requested that writes be stopped; if so, close
   * and delete the partially written file and abort the compaction.
   */
  void isInterrupted(final Store store, final StoreFile.Writer writer)
      throws IOException {
    if (store.getHRegion().areWritesEnabled()) return;
    // Else cleanup.
    writer.close();
    store.getFileSystem().delete(writer.getPath(), false);
    throw new InterruptedIOException("Aborting compaction of store " + store +
      " in region " + store.getHRegion() + " because user requested stop.");
  }

  CompactionProgress getProgress() {
    return this.progress;
  }
}