001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.net.InetAddress; 022import java.net.InetSocketAddress; 023import org.apache.hadoop.hbase.ExtendedCellScanner; 024import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 025import org.apache.hadoop.hbase.nio.ByteBuff; 026import org.apache.hadoop.hbase.nio.SingleByteBuff; 027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 028import org.apache.hadoop.hbase.util.NettyFutureUtils; 029import org.apache.yetus.audience.InterfaceAudience; 030 031import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 032import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 033import org.apache.hbase.thirdparty.com.google.protobuf.Message; 034import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 035import org.apache.hbase.thirdparty.io.netty.channel.Channel; 036 037import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 038 039/** 040 * RpcConnection implementation for netty rpc server. 041 * @since 2.0.0 042 */ 043@InterfaceAudience.Private 044class NettyServerRpcConnection extends ServerRpcConnection { 045 046 final Channel channel; 047 048 NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) { 049 super(rpcServer); 050 this.channel = channel; 051 rpcServer.allChannels.add(channel); 052 NettyRpcServer.LOG.trace("Connection {}; # active connections={}", channel.remoteAddress(), 053 rpcServer.allChannels.size() - 1); 054 // register close hook to release resources 055 NettyFutureUtils.addListener(channel.closeFuture(), f -> { 056 disposeSasl(); 057 callCleanupIfNeeded(); 058 NettyRpcServer.LOG.trace("Disconnection {}; # active connections={}", channel.remoteAddress(), 059 rpcServer.allChannels.size() - 1); 060 rpcServer.allChannels.remove(channel); 061 }); 062 InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress()); 063 this.addr = inetSocketAddress.getAddress(); 064 if (addr == null) { 065 this.hostAddress = "*Unknown*"; 066 } else { 067 this.hostAddress = inetSocketAddress.getAddress().getHostAddress(); 068 } 069 this.remotePort = inetSocketAddress.getPort(); 070 } 071 072 void setupHandler() { 073 channel.pipeline() 074 .addBefore(NettyRpcServerResponseEncoder.NAME, "frameDecoder", 075 new NettyRpcFrameDecoder(rpcServer.maxRequestSize, this)) 076 .addBefore(NettyRpcServerResponseEncoder.NAME, "decoder", 077 new NettyRpcServerRequestDecoder(rpcServer.metrics, this)); 078 } 079 080 void process(ByteBuf buf) throws IOException, InterruptedException { 081 if (skipInitialSaslHandshake) { 082 skipInitialSaslHandshake = false; 083 buf.release(); 084 return; 085 } 086 this.callCleanup = () -> buf.release(); 087 ByteBuff byteBuff = new SingleByteBuff(buf.nioBuffer()); 088 try { 089 processOneRpc(byteBuff); 090 } catch (Exception e) { 091 callCleanupIfNeeded(); 092 throw e; 093 } finally { 094 this.callCleanup = null; 095 } 096 } 097 098 @Override 099 public synchronized void close() { 100 channel.close(); 101 } 102 103 @Override 104 public boolean isConnectionOpen() { 105 return channel.isOpen(); 106 } 107 108 @Override 109 public NettyServerCall createCall(int id, final BlockingService service, 110 final MethodDescriptor md, RequestHeader header, Message param, ExtendedCellScanner cellScanner, 111 long size, final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { 112 return new NettyServerCall(id, service, md, header, param, cellScanner, this, size, 113 remoteAddress, EnvironmentEdgeManager.currentTime(), timeout, this.rpcServer.bbAllocator, 114 this.rpcServer.cellBlockBuilder, reqCleanup); 115 } 116 117 @Override 118 protected void doRespond(RpcResponse resp) { 119 NettyFutureUtils.safeWriteAndFlush(channel, resp); 120 } 121}