/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.DataInput;
021import java.io.DataInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.util.ArrayList;
026import java.util.List;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.conf.Configured;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035import org.apache.hadoop.hbase.client.Result;
036import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.io.serializer.Deserializer;
040import org.apache.hadoop.io.serializer.Serialization;
041import org.apache.hadoop.io.serializer.Serializer;
042
043@InterfaceAudience.Public
044public class ResultSerialization extends Configured implements Serialization<Result> {
045  private static final Logger LOG = LoggerFactory.getLogger(ResultSerialization.class);
046  // The following configuration property indicates import file format version.
047  public static final String IMPORT_FORMAT_VER = "hbase.import.version";
048
049  @Override
050  public boolean accept(Class<?> c) {
051    return Result.class.isAssignableFrom(c);
052  }
053
054  @Override
055  public Deserializer<Result> getDeserializer(Class<Result> c) {
056    // check input format version
057    Configuration conf = getConf();
058    if (conf != null) {
059      String inputVersion = conf.get(IMPORT_FORMAT_VER);
060      if (inputVersion != null && inputVersion.equals("0.94")) {
061        LOG.info("Load exported file using deserializer for HBase 0.94 format");
062        return new Result94Deserializer();
063      }
064    }
065
066    return new ResultDeserializer();
067  }
068
069  @Override
070  public Serializer<Result> getSerializer(Class<Result> c) {
071    return new ResultSerializer();
072  }
073
074  /**
075   * The following deserializer class is used to load exported file of 0.94
076   */
077  private static class Result94Deserializer implements Deserializer<Result> {
078    private DataInputStream in;
079
080    @Override
081    public void close() throws IOException {
082      in.close();
083    }
084
085    @Override
086    public Result deserialize(Result mutation) throws IOException {
087      int totalBuffer = in.readInt();
088      if (totalBuffer == 0) {
089        return Result.EMPTY_RESULT;
090      }
091      byte[] buf = new byte[totalBuffer];
092      readChunked(in, buf, 0, totalBuffer);
093      List<Cell> kvs = new ArrayList<>();
094      int offset = 0;
095      while (offset < totalBuffer) {
096        int keyLength = Bytes.toInt(buf, offset);
097        offset += Bytes.SIZEOF_INT;
098        kvs.add(new KeyValue(buf, offset, keyLength));
099        offset += keyLength;
100      }
101      return Result.create(kvs);
102    }
103
104    @Override
105    public void open(InputStream in) throws IOException {
106      if (!(in instanceof DataInputStream)) {
107        throw new IOException("Wrong input stream instance passed in");
108      }
109      this.in = (DataInputStream) in;
110    }
111
112    private void readChunked(final DataInput in, byte[] dest, int ofs, int len) throws IOException {
113      int maxRead = 8192;
114
115      for (; ofs < len; ofs += maxRead)
116        in.readFully(dest, ofs, Math.min(len - ofs, maxRead));
117    }
118  }
119
120  private static class ResultDeserializer implements Deserializer<Result> {
121    private InputStream in;
122
123    @Override
124    public void close() throws IOException {
125      in.close();
126    }
127
128    @Override
129    public Result deserialize(Result mutation) throws IOException {
130      ClientProtos.Result proto = ClientProtos.Result.parseDelimitedFrom(in);
131      return ProtobufUtil.toResult(proto);
132    }
133
134    @Override
135    public void open(InputStream in) throws IOException {
136      this.in = in;
137    }
138  }
139
140  private static class ResultSerializer implements Serializer<Result> {
141    private OutputStream out;
142
143    @Override
144    public void close() throws IOException {
145      out.close();
146    }
147
148    @Override
149    public void open(OutputStream out) throws IOException {
150      this.out = out;
151    }
152
153    @Override
154    public void serialize(Result result) throws IOException {
155      ProtobufUtil.toResult(result).writeDelimitedTo(out);
156    }
157  }
158}