001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022 023import java.util.ArrayList; 024import java.util.List; 025 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HBaseTestingUtility; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.Admin; 031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.client.Scan; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 036import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; 037import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.JVMClusterUtil; 041import org.junit.After; 042import org.junit.AfterClass; 043import org.junit.Before; 044import org.junit.BeforeClass; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051@Category({LargeTests.class}) 052public class TestCleanupCompactedFileAfterFailover { 053 054 private static final Logger LOG = 055 LoggerFactory.getLogger(TestCleanupCompactedFileAfterFailover.class); 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestCleanupCompactedFileAfterFailover.class); 060 061 private static HBaseTestingUtility TEST_UTIL; 062 private static Admin admin; 063 private static Table table; 064 065 private static TableName TABLE_NAME = TableName.valueOf("TestCleanupCompactedFileAfterFailover"); 066 private static byte[] ROW = Bytes.toBytes("row"); 067 private static byte[] FAMILY = Bytes.toBytes("cf"); 068 private static byte[] QUALIFIER = Bytes.toBytes("cq"); 069 private static byte[] VALUE = Bytes.toBytes("value"); 070 private static final int RS_NUMBER = 5; 071 072 @BeforeClass 073 public static void beforeClass() throws Exception { 074 TEST_UTIL = new HBaseTestingUtility(); 075 // Set the scanner lease to 20min, so the scanner can't be closed by RegionServer 076 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1200000); 077 TEST_UTIL.getConfiguration() 078 .setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); 079 TEST_UTIL.getConfiguration().set("dfs.blocksize", "64000"); 080 TEST_UTIL.getConfiguration().set("dfs.namenode.fs-limits.min-block-size", "1024"); 081 TEST_UTIL.getConfiguration().set(TimeToLiveHFileCleaner.TTL_CONF_KEY, "0"); 082 TEST_UTIL.startMiniCluster(RS_NUMBER); 083 admin = TEST_UTIL.getAdmin(); 084 } 085 086 @AfterClass 087 public static void afterClass() throws Exception { 088 TEST_UTIL.shutdownMiniCluster(); 089 } 090 091 @Before 092 public void before() throws Exception { 093 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME); 094 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)); 095 admin.createTable(builder.build()); 096 TEST_UTIL.waitTableAvailable(TABLE_NAME); 097 table = TEST_UTIL.getConnection().getTable(TABLE_NAME); 098 } 099 100 @After 101 public void after() throws Exception { 102 admin.disableTable(TABLE_NAME); 103 admin.deleteTable(TABLE_NAME); 104 } 105 106 @Test 107 public void testCleanupAfterFailoverWithCompactOnce() throws Exception { 108 testCleanupAfterFailover(1); 109 } 110 111 @Test 112 public void testCleanupAfterFailoverWithCompactTwice() throws Exception { 113 testCleanupAfterFailover(2); 114 } 115 116 @Test 117 public void testCleanupAfterFailoverWithCompactThreeTimes() throws Exception { 118 testCleanupAfterFailover(3); 119 } 120 121 private void testCleanupAfterFailover(int compactNum) throws Exception { 122 HRegionServer rsServedTable = null; 123 List<HRegion> regions = new ArrayList<>(); 124 for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster() 125 .getLiveRegionServerThreads()) { 126 HRegionServer rs = rsThread.getRegionServer(); 127 if (rs.getOnlineTables().contains(TABLE_NAME)) { 128 regions.addAll(rs.getRegions(TABLE_NAME)); 129 rsServedTable = rs; 130 } 131 } 132 assertNotNull(rsServedTable); 133 assertEquals("Table should only have one region", 1, regions.size()); 134 HRegion region = regions.get(0); 135 HStore store = region.getStore(FAMILY); 136 137 writeDataAndFlush(3, region); 138 assertEquals(3, store.getStorefilesCount()); 139 140 // Open a scanner and not close, then the storefile will be referenced 141 store.getScanner(new Scan(), null, 0); 142 143 region.compact(true); 144 assertEquals(1, store.getStorefilesCount()); 145 // The compacted file should not be archived as there are references by user scanner 146 assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size()); 147 148 for (int i = 1; i < compactNum; i++) { 149 // Compact again 150 region.compact(true); 151 assertEquals(1, store.getStorefilesCount()); 152 store.closeAndArchiveCompactedFiles(); 153 // Compacted storefiles still be 3 as the new compacted storefile was archived 154 assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size()); 155 } 156 157 int walNum = rsServedTable.getWALs().size(); 158 // Roll WAL 159 rsServedTable.walRoller.requestRollAll(); 160 // Flush again 161 region.flush(true); 162 // The WAL which contains compaction event marker should be archived 163 assertEquals("The old WAL should be archived", walNum, rsServedTable.getWALs().size()); 164 165 rsServedTable.kill(); 166 // Sleep to wait failover 167 Thread.sleep(3000); 168 TEST_UTIL.waitTableAvailable(TABLE_NAME); 169 170 regions.clear(); 171 for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster() 172 .getLiveRegionServerThreads()) { 173 HRegionServer rs = rsThread.getRegionServer(); 174 if (rs != rsServedTable && rs.getOnlineTables().contains(TABLE_NAME)) { 175 regions.addAll(rs.getRegions(TABLE_NAME)); 176 } 177 } 178 assertEquals("Table should only have one region", 1, regions.size()); 179 region = regions.get(0); 180 store = region.getStore(FAMILY); 181 // The compacted storefile should be cleaned and only have 1 storefile 182 assertEquals(1, store.getStorefilesCount()); 183 } 184 185 private void writeDataAndFlush(int fileNum, HRegion region) throws Exception { 186 for (int i = 0; i < fileNum; i++) { 187 for (int j = 0; j < 100; j++) { 188 table.put(new Put(concat(ROW, j)).addColumn(FAMILY, QUALIFIER, concat(VALUE, j))); 189 } 190 region.flush(true); 191 } 192 } 193 194 private byte[] concat(byte[] base, int index) { 195 return Bytes.toBytes(Bytes.toString(base) + "-" + index); 196 } 197}