001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static java.util.Arrays.asList; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.ArgumentMatchers.anyBoolean; 026import static org.mockito.ArgumentMatchers.anyLong; 027import static org.mockito.Mockito.mock; 028import static org.mockito.Mockito.verify; 029import static org.mockito.Mockito.when; 030import static org.mockito.hamcrest.MockitoHamcrest.argThat; 031 032import java.io.File; 033import java.io.FileNotFoundException; 034import java.io.FileOutputStream; 035import java.io.IOException; 036import java.util.ArrayList; 037import java.util.Arrays; 038import java.util.List; 039import java.util.Random; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FSDataOutputStream; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.CellUtil; 044import org.apache.hadoop.hbase.DoNotRetryIOException; 045import org.apache.hadoop.hbase.HBaseClassTestRule; 046import org.apache.hadoop.hbase.HBaseConfiguration; 047import org.apache.hadoop.hbase.HBaseTestingUtility; 048import org.apache.hadoop.hbase.HColumnDescriptor; 049import org.apache.hadoop.hbase.HRegionInfo; 050import org.apache.hadoop.hbase.HTableDescriptor; 051import org.apache.hadoop.hbase.KeyValue; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.io.hfile.HFile; 054import org.apache.hadoop.hbase.io.hfile.HFileContext; 055import org.apache.hadoop.hbase.testclassification.SmallTests; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.apache.hadoop.hbase.util.Pair; 058import org.apache.hadoop.hbase.wal.WAL; 059import org.apache.hadoop.hbase.wal.WALEdit; 060import org.apache.hadoop.hbase.wal.WALKeyImpl; 061import org.hamcrest.Description; 062import org.hamcrest.Matcher; 063import org.hamcrest.TypeSafeMatcher; 064import org.junit.Before; 065import org.junit.ClassRule; 066import org.junit.Rule; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.junit.rules.TemporaryFolder; 070import org.junit.rules.TestName; 071import org.mockito.invocation.InvocationOnMock; 072import org.mockito.stubbing.Answer; 073 074import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 077 078/** 079 * This class attempts to unit test bulk HLog loading. 080 */ 081@Category(SmallTests.class) 082public class TestBulkLoad { 083 084 @ClassRule 085 public static final HBaseClassTestRule CLASS_RULE = 086 HBaseClassTestRule.forClass(TestBulkLoad.class); 087 088 @ClassRule 089 public static TemporaryFolder testFolder = new TemporaryFolder(); 090 private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 091 private final WAL log = mock(WAL.class); 092 private final Configuration conf = HBaseConfiguration.create(); 093 private final Random random = new Random(); 094 private final byte[] randomBytes = new byte[100]; 095 private final byte[] family1 = Bytes.toBytes("family1"); 096 private final byte[] family2 = Bytes.toBytes("family2"); 097 098 @Rule 099 public TestName name = new TestName(); 100 101 @Before 102 public void before() throws IOException { 103 random.nextBytes(randomBytes); 104 // Mockito.when(log.append(htd, info, key, edits, inMemstore)); 105 } 106 107 @Test 108 public void verifyBulkLoadEvent() throws IOException { 109 TableName tableName = TableName.valueOf("test", "test"); 110 List<Pair<byte[], String>> familyPaths = withFamilyPathsFor(family1); 111 byte[] familyName = familyPaths.get(0).getFirst(); 112 String storeFileName = familyPaths.get(0).getSecond(); 113 storeFileName = (new Path(storeFileName)).getName(); 114 List<String> storeFileNames = new ArrayList<>(); 115 storeFileNames.add(storeFileName); 116 when(log.append(any(), any(), 117 argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(), 118 familyName, storeFileNames)), 119 anyBoolean())).thenAnswer(new Answer() { 120 @Override 121 public Object answer(InvocationOnMock invocation) { 122 WALKeyImpl walKey = invocation.getArgument(1); 123 MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); 124 if (mvcc != null) { 125 MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); 126 walKey.setWriteEntry(we); 127 } 128 return 01L; 129 }; 130 }); 131 testRegionWithFamiliesAndSpecifiedTableName(tableName, family1) 132 .bulkLoadHFiles(familyPaths, false, null); 133 verify(log).sync(anyLong()); 134 } 135 136 @Test 137 public void bulkHLogShouldThrowNoErrorAndWriteMarkerWithBlankInput() throws IOException { 138 testRegionWithFamilies(family1).bulkLoadHFiles(new ArrayList<>(),false, null); 139 } 140 141 @Test 142 public void shouldBulkLoadSingleFamilyHLog() throws IOException { 143 when(log.append(any(), 144 any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), 145 anyBoolean())).thenAnswer(new Answer() { 146 @Override 147 public Object answer(InvocationOnMock invocation) { 148 WALKeyImpl walKey = invocation.getArgument(1); 149 MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); 150 if (mvcc != null) { 151 MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); 152 walKey.setWriteEntry(we); 153 } 154 return 01L; 155 }; 156 }); 157 testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null); 158 verify(log).sync(anyLong()); 159 } 160 161 @Test 162 public void shouldBulkLoadManyFamilyHLog() throws IOException { 163 when(log.append(any(), 164 any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), 165 anyBoolean())).thenAnswer(new Answer() { 166 @Override 167 public Object answer(InvocationOnMock invocation) { 168 WALKeyImpl walKey = invocation.getArgument(1); 169 MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); 170 if (mvcc != null) { 171 MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); 172 walKey.setWriteEntry(we); 173 } 174 return 01L; 175 }; 176 }); 177 testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2), 178 false, null); 179 verify(log).sync(anyLong()); 180 } 181 182 @Test 183 public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException { 184 when(log.append(any(), 185 any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), 186 anyBoolean())).thenAnswer(new Answer() { 187 @Override 188 public Object answer(InvocationOnMock invocation) { 189 WALKeyImpl walKey = invocation.getArgument(1); 190 MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); 191 if (mvcc != null) { 192 MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); 193 walKey.setWriteEntry(we); 194 } 195 return 01L; 196 }; 197 }); 198 TableName tableName = TableName.valueOf("test", "test"); 199 testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2) 200 .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null); 201 verify(log).sync(anyLong()); 202 } 203 204 @Test(expected = DoNotRetryIOException.class) 205 public void shouldCrashIfBulkLoadFamiliesNotInTable() throws IOException { 206 testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, 207 null); 208 } 209 210 @Test(expected = DoNotRetryIOException.class) 211 public void bulkHLogShouldThrowErrorWhenFamilySpecifiedAndHFileExistsButNotInTableDescriptor() 212 throws IOException { 213 testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null); 214 } 215 216 @Test(expected = DoNotRetryIOException.class) 217 public void shouldThrowErrorIfBadFamilySpecifiedAsFamilyPath() throws IOException { 218 testRegionWithFamilies() 219 .bulkLoadHFiles(asList(withInvalidColumnFamilyButProperHFileLocation(family1)), 220 false, null); 221 } 222 223 @Test(expected = FileNotFoundException.class) 224 public void shouldThrowErrorIfHFileDoesNotExist() throws IOException { 225 List<Pair<byte[], String>> list = asList(withMissingHFileForFamily(family1)); 226 testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null); 227 } 228 229 private Pair<byte[], String> withMissingHFileForFamily(byte[] family) { 230 return new Pair<>(family, getNotExistFilePath()); 231 } 232 233 private String getNotExistFilePath() { 234 Path path = new Path(TEST_UTIL.getDataTestDir(), "does_not_exist"); 235 return path.toUri().getPath(); 236 } 237 238 private Pair<byte[], String> withInvalidColumnFamilyButProperHFileLocation(byte[] family) 239 throws IOException { 240 createHFileForFamilies(family); 241 return new Pair<>(new byte[]{0x00, 0x01, 0x02}, getNotExistFilePath()); 242 } 243 244 245 private HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName, 246 byte[]... families) 247 throws IOException { 248 HRegionInfo hRegionInfo = new HRegionInfo(tableName); 249 HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); 250 for (byte[] family : families) { 251 hTableDescriptor.addFamily(new HColumnDescriptor(family)); 252 } 253 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); 254 // TODO We need a way to do this without creating files 255 return HRegion.createHRegion(hRegionInfo, 256 new Path(testFolder.newFolder().toURI()), 257 conf, 258 hTableDescriptor, 259 log); 260 261 } 262 263 private HRegion testRegionWithFamilies(byte[]... families) throws IOException { 264 TableName tableName = TableName.valueOf(name.getMethodName()); 265 return testRegionWithFamiliesAndSpecifiedTableName(tableName, families); 266 } 267 268 private List<Pair<byte[], String>> getBlankFamilyPaths(){ 269 return new ArrayList<>(); 270 } 271 272 private List<Pair<byte[], String>> withFamilyPathsFor(byte[]... families) throws IOException { 273 List<Pair<byte[], String>> familyPaths = getBlankFamilyPaths(); 274 for (byte[] family : families) { 275 familyPaths.add(new Pair<>(family, createHFileForFamilies(family))); 276 } 277 return familyPaths; 278 } 279 280 private String createHFileForFamilies(byte[] family) throws IOException { 281 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf); 282 // TODO We need a way to do this without creating files 283 File hFileLocation = testFolder.newFile(); 284 FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null); 285 try { 286 hFileFactory.withOutputStream(out); 287 hFileFactory.withFileContext(new HFileContext()); 288 HFile.Writer writer = hFileFactory.create(); 289 try { 290 writer.append(new KeyValue(CellUtil.createCell(randomBytes, 291 family, 292 randomBytes, 293 0L, 294 KeyValue.Type.Put.getCode(), 295 randomBytes))); 296 } finally { 297 writer.close(); 298 } 299 } finally { 300 out.close(); 301 } 302 return hFileLocation.getAbsoluteFile().getAbsolutePath(); 303 } 304 305 private static Matcher<WALEdit> bulkLogWalEditType(byte[] typeBytes) { 306 return new WalMatcher(typeBytes); 307 } 308 309 private static Matcher<WALEdit> bulkLogWalEdit(byte[] typeBytes, byte[] tableName, 310 byte[] familyName, List<String> storeFileNames) { 311 return new WalMatcher(typeBytes, tableName, familyName, storeFileNames); 312 } 313 314 private static class WalMatcher extends TypeSafeMatcher<WALEdit> { 315 private final byte[] typeBytes; 316 private final byte[] tableName; 317 private final byte[] familyName; 318 private final List<String> storeFileNames; 319 320 public WalMatcher(byte[] typeBytes) { 321 this(typeBytes, null, null, null); 322 } 323 324 public WalMatcher(byte[] typeBytes, byte[] tableName, byte[] familyName, 325 List<String> storeFileNames) { 326 this.typeBytes = typeBytes; 327 this.tableName = tableName; 328 this.familyName = familyName; 329 this.storeFileNames = storeFileNames; 330 } 331 332 @Override 333 protected boolean matchesSafely(WALEdit item) { 334 assertTrue(Arrays.equals(CellUtil.cloneQualifier(item.getCells().get(0)), typeBytes)); 335 BulkLoadDescriptor desc; 336 try { 337 desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0)); 338 } catch (IOException e) { 339 return false; 340 } 341 assertNotNull(desc); 342 343 if (tableName != null) { 344 assertTrue(Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(), 345 tableName)); 346 } 347 348 if(storeFileNames != null) { 349 int index=0; 350 StoreDescriptor store = desc.getStores(0); 351 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName)); 352 assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName)); 353 assertEquals(storeFileNames.size(), store.getStoreFileCount()); 354 } 355 356 return true; 357 } 358 359 @Override 360 public void describeTo(Description description) { 361 362 } 363 } 364}