View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.OutputStream;
23  
24  import org.apache.hadoop.hbase.client.Delete;
25  import org.apache.hadoop.hbase.client.Mutation;
26  import org.apache.hadoop.hbase.client.Put;
27  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
28  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
29  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
30  import org.apache.hadoop.io.serializer.Deserializer;
31  import org.apache.hadoop.io.serializer.Serialization;
32  import org.apache.hadoop.io.serializer.Serializer;
33  
34  public class MutationSerialization implements Serialization<Mutation> {
35    @Override
36    public boolean accept(Class<?> c) {
37      return Mutation.class.isAssignableFrom(c);
38    }
39  
40    @Override
41    public Deserializer<Mutation> getDeserializer(Class<Mutation> c) {
42      return new MutationDeserializer();
43    }
44  
45    @Override
46    public Serializer<Mutation> getSerializer(Class<Mutation> c) {
47      return new MutationSerializer();
48    }
49  
50    private static class MutationDeserializer implements Deserializer<Mutation> {
51      private InputStream in;
52  
53      @Override
54      public void close() throws IOException {
55        in.close();
56      }
57  
58      @Override
59      public Mutation deserialize(Mutation mutation) throws IOException {
60        ClientProtos.MutationProto.Builder builder = ClientProtos.MutationProto.newBuilder();
61        ProtobufUtil.mergeDelimitedFrom(builder, in);
62        ClientProtos.MutationProto proto = builder.build();
63        return ProtobufUtil.toMutation(proto);
64      }
65  
66      @Override
67      public void open(InputStream in) throws IOException {
68        this.in = in;
69      }
70      
71    }
72    private static class MutationSerializer implements Serializer<Mutation> {
73      private OutputStream out;
74  
75      @Override
76      public void close() throws IOException {
77        out.close();
78      }
79  
80      @Override
81      public void open(OutputStream out) throws IOException {
82        this.out = out;
83      }
84  
85      @Override
86      public void serialize(Mutation mutation) throws IOException {
87        MutationType type;
88        if (mutation instanceof Put) {
89          type = MutationType.PUT;
90        } else if (mutation instanceof Delete) {
91          type = MutationType.DELETE;
92        } else {
93          throw new IllegalArgumentException("Only Put and Delete are supported");
94        }
95        ProtobufUtil.toMutation(type, mutation).writeDelimitedTo(out);
96      }
97    }
98  }