1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.security.PrivilegedExceptionAction;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.KeyValueUtil;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.io.compress.Compression;
39 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
40 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
41 import org.apache.hadoop.hbase.regionserver.HStore;
42 import org.apache.hadoop.hbase.regionserver.InternalScanner;
43 import org.apache.hadoop.hbase.regionserver.ScanType;
44 import org.apache.hadoop.hbase.regionserver.ScannerContext;
45 import org.apache.hadoop.hbase.regionserver.Store;
46 import org.apache.hadoop.hbase.regionserver.StoreFile;
47 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
48 import org.apache.hadoop.hbase.regionserver.StoreScanner;
49 import org.apache.hadoop.hbase.security.User;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
53
54
55
56
57
58 @InterfaceAudience.Private
59 public abstract class Compactor {
60 private static final Log LOG = LogFactory.getLog(Compactor.class);
61 protected CompactionProgress progress;
62 protected Configuration conf;
63 protected Store store;
64
65 private int compactionKVMax;
66 protected Compression.Algorithm compactionCompression;
67
68
69 protected int keepSeqIdPeriod;
70
71
/**
 * Creates a compactor bound to a single store.
 *
 * <p>Reads the per-batch cell limit ({@link HConstants#COMPACTION_KV_MAX}) and the
 * sequence-id retention period ({@link HConstants#KEEP_SEQID_PERIOD}) from the configuration.
 * The retention period is clamped so it is never smaller than
 * {@link HConstants#MIN_KEEP_SEQID_PERIOD}. The compaction compression is taken from the
 * store's column family, or {@code NONE} when the store reports no family.
 *
 * @param conf configuration to read compaction settings from
 * @param store the store whose files this compactor will compact
 */
Compactor(final Configuration conf, final Store store) {
  this.conf = conf;
  this.store = store;
  this.compactionKVMax =
      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);

  if (store.getFamily() == null) {
    this.compactionCompression = Compression.Algorithm.NONE;
  } else {
    this.compactionCompression = store.getFamily().getCompactionCompression();
  }

  // Never go below the minimum retention period, whatever the configuration says.
  int configuredPeriod =
      conf.getInt(HConstants.KEEP_SEQID_PERIOD, HConstants.MIN_KEEP_SEQID_PERIOD);
  this.keepSeqIdPeriod = Math.max(configuredPeriod, HConstants.MIN_KEEP_SEQID_PERIOD);
}
82
83 public interface CellSink {
84 void append(Cell cell) throws IOException;
85 }
86
87 public CompactionProgress getProgress() {
88 return this.progress;
89 }
90
91
92 protected static class FileDetails {
93
94 public long maxKeyCount = 0;
95
96 public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
97
98 public long maxSeqId = 0;
99
100 public long maxMVCCReadpoint = 0;
101
102 public int maxTagsLength = 0;
103
104 public long minSeqIdToKeep = 0;
105 }
106
107
108
109
110
111
112
113 protected FileDetails getFileDetails(
114 Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
115 FileDetails fd = new FileDetails();
116 long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
117 (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);
118
119 for (StoreFile file : filesToCompact) {
120 if(allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
121
122
123 if(fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
124 fd.minSeqIdToKeep = file.getMaxMemstoreTS();
125 }
126 }
127 long seqNum = file.getMaxSequenceId();
128 fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
129 StoreFile.Reader r = file.getReader();
130 if (r == null) {
131 LOG.warn("Null reader for " + file.getPath());
132 continue;
133 }
134
135
136
137 long keyCount = r.getEntries();
138 fd.maxKeyCount += keyCount;
139
140 Map<byte[], byte[]> fileInfo = r.loadFileInfo();
141 byte tmp[] = null;
142
143
144 if (r.isBulkLoaded()) {
145 fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
146 }
147 else {
148 tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
149 if (tmp != null) {
150 fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
151 }
152 }
153 tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
154 if (tmp != null) {
155 fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
156 }
157
158
159 long earliestPutTs = 0;
160 if (allFiles) {
161 tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
162 if (tmp == null) {
163
164
165 fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
166 } else {
167 earliestPutTs = Bytes.toLong(tmp);
168 fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
169 }
170 }
171 if (LOG.isDebugEnabled()) {
172 LOG.debug("Compacting " + file +
173 ", keycount=" + keyCount +
174 ", bloomtype=" + r.getBloomFilterType().toString() +
175 ", size=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1) +
176 ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
177 ", compression=" + compactionCompression +
178 ", seqNum=" + seqNum +
179 (allFiles ? ", earliestPutTs=" + earliestPutTs: ""));
180 }
181 }
182 return fd;
183 }
184
185
186
187
188
189
190 protected List<StoreFileScanner> createFileScanners(
191 final Collection<StoreFile> filesToCompact,
192 long smallestReadPoint,
193 boolean useDropBehind) throws IOException {
194 return StoreFileScanner.getScannersForStoreFiles(filesToCompact,
195
196
197
198
199 smallestReadPoint);
200 }
201
202 protected long getSmallestReadPoint() {
203 return store.getSmallestReadPoint();
204 }
205
206
207
208
209
210
211
212
213
214 protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
215 ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
216 return preCreateCoprocScanner(request, scanType, earliestPutTs, scanners, null);
217 }
218
219 protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
220 final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
221 User user) throws IOException {
222 if (store.getCoprocessorHost() == null) return null;
223 if (user == null) {
224 return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
225 earliestPutTs, request);
226 } else {
227 try {
228 return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
229 @Override
230 public InternalScanner run() throws Exception {
231 return store.getCoprocessorHost().preCompactScannerOpen(store, scanners,
232 scanType, earliestPutTs, request);
233 }
234 });
235 } catch (InterruptedException ie) {
236 InterruptedIOException iioe = new InterruptedIOException();
237 iioe.initCause(ie);
238 throw iioe;
239 }
240 }
241 }
242
243
244
245
246
247
248
249
250 protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
251 final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
252 if (store.getCoprocessorHost() == null) return scanner;
253 if (user == null) {
254 return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
255 } else {
256 try {
257 return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
258 @Override
259 public InternalScanner run() throws Exception {
260 return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
261 }
262 });
263 } catch (InterruptedException ie) {
264 InterruptedIOException iioe = new InterruptedIOException();
265 iioe.initCause(ie);
266 throw iioe;
267 }
268 }
269 }
270
271
272
273
274
275 private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);
276
277 private String generateCompactionName() {
278 int counter;
279 for (;;) {
280 counter = NAME_COUNTER.get();
281 int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
282 if (NAME_COUNTER.compareAndSet(counter, next)) {
283 break;
284 }
285 }
286 return store.getRegionInfo().getRegionNameAsString() + "#"
287 + store.getFamily().getNameAsString() + "#" + counter;
288 }
289
290
291
292
293
294
295
296
297 protected boolean performCompaction(InternalScanner scanner, CellSink writer,
298 long smallestReadPoint, boolean cleanSeqId,
299 CompactionThroughputController throughputController) throws IOException {
300 long bytesWritten = 0;
301 long bytesWrittenProgress = 0;
302
303
304 List<Cell> cells = new ArrayList<Cell>();
305 long closeCheckInterval = HStore.getCloseCheckInterval();
306 long lastMillis = 0;
307 if (LOG.isDebugEnabled()) {
308 lastMillis = EnvironmentEdgeManager.currentTime();
309 }
310 String compactionName = generateCompactionName();
311 long now = 0;
312 boolean hasMore;
313 ScannerContext scannerContext =
314 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
315
316 throughputController.start(compactionName);
317 try {
318 do {
319 hasMore = scanner.next(cells, scannerContext);
320 if (LOG.isDebugEnabled()) {
321 now = EnvironmentEdgeManager.currentTime();
322 }
323
324 Cell lastCleanCell = null;
325 long lastCleanCellSeqId = 0;
326 for (Cell c : cells) {
327 if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
328 lastCleanCell = c;
329 lastCleanCellSeqId = c.getSequenceId();
330 CellUtil.setSequenceId(c, 0);
331 } else {
332 lastCleanCell = null;
333 lastCleanCellSeqId = 0;
334 }
335 writer.append(c);
336 int len = KeyValueUtil.length(c);
337 ++progress.currentCompactedKVs;
338 progress.totalCompactedSize += len;
339 if (LOG.isDebugEnabled()) {
340 bytesWrittenProgress += len;
341 }
342 throughputController.control(compactionName, len);
343
344 if (closeCheckInterval > 0) {
345 bytesWritten += len;
346 if (bytesWritten > closeCheckInterval) {
347 bytesWritten = 0;
348 if (!store.areWritesEnabled()) {
349 progress.cancel();
350 return false;
351 }
352 }
353 }
354 }
355 if (lastCleanCell != null) {
356
357 CellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
358 }
359
360
361 if (LOG.isDebugEnabled()) {
362 if ((now - lastMillis) >= 60 * 1000) {
363 LOG.debug("Compaction progress: "
364 + compactionName
365 + " "
366 + progress
367 + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
368 / ((now - lastMillis) / 1000.0)) + ", throughputController is "
369 + throughputController);
370 lastMillis = now;
371 bytesWrittenProgress = 0;
372 }
373 }
374 cells.clear();
375 } while (hasMore);
376 } catch (InterruptedException e) {
377 progress.cancel();
378 throw new InterruptedIOException("Interrupted while control throughput of compacting "
379 + compactionName);
380 } finally {
381 throughputController.finish(compactionName);
382 }
383 progress.complete();
384 return true;
385 }
386
387
388
389
390
391
392
393
394
395 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
396 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
397 Scan scan = new Scan();
398 scan.setMaxVersions(store.getFamily().getMaxVersions());
399 return new StoreScanner(store, store.getScanInfo(), scan, scanners,
400 scanType, smallestReadPoint, earliestPutTs);
401 }
402
403
404
405
406
407
408
409
410
411
412 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
413 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
414 byte[] dropDeletesToRow) throws IOException {
415 Scan scan = new Scan();
416 scan.setMaxVersions(store.getFamily().getMaxVersions());
417 return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
418 earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
419 }
420 }