/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;

/**
 * A compactor is a compaction algorithm associated with a given policy. The base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is evolving).
 */
@InterfaceAudience.Private
public abstract class Compactor {
  private static final Log LOG = LogFactory.getLog(Compactor.class);
  protected CompactionProgress progress;
  protected Configuration conf;
  protected Store store;

  private int compactionKVMax;
  protected Compression.Algorithm compactionCompression;

  /** Specifies how many days to keep MVCC values during a major compaction. */
  protected int keepSeqIdPeriod;

  // TODO: depending on Store is not good, but realistically all compactors currently do.
  Compactor(final Configuration conf, final Store store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactionCompression = (this.store.getFamily() == null) ?
        Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
    this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
      HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
  }
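
  // A minimal configuration sketch (illustrative only, not part of this class):
  // the two knobs the constructor reads. Obtaining the Configuration and the
  // Store, and choosing a concrete Compactor subclass, are assumed here.
  //
  //   Configuration conf = new Configuration();
  //   // max cells delivered per scanner.next() call during compaction
  //   conf.setInt(HConstants.COMPACTION_KV_MAX, 10);
  //   // days to keep MVCC/seqId values; floored at HConstants.MIN_KEEP_SEQID_PERIOD
  //   conf.setInt(HConstants.KEEP_SEQID_PERIOD, 5);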

  public interface CellSink {
    void append(Cell cell) throws IOException;
  }
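
  // A hedged sketch of a CellSink implementation: a wrapper that counts cells on
  // their way to a delegate sink (in practice the sink is typically a store file
  // writer). "CountingSink" is a hypothetical name, not an HBase class.
  //
  //   final class CountingSink implements CellSink {
  //     private final CellSink delegate;
  //     private long count;
  //     CountingSink(CellSink delegate) { this.delegate = delegate; }
  //     @Override
  //     public void append(Cell cell) throws IOException {
  //       count++;
  //       delegate.append(cell);
  //     }
  //   }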

  public CompactionProgress getProgress() {
    return this.progress;
  }

  /** The sole reason this class exists is that Java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across the files we're compacting. */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length */
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction */
    public long minSeqIdToKeep = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles Whether all files are included for compaction.
   * @return The collected file details.
   */
  protected FileDetails getFileDetails(
      Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
    FileDetails fd = new FileDetails();
    long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
      (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (StoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
        // When allFiles is true, all files are compacted, so we can calculate the smallest
        // MVCC value to keep.
        if (fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemstoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFile.Reader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: getFilterEntries could cause under-sized blooms if the user
      // switches bloom type (e.g. from ROW to ROWCOL)
      long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())
          ? r.getFilterEntries() : r.getEntries();
      fd.maxKeyCount += keyCount;
      // Calculate the latest MVCC readpoint in any of the involved store files.
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
      byte[] tmp = null;
      // For bulk loaded files, the real MVCC readpoint is the file's sequence id.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete markers during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information; it must be an old one,
          // so assume we have very old puts.
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Compacting " + file +
          ", keycount=" + keyCount +
          ", bloomtype=" + r.getBloomFilterType().toString() +
          ", size=" + StringUtils.humanReadableInt(r.length()) +
          ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
          ", seqNum=" + seqNum +
          (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
      }
    }
    return fd;
  }
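
  // A hedged sketch of how a subclass typically consumes the result; "request"
  // (a CompactionRequest) is assumed to be in scope.
  //
  //   FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
  //   // fd.maxKeyCount sizes the bloom filter of the output file,
  //   // fd.maxTagsLength and fd.maxMVCCReadpoint shape the output writer,
  //   // fd.earliestPutTs lets the scanner drop expired family delete markers.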

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @param smallestReadPoint Smallest MVCC read point for the scanners.
   * @return Scanners.
   */
  protected List<StoreFileScanner> createFileScanners(
      final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
    return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
      smallestReadPoint);
  }

  /** @return The store's smallest read point; seqIds at or below it can be cleaned. */
  protected long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  /**
   * Calls coprocessor, if any, to create a compaction scanner - before normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param earliestPutTs Earliest put ts.
   * @param scanners File scanners for compaction files.
   * @return Scanner overridden by coprocessor; null if not overriding.
   */
  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
    if (store.getCoprocessorHost() == null) return null;
    return store.getCoprocessorHost()
        .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param scanner The default scanner created for compaction.
   * @return The scanner to use (usually the default); null if compaction should not proceed.
   */
  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
      ScanType scanType, InternalScanner scanner) throws IOException {
    if (store.getCoprocessorHost() == null) return scanner;
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
  }

  /**
   * Performs the compaction.
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove the seqId (formerly mvcc) value when it is <= smallestReadPoint.
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(InternalScanner scanner,
      CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
    long bytesWritten = 0;
    long bytesWrittenProgress = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<Cell>();
    long closeCheckInterval = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    long now = 0;
    boolean hasMore;
    do {
      hasMore = scanner.next(cells, compactionKVMax);
      if (LOG.isDebugEnabled()) {
        now = EnvironmentEdgeManager.currentTime();
      }
      // output to writer:
      for (Cell c : cells) {
        if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
          CellUtil.setSequenceId(c, 0);
        }
        writer.append(c);
        int len = KeyValueUtil.length(c);
        ++progress.currentCompactedKVs;
        progress.totalCompactedSize += len;
        if (LOG.isDebugEnabled()) {
          bytesWrittenProgress += len;
        }
        // check periodically to see if a system stop is requested
        if (closeCheckInterval > 0) {
          bytesWritten += len;
          if (bytesWritten > closeCheckInterval) {
            bytesWritten = 0;
            if (!store.areWritesEnabled()) {
              progress.cancel();
              return false;
            }
          }
        }
      }
      // Log the progress of long running compactions every minute if
      // logging at DEBUG level
      if (LOG.isDebugEnabled()) {
        if ((now - lastMillis) >= 60 * 1000) {
          LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec",
            (bytesWrittenProgress / 1024.0) / ((now - lastMillis) / 1000.0)));
          lastMillis = now;
          bytesWrittenProgress = 0;
        }
      }
      cells.clear();
    } while (hasMore);
    progress.complete();
    return true;
  }
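
  // An end-to-end sketch of how a concrete compactor can wire the protected
  // pieces above together in its compact() implementation (hedged; writer
  // creation, error handling, and file commit are omitted, and "request",
  // "scanType" and "writer" are assumed to be in scope):
  //
  //   FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
  //   long smallestReadPoint = getSmallestReadPoint();
  //   List<StoreFileScanner> scanners =
  //       createFileScanners(request.getFiles(), smallestReadPoint);
  //   InternalScanner scanner =
  //       preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
  //   if (scanner == null) {
  //     scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
  //   }
  //   scanner = postCreateCoprocScanner(request, scanType, scanner);
  //   if (scanner == null) {
  //     return;  // coprocessor vetoed the compaction
  //   }
  //   boolean finished =
  //       performCompaction(scanner, writer, smallestReadPoint, fd.minSeqIdToKeep > 0);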

  /**
   * Creates the default compaction scanner.
   * @param store The store.
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
        scanType, smallestReadPoint, earliestPutTs);
  }
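
  // Choosing between the two createScanner overloads (a hedged sketch; the
  // local names are illustrative): the variant above drives delete handling
  // by scan type, while the variant below drops deletes only inside a row range.
  //
  //   InternalScanner s1 = createScanner(store, scanners, scanType, readPt, earliestPutTs);
  //   InternalScanner s2 = createScanner(store, scanners, readPt, earliestPutTs, fromRow, toRow);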

  /**
   * Creates a compaction scanner that can drop deletes within the given row range.
   * @param store The store.
   * @param scanners Store file scanners.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
      byte[] dropDeletesToRow) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }
}
322 }