1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.OutputStream;
23
24 import org.apache.hadoop.hbase.client.Delete;
25 import org.apache.hadoop.hbase.client.Mutation;
26 import org.apache.hadoop.hbase.client.Put;
27 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
28 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
29 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
30 import org.apache.hadoop.io.serializer.Deserializer;
31 import org.apache.hadoop.io.serializer.Serialization;
32 import org.apache.hadoop.io.serializer.Serializer;
33
34 public class MutationSerialization implements Serialization<Mutation> {
35 @Override
36 public boolean accept(Class<?> c) {
37 return Mutation.class.isAssignableFrom(c);
38 }
39
40 @Override
41 public Deserializer<Mutation> getDeserializer(Class<Mutation> c) {
42 return new MutationDeserializer();
43 }
44
45 @Override
46 public Serializer<Mutation> getSerializer(Class<Mutation> c) {
47 return new MutationSerializer();
48 }
49
50 private static class MutationDeserializer implements Deserializer<Mutation> {
51 private InputStream in;
52
53 @Override
54 public void close() throws IOException {
55 in.close();
56 }
57
58 @Override
59 public Mutation deserialize(Mutation mutation) throws IOException {
60 ClientProtos.MutationProto.Builder builder = ClientProtos.MutationProto.newBuilder();
61 ProtobufUtil.mergeDelimitedFrom(builder, in);
62 ClientProtos.MutationProto proto = builder.build();
63 return ProtobufUtil.toMutation(proto);
64 }
65
66 @Override
67 public void open(InputStream in) throws IOException {
68 this.in = in;
69 }
70
71 }
72 private static class MutationSerializer implements Serializer<Mutation> {
73 private OutputStream out;
74
75 @Override
76 public void close() throws IOException {
77 out.close();
78 }
79
80 @Override
81 public void open(OutputStream out) throws IOException {
82 this.out = out;
83 }
84
85 @Override
86 public void serialize(Mutation mutation) throws IOException {
87 MutationType type;
88 if (mutation instanceof Put) {
89 type = MutationType.PUT;
90 } else if (mutation instanceof Delete) {
91 type = MutationType.DELETE;
92 } else {
93 throw new IllegalArgumentException("Only Put and Delete are supported");
94 }
95 ProtobufUtil.toMutation(type, mutation).writeDelimitedTo(out);
96 }
97 }
98 }