001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
021import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufUtil;
022import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
023import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageDecoder;
024import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029import java.lang.reflect.InvocationTargetException;
030import java.lang.reflect.Method;
031import java.util.List;
032
033/**
034 * Modified based on io.netty.handler.codec.protobuf.ProtobufDecoder.
035 * The Netty's ProtobufDecode supports unshaded protobuf messages (com.google.protobuf).
036 *
037 * Hadoop 3.3.0 and above relocates protobuf classes to a shaded jar (hadoop-thirdparty), and
038 * so we must use reflection to detect which one (relocated or not) to use.
039 *
040 * Do not use this to process HBase's shaded protobuf messages. This is meant to process the
041 * protobuf messages in HDFS for the asyncfs use case.
042 * */
043@InterfaceAudience.Private
044public class ProtobufDecoder extends MessageToMessageDecoder<ByteBuf> {
045  private static final Logger LOG =
046    LoggerFactory.getLogger(ProtobufDecoder.class);
047
048  private static Class<?> protobufMessageLiteClass = null;
049  private static Class<?> protobufMessageLiteBuilderClass = null;
050
051  private static final boolean HAS_PARSER;
052
053  private static Method getParserForTypeMethod;
054  private static Method newBuilderForTypeMethod;
055
056  private Method parseFromMethod;
057  private Method mergeFromMethod;
058  private Method buildMethod;
059
060  private Object parser;
061  private Object builder;
062
063
064  public ProtobufDecoder(Object prototype) {
065    try {
066      Method getDefaultInstanceForTypeMethod = protobufMessageLiteClass.getMethod(
067        "getDefaultInstanceForType");
068      Object prototype1 = getDefaultInstanceForTypeMethod
069        .invoke(ObjectUtil.checkNotNull(prototype, "prototype"));
070
071      // parser = prototype.getParserForType()
072      parser = getParserForTypeMethod.invoke(prototype1);
073      parseFromMethod = parser.getClass().getMethod(
074        "parseFrom", byte[].class, int.class, int.class);
075
076      // builder = prototype.newBuilderForType();
077      builder = newBuilderForTypeMethod.invoke(prototype1);
078      mergeFromMethod = builder.getClass().getMethod(
079        "mergeFrom", byte[].class, int.class, int.class);
080
081      // All protobuf message builders inherits from MessageLite.Builder
082      buildMethod = protobufMessageLiteBuilderClass.getDeclaredMethod("build");
083
084    } catch (IllegalAccessException | NoSuchMethodException e) {
085      throw new RuntimeException(e);
086    } catch (InvocationTargetException e) {
087      throw new RuntimeException(e.getTargetException());
088    }
089  }
090
091  protected void decode(
092    ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
093    int length = msg.readableBytes();
094    byte[] array;
095    int offset;
096    if (msg.hasArray()) {
097      array = msg.array();
098      offset = msg.arrayOffset() + msg.readerIndex();
099    } else {
100      array = ByteBufUtil.getBytes(msg, msg.readerIndex(), length, false);
101      offset = 0;
102    }
103
104    Object addObj;
105    if (HAS_PARSER) {
106      // addObj = parser.parseFrom(array, offset, length);
107      addObj = parseFromMethod.invoke(parser, array, offset, length);
108    } else {
109      // addObj = builder.mergeFrom(array, offset, length).build();
110      Object builderObj = mergeFromMethod.invoke(builder, array, offset, length);
111      addObj = buildMethod.invoke(builderObj);
112    }
113    out.add(addObj);
114  }
115
116  static {
117    boolean hasParser = false;
118
119    // These are the protobuf classes coming from Hadoop. Not the one from hbase-shaded-protobuf
120    protobufMessageLiteClass = com.google.protobuf.MessageLite.class;
121    protobufMessageLiteBuilderClass = com.google.protobuf.MessageLite.Builder.class;
122
123    try {
124      protobufMessageLiteClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.MessageLite");
125      protobufMessageLiteBuilderClass = Class.forName(
126        "org.apache.hadoop.thirdparty.protobuf.MessageLite.Builder");
127      LOG.debug("Hadoop 3.3 and above shades protobuf.");
128    } catch (ClassNotFoundException e) {
129      LOG.debug("Hadoop 3.2 and below use unshaded protobuf.", e);
130    }
131
132    try {
133      getParserForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("getParserForType");
134      newBuilderForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("newBuilderForType");
135    } catch (NoSuchMethodException e) {
136      // If the method is not found, we are in trouble. Abort.
137      throw new RuntimeException(e);
138    }
139
140    try {
141      protobufMessageLiteClass.getDeclaredMethod("getParserForType");
142      hasParser = true;
143    } catch (Throwable var2) {
144    }
145
146    HAS_PARSER = hasParser;
147  }
148}