001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import java.lang.reflect.InvocationTargetException;
021import java.lang.reflect.Method;
022import java.util.List;
023import org.apache.yetus.audience.InterfaceAudience;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
028import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufUtil;
029import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
030import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageDecoder;
031import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
032
033/**
034 * Modified based on io.netty.handler.codec.protobuf.ProtobufDecoder. The Netty's ProtobufDecode
035 * supports unshaded protobuf messages (com.google.protobuf). Hadoop 3.3.0 and above relocates
036 * protobuf classes to a shaded jar (hadoop-thirdparty), and so we must use reflection to detect
037 * which one (relocated or not) to use. Do not use this to process HBase's shaded protobuf messages.
038 * This is meant to process the protobuf messages in HDFS for the asyncfs use case.
039 */
040@InterfaceAudience.Private
041public class ProtobufDecoder extends MessageToMessageDecoder<ByteBuf> {
042  private static final Logger LOG = LoggerFactory.getLogger(ProtobufDecoder.class);
043
044  private static Class<?> protobufMessageLiteClass = null;
045  private static Class<?> protobufMessageLiteBuilderClass = null;
046
047  private static final boolean HAS_PARSER;
048
049  private static Method getParserForTypeMethod;
050  private static Method newBuilderForTypeMethod;
051
052  private Method parseFromMethod;
053  private Method mergeFromMethod;
054  private Method buildMethod;
055
056  private Object parser;
057  private Object builder;
058
059  public ProtobufDecoder(Object prototype) {
060    try {
061      Method getDefaultInstanceForTypeMethod =
062        protobufMessageLiteClass.getMethod("getDefaultInstanceForType");
063      Object prototype1 =
064        getDefaultInstanceForTypeMethod.invoke(ObjectUtil.checkNotNull(prototype, "prototype"));
065
066      // parser = prototype.getParserForType()
067      parser = getParserForTypeMethod.invoke(prototype1);
068      parseFromMethod =
069        parser.getClass().getMethod("parseFrom", byte[].class, int.class, int.class);
070
071      // builder = prototype.newBuilderForType();
072      builder = newBuilderForTypeMethod.invoke(prototype1);
073      mergeFromMethod =
074        builder.getClass().getMethod("mergeFrom", byte[].class, int.class, int.class);
075
076      // All protobuf message builders inherits from MessageLite.Builder
077      buildMethod = protobufMessageLiteBuilderClass.getDeclaredMethod("build");
078
079    } catch (IllegalAccessException | NoSuchMethodException e) {
080      throw new RuntimeException(e);
081    } catch (InvocationTargetException e) {
082      throw new RuntimeException(e.getTargetException());
083    }
084  }
085
086  protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
087    int length = msg.readableBytes();
088    byte[] array;
089    int offset;
090    if (msg.hasArray()) {
091      array = msg.array();
092      offset = msg.arrayOffset() + msg.readerIndex();
093    } else {
094      array = ByteBufUtil.getBytes(msg, msg.readerIndex(), length, false);
095      offset = 0;
096    }
097
098    Object addObj;
099    if (HAS_PARSER) {
100      // addObj = parser.parseFrom(array, offset, length);
101      addObj = parseFromMethod.invoke(parser, array, offset, length);
102    } else {
103      // addObj = builder.mergeFrom(array, offset, length).build();
104      Object builderObj = mergeFromMethod.invoke(builder, array, offset, length);
105      addObj = buildMethod.invoke(builderObj);
106    }
107    out.add(addObj);
108  }
109
110  static {
111    boolean hasParser = false;
112
113    // These are the protobuf classes coming from Hadoop. Not the one from hbase-shaded-protobuf
114    protobufMessageLiteClass = com.google.protobuf.MessageLite.class;
115    protobufMessageLiteBuilderClass = com.google.protobuf.MessageLite.Builder.class;
116
117    try {
118      protobufMessageLiteClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.MessageLite");
119      protobufMessageLiteBuilderClass =
120        Class.forName("org.apache.hadoop.thirdparty.protobuf.MessageLite$Builder");
121      LOG.debug("Hadoop 3.3 and above shades protobuf.");
122    } catch (ClassNotFoundException e) {
123      LOG.debug("Hadoop 3.2 and below use unshaded protobuf.", e);
124    }
125
126    try {
127      getParserForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("getParserForType");
128      newBuilderForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("newBuilderForType");
129      // TODO: If this is false then the class will fail to load? Can refactor it out?
130      hasParser = true;
131    } catch (NoSuchMethodException e) {
132      // If the method is not found, we are in trouble. Abort.
133      throw new RuntimeException(e);
134    }
135
136    HAS_PARSER = hasParser;
137  }
138}