001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
021import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyStoreFile;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.HashMap;
032import java.util.List;
033import java.util.OptionalLong;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.CellComparatorImpl;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
044import org.apache.hadoop.hbase.client.RegionInfoBuilder;
045import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
046import org.apache.hadoop.hbase.regionserver.HStore;
047import org.apache.hadoop.hbase.regionserver.HStoreFile;
048import org.apache.hadoop.hbase.regionserver.InternalScanner;
049import org.apache.hadoop.hbase.regionserver.ScanInfo;
050import org.apache.hadoop.hbase.regionserver.ScanType;
051import org.apache.hadoop.hbase.regionserver.StoreEngine;
052import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
053import org.apache.hadoop.hbase.regionserver.StoreUtils;
054import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
055import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
056import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
057import org.apache.hadoop.hbase.testclassification.RegionServerTests;
058import org.apache.hadoop.hbase.testclassification.SmallTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.junit.ClassRule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.junit.runner.RunWith;
064import org.junit.runners.Parameterized;
065import org.junit.runners.Parameterized.Parameter;
066import org.junit.runners.Parameterized.Parameters;
067
068@RunWith(Parameterized.class)
069@Category({ RegionServerTests.class, SmallTests.class })
070public class TestDateTieredCompactor {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074    HBaseClassTestRule.forClass(TestDateTieredCompactor.class);
075
076  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
077
078  private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
079
080  private static final KeyValue KV_A = new KeyValue(Bytes.toBytes("aaa"), 100L);
081
082  private static final KeyValue KV_B = new KeyValue(Bytes.toBytes("bbb"), 200L);
083
084  private static final KeyValue KV_C = new KeyValue(Bytes.toBytes("ccc"), 300L);
085
086  private static final KeyValue KV_D = new KeyValue(Bytes.toBytes("ddd"), 400L);
087
088  @Parameters(name = "{index}: usePrivateReaders={0}")
089  public static Iterable<Object[]> data() {
090    return Arrays.asList(new Object[] { true }, new Object[] { false });
091  }
092
093  @Parameter
094  public boolean usePrivateReaders;
095
096  private DateTieredCompactor createCompactor(StoreFileWritersCapture writers,
097    final KeyValue[] input, List<HStoreFile> storefiles) throws Exception {
098    Configuration conf = HBaseConfiguration.create();
099    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
100    final Scanner scanner = new Scanner(input);
101    // Create store mock that is satisfactory for compactor.
102    ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(NAME_OF_THINGS);
103    ScanInfo si =
104      new ScanInfo(conf, familyDescriptor, Long.MAX_VALUE, 0, CellComparatorImpl.COMPARATOR);
105    HStore store = mock(HStore.class);
106    when(store.getStorefiles()).thenReturn(storefiles);
107    when(store.getColumnFamilyDescriptor()).thenReturn(familyDescriptor);
108    when(store.getScanInfo()).thenReturn(si);
109    when(store.areWritesEnabled()).thenReturn(true);
110    when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
111    when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
112    StoreEngine storeEngine = mock(StoreEngine.class);
113    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
114    when(store.getStoreEngine()).thenReturn(storeEngine);
115    when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
116    OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
117    when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
118
119    return new DateTieredCompactor(conf, store) {
120      @Override
121      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
122        List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
123        byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
124        return scanner;
125      }
126
127      @Override
128      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
129        List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
130        long earliestPutTs) throws IOException {
131        return scanner;
132      }
133    };
134  }
135
136  private void verify(KeyValue[] input, List<Long> boundaries, KeyValue[][] output,
137    boolean allFiles) throws Exception {
138    StoreFileWritersCapture writers = new StoreFileWritersCapture();
139    HStoreFile sf1 = createDummyStoreFile(1L);
140    HStoreFile sf2 = createDummyStoreFile(2L);
141    DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2));
142    List<Path> paths = dtc.compact(new CompactionRequestImpl(Arrays.asList(sf1)),
143      boundaries.subList(0, boundaries.size() - 1), new HashMap<Long, String>(),
144      NoLimitThroughputController.INSTANCE, null);
145    writers.verifyKvs(output, allFiles, boundaries);
146    if (allFiles) {
147      assertEquals(output.length, paths.size());
148    }
149  }
150
151  @SuppressWarnings("unchecked")
152  private static <T> T[] a(T... a) {
153    return a;
154  }
155
156  @Test
157  public void test() throws Exception {
158    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(100L, 200L, 300L, 400L, 500L),
159      a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)), true);
160    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, 200L, Long.MAX_VALUE),
161      a(a(KV_A), a(KV_B, KV_C, KV_D)), false);
162    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
163      new KeyValue[][] { a(KV_A, KV_B, KV_C, KV_D) }, false);
164  }
165
166  @Test
167  public void testEmptyOutputFile() throws Exception {
168    StoreFileWritersCapture writers = new StoreFileWritersCapture();
169    CompactionRequestImpl request = createDummyRequest();
170    DateTieredCompactor dtc =
171      createCompactor(writers, new KeyValue[0], new ArrayList<>(request.getFiles()));
172    List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
173      new HashMap<Long, String>(), NoLimitThroughputController.INSTANCE, null);
174    assertEquals(1, paths.size());
175    List<StoreFileWritersCapture.Writer> dummyWriters = writers.getWriters();
176    assertEquals(1, dummyWriters.size());
177    StoreFileWritersCapture.Writer dummyWriter = dummyWriters.get(0);
178    assertTrue(dummyWriter.kvs.isEmpty());
179    assertTrue(dummyWriter.hasMetadata);
180  }
181}