001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.fs.FileSystem; 023import org.apache.hadoop.fs.Path; 024import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput; 025import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper; 026import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper; 027import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 028import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; 029import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter; 030import org.apache.hadoop.hbase.regionserver.wal.WALUtil; 031import org.apache.hadoop.hbase.util.CommonFSUtils; 032import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 033import org.apache.hadoop.hbase.util.Pair; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.apache.yetus.audience.InterfaceStability; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 040import org.apache.hbase.thirdparty.io.netty.channel.Channel; 041import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 042import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 043import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 044import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; 045 046/** 047 * A WAL provider that use {@link AsyncFSWAL}. 048 */ 049@InterfaceAudience.Private 050@InterfaceStability.Evolving 051public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> { 052 053 private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWALProvider.class); 054 055 public static final String WRITER_IMPL = "hbase.regionserver.hlog.async.writer.impl"; 056 057 // Only public so classes back in regionserver.wal can access 058 public interface AsyncWriter extends WALProvider.AsyncWriter { 059 /** 060 * @throws IOException if something goes wrong initializing an output stream 061 * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that 062 * meet the needs of the given Writer implementation. 063 */ 064 void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize, 065 StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException; 066 } 067 068 private EventLoopGroup eventLoopGroup; 069 070 private Class<? extends Channel> channelClass; 071 072 @Override 073 protected AsyncFSWAL createWAL() throws IOException { 074 return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable, 075 CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId), 076 getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, 077 META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup, 078 channelClass, factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId)); 079 } 080 081 @Override 082 protected void doInit(Configuration conf) throws IOException { 083 Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass = 084 NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); 085 if (eventLoopGroupAndChannelClass != null) { 086 eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); 087 channelClass = eventLoopGroupAndChannelClass.getSecond(); 088 } else { 089 eventLoopGroup = 090 new NioEventLoopGroup(1, new DefaultThreadFactory("AsyncFSWAL", true, Thread.MAX_PRIORITY)); 091 channelClass = NioSocketChannel.class; 092 } 093 } 094 095 /** 096 * Public because of AsyncFSWAL. Should be package-private 097 */ 098 public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, 099 boolean overwritable, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) 100 throws IOException { 101 return createAsyncWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path), 102 eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, path.getName())); 103 } 104 105 /** 106 * Public because of AsyncFSWAL. Should be package-private 107 */ 108 public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, 109 boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup, 110 Class<? extends Channel> channelClass, StreamSlowMonitor monitor) throws IOException { 111 // Configuration already does caching for the Class lookup. 112 Class<? extends AsyncWriter> logWriterClass = 113 conf.getClass(WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class); 114 try { 115 AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class) 116 .newInstance(eventLoopGroup, channelClass); 117 writer.init(fs, path, conf, overwritable, blocksize, monitor); 118 return writer; 119 } catch (Exception e) { 120 if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { 121 LOG.error("The RegionServer async write ahead log provider " 122 + "relies on the ability to call " + e.getMessage() + " for proper operation during " 123 + "component failures, but the current FileSystem does not support doing so. Please " 124 + "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " 125 + "it points to a FileSystem mount that has suitable capabilities for output streams."); 126 } else { 127 LOG.debug("Error instantiating log writer.", e); 128 } 129 Throwables.propagateIfPossible(e, IOException.class); 130 throw new IOException("cannot get log writer", e); 131 } 132 } 133 134 /** 135 * Test whether we can load the helper classes for async dfs output. 136 */ 137 public static boolean load() { 138 try { 139 Class.forName(FanOutOneBlockAsyncDFSOutput.class.getName()); 140 Class.forName(FanOutOneBlockAsyncDFSOutputHelper.class.getName()); 141 Class.forName(FanOutOneBlockAsyncDFSOutputSaslHelper.class.getName()); 142 return true; 143 } catch (Throwable e) { 144 return false; 145 } 146 } 147}