1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.regionserver.InternalScanner;
33 import org.apache.hadoop.hbase.regionserver.ScanType;
34 import org.apache.hadoop.hbase.regionserver.Store;
35 import org.apache.hadoop.hbase.regionserver.StoreFile;
36 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
37 import org.apache.hadoop.hbase.security.User;
38
39
40
41
42
43 @InterfaceAudience.Private
44 public class DefaultCompactor extends Compactor {
45 private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
46
47 public DefaultCompactor(final Configuration conf, final Store store) {
48 super(conf, store);
49 }
50
51
52
53
54 public List<Path> compact(final CompactionRequest request,
55 CompactionThroughputController throughputController, User user) throws IOException {
56 FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
57 this.progress = new CompactionProgress(fd.maxKeyCount);
58
59
60 long smallestReadPoint = getSmallestReadPoint();
61
62 List<StoreFileScanner> scanners;
63 Collection<StoreFile> readersToClose;
64 if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
65
66
67 readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
68 for (StoreFile f : request.getFiles()) {
69 readersToClose.add(new StoreFile(f));
70 }
71 scanners = createFileScanners(readersToClose, smallestReadPoint,
72 store.throttleCompaction(request.getSize()));
73 } else {
74 readersToClose = Collections.emptyList();
75 scanners = createFileScanners(request.getFiles(), smallestReadPoint,
76 store.throttleCompaction(request.getSize()));
77 }
78
79 StoreFile.Writer writer = null;
80 List<Path> newFiles = new ArrayList<Path>();
81 boolean cleanSeqId = false;
82 IOException e = null;
83 try {
84 InternalScanner scanner = null;
85 try {
86
87
88 ScanType scanType =
89 request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
90 scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user);
91 if (scanner == null) {
92 scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
93 }
94 scanner = postCreateCoprocScanner(request, scanType, scanner, user);
95 if (scanner == null) {
96
97 return newFiles;
98 }
99
100
101 if(fd.minSeqIdToKeep > 0) {
102 smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
103 cleanSeqId = true;
104 }
105
106
107
108 writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
109 fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
110
111 boolean finished =
112 performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController);
113
114
115 if (!finished) {
116 writer.close();
117 store.getFileSystem().delete(writer.getPath(), false);
118 writer = null;
119 throw new InterruptedIOException("Aborting compaction of store " + store +
120 " in region " + store.getRegionInfo().getRegionNameAsString() +
121 " because it was interrupted.");
122 }
123 } finally {
124 if (scanner != null) {
125 scanner.close();
126 }
127 }
128 } catch (IOException ioe) {
129 e = ioe;
130
131 throw ioe;
132 }
133 finally {
134 try {
135 if (writer != null) {
136 if (e != null) {
137 writer.close();
138 } else {
139 writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
140 writer.close();
141 newFiles.add(writer.getPath());
142 }
143 }
144 } finally {
145 for (StoreFile f : readersToClose) {
146 try {
147 f.closeReader(true);
148 } catch (IOException ioe) {
149 LOG.warn("Exception closing " + f, ioe);
150 }
151 }
152 }
153 }
154 return newFiles;
155 }
156
157
158
159
160
161
162
163
164
165
166
167 public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
168 throws IOException {
169 CompactionRequest cr = new CompactionRequest(filesToCompact);
170 cr.setIsMajor(isMajor, isMajor);
171 return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null);
172 }
173 }