001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
021import org.apache.hbase.thirdparty.io.netty.channel.Channel;
022
023import java.io.IOException;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.nio.ByteBuffer;
027
028import org.apache.hadoop.hbase.CellScanner;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
031import org.apache.hadoop.hbase.nio.ByteBuff;
032import org.apache.hadoop.hbase.nio.SingleByteBuff;
033import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
034import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
035import org.apache.hbase.thirdparty.com.google.protobuf.Message;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
037
038/**
039 * RpcConnection implementation for netty rpc server.
040 * @since 2.0.0
041 */
042@InterfaceAudience.Private
043class NettyServerRpcConnection extends ServerRpcConnection {
044
045  final Channel channel;
046
047  NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) {
048    super(rpcServer);
049    this.channel = channel;
050    InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress());
051    this.addr = inetSocketAddress.getAddress();
052    if (addr == null) {
053      this.hostAddress = "*Unknown*";
054    } else {
055      this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
056    }
057    this.remotePort = inetSocketAddress.getPort();
058  }
059
060  void process(final ByteBuf buf) throws IOException, InterruptedException {
061    if (connectionHeaderRead) {
062      this.callCleanup = buf::release;
063      process(new SingleByteBuff(buf.nioBuffer()));
064    } else {
065      ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes());
066      buf.readBytes(connectionHeader);
067      buf.release();
068      process(connectionHeader);
069    }
070  }
071
072  void process(ByteBuffer buf) throws IOException, InterruptedException {
073    process(new SingleByteBuff(buf));
074  }
075
076  void process(ByteBuff buf) throws IOException, InterruptedException {
077    try {
078      if (skipInitialSaslHandshake) {
079        skipInitialSaslHandshake = false;
080        if (callCleanup != null) {
081          callCleanup.run();
082        }
083        return;
084      }
085
086      if (useSasl) {
087        saslReadAndProcess(buf);
088      } else {
089        processOneRpc(buf);
090      }
091    } catch (Exception e) {
092      if (callCleanup != null) {
093        callCleanup.run();
094      }
095      throw e;
096    } finally {
097      this.callCleanup = null;
098    }
099  }
100
101  @Override
102  public synchronized void close() {
103    disposeSasl();
104    channel.close();
105    callCleanup = null;
106  }
107
108  @Override
109  public boolean isConnectionOpen() {
110    return channel.isOpen();
111  }
112
113  @Override
114  public NettyServerCall createCall(int id, final BlockingService service,
115      final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner,
116      long size, final InetAddress remoteAddress, int timeout,
117      CallCleanup reqCleanup) {
118    return new NettyServerCall(id, service, md, header, param, cellScanner, this, size,
119        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator,
120        this.rpcServer.cellBlockBuilder, reqCleanup);
121  }
122
123  @Override
124  protected void doRespond(RpcResponse resp) {
125    channel.writeAndFlush(resp);
126  }
127}