001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
021import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyStoreFile;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024import static org.mockito.Matchers.any;
025import static org.mockito.Matchers.anyBoolean;
026import static org.mockito.Matchers.anyLong;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.when;
029
030import java.io.IOException;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.List;
034import java.util.OptionalLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.CellComparatorImpl;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.HColumnDescriptor;
042import org.apache.hadoop.hbase.HRegionInfo;
043import org.apache.hadoop.hbase.KeyValue;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.regionserver.HStore;
046import org.apache.hadoop.hbase.regionserver.HStoreFile;
047import org.apache.hadoop.hbase.regionserver.InternalScanner;
048import org.apache.hadoop.hbase.regionserver.ScanInfo;
049import org.apache.hadoop.hbase.regionserver.ScanType;
050import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
051import org.apache.hadoop.hbase.regionserver.StoreUtils;
052import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
053import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
054import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
055import org.apache.hadoop.hbase.testclassification.RegionServerTests;
056import org.apache.hadoop.hbase.testclassification.SmallTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.junit.ClassRule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.runner.RunWith;
062import org.junit.runners.Parameterized;
063import org.junit.runners.Parameterized.Parameter;
064import org.junit.runners.Parameterized.Parameters;
065
066@RunWith(Parameterized.class)
067@Category({ RegionServerTests.class, SmallTests.class })
068public class TestDateTieredCompactor {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072      HBaseClassTestRule.forClass(TestDateTieredCompactor.class);
073
074  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
075
076  private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
077
078  private static final KeyValue KV_A = new KeyValue(Bytes.toBytes("aaa"), 100L);
079
080  private static final KeyValue KV_B = new KeyValue(Bytes.toBytes("bbb"), 200L);
081
082  private static final KeyValue KV_C = new KeyValue(Bytes.toBytes("ccc"), 300L);
083
084  private static final KeyValue KV_D = new KeyValue(Bytes.toBytes("ddd"), 400L);
085
086  @Parameters(name = "{index}: usePrivateReaders={0}")
087  public static Iterable<Object[]> data() {
088    return Arrays.asList(new Object[] { true }, new Object[] { false });
089  }
090
091  @Parameter
092  public boolean usePrivateReaders;
093
094  private DateTieredCompactor createCompactor(StoreFileWritersCapture writers,
095      final KeyValue[] input, List<HStoreFile> storefiles) throws Exception {
096    Configuration conf = HBaseConfiguration.create();
097    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
098    final Scanner scanner = new Scanner(input);
099    // Create store mock that is satisfactory for compactor.
100    HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
101    ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, CellComparatorImpl.COMPARATOR);
102    HStore store = mock(HStore.class);
103    when(store.getStorefiles()).thenReturn(storefiles);
104    when(store.getColumnFamilyDescriptor()).thenReturn(col);
105    when(store.getScanInfo()).thenReturn(si);
106    when(store.areWritesEnabled()).thenReturn(true);
107    when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
108    when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
109    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
110      anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
111    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
112      anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
113    when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
114    OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
115    when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
116
117    return new DateTieredCompactor(conf, store) {
118      @Override
119      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
120          List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
121          byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
122        return scanner;
123      }
124
125      @Override
126      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
127          List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
128          long earliestPutTs) throws IOException {
129        return scanner;
130      }
131    };
132  }
133
134  private void verify(KeyValue[] input, List<Long> boundaries, KeyValue[][] output,
135      boolean allFiles) throws Exception {
136    StoreFileWritersCapture writers = new StoreFileWritersCapture();
137    HStoreFile sf1 = createDummyStoreFile(1L);
138    HStoreFile sf2 = createDummyStoreFile(2L);
139    DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2));
140    List<Path> paths = dtc.compact(new CompactionRequestImpl(Arrays.asList(sf1)),
141      boundaries.subList(0, boundaries.size() - 1), NoLimitThroughputController.INSTANCE, null);
142    writers.verifyKvs(output, allFiles, boundaries);
143    if (allFiles) {
144      assertEquals(output.length, paths.size());
145    }
146  }
147
148  @SuppressWarnings("unchecked")
149  private static <T> T[] a(T... a) {
150    return a;
151  }
152
153  @Test
154  public void test() throws Exception {
155    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(100L, 200L, 300L, 400L, 500L),
156      a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)), true);
157    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, 200L, Long.MAX_VALUE),
158      a(a(KV_A), a(KV_B, KV_C, KV_D)), false);
159    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
160      new KeyValue[][] { a(KV_A, KV_B, KV_C, KV_D) }, false);
161  }
162
163  @Test
164  public void testEmptyOutputFile() throws Exception {
165    StoreFileWritersCapture writers = new StoreFileWritersCapture();
166    CompactionRequestImpl request = createDummyRequest();
167    DateTieredCompactor dtc = createCompactor(writers, new KeyValue[0],
168      new ArrayList<>(request.getFiles()));
169    List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
170      NoLimitThroughputController.INSTANCE, null);
171    assertEquals(1, paths.size());
172    List<StoreFileWritersCapture.Writer> dummyWriters = writers.getWriters();
173    assertEquals(1, dummyWriters.size());
174    StoreFileWritersCapture.Writer dummyWriter = dummyWriters.get(0);
175    assertTrue(dummyWriter.kvs.isEmpty());
176    assertTrue(dummyWriter.hasMetadata);
177  }
178}