/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of the region server's
 * bulkLoad functionality.
 */
@Tag(RegionServerTests.TAG)
@Tag(LargeTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: duration={0}")
public class TestHRegionServerBulkLoad {

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
  protected static HBaseTestingUtil UTIL = new HBaseTestingUtil();
  protected final static Configuration conf = UTIL.getConfiguration();
  protected final static byte[] QUAL = Bytes.toBytes("qual");
  protected final static int NUM_CFS = 10;
  /** Milliseconds {@link MyObserver} sleeps before each compaction; set by the parameterization. */
  private final int sleepDuration;
  public static int BLOCKSIZE = 64 * 1024;
  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;

  protected final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  /**
   * Parameters for the test template: the compaction sleep duration in milliseconds.
   * @return one argument set per sleep duration (0 and 30000)
   */
  public static final Stream<Arguments> parameters() {
    int[] sleepDurations = new int[] { 0, 30000 };
    List<Arguments> configurations = new ArrayList<>();
    for (int i : sleepDurations) {
      configurations.add(Arguments.of(i));
    }
    return configurations.stream();
  }

  /**
   * @param duration milliseconds that {@link MyObserver} sleeps before each compaction
   */
  public TestHRegionServerBulkLoad(int duration) {
    this.sleepDuration = duration;
  }

  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    conf.setInt("hbase.rpc.timeout", 10 * 1000);
  }

  /**
   * Create a rowkey compatible with
   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
   */
  public static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  /** Returns the name of the i-th column family, e.g. {@code family_0003}. */
  static String family(int i) {
    return String.format("family_%04d", i);
  }

  /**
   * Create an HFile with the given number of rows with a specified value. Every cell shares the
   * same timestamp, and that timestamp is also recorded as the file's bulk load time.
   * @param fs        file system to write to
   * @param path      path of the HFile to create
   * @param family    column family of every cell
   * @param qualifier qualifier of every cell
   * @param value     value of every cell
   * @param numRows   number of rows to write, keyed by {@link #rowkey(int)} for 0..numRows-1
   * @throws IOException if the file cannot be written
   */
  public static void createHFile(FileSystem fs, Path path, byte[] family, byte[] qualifier,
    byte[] value, int numRows) throws IOException {
    HFileContext context =
      new HFileContextBuilder().withBlockSize(BLOCKSIZE).withCompression(COMPRESSION).build();
    try (HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
      .withPath(fs, path).withFileContext(context).create()) {
      long now = EnvironmentEdgeManager.currentTime();
      for (int i = 0; i < numRows; i++) {
        writer.append(new KeyValue(rowkey(i), family, qualifier, now, value));
      }
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
    }
  }

  /**
   * Thread that repeatedly bulk loads a fresh set of HFiles into the table. Each iteration writes
   * one 1000-row HFile per column family, all carrying the same iteration-specific value, and bulk
   * loads them in a single request. A concurrent scan that sees different values across families
   * of one row therefore exposes a non-atomic bulk load. Every 5 bulk loads it also requests a
   * compaction of the region holding row "aaa" to keep the number of store files (and open file
   * handles) down.
   */
  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private final TableName tableName;

    /**
     * @param tableName      table to bulk load into
     * @param ctx            test context driving this thread
     * @param targetFamilies currently unused; HFiles are written for all {@code NUM_CFS} families
     */
    public AtomicHFileLoader(TableName tableName, TestContext ctx, byte targetFamilies[][])
      throws IOException {
      super(ctx);
      this.tableName = tableName;
    }

    @Override
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));

      // Create one HFile per column family, all holding the same value for this iteration.
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        family2Files.put(fam, Collections.singletonList(hfile));
      }
      // Bulk load all families in a single request.
      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(tableName, family2Files);
      final Connection conn = UTIL.getConnection();
      // Periodically compact to reduce the number of store files and open file handles.
      if (numBulkLoads.get() % 5 == 0) {
        // Both the locator and the Admin obtained from the connection must be closed by the caller.
        try (RegionLocator locator = conn.getRegionLocator(tableName);
          Admin admin = conn.getAdmin()) {
          HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("aaa"), true);
          admin.compactRegion(loc.getRegion().getRegionName());
          numCompactions.incrementAndGet();
        }
      }
    }
  }

  /**
   * Region observer that delays every compaction by {@code sleepDuration} milliseconds. An
   * interrupt during the sleep is surfaced as an {@link InterruptedIOException}.
   */
  public static class MyObserver implements RegionCoprocessor, RegionObserver {
    // Written by the test thread in setupTable and read by region server compaction threads.
    static volatile int sleepDuration;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> e,
      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
      try {
        Thread.sleep(sleepDuration);
      } catch (InterruptedException ie) {
        IOException ioe = new InterruptedIOException();
        ioe.initCause(ie);
        throw ioe;
      }
      return scanner;
    }
  }

  /**
   * Thread that does full scans of the table looking for any partially completed rows, i.e. rows
   * whose target families do not all carry the same value.
   */
  public static class AtomicScanReader extends RepeatingTestThread {
    byte targetFamilies[][];
    Table table;
    AtomicLong numScans = new AtomicLong();
    AtomicLong numRowsScanned = new AtomicLong();
    TableName TABLE_NAME;

    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx, byte targetFamilies[][])
      throws IOException {
      super(ctx);
      this.TABLE_NAME = TABLE_NAME;
      this.targetFamilies = targetFamilies;
      table = UTIL.getConnection().getTable(TABLE_NAME);
    }

    /**
     * Runs one full scan over the target families and fails if any row has differing values
     * across families.
     * @throws RuntimeException describing the current and previous cell on a mismatch
     */
    @Override
    public void doAnAction() throws Exception {
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }
      // Close the scanner every iteration so server-side scanner resources are released.
      try (ResultScanner scanner = table.getScanner(s)) {
        for (Result res : scanner) {
          byte[] lastRow = null, lastFam = null, lastQual = null;
          byte[] gotValue = null;
          for (byte[] family : targetFamilies) {
            byte qualifier[] = QUAL;
            byte thisValue[] = res.getValue(family, qualifier);
            if (gotValue != null && thisValue != null && !Bytes.equals(gotValue, thisValue)) {

              StringBuilder msg = new StringBuilder();
              msg.append("Failed on scan ").append(numScans).append(" after scanning ")
                .append(numRowsScanned).append(" rows!\n");
              msg.append("Current was " + Bytes.toString(res.getRow()) + "/"
                + Bytes.toString(family) + ":" + Bytes.toString(qualifier) + " = "
                + Bytes.toString(thisValue) + "\n");
              msg.append("Previous was " + Bytes.toString(lastRow) + "/" + Bytes.toString(lastFam)
                + ":" + Bytes.toString(lastQual) + " = " + Bytes.toString(gotValue));
              throw new RuntimeException(msg.toString());
            }

            lastFam = family;
            lastQual = qualifier;
            lastRow = res.getRow();
            gotValue = thisValue;
          }
          numRowsScanned.getAndIncrement();
        }
      }
      numScans.getAndIncrement();
    }
  }

  /**
   * Creates a table with given table name and specified number of column families if the table does
   * not already exist. The table gets {@link MyObserver} installed, configured with this instance's
   * sleep duration.
   * @param table name of the table to create
   * @param cfs   number of column families, named by {@link #family(int)} for 0..cfs-1
   */
  public void setupTable(TableName table, int cfs) throws IOException {
    try {
      LOG.info("Creating table {}", table);
      TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(table);

      tableDescriptorBuilder.setCoprocessor(MyObserver.class.getName());
      MyObserver.sleepDuration = this.sleepDuration;
      for (int i = 0; i < cfs; i++) {
        ColumnFamilyDescriptor columnFamilyDescriptor =
          ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family(i))).build();
        tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
      }

      UTIL.getAdmin().createTable(tableDescriptorBuilder.build());
    } catch (TableExistsException tee) {
      LOG.info("Table {} already exists", table);
    }
  }

  /**
   * Atomic bulk load. Also verifies that a bulk load marker shows up in the WAL.
   */
  @TestTemplate
  public void testAtomicBulkLoad() throws Exception {
    TableName tableName = TableName.valueOf("atomicBulkLoad");

    int millisToRun = 30000;
    int numScanners = 50;

    // Set createWALDir to true and use default values for other options.
    UTIL.startMiniCluster(StartTestingClusterOption.builder().createWALDir(true).build());
    try {
      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
      FindBulkHBaseListener listener = new FindBulkHBaseListener();
      log.registerWALActionsListener(listener);
      runAtomicBulkloadTest(tableName, millisToRun, numScanners);
      assertThat(listener.isFound(), is(true));
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Runs one {@link AtomicHFileLoader} concurrently with {@code numScanners}
   * {@link AtomicScanReader}s for {@code millisToRun} milliseconds, then logs their counters.
   */
  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
    throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    try {
      ctx.waitFor(millisToRun);
    } finally {
      // Always stop and join the worker threads, even if waiting surfaced a failure, so they do
      // not keep running against a cluster that is being shut down.
      ctx.stop();
    }

    LOG.info("Loaders:");
    LOG.info("  loaded " + loader.numBulkLoads.get());
    LOG.info("  compations " + loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }

  /**
   * Run test on an HBase instance for 5 minutes. This assumes that the table under test only has a
   * single region. Exits the JVM with status 0 on success and 1 on failure.
   */
  public static void main(String[] args) throws Exception {
    int exitCode = 1;
    try {
      Configuration c = HBaseConfiguration.create();
      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
      test.setConf(c);
      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
      exitCode = 0;
    } catch (Throwable t) {
      LOG.error("Atomic bulk load test failed", t);
    } finally {
      System.exit(exitCode); // something hangs (believe it is lru threadpool)
    }
  }

  /** Replaces the shared testing utility with one built from the given configuration. */
  private void setConf(Configuration c) {
    UTIL = new HBaseTestingUtil(c);
  }

  /**
   * WAL listener that records whether any cell written to the WAL has a field equal to
   * {@link WALEdit#BULK_LOAD}.
   */
  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
    // Set from the WAL writing path and read by the test thread.
    private volatile boolean found = false;

    @Override
    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
      for (Cell cell : logEdit.getCells()) {
        KeyValue kv = KeyValueUtil.ensureKeyValue((ExtendedCell) cell);
        for (Map.Entry<String, Object> entry : kv.toStringMap().entrySet()) {
          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
            found = true;
          }
        }
      }
    }

    public boolean isFound() {
      return found;
    }
  }
}