/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;

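/**
 * A Hadoop {@link Serialization} for HBase {@link Result} values, letting
 * MapReduce jobs read and write Results as delimited protobuf messages.
 * <p>
 * A minimal registration sketch (assuming the standard Hadoop
 * {@code io.serializations} mechanism; not taken from this file):
 * <pre>
 * Configuration conf = job.getConfiguration();
 * conf.setStrings("io.serializations", conf.get("io.serializations"),
 *   ResultSerialization.class.getName());
 * </pre>
 */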
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ResultSerialization extends Configured implements Serialization<Result> {
  private static final Log LOG = LogFactory.getLog(ResultSerialization.class);
  // The following configuration property indicates the import file format version.
  public static final String IMPORT_FORMAT_VER = "hbase.import.version";
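  // For example, a job reading an export written by HBase 0.94 can set
  //   conf.set(ResultSerialization.IMPORT_FORMAT_VER, "0.94");
  // so that getDeserializer() below returns the 0.94-format deserializer.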

  @Override
  public boolean accept(Class<?> c) {
    return Result.class.isAssignableFrom(c);
  }

  @Override
  public Deserializer<Result> getDeserializer(Class<Result> c) {
    // check input format version
    Configuration conf = getConf();
    if (conf != null) {
      String inputVersion = conf.get(IMPORT_FORMAT_VER);
      if (inputVersion != null && inputVersion.equals("0.94")) {
        LOG.info("Load exported file using deserializer for HBase 0.94 format");
        return new Result94Deserializer();
      }
    }

    return new ResultDeserializer();
  }

  @Override
  public Serializer<Result> getSerializer(Class<Result> c) {
    return new ResultSerializer();
  }

  /**
   * Deserializer used to load files exported by HBase 0.94. In that format each
   * Result is written as an int giving the total byte length, followed by that
   * many bytes of KeyValues, each preceded by its own length.
   */
  private static class Result94Deserializer implements Deserializer<Result> {
    private DataInputStream in;

    @Override
    public void close() throws IOException {
      in.close();
    }

    @Override
    public Result deserialize(Result mutation) throws IOException {
      // Total number of bytes of KeyValue data that follows.
      int totalBuffer = in.readInt();
      if (totalBuffer == 0) {
        return Result.EMPTY_RESULT;
      }
      byte[] buf = new byte[totalBuffer];
      readChunked(in, buf, 0, totalBuffer);
      List<Cell> kvs = new ArrayList<Cell>();
      int offset = 0;
      // Each KeyValue is stored as its length (int) followed by its bytes.
      while (offset < totalBuffer) {
        int keyLength = Bytes.toInt(buf, offset);
        offset += Bytes.SIZEOF_INT;
        kvs.add(new KeyValue(buf, offset, keyLength));
        offset += keyLength;
      }
      return Result.create(kvs);
    }

    @Override
    public void open(InputStream in) throws IOException {
      if (!(in instanceof DataInputStream)) {
        throw new IOException("Wrong input stream instance passed in");
      }
      this.in = (DataInputStream) in;
    }

    // Fills dest from the stream in chunks of at most 8KB.
    private void readChunked(final DataInput in, byte[] dest, int ofs, int len) throws IOException {
      int maxRead = 8192;

      for (; ofs < len; ofs += maxRead)
        in.readFully(dest, ofs, Math.min(len - ofs, maxRead));
    }
  }

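  /** Deserializes Results written in the current format: delimited protobuf messages. */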
  private static class ResultDeserializer implements Deserializer<Result> {
    private InputStream in;

    @Override
    public void close() throws IOException {
      in.close();
    }

    @Override
    public Result deserialize(Result mutation) throws IOException {
      ClientProtos.Result proto = ClientProtos.Result.parseDelimitedFrom(in);
      return ProtobufUtil.toResult(proto);
    }

    @Override
    public void open(InputStream in) throws IOException {
      this.in = in;
    }
  }

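  /** Serializes each Result as a delimited {@link ClientProtos.Result} protobuf message. */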
  private static class ResultSerializer implements Serializer<Result> {
    private OutputStream out;

    @Override
    public void close() throws IOException {
      out.close();
    }

    @Override
    public void open(OutputStream out) throws IOException {
      this.out = out;
    }

    @Override
    public void serialize(Result result) throws IOException {
      ProtobufUtil.toResult(result).writeDelimitedTo(out);
    }
  }
}