/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that a region reopened after its region server is killed ends up with a single store
 * file, even though compacted store files were still being held by an open scanner on the killed
 * server.
 * <p>
 * Each test runs against a shared mini cluster of {@link #RS_NUMBER} region servers and kills the
 * server that hosts the test table, so the number of live region servers shrinks by one per test.
 */
@Tag(LargeTests.TAG)
public class TestCleanupCompactedFileAfterFailover {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestCleanupCompactedFileAfterFailover.class);

  private static HBaseTestingUtil TEST_UTIL;
  private static Admin admin;
  private static Table table;

  private static final TableName TABLE_NAME =
    TableName.valueOf("TestCleanupCompactedFileAfterFailover");
  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] FAMILY = Bytes.toBytes("cf");
  private static final byte[] QUALIFIER = Bytes.toBytes("cq");
  private static final byte[] VALUE = Bytes.toBytes("value");
  private static final int RS_NUMBER = 5;

  /**
   * Starts the shared mini cluster with the configuration the tests rely on.
   */
  @BeforeAll
  public static void beforeClass() throws Exception {
    TEST_UTIL = new HBaseTestingUtil();
    // Set the scanner lease to 20min, so the scanner can't be closed by RegionServer
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1200000);
    // NOTE(review): presumably raised so the three flushed files never trigger an automatic
    // compaction and only the explicit region.compact(true) calls compact -- confirm.
    TEST_UTIL.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY,
      100);
    TEST_UTIL.getConfiguration().set("dfs.blocksize", "64000");
    TEST_UTIL.getConfiguration().set("dfs.namenode.fs-limits.min-block-size", "1024");
    TEST_UTIL.getConfiguration().set(TimeToLiveHFileCleaner.TTL_CONF_KEY, "0");
    TEST_UTIL.startMiniCluster(RS_NUMBER);
    admin = TEST_UTIL.getAdmin();
  }

  /**
   * Shuts the shared mini cluster down once all tests have run.
   */
  @AfterAll
  public static void afterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Creates the single-family test table and opens a {@link Table} handle for it.
   */
  @BeforeEach
  public void before() throws Exception {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
    admin.createTable(builder.build());
    TEST_UTIL.waitTableAvailable(TABLE_NAME);
    table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
  }

  /**
   * Releases the per-test {@link Table} handle, then disables and deletes the test table.
   */
  @AfterEach
  public void after() throws Exception {
    try {
      // The handle is re-created by before() for every test, so release it here instead of
      // leaking one per test method.
      if (table != null) {
        table.close();
        table = null;
      }
    } finally {
      // Drop the table even if closing the handle failed, so the next test can re-create it.
      admin.disableTable(TABLE_NAME);
      admin.deleteTable(TABLE_NAME);
    }
  }

  @Test
  public void testCleanupAfterFailoverWithCompactOnce() throws Exception {
    testCleanupAfterFailover(1);
  }

  @Test
  public void testCleanupAfterFailoverWithCompactTwice() throws Exception {
    testCleanupAfterFailover(2);
  }

  @Test
  public void testCleanupAfterFailoverWithCompactThreeTimes() throws Exception {
    testCleanupAfterFailover(3);
  }

  /**
   * Runs the failover scenario: flush three store files, pin them with an unclosed scanner,
   * major-compact {@code compactNum} times, roll the WAL, kill the hosting region server and
   * assert that the reopened region has exactly one store file.
   * @param compactNum total number of major compactions to run before the region server is killed
   */
  private void testCleanupAfterFailover(int compactNum) throws Exception {
    // Locate the region server currently hosting the table and its (single) region.
    HRegionServer rsServedTable = null;
    List<HRegion> regions = new ArrayList<>();
    for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
      .getLiveRegionServerThreads()) {
      HRegionServer rs = rsThread.getRegionServer();
      if (rs.getOnlineTables().contains(TABLE_NAME)) {
        regions.addAll(rs.getRegions(TABLE_NAME));
        rsServedTable = rs;
      }
    }
    assertNotNull(rsServedTable);
    assertEquals(1, regions.size(), "Table should only have one region");
    HRegion region = regions.get(0);
    HStore store = region.getStore(FAMILY);

    writeDataAndFlush(3, region);
    assertEquals(3, store.getStorefilesCount());

    // Open a scanner and deliberately never close it, so the store files stay referenced
    store.getScanner(new Scan(), null, 0);

    region.compact(true);
    assertEquals(1, store.getStorefilesCount());
    // The compacted files should not be archived as they are still referenced by the user scanner
    assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());

    for (int i = 1; i < compactNum; i++) {
      // Compact again
      region.compact(true);
      assertEquals(1, store.getStorefilesCount());
      store.closeAndArchiveCompactedFiles();
      // The compacted file count stays at 3 because the newly compacted store file was archived
      assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
    }

    int walNum = rsServedTable.getWALs().size();
    // Roll WAL
    rsServedTable.getWalRoller().requestRollAll();
    // Flush again
    region.flush(true);
    // The WAL which contains the compaction event marker should be archived
    assertEquals(walNum, rsServedTable.getWALs().size(), "The old WAL should be archived");

    rsServedTable.kill();
    // Sleep to wait for failover
    Thread.sleep(3000);
    TEST_UTIL.waitTableAvailable(TABLE_NAME);

    // Find the region again, ignoring the killed server in case it is still listed as live.
    regions.clear();
    for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
      .getLiveRegionServerThreads()) {
      HRegionServer rs = rsThread.getRegionServer();
      if (rs != rsServedTable && rs.getOnlineTables().contains(TABLE_NAME)) {
        regions.addAll(rs.getRegions(TABLE_NAME));
      }
    }
    assertEquals(1, regions.size(), "Table should only have one region");
    region = regions.get(0);
    store = region.getStore(FAMILY);
    // The compacted store files should be cleaned up, leaving only 1 store file
    assertEquals(1, store.getStorefilesCount());
  }

  /**
   * Writes the same 100 rows {@code fileNum} times, flushing {@code region} after each batch so
   * that every batch is followed by its own flush.
   * @param fileNum number of write-then-flush rounds to perform
   * @param region  region to flush after each round
   */
  private void writeDataAndFlush(int fileNum, HRegion region) throws Exception {
    for (int i = 0; i < fileNum; i++) {
      for (int j = 0; j < 100; j++) {
        table.put(new Put(concat(ROW, j)).addColumn(FAMILY, QUALIFIER, concat(VALUE, j)));
      }
      region.flush(true);
    }
  }

  /**
   * Builds the bytes of {@code "<base>-<index>"}, where {@code base} is decoded as a string.
   */
  private byte[] concat(byte[] base, int index) {
    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
  }
}