001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 021import org.apache.hbase.thirdparty.io.netty.channel.Channel; 022 023import java.io.IOException; 024import java.net.InetAddress; 025import java.net.InetSocketAddress; 026import java.nio.ByteBuffer; 027 028import org.apache.hadoop.hbase.CellScanner; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 031import org.apache.hadoop.hbase.nio.ByteBuff; 032import org.apache.hadoop.hbase.nio.SingleByteBuff; 033import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 034import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 035import org.apache.hbase.thirdparty.com.google.protobuf.Message; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 037 038/** 039 * RpcConnection implementation for netty rpc server. 040 * @since 2.0.0 041 */ 042@InterfaceAudience.Private 043class NettyServerRpcConnection extends ServerRpcConnection { 044 045 final Channel channel; 046 047 NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) { 048 super(rpcServer); 049 this.channel = channel; 050 InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress()); 051 this.addr = inetSocketAddress.getAddress(); 052 if (addr == null) { 053 this.hostAddress = "*Unknown*"; 054 } else { 055 this.hostAddress = inetSocketAddress.getAddress().getHostAddress(); 056 } 057 this.remotePort = inetSocketAddress.getPort(); 058 } 059 060 void process(final ByteBuf buf) throws IOException, InterruptedException { 061 if (connectionHeaderRead) { 062 this.callCleanup = buf::release; 063 process(new SingleByteBuff(buf.nioBuffer())); 064 } else { 065 ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes()); 066 buf.readBytes(connectionHeader); 067 buf.release(); 068 process(connectionHeader); 069 } 070 } 071 072 void process(ByteBuffer buf) throws IOException, InterruptedException { 073 process(new SingleByteBuff(buf)); 074 } 075 076 void process(ByteBuff buf) throws IOException, InterruptedException { 077 try { 078 if (skipInitialSaslHandshake) { 079 skipInitialSaslHandshake = false; 080 if (callCleanup != null) { 081 callCleanup.run(); 082 } 083 return; 084 } 085 086 if (useSasl) { 087 saslReadAndProcess(buf); 088 } else { 089 processOneRpc(buf); 090 } 091 } catch (Exception e) { 092 if (callCleanup != null) { 093 callCleanup.run(); 094 } 095 throw e; 096 } finally { 097 this.callCleanup = null; 098 } 099 } 100 101 @Override 102 public synchronized void close() { 103 disposeSasl(); 104 channel.close(); 105 callCleanup = null; 106 } 107 108 @Override 109 public boolean isConnectionOpen() { 110 return channel.isOpen(); 111 } 112 113 @Override 114 public NettyServerCall createCall(int id, final BlockingService service, 115 final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, 116 long size, final InetAddress remoteAddress, int timeout, 117 CallCleanup reqCleanup) { 118 return new NettyServerCall(id, service, md, header, param, cellScanner, this, size, 119 remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator, 120 this.rpcServer.cellBlockBuilder, reqCleanup); 121 } 122 123 @Override 124 protected void doRespond(RpcResponse resp) { 125 channel.writeAndFlush(resp); 126 } 127}