001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.codec;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.OutputStream;
023
024import org.apache.commons.io.IOUtils;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellBuilderType;
027import org.apache.hadoop.hbase.ExtendedCellBuilder;
028import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
029import org.apache.hadoop.hbase.HBaseInterfaceAudience;
030import org.apache.hadoop.hbase.io.ByteBuffInputStream;
031import org.apache.hadoop.hbase.nio.ByteBuff;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.yetus.audience.InterfaceAudience;
034
035/**
036 * Basic Cell codec that just writes out all the individual elements of a Cell including the tags.
037 * Uses ints delimiting all lengths. Profligate. Needs tune up.
038 * <b>Use this Codec only at server side.</b>
039 */
040@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
041public class CellCodecWithTags implements Codec {
042  static class CellEncoder extends BaseEncoder {
043    CellEncoder(final OutputStream out) {
044      super(out);
045    }
046
047    @Override
048    public void write(Cell cell) throws IOException {
049      checkFlushed();
050      // Row
051      write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
052      // Column family
053      write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
054      // Qualifier
055      write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
056      // Version
057      this.out.write(Bytes.toBytes(cell.getTimestamp()));
058      // Type
059      this.out.write(cell.getTypeByte());
060      // Value
061      write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
062      // Tags
063      write(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
064      // MvccVersion
065      this.out.write(Bytes.toBytes(cell.getSequenceId()));
066    }
067
068    /**
069     * Write int length followed by array bytes.
070     *
071     * @param bytes
072     * @param offset
073     * @param length
074     * @throws IOException
075     */
076    private void write(final byte[] bytes, final int offset, final int length) throws IOException {
077      this.out.write(Bytes.toBytes(length));
078      this.out.write(bytes, offset, length);
079    }
080  }
081
082  static class CellDecoder extends BaseDecoder {
083    private final ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
084    public CellDecoder(final InputStream in) {
085      super(in);
086    }
087
088    @Override
089    protected Cell parseCell() throws IOException {
090      byte[] row = readByteArray(this.in);
091      byte[] family = readByteArray(in);
092      byte[] qualifier = readByteArray(in);
093      byte[] longArray = new byte[Bytes.SIZEOF_LONG];
094      IOUtils.readFully(this.in, longArray);
095      long timestamp = Bytes.toLong(longArray);
096      byte type = (byte) this.in.read();
097      byte[] value = readByteArray(in);
098      byte[] tags = readByteArray(in);
099      // Read memstore version
100      byte[] memstoreTSArray = new byte[Bytes.SIZEOF_LONG];
101      IOUtils.readFully(this.in, memstoreTSArray);
102      long memstoreTS = Bytes.toLong(memstoreTSArray);
103      return cellBuilder.clear()
104              .setRow(row)
105              .setFamily(family)
106              .setQualifier(qualifier)
107              .setTimestamp(timestamp)
108              .setType(type)
109              .setValue(value)
110              .setSequenceId(memstoreTS)
111              .setTags(tags)
112              .build();
113    }
114
115    /**
116     * @return Byte array read from the stream.
117     * @throws IOException
118     */
119    private byte[] readByteArray(final InputStream in) throws IOException {
120      byte[] intArray = new byte[Bytes.SIZEOF_INT];
121      IOUtils.readFully(in, intArray);
122      int length = Bytes.toInt(intArray);
123      byte[] bytes = new byte[length];
124      IOUtils.readFully(in, bytes);
125      return bytes;
126    }
127  }
128
129  @Override
130  public Decoder getDecoder(InputStream is) {
131    return new CellDecoder(is);
132  }
133
134  @Override
135  public Decoder getDecoder(ByteBuff buf) {
136    return getDecoder(new ByteBuffInputStream(buf));
137  }
138
139  @Override
140  public Encoder getEncoder(OutputStream os) {
141    return new CellEncoder(os);
142  }
143}