1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.compactions;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.CellUtil;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.client.Scan;
36  import org.apache.hadoop.hbase.io.compress.Compression;
37  import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
38  import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
39  import org.apache.hadoop.hbase.regionserver.HStore;
40  import org.apache.hadoop.hbase.regionserver.InternalScanner;
41  import org.apache.hadoop.hbase.regionserver.ScanType;
42  import org.apache.hadoop.hbase.regionserver.Store;
43  import org.apache.hadoop.hbase.regionserver.StoreFile;
44  import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
45  import org.apache.hadoop.hbase.regionserver.StoreScanner;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48  import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
49  
50  /**
51   * A compactor is a compaction algorithm associated with a given policy. The base class also
52   * contains reusable parts for implementing compactors (what is common and what isn't is still evolving).
53   */
54  @InterfaceAudience.Private
55  public abstract class Compactor {
56    private static final Log LOG = LogFactory.getLog(Compactor.class);
57    protected CompactionProgress progress;
58    protected Configuration conf;
59    protected Store store;
60  
61    private int compactionKVMax;
62    protected Compression.Algorithm compactionCompression;
63    
64    /** Specifies how many days to keep MVCC values during a major compaction. */
65    protected int keepSeqIdPeriod;
66  
67    // TODO: depending on Store is not ideal, but realistically all current compactors do.
68    Compactor(final Configuration conf, final Store store) {
69      this.conf = conf;
70      this.store = store;
71      this.compactionKVMax =
72        this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
73      this.compactionCompression = (this.store.getFamily() == null) ?
74          Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
75      this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD, 
76        HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
77    }
78  
79    public interface CellSink {
80      void append(Cell cell) throws IOException;
81    }
82  
83    public CompactionProgress getProgress() {
84      return this.progress;
85    }
86  
87    /** The sole reason this class exists is that Java has no ref/out/pointer parameters. */
88    protected static class FileDetails {
89      /** Maximum key count after compaction (for blooms) */
90      public long maxKeyCount = 0;
91      /** Earliest put timestamp if major compaction */
92      public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
93      /** The highest sequence id across the files we're compacting. */
94      public long maxSeqId = 0;
95      /** Latest memstore read point found in any of the involved files */
96      public long maxMVCCReadpoint = 0;
97      /** Max tags length. */
98      public int maxTagsLength = 0;
99      /** Min seqId to keep during a major compaction. */
100     public long minSeqIdToKeep = 0;
101   }
102 
103   /**
104    * Extracts some details about the files to compact that are commonly needed by compactors.
105    * @param filesToCompact Files.
106    * @param allFiles Whether all files are included for compaction
107    * @return Aggregated details of the files to compact.
108    */
109   protected FileDetails getFileDetails(
110       Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
111     FileDetails fd = new FileDetails();
112     long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() - 
113       (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);  
114 
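        // Files last modified before this cutoff are old enough that their MVCC (seqId) values no
        // longer need to be preserved; the loop below raises minSeqIdToKeep accordingly.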
115     for (StoreFile file : filesToCompact) {
116       if(allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
117         // when allFiles is true, all files are being compacted, so we can compute the smallest
118         // MVCC value that still needs to be kept
119         if(fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
120           fd.minSeqIdToKeep = file.getMaxMemstoreTS();
121         }
122       }
123       long seqNum = file.getMaxSequenceId();
124       fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
125       StoreFile.Reader r = file.getReader();
126       if (r == null) {
127         LOG.warn("Null reader for " + file.getPath());
128         continue;
129       }
130       // NOTE: use getEntries when compacting instead of getFilterEntries; otherwise under-sized
131       // blooms, or the user switching bloom type (e.g. from ROW to ROWCOL), can cause
132       // progress to be miscalculated
133       long keyCount = r.getEntries();
134       fd.maxKeyCount += keyCount;
135       // calculate the latest MVCC readpoint in any of the involved store files
136       Map<byte[], byte[]> fileInfo = r.loadFileInfo();
137       byte[] tmp = null;
138       // Get and set the real MVCCReadpoint for bulk loaded files, which is the
139       // SeqId number.
140       if (r.isBulkLoaded()) {
141         fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
142       }
143       else {
144         tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
145         if (tmp != null) {
146           fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
147         }
148       }
149       tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
150       if (tmp != null) {
151         fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
152       }
153       // If required, calculate the earliest put timestamp of all involved storefiles.
154       // This is used to remove family delete marker during compaction.
155       long earliestPutTs = 0;
156       if (allFiles) {
157         tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
158         if (tmp == null) {
159           // There's a file with no information, must be an old one
160           // assume we have very old puts
161           fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
162         } else {
163           earliestPutTs = Bytes.toLong(tmp);
164           fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
165         }
166       }
167       if (LOG.isDebugEnabled()) {
168         LOG.debug("Compacting " + file +
169           ", keycount=" + keyCount +
170           ", bloomtype=" + r.getBloomFilterType().toString() +
171           ", size=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1) +
172           ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
173           ", seqNum=" + seqNum +
174           (allFiles ? ", earliestPutTs=" + earliestPutTs: ""));
175       }
176     }
177     return fd;
178   }
179 
180   /**
181    * Creates file scanners for compaction.
182    * @param filesToCompact Files.
       * @param smallestReadPoint Smallest MVCC read point.
183    * @return Scanners.
184    */
185   protected List<StoreFileScanner> createFileScanners(
186       final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
187     return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
188       smallestReadPoint);
189   }
190 
191   protected long getSmallestReadPoint() {
192     return store.getSmallestReadPoint();
193   }
194 
195   /**
196    * Calls coprocessor, if any, to create compaction scanner - before normal scanner creation.
197    * @param request Compaction request.
198    * @param scanType Scan type.
199    * @param earliestPutTs Earliest put ts.
200    * @param scanners File scanners for compaction files.
201    * @return Scanner supplied by the coprocessor to override the default; null if not overriding.
202    */
203   protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
204       ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
205     if (store.getCoprocessorHost() == null) return null;
206     return store.getCoprocessorHost()
207         .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
208   }
209 
210   /**
211    * Calls coprocessor, if any, to create scanners - after normal scanner creation.
212    * @param request Compaction request.
213    * @param scanType Scan type.
214    * @param scanner The default scanner created for compaction.
215    * @return The scanner to use (usually the default); null if compaction should not proceed.
216    */
217   protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
218       ScanType scanType, InternalScanner scanner) throws IOException {
219     if (store.getCoprocessorHost() == null) return scanner;
220     return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
221   }
222 
223   /**
224    * Performs the compaction.
225    * @param scanner Where to read from.
226    * @param writer Where to write to.
227    * @param smallestReadPoint Smallest read point.
228    * @param cleanSeqId When true, zero out seqId (formerly mvcc) values that are <= smallestReadPoint
       * @param throughputController Limits how fast the compaction is allowed to write.
229    * @return Whether compaction ended; false if it was interrupted for some reason.
230    */
231   protected boolean performCompaction(InternalScanner scanner, CellSink writer,
232       long smallestReadPoint, boolean cleanSeqId,
233       CompactionThroughputController throughputController) throws IOException {
234     long bytesWritten = 0;
235     long bytesWrittenProgress = 0;
236     // Since scanner.next() can return 'false' but still be delivering data,
237     // we have to use a do/while loop.
238     List<Cell> cells = new ArrayList<Cell>();
239     long closeCheckInterval = HStore.getCloseCheckInterval();
240     long lastMillis = 0;
241     if (LOG.isDebugEnabled()) {
242       lastMillis = EnvironmentEdgeManager.currentTime();
243     }
244     String compactionName =
245         store.getRegionInfo().getRegionNameAsString() + "#" + store.getFamily().getNameAsString();
246     long now = 0;
247     boolean hasMore;
248     throughputController.start(compactionName);
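        // The throughput controller limits this compaction's write rate via the control() calls below;
        // finish() is always invoked in the finally block, even if the compaction is cancelled.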
249     try {
250       do {
251         hasMore = scanner.next(cells, compactionKVMax);
252         if (LOG.isDebugEnabled()) {
253           now = EnvironmentEdgeManager.currentTime();
254         }
255         // output to writer:
256         for (Cell c : cells) {
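              // A cell whose seqId is at or below the smallest read point is already visible to every
              // ongoing scan, so its seqId is no longer needed and can safely be zeroed out.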
257           if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
258             CellUtil.setSequenceId(c, 0);
259           }
260           writer.append(c);
261           int len = KeyValueUtil.length(c);
262           ++progress.currentCompactedKVs;
263           progress.totalCompactedSize += len;
264           if (LOG.isDebugEnabled()) {
265             bytesWrittenProgress += len;
266           }
267           throughputController.control(compactionName, len);
268           // check periodically to see if a system stop is requested
269           if (closeCheckInterval > 0) {
270             bytesWritten += len;
271             if (bytesWritten > closeCheckInterval) {
272               bytesWritten = 0;
273               if (!store.areWritesEnabled()) {
274                 progress.cancel();
275                 return false;
276               }
277             }
278           }
279         }
280         // Log the progress of long-running compactions every minute if
281         // logging at DEBUG level
282         if (LOG.isDebugEnabled()) {
283           if ((now - lastMillis) >= 60 * 1000) {
284             LOG.debug("Compaction progress: "
285                 + compactionName
286                 + " "
287                 + progress
288                 + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
289                     / ((now - lastMillis) / 1000.0)) + ", throughputController is "
290                 + throughputController);
291             lastMillis = now;
292             bytesWrittenProgress = 0;
293           }
294         }
295         cells.clear();
296       } while (hasMore);
297     } catch (InterruptedException e) {
298       progress.cancel();
299       throw new InterruptedIOException("Interrupted while controlling the throughput of compacting "
300           + compactionName);
301     } finally {
302       throughputController.finish(compactionName);
303     }
304     progress.complete();
305     return true;
306   }
307 
308   /**
309    * @param store The store being compacted.
310    * @param scanners Store file scanners.
311    * @param scanType Scan type.
312    * @param smallestReadPoint Smallest MVCC read point.
313    * @param earliestPutTs Earliest put across all files.
314    * @return A compaction scanner.
315    */
316   protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
317       ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
318     Scan scan = new Scan();
319     scan.setMaxVersions(store.getFamily().getMaxVersions());
320     return new StoreScanner(store, store.getScanInfo(), scan, scanners,
321         scanType, smallestReadPoint, earliestPutTs);
322   }
323 
324   /**
325    * @param store The store.
326    * @param scanners Store file scanners.
327    * @param smallestReadPoint Smallest MVCC read point.
328    * @param earliestPutTs Earliest put across all files.
329    * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
330    * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
331    * @return A compaction scanner.
332    */
333   protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
334      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
335      byte[] dropDeletesToRow) throws IOException {
336     Scan scan = new Scan();
337     scan.setMaxVersions(store.getFamily().getMaxVersions());
338     return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
339         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
340   }
341 }
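
For orientation, below is a minimal sketch of how a concrete subclass might wire the protected helpers above together, modeled loosely on what DefaultCompactor does. It is illustrative only and not part of HBase: the class name SketchCompactor and the method compactInto are invented, the caller is assumed to supply the CellSink (for example a store file writer), and real compactors additionally create, commit, and clean up their output writers. The sketch reuses the imports shown at the top of this file.

// Illustrative sketch only; assumes it lives in the same package so the package-private
// Compactor constructor and the protected helpers are accessible.
public class SketchCompactor extends Compactor {

  public SketchCompactor(Configuration conf, Store store) {
    super(conf, store);
  }

  /** Compacts the request's files into the supplied sink; returns false if the compaction aborted. */
  public boolean compactInto(CompactionRequest request, CellSink writer,
      CompactionThroughputController throughputController) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
    this.progress = new CompactionProgress(fd.maxKeyCount);

    long smallestReadPoint = getSmallestReadPoint();
    boolean cleanSeqId = false;
    if (fd.minSeqIdToKeep > 0) {
      // SeqIds may only be cleaned up to the smaller of the two bounds.
      smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
      cleanSeqId = true;
    }

    List<StoreFileScanner> scanners = createFileScanners(request.getFiles(), smallestReadPoint);
    // Delete markers may only be dropped when every file of the store is being compacted.
    ScanType scanType = request.isAllFiles()
        ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;

    InternalScanner scanner =
        preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
    if (scanner == null) {
      scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
    }
    scanner = postCreateCoprocScanner(request, scanType, scanner);
    if (scanner == null) {
      return false; // a coprocessor vetoed the compaction
    }
    try {
      return performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController);
    } finally {
      scanner.close();
    }
  }
}

Dropping deletes only when all of the store's files participate mirrors why getFileDetails computes earliestPutTs only when allFiles is true; cleanup of the file scanners on the coprocessor-veto path is elided here for brevity.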