001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static java.util.Arrays.asList;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.ArgumentMatchers.anyBoolean;
026import static org.mockito.ArgumentMatchers.anyLong;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.verify;
029import static org.mockito.Mockito.when;
030import static org.mockito.hamcrest.MockitoHamcrest.argThat;
031
032import java.io.File;
033import java.io.FileNotFoundException;
034import java.io.FileOutputStream;
035import java.io.IOException;
036import java.util.ArrayList;
037import java.util.Arrays;
038import java.util.List;
039import java.util.Random;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FSDataOutputStream;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.CellUtil;
044import org.apache.hadoop.hbase.DoNotRetryIOException;
045import org.apache.hadoop.hbase.HBaseClassTestRule;
046import org.apache.hadoop.hbase.HBaseConfiguration;
047import org.apache.hadoop.hbase.HBaseTestingUtility;
048import org.apache.hadoop.hbase.HColumnDescriptor;
049import org.apache.hadoop.hbase.HRegionInfo;
050import org.apache.hadoop.hbase.HTableDescriptor;
051import org.apache.hadoop.hbase.KeyValue;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.io.hfile.HFile;
054import org.apache.hadoop.hbase.io.hfile.HFileContext;
055import org.apache.hadoop.hbase.testclassification.SmallTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.util.Pair;
058import org.apache.hadoop.hbase.wal.WAL;
059import org.apache.hadoop.hbase.wal.WALEdit;
060import org.apache.hadoop.hbase.wal.WALKeyImpl;
061import org.hamcrest.Description;
062import org.hamcrest.Matcher;
063import org.hamcrest.TypeSafeMatcher;
064import org.junit.Before;
065import org.junit.ClassRule;
066import org.junit.Rule;
067import org.junit.Test;
068import org.junit.experimental.categories.Category;
069import org.junit.rules.TemporaryFolder;
070import org.junit.rules.TestName;
071import org.mockito.invocation.InvocationOnMock;
072import org.mockito.stubbing.Answer;
073
074import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
077
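/**
 * Unit tests for {@code HRegion#bulkLoadHFiles}: they check that a bulk load marker is appended
 * to (and synced on) a mocked WAL, and that invalid column families or missing HFiles are
 * rejected.
 */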
081@Category(SmallTests.class)
082public class TestBulkLoad {
083
084  @ClassRule
085  public static final HBaseClassTestRule CLASS_RULE =
086      HBaseClassTestRule.forClass(TestBulkLoad.class);
087
088  @ClassRule
089  public static TemporaryFolder testFolder = new TemporaryFolder();
090  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
091  private final WAL log = mock(WAL.class);
092  private final Configuration conf = HBaseConfiguration.create();
093  private final Random random = new Random();
094  private final byte[] randomBytes = new byte[100];
095  private final byte[] family1 = Bytes.toBytes("family1");
096  private final byte[] family2 = Bytes.toBytes("family2");
097
098  @Rule
099  public TestName name = new TestName();
100
101  @Before
102  public void before() throws IOException {
103    random.nextBytes(randomBytes);
104    // Mockito.when(log.append(htd, info, key, edits, inMemstore));
105  }
106
107  @Test
108  public void verifyBulkLoadEvent() throws IOException {
109    TableName tableName = TableName.valueOf("test", "test");
110    List<Pair<byte[], String>> familyPaths = withFamilyPathsFor(family1);
111    byte[] familyName = familyPaths.get(0).getFirst();
112    String storeFileName = familyPaths.get(0).getSecond();
113    storeFileName = (new Path(storeFileName)).getName();
114    List<String> storeFileNames = new ArrayList<>();
115    storeFileNames.add(storeFileName);
116    when(log.append(any(), any(),
117            argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(),
118                    familyName, storeFileNames)),
119            anyBoolean())).thenAnswer(new Answer() {
120              @Override
121              public Object answer(InvocationOnMock invocation) {
122                WALKeyImpl walKey = invocation.getArgument(1);
123                MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
124                if (mvcc != null) {
125                  MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
126                  walKey.setWriteEntry(we);
127                }
128                return 01L;
129              };
130    });
131    testRegionWithFamiliesAndSpecifiedTableName(tableName, family1)
132        .bulkLoadHFiles(familyPaths, false, null);
133    verify(log).sync(anyLong());
134  }
135
136  @Test
137  public void bulkHLogShouldThrowNoErrorAndWriteMarkerWithBlankInput() throws IOException {
138    testRegionWithFamilies(family1).bulkLoadHFiles(new ArrayList<>(),false, null);
139  }
140
141  @Test
142  public void shouldBulkLoadSingleFamilyHLog() throws IOException {
143    when(log.append(any(),
144            any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
145            anyBoolean())).thenAnswer(new Answer() {
146              @Override
147              public Object answer(InvocationOnMock invocation) {
148                WALKeyImpl walKey = invocation.getArgument(1);
149                MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
150                if (mvcc != null) {
151                  MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
152                  walKey.setWriteEntry(we);
153                }
154                return 01L;
155              };
156    });
157    testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
158    verify(log).sync(anyLong());
159  }
160
161  @Test
162  public void shouldBulkLoadManyFamilyHLog() throws IOException {
163    when(log.append(any(),
164            any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
165            anyBoolean())).thenAnswer(new Answer() {
166              @Override
167              public Object answer(InvocationOnMock invocation) {
168                WALKeyImpl walKey = invocation.getArgument(1);
169                MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
170                if (mvcc != null) {
171                  MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
172                  walKey.setWriteEntry(we);
173                }
174                return 01L;
175              };
176            });
177    testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2),
178            false, null);
179    verify(log).sync(anyLong());
180  }
181
182  @Test
183  public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
184    when(log.append(any(),
185            any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
186            anyBoolean())).thenAnswer(new Answer() {
187              @Override
188              public Object answer(InvocationOnMock invocation) {
189                WALKeyImpl walKey = invocation.getArgument(1);
190                MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
191                if (mvcc != null) {
192                  MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
193                  walKey.setWriteEntry(we);
194                }
195                return 01L;
196              };
197    });
198    TableName tableName = TableName.valueOf("test", "test");
199    testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2)
200        .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null);
201    verify(log).sync(anyLong());
202  }
203
204  @Test(expected = DoNotRetryIOException.class)
205  public void shouldCrashIfBulkLoadFamiliesNotInTable() throws IOException {
206    testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1, family2), false,
207      null);
208  }
209
210  @Test(expected = DoNotRetryIOException.class)
211  public void bulkHLogShouldThrowErrorWhenFamilySpecifiedAndHFileExistsButNotInTableDescriptor()
212      throws IOException {
213    testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
214  }
215
216  @Test(expected = DoNotRetryIOException.class)
217  public void shouldThrowErrorIfBadFamilySpecifiedAsFamilyPath() throws IOException {
218    testRegionWithFamilies()
219        .bulkLoadHFiles(asList(withInvalidColumnFamilyButProperHFileLocation(family1)),
220            false, null);
221  }
222
223  @Test(expected = FileNotFoundException.class)
224  public void shouldThrowErrorIfHFileDoesNotExist() throws IOException {
225    List<Pair<byte[], String>> list = asList(withMissingHFileForFamily(family1));
226    testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null);
227  }
228
229  private Pair<byte[], String> withMissingHFileForFamily(byte[] family) {
230    return new Pair<>(family, getNotExistFilePath());
231  }
232
233  private String getNotExistFilePath() {
234    Path path = new Path(TEST_UTIL.getDataTestDir(), "does_not_exist");
235    return path.toUri().getPath();
236  }
237
238  private Pair<byte[], String> withInvalidColumnFamilyButProperHFileLocation(byte[] family)
239      throws IOException {
240    createHFileForFamilies(family);
241    return new Pair<>(new byte[]{0x00, 0x01, 0x02}, getNotExistFilePath());
242  }
243
244
245  private HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
246                                                              byte[]... families)
247  throws IOException {
248    HRegionInfo hRegionInfo = new HRegionInfo(tableName);
249    HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
250    for (byte[] family : families) {
251      hTableDescriptor.addFamily(new HColumnDescriptor(family));
252    }
253    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
254    // TODO We need a way to do this without creating files
255    return HRegion.createHRegion(hRegionInfo,
256        new Path(testFolder.newFolder().toURI()),
257        conf,
258        hTableDescriptor,
259        log);
260
261  }
262
263  private HRegion testRegionWithFamilies(byte[]... families) throws IOException {
264    TableName tableName = TableName.valueOf(name.getMethodName());
265    return testRegionWithFamiliesAndSpecifiedTableName(tableName, families);
266  }
267
268  private List<Pair<byte[], String>> getBlankFamilyPaths(){
269    return new ArrayList<>();
270  }
271
272  private List<Pair<byte[], String>> withFamilyPathsFor(byte[]... families) throws IOException {
273    List<Pair<byte[], String>> familyPaths = getBlankFamilyPaths();
274    for (byte[] family : families) {
275      familyPaths.add(new Pair<>(family, createHFileForFamilies(family)));
276    }
277    return familyPaths;
278  }
279
280  private String createHFileForFamilies(byte[] family) throws IOException {
281    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
282    // TODO We need a way to do this without creating files
283    File hFileLocation = testFolder.newFile();
284    FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
285    try {
286      hFileFactory.withOutputStream(out);
287      hFileFactory.withFileContext(new HFileContext());
288      HFile.Writer writer = hFileFactory.create();
289      try {
290        writer.append(new KeyValue(CellUtil.createCell(randomBytes,
291            family,
292            randomBytes,
293            0L,
294            KeyValue.Type.Put.getCode(),
295            randomBytes)));
296      } finally {
297        writer.close();
298      }
299    } finally {
300      out.close();
301    }
302    return hFileLocation.getAbsoluteFile().getAbsolutePath();
303  }
304
305  private static Matcher<WALEdit> bulkLogWalEditType(byte[] typeBytes) {
306    return new WalMatcher(typeBytes);
307  }
308
309  private static Matcher<WALEdit> bulkLogWalEdit(byte[] typeBytes, byte[] tableName,
310      byte[] familyName, List<String> storeFileNames) {
311    return new WalMatcher(typeBytes, tableName, familyName, storeFileNames);
312  }
313
314  private static class WalMatcher extends TypeSafeMatcher<WALEdit> {
315    private final byte[] typeBytes;
316    private final byte[] tableName;
317    private final byte[] familyName;
318    private final List<String> storeFileNames;
319
320    public WalMatcher(byte[] typeBytes) {
321      this(typeBytes, null, null, null);
322    }
323
324    public WalMatcher(byte[] typeBytes, byte[] tableName, byte[] familyName,
325        List<String> storeFileNames) {
326      this.typeBytes = typeBytes;
327      this.tableName = tableName;
328      this.familyName = familyName;
329      this.storeFileNames = storeFileNames;
330    }
331
332    @Override
333    protected boolean matchesSafely(WALEdit item) {
334      assertTrue(Arrays.equals(CellUtil.cloneQualifier(item.getCells().get(0)), typeBytes));
335      BulkLoadDescriptor desc;
336      try {
337        desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0));
338      } catch (IOException e) {
339        return false;
340      }
341      assertNotNull(desc);
342
343      if (tableName != null) {
344        assertTrue(Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(),
345          tableName));
346      }
347
348      if(storeFileNames != null) {
349        int index=0;
350        StoreDescriptor store = desc.getStores(0);
351        assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
352        assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
353        assertEquals(storeFileNames.size(), store.getStoreFileCount());
354      }
355
356      return true;
357    }
358
359    @Override
360    public void describeTo(Description description) {
361
362    }
363  }
364}