/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.hamcrest.MockitoHamcrest.argThat;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Unit tests for bulk loading HFiles into an {@link HRegion} backed by a mocked {@link WAL}.
 *
 * <p>Each test creates a region in a temporary folder, writes small HFiles for the requested
 * column families, calls {@code bulkLoadHFiles} and then checks either the marker edit handed to
 * {@link WAL#appendMarker} and the subsequent {@code sync}, or the exception that is thrown.
 */
@Category(SmallTests.class)
public class TestBulkLoad {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBulkLoad.class);

  @ClassRule
  public static TemporaryFolder testFolder = new TemporaryFolder();
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final WAL log = mock(WAL.class);
  private final Configuration conf = HBaseConfiguration.create();
  private final Random random = new Random();
  private final byte[] randomBytes = new byte[100];
  private final byte[] family1 = Bytes.toBytes("family1");
  private final byte[] family2 = Bytes.toBytes("family2");

  @Rule
  public TestName name = new TestName();

  /** Fills {@link #randomBytes}, which is used as row, qualifier and value of the test cell. */
  @Before
  public void before() throws IOException {
    random.nextBytes(randomBytes);
  }

  /**
   * Bulk loads one HFile into a namespaced table and checks, through {@link WalMatcher}, that the
   * marker edit carries the table name, the family name and the store file count.
   */
  @Test
  public void verifyBulkLoadEvent() throws IOException {
    TableName tableName = TableName.valueOf("test", "test");
    List<Pair<byte[], String>> familyPaths = withFamilyPathsFor(family1);
    byte[] familyName = familyPaths.get(0).getFirst();
    // Only the file name (last path component) is collected for the matcher.
    String storeFileName = new Path(familyPaths.get(0).getSecond()).getName();
    List<String> storeFileNames = new ArrayList<>();
    storeFileNames.add(storeFileName);
    expectBulkLoadMarker(
      bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(), familyName, storeFileNames));
    testRegionWithFamiliesAndSpecifiedTableName(tableName, family1)
        .bulkLoadHFiles(familyPaths, false, null);
    verify(log).sync(anyLong());
  }

  /** Bulk loading an empty list of family paths must complete without throwing. */
  @Test
  public void bulkHLogShouldThrowNoErrorAndWriteMarkerWithBlankInput() throws IOException {
    testRegionWithFamilies(family1).bulkLoadHFiles(getBlankFamilyPaths(), false, null);
  }

  @Test
  public void shouldBulkLoadSingleFamilyHLog() throws IOException {
    expectBulkLoadMarker(bulkLogWalEditType(WALEdit.BULK_LOAD));
    testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
    verify(log).sync(anyLong());
  }

  @Test
  public void shouldBulkLoadManyFamilyHLog() throws IOException {
    expectBulkLoadMarker(bulkLogWalEditType(WALEdit.BULK_LOAD));
    testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2),
      false, null);
    verify(log).sync(anyLong());
  }

  @Test
  public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
    expectBulkLoadMarker(bulkLogWalEditType(WALEdit.BULK_LOAD));
    TableName tableName = TableName.valueOf("test", "test");
    testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2)
        .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null);
    verify(log).sync(anyLong());
  }

  @Test(expected = DoNotRetryIOException.class)
  public void shouldCrashIfBulkLoadFamiliesNotInTable() throws IOException {
    testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1, family2), false,
      null);
  }

  @Test(expected = DoNotRetryIOException.class)
  public void bulkHLogShouldThrowErrorWhenFamilySpecifiedAndHFileExistsButNotInTableDescriptor()
      throws IOException {
    testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
  }

  @Test(expected = DoNotRetryIOException.class)
  public void shouldThrowErrorIfBadFamilySpecifiedAsFamilyPath() throws IOException {
    testRegionWithFamilies()
        .bulkLoadHFiles(asList(withInvalidColumnFamilyButProperHFileLocation(family1)),
            false, null);
  }

  @Test(expected = FileNotFoundException.class)
  public void shouldThrowErrorIfHFileDoesNotExist() throws IOException {
    List<Pair<byte[], String>> list = asList(withMissingHFileForFamily(family1));
    testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null);
  }

  /**
   * Stubs {@code log.appendMarker} for edits accepted by {@code expectedEdit}.
   *
   * <p>When the key (argument 1) exposes a non-null MVCC, the stub begins a write entry on it and
   * attaches that entry to the key; it then returns {@code 1L}.
   *
   * @param expectedEdit matcher applied to the {@link WALEdit} argument
   */
  private void expectBulkLoadMarker(Matcher<WALEdit> expectedEdit) throws IOException {
    when(log.appendMarker(any(), any(), argThat(expectedEdit)))
        .thenAnswer(new Answer<Long>() {
          @Override
          public Long answer(InvocationOnMock invocation) {
            WALKeyImpl walKey = invocation.getArgument(1);
            MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
            if (mvcc != null) {
              MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
              walKey.setWriteEntry(we);
            }
            return 1L;
          }
        });
  }

  /** Pairs {@code family} with a path under the test data dir that is never created. */
  private Pair<byte[], String> withMissingHFileForFamily(byte[] family) {
    return new Pair<>(family, getNotExistFilePath());
  }

  private String getNotExistFilePath() {
    Path path = new Path(TEST_UTIL.getDataTestDir(), "does_not_exist");
    return path.toUri().getPath();
  }

  /**
   * Pairs a family name that no test table declares with the path of a freshly written HFile.
   *
   * @param family family used for the cell written into the HFile
   */
  private Pair<byte[], String> withInvalidColumnFamilyButProperHFileLocation(byte[] family)
      throws IOException {
    // Use the file that was actually written so that only the family name is invalid.
    String hFilePath = createHFileForFamilies(family);
    return new Pair<>(new byte[]{0x00, 0x01, 0x02}, hFilePath);
  }

  /**
   * Creates a region for {@code tableName} with the given families in a new temporary folder,
   * using the mocked {@link #log} as its WAL.
   */
  private HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
      byte[]... families) throws IOException {
    HRegionInfo hRegionInfo = new HRegionInfo(tableName);
    HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
    for (byte[] family : families) {
      hTableDescriptor.addFamily(new HColumnDescriptor(family));
    }
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
    // TODO We need a way to do this without creating files
    return HRegion.createHRegion(hRegionInfo,
      new Path(testFolder.newFolder().toURI()),
      conf,
      hTableDescriptor,
      log);
  }

  /** Creates a region whose table is named after the currently running test method. */
  private HRegion testRegionWithFamilies(byte[]... families) throws IOException {
    TableName tableName = TableName.valueOf(name.getMethodName());
    return testRegionWithFamiliesAndSpecifiedTableName(tableName, families);
  }

  private List<Pair<byte[], String>> getBlankFamilyPaths() {
    return new ArrayList<>();
  }

  /** Writes one HFile per family and returns the (family, absolute file path) pairs in order. */
  private List<Pair<byte[], String>> withFamilyPathsFor(byte[]... families) throws IOException {
    List<Pair<byte[], String>> familyPaths = getBlankFamilyPaths();
    for (byte[] family : families) {
      familyPaths.add(new Pair<>(family, createHFileForFamilies(family)));
    }
    return familyPaths;
  }

  /**
   * Writes an HFile containing a single Put cell for {@code family} into a new temporary file.
   *
   * @return absolute path of the written file
   */
  private String createHFileForFamilies(byte[] family) throws IOException {
    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
    // TODO We need a way to do this without creating files
    File hFileLocation = testFolder.newFile();
    try (FSDataOutputStream out =
        new FSDataOutputStream(new FileOutputStream(hFileLocation), null)) {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContext());
      // The writer is closed before the underlying stream, as resources close in reverse order.
      try (HFile.Writer writer = hFileFactory.create()) {
        writer.append(new KeyValue(CellUtil.createCell(randomBytes,
          family,
          randomBytes,
          0L,
          KeyValue.Type.Put.getCode(),
          randomBytes)));
      }
    }
    return hFileLocation.getAbsoluteFile().getAbsolutePath();
  }

  /** Matcher that only checks the qualifier of the edit's first cell. */
  private static Matcher<WALEdit> bulkLogWalEditType(byte[] typeBytes) {
    return new WalMatcher(typeBytes);
  }

  /** Matcher that additionally checks table name, family name and store file count. */
  private static Matcher<WALEdit> bulkLogWalEdit(byte[] typeBytes, byte[] tableName,
      byte[] familyName, List<String> storeFileNames) {
    return new WalMatcher(typeBytes, tableName, familyName, storeFileNames);
  }

  /**
   * Hamcrest matcher for bulk load marker edits.
   *
   * <p>Apart from an unparseable descriptor, mismatches are raised as assertion failures instead
   * of returning {@code false}.
   */
  private static class WalMatcher extends TypeSafeMatcher<WALEdit> {
    private final byte[] typeBytes;
    private final byte[] tableName;
    private final byte[] familyName;
    private final List<String> storeFileNames;

    public WalMatcher(byte[] typeBytes) {
      this(typeBytes, null, null, null);
    }

    public WalMatcher(byte[] typeBytes, byte[] tableName, byte[] familyName,
        List<String> storeFileNames) {
      this.typeBytes = typeBytes;
      this.tableName = tableName;
      this.familyName = familyName;
      this.storeFileNames = storeFileNames;
    }

    @Override
    protected boolean matchesSafely(WALEdit item) {
      assertTrue(Arrays.equals(CellUtil.cloneQualifier(item.getCells().get(0)), typeBytes));
      BulkLoadDescriptor desc;
      try {
        desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0));
      } catch (IOException e) {
        // The first cell could not be read as a bulk load descriptor: not a match.
        return false;
      }
      assertNotNull(desc);

      if (tableName != null) {
        assertTrue(Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(),
          tableName));
      }

      if (storeFileNames != null) {
        // Only the first store descriptor is inspected.
        StoreDescriptor store = desc.getStores(0);
        assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
        assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
        assertEquals(storeFileNames.size(), store.getStoreFileCount());
      }

      return true;
    }

    @Override
    public void describeTo(Description description) {
      description.appendText("WALEdit whose first cell has qualifier ")
          .appendText(Bytes.toStringBinary(typeBytes));
      if (tableName != null) {
        description.appendText(", table ").appendText(Bytes.toStringBinary(tableName));
      }
      if (storeFileNames != null) {
        description.appendText(", family ").appendText(Bytes.toStringBinary(familyName))
            .appendText(" and ").appendValue(storeFileNames.size())
            .appendText(" store file(s)");
      }
    }
  }
}