001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler; 021import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 022import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; 023 024import java.io.IOException; 025import java.util.HashMap; 026import java.util.Map; 027 028import org.apache.yetus.audience.InterfaceAudience; 029 030/** 031 * We will expose the connection to upper layer before initialized, so we need to buffer the calls 032 * passed in and write them out once the connection is established. 033 */ 034@InterfaceAudience.Private 035class BufferCallBeforeInitHandler extends ChannelDuplexHandler { 036 037 private enum BufferCallAction { 038 FLUSH, FAIL 039 } 040 041 public static final class BufferCallEvent { 042 043 public final BufferCallAction action; 044 045 public final IOException error; 046 047 private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action, 048 IOException error) { 049 this.action = action; 050 this.error = error; 051 } 052 053 public static BufferCallBeforeInitHandler.BufferCallEvent success() { 054 return SUCCESS_EVENT; 055 } 056 057 public static BufferCallBeforeInitHandler.BufferCallEvent fail(IOException error) { 058 return new BufferCallEvent(BufferCallAction.FAIL, error); 059 } 060 } 061 062 private static final BufferCallEvent SUCCESS_EVENT = new BufferCallEvent(BufferCallAction.FLUSH, 063 null); 064 065 private final Map<Integer, Call> id2Call = new HashMap<>(); 066 067 @Override 068 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { 069 if (msg instanceof Call) { 070 Call call = (Call) msg; 071 id2Call.put(call.id, call); 072 // The call is already in track so here we set the write operation as success. 073 // We will fail the call directly if we can not write it out. 074 promise.trySuccess(); 075 } else { 076 ctx.write(msg, promise); 077 } 078 } 079 080 @Override 081 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 082 if (evt instanceof BufferCallEvent) { 083 BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt; 084 switch (bcEvt.action) { 085 case FLUSH: 086 for (Call call : id2Call.values()) { 087 ctx.write(call); 088 } 089 break; 090 case FAIL: 091 for (Call call : id2Call.values()) { 092 call.setException(bcEvt.error); 093 } 094 break; 095 } 096 ctx.flush(); 097 ctx.pipeline().remove(this); 098 } else if (evt instanceof CallEvent) { 099 // just remove the call for now until we add other call event other than timeout and cancel. 100 id2Call.remove(((CallEvent) evt).call.id); 101 } else { 102 ctx.fireUserEventTriggered(evt); 103 } 104 } 105}