/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Compactor for stores that use the stripe layout: writes the output of a compaction into
 * multiple files, split either along explicit stripe boundaries or by a target file count
 * and size (stripe compactions, HBASE-7967).
 */
@InterfaceAudience.Private
public class StripeCompactor extends Compactor {
  private static final Log LOG = LogFactory.getLog(StripeCompactor.class);

  public StripeCompactor(Configuration conf, Store store) {
    super(conf, store);
  }

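  /**
   * Compacts the request's files into new files split at the given stripe boundaries.
   * majorRangeFromRow/majorRangeToRow, when non-null, delimit the row range within which
   * this compaction behaves like a major compaction (deletes may be dropped there);
   * outside of that range deletes are retained.
   */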
  public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
    byte[] majorRangeFromRow, byte[] majorRangeToRow,
    CompactionThroughputController throughputController) throws IOException {
    return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow,
      throughputController, null);
  }

  public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
      byte[] majorRangeFromRow, byte[] majorRangeToRow,
      CompactionThroughputController throughputController, User user) throws IOException {
    if (LOG.isDebugEnabled()) {
      StringBuilder sb = new StringBuilder();
      sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
      for (byte[] tb : targetBoundaries) {
        sb.append(" [").append(Bytes.toString(tb)).append("]");
      }
      LOG.debug(sb.toString());
    }
    StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
        targetBoundaries, majorRangeFromRow, majorRangeToRow);
    return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
      throughputController, user);
  }

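  /**
   * Compacts the request's files into at most targetCount new files of roughly targetSize
   * bytes each, bounded by the left and right row keys. Major-range semantics are the same
   * as for the boundary-based overloads above.
   */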
  public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
    byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
    CompactionThroughputController throughputController) throws IOException {
    return compact(request, targetCount, targetSize, left, right, majorRangeFromRow,
      majorRangeToRow, throughputController, null);
  }

  public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
      byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
      CompactionThroughputController throughputController, User user) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Executing compaction with " + targetSize
          + " target file size, no more than " + targetCount + " files, in ["
          + Bytes.toString(left) + "] [" + Bytes.toString(right) + "] range");
    }
    StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
        targetCount, targetSize, left, right);
    return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
      throughputController, user);
  }

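  /**
   * Shared implementation for all compact() overloads: sets up the scanners and the
   * multi-file writer, runs the compaction, and commits the resulting files, or aborts
   * the writers and deletes the leftovers if the compaction does not finish.
   */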
  private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
      byte[] majorRangeFromRow, byte[] majorRangeToRow,
      CompactionThroughputController throughputController, User user) throws IOException {
    final Collection<StoreFile> filesToCompact = request.getFiles();
    final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
    this.progress = new CompactionProgress(fd.maxKeyCount);

    long smallestReadPoint = getSmallestReadPoint();
    List<StoreFileScanner> scanners = createFileScanners(filesToCompact,
        smallestReadPoint, store.throttleCompaction(request.getSize()));

    boolean finished = false;
    InternalScanner scanner = null;
    boolean cleanSeqId = false;
    try {
      // Get scanner to use.
      ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
      scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners, user);
      if (scanner == null) {
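        // No scanner was supplied by coprocessors. A null major range means no part of this
        // compaction is major, so retain deletes everywhere; otherwise use the range-aware
        // scanner so deletes are dropped only within the major range.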
        scanner = (majorRangeFromRow == null)
            ? createScanner(store, scanners,
                ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, fd.earliestPutTs)
            : createScanner(store, scanners,
                smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
      }
      scanner = postCreateCoprocScanner(request, coprocScanType, scanner, user);
      if (scanner == null) {
        // NULL scanner returned from coprocessor hooks means skip normal processing.
        return new ArrayList<Path>();
      }

      // Create the writer factory for compactions.
      if (fd.minSeqIdToKeep > 0) {
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }

      final boolean needMvcc = fd.maxMVCCReadpoint > 0;

      final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
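      // The multi-writer calls back into this factory each time it needs to start a new
      // output file (i.e. a new stripe).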
      StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
        @Override
        public Writer createWriter() throws IOException {
          return store.createWriterInTmp(
              fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0,
              store.throttleCompaction(request.getSize()));
        }
      };

      // Prepare multi-writer, and perform the compaction using scanner and writer.
      // It is ok here if storeScanner is null.
      StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
      mw.init(storeScanner, factory, store.getComparator());
      finished =
          performCompaction(scanner, mw, smallestReadPoint, cleanSeqId, throughputController);
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store +
            " in region " + store.getRegionInfo().getRegionNameAsString() +
            " because it was interrupted.");
      }
    } finally {
      if (scanner != null) {
        try {
          scanner.close();
        } catch (Throwable t) {
          // Don't fail the compaction if this fails.
          LOG.error("Failed to close scanner after compaction.", t);
        }
      }
      if (!finished) {
        for (Path leftoverFile : mw.abortWriters()) {
          try {
            store.getFileSystem().delete(leftoverFile, false);
          } catch (Exception ex) {
            LOG.error("Failed to delete the leftover file after an unfinished compaction.", ex);
          }
        }
      }
    }

    assert finished : "We should have exited the method on all error paths";
    List<Path> newFiles = mw.commitWriters(fd.maxSeqId, request.isMajor());
    assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
    return newFiles;
  }
}