001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 021import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufUtil; 022import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 023import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageDecoder; 024import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil; 025import org.apache.yetus.audience.InterfaceAudience; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029import java.lang.reflect.InvocationTargetException; 030import java.lang.reflect.Method; 031import java.util.List; 032 033/** 034 * Modified based on io.netty.handler.codec.protobuf.ProtobufDecoder. 035 * The Netty's ProtobufDecode supports unshaded protobuf messages (com.google.protobuf). 036 * 037 * Hadoop 3.3.0 and above relocates protobuf classes to a shaded jar (hadoop-thirdparty), and 038 * so we must use reflection to detect which one (relocated or not) to use. 039 * 040 * Do not use this to process HBase's shaded protobuf messages. This is meant to process the 041 * protobuf messages in HDFS for the asyncfs use case. 042 * */ 043@InterfaceAudience.Private 044public class ProtobufDecoder extends MessageToMessageDecoder<ByteBuf> { 045 private static final Logger LOG = 046 LoggerFactory.getLogger(ProtobufDecoder.class); 047 048 private static Class<?> protobufMessageLiteClass = null; 049 private static Class<?> protobufMessageLiteBuilderClass = null; 050 051 private static final boolean HAS_PARSER; 052 053 private static Method getParserForTypeMethod; 054 private static Method newBuilderForTypeMethod; 055 056 private Method parseFromMethod; 057 private Method mergeFromMethod; 058 private Method buildMethod; 059 060 private Object parser; 061 private Object builder; 062 063 064 public ProtobufDecoder(Object prototype) { 065 try { 066 Method getDefaultInstanceForTypeMethod = protobufMessageLiteClass.getMethod( 067 "getDefaultInstanceForType"); 068 Object prototype1 = getDefaultInstanceForTypeMethod 069 .invoke(ObjectUtil.checkNotNull(prototype, "prototype")); 070 071 // parser = prototype.getParserForType() 072 parser = getParserForTypeMethod.invoke(prototype1); 073 parseFromMethod = parser.getClass().getMethod( 074 "parseFrom", byte[].class, int.class, int.class); 075 076 // builder = prototype.newBuilderForType(); 077 builder = newBuilderForTypeMethod.invoke(prototype1); 078 mergeFromMethod = builder.getClass().getMethod( 079 "mergeFrom", byte[].class, int.class, int.class); 080 081 // All protobuf message builders inherits from MessageLite.Builder 082 buildMethod = protobufMessageLiteBuilderClass.getDeclaredMethod("build"); 083 084 } catch (IllegalAccessException | NoSuchMethodException e) { 085 throw new RuntimeException(e); 086 } catch (InvocationTargetException e) { 087 throw new RuntimeException(e.getTargetException()); 088 } 089 } 090 091 protected void decode( 092 ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { 093 int length = msg.readableBytes(); 094 byte[] array; 095 int offset; 096 if (msg.hasArray()) { 097 array = msg.array(); 098 offset = msg.arrayOffset() + msg.readerIndex(); 099 } else { 100 array = ByteBufUtil.getBytes(msg, msg.readerIndex(), length, false); 101 offset = 0; 102 } 103 104 Object addObj; 105 if (HAS_PARSER) { 106 // addObj = parser.parseFrom(array, offset, length); 107 addObj = parseFromMethod.invoke(parser, array, offset, length); 108 } else { 109 // addObj = builder.mergeFrom(array, offset, length).build(); 110 Object builderObj = mergeFromMethod.invoke(builder, array, offset, length); 111 addObj = buildMethod.invoke(builderObj); 112 } 113 out.add(addObj); 114 } 115 116 static { 117 boolean hasParser = false; 118 119 // These are the protobuf classes coming from Hadoop. Not the one from hbase-shaded-protobuf 120 protobufMessageLiteClass = com.google.protobuf.MessageLite.class; 121 protobufMessageLiteBuilderClass = com.google.protobuf.MessageLite.Builder.class; 122 123 try { 124 protobufMessageLiteClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.MessageLite"); 125 protobufMessageLiteBuilderClass = Class.forName( 126 "org.apache.hadoop.thirdparty.protobuf.MessageLite.Builder"); 127 LOG.debug("Hadoop 3.3 and above shades protobuf."); 128 } catch (ClassNotFoundException e) { 129 LOG.debug("Hadoop 3.2 and below use unshaded protobuf.", e); 130 } 131 132 try { 133 getParserForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("getParserForType"); 134 newBuilderForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("newBuilderForType"); 135 } catch (NoSuchMethodException e) { 136 // If the method is not found, we are in trouble. Abort. 137 throw new RuntimeException(e); 138 } 139 140 try { 141 protobufMessageLiteClass.getDeclaredMethod("getParserForType"); 142 hasParser = true; 143 } catch (Throwable var2) { 144 } 145 146 HAS_PARSER = hasParser; 147 } 148}