View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.OutputStream;
23  
24  import org.apache.hadoop.hbase.client.Delete;
25  import org.apache.hadoop.hbase.client.Mutation;
26  import org.apache.hadoop.hbase.client.Put;
27  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
28  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
29  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
30  import org.apache.hadoop.io.serializer.Deserializer;
31  import org.apache.hadoop.io.serializer.Serialization;
32  import org.apache.hadoop.io.serializer.Serializer;
33  
34  public class MutationSerialization implements Serialization<Mutation> {
35    @Override
36    public boolean accept(Class<?> c) {
37      return Mutation.class.isAssignableFrom(c);
38    }
39  
40    @Override
41    public Deserializer<Mutation> getDeserializer(Class<Mutation> c) {
42      return new MutationDeserializer();
43    }
44  
45    @Override
46    public Serializer<Mutation> getSerializer(Class<Mutation> c) {
47      return new MutationSerializer();
48    }
49  
50    private static class MutationDeserializer implements Deserializer<Mutation> {
51      private InputStream in;
52  
53      @Override
54      public void close() throws IOException {
55        in.close();
56      }
57  
58      @Override
59      public Mutation deserialize(Mutation mutation) throws IOException {
60        MutationProto proto = MutationProto.parseDelimitedFrom(in);
61        return ProtobufUtil.toMutation(proto);
62      }
63  
64      @Override
65      public void open(InputStream in) throws IOException {
66        this.in = in;
67      }
68      
69    }
70    private static class MutationSerializer implements Serializer<Mutation> {
71      private OutputStream out;
72  
73      @Override
74      public void close() throws IOException {
75        out.close();
76      }
77  
78      @Override
79      public void open(OutputStream out) throws IOException {
80        this.out = out;
81      }
82  
83      @Override
84      public void serialize(Mutation mutation) throws IOException {
85        MutationType type;
86        if (mutation instanceof Put) {
87          type = MutationType.PUT;
88        } else if (mutation instanceof Delete) {
89          type = MutationType.DELETE;
90        } else {
91          throw new IllegalArgumentException("Only Put and Delete are supported");
92        }
93        ProtobufUtil.toMutation(type, mutation).writeDelimitedTo(out);
94      }
95    }
96  }