001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.util.List; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.hbase.io.ByteBuffAllocator; 026import org.apache.hadoop.hbase.ipc.RpcServerFactory; 027import org.apache.hadoop.hbase.ipc.SimpleRpcServer; 028import org.apache.hadoop.hbase.regionserver.HRegion; 029import org.apache.hadoop.hbase.regionserver.wal.FSHLog; 030import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 031import org.apache.hadoop.hbase.testclassification.MediumTests; 032import org.apache.hadoop.hbase.testclassification.RegionServerTests; 033import org.apache.hadoop.hbase.util.CommonFSUtils; 034import org.junit.jupiter.api.AfterAll; 035import org.junit.jupiter.api.BeforeAll; 036import org.junit.jupiter.api.Tag; 037 038@Tag(RegionServerTests.TAG) 039@Tag(MediumTests.TAG) 040public class TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer 041 extends WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase { 042 043 public static final class PauseWAL extends FSHLog { 044 045 private int testTableWalAppendsCount = 0; 046 047 public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, 048 Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, 049 String prefix, String suffix) throws IOException { 050 super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); 051 } 052 053 @Override 054 protected void atHeadOfRingBufferEventHandlerAppend() { 055 // Let the 1st Append go through. The write thread will wait for this to go through before 056 // calling further put() 057 if (ARRIVE != null) { // Means appends as part of puts in testcase 058 // Sleep for a second so that RS handler thread put all the mini batch WAL appends to ring 059 // buffer. 060 if (testTableWalAppendsCount == 0) { 061 try { 062 Thread.sleep(1000); 063 } catch (InterruptedException e) { 064 } 065 } 066 // Let the first minibatch write go through. When 2nd one comes, notify the waiting test 067 // case for doing further batch puts and make this WAL append thread to pause 068 if (testTableWalAppendsCount == 1) { 069 ARRIVE.countDown(); 070 try { 071 RESUME.await(); 072 } catch (InterruptedException e) { 073 } 074 } 075 testTableWalAppendsCount++; 076 } 077 } 078 } 079 080 public static final class PauseWALProvider extends AbstractFSWALProvider<PauseWAL> { 081 082 @Override 083 protected PauseWAL createWAL() throws IOException { 084 return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), 085 getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), 086 conf, listeners, true, logPrefix, 087 META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); 088 } 089 090 @Override 091 protected void doInit(Configuration conf) throws IOException { 092 } 093 } 094 095 @BeforeAll 096 public static void setUp() throws Exception { 097 UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class, 098 WALProvider.class); 099 UTIL.getConfiguration().setInt(HRegion.HBASE_REGIONSERVER_MINIBATCH_SIZE, 1); 100 UTIL.getConfiguration().set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, 101 SimpleRpcServer.class.getName()); 102 UTIL.getConfiguration().setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, 1); 103 UTIL.getConfiguration().setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 1024); 104 UTIL.getConfiguration().setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 500); 105 UTIL.startMiniCluster(1); 106 UTIL.createTable(TABLE_NAME, CF); 107 UTIL.waitTableAvailable(TABLE_NAME); 108 } 109 110 @AfterAll 111 public static void tearDown() throws Exception { 112 UTIL.shutdownMiniCluster(); 113 } 114}