001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.net.InetAddress;
022import java.net.InetSocketAddress;
023import java.nio.ByteBuffer;
024import org.apache.hadoop.hbase.CellScanner;
025import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
026import org.apache.hadoop.hbase.nio.ByteBuff;
027import org.apache.hadoop.hbase.nio.SingleByteBuff;
028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
029import org.apache.yetus.audience.InterfaceAudience;
030
031import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
032import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
033import org.apache.hbase.thirdparty.com.google.protobuf.Message;
034import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
035import org.apache.hbase.thirdparty.io.netty.channel.Channel;
036import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
039
040/**
041 * RpcConnection implementation for netty rpc server.
042 * @since 2.0.0
043 */
044@InterfaceAudience.Private
045class NettyServerRpcConnection extends ServerRpcConnection {
046
047  final Channel channel;
048
049  NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) {
050    super(rpcServer);
051    this.channel = channel;
052    // register close hook to release resources
053    channel.closeFuture().addListener(f -> {
054      disposeSasl();
055      callCleanupIfNeeded();
056    });
057    InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress());
058    this.addr = inetSocketAddress.getAddress();
059    if (addr == null) {
060      this.hostAddress = "*Unknown*";
061    } else {
062      this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
063    }
064    this.remotePort = inetSocketAddress.getPort();
065  }
066
067  void process(final ByteBuf buf) throws IOException, InterruptedException {
068    if (connectionHeaderRead) {
069      this.callCleanup = () -> ReferenceCountUtil.safeRelease(buf);
070      process(new SingleByteBuff(buf.nioBuffer()));
071    } else {
072      ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes());
073      try {
074        buf.readBytes(connectionHeader);
075      } finally {
076        buf.release();
077      }
078      process(connectionHeader);
079    }
080  }
081
082  void process(ByteBuffer buf) throws IOException, InterruptedException {
083    process(new SingleByteBuff(buf));
084  }
085
086  void process(ByteBuff buf) throws IOException, InterruptedException {
087    try {
088      if (skipInitialSaslHandshake) {
089        skipInitialSaslHandshake = false;
090        callCleanupIfNeeded();
091        return;
092      }
093
094      if (useSasl) {
095        saslReadAndProcess(buf);
096      } else {
097        processOneRpc(buf);
098      }
099    } catch (Exception e) {
100      callCleanupIfNeeded();
101      throw e;
102    } finally {
103      this.callCleanup = null;
104    }
105  }
106
107  @Override
108  public synchronized void close() {
109    channel.close();
110  }
111
112  @Override
113  public boolean isConnectionOpen() {
114    return channel.isOpen();
115  }
116
117  @Override
118  public NettyServerCall createCall(int id, final BlockingService service,
119    final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner,
120    long size, final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
121    return new NettyServerCall(id, service, md, header, param, cellScanner, this, size,
122      remoteAddress, EnvironmentEdgeManager.currentTime(), timeout, this.rpcServer.bbAllocator,
123      this.rpcServer.cellBlockBuilder, reqCleanup);
124  }
125
126  @Override
127  protected void doRespond(RpcResponse resp) {
128    channel.writeAndFlush(resp);
129  }
130}