001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.util.function.BooleanSupplier;
021import java.util.function.IntSupplier;
022import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
023import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
024import org.apache.hadoop.hbase.util.NettyUnsafeUtils;
025import org.apache.yetus.audience.InterfaceAudience;
026
027import org.apache.hbase.thirdparty.io.netty.channel.Channel;
028import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
029import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
030import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
031import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
032
033/**
034 * Handler to enforce writability protections on our server channels: <br>
035 * - Responds to channel writability events, which are triggered when the total pending bytes for a
036 * channel passes configured high and low watermarks. When high watermark is exceeded, the channel
037 * is setAutoRead(false). This way, we won't accept new requests from the client until some pending
038 * outbound bytes are successfully received by the client.<br>
039 * - Pre-processes any channel write requests. If the total pending outbound bytes exceeds a fatal
040 * threshold, the channel is forcefully closed and the write is set to failed. This handler should
041 * be the last handler in the pipeline so that it's the first handler to receive any messages sent
042 * to channel.write() or channel.writeAndFlush().
043 */
044@InterfaceAudience.Private
045public class NettyRpcServerChannelWritabilityHandler extends ChannelDuplexHandler {
046
047  static final String NAME = "NettyRpcServerChannelWritabilityHandler";
048
049  private final MetricsHBaseServer metrics;
050  private final IntSupplier pendingBytesFatalThreshold;
051  private final BooleanSupplier isWritabilityBackpressureEnabled;
052
053  private boolean writable = true;
054  private long unwritableStartTime;
055
056  NettyRpcServerChannelWritabilityHandler(MetricsHBaseServer metrics,
057    IntSupplier pendingBytesFatalThreshold, BooleanSupplier isWritabilityBackpressureEnabled) {
058    this.metrics = metrics;
059    this.pendingBytesFatalThreshold = pendingBytesFatalThreshold;
060    this.isWritabilityBackpressureEnabled = isWritabilityBackpressureEnabled;
061  }
062
063  @Override
064  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
065    throws Exception {
066    if (handleFatalThreshold(ctx)) {
067      promise.setFailure(
068        new ConnectionClosedException("Channel outbound bytes exceeded fatal threshold"));
069      if (msg instanceof RpcResponse) {
070        ((RpcResponse) msg).done();
071      } else {
072        ReferenceCountUtil.release(msg);
073      }
074      return;
075    }
076    ctx.write(msg, promise);
077  }
078
079  @Override
080  public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
081    if (isWritabilityBackpressureEnabled.getAsBoolean()) {
082      handleWritabilityChanged(ctx);
083    }
084    ctx.fireChannelWritabilityChanged();
085  }
086
087  private boolean handleFatalThreshold(ChannelHandlerContext ctx) {
088    int fatalThreshold = pendingBytesFatalThreshold.getAsInt();
089    if (fatalThreshold <= 0) {
090      return false;
091    }
092
093    Channel channel = ctx.channel();
094    long outboundBytes = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel);
095    if (outboundBytes < fatalThreshold) {
096      return false;
097    }
098
099    if (channel.isOpen()) {
100      metrics.maxOutboundBytesExceeded();
101      RpcServer.LOG.warn(
102        "{}: Closing connection because outbound buffer size of {} exceeds fatal threshold of {}",
103        channel.remoteAddress(), outboundBytes, fatalThreshold);
104      NettyUnsafeUtils.closeImmediately(channel);
105    }
106
107    return true;
108  }
109
110  private void handleWritabilityChanged(ChannelHandlerContext ctx) {
111    boolean oldWritableValue = this.writable;
112
113    this.writable = ctx.channel().isWritable();
114    ctx.channel().config().setAutoRead(this.writable);
115
116    if (!oldWritableValue && this.writable) {
117      // changing from not writable to writable, update metrics
118      metrics.unwritableTime(EnvironmentEdgeManager.currentTime() - unwritableStartTime);
119      unwritableStartTime = 0;
120    } else if (oldWritableValue && !this.writable) {
121      // changing from writable to non-writable, set start time
122      unwritableStartTime = EnvironmentEdgeManager.currentTime();
123    }
124  }
125}