001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.codec;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.OutputStream;
023import org.apache.commons.io.IOUtils;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.CellBuilderType;
026import org.apache.hadoop.hbase.ExtendedCellBuilder;
027import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
028import org.apache.hadoop.hbase.HBaseInterfaceAudience;
029import org.apache.hadoop.hbase.io.ByteBuffInputStream;
030import org.apache.hadoop.hbase.nio.ByteBuff;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.yetus.audience.InterfaceAudience;
033
034/**
035 * Basic Cell codec that just writes out all the individual elements of a Cell including the tags.
036 * Uses ints delimiting all lengths. Profligate. Needs tune up. <b>Use this Codec only at server
037 * side.</b>
038 */
039@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
040public class CellCodecWithTags implements Codec {
041  static class CellEncoder extends BaseEncoder {
042    CellEncoder(final OutputStream out) {
043      super(out);
044    }
045
046    @Override
047    public void write(Cell cell) throws IOException {
048      checkFlushed();
049      // Row
050      write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
051      // Column family
052      write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
053      // Qualifier
054      write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
055      // Version
056      this.out.write(Bytes.toBytes(cell.getTimestamp()));
057      // Type
058      this.out.write(cell.getTypeByte());
059      // Value
060      write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
061      // Tags
062      write(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
063      // MvccVersion
064      this.out.write(Bytes.toBytes(cell.getSequenceId()));
065    }
066
067    /**
068     * Write int length followed by array bytes.
069     */
070    private void write(final byte[] bytes, final int offset, final int length) throws IOException {
071      this.out.write(Bytes.toBytes(length));
072      this.out.write(bytes, offset, length);
073    }
074  }
075
076  static class CellDecoder extends BaseDecoder {
077    private final ExtendedCellBuilder cellBuilder =
078      ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
079
080    public CellDecoder(final InputStream in) {
081      super(in);
082    }
083
084    @Override
085    protected Cell parseCell() throws IOException {
086      byte[] row = readByteArray(this.in);
087      byte[] family = readByteArray(in);
088      byte[] qualifier = readByteArray(in);
089      byte[] longArray = new byte[Bytes.SIZEOF_LONG];
090      IOUtils.readFully(this.in, longArray);
091      long timestamp = Bytes.toLong(longArray);
092      byte type = (byte) this.in.read();
093      byte[] value = readByteArray(in);
094      byte[] tags = readByteArray(in);
095      // Read memstore version
096      byte[] memstoreTSArray = new byte[Bytes.SIZEOF_LONG];
097      IOUtils.readFully(this.in, memstoreTSArray);
098      long memstoreTS = Bytes.toLong(memstoreTSArray);
099      return cellBuilder.clear().setRow(row).setFamily(family).setQualifier(qualifier)
100        .setTimestamp(timestamp).setType(type).setValue(value).setSequenceId(memstoreTS)
101        .setTags(tags).build();
102    }
103
104    /** Returns Byte array read from the stream. n */
105    private byte[] readByteArray(final InputStream in) throws IOException {
106      byte[] intArray = new byte[Bytes.SIZEOF_INT];
107      IOUtils.readFully(in, intArray);
108      int length = Bytes.toInt(intArray);
109      byte[] bytes = new byte[length];
110      IOUtils.readFully(in, bytes);
111      return bytes;
112    }
113  }
114
115  @Override
116  public Decoder getDecoder(InputStream is) {
117    return new CellDecoder(is);
118  }
119
120  @Override
121  public Decoder getDecoder(ByteBuff buf) {
122    return getDecoder(new ByteBuffInputStream(buf));
123  }
124
125  @Override
126  public Encoder getEncoder(OutputStream os) {
127    return new CellEncoder(os);
128  }
129}