001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.DataInput; 021import java.io.DataInputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import java.util.ArrayList; 026import java.util.List; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.conf.Configured; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.KeyValue; 031import org.apache.hadoop.hbase.client.Result; 032import org.apache.hadoop.hbase.util.Bytes; 033import org.apache.hadoop.io.serializer.Deserializer; 034import org.apache.hadoop.io.serializer.Serialization; 035import org.apache.hadoop.io.serializer.Serializer; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 042 043@InterfaceAudience.Public 044public class ResultSerialization extends Configured implements Serialization<Result> { 045 private static final Logger LOG = LoggerFactory.getLogger(ResultSerialization.class); 046 // The following configuration property indicates import file format version. 047 public static final String IMPORT_FORMAT_VER = "hbase.import.version"; 048 049 @Override 050 public boolean accept(Class<?> c) { 051 return Result.class.isAssignableFrom(c); 052 } 053 054 @Override 055 public Deserializer<Result> getDeserializer(Class<Result> c) { 056 // check input format version 057 Configuration conf = getConf(); 058 if (conf != null) { 059 String inputVersion = conf.get(IMPORT_FORMAT_VER); 060 if (inputVersion != null && inputVersion.equals("0.94")) { 061 LOG.info("Load exported file using deserializer for HBase 0.94 format"); 062 return new Result94Deserializer(); 063 } 064 } 065 066 return new ResultDeserializer(); 067 } 068 069 @Override 070 public Serializer<Result> getSerializer(Class<Result> c) { 071 return new ResultSerializer(); 072 } 073 074 /** 075 * The following deserializer class is used to load exported file of 0.94 076 */ 077 private static class Result94Deserializer implements Deserializer<Result> { 078 private DataInputStream in; 079 080 @Override 081 public void close() throws IOException { 082 in.close(); 083 } 084 085 @Override 086 public Result deserialize(Result mutation) throws IOException { 087 int totalBuffer = in.readInt(); 088 if (totalBuffer == 0) { 089 return Result.EMPTY_RESULT; 090 } 091 byte[] buf = new byte[totalBuffer]; 092 readChunked(in, buf, 0, totalBuffer); 093 List<Cell> kvs = new ArrayList<>(); 094 int offset = 0; 095 while (offset < totalBuffer) { 096 int keyLength = Bytes.toInt(buf, offset); 097 offset += Bytes.SIZEOF_INT; 098 kvs.add(new KeyValue(buf, offset, keyLength)); 099 offset += keyLength; 100 } 101 return Result.create(kvs); 102 } 103 104 @Override 105 public void open(InputStream in) throws IOException { 106 if (!(in instanceof DataInputStream)) { 107 throw new IOException("Wrong input stream instance passed in"); 108 } 109 this.in = (DataInputStream) in; 110 } 111 112 private void readChunked(final DataInput in, byte[] dest, int ofs, int len) throws IOException { 113 int maxRead = 8192; 114 115 for (; ofs < len; ofs += maxRead) 116 in.readFully(dest, ofs, Math.min(len - ofs, maxRead)); 117 } 118 } 119 120 private static class ResultDeserializer implements Deserializer<Result> { 121 private InputStream in; 122 123 @Override 124 public void close() throws IOException { 125 in.close(); 126 } 127 128 @Override 129 public Result deserialize(Result mutation) throws IOException { 130 ClientProtos.Result proto = ClientProtos.Result.parseDelimitedFrom(in); 131 return ProtobufUtil.toResult(proto, true); 132 } 133 134 @Override 135 public void open(InputStream in) throws IOException { 136 this.in = in; 137 } 138 } 139 140 private static class ResultSerializer implements Serializer<Result> { 141 private OutputStream out; 142 143 @Override 144 public void close() throws IOException { 145 out.close(); 146 } 147 148 @Override 149 public void open(OutputStream out) throws IOException { 150 this.out = out; 151 } 152 153 @Override 154 public void serialize(Result result) throws IOException { 155 ProtobufUtil.toResult(result, true).writeDelimitedTo(out); 156 } 157 } 158}