/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Store cells following every row's start offset, so we can binary search to a row's cells.
 *
 * Format:
 * flat cells
 * integer: number of rows
 * integer: row0's offset
 * integer: row1's offset
 * ....
 * integer: dataSize
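 *
 * For example (illustrative numbers only): a block holding two rows whose cells start at
 * byte offsets 0 and 57 within the flat-cell region would end with the four integers
 * 2, 0, 57, dataSize, where dataSize is the byte length of the flat-cell region.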
 */
@InterfaceAudience.Private
public class RowIndexCodecV1 extends AbstractDataBlockEncoder {

  private static class RowIndexEncodingState extends EncodingState {
    RowIndexEncoderV1 encoder = null;

    @Override
    public void beforeShipped() {
      if (encoder != null) {
        encoder.beforeShipped();
      }
    }
  }

  @Override
  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx,
      DataOutputStream out) throws IOException {
    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
          + HFileBlockDefaultEncodingContext.class.getName() + " as the "
          + "encoding context.");
    }

    HFileBlockDefaultEncodingContext encodingCtx =
        (HFileBlockDefaultEncodingContext) blkEncodingCtx;
    encodingCtx.prepareEncoding(out);

    RowIndexEncoderV1 encoder = new RowIndexEncoderV1(out, encodingCtx);
    RowIndexEncodingState state = new RowIndexEncodingState();
    state.encoder = encoder;
    blkEncodingCtx.setEncodingState(state);
  }

  @Override
  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx,
      DataOutputStream out) throws IOException {
    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
        .getEncodingState();
    RowIndexEncoderV1 encoder = state.encoder;
    encoder.write(cell);
  }

  @Override
  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx,
      DataOutputStream out, byte[] uncompressedBytesWithHeader)
      throws IOException {
    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
        .getEncodingState();
    RowIndexEncoderV1 encoder = state.encoder;
    // Appends the row-index trailer (row count, per-row offsets, dataSize)
    // described in the class comment.
    encoder.flush();
    postEncoding(encodingCtx);
  }

  @Override
  public ByteBuffer decodeKeyValues(DataInputStream source,
      HFileBlockDecodingContext decodingCtx) throws IOException {
    // Drain the whole stream up front; wasteful, but we need random access
    // to the trailer at the end of the block.
    ByteBuffer sourceAsBuffer = ByteBufferUtils.drainInputStreamToBuffer(source);
    sourceAsBuffer.mark();
    if (!decodingCtx.getHFileContext().isIncludesTags()) {
      // Fast path: without tags the flat cells are already plain KeyValues, so
      // slice out the cell region using the trailing dataSize integer.
      sourceAsBuffer.position(sourceAsBuffer.limit() - Bytes.SIZEOF_INT);
      int onDiskSize = sourceAsBuffer.getInt();
      sourceAsBuffer.reset();
      ByteBuffer dup = sourceAsBuffer.duplicate();
      dup.position(sourceAsBuffer.position());
      dup.limit(sourceAsBuffer.position() + onDiskSize);
      return dup.slice();
    } else {
      // Slow path: walk the block with a seeker and rewrite each cell,
      // re-appending the mvcc sequence id as a vlong when the file carries them.
      RowIndexSeekerV1 seeker = new RowIndexSeekerV1(decodingCtx);
      seeker.setCurrentBuffer(new SingleByteBuff(sourceAsBuffer));
      List<Cell> kvs = new ArrayList<>();
      kvs.add(seeker.getCell());
      while (seeker.next()) {
        kvs.add(seeker.getCell());
      }
      boolean includesMvcc = decodingCtx.getHFileContext().isIncludesMvcc();
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(baos);
      for (Cell cell : kvs) {
        KeyValue currentCell = KeyValueUtil.copyToNewKeyValue(cell);
        out.write(currentCell.getBuffer(), currentCell.getOffset(),
            currentCell.getLength());
        if (includesMvcc) {
          WritableUtils.writeVLong(out, cell.getSequenceId());
        }
      }
      out.flush();
      return ByteBuffer.wrap(baos.getBuffer(), 0, baos.size());
    }
  }
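
  // A minimal decode sketch (illustrative only; setup is elided because this codec is
  // normally driven by the HFile read path rather than called directly):
  //
  //   RowIndexCodecV1 codec = new RowIndexCodecV1();
  //   HFileBlockDecodingContext ctx = ...; // e.g. built via newDataBlockDecodingContext(...)
  //   ByteBuffer flat = codec.decodeKeyValues(new DataInputStream(encodedIn), ctx);
  //   // 'flat' now holds plain KeyValues, with trailing mvcc vlongs when enabled.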

  @Override
  public Cell getFirstKeyCellInBlock(ByteBuff block) {
    block.mark();
    // The block opens with the first KeyValue: key length, value length, then the key bytes.
    int keyLength = block.getInt();
    block.getInt(); // skip the value length
    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
    block.reset();
    return createFirstKeyCell(key, keyLength);
  }

  @Override
  public EncodedSeeker createSeeker(HFileBlockDecodingContext decodingCtx) {
    return new RowIndexSeekerV1(decodingCtx);
  }
}
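
// A minimal seek sketch (illustrative only; variable names are assumed). The per-row
// offsets written by RowIndexEncoderV1 let RowIndexSeekerV1 binary-search to a row
// instead of scanning the block cell by cell:
//
//   DataBlockEncoder.EncodedSeeker seeker = codec.createSeeker(ctx);
//   seeker.setCurrentBuffer(encodedBlock);      // ByteBuff with the row index at its tail
//   seeker.seekToKeyInBlock(searchKey, false);  // binary-searches the row offsets
//   Cell found = seeker.getCell();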