001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import org.apache.yetus.audience.InterfaceAudience; 021import org.slf4j.Logger; 022import org.slf4j.LoggerFactory; 023 024import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker; 025import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future; 026import org.apache.hbase.thirdparty.io.netty.util.concurrent.GenericFutureListener; 027 028/** 029 * Helper class for processing netty futures. 030 */ 031@InterfaceAudience.Private 032public final class NettyFutureUtils { 033 034 private static final Logger LOG = LoggerFactory.getLogger(NettyFutureUtils.class); 035 036 private NettyFutureUtils() { 037 } 038 039 /** 040 * This is method is used when you just want to add a listener to the given netty future. Ignoring 041 * the return value of a Future is considered as a bad practice as it may suppress exceptions 042 * thrown from the code that completes the future, and this method will catch all the exception 043 * thrown from the {@code listener} to catch possible code bugs. 044 * <p/> 045 * And the error phone check will always report FutureReturnValueIgnored because every method in 046 * the {@link Future} class will return a new {@link Future}, so you always have one future that 047 * has not been checked. So we introduce this method and add a suppress warnings annotation here. 048 */ 049 @SuppressWarnings({ "FutureReturnValueIgnored", "rawtypes", "unchecked" }) 050 public static <V> void addListener(Future<V> future, 051 GenericFutureListener<? extends Future<? super V>> listener) { 052 future.addListener(f -> { 053 try { 054 // the ? operator in template makes it really hard to pass compile, so here we just cast the 055 // listener to raw type. 056 ((GenericFutureListener) listener).operationComplete(f); 057 } catch (Throwable t) { 058 LOG.error("Unexpected error caught when processing netty", t); 059 } 060 }); 061 } 062 063 private static void loggingWhenError(Future<?> future) { 064 if (!future.isSuccess()) { 065 LOG.warn("IO operation failed", future.cause()); 066 } 067 } 068 069 /** 070 * Log the error if the future indicates any failure. 071 */ 072 @SuppressWarnings("FutureReturnValueIgnored") 073 public static void consume(Future<?> future) { 074 future.addListener(NettyFutureUtils::loggingWhenError); 075 } 076 077 /** 078 * Close the channel and eat the returned future by logging the error when the future is completed 079 * with error. 080 */ 081 public static void safeClose(ChannelOutboundInvoker channel) { 082 consume(channel.close()); 083 } 084 085 /** 086 * Call write on the channel and eat the returned future by logging the error when the future is 087 * completed with error. 088 */ 089 public static void safeWrite(ChannelOutboundInvoker channel, Object msg) { 090 consume(channel.write(msg)); 091 } 092 093 /** 094 * Call writeAndFlush on the channel and eat the returned future by logging the error when the 095 * future is completed with error. 096 */ 097 public static void safeWriteAndFlush(ChannelOutboundInvoker channel, Object msg) { 098 consume(channel.writeAndFlush(msg)); 099 } 100}