1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.DataInputStream;
21 import java.io.DataOutputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25
26 import org.apache.hadoop.hbase.KeyValue;
27 import org.apache.hadoop.io.serializer.Deserializer;
28 import org.apache.hadoop.io.serializer.Serialization;
29 import org.apache.hadoop.io.serializer.Serializer;
30
31 public class KeyValueSerialization implements Serialization<KeyValue> {
32 @Override
33 public boolean accept(Class<?> c) {
34 return KeyValue.class.isAssignableFrom(c);
35 }
36
37 @Override
38 public KeyValueDeserializer getDeserializer(Class<KeyValue> t) {
39 return new KeyValueDeserializer();
40 }
41
42 @Override
43 public KeyValueSerializer getSerializer(Class<KeyValue> c) {
44 return new KeyValueSerializer();
45 }
46
47 public static class KeyValueDeserializer implements Deserializer<KeyValue> {
48 private DataInputStream dis;
49
50 @Override
51 public void close() throws IOException {
52 this.dis.close();
53 }
54
55 @Override
56 public KeyValue deserialize(KeyValue ignore) throws IOException {
57
58 return KeyValue.create(this.dis);
59 }
60
61 @Override
62 public void open(InputStream is) throws IOException {
63 this.dis = new DataInputStream(is);
64 }
65 }
66
67 public static class KeyValueSerializer implements Serializer<KeyValue> {
68 private DataOutputStream dos;
69
70 @Override
71 public void close() throws IOException {
72 this.dos.close();
73 }
74
75 @Override
76 public void open(OutputStream os) throws IOException {
77 this.dos = new DataOutputStream(os);
78 }
79
80 @Override
81 public void serialize(KeyValue kv) throws IOException {
82 KeyValue.write(kv, this.dos);
83 }
84 }
85 }