1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
33 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
34 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
35 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
36
37 import com.google.common.annotations.VisibleForTesting;
38
39
40
41
42
43 @InterfaceAudience.Private
44 public class StripeStoreFlusher extends StoreFlusher {
45 private static final Log LOG = LogFactory.getLog(StripeStoreFlusher.class);
46 private final Object flushLock = new Object();
47 private final StripeCompactionPolicy policy;
48 private final StripeCompactionPolicy.StripeInformationProvider stripes;
49
50 public StripeStoreFlusher(Configuration conf, Store store,
51 StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
52 super(conf, store);
53 this.policy = policy;
54 this.stripes = stripes;
55 }
56
57 @Override
58 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
59 MonitoredTask status) throws IOException {
60 List<Path> result = new ArrayList<Path>();
61 int cellsCount = snapshot.getCellsCount();
62 if (cellsCount == 0) return result;
63
64 long smallestReadPoint = store.getSmallestReadPoint();
65 InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
66 if (scanner == null) {
67 return result;
68 }
69
70
71 StripeFlushRequest req = this.policy.selectFlush(this.stripes, cellsCount);
72
73 boolean success = false;
74 StripeMultiFileWriter mw = null;
75 try {
76 mw = req.createWriter();
77 StripeMultiFileWriter.WriterFactory factory = createWriterFactory(
78 snapshot.getTimeRangeTracker(), cellsCount);
79 StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
80 mw.init(storeScanner, factory, store.getComparator());
81
82 synchronized (flushLock) {
83 performFlush(scanner, mw, smallestReadPoint);
84 result = mw.commitWriters(cacheFlushSeqNum, false);
85 success = true;
86 }
87 } finally {
88 if (!success && (mw != null)) {
89 for (Path leftoverFile : mw.abortWriters()) {
90 try {
91 store.getFileSystem().delete(leftoverFile, false);
92 } catch (Exception e) {
93 LOG.error("Failed to delete a file after failed flush: " + e);
94 }
95 }
96 }
97 try {
98 scanner.close();
99 } catch (IOException ex) {
100 LOG.warn("Failed to close flush scanner, ignoring", ex);
101 }
102 }
103 return result;
104 }
105
106 private StripeMultiFileWriter.WriterFactory createWriterFactory(
107 final TimeRangeTracker tracker, final long kvCount) {
108 return new StripeMultiFileWriter.WriterFactory() {
109 @Override
110 public Writer createWriter() throws IOException {
111 StoreFile.Writer writer = store.createWriterInTmp(
112 kvCount, store.getFamily().getCompression(), false, true, true);
113 writer.setTimeRangeTracker(tracker);
114 return writer;
115 }
116 };
117 }
118
119
120 public static class StripeFlushRequest {
121 @VisibleForTesting
122 public StripeMultiFileWriter createWriter() throws IOException {
123 StripeMultiFileWriter writer =
124 new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
125 writer.setNoStripeMetadata();
126 return writer;
127 }
128 }
129
130
131 public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
132 private final List<byte[]> targetBoundaries;
133
134
135 public BoundaryStripeFlushRequest(List<byte[]> targetBoundaries) {
136 this.targetBoundaries = targetBoundaries;
137 }
138
139 @Override
140 public StripeMultiFileWriter createWriter() throws IOException {
141 return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null);
142 }
143 }
144
145
146 public static class SizeStripeFlushRequest extends StripeFlushRequest {
147 private final int targetCount;
148 private final long targetKvs;
149
150
151
152
153
154
155 public SizeStripeFlushRequest(int targetCount, long targetKvs) {
156 this.targetCount = targetCount;
157 this.targetKvs = targetKvs;
158 }
159
160 @Override
161 public StripeMultiFileWriter createWriter() throws IOException {
162 return new StripeMultiFileWriter.SizeMultiWriter(
163 this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY);
164 }
165 }
166 }