/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.CellScanner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;

/**
 * RpcConnection implementation for netty rpc server.
 * @since 2.0.0
 */
@InterfaceAudience.Private
class NettyServerRpcConnection extends ServerRpcConnection {

  /** The Netty channel this connection reads requests from and writes responses to. */
  final Channel channel;

  /**
   * Creates a connection bound to {@code channel} and records the remote peer's address, host
   * address string and port taken from {@link Channel#remoteAddress()}.
   *
   * @param rpcServer the server that owns this connection
   * @param channel the Netty channel backing this connection; its remote address is cast to
   *          {@link InetSocketAddress}
   */
  NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) {
    super(rpcServer);
    this.channel = channel;
    InetSocketAddress remote = (InetSocketAddress) channel.remoteAddress();
    this.addr = remote.getAddress();
    // getAddress() returns null for an unresolved socket address; fall back to a placeholder.
    this.hostAddress = (this.addr == null) ? "*Unknown*" : this.addr.getHostAddress();
    this.remotePort = remote.getPort();
  }

  /**
   * Processes one inbound Netty buffer.
   * <p>
   * When the connection header has already been read, the buffer is exposed through
   * {@link ByteBuf#nioBuffer()} and a cleanup callback that releases it is stored in
   * {@code callCleanup}. Otherwise the readable bytes are copied into a freshly allocated
   * {@link ByteBuffer} and the Netty buffer is released right away.
   *
   * @param buf the inbound buffer; this method, or the cleanup callback it registers, releases it
   * @throws IOException if processing the request fails
   * @throws InterruptedException if processing is interrupted
   */
  void process(final ByteBuf buf) throws IOException, InterruptedException {
    if (connectionHeaderRead) {
      this.callCleanup = new RpcServer.CallCleanup() {
        @Override
        public void run() {
          buf.release();
        }
      };
      process(new SingleByteBuff(buf.nioBuffer()));
      return;
    }

    ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes());
    try {
      buf.readBytes(connectionHeader);
    } finally {
      // Release even when the copy throws, so the reference-counted buffer is never leaked.
      buf.release();
    }
    // NOTE(review): the copy is handed over without flip(); presumably the downstream parser
    // reads it through the backing array -- confirm before changing.
    process(connectionHeader);
  }

  /**
   * Wraps {@code buf} in a {@link SingleByteBuff} and processes it.
   *
   * @param buf the request bytes
   * @throws IOException if processing the request fails
   * @throws InterruptedException if processing is interrupted
   */
  void process(ByteBuffer buf) throws IOException, InterruptedException {
    process(new SingleByteBuff(buf));
  }

  /**
   * Processes one request buffer. The first buffer after a skipped SASL handshake is discarded;
   * otherwise the buffer goes to {@code saslReadAndProcess} when SASL is in use, or to
   * {@code processOneRpc} when it is not.
   * <p>
   * The pending {@code callCleanup}, if any, is run when the buffer is discarded or when
   * processing throws, and the field is always cleared before returning.
   *
   * @param buf the request bytes
   * @throws IOException if processing the request fails
   * @throws InterruptedException if processing is interrupted
   */
  void process(ByteBuff buf) throws IOException, InterruptedException {
    try {
      if (skipInitialSaslHandshake) {
        skipInitialSaslHandshake = false;
        runCallCleanup();
        return;
      }

      if (useSasl) {
        saslReadAndProcess(buf);
      } else {
        processOneRpc(buf);
      }
    } catch (Exception e) {
      runCallCleanup();
      throw e;
    } finally {
      this.callCleanup = null;
    }
  }

  /** Runs the pending cleanup callback, if one is set; does not clear the field. */
  private void runCallCleanup() {
    if (callCleanup != null) {
      callCleanup.run();
    }
  }

  /**
   * Disposes SASL state, closes the channel and drops the reference to any pending cleanup
   * callback without running it.
   */
  @Override
  public synchronized void close() {
    disposeSasl();
    channel.close();
    callCleanup = null;
  }

  /**
   * @return whether the underlying channel reports itself as open
   */
  @Override
  public boolean isConnectionOpen() {
    return channel.isOpen();
  }

  /**
   * Builds a {@link NettyServerCall} bound to this connection from the given arguments, adding
   * the current wall-clock time and the server's {@code reservoir} and {@code cellBlockBuilder}.
   */
  @Override
  public NettyServerCall createCall(int id, final BlockingService service,
      final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner,
      long size, final InetAddress remoteAddress, int timeout,
      CallCleanup reqCleanup) {
    return new NettyServerCall(id, service, md, header, param, cellScanner, this, size,
        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
        this.rpcServer.cellBlockBuilder, reqCleanup);
  }

  /**
   * Writes {@code resp} to the channel and flushes it.
   *
   * @param resp the response to send
   */
  @Override
  protected void doRespond(RpcResponse resp) {
    channel.writeAndFlush(resp);
  }
}