/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Stores the flat cells followed by every row's start offset, so we can binary-search to a row's
 * cells. Block format:
 *
 * <pre>
 *   flat cells
 *   integer: number of rows
 *   integer: row0's offset
 *   integer: row1's offset
 *   ...
 *   integer: dataSize
 * </pre>
 */
@InterfaceAudience.Private
public class RowIndexCodecV1 extends AbstractDataBlockEncoder {

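  /**
   * Per-block encoding state: holds the active {@link RowIndexEncoderV1} across encode() calls
   * and forwards beforeShipped() so the encoder can stop referencing buffers that are about to be
   * shipped.
   */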
  private static class RowIndexEncodingState extends EncodingState {
    RowIndexEncoderV1 encoder = null;

    @Override
    public void beforeShipped() {
      if (encoder != null) {
        encoder.beforeShipped();
      }
    }
  }

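  /**
   * Starts a new block. Only the default encoding context is accepted, since its
   * prepareEncoding() hook must run before any cells are written; a fresh
   * {@link RowIndexEncoderV1} is then stashed in the context's encoding state for the encode()
   * and endBlockEncoding() calls that follow.
   */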
  @Override
  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
    throws IOException {
    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
        + HFileBlockDefaultEncodingContext.class.getName() + " as the encoding context.");
    }

    HFileBlockDefaultEncodingContext encodingCtx =
      (HFileBlockDefaultEncodingContext) blkEncodingCtx;
    encodingCtx.prepareEncoding(out);

    RowIndexEncoderV1 encoder = new RowIndexEncoderV1(out, encodingCtx);
    RowIndexEncodingState state = new RowIndexEncodingState();
    state.encoder = encoder;
    blkEncodingCtx.setEncodingState(state);
  }

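  /**
   * Writes one cell through the block's {@link RowIndexEncoderV1}; the encoder tracks row
   * boundaries so that every row's start offset can be indexed.
   */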
  @Override
  public void encode(ExtendedCell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
    throws IOException {
    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx.getEncodingState();
    RowIndexEncoderV1 encoder = state.encoder;
    encoder.write(cell);
  }

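  /**
   * Flushes the encoder so the row offset index (the trailing integers described in the class
   * comment) lands after the flat cells, then finishes the block via postEncoding().
   */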
  @Override
  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
    byte[] uncompressedBytesWithHeader) throws IOException {
    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx.getEncodingState();
    RowIndexEncoderV1 encoder = state.encoder;
    encoder.flush();
    postEncoding(encodingCtx);
  }

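  /**
   * Turns an encoded block back into a plain KeyValue buffer. Without tags the cell section can
   * be sliced out as-is; with tags the block is replayed through a seeker and each cell is
   * re-serialized (see the branch comments below).
   */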
  @Override
  public ByteBuffer decodeKeyValues(DataInputStream source, HFileBlockDecodingContext decodingCtx)
    throws IOException {
    // Drains the whole stream into a heap buffer up front; wasteful, but we need the positioned
    // access (mark/reset/slice) that a stream cannot provide.
    ByteBuffer sourceAsBuffer = ByteBufferUtils.drainInputStreamToBuffer(source);
    sourceAsBuffer.mark();
    if (!decodingCtx.getHFileContext().isIncludesTags()) {
      // Without tags the cell section is already in plain KeyValue format. The last integer of
      // the block is the size of that section, so we can slice it out directly and drop the
      // trailing row index.
      sourceAsBuffer.position(sourceAsBuffer.limit() - Bytes.SIZEOF_INT);
      int onDiskSize = sourceAsBuffer.getInt();
      sourceAsBuffer.reset();
      ByteBuffer dup = sourceAsBuffer.duplicate();
      dup.position(sourceAsBuffer.position());
      dup.limit(sourceAsBuffer.position() + onDiskSize);
      return dup.slice();
    } else {
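      // With tags present we cannot simply slice the buffer: replay the block through a seeker
      // and copy each cell back out as a standalone KeyValue.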
      RowIndexSeekerV1 seeker = new RowIndexSeekerV1(decodingCtx);
      seeker.setCurrentBuffer(new SingleByteBuff(sourceAsBuffer));
      List<ExtendedCell> kvs = new ArrayList<>();
      kvs.add(seeker.getCell());
      while (seeker.next()) {
        kvs.add(seeker.getCell());
      }
      boolean includesMvcc = decodingCtx.getHFileContext().isIncludesMvcc();
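      // Re-serialize the cells into a plain KeyValue stream, appending each cell's sequence id
      // as a vlong when mvcc is kept.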
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      try (DataOutputStream out = new DataOutputStream(baos)) {
        for (ExtendedCell cell : kvs) {
          KeyValue currentCell = KeyValueUtil.copyToNewKeyValue(cell);
          out.write(currentCell.getBuffer(), currentCell.getOffset(), currentCell.getLength());
          if (includesMvcc) {
            WritableUtils.writeVLong(out, cell.getSequenceId());
          }
        }
        out.flush();
      }
      return ByteBuffer.wrap(baos.getBuffer(), 0, baos.size());
    }
  }

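  /**
   * Reads the first cell's key straight off the front of the block: the block starts with the
   * first KeyValue, whose layout is key length, value length, then the key bytes.
   */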
  @Override
  public ExtendedCell getFirstKeyCellInBlock(ByteBuff block) {
    block.mark();
    int keyLength = block.getInt();
    block.getInt(); // skip the value length
    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
    block.reset();
    return createFirstKeyCell(key, keyLength);
  }

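  /** Returns a seeker that can binary-search rows via the offset index at the end of the block. */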
  @Override
  public EncodedSeeker createSeeker(HFileBlockDecodingContext decodingCtx) {
    return new RowIndexSeekerV1(decodingCtx);
  }
}