1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.security.PrivilegedExceptionAction;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.KeyValueUtil;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.io.compress.Compression;
39 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
40 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
41 import org.apache.hadoop.hbase.regionserver.HStore;
42 import org.apache.hadoop.hbase.regionserver.InternalScanner;
43 import org.apache.hadoop.hbase.regionserver.ScanType;
44 import org.apache.hadoop.hbase.regionserver.ScannerContext;
45 import org.apache.hadoop.hbase.regionserver.Store;
46 import org.apache.hadoop.hbase.regionserver.StoreFile;
47 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
48 import org.apache.hadoop.hbase.regionserver.StoreScanner;
49 import org.apache.hadoop.hbase.security.User;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
53
54
55
56
57
58 @InterfaceAudience.Private
59 public abstract class Compactor {
60 private static final Log LOG = LogFactory.getLog(Compactor.class);
61 protected CompactionProgress progress;
62 protected Configuration conf;
63 protected Store store;
64
65 private int compactionKVMax;
66 protected Compression.Algorithm compactionCompression;
67
68
69 protected int keepSeqIdPeriod;
70
71
72 Compactor(final Configuration conf, final Store store) {
73 this.conf = conf;
74 this.store = store;
75 this.compactionKVMax =
76 this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
77 this.compactionCompression = (this.store.getFamily() == null) ?
78 Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
79 this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
80 HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
81 }
82
83 public interface CellSink {
84 void append(Cell cell) throws IOException;
85 }
86
87 public CompactionProgress getProgress() {
88 return this.progress;
89 }
90
91
92 protected static class FileDetails {
93
94 public long maxKeyCount = 0;
95
96 public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
97
98 public long maxSeqId = 0;
99
100 public long maxMVCCReadpoint = 0;
101
102 public int maxTagsLength = 0;
103
104 public long minSeqIdToKeep = 0;
105 }
106
107
108
109
110
111
112
113 protected FileDetails getFileDetails(
114 Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
115 FileDetails fd = new FileDetails();
116 long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
117 (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);
118
119 for (StoreFile file : filesToCompact) {
120 if(allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
121
122
123 if(fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
124 fd.minSeqIdToKeep = file.getMaxMemstoreTS();
125 }
126 }
127 long seqNum = file.getMaxSequenceId();
128 fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
129 StoreFile.Reader r = file.getReader();
130 if (r == null) {
131 LOG.warn("Null reader for " + file.getPath());
132 continue;
133 }
134
135
136
137 long keyCount = r.getEntries();
138 fd.maxKeyCount += keyCount;
139
140 Map<byte[], byte[]> fileInfo = r.loadFileInfo();
141 byte tmp[] = null;
142
143
144 if (r.isBulkLoaded()) {
145 fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
146 }
147 else {
148 tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
149 if (tmp != null) {
150 fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
151 }
152 }
153 tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
154 if (tmp != null) {
155 fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
156 }
157
158
159 long earliestPutTs = 0;
160 if (allFiles) {
161 tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
162 if (tmp == null) {
163
164
165 fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
166 } else {
167 earliestPutTs = Bytes.toLong(tmp);
168 fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
169 }
170 }
171 if (LOG.isDebugEnabled()) {
172 LOG.debug("Compacting " + file +
173 ", keycount=" + keyCount +
174 ", bloomtype=" + r.getBloomFilterType().toString() +
175 ", size=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1) +
176 ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
177 ", seqNum=" + seqNum +
178 (allFiles ? ", earliestPutTs=" + earliestPutTs: ""));
179 }
180 }
181 return fd;
182 }
183
184
185
186
187
188
189 protected List<StoreFileScanner> createFileScanners(
190 final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
191 return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
192 smallestReadPoint);
193 }
194
195 protected long getSmallestReadPoint() {
196 return store.getSmallestReadPoint();
197 }
198
199
200
201
202
203
204
205
206
207 protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
208 ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
209 return preCreateCoprocScanner(request, scanType, earliestPutTs, scanners, null);
210 }
211
212 protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
213 final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
214 User user) throws IOException {
215 if (store.getCoprocessorHost() == null) return null;
216 if (user == null) {
217 return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
218 earliestPutTs, request);
219 } else {
220 try {
221 return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
222 @Override
223 public InternalScanner run() throws Exception {
224 return store.getCoprocessorHost().preCompactScannerOpen(store, scanners,
225 scanType, earliestPutTs, request);
226 }
227 });
228 } catch (InterruptedException ie) {
229 InterruptedIOException iioe = new InterruptedIOException();
230 iioe.initCause(ie);
231 throw iioe;
232 }
233 }
234 }
235
236
237
238
239
240
241
242
243 protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
244 final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
245 if (store.getCoprocessorHost() == null) return scanner;
246 if (user == null) {
247 return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
248 } else {
249 try {
250 return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
251 @Override
252 public InternalScanner run() throws Exception {
253 return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
254 }
255 });
256 } catch (InterruptedException ie) {
257 InterruptedIOException iioe = new InterruptedIOException();
258 iioe.initCause(ie);
259 throw iioe;
260 }
261 }
262 }
263
264
265
266
267
268 private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);
269
270 private String generateCompactionName() {
271 int counter;
272 for (;;) {
273 counter = NAME_COUNTER.get();
274 int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
275 if (NAME_COUNTER.compareAndSet(counter, next)) {
276 break;
277 }
278 }
279 return store.getRegionInfo().getRegionNameAsString() + "#"
280 + store.getFamily().getNameAsString() + "#" + counter;
281 }
282
283
284
285
286
287
288
289
290 protected boolean performCompaction(InternalScanner scanner, CellSink writer,
291 long smallestReadPoint, boolean cleanSeqId,
292 CompactionThroughputController throughputController) throws IOException {
293 long bytesWritten = 0;
294 long bytesWrittenProgress = 0;
295
296
297 List<Cell> cells = new ArrayList<Cell>();
298 long closeCheckInterval = HStore.getCloseCheckInterval();
299 long lastMillis = 0;
300 if (LOG.isDebugEnabled()) {
301 lastMillis = EnvironmentEdgeManager.currentTime();
302 }
303 String compactionName = generateCompactionName();
304 long now = 0;
305 boolean hasMore;
306 ScannerContext scannerContext =
307 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
308
309 throughputController.start(compactionName);
310 try {
311 do {
312 hasMore = scanner.next(cells, scannerContext);
313 if (LOG.isDebugEnabled()) {
314 now = EnvironmentEdgeManager.currentTime();
315 }
316
317 for (Cell c : cells) {
318 if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
319 CellUtil.setSequenceId(c, 0);
320 }
321 writer.append(c);
322 int len = KeyValueUtil.length(c);
323 ++progress.currentCompactedKVs;
324 progress.totalCompactedSize += len;
325 if (LOG.isDebugEnabled()) {
326 bytesWrittenProgress += len;
327 }
328 throughputController.control(compactionName, len);
329
330 if (closeCheckInterval > 0) {
331 bytesWritten += len;
332 if (bytesWritten > closeCheckInterval) {
333 bytesWritten = 0;
334 if (!store.areWritesEnabled()) {
335 progress.cancel();
336 return false;
337 }
338 }
339 }
340 }
341
342
343 if (LOG.isDebugEnabled()) {
344 if ((now - lastMillis) >= 60 * 1000) {
345 LOG.debug("Compaction progress: "
346 + compactionName
347 + " "
348 + progress
349 + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
350 / ((now - lastMillis) / 1000.0)) + ", throughputController is "
351 + throughputController);
352 lastMillis = now;
353 bytesWrittenProgress = 0;
354 }
355 }
356 cells.clear();
357 } while (hasMore);
358 } catch (InterruptedException e) {
359 progress.cancel();
360 throw new InterruptedIOException("Interrupted while control throughput of compacting "
361 + compactionName);
362 } finally {
363 throughputController.finish(compactionName);
364 }
365 progress.complete();
366 return true;
367 }
368
369
370
371
372
373
374
375
376
377 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
378 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
379 Scan scan = new Scan();
380 scan.setMaxVersions(store.getFamily().getMaxVersions());
381 return new StoreScanner(store, store.getScanInfo(), scan, scanners,
382 scanType, smallestReadPoint, earliestPutTs);
383 }
384
385
386
387
388
389
390
391
392
393
394 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
395 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
396 byte[] dropDeletesToRow) throws IOException {
397 Scan scan = new Scan();
398 scan.setMaxVersions(store.getFamily().getMaxVersions());
399 return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
400 earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
401 }
402 }