001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import org.apache.yetus.audience.InterfaceAudience;
021import org.slf4j.Logger;
022import org.slf4j.LoggerFactory;
023
024import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
025import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
026import org.apache.hbase.thirdparty.io.netty.util.concurrent.GenericFutureListener;
027
028/**
029 * Helper class for processing netty futures.
030 */
031@InterfaceAudience.Private
032public final class NettyFutureUtils {
033
034  private static final Logger LOG = LoggerFactory.getLogger(NettyFutureUtils.class);
035
036  private NettyFutureUtils() {
037  }
038
039  /**
040   * This is method is used when you just want to add a listener to the given netty future. Ignoring
041   * the return value of a Future is considered as a bad practice as it may suppress exceptions
042   * thrown from the code that completes the future, and this method will catch all the exception
043   * thrown from the {@code listener} to catch possible code bugs.
044   * <p/>
045   * And the error phone check will always report FutureReturnValueIgnored because every method in
046   * the {@link Future} class will return a new {@link Future}, so you always have one future that
047   * has not been checked. So we introduce this method and add a suppress warnings annotation here.
048   */
049  @SuppressWarnings({ "FutureReturnValueIgnored", "rawtypes", "unchecked" })
050  public static <V> void addListener(Future<V> future,
051    GenericFutureListener<? extends Future<? super V>> listener) {
052    future.addListener(f -> {
053      try {
054        // the ? operator in template makes it really hard to pass compile, so here we just cast the
055        // listener to raw type.
056        ((GenericFutureListener) listener).operationComplete(f);
057      } catch (Throwable t) {
058        LOG.error("Unexpected error caught when processing netty", t);
059      }
060    });
061  }
062
063  private static void loggingWhenError(Future<?> future) {
064    if (!future.isSuccess()) {
065      LOG.warn("IO operation failed", future.cause());
066    }
067  }
068
069  /**
070   * Log the error if the future indicates any failure.
071   */
072  @SuppressWarnings("FutureReturnValueIgnored")
073  public static void consume(Future<?> future) {
074    future.addListener(NettyFutureUtils::loggingWhenError);
075  }
076
077  /**
078   * Close the channel and eat the returned future by logging the error when the future is completed
079   * with error.
080   */
081  public static void safeClose(ChannelOutboundInvoker channel) {
082    consume(channel.close());
083  }
084
085  /**
086   * Call write on the channel and eat the returned future by logging the error when the future is
087   * completed with error.
088   */
089  public static void safeWrite(ChannelOutboundInvoker channel, Object msg) {
090    consume(channel.write(msg));
091  }
092
093  /**
094   * Call writeAndFlush on the channel and eat the returned future by logging the error when the
095   * future is completed with error.
096   */
097  public static void safeWriteAndFlush(ChannelOutboundInvoker channel, Object msg) {
098    consume(channel.writeAndFlush(msg));
099  }
100}