1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.io.hfile;
21
22 import java.io.DataOutput;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FSDataOutputStream;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.Cell;
35 import org.apache.hadoop.hbase.CellComparator;
36 import org.apache.hadoop.hbase.CellUtil;
37 import org.apache.hadoop.hbase.KeyValue.KVComparator;
38 import org.apache.hadoop.hbase.classification.InterfaceAudience;
39 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
40 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
41 import org.apache.hadoop.hbase.util.BloomFilterWriter;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.io.Writable;
44
45
46
47
48 @InterfaceAudience.Private
49 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
50 justification="Understood but doing it anyway; HBASE-14730")
51 public class HFileWriterV2 extends AbstractHFileWriter {
52 static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
53
54
55 public static final byte [] MAX_MEMSTORE_TS_KEY =
56 Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
57
58
59 public static final byte [] KEY_VALUE_VERSION =
60 Bytes.toBytes("KEY_VALUE_VERSION");
61
62
63 public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
64
65
66 private List<InlineBlockWriter> inlineBlockWriters =
67 new ArrayList<InlineBlockWriter>();
68
69
70 protected HFileBlock.Writer fsBlockWriter;
71
72 private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
73 private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;
74
75
76 private long firstDataBlockOffset = -1;
77
78
79 protected long lastDataBlockOffset;
80
81
82
83
84
85 private Cell lastCellOfPreviousBlock = null;
86
87
88 private List<BlockWritable> additionalLoadOnOpenData =
89 new ArrayList<BlockWritable>();
90
91 protected long maxMemstoreTS = 0;
92
93
94 private static boolean warnCellWithTags = true;
95
96 static class WriterFactoryV2 extends HFile.WriterFactory {
97 WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
98 super(conf, cacheConf);
99 }
100
101 @Override
102 public Writer createWriter(FileSystem fs, Path path,
103 FSDataOutputStream ostream,
104 KVComparator comparator, HFileContext context) throws IOException {
105 context.setIncludesTags(false);
106 return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
107 comparator, context);
108 }
109 }
110
111
/**
 * Constructor that takes a path and, optionally, an already-open output stream.
 * When {@code ostream} is {@code null} a stream is created from {@code conf},
 * {@code fs} and {@code path}; otherwise the supplied stream is used as is.
 * Initialization is completed by {@link #finishInit(Configuration)}.
 *
 * @throws IOException propagated from the parent constructor or from creating the stream
 */
