001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput; 026import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper; 027import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper; 028import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; 029import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter; 030import org.apache.hadoop.hbase.regionserver.wal.WALUtil; 031import org.apache.hadoop.hbase.util.CommonFSUtils; 032import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 033import org.apache.hadoop.hbase.util.Pair; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.apache.yetus.audience.InterfaceStability; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 039import org.apache.hbase.thirdparty.io.netty.channel.Channel; 040import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 041import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 042import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 043import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; 044 045/** 046 * A WAL provider that use {@link AsyncFSWAL}. 047 */ 048@InterfaceAudience.Private 049@InterfaceStability.Evolving 050public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> { 051 052 private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWALProvider.class); 053 054 // Only public so classes back in regionserver.wal can access 055 public interface AsyncWriter extends WALProvider.AsyncWriter { 056 /** 057 * @throws IOException if something goes wrong initializing an output stream 058 * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that 059 * meet the needs of the given Writer implementation. 060 */ 061 void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize) 062 throws IOException, CommonFSUtils.StreamLacksCapabilityException; 063 } 064 065 private EventLoopGroup eventLoopGroup; 066 067 private Class<? extends Channel> channelClass; 068 @Override 069 protected AsyncFSWAL createWAL() throws IOException { 070 return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), 071 getWALDirectoryName(factory.factoryId), 072 getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, 073 META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, 074 eventLoopGroup, channelClass); 075 } 076 077 @Override 078 protected void doInit(Configuration conf) throws IOException { 079 Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass = 080 NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); 081 if (eventLoopGroupAndChannelClass != null) { 082 eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); 083 channelClass = eventLoopGroupAndChannelClass.getSecond(); 084 } else { 085 eventLoopGroup = new NioEventLoopGroup(1, 086 new DefaultThreadFactory("AsyncFSWAL", true, Thread.MAX_PRIORITY)); 087 channelClass = NioSocketChannel.class; 088 } 089 } 090 091 /** 092 * Public because of AsyncFSWAL. Should be package-private 093 */ 094 public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, 095 boolean overwritable, EventLoopGroup eventLoopGroup, 096 Class<? extends Channel> channelClass) throws IOException { 097 return createAsyncWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path), 098 eventLoopGroup, channelClass); 099 } 100 101 /** 102 * Public because of AsyncFSWAL. Should be package-private 103 */ 104 public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, 105 boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup, 106 Class<? extends Channel> channelClass) throws IOException { 107 // Configuration already does caching for the Class lookup. 108 Class<? extends AsyncWriter> logWriterClass = conf.getClass( 109 "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class); 110 try { 111 AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class) 112 .newInstance(eventLoopGroup, channelClass); 113 writer.init(fs, path, conf, overwritable, blocksize); 114 return writer; 115 } catch (Exception e) { 116 if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { 117 LOG.error("The RegionServer async write ahead log provider " + 118 "relies on the ability to call " + e.getMessage() + " for proper operation during " + 119 "component failures, but the current FileSystem does not support doing so. Please " + 120 "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " + 121 "it points to a FileSystem mount that has suitable capabilities for output streams."); 122 } else { 123 LOG.debug("Error instantiating log writer.", e); 124 } 125 Throwables.propagateIfPossible(e, IOException.class); 126 throw new IOException("cannot get log writer", e); 127 } 128 } 129 130 /** 131 * Test whether we can load the helper classes for async dfs output. 132 */ 133 public static boolean load() { 134 try { 135 Class.forName(FanOutOneBlockAsyncDFSOutput.class.getName()); 136 Class.forName(FanOutOneBlockAsyncDFSOutputHelper.class.getName()); 137 Class.forName(FanOutOneBlockAsyncDFSOutputSaslHelper.class.getName()); 138 return true; 139 } catch (Throwable e) { 140 return false; 141 } 142 } 143}