/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
20 package org.apache.hadoop.hbase.io.hfile;
21
22 import java.io.DataOutput;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FSDataOutputStream;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.Cell;
35 import org.apache.hadoop.hbase.CellComparator;
36 import org.apache.hadoop.hbase.CellUtil;
37 import org.apache.hadoop.hbase.KeyValue.KVComparator;
38 import org.apache.hadoop.hbase.classification.InterfaceAudience;
39 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
40 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
41 import org.apache.hadoop.hbase.util.BloomFilterWriter;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.io.Writable;
44
45
46
47
48 @InterfaceAudience.Private
49 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
50 justification="Understood but doing it anyway; HBASE-14730")
51 public class HFileWriterV2 extends AbstractHFileWriter {
52 static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
53
54
55 public static final byte [] MAX_MEMSTORE_TS_KEY =
56 Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
57
58
59 public static final byte [] KEY_VALUE_VERSION =
60 Bytes.toBytes("KEY_VALUE_VERSION");
61
62
63 public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
64
65
66 private List<InlineBlockWriter> inlineBlockWriters =
67 new ArrayList<InlineBlockWriter>();
68
69
70 protected HFileBlock.Writer fsBlockWriter;
71
72 private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
73 private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;
74
75
76 private long firstDataBlockOffset = -1;
77
78
79 protected long lastDataBlockOffset;
80
81
82
83
84
85 private Cell lastCellOfPreviousBlock = null;
86
87
88 private List<BlockWritable> additionalLoadOnOpenData =
89 new ArrayList<BlockWritable>();
90
91 protected long maxMemstoreTS = 0;
92
93
94 private static boolean WARN_CELL_WITH_TAGS = true;
95
96 static class WriterFactoryV2 extends HFile.WriterFactory {
97 WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
98 super(conf, cacheConf);
99 }
100
101 @Override
102 public Writer createWriter(FileSystem fs, Path path,
103 FSDataOutputStream ostream,
104 KVComparator comparator, HFileContext context) throws IOException {
105 context.setIncludesTags(false);
106 return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
107 comparator, context);
108 }
109 }
110
111
112 public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
113 FileSystem fs, Path path, FSDataOutputStream ostream,
114 final KVComparator comparator, final HFileContext context) throws IOException {
115 super(cacheConf,
116 ostream == null ? createOutputStream(conf, fs, path, null) : ostream,
117 path, comparator, context);
118 finishInit(conf);
119 }
120
121
122 protected void finishInit(final Configuration conf) {
123 if (fsBlockWriter != null)
124 throw new IllegalStateException("finishInit called twice");
125
126 fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
127
128
129 boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
130 dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
131 cacheIndexesOnWrite ? cacheConf : null,
132 cacheIndexesOnWrite ? name : null);
133 dataBlockIndexWriter.setMaxChunkSize(
134 HFileBlockIndex.getMaxChunkSize(conf));
135 dataBlockIndexWriter.setMinIndexNumEntries(
136 HFileBlockIndex.getMinIndexNumEntries(conf));
137 inlineBlockWriters.add(dataBlockIndexWriter);
138
139
140 metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
141 if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf);
142 }
143
144
145
146
147
148
149 protected void checkBlockBoundary() throws IOException {
150 if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
151 return;
152
153 finishBlock();
154 writeInlineBlocks(false);
155 newBlock();
156 }
157
158
159 private void finishBlock() throws IOException {
160 if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
161 return;
162
163
164 if (firstDataBlockOffset == -1) {
165 firstDataBlockOffset = outputStream.getPos();
166 }
167
168 lastDataBlockOffset = outputStream.getPos();
169 fsBlockWriter.writeHeaderAndData(outputStream);
170 int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
171
172 Cell indexEntry =
173 CellComparator.getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock);
174 dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
175 lastDataBlockOffset, onDiskSize);
176 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
177 if (cacheConf.shouldCacheDataOnWrite()) {
178 doCacheOnWrite(lastDataBlockOffset);
179 }
180 }
181
182
183 private void writeInlineBlocks(boolean closing) throws IOException {
184 for (InlineBlockWriter ibw : inlineBlockWriters) {
185 while (ibw.shouldWriteBlock(closing)) {
186 long offset = outputStream.getPos();
187 boolean cacheThisBlock = ibw.getCacheOnWrite();
188 ibw.writeInlineBlock(fsBlockWriter.startWriting(
189 ibw.getInlineBlockType()));
190 fsBlockWriter.writeHeaderAndData(outputStream);
191 ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
192 fsBlockWriter.getUncompressedSizeWithoutHeader());
193 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
194
195 if (cacheThisBlock) {
196 doCacheOnWrite(offset);
197 }
198 }
199 }
200 }
201
202
203
204
205
206
207 private void doCacheOnWrite(long offset) {
208 HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
209 cacheConf.getBlockCache().cacheBlock(
210 new BlockCacheKey(name, offset), cacheFormatBlock);
211 }
212
213
214
215
216
217
218 protected void newBlock() throws IOException {
219
220 fsBlockWriter.startWriting(BlockType.DATA);
221 firstCellInBlock = null;
222 if (lastCell != null) {
223 lastCellOfPreviousBlock = lastCell;
224 }
225 }
226
227
228
229
230
231
232
233
234
235
236
237
238 @Override
239 public void appendMetaBlock(String metaBlockName, Writable content) {
240 byte[] key = Bytes.toBytes(metaBlockName);
241 int i;
242 for (i = 0; i < metaNames.size(); ++i) {
243
244 byte[] cur = metaNames.get(i);
245 if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
246 key.length) > 0) {
247 break;
248 }
249 }
250 metaNames.add(i, key);
251 metaData.add(i, content);
252 }
253
254
255
256
257
258
259
260
261 @Override
262 public void append(final Cell cell) throws IOException {
263 byte[] value = cell.getValueArray();
264 int voffset = cell.getValueOffset();
265 int vlength = cell.getValueLength();
266
267 boolean dupKey = checkKey(cell);
268 checkValue(value, voffset, vlength);
269 if (!dupKey) {
270 checkBlockBoundary();
271 }
272
273 if (!fsBlockWriter.isWriting()) {
274 newBlock();
275 }
276
277
278
279
280 if (WARN_CELL_WITH_TAGS && getFileContext().isIncludesTags()) {
281 LOG.warn("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
282 + " is required to support cell attributes/tags. Consider setting "
283 + HFile.FORMAT_VERSION_KEY + " accordingly.");
284 WARN_CELL_WITH_TAGS = false;
285 }
286
287 fsBlockWriter.write(cell);
288
289 totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
290 totalValueLength += vlength;
291
292
293 if (firstCellInBlock == null) {
294
295
296 firstCellInBlock = cell;
297 }
298
299
300
301 lastCell = cell;
302 entryCount++;
303 this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
304 }
305
306 @Override
307 public void close() throws IOException {
308 if (outputStream == null) {
309 return;
310 }
311
312 blockEncoder.saveMetadata(this);
313
314
315
316 finishBlock();
317 writeInlineBlocks(true);
318
319 FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());
320
321
322 if (!metaNames.isEmpty()) {
323 for (int i = 0; i < metaNames.size(); ++i) {
324
325 long offset = outputStream.getPos();
326
327 DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
328 metaData.get(i).write(dos);
329
330 fsBlockWriter.writeHeaderAndData(outputStream);
331 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
332
333
334 metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
335 fsBlockWriter.getOnDiskSizeWithHeader());
336 }
337 }
338
339
340
341
342
343
344
345
346
347
348 long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
349 trailer.setLoadOnOpenOffset(rootIndexOffset);
350
351
352 metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
353 BlockType.ROOT_INDEX), "meta");
354 fsBlockWriter.writeHeaderAndData(outputStream);
355 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
356
357 if (this.hFileContext.isIncludesMvcc()) {
358 appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
359 appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
360 }
361
362
363 writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
364 fsBlockWriter.writeHeaderAndData(outputStream);
365 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
366
367
368 for (BlockWritable w : additionalLoadOnOpenData){
369 fsBlockWriter.writeBlock(w, outputStream);
370 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
371 }
372
373
374 trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
375 trailer.setUncompressedDataIndexSize(
376 dataBlockIndexWriter.getTotalUncompressedSize());
377 trailer.setFirstDataBlockOffset(firstDataBlockOffset);
378 trailer.setLastDataBlockOffset(lastDataBlockOffset);
379 trailer.setComparatorClass(comparator.getClass());
380 trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
381
382
383 finishClose(trailer);
384
385 fsBlockWriter.release();
386 }
387
388 @Override
389 public void addInlineBlockWriter(InlineBlockWriter ibw) {
390 inlineBlockWriters.add(ibw);
391 }
392
393 @Override
394 public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
395 this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
396 }
397
398 @Override
399 public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
400 this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
401 }
402
403 private void addBloomFilter(final BloomFilterWriter bfw,
404 final BlockType blockType) {
405 if (bfw.getKeyCount() <= 0)
406 return;
407
408 if (blockType != BlockType.GENERAL_BLOOM_META &&
409 blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
410 throw new RuntimeException("Block Type: " + blockType.toString() +
411 "is not supported");
412 }
413 additionalLoadOnOpenData.add(new BlockWritable() {
414 @Override
415 public BlockType getBlockType() {
416 return blockType;
417 }
418
419 @Override
420 public void writeToBlock(DataOutput out) throws IOException {
421 bfw.getMetaWriter().write(out);
422 Writable dataWriter = bfw.getDataWriter();
423 if (dataWriter != null)
424 dataWriter.write(out);
425 }
426 });
427 }
428
429 protected int getMajorVersion() {
430 return 2;
431 }
432
433 protected int getMinorVersion() {
434 return HFileReaderV2.MAX_MINOR_VERSION;
435 }
436
437 @Override
438 public HFileContext getFileContext() {
439 return hFileContext;
440 }
441 }