001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.net.InetAddress; 022import java.net.InetSocketAddress; 023import java.nio.ByteBuffer; 024import org.apache.hadoop.hbase.CellScanner; 025import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 026import org.apache.hadoop.hbase.nio.ByteBuff; 027import org.apache.hadoop.hbase.nio.SingleByteBuff; 028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 029import org.apache.yetus.audience.InterfaceAudience; 030 031import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 032import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 033import org.apache.hbase.thirdparty.com.google.protobuf.Message; 034import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 035import org.apache.hbase.thirdparty.io.netty.channel.Channel; 036import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil; 037 038import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 039 040/** 041 * RpcConnection implementation for netty rpc server. 042 * @since 2.0.0 043 */ 044@InterfaceAudience.Private 045class NettyServerRpcConnection extends ServerRpcConnection { 046 047 final Channel channel; 048 049 NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) { 050 super(rpcServer); 051 this.channel = channel; 052 // register close hook to release resources 053 channel.closeFuture().addListener(f -> { 054 disposeSasl(); 055 callCleanupIfNeeded(); 056 }); 057 InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress()); 058 this.addr = inetSocketAddress.getAddress(); 059 if (addr == null) { 060 this.hostAddress = "*Unknown*"; 061 } else { 062 this.hostAddress = inetSocketAddress.getAddress().getHostAddress(); 063 } 064 this.remotePort = inetSocketAddress.getPort(); 065 } 066 067 void process(final ByteBuf buf) throws IOException, InterruptedException { 068 if (connectionHeaderRead) { 069 this.callCleanup = () -> ReferenceCountUtil.safeRelease(buf); 070 process(new SingleByteBuff(buf.nioBuffer())); 071 } else { 072 ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes()); 073 try { 074 buf.readBytes(connectionHeader); 075 } finally { 076 buf.release(); 077 } 078 process(connectionHeader); 079 } 080 } 081 082 void process(ByteBuffer buf) throws IOException, InterruptedException { 083 process(new SingleByteBuff(buf)); 084 } 085 086 void process(ByteBuff buf) throws IOException, InterruptedException { 087 try { 088 if (skipInitialSaslHandshake) { 089 skipInitialSaslHandshake = false; 090 callCleanupIfNeeded(); 091 return; 092 } 093 094 if (useSasl) { 095 saslReadAndProcess(buf); 096 } else { 097 processOneRpc(buf); 098 } 099 } catch (Exception e) { 100 callCleanupIfNeeded(); 101 throw e; 102 } finally { 103 this.callCleanup = null; 104 } 105 } 106 107 @Override 108 public synchronized void close() { 109 channel.close(); 110 } 111 112 @Override 113 public boolean isConnectionOpen() { 114 return channel.isOpen(); 115 } 116 117 @Override 118 public NettyServerCall createCall(int id, final BlockingService service, 119 final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, 120 long size, final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { 121 return new NettyServerCall(id, service, md, header, param, cellScanner, this, size, 122 remoteAddress, EnvironmentEdgeManager.currentTime(), timeout, this.rpcServer.bbAllocator, 123 this.rpcServer.cellBlockBuilder, reqCleanup); 124 } 125 126 @Override 127 protected void doRespond(RpcResponse resp) { 128 channel.writeAndFlush(resp); 129 } 130}