/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of the region server's
 * bulkLoad functionality.
 * <p>
 * The test is parameterized on the number of milliseconds the {@link MyObserver} coprocessor
 * sleeps in its {@code preCompact} hook.
 */
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, LargeTests.class })
public class TestHRegionServerBulkLoad {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
  // Not final: main() swaps in a utility built from a caller-supplied configuration via setConf().
  protected static HBaseTestingUtil UTIL = new HBaseTestingUtil();
  protected final static Configuration conf = UTIL.getConfiguration();
  protected final static byte[] QUAL = Bytes.toBytes("qual");
  protected final static int NUM_CFS = 10;
  private int sleepDuration;
  public static int BLOCKSIZE = 64 * 1024;
  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;

  protected final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  /**
   * Supplies one parameter set per {@code preCompact} sleep duration (in milliseconds).
   */
  @Parameters
  public static final Collection<Object[]> parameters() {
    int[] sleepDurations = new int[] { 0, 30000 };
    List<Object[]> configurations = new ArrayList<>();
    for (int i : sleepDurations) {
      configurations.add(new Object[] { i });
    }
    return configurations;
  }

  /**
   * @param duration milliseconds {@link MyObserver#preCompact} sleeps for tables created by this
   *                 instance
   */
  public TestHRegionServerBulkLoad(int duration) {
    this.sleepDuration = duration;
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf.setInt("hbase.rpc.timeout", 10 * 1000);
  }

  /**
   * Create a rowkey compatible with
   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
   */
  public static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  /**
   * Returns the name of the {@code i}-th column family, e.g. {@code family_0003}.
   */
  static String family(int i) {
    return String.format("family_%04d", i);
  }

  /**
   * Create an HFile with the given number of rows with a specified value.
   * <p>
   * Rows are {@code rowkey(0)} .. {@code rowkey(numRows - 1)}; every cell carries the same
   * timestamp, which is also recorded in the file info under {@code BULKLOAD_TIME_KEY}.
   * @param fs        file system to write to
   * @param path      destination of the HFile
   * @param family    column family of every cell
   * @param qualifier column qualifier of every cell
   * @param value     value of every cell
   * @param numRows   number of rows (one cell each) to write
   * @throws IOException if writing or closing the file fails
   */
  public static void createHFile(FileSystem fs, Path path, byte[] family, byte[] qualifier,
    byte[] value, int numRows) throws IOException {
    HFileContext context =
      new HFileContextBuilder().withBlockSize(BLOCKSIZE).withCompression(COMPRESSION).build();
    // try-with-resources guarantees the writer is closed even if an append fails.
    try (HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
      .withPath(fs, path).withFileContext(context).create()) {
      long now = EnvironmentEdgeManager.currentTime();
      for (int i = 0; i < numRows; i++) {
        writer.append(new KeyValue(rowkey(i), family, qualifier, now, value));
      }
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
    }
  }

  /**
   * Thread that repeatedly bulk loads HFiles into the table. Each iteration writes one HFile per
   * column family ({@link #NUM_CFS} files), all carrying the same iteration-specific value, and
   * bulk loads them in a single call. After every fifth bulk load it requests a compaction of the
   * region containing row {@code "aaa"} to reduce the number of open file handles.
   */
  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private TableName tableName;

    /**
     * @param tableName      table to bulk load into
     * @param ctx            test context this thread runs under
     * @param targetFamilies currently unused; the loader always writes families
     *                       {@code family(0)} .. {@code family(NUM_CFS - 1)}
     */
    public AtomicHFileLoader(TableName tableName, TestContext ctx, byte[][] targetFamilies)
      throws IOException {
      super(ctx);
      this.tableName = tableName;
    }

