1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.io.compress.Compression;
32 import org.apache.hadoop.hbase.regionserver.InternalScanner;
33 import org.apache.hadoop.hbase.regionserver.ScanType;
34 import org.apache.hadoop.hbase.regionserver.Store;
35 import org.apache.hadoop.hbase.regionserver.StoreFile;
36 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
37 import org.apache.hadoop.hbase.regionserver.StoreScanner;
38 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
39 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
40 import org.apache.hadoop.hbase.security.User;
41 import org.apache.hadoop.hbase.util.Bytes;
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class StripeCompactor extends Compactor {
49 private static final Log LOG = LogFactory.getLog(StripeCompactor.class);
50 public StripeCompactor(Configuration conf, Store store) {
51 super(conf, store);
52 }
53
54 public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
55 byte[] majorRangeFromRow, byte[] majorRangeToRow,
56 CompactionThroughputController throughputController) throws IOException {
57 return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow,
58 throughputController, null);
59 }
60
61 public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
62 byte[] majorRangeFromRow, byte[] majorRangeToRow,
63 CompactionThroughputController throughputController, User user) throws IOException {
64 if (LOG.isDebugEnabled()) {
65 StringBuilder sb = new StringBuilder();
66 sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
67 for (byte[] tb : targetBoundaries) {
68 sb.append(" [").append(Bytes.toString(tb)).append("]");
69 }
70 LOG.debug(sb.toString());
71 }
72 StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
73 targetBoundaries, majorRangeFromRow, majorRangeToRow);
74 return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
75 throughputController, user);
76 }
77
78 public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
79 byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
80 CompactionThroughputController throughputController) throws IOException {
81 return compact(request, targetCount, targetSize, left, right, majorRangeFromRow,
82 majorRangeToRow, throughputController, null);
83 }
84
85 public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
86 byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
87 CompactionThroughputController throughputController, User user) throws IOException {
88 if (LOG.isDebugEnabled()) {
89 LOG.debug("Executing compaction with " + targetSize
90 + " target file size, no more than " + targetCount + " files, in ["
91 + Bytes.toString(left) + "] [" + Bytes.toString(right) + "] range");
92 }
93 StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
94 targetCount, targetSize, left, right);
95 return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
96 throughputController, user);
97 }
98
99 private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
100 byte[] majorRangeFromRow, byte[] majorRangeToRow,
101 CompactionThroughputController throughputController, User user) throws IOException {
102 final Collection<StoreFile> filesToCompact = request.getFiles();
103 final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
104 this.progress = new CompactionProgress(fd.maxKeyCount);
105
106 long smallestReadPoint = getSmallestReadPoint();
107 List<StoreFileScanner> scanners = createFileScanners(filesToCompact,
108 smallestReadPoint, store.throttleCompaction(request.getSize()));
109
110 boolean finished = false;
111 InternalScanner scanner = null;
112 boolean cleanSeqId = false;
113 try {
114
115 ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
116 scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners, user);
117 if (scanner == null) {
118 scanner = (majorRangeFromRow == null)
119 ? createScanner(store, scanners,
120 ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, fd.earliestPutTs)
121 : createScanner(store, scanners,
122 smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
123 }
124 scanner = postCreateCoprocScanner(request, coprocScanType, scanner, user);
125 if (scanner == null) {
126
127 return new ArrayList<Path>();
128 }
129
130
131 if(fd.minSeqIdToKeep > 0) {
132 smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
133 cleanSeqId = true;
134 }
135
136 final boolean needMvcc = fd.maxMVCCReadpoint > 0;
137
138 final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
139 StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
140 @Override
141 public Writer createWriter() throws IOException {
142 return store.createWriterInTmp(
143 fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0,
144 store.throttleCompaction(request.getSize()));
145 }
146 };
147
148
149
150 StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
151 mw.init(storeScanner, factory, store.getComparator());
152 finished =
153 performCompaction(scanner, mw, smallestReadPoint, cleanSeqId, throughputController);
154 if (!finished) {
155 throw new InterruptedIOException( "Aborting compaction of store " + store +
156 " in region " + store.getRegionInfo().getRegionNameAsString() +
157 " because it was interrupted.");
158 }
159 } finally {
160 if (scanner != null) {
161 try {
162 scanner.close();
163 } catch (Throwable t) {
164
165 LOG.error("Failed to close scanner after compaction.", t);
166 }
167 }
168 if (!finished) {
169 for (Path leftoverFile : mw.abortWriters()) {
170 try {
171 store.getFileSystem().delete(leftoverFile, false);
172 } catch (Exception ex) {
173 LOG.error("Failed to delete the leftover file after an unfinished compaction.", ex);
174 }
175 }
176 }
177 }
178
179 assert finished : "We should have exited the method on all error paths";
180 List<Path> newFiles = mw.commitWriters(fd.maxSeqId, request.isMajor());
181 assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
182 return newFiles;
183 }
184 }