/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;

/**
 * A WAL provider that uses {@link AsyncFSWAL}.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWALProvider.class);

  /** Configuration key naming the {@link AsyncWriter} implementation class to instantiate. */
  public static final String WRITER_IMPL = "hbase.regionserver.wal.async.writer.impl";

  // Only public so classes back in regionserver.wal can access
  public interface AsyncWriter extends WALProvider.AsyncWriter {
    /**
     * Initializes this writer for the given path.
     * @throws IOException                    if something goes wrong initializing an output stream
     * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
     *                                        meet the needs of the given Writer implementation.
     */
    void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize,
      StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException;
  }

  /**
   * Protected visibility for use in tests.
   */
  protected EventLoopGroup eventLoopGroup;

  /**
   * Protected visibility for use in tests.
   */
  protected Class<? extends Channel> channelClass;

  /**
   * Creates the {@link AsyncFSWAL} backing this provider. The WAL file system and root directory
   * are resolved from {@code conf}; the suffix is {@code META_WAL_PROVIDER_ID} when this provider
   * is the meta provider and {@code null} otherwise. The two {@code null} arguments occupy the
   * positions that {@link #createRemoteWAL} fills with the remote file system and remote WAL
   * directory.
   */
  @Override
  protected AsyncFSWAL createWAL() throws IOException {
    return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
      CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
      getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
      META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, null, null,
      eventLoopGroup, channelClass,
      factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId));
  }

  /**
   * Creates an {@link AsyncFSWAL} that is additionally given {@code remoteFs} and
   * {@code remoteWALDir}, using the supplied {@code prefix} and {@code suffix}. The
   * {@code region} argument is not consulted by this implementation.
   */
  @Override
  protected WAL createRemoteWAL(RegionInfo region, FileSystem remoteFs, Path remoteWALDir,
    String prefix, String suffix) throws IOException {
    return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
      CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
      getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, prefix, suffix,
      remoteFs, remoteWALDir, eventLoopGroup, channelClass,
      factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId));
  }

  /**
   * Resolves the event loop group and channel class from the configuration via
   * {@link NettyAsyncFSWALConfigHelper#getEventLoopConfig(Configuration)} and stores them on this
   * provider.
   */
  @Override
  protected void doInit(Configuration conf) throws IOException {
    Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
      NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
    eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
    channelClass = eventLoopGroupAndChannelClass.getSecond();
  }

  /**
   * Public because of AsyncFSWAL. Should be package-private.
   * <p>
   * Convenience overload that derives the block size from
   * {@link WALUtil#getWALBlockSize(Configuration, FileSystem, Path)} and creates a new
   * {@link StreamSlowMonitor} named after the last component of {@code path}.
   * @throws IOException if the writer cannot be instantiated or initialized
   */
  public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
    boolean overwritable, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
    throws IOException {
    return createAsyncWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path),
      eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, path.getName()));
  }

  /**
   * Public because of AsyncFSWAL. Should be package-private.
   * <p>
   * Instantiates the writer class configured under {@link #WRITER_IMPL} (defaulting to
   * {@link AsyncProtobufLogWriter}) through its {@code (EventLoopGroup, Class)} constructor and
   * then calls {@link AsyncWriter#init} on it.
   * @throws IOException if instantiation or initialization fails; an {@link IOException} or
   *                     unchecked exception is rethrown as-is, any other exception is wrapped
   */
  public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
    boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass, StreamSlowMonitor monitor) throws IOException {
    // Configuration already does caching for the Class lookup.
    Class<? extends AsyncWriter> logWriterClass =
      conf.getClass(WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class);
    try {
      AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
        .newInstance(eventLoopGroup, channelClass);
      writer.init(fs, path, conf, overwritable, blocksize, monitor);
      return writer;
    } catch (Exception e) {
      if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
        // Parameterized form of the original message; the rendered text is unchanged.
        LOG.error(
          "The RegionServer async write ahead log provider "
            + "relies on the ability to call {} for proper operation during "
            + "component failures, but the current FileSystem does not support doing so. Please "
            + "check the config value of '{}' and ensure "
            + "it points to a FileSystem mount that has suitable capabilities for output streams.",
          e.getMessage(), CommonFSUtils.HBASE_WAL_DIR);
      } else {
        LOG.debug("Error instantiating log writer.", e);
      }
      // Rethrow IOExceptions and unchecked exceptions untouched; wrap everything else.
      Throwables.propagateIfPossible(e, IOException.class);
      throw new IOException("cannot get log writer", e);
    }
  }

  /**
   * Test whether we can load the helper classes for async dfs output.
   * @return {@code true} if all helper classes could be loaded and initialized, {@code false}
   *         otherwise (the cause is logged)
   */
  public static boolean load() {
    try {
      Class.forName(FanOutOneBlockAsyncDFSOutput.class.getName());
      Class.forName(FanOutOneBlockAsyncDFSOutputHelper.class.getName());
      Class.forName(FanOutOneBlockAsyncDFSOutputSaslHelper.class.getName());
      return true;
    } catch (Throwable e) {
      // Throwable on purpose: Class.forName reports failed static initialization and linkage
      // problems as Errors, and those must also result in 'false' rather than escaping. Record
      // the cause so the reason for the failure is not lost.
      LOG.warn("Failed to load the helper classes for async dfs output", e);
      return false;
    }
  }
}