/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Binds {@code WALDurabilityTestBase} to an {@code AsyncFSWAL}-based WAL.
 * <p>
 * This class declares no test methods of its own. It overrides the base class hooks that create
 * the WAL under test and that reset/read the sync flags recorded by {@link CustomAsyncFSWAL}.
 */
// The component category has to be a testclassification marker. The RegionServerServices
// production interface is not one, so category-filtered runs would not select this test.
@Category({ RegionServerTests.class, SmallTests.class })
public class TestAsyncFSWALDurability extends WALDurabilityTestBase<CustomAsyncFSWAL> {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncFSWALDurability.class);

  /** Event loop group handed to every WAL created by {@link #getWAL}; one per test class. */
  private static NioEventLoopGroup GROUP;

  /** Creates the event loop group shared by all WAL instances of this test class. */
  @BeforeClass
  public static void setUpBeforeClass() {
    GROUP = new NioEventLoopGroup();
  }

  /** Requests a graceful shutdown of the shared event loop group; does not wait for it. */
  @AfterClass
  public static void tearDownAfterClass() {
    GROUP.shutdownGracefully();
  }

  /**
   * Builds and initialises the WAL under test.
   * @param fs     file system passed through to the WAL constructor
   * @param root   root directory passed through to the WAL constructor
   * @param logDir log directory name passed through to the WAL constructor
   * @param conf   configuration passed through to the WAL constructor
   * @return a {@link CustomAsyncFSWAL} on which {@code init()} has already been called
   * @throws IOException if constructing or initialising the WAL fails
   */
  @Override
  protected CustomAsyncFSWAL getWAL(FileSystem fs, Path root, String logDir, Configuration conf)
    throws IOException {
    CustomAsyncFSWAL wal =
      new CustomAsyncFSWAL(fs, root, logDir, conf, GROUP, NioSocketChannel.class);
    wal.init();
    return wal;
  }

  /** Clears both recorded flags on {@code wal}. */
  @Override
  protected void resetSyncFlag(CustomAsyncFSWAL wal) {
    wal.resetSyncFlag();
  }

  /** @return the flag recorded by the WAL-level {@code sync} overrides, or {@code null} */
  @Override
  protected Boolean getSyncFlag(CustomAsyncFSWAL wal) {
    return wal.getSyncFlag();
  }

  /** @return the flag recorded by the wrapped writer's {@code sync}, or {@code null} */
  @Override
  protected Boolean getWriterSyncFlag(CustomAsyncFSWAL wal) {
    return wal.getWriterSyncFlag();
  }
}

/**
 * {@code AsyncFSWAL} subclass that remembers the {@code forceSync} argument most recently passed
 * to its own {@code sync} methods and to the {@code sync} method of the writer it creates.
 * <p>
 * Both flags are {@code null} until the corresponding call happens, and again after
 * {@link #resetSyncFlag()}.
 */
class CustomAsyncFSWAL extends AsyncFSWAL {

  /** Last {@code forceSync} seen by {@link #sync(boolean)} or {@link #sync(long, boolean)}. */
  private Boolean syncFlag;

  /** Last {@code forceSync} seen by the wrapping writer returned from createWriterInstance. */
  private Boolean writerSyncFlag;

  /**
   * Delegates to the {@code AsyncFSWAL} constructor, using
   * {@link HConstants#HREGION_OLDLOGDIR_NAME} as the archive directory name and fixed values for
   * the superclass arguments that this test does not vary.
   */
  public CustomAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, Configuration conf,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
    throws FailedLogCloseException, IOException {
    super(fs, rootDir, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null,
      eventLoopGroup, channelClass);
  }

  /**
   * Wraps the writer produced by the superclass so that each {@code sync} call records its
   * {@code forceSync} argument before being forwarded. All other calls are forwarded unchanged.
   */
  @Override
  protected AsyncWriter createWriterInstance(Path path) throws IOException {
    AsyncWriter writer = super.createWriterInstance(path);
    return new AsyncWriter() {

      @Override
      public void close() throws IOException {
        writer.close();
      }

      @Override
      public long getLength() {
        return writer.getLength();
      }

      @Override
      public CompletableFuture<Long> sync(boolean forceSync) {
        // Record first, then delegate, so the flag is set even if the delegate throws.
        writerSyncFlag = forceSync;
        return writer.sync(forceSync);
      }

      @Override
      public void append(Entry entry) {
        writer.append(entry);
      }
    };
  }

  /** Records {@code forceSync}, then delegates to the superclass. */
  @Override
  public void sync(boolean forceSync) throws IOException {
    syncFlag = forceSync;
    super.sync(forceSync);
  }

  /** Records {@code forceSync}, then delegates to the superclass. */
  @Override
  public void sync(long txid, boolean forceSync) throws IOException {
    syncFlag = forceSync;
    super.sync(txid, forceSync);
  }

  /** Sets both recorded flags back to {@code null}. */
  void resetSyncFlag() {
    this.syncFlag = null;
    this.writerSyncFlag = null;
  }

  /** @return the last WAL-level {@code forceSync} value, or {@code null} if none was recorded */
  Boolean getSyncFlag() {
    return syncFlag;
  }

  /** @return the last writer-level {@code forceSync} value, or {@code null} if none recorded */
  Boolean getWriterSyncFlag() {
    return writerSyncFlag;
  }
}