001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static java.util.Arrays.asList;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.ArgumentMatchers.anyLong;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.verify;
028import static org.mockito.Mockito.when;
029import static org.mockito.hamcrest.MockitoHamcrest.argThat;
030
031import java.io.File;
032import java.io.FileNotFoundException;
033import java.io.FileOutputStream;
034import java.io.IOException;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.List;
038import java.util.Random;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FSDataOutputStream;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.CellUtil;
043import org.apache.hadoop.hbase.DoNotRetryIOException;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseConfiguration;
046import org.apache.hadoop.hbase.HBaseTestingUtility;
047import org.apache.hadoop.hbase.HColumnDescriptor;
048import org.apache.hadoop.hbase.HRegionInfo;
049import org.apache.hadoop.hbase.HTableDescriptor;
050import org.apache.hadoop.hbase.KeyValue;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.io.hfile.HFile;
053import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
054import org.apache.hadoop.hbase.testclassification.SmallTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.Pair;
057import org.apache.hadoop.hbase.wal.WAL;
058import org.apache.hadoop.hbase.wal.WALEdit;
059import org.apache.hadoop.hbase.wal.WALKeyImpl;
060import org.hamcrest.Description;
061import org.hamcrest.Matcher;
062import org.hamcrest.TypeSafeMatcher;
063import org.junit.Before;
064import org.junit.ClassRule;
065import org.junit.Rule;
066import org.junit.Test;
067import org.junit.experimental.categories.Category;
068import org.junit.rules.TemporaryFolder;
069import org.junit.rules.TestName;
070import org.mockito.invocation.InvocationOnMock;
071import org.mockito.stubbing.Answer;
072
073import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
076
077/**
078 * This class attempts to unit test bulk HLog loading.
079 */
080@Category(SmallTests.class)
081public class TestBulkLoad {
082
083  @ClassRule
084  public static final HBaseClassTestRule CLASS_RULE =
085      HBaseClassTestRule.forClass(TestBulkLoad.class);
086
087  @ClassRule
088  public static TemporaryFolder testFolder = new TemporaryFolder();
089  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
090  private final WAL log = mock(WAL.class);
091  private final Configuration conf = HBaseConfiguration.create();
092  private final Random random = new Random();
093  private final byte[] randomBytes = new byte[100];
094  private final byte[] family1 = Bytes.toBytes("family1");
095  private final byte[] family2 = Bytes.toBytes("family2");
096  private final byte[] family3 = Bytes.toBytes("family3");
097
098  @Rule
099  public TestName name = new TestName();
100
101  @Before
102  public void before() throws IOException {
103    random.nextBytes(randomBytes);
104    // Mockito.when(log.append(htd, info, key, edits, inMemstore));
105  }
106
107  @Test
108  public void verifyBulkLoadEvent() throws IOException {
109    TableName tableName = TableName.valueOf("test", "test");
110    List<Pair<byte[], String>> familyPaths = withFamilyPathsFor(family1);
111    byte[] familyName = familyPaths.get(0).getFirst();
112    String storeFileName = familyPaths.get(0).getSecond();
113    storeFileName = (new Path(storeFileName)).getName();
114    List<String> storeFileNames = new ArrayList<>();
115    storeFileNames.add(storeFileName);
116    when(log.appendMarker(any(), any(), argThat(
117        bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(), familyName, storeFileNames)))).
118        thenAnswer(new Answer() {
119          @Override
120          public Object answer(InvocationOnMock invocation) {
121            WALKeyImpl walKey = invocation.getArgument(1);
122            MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
123            if (mvcc != null) {
124              MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
125              walKey.setWriteEntry(we);
126            }
127            return 01L;
128          };
129    });
130    testRegionWithFamiliesAndSpecifiedTableName(tableName, family1)
131        .bulkLoadHFiles(familyPaths, false, null);
132    verify(log).sync(anyLong());
133  }
134
135  @Test
136  public void bulkHLogShouldThrowNoErrorAndWriteMarkerWithBlankInput() throws IOException {
137    testRegionWithFamilies(family1).bulkLoadHFiles(new ArrayList<>(),false, null);
138  }
139
140  @Test
141  public void shouldBulkLoadSingleFamilyHLog() throws IOException {
142    when(log.appendMarker(any(),
143            any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer() {
144              @Override
145              public Object answer(InvocationOnMock invocation) {
146                WALKeyImpl walKey = invocation.getArgument(1);
147                MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
148                if (mvcc != null) {
149                  MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
150                  walKey.setWriteEntry(we);
151                }
152                return 01L;
153              };
154    });
155    testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
156    verify(log).sync(anyLong());
157  }
158
159  @Test
160  public void shouldBulkLoadManyFamilyHLog() throws IOException {
161    when(log.appendMarker(any(),
162            any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer() {
163              @Override
164              public Object answer(InvocationOnMock invocation) {
165                WALKeyImpl walKey = invocation.getArgument(1);
166                MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
167                if (mvcc != null) {
168                  MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
169                  walKey.setWriteEntry(we);
170                }
171                return 01L;
172              };
173            });
174    testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2),
175            false, null);
176    verify(log).sync(anyLong());
177  }
178
179  @Test
180  public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
181    when(log.appendMarker(any(),
182            any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer() {
183              @Override
184              public Object answer(InvocationOnMock invocation) {
185                WALKeyImpl walKey = invocation.getArgument(1);
186                MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
187                if (mvcc != null) {
188                  MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
189                  walKey.setWriteEntry(we);
190                }
191                return 01L;
192              };
193    });
194    TableName tableName = TableName.valueOf("test", "test");
195    testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2)
196        .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null);
197    verify(log).sync(anyLong());
198  }
199
200  @Test(expected = DoNotRetryIOException.class)
201  public void shouldCrashIfBulkLoadFamiliesNotInTable() throws IOException {
202    testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1, family2), false,
203      null);
204  }
205
206  // after HBASE-24021 will throw DoNotRetryIOException, not MultipleIOException
207  @Test(expected = DoNotRetryIOException.class)
208  public void shouldCrashIfBulkLoadMultiFamiliesNotInTable() throws IOException {
209    testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1, family2, family3),
210      false, null);
211  }
212
213  @Test(expected = DoNotRetryIOException.class)
214  public void bulkHLogShouldThrowErrorWhenFamilySpecifiedAndHFileExistsButNotInTableDescriptor()
215      throws IOException {
216    testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
217  }
218
219  @Test(expected = DoNotRetryIOException.class)
220  public void shouldThrowErrorIfBadFamilySpecifiedAsFamilyPath() throws IOException {
221    testRegionWithFamilies()
222        .bulkLoadHFiles(asList(withInvalidColumnFamilyButProperHFileLocation(family1)),
223            false, null);
224  }
225
226  @Test(expected = FileNotFoundException.class)
227  public void shouldThrowErrorIfHFileDoesNotExist() throws IOException {
228    List<Pair<byte[], String>> list = asList(withMissingHFileForFamily(family1));
229    testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null);
230  }
231
232  // after HBASE-24021 will throw FileNotFoundException, not MultipleIOException
233  @Test(expected = FileNotFoundException.class)
234  public void shouldThrowErrorIfMultiHFileDoesNotExist() throws IOException {
235    List<Pair<byte[], String>> list = new ArrayList<>();
236    list.addAll(asList(withMissingHFileForFamily(family1)));
237    list.addAll(asList(withMissingHFileForFamily(family2)));
238    testRegionWithFamilies(family1, family2).bulkLoadHFiles(list, false, null);
239  }
240
241  private Pair<byte[], String> withMissingHFileForFamily(byte[] family) {
242    return new Pair<>(family, getNotExistFilePath());
243  }
244
245  private String getNotExistFilePath() {
246    Path path = new Path(TEST_UTIL.getDataTestDir(), "does_not_exist");
247    return path.toUri().getPath();
248  }
249
250  private Pair<byte[], String> withInvalidColumnFamilyButProperHFileLocation(byte[] family)
251      throws IOException {
252    createHFileForFamilies(family);
253    return new Pair<>(new byte[]{0x00, 0x01, 0x02}, getNotExistFilePath());
254  }
255
256
257  private HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
258                                                              byte[]... families)
259  throws IOException {
260    HRegionInfo hRegionInfo = new HRegionInfo(tableName);
261    HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
262    for (byte[] family : families) {
263      hTableDescriptor.addFamily(new HColumnDescriptor(family));
264    }
265    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
266    // TODO We need a way to do this without creating files
267    return HRegion.createHRegion(hRegionInfo,
268        new Path(testFolder.newFolder().toURI()),
269        conf,
270        hTableDescriptor,
271        log);
272
273  }
274
275  private HRegion testRegionWithFamilies(byte[]... families) throws IOException {
276    TableName tableName = TableName.valueOf(name.getMethodName());
277    return testRegionWithFamiliesAndSpecifiedTableName(tableName, families);
278  }
279
280  private List<Pair<byte[], String>> getBlankFamilyPaths(){
281    return new ArrayList<>();
282  }
283
284  private List<Pair<byte[], String>> withFamilyPathsFor(byte[]... families) throws IOException {
285    List<Pair<byte[], String>> familyPaths = getBlankFamilyPaths();
286    for (byte[] family : families) {
287      familyPaths.add(new Pair<>(family, createHFileForFamilies(family)));
288    }
289    return familyPaths;
290  }
291
292  private String createHFileForFamilies(byte[] family) throws IOException {
293    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
294    // TODO We need a way to do this without creating files
295    File hFileLocation = testFolder.newFile();
296    FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
297    try {
298      hFileFactory.withOutputStream(out);
299      hFileFactory.withFileContext(new HFileContextBuilder().build());
300      HFile.Writer writer = hFileFactory.create();
301      try {
302        writer.append(new KeyValue(CellUtil.createCell(randomBytes,
303            family,
304            randomBytes,
305            0L,
306            KeyValue.Type.Put.getCode(),
307            randomBytes)));
308      } finally {
309        writer.close();
310      }
311    } finally {
312      out.close();
313    }
314    return hFileLocation.getAbsoluteFile().getAbsolutePath();
315  }
316
317  private static Matcher<WALEdit> bulkLogWalEditType(byte[] typeBytes) {
318    return new WalMatcher(typeBytes);
319  }
320
321  private static Matcher<WALEdit> bulkLogWalEdit(byte[] typeBytes, byte[] tableName,
322      byte[] familyName, List<String> storeFileNames) {
323    return new WalMatcher(typeBytes, tableName, familyName, storeFileNames);
324  }
325
326  private static class WalMatcher extends TypeSafeMatcher<WALEdit> {
327    private final byte[] typeBytes;
328    private final byte[] tableName;
329    private final byte[] familyName;
330    private final List<String> storeFileNames;
331
332    public WalMatcher(byte[] typeBytes) {
333      this(typeBytes, null, null, null);
334    }
335
336    public WalMatcher(byte[] typeBytes, byte[] tableName, byte[] familyName,
337        List<String> storeFileNames) {
338      this.typeBytes = typeBytes;
339      this.tableName = tableName;
340      this.familyName = familyName;
341      this.storeFileNames = storeFileNames;
342    }
343
344    @Override
345    protected boolean matchesSafely(WALEdit item) {
346      assertTrue(Arrays.equals(CellUtil.cloneQualifier(item.getCells().get(0)), typeBytes));
347      BulkLoadDescriptor desc;
348      try {
349        desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0));
350      } catch (IOException e) {
351        return false;
352      }
353      assertNotNull(desc);
354
355      if (tableName != null) {
356        assertTrue(Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(),
357          tableName));
358      }
359
360      if(storeFileNames != null) {
361        int index=0;
362        StoreDescriptor store = desc.getStores(0);
363        assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
364        assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
365        assertEquals(storeFileNames.size(), store.getStoreFileCount());
366      }
367
368      return true;
369    }
370
371    @Override
372    public void describeTo(Description description) {
373
374    }
375  }
376}