/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.DataInput;
21  import java.io.DataInputStream;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.util.ArrayList;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.conf.Configured;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.client.Result;
35  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.io.serializer.Deserializer;
39  import org.apache.hadoop.io.serializer.Serialization;
40  import org.apache.hadoop.io.serializer.Serializer;
41  
42  public class ResultSerialization extends Configured implements Serialization<Result> {
43    private static final Log LOG = LogFactory.getLog(ResultSerialization.class);
44    // The following configuration property indicates import file format version.
45    public static final String IMPORT_FORMAT_VER = "hbase.import.version";
46  
47    @Override
48    public boolean accept(Class<?> c) {
49      return Result.class.isAssignableFrom(c);
50    }
51  
52    @Override
53    public Deserializer<Result> getDeserializer(Class<Result> c) {
54      // check input format version
55      Configuration conf = getConf();
56      if (conf != null) {
57        String inputVersion = conf.get(IMPORT_FORMAT_VER);
58        if (inputVersion != null && inputVersion.equals("0.94")) {
59          LOG.info("Load exported file using deserializer for HBase 0.94 format");
60          return new Result94Deserializer();
61        }
62      }
63  
64      return new ResultDeserializer();
65    }
66  
67    @Override
68    public Serializer<Result> getSerializer(Class<Result> c) {
69      return new ResultSerializer();
70    }
71  
72    /**
73     * The following deserializer class is used to load exported file of 0.94
74     */
75    private static class Result94Deserializer implements Deserializer<Result> {
76      private DataInputStream in;
77  
78      @Override
79      public void close() throws IOException {
80        in.close();
81      }
82  
83      @Override
84      public Result deserialize(Result mutation) throws IOException {
85        int totalBuffer = in.readInt();
86        if (totalBuffer == 0) {
87          return Result.EMPTY_RESULT;
88        }
89        byte[] buf = new byte[totalBuffer];
90        readChunked(in, buf, 0, totalBuffer);
91        List<Cell> kvs = new ArrayList<Cell>();
92        int offset = 0;
93        while (offset < totalBuffer) {
94          int keyLength = Bytes.toInt(buf, offset);
95          offset += Bytes.SIZEOF_INT;
96          kvs.add(new KeyValue(buf, offset, keyLength));
97          offset += keyLength;
98        }
99        return Result.create(kvs);
100     }
101 
102     @Override
103     public void open(InputStream in) throws IOException {
104       if (!(in instanceof DataInputStream)) {
105         throw new IOException("Wrong input stream instance passed in");
106       }
107       this.in = (DataInputStream) in;
108     }
109 
110     private void readChunked(final DataInput in, byte[] dest, int ofs, int len) throws IOException {
111       int maxRead = 8192;
112 
113       for (; ofs < len; ofs += maxRead)
114         in.readFully(dest, ofs, Math.min(len - ofs, maxRead));
115     }
116   }
117 
118   private static class ResultDeserializer implements Deserializer<Result> {
119     private InputStream in;
120 
121     @Override
122     public void close() throws IOException {
123       in.close();
124     }
125 
126     @Override
127     public Result deserialize(Result mutation) throws IOException {
128       ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
129       ProtobufUtil.mergeDelimitedFrom(builder, in);
130       ClientProtos.Result proto = builder.build();
131       return ProtobufUtil.toResult(proto);
132     }
133 
134     @Override
135     public void open(InputStream in) throws IOException {
136       this.in = in;
137     }
138   }
139 
140   private static class ResultSerializer implements Serializer<Result> {
141     private OutputStream out;
142 
143     @Override
144     public void close() throws IOException {
145       out.close();
146     }
147 
148     @Override
149     public void open(OutputStream out) throws IOException {
150       this.out = out;
151     }
152 
153     @Override
154     public void serialize(Result result) throws IOException {
155       ProtobufUtil.toResult(result).writeDelimitedTo(out);
156     }
157   }
158 }