001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.codec;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.OutputStream;
023import org.apache.commons.io.IOUtils;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.CellBuilderType;
026import org.apache.hadoop.hbase.ExtendedCellBuilder;
027import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
028import org.apache.hadoop.hbase.HBaseInterfaceAudience;
029import org.apache.hadoop.hbase.io.ByteBuffInputStream;
030import org.apache.hadoop.hbase.nio.ByteBuff;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.yetus.audience.InterfaceAudience;
033
034/**
035 * Basic Cell codec that just writes out all the individual elements of a Cell. Uses ints delimiting
036 * all lengths. Profligate. Needs tune up. Note: This will not write tags of a Cell.
037 */
038@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
039public class CellCodec implements Codec {
040  static class CellEncoder extends BaseEncoder {
041    CellEncoder(final OutputStream out) {
042      super(out);
043    }
044
045    @Override
046    public void write(Cell cell) throws IOException {
047      checkFlushed();
048      // Row
049      write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
050      // Column family
051      write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
052      // Qualifier
053      write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
054      // Version
055      this.out.write(Bytes.toBytes(cell.getTimestamp()));
056      // Type
057      this.out.write(cell.getTypeByte());
058      // Value
059      write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
060      // MvccVersion
061      this.out.write(Bytes.toBytes(cell.getSequenceId()));
062    }
063
064    /**
065     * Write int length followed by array bytes. nnnn
066     */
067    private void write(final byte[] bytes, final int offset, final int length) throws IOException {
068      // TODO add BB backed os check and do for write. Pass Cell
069      this.out.write(Bytes.toBytes(length));
070      this.out.write(bytes, offset, length);
071    }
072  }
073
074  static class CellDecoder extends BaseDecoder {
075    private final ExtendedCellBuilder cellBuilder =
076      ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
077
078    public CellDecoder(final InputStream in) {
079      super(in);
080    }
081
082    @Override
083    protected Cell parseCell() throws IOException {
084      byte[] row = readByteArray(this.in);
085      byte[] family = readByteArray(in);
086      byte[] qualifier = readByteArray(in);
087      byte[] longArray = new byte[Bytes.SIZEOF_LONG];
088      IOUtils.readFully(this.in, longArray);
089      long timestamp = Bytes.toLong(longArray);
090      byte type = (byte) this.in.read();
091      byte[] value = readByteArray(in);
092      // Read memstore version
093      byte[] memstoreTSArray = new byte[Bytes.SIZEOF_LONG];
094      IOUtils.readFully(this.in, memstoreTSArray);
095      long memstoreTS = Bytes.toLong(memstoreTSArray);
096      return cellBuilder.clear().setRow(row).setFamily(family).setQualifier(qualifier)
097        .setTimestamp(timestamp).setType(type).setValue(value).setSequenceId(memstoreTS).build();
098    }
099
100    /** Returns Byte array read from the stream. n */
101    private byte[] readByteArray(final InputStream in) throws IOException {
102      byte[] intArray = new byte[Bytes.SIZEOF_INT];
103      IOUtils.readFully(in, intArray);
104      int length = Bytes.toInt(intArray);
105      byte[] bytes = new byte[length];
106      IOUtils.readFully(in, bytes);
107      return bytes;
108    }
109  }
110
111  @Override
112  public Decoder getDecoder(InputStream is) {
113    return new CellDecoder(is);
114  }
115
116  @Override
117  public Decoder getDecoder(ByteBuff buf) {
118    return getDecoder(new ByteBuffInputStream(buf));
119  }
120
121  @Override
122  public Encoder getEncoder(OutputStream os) {
123    return new CellEncoder(os);
124  }
125}