/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Runs the scenarios inherited from {@link WALDurabilityTestBase} against a
 * {@link CustomAsyncFSWAL}, an {@link AsyncFSWAL} subclass that records the {@code forceSync}
 * arguments it observes.
 * <p>
 * The test methods themselves are defined in the base class; this class only supplies the WAL
 * instance and the accessors for the recorded flags. It is tagged with the
 * {@link RegionServerTests} and {@link SmallTests} JUnit categories.
 */
@Category({ RegionServerTests.class, SmallTests.class })
public class TestAsyncFSWALDurability extends WALDurabilityTestBase<CustomAsyncFSWAL> {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncFSWALDurability.class);

  /** Event loop group handed to every WAL created by this test class. */
  private static NioEventLoopGroup GROUP;

  /** Creates the event loop group once for the whole class. */
  @BeforeClass
  public static void setUpBeforeClass() {
    GROUP = new NioEventLoopGroup();
  }

  /**
   * Requests a graceful shutdown of the event loop group. The returned future is not awaited, so
   * this method does not block until the shutdown has completed.
   */
  @AfterClass
  public static void tearDownAfterClass() {
    GROUP.shutdownGracefully();
  }

  /**
   * Builds and initialises the WAL under test.
   * @param fs     file system passed through to the WAL
   * @param root   root directory passed through to the WAL
   * @param logDir log directory name passed through to the WAL
   * @param conf   configuration passed through to the WAL
   * @return a {@link CustomAsyncFSWAL} on which {@code init()} has already been called
   * @throws IOException if constructing or initialising the WAL fails
   */
  @Override
  protected CustomAsyncFSWAL getWAL(FileSystem fs, Path root, String logDir, Configuration conf)
    throws IOException {
    CustomAsyncFSWAL wal =
      new CustomAsyncFSWAL(fs, root, logDir, conf, GROUP, NioSocketChannel.class);
    wal.init();
    return wal;
  }

  /** Clears both recorded flags on the given WAL. */
  @Override
  protected void resetSyncFlag(CustomAsyncFSWAL wal) {
    wal.resetSyncFlag();
  }

  /**
   * @return the {@code forceSync} value last passed to one of the WAL's {@code sync} overloads,
   *         or {@code null} if none was recorded since construction or the last reset
   */
  @Override
  protected Boolean getSyncFlag(CustomAsyncFSWAL wal) {
    return wal.getSyncFlag();
  }

  /**
   * @return the {@code forceSync} value last passed to the wrapped writer's {@code sync}, or
   *         {@code null} if none was recorded since construction or the last reset
   */
  @Override
  protected Boolean getWriterSyncFlag(CustomAsyncFSWAL wal) {
    return wal.getWriterSyncFlag();
  }
}

/**
 * {@link AsyncFSWAL} that remembers the most recent {@code forceSync} argument seen at two
 * points: the WAL-level {@code sync} calls and the {@link AsyncWriter#sync(boolean)} call on the
 * writer it creates. All real work is delegated to the superclass and the wrapped writer.
 */
class CustomAsyncFSWAL extends AsyncFSWAL {

  /** Last {@code forceSync} passed to a WAL-level {@code sync}; {@code null} when unset. */
  private Boolean syncFlag;

  /** Last {@code forceSync} passed to the writer's {@code sync}; {@code null} when unset. */
  private Boolean writerSyncFlag;

  /**
   * Forwards to the {@link AsyncFSWAL} constructor, using
   * {@link HConstants#HREGION_OLDLOGDIR_NAME} as the archive directory name. The remaining
   * superclass arguments are fixed literals ({@code null}/{@code true}); see the superclass
   * constructor for their meaning.
   */
  public CustomAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, Configuration conf,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
    throws FailedLogCloseException, IOException {
    super(fs, rootDir, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null,
      eventLoopGroup, channelClass);
  }

  /**
   * Wraps the writer produced by the superclass in a delegating {@link AsyncWriter} whose
   * {@code sync} additionally stores the {@code forceSync} argument in {@link #writerSyncFlag}.
   * Every other method forwards unchanged.
   */
  @Override
  protected AsyncWriter createWriterInstance(Path path) throws IOException {
    AsyncWriter writer = super.createWriterInstance(path);
    return new AsyncWriter() {

      @Override
      public void close() throws IOException {
        writer.close();
      }

      @Override
      public long getLength() {
        return writer.getLength();
      }

      @Override
      public long getSyncedLength() {
        return writer.getSyncedLength();
      }

      @Override
      public CompletableFuture<Long> sync(boolean forceSync) {
        // Record before delegating so the flag is set even if the delegate throws.
        writerSyncFlag = forceSync;
        return writer.sync(forceSync);
      }

      @Override
      public void append(Entry entry) {
        writer.append(entry);
      }
    };
  }

  /** Records {@code forceSync}, then delegates to the superclass. */
  @Override
  public void sync(boolean forceSync) throws IOException {
    syncFlag = forceSync;
    super.sync(forceSync);
  }

  /** Records {@code forceSync}, then delegates to the superclass. */
  @Override
  public void sync(long txid, boolean forceSync) throws IOException {
    syncFlag = forceSync;
    super.sync(txid, forceSync);
  }

  /** Sets both recorded flags back to {@code null}. */
  void resetSyncFlag() {
    this.syncFlag = null;
    this.writerSyncFlag = null;
  }

  /** @return the recorded WAL-level flag, or {@code null} if unset */
  Boolean getSyncFlag() {
    return syncFlag;
  }

  /** @return the recorded writer-level flag, or {@code null} if unset */
  Boolean getWriterSyncFlag() {
    return writerSyncFlag;
  }
}