001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.util.HashMap; 022import java.util.Map; 023import org.apache.hadoop.hbase.util.NettyFutureUtils; 024import org.apache.yetus.audience.InterfaceAudience; 025 026import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler; 027import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 028import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; 029 030/** 031 * We will expose the connection to upper layer before initialized, so we need to buffer the calls 032 * passed in and write them out once the connection is established. 033 */ 034@InterfaceAudience.Private 035class BufferCallBeforeInitHandler extends ChannelDuplexHandler { 036 037 static final String NAME = "BufferCall"; 038 039 private enum BufferCallAction { 040 FLUSH, 041 FAIL 042 } 043 044 public static final class BufferCallEvent { 045 046 public final BufferCallAction action; 047 048 public final IOException error; 049 050 private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action, 051 IOException error) { 052 this.action = action; 053 this.error = error; 054 } 055 056 public static BufferCallBeforeInitHandler.BufferCallEvent success() { 057 return SUCCESS_EVENT; 058 } 059 060 public static BufferCallBeforeInitHandler.BufferCallEvent fail(IOException error) { 061 return new BufferCallEvent(BufferCallAction.FAIL, error); 062 } 063 } 064 065 private static final BufferCallEvent SUCCESS_EVENT = 066 new BufferCallEvent(BufferCallAction.FLUSH, null); 067 068 private final Map<Integer, Call> id2Call = new HashMap<>(); 069 070 @Override 071 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { 072 if (msg instanceof Call) { 073 Call call = (Call) msg; 074 id2Call.put(call.id, call); 075 // The call is already in track so here we set the write operation as success. 076 // We will fail the call directly if we can not write it out. 077 promise.trySuccess(); 078 } else { 079 NettyFutureUtils.consume(ctx.write(msg, promise)); 080 } 081 } 082 083 @Override 084 public void flush(ChannelHandlerContext ctx) throws Exception { 085 // do not flush anything out 086 } 087 088 @Override 089 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 090 if (evt instanceof BufferCallEvent) { 091 BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt; 092 switch (bcEvt.action) { 093 case FLUSH: 094 for (Call call : id2Call.values()) { 095 NettyFutureUtils.safeWrite(ctx, call); 096 } 097 break; 098 case FAIL: 099 for (Call call : id2Call.values()) { 100 call.setException(bcEvt.error); 101 } 102 break; 103 } 104 ctx.flush(); 105 ctx.pipeline().remove(this); 106 } else if (evt instanceof CallEvent) { 107 // just remove the call for now until we add other call event other than timeout and cancel. 108 id2Call.remove(((CallEvent) evt).call.id); 109 } else { 110 ctx.fireUserEventTriggered(evt); 111 } 112 } 113}