public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
    FileSystem fs, Path path, FSDataOutputStream ostream,
    final KVComparator comparator, final HFileContext context) throws IOException {
  super(cacheConf,
      ostream != null ? ostream : createOutputStream(conf, fs, path, null),
      path, comparator, context);
  finishInit(conf);
}
120
121
122 protected void finishInit(final Configuration conf) {
123 if (fsBlockWriter != null)
124 throw new IllegalStateException("finishInit called twice");
125
126 fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
127
128
129 boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
130 dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
131 cacheIndexesOnWrite ? cacheConf : null,
132 cacheIndexesOnWrite ? name : null);
133 dataBlockIndexWriter.setMaxChunkSize(
134 HFileBlockIndex.getMaxChunkSize(conf));
135 dataBlockIndexWriter.setMinIndexNumEntries(
136 HFileBlockIndex.getMinIndexNumEntries(conf));
137 inlineBlockWriters.add(dataBlockIndexWriter);
138
139
140 metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
141 if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf);
142 }
143
144
145
146
147
148
149 protected void checkBlockBoundary() throws IOException {
150 if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
151 return;
152
153 finishBlock();
154 writeInlineBlocks(false);
155 newBlock();
156 }
157
158
159 private void finishBlock() throws IOException {
160 if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
161 return;
162
163
164 if (firstDataBlockOffset == -1) {
165 firstDataBlockOffset = outputStream.getPos();
166 }
167
168 lastDataBlockOffset = outputStream.getPos();
169 fsBlockWriter.writeHeaderAndData(outputStream);
170 int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
171
172 Cell indexEntry =
173 CellComparator.getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock);
174 dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
175 lastDataBlockOffset, onDiskSize);
176 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
177 if (cacheConf.shouldCacheDataOnWrite()) {
178 doCacheOnWrite(lastDataBlockOffset);
179 }
180 }
181
182
183 private void writeInlineBlocks(boolean closing) throws IOException {
184 for (InlineBlockWriter ibw : inlineBlockWriters) {
185 while (ibw.shouldWriteBlock(closing)) {
186 long offset = outputStream.getPos();
187 boolean cacheThisBlock = ibw.getCacheOnWrite();
188 ibw.writeInlineBlock(fsBlockWriter.startWriting(
189 ibw.getInlineBlockType()));
190 fsBlockWriter.writeHeaderAndData(outputStream);
191 ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
192 fsBlockWriter.getUncompressedSizeWithoutHeader());
193 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
194
195 if (cacheThisBlock) {
196 doCacheOnWrite(offset);
197 }
198 }
199 }
200 }
201
202
203
204
205
206
207 private void doCacheOnWrite(long offset) {
208 HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
209 cacheConf.getBlockCache().cacheBlock(
210 new BlockCacheKey(name, offset), cacheFormatBlock);
211 }
212
213
214
215
216
217
218 protected void newBlock() throws IOException {
219
220 fsBlockWriter.startWriting(BlockType.DATA);
221 firstCellInBlock = null;
222 if (lastCell != null) {
223 lastCellOfPreviousBlock = lastCell;
224 }
225 }
226
227
228
229
230
231
232
233
234
235
236
237
238 @Override
239 public void appendMetaBlock(String metaBlockName, Writable content) {
240 byte[] key = Bytes.toBytes(metaBlockName);
241 int i;
242 for (i = 0; i < metaNames.size(); ++i) {
243
244 byte[] cur = metaNames.get(i);
245 if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
246 key.length) > 0) {
247 break;
248 }
249 }
250 metaNames.add(i, key);
251 metaData.add(i, content);
252 }
253
254
255
256
257
258
259
260
261 @Override
262 public void append(final Cell cell) throws IOException {
263 byte[] value = cell.getValueArray();
264 int voffset = cell.getValueOffset();
265 int vlength = cell.getValueLength();
266
267 boolean dupKey = checkKey(cell);
268 checkValue(value, voffset, vlength);
269 if (!dupKey) {
270 checkBlockBoundary();
271 }
272
273 if (!fsBlockWriter.isWriting()) {
274 newBlock();
275 }
276
277 if (warnCellWithTags && getFileContext().isIncludesTags()) {
278 LOG.warn("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
279 + " is required to support cell attributes/tags. Consider setting "
280 + HFile.FORMAT_VERSION_KEY + " accordingly.");
281 warnCellWithTags = false;
282 }
283
284 fsBlockWriter.write(cell);
285
286 totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
287 totalValueLength += vlength;
288
289
290 if (firstCellInBlock == null) {
291
292
293 firstCellInBlock = cell;
294 }
295
296
297
298 lastCell = cell;
299 entryCount++;
300 this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
301 }
302
303 @Override
304 public void close() throws IOException {
305 if (outputStream == null) {
306 return;
307 }
308
309 blockEncoder.saveMetadata(this);
310
311
312
313 finishBlock();
314 writeInlineBlocks(true);
315
316 FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());
317
318
319 if (!metaNames.isEmpty()) {
320 for (int i = 0; i < metaNames.size(); ++i) {
321
322 long offset = outputStream.getPos();
323
324 DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
325 metaData.get(i).write(dos);
326
327 fsBlockWriter.writeHeaderAndData(outputStream);
328 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
329
330
331 metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
332 fsBlockWriter.getOnDiskSizeWithHeader());
333 }
334 }
335
336
337
338
339
340
341
342
343
344
345 long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
346 trailer.setLoadOnOpenOffset(rootIndexOffset);
347
348
349 metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
350 BlockType.ROOT_INDEX), "meta");
351 fsBlockWriter.writeHeaderAndData(outputStream);
352 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
353
354 if (this.hFileContext.isIncludesMvcc()) {
355 appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
356 appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
357 }
358
359
360 writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
361 fsBlockWriter.writeHeaderAndData(outputStream);
362 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
363
364
365 for (BlockWritable w : additionalLoadOnOpenData){
366 fsBlockWriter.writeBlock(w, outputStream);
367 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
368 }
369
370
371 trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
372 trailer.setUncompressedDataIndexSize(
373 dataBlockIndexWriter.getTotalUncompressedSize());
374 trailer.setFirstDataBlockOffset(firstDataBlockOffset);
375 trailer.setLastDataBlockOffset(lastDataBlockOffset);
376 trailer.setComparatorClass(comparator.getClass());
377 trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
378
379
380 finishClose(trailer);
381
382 fsBlockWriter.release();
383 }
384
385 @Override
386 public void addInlineBlockWriter(InlineBlockWriter ibw) {
387 inlineBlockWriters.add(ibw);
388 }
389
390 @Override
391 public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
392 this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
393 }
394
395 @Override
396 public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
397 this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
398 }
399
400 private void addBloomFilter(final BloomFilterWriter bfw,
401 final BlockType blockType) {
402 if (bfw.getKeyCount() <= 0)
403 return;
404
405 if (blockType != BlockType.GENERAL_BLOOM_META &&
406 blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
407 throw new RuntimeException("Block Type: " + blockType.toString() +
408 "is not supported");
409 }
410 additionalLoadOnOpenData.add(new BlockWritable() {
411 @Override
412 public BlockType getBlockType() {
413 return blockType;
414 }
415
416 @Override
417 public void writeToBlock(DataOutput out) throws IOException {
418 bfw.getMetaWriter().write(out);
419 Writable dataWriter = bfw.getDataWriter();
420 if (dataWriter != null)
421 dataWriter.write(out);
422 }
423 });
424 }
425
426 protected int getMajorVersion() {
427 return 2;
428 }
429
430 protected int getMinorVersion() {
431 return HFileReaderV2.MAX_MINOR_VERSION;
432 }
433
434 @Override
435 public HFileContext getFileContext() {
436 return hFileContext;
437 }
438 }