001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.Map;
023import javax.net.ssl.SSLException;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
029import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
030import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
031
032/**
033 * We will expose the connection to upper layer before initialized, so we need to buffer the calls
034 * passed in and write them out once the connection is established.
035 */
036@InterfaceAudience.Private
037class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
038
039  private static final Logger LOG = LoggerFactory.getLogger(BufferCallBeforeInitHandler.class);
040
041  static final String NAME = "BufferCall";
042
043  private enum BufferCallAction {
044    FLUSH,
045    FAIL
046  }
047
048  public static final class BufferCallEvent {
049
050    public final BufferCallAction action;
051
052    public final IOException error;
053
054    private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action,
055      IOException error) {
056      this.action = action;
057      this.error = error;
058    }
059
060    public static BufferCallBeforeInitHandler.BufferCallEvent success() {
061      return SUCCESS_EVENT;
062    }
063
064    public static BufferCallBeforeInitHandler.BufferCallEvent fail(IOException error) {
065      return new BufferCallEvent(BufferCallAction.FAIL, error);
066    }
067  }
068
069  private static final BufferCallEvent SUCCESS_EVENT =
070    new BufferCallEvent(BufferCallAction.FLUSH, null);
071
072  private final Map<Integer, Call> id2Call = new HashMap<>();
073
074  @Override
075  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
076    if (msg instanceof Call) {
077      Call call = (Call) msg;
078      id2Call.put(call.id, call);
079      // The call is already in track so here we set the write operation as success.
080      // We will fail the call directly if we can not write it out.
081      promise.trySuccess();
082    } else {
083      ctx.write(msg, promise);
084    }
085  }
086
087  @Override
088  public void flush(ChannelHandlerContext ctx) throws Exception {
089    // do not flush anything out
090  }
091
092  @Override
093  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
094    if (evt instanceof BufferCallEvent) {
095      BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt;
096      switch (bcEvt.action) {
097        case FLUSH:
098          for (Call call : id2Call.values()) {
099            ctx.write(call);
100          }
101          ctx.flush();
102          ctx.pipeline().remove(this);
103          break;
104        case FAIL:
105          for (Call call : id2Call.values()) {
106            call.setException(bcEvt.error);
107          }
108          // here we do not remove us from the pipeline, for receiving possible exceptions and log
109          // it, especially the ssl exceptions, to prevent it reaching the tail of the pipeline and
110          // generate a confusing netty WARN
111          break;
112      }
113    } else if (evt instanceof CallEvent) {
114      // just remove the call for now until we add other call event other than timeout and cancel.
115      id2Call.remove(((CallEvent) evt).call.id);
116    } else {
117      ctx.fireUserEventTriggered(evt);
118    }
119  }
120
121  private boolean isSslError(Throwable cause) {
122    Throwable error = cause;
123    do {
124      if (error instanceof SSLException) {
125        return true;
126      }
127      error = error.getCause();
128    } while (error != null);
129    return false;
130  }
131
132  @Override
133  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
134    if (isSslError(cause)) {
135      // this should have been logged in other places, see HBASE-27782 for more details.
136      // here we just log it with debug and tell users that this is not a critical problem,
137      // otherwise if we just pass it through the pipeline, it will lead to a confusing
138      // "An exceptionCaught() event was fired, and it reached at the tail of the pipeline"
139      LOG.debug(
140        "got ssl exception, which should have already been proceeded, log it here to"
141          + " prevent it being passed to netty's TailContext and trigger a confusing WARN message",
142        cause);
143    } else {
144      ctx.fireExceptionCaught(cause);
145    }
146  }
147}