1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
33 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
34 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
35 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
36
37 import com.google.common.annotations.VisibleForTesting;
38
39
40
41
42
43 @InterfaceAudience.Private
44 public class StripeStoreFlusher extends StoreFlusher {
45 private static final Log LOG = LogFactory.getLog(StripeStoreFlusher.class);
46 private final Object flushLock = new Object();
47 private final StripeCompactionPolicy policy;
48 private final StripeCompactionPolicy.StripeInformationProvider stripes;
49
50 public StripeStoreFlusher(Configuration conf, Store store,
51 StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
52 super(conf, store);
53 this.policy = policy;
54 this.stripes = stripes;
55 }
56
57 @Override
58 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
59 MonitoredTask status) throws IOException {
60 List<Path> result = new ArrayList<Path>();
61 int cellsCount = snapshot.getCellsCount();
62 if (cellsCount == 0) return result;
63
64 long smallestReadPoint = store.getSmallestReadPoint();
65 InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
66 if (scanner == null) {
67 return result;
68 }
69
70
71 StripeFlushRequest req = this.policy.selectFlush(this.stripes, cellsCount);
72
73 boolean success = false;
74 StripeMultiFileWriter mw = null;
75 try {
76 mw = req.createWriter();
77 StripeMultiFileWriter.WriterFactory factory = createWriterFactory(
78 snapshot.getTimeRangeTracker(), cellsCount);
79 StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
80 mw.init(storeScanner, factory, store.getComparator());
81
82 synchronized (flushLock) {
83 performFlush(scanner, mw, smallestReadPoint);
84 result = mw.commitWriters(cacheFlushSeqNum, false);
85 success = true;
86 }
87 } finally {
88 if (!success && (mw != null)) {
89 for (Path leftoverFile : mw.abortWriters()) {
90 try {
91 store.getFileSystem().delete(leftoverFile, false);
92 } catch (Exception e) {
93 LOG.error("Failed to delete a file after failed flush: " + e);
94 }
95 }
96 }
97 try {
98 scanner.close();
99 } catch (IOException ex) {
100 LOG.warn("Failed to close flush scanner, ignoring", ex);
101 }
102 }
103 return result;
104 }
105
106 private StripeMultiFileWriter.WriterFactory createWriterFactory(
107 final TimeRangeTracker tracker, final long kvCount) {
108 return new StripeMultiFileWriter.WriterFactory() {
109 @Override
110 public Writer createWriter() throws IOException {
111 StoreFile.Writer writer = store.createWriterInTmp(
112 kvCount, store.getFamily().getCompression(),
113
114
115
116
117 writer.setTimeRangeTracker(tracker);
118 return writer;
119 }
120 };
121 }
122
123
124 public static class StripeFlushRequest {
125 @VisibleForTesting
126 public StripeMultiFileWriter createWriter() throws IOException {
127 StripeMultiFileWriter writer =
128 new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
129 writer.setNoStripeMetadata();
130 return writer;
131 }
132 }
133
134
135 public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
136 private final List<byte[]> targetBoundaries;
137
138
139 public BoundaryStripeFlushRequest(List<byte[]> targetBoundaries) {
140 this.targetBoundaries = targetBoundaries;
141 }
142
143 @Override
144 public StripeMultiFileWriter createWriter() throws IOException {
145 return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null);
146 }
147 }
148
149
150 public static class SizeStripeFlushRequest extends StripeFlushRequest {
151 private final int targetCount;
152 private final long targetKvs;
153
154
155
156
157
158
159 public SizeStripeFlushRequest(int targetCount, long targetKvs) {
160 this.targetCount = targetCount;
161 this.targetKvs = targetKvs;
162 }
163
164 @Override
165 public StripeMultiFileWriter createWriter() throws IOException {
166 return new StripeMultiFileWriter.SizeMultiWriter(
167 this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY);
168 }
169 }
170 }