/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufUtil;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageDecoder;
import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;

/**
 * Modified based on io.netty.handler.codec.protobuf.ProtobufDecoder. Netty's ProtobufDecoder
 * supports unshaded protobuf messages (com.google.protobuf). Hadoop 3.3.0 and above relocates
 * protobuf classes to a shaded jar (hadoop-thirdparty), and so we must use reflection to detect
 * which one (relocated or not) to use. Do not use this to process HBase's shaded protobuf messages.
 * This is meant to process the protobuf messages in HDFS for the asyncfs use case.
 */
@InterfaceAudience.Private
public class ProtobufDecoder extends MessageToMessageDecoder<ByteBuf> {
  private static final Logger LOG = LoggerFactory.getLogger(ProtobufDecoder.class);

  /** Hadoop's {@code MessageLite} class, relocated or not; resolved by the static initializer. */
  private static Class<?> protobufMessageLiteClass;
  /** Hadoop's {@code MessageLite.Builder} class; resolved by the static initializer. */
  private static Class<?> protobufMessageLiteBuilderClass;

  /**
   * Selects the parser path in {@link #decode}. The static initializer either sets this to
   * {@code true} or aborts class loading, so the builder path is only a fallback.
   */
  private static final boolean HAS_PARSER;

  private static Method getParserForTypeMethod;
  private static Method newBuilderForTypeMethod;

  private final Method parseFromMethod;
  private final Method mergeFromMethod;
  private final Method buildMethod;

  /** Result of {@code prototype.getDefaultInstanceForType()}; source of parser and builders. */
  private final Object defaultInstance;
  private final Object parser;

  /**
   * Creates a decoder that parses every inbound buffer as the message type of {@code prototype}.
   * @param prototype an instance of the Hadoop protobuf message type to decode; must not be null
   * @throws NullPointerException if {@code prototype} is null
   * @throws RuntimeException     if the reflective lookups or invocations on the prototype fail
   */
  public ProtobufDecoder(Object prototype) {
    try {
      Method getDefaultInstanceForTypeMethod =
        protobufMessageLiteClass.getMethod("getDefaultInstanceForType");
      defaultInstance =
        getDefaultInstanceForTypeMethod.invoke(ObjectUtil.checkNotNull(prototype, "prototype"));

      // parser = prototype.getParserForType()
      parser = getParserForTypeMethod.invoke(defaultInstance);
      parseFromMethod =
        parser.getClass().getMethod("parseFrom", byte[].class, int.class, int.class);

      // A throwaway builder, used only to resolve mergeFrom on the concrete builder class.
      Object builder = newBuilderForTypeMethod.invoke(defaultInstance);
      mergeFromMethod =
        builder.getClass().getMethod("mergeFrom", byte[].class, int.class, int.class);

      // All protobuf message builders inherit from MessageLite.Builder
      buildMethod = protobufMessageLiteBuilderClass.getDeclaredMethod("build");
    } catch (IllegalAccessException | NoSuchMethodException e) {
      throw new RuntimeException(e);
    } catch (InvocationTargetException e) {
      throw new RuntimeException(e.getTargetException());
    }
  }

  /**
   * Parses the readable bytes of {@code msg} into one protobuf message and appends it to
   * {@code out}. The reader index of {@code msg} is not advanced.
   * @param ctx the channel handler context (unused)
   * @param msg the buffer holding exactly one serialized message
   * @param out receives the decoded message
   * @throws Exception the underlying parse failure, or a reflection failure
   */
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
    int length = msg.readableBytes();
    byte[] array;
    int offset;
    if (msg.hasArray()) {
      // Heap buffer: parse straight out of the backing array without copying.
      array = msg.array();
      offset = msg.arrayOffset() + msg.readerIndex();
    } else {
      array = ByteBufUtil.getBytes(msg, msg.readerIndex(), length, false);
      offset = 0;
    }

    Object addObj;
    try {
      if (HAS_PARSER) {
        // addObj = parser.parseFrom(array, offset, length);
        addObj = parseFromMethod.invoke(parser, array, offset, length);
      } else {
        // addObj = prototype.newBuilderForType().mergeFrom(array, offset, length).build();
        // Use a fresh builder per message: mergeFrom on a reused builder would merge into
        // whatever the previous message left behind.
        Object freshBuilder = newBuilderForTypeMethod.invoke(defaultInstance);
        Object mergedBuilder = mergeFromMethod.invoke(freshBuilder, array, offset, length);
        addObj = buildMethod.invoke(mergedBuilder);
      }
    } catch (InvocationTargetException e) {
      // Surface the real failure instead of the reflection wrapper, consistent with the
      // constructor. Non-Exception causes stay wrapped, as before.
      Throwable cause = e.getCause();
      if (cause instanceof Exception) {
        throw (Exception) cause;
      }
      throw e;
    }
    out.add(addObj);
  }

  // Resolves which protobuf classes Hadoop uses (relocated first, then unshaded) and caches the
  // reflective handles shared by all decoder instances. Any failure aborts class initialization.
  static {
    boolean hasParser = false;
    // These are the protobuf classes coming from Hadoop. Not the one from hbase-shaded-protobuf
    try {
      protobufMessageLiteClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.MessageLite");
      protobufMessageLiteBuilderClass =
        Class.forName("org.apache.hadoop.thirdparty.protobuf.MessageLite$Builder");
      LOG.debug("Hadoop 3.3 and above shades protobuf.");
    } catch (ClassNotFoundException e) {
      LOG.debug("Hadoop 3.2 and below use unshaded protobuf.", e);
      try {
        protobufMessageLiteClass = Class.forName("com.google.protobuf.MessageLite");
        protobufMessageLiteBuilderClass = Class.forName("com.google.protobuf.MessageLite$Builder");
      } catch (ClassNotFoundException ex) {
        throw new RuntimeException("can not initialize protobuf related classes for hadoop", ex);
      }
    }

    try {
      getParserForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("getParserForType");
      newBuilderForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("newBuilderForType");
      // TODO: If this is false then the class will fail to load? Can refactor it out?
      hasParser = true;
    } catch (NoSuchMethodException e) {
      // If the method is not found, we are in trouble. Abort.
      throw new RuntimeException("can not initialize protobuf related classes for hadoop", e);
    }

    HAS_PARSER = hasParser;
  }
}