/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
import static org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieringValueProvider.TIERING_CELL_QUALIFIER;
import static org.apache.hadoop.hbase.regionserver.compactions.CustomTieredCompactor.TIERING_VALUE_PROVIDER;
import static org.apache.hadoop.hbase.regionserver.compactions.RowKeyDateTieringValueProvider.TIERING_KEY_DATE_FORMAT;
import static org.apache.hadoop.hbase.regionserver.compactions.RowKeyDateTieringValueProvider.TIERING_KEY_DATE_PATTERN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.CustomTieredStoreEngine;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

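/**
 * Tests for custom tiered compaction: verifies that the CUSTOM_TIERING_TIME_RANGE file
 * metadata is tracked and that a second major compaction splits the data into per-tier files,
 * both when the tiering value comes from a configured cell qualifier and when it is parsed
 * from a date embedded in the row key.
 */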
@Category({ RegionServerTests.class, SmallTests.class })
public class TestCustomCellTieredCompactor {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCustomCellTieredCompactor.class);

  public static final byte[] FAMILY = Bytes.toBytes("cf");

  protected HBaseTestingUtil utility;

  protected Admin admin;

  @Before
  public void setUp() throws Exception {
    utility = new HBaseTestingUtil();
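    // Run the compacted-files discharger chore frequently so files compacted away are
    // archived promptly and the HFile counts asserted below stay deterministic.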
    utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
    utility.startMiniCluster();
  }

  @After
  public void tearDown() throws Exception {
    utility.shutdownMiniCluster();
  }

  @Test
  public void testCustomCellTieredCompactor() throws Exception {
    ColumnFamilyDescriptorBuilder clmBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
    clmBuilder.setValue("hbase.hstore.engine.class", CustomTieredStoreEngine.class.getName());
    clmBuilder.setValue(TIERING_CELL_QUALIFIER, "date");
    TableName tableName = TableName.valueOf("testCustomCellTieredCompactor");
    TableDescriptorBuilder tblBuilder = TableDescriptorBuilder.newBuilder(tableName);
    tblBuilder.setColumnFamily(clmBuilder.build());
    utility.getAdmin().createTable(tblBuilder.build());
    utility.waitTableAvailable(tableName);
    Connection connection = utility.getConnection();
    Table table = connection.getTable(tableName);
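    // Write two populations of rows through the "date" qualifier the policy is configured
    // to read: one dated roughly eleven years in the past and one dated now.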
    long recordTime = System.currentTimeMillis();
    // Write data and flush multiple store files:
    for (int i = 0; i < 6; i++) {
      List<Put> puts = new ArrayList<>(2);
      Put put = new Put(Bytes.toBytes(i));
      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + i));
      put.addColumn(FAMILY, Bytes.toBytes("date"),
        Bytes.toBytes(recordTime - (11L * 366L * 24L * 60L * 60L * 1000L)));
      puts.add(put);
      put = new Put(Bytes.toBytes(i + 1000));
      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + (i + 1000)));
      put.addColumn(FAMILY, Bytes.toBytes("date"), Bytes.toBytes(recordTime));
      puts.add(put);
      table.put(puts);
      utility.flush(tableName);
    }
    table.close();
    long firstCompactionTime = System.currentTimeMillis();
    utility.getAdmin().majorCompact(tableName);
    Waiter.waitFor(utility.getConfiguration(), 5000,
      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(tableName)
          > firstCompactionTime);
    long numHFiles = utility.getNumHFiles(tableName, FAMILY);
    // The first major compaction cannot detect more than one tier: the flushed files carry
    // no min/max tiering values in their file info, so CustomCellDateTieredCompactionPolicy
    // has no way to calculate the proper tier boundaries.
    assertEquals(1, numHFiles);
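    // The single compacted file should nonetheless record the overall tiering time range,
    // spanning from the old rows' date to the recent rows' date.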
    utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles()
      .forEach(file -> {
        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
        assertNotNull(rangeBytes);
        try {
          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
          assertEquals((recordTime - (11L * 366L * 24L * 60L * 60L * 1000L)),
            timeRangeTracker.getMin());
          assertEquals(recordTime, timeRangeTracker.getMax());
        } catch (IOException e) {
          fail(e.getMessage());
        }
      });
    // Now major compact again: with the tiering time range recorded in the file info, the
    // policy can calculate boundaries and split the data into two separate files.
    long secondCompactionTime = System.currentTimeMillis();
    utility.getAdmin().majorCompact(tableName);
    Waiter.waitFor(utility.getConfiguration(), 5000,
      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(tableName)
          > secondCompactionTime);
    numHFiles = utility.getNumHFiles(tableName, FAMILY);
    assertEquals(2, numHFiles);
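    // Each file now covers a single tier; all rows in a file share one tiering value, so the
    // tracked min and max collapse to the same instant.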
    utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles()
      .forEach(file -> {
        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
        assertNotNull(rangeBytes);
        try {
          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
          assertEquals(timeRangeTracker.getMin(), timeRangeTracker.getMax());
        } catch (IOException e) {
          fail(e.getMessage());
        }
      });
  }

  @Test
  public void testCustomCellTieredCompactorWithRowKeyDateTieringValue() throws Exception {
    // Restart mini cluster with RowKeyDateTieringValueProvider
    utility.shutdownMiniCluster();
    utility.getConfiguration().set(TIERING_VALUE_PROVIDER,
      RowKeyDateTieringValueProvider.class.getName());
    utility.startMiniCluster();
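    // With RowKeyDateTieringValueProvider, the tiering value is parsed from the row key
    // itself using the per-table pattern and date format set below. Conceptually (a sketch
    // of the assumed extraction, not the provider's actual code):
    //   Matcher m = Pattern.compile(datePattern).matcher(rowKeyAsString);
    //   if (m.find()) tieringTs = new SimpleDateFormat(dateFormat).parse(m.group(1)).getTime();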

    ColumnFamilyDescriptorBuilder clmBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
    clmBuilder.setValue("hbase.hstore.engine.class", CustomTieredStoreEngine.class.getName());

    // Table 1: Date at end with format yyyyMMddHHmmssSSS
    TableName table1Name = TableName.valueOf("testTable1");
    TableDescriptorBuilder tbl1Builder = TableDescriptorBuilder.newBuilder(table1Name);
    tbl1Builder.setColumnFamily(clmBuilder.build());
    tbl1Builder.setValue(TIERING_KEY_DATE_PATTERN, "(\\d{17})$");
    tbl1Builder.setValue(TIERING_KEY_DATE_FORMAT, "yyyyMMddHHmmssSSS");
    utility.getAdmin().createTable(tbl1Builder.build());
    utility.waitTableAvailable(table1Name);

    // Table 2: Date at beginning with format yyyy-MM-dd HH:mm:ss
    TableName table2Name = TableName.valueOf("testTable2");
    TableDescriptorBuilder tbl2Builder = TableDescriptorBuilder.newBuilder(table2Name);
    tbl2Builder.setColumnFamily(clmBuilder.build());
    tbl2Builder.setValue(TIERING_KEY_DATE_PATTERN, "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})");
    tbl2Builder.setValue(TIERING_KEY_DATE_FORMAT, "yyyy-MM-dd HH:mm:ss");
    utility.getAdmin().createTable(tbl2Builder.build());
    utility.waitTableAvailable(table2Name);

    Connection connection = utility.getConnection();
    long recordTime = System.currentTimeMillis();
    long oldTime = recordTime - (11L * 366L * 24L * 60L * 60L * 1000L);

    SimpleDateFormat sdf1 = new SimpleDateFormat("yyyyMMddHHmmssSSS");
    SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
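    // Note that sdf2 carries only second precision; the truncation this causes is accounted
    // for when verifying Table 2's time range after the first compaction below.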

    // Write to Table 1 with date at end
    Table table1 = connection.getTable(table1Name);
    for (int i = 0; i < 6; i++) {
      List<Put> puts = new ArrayList<>(2);

      // Old data
      String oldDate = sdf1.format(new Date(oldTime));
      Put put = new Put(Bytes.toBytes("row_" + i + "_" + oldDate));
      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + i));
      puts.add(put);

      // Recent data
      String recentDate = sdf1.format(new Date(recordTime));
      put = new Put(Bytes.toBytes("row_" + (i + 1000) + "_" + recentDate));
      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + (i + 1000)));
      puts.add(put);

      table1.put(puts);
      utility.flush(table1Name);
    }
    table1.close();

    // Write to Table 2 with date at beginning
    Table table2 = connection.getTable(table2Name);
    for (int i = 0; i < 6; i++) {
      List<Put> puts = new ArrayList<>(2);

      // Old data
      String oldDate = sdf2.format(new Date(oldTime));
      Put put = new Put(Bytes.toBytes(oldDate + "_row_" + i));
      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + i));
      puts.add(put);

      // Recent data
      String recentDate = sdf2.format(new Date(recordTime));
      put = new Put(Bytes.toBytes(recentDate + "_row_" + (i + 1000)));
      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + (i + 1000)));
      puts.add(put);

      table2.put(puts);
      utility.flush(table2Name);
    }
    table2.close();

    // First compaction for Table 1
    long compactionTime1 = System.currentTimeMillis();
    utility.getAdmin().majorCompact(table1Name);
    Waiter.waitFor(utility.getConfiguration(), 5000,
      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(table1Name)
          > compactionTime1);
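
    // As in the cell-based test, the first major compaction yields a single file: the flushed
    // files carry no min/max tiering metadata, so no tier boundaries can be calculated yet.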
    assertEquals(1, utility.getNumHFiles(table1Name, FAMILY));

    utility.getMiniHBaseCluster().getRegions(table1Name).get(0).getStore(FAMILY).getStorefiles()
      .forEach(file -> {
        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
        assertNotNull(rangeBytes);
        try {
          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
          assertEquals(oldTime, timeRangeTracker.getMin());
          assertEquals(recordTime, timeRangeTracker.getMax());
        } catch (IOException e) {
          fail(e.getMessage());
        }
      });

    // Second compaction for Table 1
    long secondCompactionTime1 = System.currentTimeMillis();
    utility.getAdmin().majorCompact(table1Name);
    Waiter.waitFor(utility.getConfiguration(), 5000,
      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(table1Name)
          > secondCompactionTime1);

    assertEquals(2, utility.getNumHFiles(table1Name, FAMILY));

    utility.getMiniHBaseCluster().getRegions(table1Name).get(0).getStore(FAMILY).getStorefiles()
      .forEach(file -> {
        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
        assertNotNull(rangeBytes);
        try {
          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
          assertEquals(timeRangeTracker.getMin(), timeRangeTracker.getMax());
        } catch (IOException e) {
          fail(e.getMessage());
        }
      });

    // First compaction for Table 2
    long compactionTime2 = System.currentTimeMillis();
    utility.getAdmin().majorCompact(table2Name);
    Waiter.waitFor(utility.getConfiguration(), 5000,
      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(table2Name)
          > compactionTime2);

    assertEquals(1, utility.getNumHFiles(table2Name, FAMILY));

    utility.getMiniHBaseCluster().getRegions(table2Name).get(0).getStore(FAMILY).getStorefiles()
      .forEach(file -> {
        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
        assertNotNull(rangeBytes);
        try {
          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
          // Table 2's yyyy-MM-dd HH:mm:ss format carries no milliseconds, so the parsed
          // tiering times are truncated to whole seconds; compare against expectations
          // truncated the same way.
          long expectedOldTime = (oldTime / 1000) * 1000;
          long expectedRecentTime = (recordTime / 1000) * 1000;
          assertEquals(expectedOldTime, timeRangeTracker.getMin());
          assertEquals(expectedRecentTime, timeRangeTracker.getMax());
        } catch (IOException e) {
          fail(e.getMessage());
        }
      });

    // Second compaction for Table 2
    long secondCompactionTime2 = System.currentTimeMillis();
    utility.getAdmin().majorCompact(table2Name);
    Waiter.waitFor(utility.getConfiguration(), 5000,
      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(table2Name)
          > secondCompactionTime2);

    assertEquals(2, utility.getNumHFiles(table2Name, FAMILY));

    utility.getMiniHBaseCluster().getRegions(table2Name).get(0).getStore(FAMILY).getStorefiles()
      .forEach(file -> {
        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
        assertNotNull(rangeBytes);
        try {
          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
          assertEquals(timeRangeTracker.getMin(), timeRangeTracker.getMax());
        } catch (IOException e) {
          fail(e.getMessage());
        }
      });
  }
}