/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;

/**
 * A compactor is a compaction algorithm associated with a given policy. Base class also contains
 * reusable parts for implementing compactors (what is common and what isn't is evolving).
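 * <p>
 * As a rough sketch (not a contract), concrete compactors generally use the protected helpers
 * below in this order: getFileDetails, createFileScanners, createScanner (possibly replaced or
 * wrapped by the coprocessor hooks preCreateCoprocScanner / postCreateCoprocScanner), and
 * finally performCompaction, which writes the surviving cells into a CellSink.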
 */
@InterfaceAudience.Private
public abstract class Compactor {
  private static final Log LOG = LogFactory.getLog(Compactor.class);
  protected CompactionProgress progress;
  protected Configuration conf;
  protected Store store;

  private int compactionKVMax;
  protected Compression.Algorithm compactionCompression;

  /** Number of days to keep MVCC values during a major compaction. */
  protected int keepSeqIdPeriod;

  // TODO: depending on Store is not good, but realistically all compactors currently do.
  Compactor(final Configuration conf, final Store store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactionCompression = (this.store.getFamily() == null) ?
        Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
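    // Clamp the configured period so we never keep MVCC/seqId values for fewer than
    // HConstants.MIN_KEEP_SEQID_PERIOD days.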
    this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
      HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
  }

  public interface CellSink {
    void append(Cell cell) throws IOException;
  }

  public CompactionProgress getProgress() {
    return this.progress;
  }

  /** The sole reason this class exists is that Java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across the files we're compacting. */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Maximum tags length */
    public int maxTagsLength = 0;
    /** Minimum seqId to keep during a major compaction */
    public long minSeqIdToKeep = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles Whether all files are included for compaction
   * @return The result.
   */
  protected FileDetails getFileDetails(
      Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
    FileDetails fd = new FileDetails();
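    // HFiles last modified before this timestamp are older than keepSeqIdPeriod days, so their
    // MVCC/seqId information no longer needs to be preserved through this compaction.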
    long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
      (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (StoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
        // when allFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemstoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFile.Reader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries; otherwise under-sized
      // blooms, or the user switching bloom type (e.g. from ROW to ROWCOL), can cause progress
      // to be miscalculated
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete markers during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information; it must be an old one,
          // so assume it contains very old puts
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Compacting " + file +
          ", keycount=" + keyCount +
          ", bloomtype=" + r.getBloomFilterType().toString() +
          ", size=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1) +
          ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
          ", seqNum=" + seqNum +
          (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
      }
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @param smallestReadPoint Smallest MVCC read point.
   * @return Scanners.
   */
  protected List<StoreFileScanner> createFileScanners(
      final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
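    // One scanner per store file, opened with compaction-oriented read options; blocks read
    // for the compaction are not added to the block cache.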
    return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
      smallestReadPoint);
  }

  protected long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  /**
   * Calls coprocessor, if any, to create compaction scanner - before normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param earliestPutTs Earliest put ts.
   * @param scanners File scanners for compaction files.
   * @return Scanner overriding the default; null if not overriding.
   */
  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
    if (store.getCoprocessorHost() == null) return null;
    return store.getCoprocessorHost()
        .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param scanner The default scanner created for compaction.
   * @return The scanner to use (usually the default); null if compaction should not proceed.
   */
  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
      ScanType scanType, InternalScanner scanner) throws IOException {
    if (store.getCoprocessorHost() == null) return scanner;
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
  }

  /**
   * Performs the compaction.
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove seqIds (formerly MVCC values) that are less than or
   *   equal to smallestReadPoint
   * @param throughputController Used to throttle the compaction's write rate.
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(InternalScanner scanner, CellSink writer,
      long smallestReadPoint, boolean cleanSeqId,
      CompactionThroughputController throughputController) throws IOException {
    long bytesWritten = 0;
    long bytesWrittenProgress = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<Cell>();
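    // Number of bytes to write between checks of whether writes are still enabled for this
    // store (i.e. whether the region/store is being closed).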
    long closeCheckInterval = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    String compactionName =
        store.getRegionInfo().getRegionNameAsString() + "#" + store.getFamily().getNameAsString();
    long now = 0;
    boolean hasMore;
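    // Read at most compactionKVMax cells from the scanner per call to next().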
    ScannerContext scannerContext =
        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    throughputController.start(compactionName);
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        if (LOG.isDebugEnabled()) {
          now = EnvironmentEdgeManager.currentTime();
        }
        // output to writer:
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            CellUtil.setSequenceId(c, 0);
          }
          writer.append(c);
          int len = KeyValueUtil.length(c);
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgress += len;
          }
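          // May sleep to keep this compaction's write rate under the configured throughput limit.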
          throughputController.control(compactionName, len);
          // check periodically to see if a system stop is requested
          if (closeCheckInterval > 0) {
            bytesWritten += len;
            if (bytesWritten > closeCheckInterval) {
              bytesWritten = 0;
              if (!store.areWritesEnabled()) {
                progress.cancel();
                return false;
              }
            }
          }
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= 60 * 1000) {
            LOG.debug("Compaction progress: "
                + compactionName
                + " "
                + progress
                + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
                    / ((now - lastMillis) / 1000.0)) + ", throughputController is "
                + throughputController);
            lastMillis = now;
            bytesWrittenProgress = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException("Interrupted while controlling throughput of compacting "
          + compactionName);
    } finally {
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

  /**
   * @param store The store.
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    Scan scan = new Scan();
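    // Retain at most the column family's configured maximum number of versions.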
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
        scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store The store.
   * @param scanners Store file scanners.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
      byte[] dropDeletesToRow) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }
}