001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
021import org.apache.hbase.thirdparty.io.netty.channel.Channel;
022
023import java.io.IOException;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.nio.ByteBuffer;
027
028import org.apache.hadoop.hbase.CellScanner;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
031import org.apache.hadoop.hbase.nio.ByteBuff;
032import org.apache.hadoop.hbase.nio.SingleByteBuff;
033import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
034import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
035import org.apache.hbase.thirdparty.com.google.protobuf.Message;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
037
038/**
039 * RpcConnection implementation for netty rpc server.
040 * @since 2.0.0
041 */
042@InterfaceAudience.Private
043class NettyServerRpcConnection extends ServerRpcConnection {
044
045  final Channel channel;
046
047  NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) {
048    super(rpcServer);
049    this.channel = channel;
050    InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress());
051    this.addr = inetSocketAddress.getAddress();
052    if (addr == null) {
053      this.hostAddress = "*Unknown*";
054    } else {
055      this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
056    }
057    this.remotePort = inetSocketAddress.getPort();
058  }
059
060  void process(final ByteBuf buf) throws IOException, InterruptedException {
061    if (connectionHeaderRead) {
062      this.callCleanup = new RpcServer.CallCleanup() {
063        @Override
064        public void run() {
065          buf.release();
066        }
067      };
068      process(new SingleByteBuff(buf.nioBuffer()));
069    } else {
070      ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes());
071      buf.readBytes(connectionHeader);
072      buf.release();
073      process(connectionHeader);
074    }
075  }
076
077  void process(ByteBuffer buf) throws IOException, InterruptedException {
078    process(new SingleByteBuff(buf));
079  }
080
081  void process(ByteBuff buf) throws IOException, InterruptedException {
082    try {
083      if (skipInitialSaslHandshake) {
084        skipInitialSaslHandshake = false;
085        if (callCleanup != null) {
086          callCleanup.run();
087        }
088        return;
089      }
090
091      if (useSasl) {
092        saslReadAndProcess(buf);
093      } else {
094        processOneRpc(buf);
095      }
096    } catch (Exception e) {
097      if (callCleanup != null) {
098        callCleanup.run();
099      }
100      throw e;
101    } finally {
102      this.callCleanup = null;
103    }
104  }
105
106  @Override
107  public synchronized void close() {
108    disposeSasl();
109    channel.close();
110    callCleanup = null;
111  }
112
113  @Override
114  public boolean isConnectionOpen() {
115    return channel.isOpen();
116  }
117
118  @Override
119  public NettyServerCall createCall(int id, final BlockingService service,
120      final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner,
121      long size, final InetAddress remoteAddress, int timeout,
122      CallCleanup reqCleanup) {
123    return new NettyServerCall(id, service, md, header, param, cellScanner, this, size,
124        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
125        this.rpcServer.cellBlockBuilder, reqCleanup);
126  }
127
128  @Override
129  protected void doRespond(RpcResponse resp) {
130    channel.writeAndFlush(resp);
131  }
132}