/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;

/**
 * Testcase for HBASE-22539.
 * <p>
 * Starts a mini cluster whose WAL provider is {@link PauseWALProvider}, so that every WAL created
 * for the cluster is a {@link PauseWAL}. The test logic itself is not in this class; it is
 * presumably inherited from {@link WALCorruptionDueToDanglingByteBufferTestBase}.
 */
@Tag(RegionServerTests.TAG)
@Tag(MediumTests.TAG)
public class TestAsyncFSWALCorruptionDueToDanglingByteBuffer
  extends WALCorruptionDueToDanglingByteBufferTestBase {

  /**
   * An {@link AsyncFSWAL} that can be paused at
   * {@link #atHeadOfRingBufferEventHandlerAppend()} through the {@code ARRIVE} and {@code RESUME}
   * latches.
   */
  public static final class PauseWAL extends AsyncFSWAL {

    /**
     * Forwards all arguments to the {@link AsyncFSWAL} constructor, passing {@code null} for the
     * super-constructor arguments this test does not supply and a {@link StreamSlowMonitor}
     * created from {@code conf} under the name {@code "monitorForSuffix"}.
     * @throws FailedLogCloseException declared by the super constructor
     * @throws IOException             declared by the super constructor
     */
    public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
      Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
      String prefix, String suffix, EventLoopGroup eventLoopGroup,
      Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
      super(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
        null, null, eventLoopGroup, channelClass,
        StreamSlowMonitor.create(conf, "monitorForSuffix"));
    }

    /**
     * When {@code ARRIVE} is non-null, counts it down and then blocks until {@code RESUME} is
     * released. Does nothing when {@code ARRIVE} is {@code null}.
     * <p>
     * NOTE(review): {@code ARRIVE} and {@code RESUME} are not declared in this file; they are
     * presumably latches owned by the test base class -- confirm there.
     */
    @Override
    protected void atHeadOfRingBufferEventHandlerAppend() {
      if (ARRIVE != null) {
        ARRIVE.countDown();
        try {
          RESUME.await();
        } catch (InterruptedException e) {
          // Stop waiting, but keep the interrupt visible to the code that owns this thread
          // instead of silently discarding it.
          Thread.currentThread().interrupt();
        }
      }
    }
  }

  /**
   * WAL provider that hands out {@link PauseWAL} instances.
   */
  public static final class PauseWALProvider extends AbstractFSWALProvider<PauseWAL> {

    /** Event loop group obtained from the configuration in {@link #doInit(Configuration)}. */
    private EventLoopGroup eventLoopGroup;

    /** Channel class obtained from the configuration in {@link #doInit(Configuration)}. */
    private Class<? extends Channel> channelClass;

    /**
     * Creates a {@link PauseWAL} on the WAL file system and WAL root directory resolved from the
     * provider's configuration. {@code failIfWALExists} is always {@code true}; the suffix is
     * {@code META_WAL_PROVIDER_ID} when this provider's id equals it, otherwise {@code null}.
     */
    @Override
    protected PauseWAL createWAL() throws IOException {
      return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
        getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
        conf, listeners, true, logPrefix,
        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
        channelClass);
    }

    /**
     * Reads the event loop group and channel class from {@code conf} via
     * {@link NettyAsyncFSWALConfigHelper#getEventLoopConfig(Configuration)} and stores them for
     * later use by {@link #createWAL()}.
     */
    @Override
    protected void doInit(Configuration conf) throws IOException {
      Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
        NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
      eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
      channelClass = eventLoopGroupAndChannelClass.getSecond();
    }
  }

  /**
   * Registers {@link PauseWALProvider} as the WAL provider, starts the mini cluster, creates the
   * test table and waits for it to become available.
   */
  @BeforeAll
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class,
      WALProvider.class);
    UTIL.startMiniCluster(1);
    UTIL.createTable(TABLE_NAME, CF);
    UTIL.waitTableAvailable(TABLE_NAME);
  }

  /**
   * Shuts the mini cluster down once all tests in the class have run.
   */
  @AfterAll
  public static void tearDown() throws Exception {
    UTIL.shutdownMiniCluster();
  }
}