001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022
023import java.util.ArrayList;
024import java.util.List;
025
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseTestingUtility;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Admin;
031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.Scan;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
036import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
037import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.JVMClusterUtil;
041import org.junit.After;
042import org.junit.AfterClass;
043import org.junit.Before;
044import org.junit.BeforeClass;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051@Category({LargeTests.class})
052public class TestCleanupCompactedFileAfterFailover {
053
054  private static final Logger LOG =
055      LoggerFactory.getLogger(TestCleanupCompactedFileAfterFailover.class);
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059      HBaseClassTestRule.forClass(TestCleanupCompactedFileAfterFailover.class);
060
061  private static HBaseTestingUtility TEST_UTIL;
062  private static Admin admin;
063  private static Table table;
064
065  private static TableName TABLE_NAME = TableName.valueOf("TestCleanupCompactedFileAfterFailover");
066  private static byte[] ROW = Bytes.toBytes("row");
067  private static byte[] FAMILY = Bytes.toBytes("cf");
068  private static byte[] QUALIFIER = Bytes.toBytes("cq");
069  private static byte[] VALUE = Bytes.toBytes("value");
070  private static final int RS_NUMBER = 5;
071
072  @BeforeClass
073  public static void beforeClass() throws Exception {
074    TEST_UTIL = new HBaseTestingUtility();
075    // Set the scanner lease to 20min, so the scanner can't be closed by RegionServer
076    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1200000);
077    TEST_UTIL.getConfiguration()
078        .setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
079    TEST_UTIL.getConfiguration().set("dfs.blocksize", "64000");
080    TEST_UTIL.getConfiguration().set("dfs.namenode.fs-limits.min-block-size", "1024");
081    TEST_UTIL.getConfiguration().set(TimeToLiveHFileCleaner.TTL_CONF_KEY, "0");
082    TEST_UTIL.startMiniCluster(RS_NUMBER);
083    admin = TEST_UTIL.getAdmin();
084  }
085
086  @AfterClass
087  public static void afterClass() throws Exception {
088    TEST_UTIL.shutdownMiniCluster();
089  }
090
091  @Before
092  public void before() throws Exception {
093    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
094    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
095    admin.createTable(builder.build());
096    TEST_UTIL.waitTableAvailable(TABLE_NAME);
097    table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
098  }
099
100  @After
101  public void after() throws Exception {
102    admin.disableTable(TABLE_NAME);
103    admin.deleteTable(TABLE_NAME);
104  }
105
106  @Test
107  public void testCleanupAfterFailoverWithCompactOnce() throws Exception {
108    testCleanupAfterFailover(1);
109  }
110
111  @Test
112  public void testCleanupAfterFailoverWithCompactTwice() throws Exception {
113    testCleanupAfterFailover(2);
114  }
115
116  @Test
117  public void testCleanupAfterFailoverWithCompactThreeTimes() throws Exception {
118    testCleanupAfterFailover(3);
119  }
120
121  private void testCleanupAfterFailover(int compactNum) throws Exception {
122    HRegionServer rsServedTable = null;
123    List<HRegion> regions = new ArrayList<>();
124    for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
125        .getLiveRegionServerThreads()) {
126      HRegionServer rs = rsThread.getRegionServer();
127      if (rs.getOnlineTables().contains(TABLE_NAME)) {
128        regions.addAll(rs.getRegions(TABLE_NAME));
129        rsServedTable = rs;
130      }
131    }
132    assertNotNull(rsServedTable);
133    assertEquals("Table should only have one region", 1, regions.size());
134    HRegion region = regions.get(0);
135    HStore store = region.getStore(FAMILY);
136
137    writeDataAndFlush(3, region);
138    assertEquals(3, store.getStorefilesCount());
139
140    // Open a scanner and not close, then the storefile will be referenced
141    store.getScanner(new Scan(), null, 0);
142
143    region.compact(true);
144    assertEquals(1, store.getStorefilesCount());
145    // The compacted file should not be archived as there are references by user scanner
146    assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
147
148    for (int i = 1; i < compactNum; i++) {
149      // Compact again
150      region.compact(true);
151      assertEquals(1, store.getStorefilesCount());
152      store.closeAndArchiveCompactedFiles();
153      // Compacted storefiles still be 3 as the new compacted storefile was archived
154      assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
155    }
156
157    int walNum = rsServedTable.getWALs().size();
158    // Roll WAL
159    rsServedTable.walRoller.requestRollAll();
160    // Flush again
161    region.flush(true);
162    // The WAL which contains compaction event marker should be archived
163    assertEquals("The old WAL should be archived", walNum, rsServedTable.getWALs().size());
164
165    rsServedTable.kill();
166    // Sleep to wait failover
167    Thread.sleep(3000);
168    TEST_UTIL.waitTableAvailable(TABLE_NAME);
169
170    regions.clear();
171    for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
172        .getLiveRegionServerThreads()) {
173      HRegionServer rs = rsThread.getRegionServer();
174      if (rs != rsServedTable && rs.getOnlineTables().contains(TABLE_NAME)) {
175        regions.addAll(rs.getRegions(TABLE_NAME));
176      }
177    }
178    assertEquals("Table should only have one region", 1, regions.size());
179    region = regions.get(0);
180    store = region.getStore(FAMILY);
181    // The compacted storefile should be cleaned and only have 1 storefile
182    assertEquals(1, store.getStorefilesCount());
183  }
184
185  private void writeDataAndFlush(int fileNum, HRegion region) throws Exception {
186    for (int i = 0; i < fileNum; i++) {
187      for (int j = 0; j < 100; j++) {
188        table.put(new Put(concat(ROW, j)).addColumn(FAMILY, QUALIFIER, concat(VALUE, j)));
189      }
190      region.flush(true);
191    }
192  }
193
194  private byte[] concat(byte[] base, int index) {
195    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
196  }
197}