1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.DataInput;
21 import java.io.DataInputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.conf.Configured;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.KeyValue;
34 import org.apache.hadoop.hbase.client.Result;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.io.serializer.Deserializer;
39 import org.apache.hadoop.io.serializer.Serialization;
40 import org.apache.hadoop.io.serializer.Serializer;
41
42 public class ResultSerialization extends Configured implements Serialization<Result> {
43 private static final Log LOG = LogFactory.getLog(ResultSerialization.class);
44
45 public static final String IMPORT_FORMAT_VER = "hbase.import.version";
46
47 @Override
48 public boolean accept(Class<?> c) {
49 return Result.class.isAssignableFrom(c);
50 }
51
52 @Override
53 public Deserializer<Result> getDeserializer(Class<Result> c) {
54
55 Configuration conf = getConf();
56 if (conf != null) {
57 String inputVersion = conf.get(IMPORT_FORMAT_VER);
58 if (inputVersion != null && inputVersion.equals("0.94")) {
59 LOG.info("Load exported file using deserializer for HBase 0.94 format");
60 return new Result94Deserializer();
61 }
62 }
63
64 return new ResultDeserializer();
65 }
66
67 @Override
68 public Serializer<Result> getSerializer(Class<Result> c) {
69 return new ResultSerializer();
70 }
71
72
73
74
75 private static class Result94Deserializer implements Deserializer<Result> {
76 private DataInputStream in;
77
78 @Override
79 public void close() throws IOException {
80 in.close();
81 }
82
83 @Override
84 public Result deserialize(Result mutation) throws IOException {
85 int totalBuffer = in.readInt();
86 if (totalBuffer == 0) {
87 return Result.EMPTY_RESULT;
88 }
89 byte[] buf = new byte[totalBuffer];
90 readChunked(in, buf, 0, totalBuffer);
91 List<Cell> kvs = new ArrayList<Cell>();
92 int offset = 0;
93 while (offset < totalBuffer) {
94 int keyLength = Bytes.toInt(buf, offset);
95 offset += Bytes.SIZEOF_INT;
96 kvs.add(new KeyValue(buf, offset, keyLength));
97 offset += keyLength;
98 }
99 return Result.create(kvs);
100 }
101
102 @Override
103 public void open(InputStream in) throws IOException {
104 if (!(in instanceof DataInputStream)) {
105 throw new IOException("Wrong input stream instance passed in");
106 }
107 this.in = (DataInputStream) in;
108 }
109
110 private void readChunked(final DataInput in, byte[] dest, int ofs, int len) throws IOException {
111 int maxRead = 8192;
112
113 for (; ofs < len; ofs += maxRead)
114 in.readFully(dest, ofs, Math.min(len - ofs, maxRead));
115 }
116 }
117
118 private static class ResultDeserializer implements Deserializer<Result> {
119 private InputStream in;
120
121 @Override
122 public void close() throws IOException {
123 in.close();
124 }
125
126 @Override
127 public Result deserialize(Result mutation) throws IOException {
128 ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
129 ProtobufUtil.mergeDelimitedFrom(builder, in);
130 ClientProtos.Result proto = builder.build();
131 return ProtobufUtil.toResult(proto);
132 }
133
134 @Override
135 public void open(InputStream in) throws IOException {
136 this.in = in;
137 }
138 }
139
140 private static class ResultSerializer implements Serializer<Result> {
141 private OutputStream out;
142
143 @Override
144 public void close() throws IOException {
145 out.close();
146 }
147
148 @Override
149 public void open(OutputStream out) throws IOException {
150 this.out = out;
151 }
152
153 @Override
154 public void serialize(Result result) throws IOException {
155 ProtobufUtil.toResult(result).writeDelimitedTo(out);
156 }
157 }
158 }