001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.fs.FileSystem; 023import org.apache.hadoop.fs.Path; 024import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput; 025import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper; 026import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper; 027import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 028import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; 029import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter; 030import org.apache.hadoop.hbase.regionserver.wal.WALUtil; 031import org.apache.hadoop.hbase.util.CommonFSUtils; 032import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 033import org.apache.hadoop.hbase.util.Pair; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.apache.yetus.audience.InterfaceStability; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 040import org.apache.hbase.thirdparty.io.netty.channel.Channel; 041import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 042 043/** 044 * A WAL provider that use {@link AsyncFSWAL}. 045 */ 046@InterfaceAudience.Private 047@InterfaceStability.Evolving 048public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> { 049 050 private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWALProvider.class); 051 052 public static final String WRITER_IMPL = "hbase.regionserver.wal.async.writer.impl"; 053 054 // Only public so classes back in regionserver.wal can access 055 public interface AsyncWriter extends WALProvider.AsyncWriter { 056 /** 057 * @throws IOException if something goes wrong initializing an output stream 058 * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that 059 * meet the needs of the given Writer implementation. 060 */ 061 void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize, 062 StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException; 063 } 064 065 /** 066 * Protected visibility for used in tests. 067 */ 068 protected EventLoopGroup eventLoopGroup; 069 070 /** 071 * Protected visibility for used in tests. 072 */ 073 protected Class<? extends Channel> channelClass; 074 075 @Override 076 protected AsyncFSWAL createWAL() throws IOException { 077 return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable, 078 CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId), 079 getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, 080 META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup, 081 channelClass, factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId)); 082 } 083 084 @Override 085 protected void doInit(Configuration conf) throws IOException { 086 Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass = 087 NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); 088 eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); 089 channelClass = eventLoopGroupAndChannelClass.getSecond(); 090 } 091 092 /** 093 * Public because of AsyncFSWAL. Should be package-private 094 */ 095 public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, 096 boolean overwritable, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) 097 throws IOException { 098 return createAsyncWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path), 099 eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, path.getName())); 100 } 101 102 /** 103 * Public because of AsyncFSWAL. Should be package-private 104 */ 105 public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, 106 boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup, 107 Class<? extends Channel> channelClass, StreamSlowMonitor monitor) throws IOException { 108 // Configuration already does caching for the Class lookup. 109 Class<? extends AsyncWriter> logWriterClass = 110 conf.getClass(WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class); 111 try { 112 AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class) 113 .newInstance(eventLoopGroup, channelClass); 114 writer.init(fs, path, conf, overwritable, blocksize, monitor); 115 return writer; 116 } catch (Exception e) { 117 if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { 118 LOG.error("The RegionServer async write ahead log provider " 119 + "relies on the ability to call " + e.getMessage() + " for proper operation during " 120 + "component failures, but the current FileSystem does not support doing so. Please " 121 + "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " 122 + "it points to a FileSystem mount that has suitable capabilities for output streams."); 123 } else { 124 LOG.debug("Error instantiating log writer.", e); 125 } 126 Throwables.propagateIfPossible(e, IOException.class); 127 throw new IOException("cannot get log writer", e); 128 } 129 } 130 131 /** 132 * Test whether we can load the helper classes for async dfs output. 133 */ 134 public static boolean load() { 135 try { 136 Class.forName(FanOutOneBlockAsyncDFSOutput.class.getName()); 137 Class.forName(FanOutOneBlockAsyncDFSOutputHelper.class.getName()); 138 Class.forName(FanOutOneBlockAsyncDFSOutputSaslHelper.class.getName()); 139 return true; 140 } catch (Throwable e) { 141 return false; 142 } 143 } 144}