001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.io.encoding;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.util.ArrayList;
024import java.util.List;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.KeyValue;
027import org.apache.hadoop.hbase.KeyValueUtil;
028import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
029import org.apache.hadoop.hbase.nio.ByteBuff;
030import org.apache.hadoop.hbase.nio.SingleByteBuff;
031import org.apache.hadoop.hbase.util.ByteBufferUtils;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.io.WritableUtils;
034import org.apache.yetus.audience.InterfaceAudience;
035
036/**
037 * Store cells following every row's start offset, so we can binary search to a row's cells.
038 *
039 * Format:
040 * flat cells
041 * integer: number of rows
042 * integer: row0's offset
043 * integer: row1's offset
044 * ....
045 * integer: dataSize
046 *
047*/
048@InterfaceAudience.Private
049public class RowIndexCodecV1 extends AbstractDataBlockEncoder {
050
051  private static class RowIndexEncodingState extends EncodingState {
052    RowIndexEncoderV1 encoder = null;
053
054    @Override
055    public void beforeShipped() {
056      if (encoder != null) {
057        encoder.beforeShipped();
058      }
059    }
060  }
061
062  @Override
063  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx,
064      DataOutputStream out) throws IOException {
065    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
066      throw new IOException(this.getClass().getName() + " only accepts "
067          + HFileBlockDefaultEncodingContext.class.getName() + " as the "
068          + "encoding context.");
069    }
070
071    HFileBlockDefaultEncodingContext encodingCtx =
072      (HFileBlockDefaultEncodingContext) blkEncodingCtx;
073    encodingCtx.prepareEncoding(out);
074
075    RowIndexEncoderV1 encoder = new RowIndexEncoderV1(out, encodingCtx);
076    RowIndexEncodingState state = new RowIndexEncodingState();
077    state.encoder = encoder;
078    blkEncodingCtx.setEncodingState(state);
079  }
080
081  @Override
082  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx,
083      DataOutputStream out) throws IOException {
084    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
085        .getEncodingState();
086    RowIndexEncoderV1 encoder = state.encoder;
087    encoder.write(cell);
088  }
089
090  @Override
091  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx,
092      DataOutputStream out, byte[] uncompressedBytesWithHeader)
093      throws IOException {
094    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
095        .getEncodingState();
096    RowIndexEncoderV1 encoder = state.encoder;
097    encoder.flush();
098    postEncoding(encodingCtx);
099  }
100
101  @Override
102  public ByteBuffer decodeKeyValues(DataInputStream source,
103      HFileBlockDecodingContext decodingCtx) throws IOException {
104    ByteBuffer sourceAsBuffer = ByteBufferUtils
105        .drainInputStreamToBuffer(source);// waste
106    sourceAsBuffer.mark();
107    if (!decodingCtx.getHFileContext().isIncludesTags()) {
108      sourceAsBuffer.position(sourceAsBuffer.limit() - Bytes.SIZEOF_INT);
109      int onDiskSize = sourceAsBuffer.getInt();
110      sourceAsBuffer.reset();
111      ByteBuffer dup = sourceAsBuffer.duplicate();
112      dup.position(sourceAsBuffer.position());
113      dup.limit(sourceAsBuffer.position() + onDiskSize);
114      return dup.slice();
115    } else {
116      RowIndexSeekerV1 seeker = new RowIndexSeekerV1(decodingCtx);
117      seeker.setCurrentBuffer(new SingleByteBuff(sourceAsBuffer));
118      List<Cell> kvs = new ArrayList<>();
119      kvs.add(seeker.getCell());
120      while (seeker.next()) {
121        kvs.add(seeker.getCell());
122      }
123      boolean includesMvcc = decodingCtx.getHFileContext().isIncludesMvcc();
124      ByteArrayOutputStream baos = new ByteArrayOutputStream();
125      DataOutputStream out = new DataOutputStream(baos);
126      for (Cell cell : kvs) {
127        KeyValue currentCell = KeyValueUtil.copyToNewKeyValue(cell);
128        out.write(currentCell.getBuffer(), currentCell.getOffset(),
129            currentCell.getLength());
130        if (includesMvcc) {
131          WritableUtils.writeVLong(out, cell.getSequenceId());
132        }
133      }
134      out.flush();
135      return ByteBuffer.wrap(baos.getBuffer(), 0, baos.size());
136    }
137  }
138
139  @Override
140  public Cell getFirstKeyCellInBlock(ByteBuff block) {
141    block.mark();
142    int keyLength = block.getInt();
143    block.getInt();
144    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
145    block.reset();
146    return createFirstKeyCell(key, keyLength);
147  }
148
149  @Override
150  public EncodedSeeker createSeeker(HFileBlockDecodingContext decodingCtx) {
151    return new RowIndexSeekerV1(decodingCtx);
152  }
153}