001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.util.HashMap; 022import java.util.Map; 023import javax.net.ssl.SSLException; 024import org.apache.hadoop.hbase.util.NettyFutureUtils; 025import org.apache.yetus.audience.InterfaceAudience; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler; 030import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 031import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; 032 033/** 034 * We will expose the connection to upper layer before initialized, so we need to buffer the calls 035 * passed in and write them out once the connection is established. 036 */ 037@InterfaceAudience.Private 038class BufferCallBeforeInitHandler extends ChannelDuplexHandler { 039 040 private static final Logger LOG = LoggerFactory.getLogger(BufferCallBeforeInitHandler.class); 041 042 static final String NAME = "BufferCall"; 043 044 private enum BufferCallAction { 045 FLUSH, 046 FAIL 047 } 048 049 public static final class BufferCallEvent { 050 051 public final BufferCallAction action; 052 053 public final IOException error; 054 055 private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action, 056 IOException error) { 057 this.action = action; 058 this.error = error; 059 } 060 061 public static BufferCallBeforeInitHandler.BufferCallEvent success() { 062 return SUCCESS_EVENT; 063 } 064 065 public static BufferCallBeforeInitHandler.BufferCallEvent fail(IOException error) { 066 return new BufferCallEvent(BufferCallAction.FAIL, error); 067 } 068 } 069 070 private static final BufferCallEvent SUCCESS_EVENT = 071 new BufferCallEvent(BufferCallAction.FLUSH, null); 072 073 private final Map<Integer, Call> id2Call = new HashMap<>(); 074 075 @Override 076 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { 077 if (msg instanceof Call) { 078 Call call = (Call) msg; 079 id2Call.put(call.id, call); 080 // The call is already in track so here we set the write operation as success. 081 // We will fail the call directly if we can not write it out. 082 promise.trySuccess(); 083 } else { 084 NettyFutureUtils.consume(ctx.write(msg, promise)); 085 } 086 } 087 088 @Override 089 public void flush(ChannelHandlerContext ctx) throws Exception { 090 // do not flush anything out 091 } 092 093 @Override 094 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 095 if (evt instanceof BufferCallEvent) { 096 BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt; 097 switch (bcEvt.action) { 098 case FLUSH: 099 for (Call call : id2Call.values()) { 100 NettyFutureUtils.safeWrite(ctx, call); 101 } 102 ctx.flush(); 103 ctx.pipeline().remove(this); 104 break; 105 case FAIL: 106 for (Call call : id2Call.values()) { 107 call.setException(bcEvt.error); 108 } 109 // here we do not remove us from the pipeline, for receiving possible exceptions and log 110 // it, especially the ssl exceptions, to prevent it reaching the tail of the pipeline and 111 // generate a confusing netty WARN 112 break; 113 } 114 } else if (evt instanceof CallEvent) { 115 // just remove the call for now until we add other call event other than timeout and cancel. 116 id2Call.remove(((CallEvent) evt).call.id); 117 } else { 118 ctx.fireUserEventTriggered(evt); 119 } 120 } 121 122 private boolean isSslError(Throwable cause) { 123 Throwable error = cause; 124 do { 125 if (error instanceof SSLException) { 126 return true; 127 } 128 error = error.getCause(); 129 } while (error != null); 130 return false; 131 } 132 133 @Override 134 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 135 if (isSslError(cause)) { 136 // this should have been logged in other places, see HBASE-27782 for more details. 137 // here we just log it with debug and tell users that this is not a critical problem, 138 // otherwise if we just pass it through the pipeline, it will lead to a confusing 139 // "An exceptionCaught() event was fired, and it reached at the tail of the pipeline" 140 LOG.debug( 141 "got ssl exception, which should have already been proceeded, log it here to" 142 + " prevent it being passed to netty's TailContext and trigger a confusing WARN message", 143 cause); 144 } else { 145 ctx.fireExceptionCaught(cause); 146 } 147 } 148}