    @Override
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));

      // create HFiles for different column families
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        family2Files.put(fam, Collections.singletonList(hfile));
      }
      // bulk load HFiles
      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(tableName, family2Files);
      final Connection conn = UTIL.getConnection();
      // Periodically do compaction to reduce the number of open file handles.
      if (numBulkLoads.get() % 5 == 0) {
        // 5 * 50 = 250 open file handles!
        try (RegionLocator locator = conn.getRegionLocator(tableName)) {
          HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("aaa"), true);
          // Use the utility's shared Admin (as setupTable does) rather than creating a new Admin
          // from the connection on every compaction and never closing it.
          UTIL.getAdmin().compactRegion(loc.getRegion().getRegionName());
          numCompactions.incrementAndGet();
        }
      }
    }
  }

  /**
   * Region coprocessor that sleeps for {@link #sleepDuration} milliseconds before every
   * compaction and then hands back the scanner unchanged.
   */
  public static class MyObserver implements RegionCoprocessor, RegionObserver {
    /** Milliseconds to sleep in {@code preCompact}; assigned by {@code setupTable}. */
    static int sleepDuration;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
      try {
        Thread.sleep(sleepDuration);
      } catch (InterruptedException ie) {
        // Surface the interruption as an IOException, keeping the original as the cause.
        IOException ioe = new InterruptedIOException();
        ioe.initCause(ie);
        throw ioe;
      }
      return scanner;
    }
  }

  /**
   * Thread that does full scans of the table looking for any partially completed rows.
   * <p>
   * Within a single row, every target family that has a value for {@link #QUAL} must carry the
   * same value; a mismatch fails the thread with a {@link RuntimeException}.
   */
  public static class AtomicScanReader extends RepeatingTestThread {
    byte[][] targetFamilies;
    Table table;
    AtomicLong numScans = new AtomicLong();
    AtomicLong numRowsScanned = new AtomicLong();
    TableName TABLE_NAME;

    /**
     * @param TABLE_NAME     table to scan
     * @param ctx            test context this thread runs under
     * @param targetFamilies families to scan and compare against each other
     */
    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx, byte[][] targetFamilies)
      throws IOException {
      super(ctx);
      this.TABLE_NAME = TABLE_NAME;
      this.targetFamilies = targetFamilies;
      table = UTIL.getConnection().getTable(TABLE_NAME);
    }

    @Override
    public void doAnAction() throws Exception {
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }
      // Close the scanner after every pass; this method is invoked repeatedly, so an unclosed
      // scanner would leak on each iteration.
      try (ResultScanner scanner = table.getScanner(s)) {
        for (Result res : scanner) {
          verifyRow(res);
          numRowsScanned.getAndIncrement();
        }
      }
      numScans.getAndIncrement();
    }

    /** Fails if two target families of {@code res} hold different non-null values. */
    private void verifyRow(Result res) {
      byte[] lastRow = null;
      byte[] lastFam = null;
      byte[] lastQual = null;
      byte[] gotValue = null;
      for (byte[] family : targetFamilies) {
        byte[] qualifier = QUAL;
        byte[] thisValue = res.getValue(family, qualifier);
        if (gotValue != null && thisValue != null && !Bytes.equals(gotValue, thisValue)) {
          StringBuilder msg = new StringBuilder();
          msg.append("Failed on scan ").append(numScans).append(" after scanning ")
            .append(numRowsScanned).append(" rows!\n");
          msg.append("Current  was " + Bytes.toString(res.getRow()) + "/" + Bytes.toString(family)
            + ":" + Bytes.toString(qualifier) + " = " + Bytes.toString(thisValue) + "\n");
          msg.append("Previous was " + Bytes.toString(lastRow) + "/" + Bytes.toString(lastFam)
            + ":" + Bytes.toString(lastQual) + " = " + Bytes.toString(gotValue));
          throw new RuntimeException(msg.toString());
        }

        lastFam = family;
        lastQual = qualifier;
        lastRow = res.getRow();
        gotValue = thisValue;
      }
    }
  }

  /**
   * Creates a table with given table name and specified number of column families if the table does
   * not already exist. The table is created with the {@link MyObserver} coprocessor attached, and
   * {@code MyObserver.sleepDuration} is set to this instance's sleep duration.
   * @param table name of the table to create
   * @param cfs   number of column families to create, named {@code family(0)} ..
   *              {@code family(cfs - 1)}
   * @throws IOException if table creation fails for a reason other than the table already
   *                     existing
   */
  public void setupTable(TableName table, int cfs) throws IOException {
    try {
      LOG.info("Creating table {}", table);
      TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(table);

      tableDescriptorBuilder.setCoprocessor(MyObserver.class.getName());
      MyObserver.sleepDuration = this.sleepDuration;
      // Honour the requested family count instead of a hard-coded 10.
      for (int i = 0; i < cfs; i++) {
        ColumnFamilyDescriptor columnFamilyDescriptor =
          ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family(i))).build();
        tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
      }

      UTIL.getAdmin().createTable(tableDescriptorBuilder.build());
    } catch (TableExistsException tee) {
      LOG.info("Table {} already exists", table);
    }
  }

  /**
   * Atomic bulk load. Starts a mini cluster, runs concurrent bulk loads and scans for 30 seconds,
   * and additionally asserts that a bulk load marker was seen by a WAL listener.
   */
  @Test
  public void testAtomicBulkLoad() throws Exception {
    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");

    int millisToRun = 30000;
    int numScanners = 50;

    // Set createWALDir to true and use default values for other options.
    UTIL.startMiniCluster(StartTestingClusterOption.builder().createWALDir(true).build());
    try {
      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
      FindBulkHBaseListener listener = new FindBulkHBaseListener();
      log.registerWALActionsListener(listener);
      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
      assertThat(listener.isFound(), is(true));
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Creates the table, then runs one {@link AtomicHFileLoader} and {@code numScanners}
   * {@link AtomicScanReader} threads concurrently for {@code millisToRun} milliseconds and logs
   * their counters.
   * @param tableName   table to create and exercise
   * @param millisToRun how long to let the threads run
   * @param numScanners number of concurrent scanner threads
   */
  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
    throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Loaders:");
    LOG.info(" loaded {}", loader.numBulkLoads.get());
    LOG.info(" compations {}", loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info(" scanned {}", scanner.numScans.get());
      LOG.info(" verified {} rows", scanner.numRowsScanned.get());
    }
  }

  /**
   * Run test on an HBase instance for 5 minutes. This assumes that the table under test only has a
   * single region.
   */
  public static void main(String[] args) throws Exception {
    try {
      Configuration c = HBaseConfiguration.create();
      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
      test.setConf(c);
      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
    } finally {
      System.exit(0); // something hangs (believe it is lru threadpool)
    }
  }

  /** Replaces the shared testing utility with one built from the given configuration. */
  private void setConf(Configuration c) {
    UTIL = new HBaseTestingUtil(c);
  }

  /**
   * WAL listener that records whether any cell about to be written carries the
   * {@code WALEdit.BULK_LOAD} marker in one of its string-map values.
   */
  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
    // volatile: set from the thread invoking the WAL listener, read from the test thread.
    private volatile boolean found = false;

    @Override
    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
      final String bulkLoadMarker = Bytes.toString(WALEdit.BULK_LOAD);
      for (Cell cell : logEdit.getCells()) {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        for (Object value : kv.toStringMap().values()) {
          // Marker-first comparison is safe even if a map value is null.
          if (bulkLoadMarker.equals(value)) {
            found = true;
            return;
          }
        }
      }
    }

    public boolean isFound() {
      return found;
    }
  }
}