001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.io.encoding;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.util.ArrayList;
024import java.util.List;
025
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellComparator;
028import org.apache.hadoop.hbase.CellComparatorImpl;
029import org.apache.hadoop.hbase.KeyValue;
030import org.apache.hadoop.hbase.KeyValueUtil;
031import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
032import org.apache.hadoop.hbase.nio.ByteBuff;
033import org.apache.hadoop.hbase.nio.SingleByteBuff;
034import org.apache.hadoop.hbase.util.ByteBufferUtils;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.io.WritableUtils;
037import org.apache.yetus.audience.InterfaceAudience;
038
039/**
040 * Store cells following every row's start offset, so we can binary search to a row's cells.
041 *
042 * Format:
043 * flat cells
044 * integer: number of rows
045 * integer: row0's offset
046 * integer: row1's offset
047 * ....
048 * integer: dataSize
049 *
050*/
051@InterfaceAudience.Private
052public class RowIndexCodecV1 extends AbstractDataBlockEncoder {
053
054  private static class RowIndexEncodingState extends EncodingState {
055    RowIndexEncoderV1 encoder = null;
056
057    @Override
058    public void beforeShipped() {
059      if (encoder != null) {
060        encoder.beforeShipped();
061      }
062    }
063  }
064
065  @Override
066  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx,
067      DataOutputStream out) throws IOException {
068    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
069      throw new IOException(this.getClass().getName() + " only accepts "
070          + HFileBlockDefaultEncodingContext.class.getName() + " as the "
071          + "encoding context.");
072    }
073
074    HFileBlockDefaultEncodingContext encodingCtx = (HFileBlockDefaultEncodingContext) blkEncodingCtx;
075    encodingCtx.prepareEncoding(out);
076
077    RowIndexEncoderV1 encoder = new RowIndexEncoderV1(out, encodingCtx);
078    RowIndexEncodingState state = new RowIndexEncodingState();
079    state.encoder = encoder;
080    blkEncodingCtx.setEncodingState(state);
081  }
082
083  @Override
084  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx,
085      DataOutputStream out) throws IOException {
086    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
087        .getEncodingState();
088    RowIndexEncoderV1 encoder = state.encoder;
089    return encoder.write(cell);
090  }
091
092  @Override
093  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx,
094      DataOutputStream out, byte[] uncompressedBytesWithHeader)
095      throws IOException {
096    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
097        .getEncodingState();
098    RowIndexEncoderV1 encoder = state.encoder;
099    encoder.flush();
100    postEncoding(encodingCtx);
101  }
102
103  @Override
104  public ByteBuffer decodeKeyValues(DataInputStream source,
105      HFileBlockDecodingContext decodingCtx) throws IOException {
106    ByteBuffer sourceAsBuffer = ByteBufferUtils
107        .drainInputStreamToBuffer(source);// waste
108    sourceAsBuffer.mark();
109    if (!decodingCtx.getHFileContext().isIncludesTags()) {
110      sourceAsBuffer.position(sourceAsBuffer.limit() - Bytes.SIZEOF_INT);
111      int onDiskSize = sourceAsBuffer.getInt();
112      sourceAsBuffer.reset();
113      ByteBuffer dup = sourceAsBuffer.duplicate();
114      dup.position(sourceAsBuffer.position());
115      dup.limit(sourceAsBuffer.position() + onDiskSize);
116      return dup.slice();
117    } else {
118      RowIndexSeekerV1 seeker = new RowIndexSeekerV1(CellComparatorImpl.COMPARATOR,
119          decodingCtx);
120      seeker.setCurrentBuffer(new SingleByteBuff(sourceAsBuffer));
121      List<Cell> kvs = new ArrayList<>();
122      kvs.add(seeker.getCell());
123      while (seeker.next()) {
124        kvs.add(seeker.getCell());
125      }
126      boolean includesMvcc = decodingCtx.getHFileContext().isIncludesMvcc();
127      ByteArrayOutputStream baos = new ByteArrayOutputStream();
128      DataOutputStream out = new DataOutputStream(baos);
129      for (Cell cell : kvs) {
130        KeyValue currentCell = KeyValueUtil.copyToNewKeyValue(cell);
131        out.write(currentCell.getBuffer(), currentCell.getOffset(),
132            currentCell.getLength());
133        if (includesMvcc) {
134          WritableUtils.writeVLong(out, cell.getSequenceId());
135        }
136      }
137      out.flush();
138      return ByteBuffer.wrap(baos.getBuffer(), 0, baos.size());
139    }
140  }
141
142  @Override
143  public Cell getFirstKeyCellInBlock(ByteBuff block) {
144    block.mark();
145    int keyLength = block.getInt();
146    block.getInt();
147    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
148    block.reset();
149    return createFirstKeyCell(key, keyLength);
150  }
151
152  @Override
153  public EncodedSeeker createSeeker(CellComparator comparator,
154      HFileBlockDecodingContext decodingCtx) {
155    return new RowIndexSeekerV1(comparator, decodingCtx);
156  }
157
158}