001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.Map;
023import org.apache.hadoop.hbase.util.NettyFutureUtils;
024import org.apache.yetus.audience.InterfaceAudience;
025
026import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
027import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
028import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
029
030/**
031 * We will expose the connection to upper layer before initialized, so we need to buffer the calls
032 * passed in and write them out once the connection is established.
033 */
034@InterfaceAudience.Private
035class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
036
037  static final String NAME = "BufferCall";
038
039  private enum BufferCallAction {
040    FLUSH,
041    FAIL
042  }
043
044  public static final class BufferCallEvent {
045
046    public final BufferCallAction action;
047
048    public final IOException error;
049
050    private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action,
051      IOException error) {
052      this.action = action;
053      this.error = error;
054    }
055
056    public static BufferCallBeforeInitHandler.BufferCallEvent success() {
057      return SUCCESS_EVENT;
058    }
059
060    public static BufferCallBeforeInitHandler.BufferCallEvent fail(IOException error) {
061      return new BufferCallEvent(BufferCallAction.FAIL, error);
062    }
063  }
064
065  private static final BufferCallEvent SUCCESS_EVENT =
066    new BufferCallEvent(BufferCallAction.FLUSH, null);
067
068  private final Map<Integer, Call> id2Call = new HashMap<>();
069
070  @Override
071  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
072    if (msg instanceof Call) {
073      Call call = (Call) msg;
074      id2Call.put(call.id, call);
075      // The call is already in track so here we set the write operation as success.
076      // We will fail the call directly if we can not write it out.
077      promise.trySuccess();
078    } else {
079      NettyFutureUtils.consume(ctx.write(msg, promise));
080    }
081  }
082
083  @Override
084  public void flush(ChannelHandlerContext ctx) throws Exception {
085    // do not flush anything out
086  }
087
088  @Override
089  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
090    if (evt instanceof BufferCallEvent) {
091      BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt;
092      switch (bcEvt.action) {
093        case FLUSH:
094          for (Call call : id2Call.values()) {
095            NettyFutureUtils.safeWrite(ctx, call);
096          }
097          break;
098        case FAIL:
099          for (Call call : id2Call.values()) {
100            call.setException(bcEvt.error);
101          }
102          break;
103      }
104      ctx.flush();
105      ctx.pipeline().remove(this);
106    } else if (evt instanceof CallEvent) {
107      // just remove the call for now until we add other call event other than timeout and cancel.
108      id2Call.remove(((CallEvent) evt).call.id);
109    } else {
110      ctx.fireUserEventTriggered(evt);
111    }
112  }
113}