/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static java.util.Arrays.asList;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.ArgumentMatchers.anyLong;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.verify;
028import static org.mockito.Mockito.when;
029import static org.mockito.hamcrest.MockitoHamcrest.argThat;
030
031import java.io.File;
032import java.io.FileNotFoundException;
033import java.io.FileOutputStream;
034import java.io.IOException;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.List;
038import java.util.Random;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FSDataOutputStream;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.CellUtil;
043import org.apache.hadoop.hbase.DoNotRetryIOException;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseConfiguration;
046import org.apache.hadoop.hbase.HBaseTestingUtility;
047import org.apache.hadoop.hbase.HColumnDescriptor;
048import org.apache.hadoop.hbase.HRegionInfo;
049import org.apache.hadoop.hbase.HTableDescriptor;
050import org.apache.hadoop.hbase.KeyValue;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.io.hfile.HFile;
053import org.apache.hadoop.hbase.io.hfile.HFileContext;
054import org.apache.hadoop.hbase.testclassification.SmallTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.Pair;
057import org.apache.hadoop.hbase.wal.WAL;
058import org.apache.hadoop.hbase.wal.WALEdit;
059import org.apache.hadoop.hbase.wal.WALKeyImpl;
060import org.hamcrest.Description;
061import org.hamcrest.Matcher;
062import org.hamcrest.TypeSafeMatcher;
063import org.junit.Before;
064import org.junit.ClassRule;
065import org.junit.Rule;
066import org.junit.Test;
067import org.junit.experimental.categories.Category;
068import org.junit.rules.TemporaryFolder;
069import org.junit.rules.TestName;
070import org.mockito.invocation.InvocationOnMock;
071import org.mockito.stubbing.Answer;
072
073import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
076
077/**
078 * This class attempts to unit test bulk HLog loading.
079 */
080@Category(SmallTests.class)
081public class TestBulkLoad {
082
083  @ClassRule
084  public static final HBaseClassTestRule CLASS_RULE =
085      HBaseClassTestRule.forClass(TestBulkLoad.class);
086
087  @ClassRule
088  public static TemporaryFolder testFolder = new TemporaryFolder();
089  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
090  private final WAL log = mock(WAL.class);
091  private final Configuration conf = HBaseConfiguration.create();
092  private final Random random = new Random();
093  private final byte[] randomBytes = new byte[100];
094  private final byte[] family1 = Bytes.toBytes("family1");
095  private final byte[] family2 = Bytes.toBytes("family2");
096
097  @Rule
098  public TestName name = new TestName();
099
100  @Before
101  public void before() throws IOException {
102    random.nextBytes(randomBytes);
103    // Mockito.when(log.append(htd, info, key, edits, inMemstore));
104  }
105
106  @Test
107  public void verifyBulkLoadEvent() throws IOException {
108    TableName tableName = TableName.valueOf("test", "test");
109    List<Pair<byte[], String>> familyPaths = withFamilyPathsFor(family1);
110    byte[] familyName = familyPaths.get(0).getFirst();
111    String storeFileName = familyPaths.get(0).getSecond();
112    storeFileName = (new Path(storeFileName)).getName();
113    List<String> storeFileNames = new ArrayList<>();
114    storeFileNames.add(storeFileName);
115    when(log.appendMarker(any(), any(), argThat(
116        bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(), familyName, storeFileNames)))).
117        thenAnswer(new Answer() {
118          @Override
119          public Object answer(InvocationOnMock invocation) {
120            WALKeyImpl walKey = invocation.getArgument(1);
121            MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
122            if (mvcc != null) {
123              MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
124              walKey.setWriteEntry(we);
125            }
126            return 01L;
127          };
128    });
129    testRegionWithFamiliesAndSpecifiedTableName(tableName, family1)
130        .bulkLoadHFiles(familyPaths, false, null);
131    verify(log).sync(anyLong());
132  }
133
134  @Test
135  public void bulkHLogShouldThrowNoErrorAndWriteMarkerWithBlankInput() throws IOException {
136    testRegionWithFamilies(family1).bulkLoadHFiles(new ArrayList<>(),false, null);
137  }
138
139  @Test
140  public void shouldBulkLoadSingleFamilyHLog() throws IOException {
141    when(log.appendMarker(any(),
142            any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer() {
143              @Override
144              public Object answer(InvocationOnMock invocation) {
145                WALKeyImpl walKey = invocation.getArgument(1);
146                MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
147                if (mvcc != null) {
148                  MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
149                  walKey.setWriteEntry(we);
150                }
151                return 01L;
152              };
153    });
154    testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
155    verify(log).sync(anyLong());
156  }
157
158  @Test
159  public void shouldBulkLoadManyFamilyHLog() throws IOException {
160    when(log.appendMarker(any(),
161            any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer() {
162              @Override
163              public Object answer(InvocationOnMock invocation) {
164                WALKeyImpl walKey = invocation.getArgument(1);
165                MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
166                if (mvcc != null) {
167                  MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
168                  walKey.setWriteEntry(we);
169                }
170                return 01L;
171              };
172            });
173    testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2),
174            false, null);
175    verify(log).sync(anyLong());
176  }
177
178  @Test
179  public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
180    when(log.appendMarker(any(),
181            any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer() {
182              @Override
183              public Object answer(InvocationOnMock invocation) {
184                WALKeyImpl walKey = invocation.getArgument(1);
185                MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
186                if (mvcc != null) {
187                  MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
188                  walKey.setWriteEntry(we);
189                }
190                return 01L;
191              };
192    });
193    TableName tableName = TableName.valueOf("test", "test");
194    testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2)
195        .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null);
196    verify(log).sync(anyLong());
197  }
198
199  @Test(expected = DoNotRetryIOException.class)
200  public void shouldCrashIfBulkLoadFamiliesNotInTable() throws IOException {
201    testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1, family2), false,
202      null);
203  }
204
205  @Test(expected = DoNotRetryIOException.class)
206  public void bulkHLogShouldThrowErrorWhenFamilySpecifiedAndHFileExistsButNotInTableDescriptor()
207      throws IOException {
208    testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
209  }
210
211  @Test(expected = DoNotRetryIOException.class)
212  public void shouldThrowErrorIfBadFamilySpecifiedAsFamilyPath() throws IOException {
213    testRegionWithFamilies()
214        .bulkLoadHFiles(asList(withInvalidColumnFamilyButProperHFileLocation(family1)),
215            false, null);
216  }
217
218  @Test(expected = FileNotFoundException.class)
219  public void shouldThrowErrorIfHFileDoesNotExist() throws IOException {
220    List<Pair<byte[], String>> list = asList(withMissingHFileForFamily(family1));
221    testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null);
222  }
223
224  private Pair<byte[], String> withMissingHFileForFamily(byte[] family) {
225    return new Pair<>(family, getNotExistFilePath());
226  }
227
228  private String getNotExistFilePath() {
229    Path path = new Path(TEST_UTIL.getDataTestDir(), "does_not_exist");
230    return path.toUri().getPath();
231  }
232
233  private Pair<byte[], String> withInvalidColumnFamilyButProperHFileLocation(byte[] family)
234      throws IOException {
235    createHFileForFamilies(family);
236    return new Pair<>(new byte[]{0x00, 0x01, 0x02}, getNotExistFilePath());
237  }
238
239
240  private HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
241                                                              byte[]... families)
242  throws IOException {
243    HRegionInfo hRegionInfo = new HRegionInfo(tableName);
244    HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
245    for (byte[] family : families) {
246      hTableDescriptor.addFamily(new HColumnDescriptor(family));
247    }
248    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
249    // TODO We need a way to do this without creating files
250    return HRegion.createHRegion(hRegionInfo,
251        new Path(testFolder.newFolder().toURI()),
252        conf,
253        hTableDescriptor,
254        log);
255
256  }
257
258  private HRegion testRegionWithFamilies(byte[]... families) throws IOException {
259    TableName tableName = TableName.valueOf(name.getMethodName());
260    return testRegionWithFamiliesAndSpecifiedTableName(tableName, families);
261  }
262
263  private List<Pair<byte[], String>> getBlankFamilyPaths(){
264    return new ArrayList<>();
265  }
266
267  private List<Pair<byte[], String>> withFamilyPathsFor(byte[]... families) throws IOException {
268    List<Pair<byte[], String>> familyPaths = getBlankFamilyPaths();
269    for (byte[] family : families) {
270      familyPaths.add(new Pair<>(family, createHFileForFamilies(family)));
271    }
272    return familyPaths;
273  }
274
275  private String createHFileForFamilies(byte[] family) throws IOException {
276    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
277    // TODO We need a way to do this without creating files
278    File hFileLocation = testFolder.newFile();
279    FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
280    try {
281      hFileFactory.withOutputStream(out);
282      hFileFactory.withFileContext(new HFileContext());
283      HFile.Writer writer = hFileFactory.create();
284      try {
285        writer.append(new KeyValue(CellUtil.createCell(randomBytes,
286            family,
287            randomBytes,
288            0L,
289            KeyValue.Type.Put.getCode(),
290            randomBytes)));
291      } finally {
292        writer.close();
293      }
294    } finally {
295      out.close();
296    }
297    return hFileLocation.getAbsoluteFile().getAbsolutePath();
298  }
299
300  private static Matcher<WALEdit> bulkLogWalEditType(byte[] typeBytes) {
301    return new WalMatcher(typeBytes);
302  }
303
304  private static Matcher<WALEdit> bulkLogWalEdit(byte[] typeBytes, byte[] tableName,
305      byte[] familyName, List<String> storeFileNames) {
306    return new WalMatcher(typeBytes, tableName, familyName, storeFileNames);
307  }
308
309  private static class WalMatcher extends TypeSafeMatcher<WALEdit> {
310    private final byte[] typeBytes;
311    private final byte[] tableName;
312    private final byte[] familyName;
313    private final List<String> storeFileNames;
314
315    public WalMatcher(byte[] typeBytes) {
316      this(typeBytes, null, null, null);
317    }
318
319    public WalMatcher(byte[] typeBytes, byte[] tableName, byte[] familyName,
320        List<String> storeFileNames) {
321      this.typeBytes = typeBytes;
322      this.tableName = tableName;
323      this.familyName = familyName;
324      this.storeFileNames = storeFileNames;
325    }
326
327    @Override
328    protected boolean matchesSafely(WALEdit item) {
329      assertTrue(Arrays.equals(CellUtil.cloneQualifier(item.getCells().get(0)), typeBytes));
330      BulkLoadDescriptor desc;
331      try {
332        desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0));
333      } catch (IOException e) {
334        return false;
335      }
336      assertNotNull(desc);
337
338      if (tableName != null) {
339        assertTrue(Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(),
340          tableName));
341      }
342
343      if(storeFileNames != null) {
344        int index=0;
345        StoreDescriptor store = desc.getStores(0);
346        assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
347        assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
348        assertEquals(storeFileNames.size(), store.getStoreFileCount());
349      }
350
351      return true;
352    }
353
354    @Override
355    public void describeTo(Description description) {
356
357    }
358  }
359}