/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;

/**
 * A compactor is a compaction algorithm associated with a given policy. The base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is still
 * evolving). A sketch of how a concrete compactor typically wires these pieces together appears
 * at the bottom of this class.
 */
@InterfaceAudience.Private
public abstract class Compactor {
  private static final Log LOG = LogFactory.getLog(Compactor.class);
  protected CompactionProgress progress;
  protected Configuration conf;
  protected Store store;

  protected int compactionKVMax;
  protected Compression.Algorithm compactionCompression;

  /** Specifies how many days to keep MVCC values during a major compaction. */
  protected int keepSeqIdPeriod;

  // TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(final Configuration conf, final Store store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactionCompression = (this.store.getFamily() == null) ?
        Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompressionType();
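    // Note: Math.max(...) below enforces a floor of MIN_KEEP_SEQID_PERIOD, so a smaller
    // configured KEEP_SEQID_PERIOD still keeps sequence ids for at least the minimum number
    // of days.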
    this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
      HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
  }

  public interface CellSink {
    void append(Cell cell) throws IOException;
  }

  public CompactionProgress getProgress() {
    return this.progress;
  }

  /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across the files we're compacting. */
    public long maxSeqId = 0;
101     /** Latest memstore read point found in any of the involved files */
102     public long maxMVCCReadpoint = 0;
103     /** Max tags length**/
104     public int maxTagsLength = 0;
105     /** Min SeqId to keep during a major compaction **/
106     public long minSeqIdToKeep = 0;
107   }
108 
  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles Whether all files are included for compaction
   * @return The result.
   */
  protected FileDetails getFileDetails(
      Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
    FileDetails fd = new FileDetails();
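    // Convert keepSeqIdPeriod (days) into an absolute cutoff timestamp; files last modified
    // before this cutoff are old enough that their MVCC/sequence id values no longer need to be
    // preserved when all files are being compacted.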
    long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
      (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (StoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemstoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFile.Reader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries, otherwise under-sized
      // blooms can cause progress to be miscalculated, as can a user switching bloom
      // type (e.g. from ROW to ROWCOL)
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete markers during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one;
          // assume we have very old puts
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
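      // Determine the latest put timestamp from the file's time range metadata, if available;
      // without it, fall back to LATEST_TIMESTAMP as a conservative upper bound.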
      tmp = fileInfo.get(StoreFile.TIMERANGE_KEY);
      TimeRangeTracker trt = new TimeRangeTracker();
      if (tmp == null) {
        fd.latestPutTs = HConstants.LATEST_TIMESTAMP;
      } else {
        Writables.copyWritable(tmp, trt);
        fd.latestPutTs = trt.getMaximumTimestamp();
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Compacting " + file +
          ", keycount=" + keyCount +
          ", bloomtype=" + r.getBloomFilterType().toString() +
          ", size=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1) +
          ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
          ", seqNum=" + seqNum +
          (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
      }
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @param smallestReadPoint Smallest MVCC read point to pass to the file scanners.
   * @return Scanners.
   */
  protected List<StoreFileScanner> createFileScanners(
      final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
    return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
      smallestReadPoint);
  }

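  /**
   * @return The store's smallest MVCC read point; cells whose sequence id is at or below this
   *         point may have their sequence ids cleaned during compaction (see performCompaction).
   */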
  protected long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  /**
   * Calls coprocessor, if any, to create a compaction scanner - before normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param earliestPutTs Earliest put ts.
   * @param scanners File scanners for compaction files.
   * @return Scanner overridden by coprocessor; null if not overriding.
   */
  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
    if (store.getCoprocessorHost() == null) return null;
    return store.getCoprocessorHost()
        .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param scanner The default scanner created for compaction.
   * @return Scanner to use (usually the default); null if compaction should not proceed.
   */
  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
      ScanType scanType, InternalScanner scanner) throws IOException {
    if (store.getCoprocessorHost() == null) return scanner;
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
  }

  /**
   * Used to prevent compaction name conflicts when multiple compactions run in parallel on the
   * same store.
   */
  private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);

  private String generateCompactionName() {
    int counter;
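    // Atomically bump the shared counter, wrapping back to 0 at Integer.MAX_VALUE, so concurrent
    // compactions still receive distinct names.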
    for (;;) {
      counter = NAME_COUNTER.get();
      int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
      if (NAME_COUNTER.compareAndSet(counter, next)) {
        break;
      }
    }
    return store.getRegionInfo().getRegionNameAsString() + "#"
        + store.getFamily().getNameAsString() + "#" + counter;
  }

  /**
   * Performs the compaction.
   * @param fd FileDetails of cell sink writer
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove seqId (used to be mvcc) values which are &lt;= smallestReadPoint
   * @param throughputController The controller used to throttle compaction throughput.
   * @param major Is a major compaction.
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
      long smallestReadPoint, boolean cleanSeqId,
      CompactionThroughputController throughputController, boolean major) throws IOException {
    long bytesWritten = 0;
    long bytesWrittenProgress = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<Cell>();
    long closeCheckInterval = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    String compactionName = generateCompactionName();
    long now = 0;
    boolean hasMore;
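    // Cap each scanner.next() batch at compactionKVMax cells so a single call cannot buffer an
    // unbounded number of cells in memory.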
    ScannerContext scannerContext =
        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    throughputController.start(compactionName);
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        if (LOG.isDebugEnabled()) {
          now = EnvironmentEdgeManager.currentTime();
        }
        // output to writer:
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            CellUtil.setSequenceId(c, 0);
          }
          writer.append(c);
          int len = KeyValueUtil.length(c);
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgress += len;
          }
          throughputController.control(compactionName, len);
          // check periodically to see if a system stop is requested
          if (closeCheckInterval > 0) {
            bytesWritten += len;
            if (bytesWritten > closeCheckInterval) {
              bytesWritten = 0;
              if (!store.areWritesEnabled()) {
                progress.cancel();
                return false;
              }
            }
          }
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= 60 * 1000) {
            LOG.debug("Compaction progress: "
                + compactionName
                + " "
                + progress
                + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
                    / ((now - lastMillis) / 1000.0)) + ", throughputController is "
                + throughputController);
            lastMillis = now;
            bytesWrittenProgress = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException("Interrupted while controlling throughput of compacting "
          + compactionName);
    } finally {
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

  /**
   * @param store The store.
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
        scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store The store.
   * @param scanners Store file scanners.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
      byte[] dropDeletesToRow) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }

  /**
   * Appends the metadata and closes the writer.
   * @param writer The current store writer.
   * @param fd The file details.
   * @param isMajor Is a major compaction.
   * @throws IOException
   */
  protected void appendMetadataAndCloseWriter(StoreFile.Writer writer, FileDetails fd,
      boolean isMajor) throws IOException {
    writer.appendMetadata(fd.maxSeqId, isMajor);
    writer.close();
  }
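
  /*
   * A rough sketch (not part of this class) of how a concrete compactor typically wires the
   * protected helpers above together. The exact flow lives in subclasses (e.g. DefaultCompactor)
   * and may differ in detail; the CompactionRequest accessors used below are illustrative:
   *
   *   FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
   *   List<StoreFileScanner> scanners = createFileScanners(request.getFiles(), getSmallestReadPoint());
   *   InternalScanner scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
   *   if (scanner == null) {
   *     scanner = createScanner(store, scanners, scanType, getSmallestReadPoint(), fd.earliestPutTs);
   *   }
   *   scanner = postCreateCoprocScanner(request, scanType, scanner);
   *   performCompaction(fd, scanner, writer, getSmallestReadPoint(), cleanSeqId,
   *       throughputController, request.isMajor());
   *   appendMetadataAndCloseWriter(writer, fd, request.isAllFiles());
   */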
}