View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.OutputStream;
23  
24  import org.apache.hadoop.hbase.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.classification.InterfaceStability;
26  import org.apache.hadoop.hbase.client.Delete;
27  import org.apache.hadoop.hbase.client.Mutation;
28  import org.apache.hadoop.hbase.client.Put;
29  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
30  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
31  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
32  import org.apache.hadoop.io.serializer.Deserializer;
33  import org.apache.hadoop.io.serializer.Serialization;
34  import org.apache.hadoop.io.serializer.Serializer;
35
36  @InterfaceAudience.Public
37  @InterfaceStability.Evolving
38  public class MutationSerialization implements Serialization<Mutation> {
39    @Override
40    public boolean accept(Class<?> c) {
41      return Mutation.class.isAssignableFrom(c);
42    }
43
44    @Override
45    public Deserializer<Mutation> getDeserializer(Class<Mutation> c) {
46      return new MutationDeserializer();
47    }
48
49    @Override
50    public Serializer<Mutation> getSerializer(Class<Mutation> c) {
51      return new MutationSerializer();
52    }
53
54    private static class MutationDeserializer implements Deserializer<Mutation> {
55      private InputStream in;
56
57      @Override
58      public void close() throws IOException {
59        in.close();
60      }
61
62      @Override
63      public Mutation deserialize(Mutation mutation) throws IOException {
64        MutationProto proto = MutationProto.parseDelimitedFrom(in);
65        return ProtobufUtil.toMutation(proto);
66      }
67
68      @Override
69      public void open(InputStream in) throws IOException {
70        this.in = in;
71      }
72
73    }
74    private static class MutationSerializer implements Serializer<Mutation> {
75      private OutputStream out;
76
77      @Override
78      public void close() throws IOException {
79        out.close();
80      }
81
82      @Override
83      public void open(OutputStream out) throws IOException {
84        this.out = out;
85      }
86
87      @Override
88      public void serialize(Mutation mutation) throws IOException {
89        MutationType type;
90        if (mutation instanceof Put) {
91          type = MutationType.PUT;
92        } else if (mutation instanceof Delete) {
93          type = MutationType.DELETE;
94        } else {
95          throw new IllegalArgumentException("Only Put and Delete are supported");
96        }
97        ProtobufUtil.toMutation(type, mutation).writeDelimitedTo(out);
98      }
99    }
100 }