1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.KeyValue.KVComparator;
31 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
32 import org.apache.hadoop.hbase.util.Bytes;
33
34
35
36
37 @InterfaceAudience.Private
38 public abstract class StripeMultiFileWriter implements Compactor.CellSink {
39 private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);
40
41
42 protected WriterFactory writerFactory;
43 protected KVComparator comparator;
44
45 protected List<StoreFile.Writer> existingWriters;
46 protected List<byte[]> boundaries;
47
48 protected StoreScanner sourceScanner;
49
50
51 private boolean doWriteStripeMetadata = true;
52
53 public interface WriterFactory {
54 public StoreFile.Writer createWriter() throws IOException;
55 }
56
57
58
59
60
61
62
63 public void init(StoreScanner sourceScanner, WriterFactory factory, KVComparator comparator)
64 throws IOException {
65 this.writerFactory = factory;
66 this.sourceScanner = sourceScanner;
67 this.comparator = comparator;
68 }
69
70 public void setNoStripeMetadata() {
71 this.doWriteStripeMetadata = false;
72 }
73
74 public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws IOException {
75 assert this.existingWriters != null;
76 commitWritersInternal();
77 assert this.boundaries.size() == (this.existingWriters.size() + 1);
78 LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w")
79 + "riting out metadata for " + this.existingWriters.size() + " writers");
80 List<Path> paths = new ArrayList<Path>();
81 for (int i = 0; i < this.existingWriters.size(); ++i) {
82 StoreFile.Writer writer = this.existingWriters.get(i);
83 if (writer == null) continue;
84 if (doWriteStripeMetadata) {
85 writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
86 writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
87 }
88 writer.appendMetadata(maxSeqId, isMajor);
89 paths.add(writer.getPath());
90 writer.close();
91 }
92 this.existingWriters = null;
93 return paths;
94 }
95
96 public List<Path> abortWriters() {
97 assert this.existingWriters != null;
98 List<Path> paths = new ArrayList<Path>();
99 for (StoreFile.Writer writer : this.existingWriters) {
100 try {
101 paths.add(writer.getPath());
102 writer.close();
103 } catch (Exception ex) {
104 LOG.error("Failed to close the writer after an unfinished compaction.", ex);
105 }
106 }
107 this.existingWriters = null;
108 return paths;
109 }
110
111
112
113
114
115
116
117
118 protected void sanityCheckLeft(
119 byte[] left, byte[] row, int rowOffset, int rowLength) throws IOException {
120 if (StripeStoreFileManager.OPEN_KEY != left &&
121 comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) {
122 String error = "The first row is lower than the left boundary of [" + Bytes.toString(left)
123 + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
124 LOG.error(error);
125 throw new IOException(error);
126 }
127 }
128
129
130
131
132
133
134
135
136 protected void sanityCheckRight(
137 byte[] right, byte[] row, int rowOffset, int rowLength) throws IOException {
138 if (StripeStoreFileManager.OPEN_KEY != right &&
139 comparator.compareRows(row, rowOffset, rowLength, right, 0, right.length) >= 0) {
140 String error = "The last row is higher or equal than the right boundary of ["
141 + Bytes.toString(right) + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
142 LOG.error(error);
143 throw new IOException(error);
144 }
145 }
146
147
148
149
150
151 protected abstract void commitWritersInternal() throws IOException;
152
153
154
155
156
157
158 public static class BoundaryMultiWriter extends StripeMultiFileWriter {
159 private StoreFile.Writer currentWriter;
160 private byte[] currentWriterEndKey;
161
162 private Cell lastCell;
163 private long cellsInCurrentWriter = 0;
164 private int majorRangeFromIndex = -1, majorRangeToIndex = -1;
165 private boolean hasAnyWriter = false;
166
167
168
169
170
171
172
173
174 public BoundaryMultiWriter(List<byte[]> targetBoundaries,
175 byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException {
176 super();
177 this.boundaries = targetBoundaries;
178 this.existingWriters = new ArrayList<StoreFile.Writer>(this.boundaries.size() - 1);
179
180
181 assert (majorRangeFrom == null) == (majorRangeTo == null);
182 if (majorRangeFrom != null) {
183 majorRangeFromIndex = (majorRangeFrom == StripeStoreFileManager.OPEN_KEY) ? 0
184 : Collections.binarySearch(this.boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR);
185 majorRangeToIndex = (majorRangeTo == StripeStoreFileManager.OPEN_KEY) ? boundaries.size()
186 : Collections.binarySearch(this.boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR);
187 if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) {
188 throw new IOException("Major range does not match writer boundaries: [" +
189 Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
190 + majorRangeFromIndex + " to " + majorRangeToIndex);
191 }
192 }
193 }
194
195 @Override
196 public void append(Cell cell) throws IOException {
197 if (currentWriter == null && existingWriters.isEmpty()) {
198
199 sanityCheckLeft(this.boundaries.get(0),
200 cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
201 }
202 prepareWriterFor(cell);
203 currentWriter.append(cell);
204 lastCell = cell;
205 ++cellsInCurrentWriter;
206 }
207
208 private boolean isCellAfterCurrentWriter(Cell cell) {
209 return ((currentWriterEndKey != StripeStoreFileManager.OPEN_KEY) &&
210 (comparator.compareRows(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
211 currentWriterEndKey, 0, currentWriterEndKey.length) >= 0));
212 }
213
214 @Override
215 protected void commitWritersInternal() throws IOException {
216 stopUsingCurrentWriter();
217 while (existingWriters.size() < boundaries.size() - 1) {
218 createEmptyWriter();
219 }
220 if (lastCell != null) {
221 sanityCheckRight(boundaries.get(boundaries.size() - 1),
222 lastCell.getRowArray(), lastCell.getRowOffset(), lastCell.getRowLength());
223 }
224 }
225
226 private void prepareWriterFor(Cell cell) throws IOException {
227 if (currentWriter != null && !isCellAfterCurrentWriter(cell)) return;
228
229 stopUsingCurrentWriter();
230
231 while (isCellAfterCurrentWriter(cell)) {
232 checkCanCreateWriter();
233 createEmptyWriter();
234 }
235 checkCanCreateWriter();
236 hasAnyWriter = true;
237 currentWriter = writerFactory.createWriter();
238 existingWriters.add(currentWriter);
239 }
240
241
242
243
244
245
246
247
248
249
250
251 private void createEmptyWriter() throws IOException {
252 int index = existingWriters.size();
253 boolean isInMajorRange = (index >= majorRangeFromIndex) && (index < majorRangeToIndex);
254
255 boolean isLastWriter = !hasAnyWriter && (index == (boundaries.size() - 2));
256 boolean needEmptyFile = isInMajorRange || isLastWriter;
257 existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null);
258 hasAnyWriter |= needEmptyFile;
259 currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
260 ? null : boundaries.get(existingWriters.size() + 1);
261 }
262
263 private void checkCanCreateWriter() throws IOException {
264 int maxWriterCount = boundaries.size() - 1;
265 assert existingWriters.size() <= maxWriterCount;
266 if (existingWriters.size() >= maxWriterCount) {
267 throw new IOException("Cannot create any more writers (created " + existingWriters.size()
268 + " out of " + maxWriterCount + " - row might be out of range of all valid writers");
269 }
270 }
271
272 private void stopUsingCurrentWriter() {
273 if (currentWriter != null) {
274 if (LOG.isDebugEnabled()) {
275 LOG.debug("Stopping to use a writer after [" + Bytes.toString(currentWriterEndKey)
276 + "] row; wrote out " + cellsInCurrentWriter + " kvs");
277 }
278 cellsInCurrentWriter = 0;
279 }
280 currentWriter = null;
281 currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
282 ? null : boundaries.get(existingWriters.size() + 1);
283 }
284 }
285
286
287
288
289
290
291
292 public static class SizeMultiWriter extends StripeMultiFileWriter {
293 private int targetCount;
294 private long targetCells;
295 private byte[] left;
296 private byte[] right;
297
298 private Cell lastCell;
299 private StoreFile.Writer currentWriter;
300 protected byte[] lastRowInCurrentWriter = null;
301 private long cellsInCurrentWriter = 0;
302 private long cellsSeen = 0;
303 private long cellsSeenInPrevious = 0;
304
305
306
307
308
309
310
311 public SizeMultiWriter(int targetCount, long targetKvs, byte[] left, byte[] right) {
312 super();
313 this.targetCount = targetCount;
314 this.targetCells = targetKvs;
315 this.left = left;
316 this.right = right;
317 int preallocate = Math.min(this.targetCount, 64);
318 this.existingWriters = new ArrayList<StoreFile.Writer>(preallocate);
319 this.boundaries = new ArrayList<byte[]>(preallocate + 1);
320 }
321
322 @Override
323 public void append(Cell cell) throws IOException {
324
325
326 boolean doCreateWriter = false;
327 if (currentWriter == null) {
328
329 sanityCheckLeft(left, cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
330 doCreateWriter = true;
331 } else if (lastRowInCurrentWriter != null
332 && !comparator.matchingRows(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
333 lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
334 if (LOG.isDebugEnabled()) {
335 LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter)
336 + "] row; wrote out " + cellsInCurrentWriter + " kvs");
337 }
338 lastRowInCurrentWriter = null;
339 cellsInCurrentWriter = 0;
340 cellsSeenInPrevious += cellsSeen;
341 doCreateWriter = true;
342 }
343 if (doCreateWriter) {
344 byte[] boundary = existingWriters.isEmpty() ? left : cell.getRow();
345 if (LOG.isDebugEnabled()) {
346 LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]");
347 }
348 currentWriter = writerFactory.createWriter();
349 boundaries.add(boundary);
350 existingWriters.add(currentWriter);
351 }
352
353 currentWriter.append(cell);
354 lastCell = cell;
355 ++cellsInCurrentWriter;
356 cellsSeen = cellsInCurrentWriter;
357 if (this.sourceScanner != null) {
358 cellsSeen = Math.max(cellsSeen,
359 this.sourceScanner.getEstimatedNumberOfKvsScanned() - cellsSeenInPrevious);
360 }
361
362
363
364 if (lastRowInCurrentWriter == null
365 && existingWriters.size() < targetCount
366 && cellsSeen >= targetCells) {
367 lastRowInCurrentWriter = cell.getRow();
368 if (LOG.isDebugEnabled()) {
369 LOG.debug("Preparing to start a new writer after [" + Bytes.toString(
370 lastRowInCurrentWriter) + "] row; observed " + cellsSeen + " kvs and wrote out "
371 + cellsInCurrentWriter + " kvs");
372 }
373 }
374 }
375
376 @Override
377 protected void commitWritersInternal() throws IOException {
378 if (LOG.isDebugEnabled()) {
379 LOG.debug("Stopping with " + cellsInCurrentWriter + " kvs in last writer" +
380 ((this.sourceScanner == null) ? "" : ("; observed estimated "
381 + this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total")));
382 }
383 if (lastCell != null) {
384 sanityCheckRight(
385 right, lastCell.getRowArray(), lastCell.getRowOffset(), lastCell.getRowLength());
386 }
387
388
389
390 if (existingWriters.isEmpty() && 1 == targetCount) {
391 if (LOG.isDebugEnabled()) {
392 LOG.debug("Merge expired stripes into one, create an empty file to preserve metadata.");
393 }
394 boundaries.add(left);
395 existingWriters.add(writerFactory.createWriter());
396 }
397
398 this.boundaries.add(right);
399 }
400 }
401 }