001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.net.InetAddress; 022import java.net.InetSocketAddress; 023import org.apache.hadoop.hbase.CellScanner; 024import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 025import org.apache.hadoop.hbase.nio.ByteBuff; 026import org.apache.hadoop.hbase.nio.SingleByteBuff; 027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 028import org.apache.hadoop.hbase.util.NettyFutureUtils; 029import org.apache.yetus.audience.InterfaceAudience; 030 031import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 032import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 033import org.apache.hbase.thirdparty.com.google.protobuf.Message; 034import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 035import org.apache.hbase.thirdparty.io.netty.channel.Channel; 036 037import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 038 039/** 040 * RpcConnection implementation for netty rpc server. 041 * @since 2.0.0 042 */ 043@InterfaceAudience.Private 044class NettyServerRpcConnection extends ServerRpcConnection { 045 046 final Channel channel; 047 048 NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) { 049 super(rpcServer); 050 this.channel = channel; 051 rpcServer.allChannels.add(channel); 052 NettyRpcServer.LOG.trace("Connection {}; # active connections={}", channel.remoteAddress(), 053 rpcServer.allChannels.size() - 1); 054 // register close hook to release resources 055 NettyFutureUtils.addListener(channel.closeFuture(), f -> { 056 disposeSasl(); 057 callCleanupIfNeeded(); 058 NettyRpcServer.LOG.trace("Disconnection {}; # active connections={}", channel.remoteAddress(), 059 rpcServer.allChannels.size() - 1); 060 rpcServer.allChannels.remove(channel); 061 }); 062 InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress()); 063 this.addr = inetSocketAddress.getAddress(); 064 if (addr == null) { 065 this.hostAddress = "*Unknown*"; 066 } else { 067 this.hostAddress = inetSocketAddress.getAddress().getHostAddress(); 068 } 069 this.remotePort = inetSocketAddress.getPort(); 070 } 071 072 void setupHandler() { 073 channel.pipeline() 074 .addLast("frameDecoder", new NettyRpcFrameDecoder(rpcServer.maxRequestSize, this)) 075 .addLast("decoder", new NettyRpcServerRequestDecoder(rpcServer.metrics, this)) 076 .addLast("encoder", new NettyRpcServerResponseEncoder(rpcServer.metrics)); 077 } 078 079 void process(ByteBuf buf) throws IOException, InterruptedException { 080 if (skipInitialSaslHandshake) { 081 skipInitialSaslHandshake = false; 082 buf.release(); 083 return; 084 } 085 this.callCleanup = () -> buf.release(); 086 ByteBuff byteBuff = new SingleByteBuff(buf.nioBuffer()); 087 try { 088 processOneRpc(byteBuff); 089 } catch (Exception e) { 090 callCleanupIfNeeded(); 091 throw e; 092 } finally { 093 this.callCleanup = null; 094 } 095 } 096 097 @Override 098 public synchronized void close() { 099 channel.close(); 100 } 101 102 @Override 103 public boolean isConnectionOpen() { 104 return channel.isOpen(); 105 } 106 107 @Override 108 public NettyServerCall createCall(int id, final BlockingService service, 109 final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, 110 long size, final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { 111 return new NettyServerCall(id, service, md, header, param, cellScanner, this, size, 112 remoteAddress, EnvironmentEdgeManager.currentTime(), timeout, this.rpcServer.bbAllocator, 113 this.rpcServer.cellBlockBuilder, reqCleanup); 114 } 115 116 @Override 117 protected void doRespond(RpcResponse resp) { 118 channel.writeAndFlush(resp); 119 } 120}