001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.IOException; 021import java.util.concurrent.CompletableFuture; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.hbase.HConstants; 026import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 027import org.apache.hadoop.hbase.testclassification.RegionServerTests; 028import org.apache.hadoop.hbase.testclassification.SmallTests; 029import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; 030import org.junit.jupiter.api.AfterAll; 031import org.junit.jupiter.api.BeforeAll; 032import org.junit.jupiter.api.Tag; 033 034import org.apache.hbase.thirdparty.io.netty.channel.Channel; 035import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 036import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 037import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 038 039@Tag(RegionServerTests.TAG) 040@Tag(SmallTests.TAG) 041public class TestAsyncFSWALDurability extends WALDurabilityTestBase<CustomAsyncFSWAL> { 042 043 private static NioEventLoopGroup GROUP; 044 045 @BeforeAll 046 public static void setUpBeforeClass() { 047 GROUP = new NioEventLoopGroup(); 048 } 049 050 @AfterAll 051 public static void tearDownAfterClass() throws Exception { 052 GROUP.shutdownGracefully().get(); 053 } 054 055 @Override 056 protected CustomAsyncFSWAL getWAL0(FileSystem fs, Path root, String logDir, Configuration conf) 057 throws IOException { 058 CustomAsyncFSWAL wal = 059 new CustomAsyncFSWAL(fs, root, logDir, conf, GROUP, NioSocketChannel.class); 060 wal.init(); 061 return wal; 062 } 063 064 @Override 065 protected void resetSyncFlag(CustomAsyncFSWAL wal) { 066 wal.resetSyncFlag(); 067 } 068 069 @Override 070 protected Boolean getSyncFlag(CustomAsyncFSWAL wal) { 071 return wal.getSyncFlag(); 072 } 073 074 @Override 075 protected Boolean getWriterSyncFlag(CustomAsyncFSWAL wal) { 076 return wal.getWriterSyncFlag(); 077 } 078} 079 080class CustomAsyncFSWAL extends AsyncFSWAL { 081 082 private Boolean syncFlag; 083 084 private Boolean writerSyncFlag; 085 086 public CustomAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, Configuration conf, 087 EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) 088 throws FailedLogCloseException, IOException { 089 super(fs, null, rootDir, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, 090 null, null, null, eventLoopGroup, channelClass, 091 StreamSlowMonitor.create(conf, "monitorForSuffix")); 092 } 093 094 @Override 095 protected AsyncWriter createWriterInstance(FileSystem fs, Path path) throws IOException { 096 AsyncWriter writer = super.createWriterInstance(fs, path); 097 return new AsyncWriter() { 098 099 @Override 100 public void close() throws IOException { 101 writer.close(); 102 } 103 104 @Override 105 public long getLength() { 106 return writer.getLength(); 107 } 108 109 @Override 110 public long getSyncedLength() { 111 return writer.getSyncedLength(); 112 } 113 114 @Override 115 public CompletableFuture<Long> sync(boolean forceSync) { 116 writerSyncFlag = forceSync; 117 return writer.sync(forceSync); 118 } 119 120 @Override 121 public void append(Entry entry) { 122 writer.append(entry); 123 } 124 }; 125 } 126 127 @Override 128 protected void doSync(boolean forceSync) throws IOException { 129 syncFlag = forceSync; 130 super.doSync(forceSync); 131 } 132 133 @Override 134 protected void doSync(long txid, boolean forceSync) throws IOException { 135 syncFlag = forceSync; 136 super.doSync(txid, forceSync); 137 } 138 139 void resetSyncFlag() { 140 this.syncFlag = null; 141 this.writerSyncFlag = null; 142 } 143 144 Boolean getSyncFlag() { 145 return syncFlag; 146 } 147 148 Boolean getWriterSyncFlag() { 149 return writerSyncFlag; 150 } 151}