001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
021import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyStoreFile;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.HashMap;
032import java.util.List;
033import java.util.OptionalLong;
034import java.util.stream.Stream;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.CellComparatorImpl;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
045import org.apache.hadoop.hbase.client.RegionInfoBuilder;
046import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
047import org.apache.hadoop.hbase.regionserver.HStore;
048import org.apache.hadoop.hbase.regionserver.HStoreFile;
049import org.apache.hadoop.hbase.regionserver.InternalScanner;
050import org.apache.hadoop.hbase.regionserver.ScanInfo;
051import org.apache.hadoop.hbase.regionserver.ScanType;
052import org.apache.hadoop.hbase.regionserver.StoreEngine;
053import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
054import org.apache.hadoop.hbase.regionserver.StoreUtils;
055import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
056import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
057import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
058import org.apache.hadoop.hbase.testclassification.RegionServerTests;
059import org.apache.hadoop.hbase.testclassification.SmallTests;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.junit.jupiter.api.Tag;
062import org.junit.jupiter.api.TestTemplate;
063import org.junit.jupiter.params.provider.Arguments;
064
065@Tag(RegionServerTests.TAG)
066@Tag(SmallTests.TAG)
067@HBaseParameterizedTestTemplate(name = "{index}: usePrivateReaders={0}")
068public class TestDateTieredCompactor {
069
070  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
071
072  private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
073
074  private static final KeyValue KV_A = new KeyValue(Bytes.toBytes("aaa"), 100L);
075
076  private static final KeyValue KV_B = new KeyValue(Bytes.toBytes("bbb"), 200L);
077
078  private static final KeyValue KV_C = new KeyValue(Bytes.toBytes("ccc"), 300L);
079
080  private static final KeyValue KV_D = new KeyValue(Bytes.toBytes("ddd"), 400L);
081
082  public static Stream<Arguments> parameters() {
083    return Stream.of(Arguments.of(true), Arguments.of(false));
084  }
085
086  private final boolean usePrivateReaders;
087
088  public TestDateTieredCompactor(boolean usePrivateReaders) {
089    this.usePrivateReaders = usePrivateReaders;
090  }
091
092  private DateTieredCompactor createCompactor(StoreFileWritersCapture writers,
093    final KeyValue[] input, List<HStoreFile> storefiles) throws Exception {
094    Configuration conf = HBaseConfiguration.create();
095    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
096    final Scanner scanner = new Scanner(input);
097    // Create store mock that is satisfactory for compactor.
098    ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(NAME_OF_THINGS);
099    ScanInfo si =
100      new ScanInfo(conf, familyDescriptor, Long.MAX_VALUE, 0, CellComparatorImpl.COMPARATOR);
101    HStore store = mock(HStore.class);
102    when(store.getStorefiles()).thenReturn(storefiles);
103    when(store.getColumnFamilyDescriptor()).thenReturn(familyDescriptor);
104    when(store.getScanInfo()).thenReturn(si);
105    when(store.areWritesEnabled()).thenReturn(true);
106    when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
107    when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
108    StoreEngine storeEngine = mock(StoreEngine.class);
109    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
110    when(store.getStoreEngine()).thenReturn(storeEngine);
111    when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
112    OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
113    when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
114
115    return new DateTieredCompactor(conf, store) {
116      @Override
117      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
118        List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
119        byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
120        return scanner;
121      }
122
123      @Override
124      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
125        List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
126        long earliestPutTs) throws IOException {
127        return scanner;
128      }
129    };
130  }
131
132  private void verify(KeyValue[] input, List<Long> boundaries, KeyValue[][] output,
133    boolean allFiles) throws Exception {
134    StoreFileWritersCapture writers = new StoreFileWritersCapture();
135    HStoreFile sf1 = createDummyStoreFile(1L);
136    HStoreFile sf2 = createDummyStoreFile(2L);
137    DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2));
138    List<Path> paths = dtc.compact(new CompactionRequestImpl(Arrays.asList(sf1)),
139      boundaries.subList(0, boundaries.size() - 1), new HashMap<Long, String>(),
140      NoLimitThroughputController.INSTANCE, null);
141    writers.verifyKvs(output, allFiles, boundaries);
142    if (allFiles) {
143      assertEquals(output.length, paths.size());
144    }
145  }
146
147  @SuppressWarnings("unchecked")
148  private static <T> T[] a(T... a) {
149    return a;
150  }
151
152  @TestTemplate
153  public void test() throws Exception {
154    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(100L, 200L, 300L, 400L, 500L),
155      a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)), true);
156    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, 200L, Long.MAX_VALUE),
157      a(a(KV_A), a(KV_B, KV_C, KV_D)), false);
158    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
159      new KeyValue[][] { a(KV_A, KV_B, KV_C, KV_D) }, false);
160  }
161
162  @TestTemplate
163  public void testEmptyOutputFile() throws Exception {
164    StoreFileWritersCapture writers = new StoreFileWritersCapture();
165    CompactionRequestImpl request = createDummyRequest();
166    DateTieredCompactor dtc =
167      createCompactor(writers, new KeyValue[0], new ArrayList<>(request.getFiles()));
168    List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
169      new HashMap<Long, String>(), NoLimitThroughputController.INSTANCE, null);
170    assertEquals(1, paths.size());
171    List<StoreFileWritersCapture.Writer> dummyWriters = writers.getWriters();
172    assertEquals(1, dummyWriters.size());
173    StoreFileWritersCapture.Writer dummyWriter = dummyWriters.get(0);
174    assertTrue(dummyWriter.kvs.isEmpty());
175    assertTrue(dummyWriter.hasMetadata);
176  }
177}