001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
021import static org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieringValueProvider.TIERING_CELL_QUALIFIER;
022import static org.apache.hadoop.hbase.regionserver.compactions.CustomTieredCompactor.TIERING_VALUE_PROVIDER;
023import static org.apache.hadoop.hbase.regionserver.compactions.RowKeyDateTieringValueProvider.TIERING_KEY_DATE_FORMAT;
024import static org.apache.hadoop.hbase.regionserver.compactions.RowKeyDateTieringValueProvider.TIERING_KEY_DATE_PATTERN;
025import static org.junit.jupiter.api.Assertions.assertEquals;
026import static org.junit.jupiter.api.Assertions.assertNotNull;
027import static org.junit.jupiter.api.Assertions.fail;
028
029import java.io.IOException;
030import java.text.SimpleDateFormat;
031import java.util.ArrayList;
032import java.util.Date;
033import java.util.List;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.Waiter;
037import org.apache.hadoop.hbase.client.Admin;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.Put;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.regionserver.CustomTieredStoreEngine;
044import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
045import org.apache.hadoop.hbase.testclassification.RegionServerTests;
046import org.apache.hadoop.hbase.testclassification.SmallTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.junit.jupiter.api.AfterEach;
049import org.junit.jupiter.api.BeforeEach;
050import org.junit.jupiter.api.Tag;
051import org.junit.jupiter.api.Test;
052
053@Tag(RegionServerTests.TAG)
054@Tag(SmallTests.TAG)
055public class TestCustomCellTieredCompactor {
056
057  public static final byte[] FAMILY = Bytes.toBytes("cf");
058
059  protected HBaseTestingUtil utility;
060
061  protected Admin admin;
062
063  @BeforeEach
064  public void setUp() throws Exception {
065    utility = new HBaseTestingUtil();
066    utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
067    utility.startMiniCluster();
068  }
069
070  @AfterEach
071  public void tearDown() throws Exception {
072    utility.shutdownMiniCluster();
073  }
074
075  @Test
076  public void testCustomCellTieredCompactor() throws Exception {
077    ColumnFamilyDescriptorBuilder clmBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
078    clmBuilder.setValue("hbase.hstore.engine.class", CustomTieredStoreEngine.class.getName());
079    clmBuilder.setValue(TIERING_CELL_QUALIFIER, "date");
080    TableName tableName = TableName.valueOf("testCustomCellTieredCompactor");
081    TableDescriptorBuilder tblBuilder = TableDescriptorBuilder.newBuilder(tableName);
082    tblBuilder.setColumnFamily(clmBuilder.build());
083    utility.getAdmin().createTable(tblBuilder.build());
084    utility.waitTableAvailable(tableName);
085    Connection connection = utility.getConnection();
086    Table table = connection.getTable(tableName);
087    long recordTime = System.currentTimeMillis();
088    // write data and flush multiple store files:
089    for (int i = 0; i < 6; i++) {
090      List<Put> puts = new ArrayList<>(2);
091      Put put = new Put(Bytes.toBytes(i));
092      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + i));
093      put.addColumn(FAMILY, Bytes.toBytes("date"),
094        Bytes.toBytes(recordTime - (11L * 366L * 24L * 60L * 60L * 1000L)));
095      puts.add(put);
096      put = new Put(Bytes.toBytes(i + 1000));
097      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + (i + 1000)));
098      put.addColumn(FAMILY, Bytes.toBytes("date"), Bytes.toBytes(recordTime));
099      puts.add(put);
100      table.put(puts);
101      utility.flush(tableName);
102    }
103    table.close();
104    long firstCompactionTime = System.currentTimeMillis();
105    utility.getAdmin().majorCompact(tableName);
106    Waiter.waitFor(utility.getConfiguration(), 5000,
107      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(tableName)
108          > firstCompactionTime);
109    long numHFiles = utility.getNumHFiles(tableName, FAMILY);
110    // The first major compaction would have no means to detect more than one tier,
111    // because without the min/max values available in the file info portion of the selected files
112    // for compaction, CustomCellDateTieredCompactionPolicy has no means
113    // to calculate the proper boundaries.
114    assertEquals(1, numHFiles);
115    utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles()
116      .forEach(file -> {
117        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
118        assertNotNull(rangeBytes);
119        try {
120          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
121          assertEquals((recordTime - (11L * 366L * 24L * 60L * 60L * 1000L)),
122            timeRangeTracker.getMin());
123          assertEquals(recordTime, timeRangeTracker.getMax());
124        } catch (IOException e) {
125          fail(e.getMessage());
126        }
127      });
128    // now do major compaction again, to make sure we write two separate files
129    long secondCompactionTime = System.currentTimeMillis();
130    utility.getAdmin().majorCompact(tableName);
131    Waiter.waitFor(utility.getConfiguration(), 5000,
132      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(tableName)
133          > secondCompactionTime);
134    numHFiles = utility.getNumHFiles(tableName, FAMILY);
135    assertEquals(2, numHFiles);
136    utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles()
137      .forEach(file -> {
138        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
139        assertNotNull(rangeBytes);
140        try {
141          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
142          assertEquals(timeRangeTracker.getMin(), timeRangeTracker.getMax());
143        } catch (IOException e) {
144          fail(e.getMessage());
145        }
146      });
147  }
148
149  @Test
150  public void testCustomCellTieredCompactorWithRowKeyDateTieringValue() throws Exception {
151    // Restart mini cluster with RowKeyDateTieringValueProvider
152    utility.shutdownMiniCluster();
153    utility.getConfiguration().set(TIERING_VALUE_PROVIDER,
154      RowKeyDateTieringValueProvider.class.getName());
155    utility.startMiniCluster();
156
157    ColumnFamilyDescriptorBuilder clmBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
158    clmBuilder.setValue("hbase.hstore.engine.class", CustomTieredStoreEngine.class.getName());
159
160    // Table 1: Date at end with format yyyyMMddHHmmssSSS
161    TableName table1Name = TableName.valueOf("testTable1");
162    TableDescriptorBuilder tbl1Builder = TableDescriptorBuilder.newBuilder(table1Name);
163    tbl1Builder.setColumnFamily(clmBuilder.build());
164    tbl1Builder.setValue(TIERING_KEY_DATE_PATTERN, "(\\d{17})$");
165    tbl1Builder.setValue(TIERING_KEY_DATE_FORMAT, "yyyyMMddHHmmssSSS");
166    utility.getAdmin().createTable(tbl1Builder.build());
167    utility.waitTableAvailable(table1Name);
168
169    // Table 2: Date at beginning with format yyyy-MM-dd HH:mm:ss
170    TableName table2Name = TableName.valueOf("testTable2");
171    TableDescriptorBuilder tbl2Builder = TableDescriptorBuilder.newBuilder(table2Name);
172    tbl2Builder.setColumnFamily(clmBuilder.build());
173    tbl2Builder.setValue(TIERING_KEY_DATE_PATTERN, "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})");
174    tbl2Builder.setValue(TIERING_KEY_DATE_FORMAT, "yyyy-MM-dd HH:mm:ss");
175    utility.getAdmin().createTable(tbl2Builder.build());
176    utility.waitTableAvailable(table2Name);
177
178    Connection connection = utility.getConnection();
179    long recordTime = System.currentTimeMillis();
180    long oldTime = recordTime - (11L * 366L * 24L * 60L * 60L * 1000L);
181
182    SimpleDateFormat sdf1 = new SimpleDateFormat("yyyyMMddHHmmssSSS");
183    SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
184
185    // Write to Table 1 with date at end
186    Table table1 = connection.getTable(table1Name);
187    for (int i = 0; i < 6; i++) {
188      List<Put> puts = new ArrayList<>(2);
189
190      // Old data
191      String oldDate = sdf1.format(new Date(oldTime));
192      Put put = new Put(Bytes.toBytes("row_" + i + "_" + oldDate));
193      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + i));
194      puts.add(put);
195
196      // Recent data
197      String recentDate = sdf1.format(new Date(recordTime));
198      put = new Put(Bytes.toBytes("row_" + (i + 1000) + "_" + recentDate));
199      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + (i + 1000)));
200      puts.add(put);
201
202      table1.put(puts);
203      utility.flush(table1Name);
204    }
205    table1.close();
206
207    // Write to Table 2 with date at beginning
208    Table table2 = connection.getTable(table2Name);
209    for (int i = 0; i < 6; i++) {
210      List<Put> puts = new ArrayList<>(2);
211
212      // Old data
213      String oldDate = sdf2.format(new Date(oldTime));
214      Put put = new Put(Bytes.toBytes(oldDate + "_row_" + i));
215      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + i));
216      puts.add(put);
217
218      // Recent data
219      String recentDate = sdf2.format(new Date(recordTime));
220      put = new Put(Bytes.toBytes(recentDate + "_row_" + (i + 1000)));
221      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + (i + 1000)));
222      puts.add(put);
223
224      table2.put(puts);
225      utility.flush(table2Name);
226    }
227    table2.close();
228
229    // First compaction for Table 1
230    long compactionTime1 = System.currentTimeMillis();
231    utility.getAdmin().majorCompact(table1Name);
232    Waiter.waitFor(utility.getConfiguration(), 5000,
233      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(table1Name)
234          > compactionTime1);
235
236    assertEquals(1, utility.getNumHFiles(table1Name, FAMILY));
237
238    utility.getMiniHBaseCluster().getRegions(table1Name).get(0).getStore(FAMILY).getStorefiles()
239      .forEach(file -> {
240        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
241        assertNotNull(rangeBytes);
242        try {
243          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
244          assertEquals(oldTime, timeRangeTracker.getMin());
245          assertEquals(recordTime, timeRangeTracker.getMax());
246        } catch (IOException e) {
247          fail(e.getMessage());
248        }
249      });
250
251    // Second compaction for Table 1
252    long secondCompactionTime1 = System.currentTimeMillis();
253    utility.getAdmin().majorCompact(table1Name);
254    Waiter.waitFor(utility.getConfiguration(), 5000,
255      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(table1Name)
256          > secondCompactionTime1);
257
258    assertEquals(2, utility.getNumHFiles(table1Name, FAMILY));
259
260    utility.getMiniHBaseCluster().getRegions(table1Name).get(0).getStore(FAMILY).getStorefiles()
261      .forEach(file -> {
262        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
263        assertNotNull(rangeBytes);
264        try {
265          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
266          assertEquals(timeRangeTracker.getMin(), timeRangeTracker.getMax());
267        } catch (IOException e) {
268          fail(e.getMessage());
269        }
270      });
271
272    // First compaction for Table 2
273    long compactionTime2 = System.currentTimeMillis();
274    utility.getAdmin().majorCompact(table2Name);
275    Waiter.waitFor(utility.getConfiguration(), 5000,
276      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(table2Name)
277          > compactionTime2);
278
279    assertEquals(1, utility.getNumHFiles(table2Name, FAMILY));
280
281    utility.getMiniHBaseCluster().getRegions(table2Name).get(0).getStore(FAMILY).getStorefiles()
282      .forEach(file -> {
283        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
284        assertNotNull(rangeBytes);
285        try {
286          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
287          // Table 2 uses yyyy-MM-dd HH:mm:ss format, so we need to account for second precision
288          // The parsed time will be truncated to second precision (no milliseconds)
289          long expectedOldTime = (oldTime / 1000) * 1000;
290          long expectedRecentTime = (recordTime / 1000) * 1000;
291          assertEquals(expectedOldTime, timeRangeTracker.getMin());
292          assertEquals(expectedRecentTime, timeRangeTracker.getMax());
293        } catch (IOException e) {
294          fail(e.getMessage());
295        }
296      });
297
298    // Second compaction for Table 2
299    long secondCompactionTime2 = System.currentTimeMillis();
300    utility.getAdmin().majorCompact(table2Name);
301    Waiter.waitFor(utility.getConfiguration(), 5000,
302      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(table2Name)
303          > secondCompactionTime2);
304
305    assertEquals(2, utility.getNumHFiles(table2Name, FAMILY));
306
307    utility.getMiniHBaseCluster().getRegions(table2Name).get(0).getStore(FAMILY).getStorefiles()
308      .forEach(file -> {
309        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
310        assertNotNull(rangeBytes);
311        try {
312          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
313          assertEquals(timeRangeTracker.getMin(), timeRangeTracker.getMax());
314        } catch (IOException e) {
315          fail(e.getMessage());
316        }
317      });
318  }
319}