/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyStoreFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

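/**
 * Unit test for {@link DateTieredCompactor}: runs a compaction over canned {@link KeyValue}s
 * against a mocked {@link HStore} and verifies that the captured output writers receive the
 * expected cells, partitioned by the supplied timestamp boundaries. The test is parameterized on
 * "hbase.regionserver.compaction.private.readers" so both reader modes are exercised.
 */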
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, SmallTests.class })
public class TestDateTieredCompactor {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestDateTieredCompactor.class);

  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");

  private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);

  private static final KeyValue KV_A = new KeyValue(Bytes.toBytes("aaa"), 100L);

  private static final KeyValue KV_B = new KeyValue(Bytes.toBytes("bbb"), 200L);

  private static final KeyValue KV_C = new KeyValue(Bytes.toBytes("ccc"), 300L);

  private static final KeyValue KV_D = new KeyValue(Bytes.toBytes("ddd"), 400L);

  @Parameters(name = "{index}: usePrivateReaders={0}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[] { true }, new Object[] { false });
  }

  @Parameter
  public boolean usePrivateReaders;

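  /**
   * Builds a {@link DateTieredCompactor} backed by a Mockito-mocked {@link HStore}. The returned
   * compactor overrides both createScanner variants to hand back a canned {@link Scanner} over
   * {@code input}, so no real store files are ever read.
   */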
  @SuppressWarnings({ "rawtypes", "unchecked" })
  private DateTieredCompactor createCompactor(StoreFileWritersCapture writers,
    final KeyValue[] input, List<HStoreFile> storefiles) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
    final Scanner scanner = new Scanner(input);
    // Create a store mock that is satisfactory for the compactor.
    HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
    ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, CellComparatorImpl.COMPARATOR);
    HStore store = mock(HStore.class);
    when(store.getStorefiles()).thenReturn(storefiles);
    when(store.getColumnFamilyDescriptor()).thenReturn(col);
    when(store.getScanInfo()).thenReturn(si);
    when(store.areWritesEnabled()).thenReturn(true);
    when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
    when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
    // Route writer creation through the capture so the test can inspect what gets written.
    StoreEngine storeEngine = mock(StoreEngine.class);
    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
    when(store.getStoreEngine()).thenReturn(storeEngine);
    when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
    OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
    when(store.getMaxSequenceId()).thenReturn(maxSequenceId);

    // Override both scanner factory methods so the compaction reads the canned input rather than
    // opening real store file scanners.
    return new DateTieredCompactor(conf, store) {
      @Override
      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
        List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
        byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
        return scanner;
      }

      @Override
      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
        List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
        long earliestPutTs) throws IOException {
        return scanner;
      }
    };
  }

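  /**
   * Compacts {@code input} with the given window {@code boundaries}: all but the last boundary
   * are passed to the compactor as window lower bounds, while the full list is used to check the
   * captured output. Asserts that the writers received {@code output}, one KeyValue[] per
   * expected output file.
   */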
  private void verify(KeyValue[] input, List<Long> boundaries, KeyValue[][] output,
    boolean allFiles) throws Exception {
    StoreFileWritersCapture writers = new StoreFileWritersCapture();
    HStoreFile sf1 = createDummyStoreFile(1L);
    HStoreFile sf2 = createDummyStoreFile(2L);
    DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2));
    List<Path> paths = dtc.compact(new CompactionRequestImpl(Arrays.asList(sf1)),
      boundaries.subList(0, boundaries.size() - 1), new HashMap<Long, String>(),
      NoLimitThroughputController.INSTANCE, null);
    writers.verifyKvs(output, allFiles, boundaries);
    if (allFiles) {
      assertEquals(output.length, paths.size());
    }
  }

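  /** Varargs shorthand for building KeyValue arrays (and arrays of arrays) inline. */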
  @SuppressWarnings("unchecked")
  private static <T> T[] a(T... a) {
    return a;
  }

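  /**
   * Four cells with timestamps 100..400 are compacted under three boundary layouts: one window
   * per cell, a two-window split at 200, and a single unbounded window.
   */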
  @Test
  public void test() throws Exception {
    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(100L, 200L, 300L, 400L, 500L),
      a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)), true);
    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, 200L, Long.MAX_VALUE),
      a(a(KV_A), a(KV_B, KV_C, KV_D)), false);
    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
      new KeyValue[][] { a(KV_A, KV_B, KV_C, KV_D) }, false);
  }

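  /**
   * Compacting an empty input must still produce exactly one output file: a single writer is
   * created, receives no cells, and still has its metadata written.
   */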
  @Test
  public void testEmptyOutputFile() throws Exception {
    StoreFileWritersCapture writers = new StoreFileWritersCapture();
    CompactionRequestImpl request = createDummyRequest();
    DateTieredCompactor dtc =
      createCompactor(writers, new KeyValue[0], new ArrayList<>(request.getFiles()));
    List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
      new HashMap<Long, String>(), NoLimitThroughputController.INSTANCE, null);
    assertEquals(1, paths.size());
    List<StoreFileWritersCapture.Writer> dummyWriters = writers.getWriters();
    assertEquals(1, dummyWriters.size());
    StoreFileWritersCapture.Writer dummyWriter = dummyWriters.get(0);
    assertTrue(dummyWriter.kvs.isEmpty());
    assertTrue(dummyWriter.hasMetadata);
  }
}