
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;

/**
 * A compactor is a compaction algorithm associated with a given policy. The base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is evolving).
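 * <p>
 * A minimal sketch of how a concrete subclass typically drives these helpers. Illustrative
 * only: {@code request}, {@code dropBehind}, {@code scanType}, {@code writer},
 * {@code cleanSeqId} and {@code controller} are assumed caller-side values, not members of
 * this class.
 * <pre>
 *   FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
 *   List&lt;StoreFileScanner&gt; scanners =
 *       createFileScanners(request.getFiles(), getSmallestReadPoint(), dropBehind);
 *   InternalScanner scanner = createScanner(store, scanners, scanType,
 *       getSmallestReadPoint(), fd.earliestPutTs);
 *   performCompaction(scanner, writer, getSmallestReadPoint(), cleanSeqId, controller);
 * </pre>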
 */
@InterfaceAudience.Private
public abstract class Compactor {
  private static final Log LOG = LogFactory.getLog(Compactor.class);
  protected CompactionProgress progress;
  protected Configuration conf;
  protected Store store;

  private int compactionKVMax;
  protected Compression.Algorithm compactionCompression;

  /** Specify how many days to keep MVCC values during major compaction. */
  protected int keepSeqIdPeriod;

  //TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(final Configuration conf, final Store store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactionCompression = (this.store.getFamily() == null) ?
        Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
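    // Never keep seqId/MVCC information for fewer days than HConstants.MIN_KEEP_SEQID_PERIOD;
    // Math.max clamps the configured value up to that floor.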
    this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
      HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
  }

  public interface CellSink {
    void append(Cell cell) throws IOException;
  }

  public CompactionProgress getProgress() {
    return this.progress;
  }

  /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
97      /** The last key in the files we're compacting. */
98      public long maxSeqId = 0;
99      /** Latest memstore read point found in any of the involved files */
100     public long maxMVCCReadpoint = 0;
101     /** Max tags length**/
102     public int maxTagsLength = 0;
103     /** Min SeqId to keep during a major compaction **/
104     public long minSeqIdToKeep = 0;
105   }
106 
107   /**
108    * Extracts some details about the files to compact that are commonly needed by compactors.
109    * @param filesToCompact Files.
110    * @param allFiles Whether all files are included for compaction
111    * @return The result.
112    */
113   protected FileDetails getFileDetails(
114       Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
115     FileDetails fd = new FileDetails();
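    // Any file last modified before this cutoff (keepSeqIdPeriod days ago) no longer needs its
    // MVCC (memstore read point) information preserved.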
    long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
      (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (StoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemstoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFile.Reader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries (not getFilterEntries) when compacting; otherwise under-sized
      // blooms, or the user switching bloom type (e.g. from ROW to ROWCOL), can cause
      // progress to be miscalculated.
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete marker during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one
          // assume we have very old puts
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Compacting " + file +
          ", keycount=" + keyCount +
          ", bloomtype=" + r.getBloomFilterType().toString() +
          ", size=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1) +
          ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
          ", compression=" + compactionCompression +
          ", seqNum=" + seqNum +
          (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
      }
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param useDropBehind Whether to use drop-behind for the compaction reads.
   * @return Scanners.
   */
  protected List<StoreFileScanner> createFileScanners(
      final Collection<StoreFile> filesToCompact,
      long smallestReadPoint,
      boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForStoreFiles(filesToCompact,
        /* cache blocks = */ false,
        /* use pread = */ false,
        /* is compaction */ true,
        /* use Drop Behind */ useDropBehind,
        smallestReadPoint);
  }

  protected long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  /**
   * Calls coprocessor, if any, to create compaction scanner - before normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param earliestPutTs Earliest put ts.
   * @param scanners File scanners for compaction files.
   * @return Scanner override by coprocessor; null if not overriding.
   */
  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
    return preCreateCoprocScanner(request, scanType, earliestPutTs, scanners, null);
  }

  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
      User user) throws IOException {
    if (store.getCoprocessorHost() == null) return null;
    if (user == null) {
      return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
        earliestPutTs, request);
    } else {
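      // Wrap the call in doAs so the coprocessor hook runs in the requesting user's security
      // context rather than the region server's own.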
      try {
        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
          @Override
          public InternalScanner run() throws Exception {
            return store.getCoprocessorHost().preCompactScannerOpen(store, scanners,
              scanType, earliestPutTs, request);
          }
        });
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param scanner The default scanner created for compaction.
   * @return The scanner to use (usually the default); null if compaction should not proceed.
   */
  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
      final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) return scanner;
    if (user == null) {
      return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
    } else {
      try {
        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
          @Override
          public InternalScanner run() throws Exception {
            return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
          }
        });
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }

  /**
   * Used to prevent compaction name conflicts when multiple compactions are running in parallel
   * on the same store.
   */
  private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);

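  // Builds names of the form "<regionName>#<family>#<counter>"; the CAS loop below lets the
  // counter wrap back to 0 at Integer.MAX_VALUE instead of overflowing.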
  private String generateCompactionName() {
    int counter;
    for (;;) {
      counter = NAME_COUNTER.get();
      int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
      if (NAME_COUNTER.compareAndSet(counter, next)) {
        break;
      }
    }
    return store.getRegionInfo().getRegionNameAsString() + "#"
        + store.getFamily().getNameAsString() + "#" + counter;
  }

  /**
   * Performs the compaction.
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove seqId (used to be mvcc) values which are &lt;= smallestReadPoint
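   * @param throughputController Used to throttle the rate at which this compaction writes data.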
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(InternalScanner scanner, CellSink writer,
      long smallestReadPoint, boolean cleanSeqId,
      CompactionThroughputController throughputController) throws IOException {
    long bytesWritten = 0;
    long bytesWrittenProgress = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<Cell>();
    long closeCheckInterval = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    String compactionName = generateCompactionName();
    long now = 0;
    boolean hasMore;
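    // Cap how many cells a single scanner.next() call may return, per compactionKVMax
    // (HConstants.COMPACTION_KV_MAX).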
    ScannerContext scannerContext =
        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    throughputController.start(compactionName);
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        if (LOG.isDebugEnabled()) {
          now = EnvironmentEdgeManager.currentTime();
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            CellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          writer.append(c);
          int len = KeyValueUtil.length(c);
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgress += len;
          }
          throughputController.control(compactionName, len);
          // check periodically to see if a system stop is requested
          if (closeCheckInterval > 0) {
            bytesWritten += len;
            if (bytesWritten > closeCheckInterval) {
              bytesWritten = 0;
              if (!store.areWritesEnabled()) {
                progress.cancel();
                return false;
              }
            }
          }
        }
        if (lastCleanCell != null) {
          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
          CellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= 60 * 1000) {
            LOG.debug("Compaction progress: "
                + compactionName
                + " "
                + progress
                + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
                    / ((now - lastMillis) / 1000.0)) + ", throughputController is "
                + throughputController);
            lastMillis = now;
            bytesWrittenProgress = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException("Interrupted while controlling throughput of compacting "
          + compactionName);
    } finally {
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

  /**
   * @param store store
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
        scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store The store.
   * @param scanners Store file scanners.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
      byte[] dropDeletesToRow) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }
}