/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

065@RunWith(Parameterized.class)
066@Category({ RegionServerTests.class, SmallTests.class })
067public class TestStripeCompactor {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071      HBaseClassTestRule.forClass(TestStripeCompactor.class);
072
073  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
074  private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
075
076  private static final byte[] KEY_B = Bytes.toBytes("bbb");
077  private static final byte[] KEY_C = Bytes.toBytes("ccc");
078  private static final byte[] KEY_D = Bytes.toBytes("ddd");
079
080  private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
081  private static final KeyValue KV_B = kvAfter(KEY_B);
082  private static final KeyValue KV_C = kvAfter(KEY_C);
083  private static final KeyValue KV_D = kvAfter(KEY_D);
084
085  @Parameters(name = "{index}: usePrivateReaders={0}")
086  public static Iterable<Object[]> data() {
087    return Arrays.asList(new Object[] { true }, new Object[] { false });
088  }
089
090  @Parameter
091  public boolean usePrivateReaders;
092
093  private static KeyValue kvAfter(byte[] key) {
094    return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L);
095  }
096
097  @SuppressWarnings("unchecked")
098  private static <T> T[] a(T... a) {
099    return a;
100  }
101
102  private static KeyValue[] e() {
103    return TestStripeCompactor.<KeyValue> a();
104  }
105
106  @Test
107  public void testBoundaryCompactions() throws Exception {
108    // General verification
109    verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D),
110      a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D)));
111    verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), a(KV_C)));
112    verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] { a(KV_B, KV_C) });
113  }
114
115  @Test
116  public void testBoundaryCompactionEmptyFiles() throws Exception {
117    // No empty file if there're already files.
118    verifyBoundaryCompaction(a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null),
119      null, null, false);
120    verifyBoundaryCompaction(a(KV_A, KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D),
121      a(a(KV_A), null, a(KV_C)), null, null, false);
122    // But should be created if there are no file.
123    verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null,
124      null, false);
125    // In major range if there's major range.
126    verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B,
127      KEY_C, false);
128    verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY,
129      KEY_C, false);
130    // Major range should have files regardless of KVs.
131    verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
132      a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false);
133    verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
134      a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false);
135
136  }
137
138  private void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries, KeyValue[][] output)
139      throws Exception {
140    verifyBoundaryCompaction(input, boundaries, output, null, null, true);
141  }
142
143  private void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries, KeyValue[][] output,
144      byte[] majorFrom, byte[] majorTo, boolean allFiles) throws Exception {
145    StoreFileWritersCapture writers = new StoreFileWritersCapture();
146    StripeCompactor sc = createCompactor(writers, input);
147    List<Path> paths = sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom,
148      majorTo, NoLimitThroughputController.INSTANCE, null);
149    writers.verifyKvs(output, allFiles, true);
150    if (allFiles) {
151      assertEquals(output.length, paths.size());
152      writers.verifyBoundaries(boundaries);
153    }
154  }
155
156  @Test
157  public void testSizeCompactions() throws Exception {
158    // General verification with different sizes.
159    verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
160      a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D)));
161    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY,
162      a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)));
163    verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), a(KV_C)));
164    // Verify row boundaries are preserved.
165    verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
166      a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D)));
167    verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY,
168      a(a(KV_A), a(KV_B, KV_B), a(KV_C)));
169    // Too much data, count limits the number of files.
170    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY,
171      a(a(KV_A), a(KV_B, KV_C, KV_D)));
172    verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D,
173      new KeyValue[][] { a(KV_A, KV_B, KV_C) });
174    // Too little data/large count, no extra files.
175    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY,
176      a(a(KV_A, KV_B), a(KV_C, KV_D)));
177  }
178
179  private void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize, byte[] left,
180      byte[] right, KeyValue[][] output) throws Exception {
181    StoreFileWritersCapture writers = new StoreFileWritersCapture();
182    StripeCompactor sc = createCompactor(writers, input);
183    List<Path> paths = sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null,
184      null, NoLimitThroughputController.INSTANCE, null);
185    assertEquals(output.length, paths.size());
186    writers.verifyKvs(output, true, true);
187    List<byte[]> boundaries = new ArrayList<>(output.length + 2);
188    boundaries.add(left);
189    for (int i = 1; i < output.length; ++i) {
190      boundaries.add(CellUtil.cloneRow(output[i][0]));
191    }
192    boundaries.add(right);
193    writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
194  }
195
196  private StripeCompactor createCompactor(StoreFileWritersCapture writers, KeyValue[] input)
197      throws Exception {
198    Configuration conf = HBaseConfiguration.create();
199    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
200    final Scanner scanner = new Scanner(input);
201
202    // Create store mock that is satisfactory for compactor.
203    ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(NAME_OF_THINGS);
204    ScanInfo si = new ScanInfo(conf, familyDescriptor, Long.MAX_VALUE, 0,
205      CellComparatorImpl.COMPARATOR);
206    HStore store = mock(HStore.class);
207    when(store.getColumnFamilyDescriptor()).thenReturn(familyDescriptor);
208    when(store.getScanInfo()).thenReturn(si);
209    when(store.areWritesEnabled()).thenReturn(true);
210    when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
211    when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
212    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
213      anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
214    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
215      anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
216    when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
217
218    return new StripeCompactor(conf, store) {
219      @Override
220      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
221          List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
222          byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
223        return scanner;
224      }
225
226      @Override
227      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
228          List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
229          long earliestPutTs) throws IOException {
230        return scanner;
231      }
232    };
233  }
234}