/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;

/**
 * A compactor is a compaction algorithm associated with a given policy. The base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is
 * evolving).
 */
@InterfaceAudience.Private
public abstract class Compactor {
  private static final Log LOG = LogFactory.getLog(Compactor.class);
  private static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
  protected CompactionProgress progress;
  protected Configuration conf;
  protected Store store;

  protected int compactionKVMax;
  protected Compression.Algorithm compactionCompression;

  /** Specifies how many days to keep MVCC values during a major compaction. */
  protected int keepSeqIdPeriod;

  //TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(final Configuration conf, final Store store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactionCompression = (this.store.getFamily() == null) ?
        Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompressionType();
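    // Clamp the configured period to the minimum so that sequence ids are always retained
    // for at least MIN_KEEP_SEQID_PERIOD days.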
    this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
      HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
  }

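  /**
   * A sink that the compactor appends surviving cells to; typically backed by a
   * {@link StoreFile.Writer}.
   */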
  public interface CellSink {
    void append(Cell cell) throws IOException;
  }

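  /**
   * @return The progress of the compaction run by this compactor, updated as
   *   {@link #performCompaction} proceeds.
   */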
  public CompactionProgress getProgress() {
    return this.progress;
  }

  /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Max sequence id of all the files we're compacting. */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length */
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction */
    public long minSeqIdToKeep = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files to compact.
   * @param allFiles Whether all of the store's files are included in this compaction.
   * @return The aggregated file details.
   */
  protected FileDetails getFileDetails(
      Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
    FileDetails fd = new FileDetails();
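    // Files last modified before this timestamp are old enough that their MVCC/seqId
    // information no longer needs to be preserved (see keepSeqIdPeriod).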
    long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
      (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (StoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
        // when allFiles is true, all files are compacted, so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemstoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFile.Reader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries; otherwise under-sized
      // blooms can cause progress to be miscalculated, as can the user switching bloom
      // type (e.g. from ROW to ROWCOL)
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete markers during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file lacking this information; it must be an old one,
          // so assume it contains very old puts
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(StoreFile.TIMERANGE_KEY);
      TimeRangeTracker trt = new TimeRangeTracker();
      if (tmp == null) {
        fd.latestPutTs = HConstants.LATEST_TIMESTAMP;
      } else {
        Writables.copyWritable(tmp, trt);
        fd.latestPutTs = trt.getMaximumTimestamp();
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Compacting " + file +
          ", keycount=" + keyCount +
          ", bloomtype=" + r.getBloomFilterType().toString() +
          ", size=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1) +
          ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
          ", seqNum=" + seqNum +
          (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
      }
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param useDropBehind Whether to ask the file system to drop pages from its cache behind
   *   the compaction reads.
   * @return Scanners.
   */
  protected List<StoreFileScanner> createFileScanners(
      final Collection<StoreFile> filesToCompact,
      long smallestReadPoint,
      boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForStoreFiles(filesToCompact,
        /* cache blocks = */ false,
        /* use pread = */ false,
        /* is compaction */ true,
        /* use Drop Behind */ useDropBehind,
        smallestReadPoint);
  }

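  /**
   * @return The smallest MVCC read point across the scanners currently open on this store.
   *   Cells with sequence ids at or below this point are visible to every ongoing scan, so
   *   their sequence ids can safely be zeroed out during compaction.
   */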
  protected long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  /**
   * Calls coprocessor, if any, to create compaction scanner - before normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param earliestPutTs Earliest put ts.
   * @param scanners File scanners for compaction files.
   * @return Scanner overridden by coprocessor; null if not overriding.
   */
  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
    return preCreateCoprocScanner(request, scanType, earliestPutTs, scanners, null);
  }

  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
      User user) throws IOException {
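    // Without a coprocessor host there is nothing that could override the scanner.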
    if (store.getCoprocessorHost() == null) return null;
    if (user == null) {
      return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
        earliestPutTs, request);
    } else {
      try {
        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
          @Override
          public InternalScanner run() throws Exception {
            return store.getCoprocessorHost().preCompactScannerOpen(store, scanners,
              scanType, earliestPutTs, request);
          }
        });
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param scanner The default scanner created for compaction.
   * @return Scanner to use (usually the default); null if compaction should not proceed.
   */
  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
      final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) return scanner;
    if (user == null) {
      return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
    } else {
      try {
        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
          @Override
          public InternalScanner run() throws Exception {
            return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
          }
        });
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }

  /**
   * Performs the compaction.
   * @param fd File details of the store files being compacted.
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove the seqId (used to be mvcc) value which is
   *   &lt;= smallestReadPoint
   * @param throughputController The controller used to throttle compaction I/O.
   * @param major Is a major compaction.
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
      long smallestReadPoint, boolean cleanSeqId,
      ThroughputController throughputController, boolean major) throws IOException {
    long bytesWrittenProgressForCloseCheck = 0;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<Cell>();
    long closeCheckSizeLimit = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    ScannerContext scannerContext =
        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
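    // Bound how much data is written between shipped() calls: once roughly
    // minFilesToCompact default-sized blocks' worth of cells have been appended, release
    // the block references accumulated by the scanner (see the kvs.shipped() call below).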
    long minFilesToCompact = Math.max(2L,
        conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY,
            /* old name */ conf.getInt("hbase.hstore.compactionThreshold", 3)));
    long shippedCallSizeLimit = (long) minFilesToCompact * HConstants.DEFAULT_BLOCKSIZE;
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        if (LOG.isDebugEnabled()) {
          now = EnvironmentEdgeManager.currentTime();
        }
        // output to writer:
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            CellUtil.setSequenceId(c, 0);
          }
          writer.append(c);
          int len = KeyValueUtil.length(c);
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          // check periodically to see if a system stop is requested
          if (closeCheckSizeLimit > 0) {
            bytesWrittenProgressForCloseCheck += len;
            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
              bytesWrittenProgressForCloseCheck = 0;
              if (!store.areWritesEnabled()) {
                progress.cancel();
                return false;
              }
            }
          }
          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
            // The SHARED block references being read for compaction are kept in the prevBlocks
            // list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set of cells
            // is returned to the client, we call shipped(), which can clear this list. Here we
            // do the same thing: during the compaction, after every N cells written (with a
            // collective size of 'shippedCallSizeLimit'), we call shipped(), which may clear
            // the prevBlocks list.
            kvs.shipped();
            bytesWrittenProgressForShippedCall = 0;
          }
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            LOG.debug("Compaction progress: "
                + compactionName
                + " "
                + progress
                + String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0)
                    / ((now - lastMillis) / 1000.0)) + ", throughputController is "
                + throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException("Interrupted while controlling throughput of compaction "
          + compactionName);
    } finally {
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

  /**
   * @param store The store.
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
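    // The StoreScanner honors the family's max versions; the scan type tells it whether
    // delete markers should be retained (minor compaction) or dropped (major compaction).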
    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
        scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store The store.
   * @param scanners Store file scanners.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
      byte[] dropDeletesToRow) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }

  /**
   * Appends the metadata and closes the writer.
   * @param writer The current store writer.
   * @param fd The file details.
   * @param isMajor Is a major compaction.
   * @throws IOException
   */
  protected void appendMetadataAndCloseWriter(StoreFile.Writer writer, FileDetails fd,
      boolean isMajor) throws IOException {
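    // Record the highest sequence id and whether this was a major compaction in the file's
    // metadata so that readers can order store files correctly.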
    writer.appendMetadata(fd.maxSeqId, isMajor);
    writer.close();
  }
}