001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
021import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
022import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
023
024import java.io.IOException;
025import java.util.HashMap;
026import java.util.Map;
027
028import org.apache.yetus.audience.InterfaceAudience;
029
030/**
031 * We will expose the connection to upper layer before initialized, so we need to buffer the calls
032 * passed in and write them out once the connection is established.
033 */
034@InterfaceAudience.Private
035class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
036
037  private enum BufferCallAction {
038    FLUSH, FAIL
039  }
040
041  public static final class BufferCallEvent {
042
043    public final BufferCallAction action;
044
045    public final IOException error;
046
047    private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action,
048        IOException error) {
049      this.action = action;
050      this.error = error;
051    }
052
053    public static BufferCallBeforeInitHandler.BufferCallEvent success() {
054      return SUCCESS_EVENT;
055    }
056
057    public static BufferCallBeforeInitHandler.BufferCallEvent fail(IOException error) {
058      return new BufferCallEvent(BufferCallAction.FAIL, error);
059    }
060  }
061
062  private static final BufferCallEvent SUCCESS_EVENT = new BufferCallEvent(BufferCallAction.FLUSH,
063      null);
064
065  private final Map<Integer, Call> id2Call = new HashMap<>();
066
067  @Override
068  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
069    if (msg instanceof Call) {
070      Call call = (Call) msg;
071      id2Call.put(call.id, call);
072      // The call is already in track so here we set the write operation as success.
073      // We will fail the call directly if we can not write it out.
074      promise.trySuccess();
075    } else {
076      ctx.write(msg, promise);
077    }
078  }
079
080  @Override
081  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
082    if (evt instanceof BufferCallEvent) {
083      BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt;
084      switch (bcEvt.action) {
085        case FLUSH:
086          for (Call call : id2Call.values()) {
087            ctx.write(call);
088          }
089          break;
090        case FAIL:
091          for (Call call : id2Call.values()) {
092            call.setException(bcEvt.error);
093          }
094          break;
095      }
096      ctx.flush();
097      ctx.pipeline().remove(this);
098    } else if (evt instanceof CallEvent) {
099      // just remove the call for now until we add other call event other than timeout and cancel.
100      id2Call.remove(((CallEvent) evt).call.id);
101    } else {
102      ctx.fireUserEventTriggered(evt);
103    }
104  }
105}