/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;

/**
 * Tests bulk loading of HFiles and shows the atomicity, or lack of atomicity, of
 * the region server's bulkLoad functionality.
 */
@RunWith(Parameterized.class)
@Category({RegionServerTests.class, LargeTests.class})
public class TestHRegionServerBulkLoad {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
  protected static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  protected final static Configuration conf = UTIL.getConfiguration();
  protected final static byte[] QUAL = Bytes.toBytes("qual");
  protected final static int NUM_CFS = 10;
  private int sleepDuration;
  public static int BLOCKSIZE = 64 * 1024;
  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;

  protected final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  @Parameters
  public static final Collection<Object[]> parameters() {
    int[] sleepDurations = new int[] { 0, 30000 };
    List<Object[]> configurations = new ArrayList<>();
    for (int i : sleepDurations) {
      configurations.add(new Object[] { i });
    }
    return configurations;
  }

  public TestHRegionServerBulkLoad(int duration) {
    this.sleepDuration = duration;
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf.setInt("hbase.rpc.timeout", 10 * 1000);
  }

  /**
   * Create a rowkey compatible with
   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
   */
  public static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  /**
   * Create an HFile with the given number of rows with a specified value.
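   * <p>
   * Row keys are those produced by {@link #rowkey(int)} for {@code 0..numRows - 1},
   * every cell shares a single timestamp, and the file info is stamped with
   * {@code BULKLOAD_TIME_KEY} so the file carries a bulk-load timestamp.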
   */
  public static void createHFile(FileSystem fs, Path path, byte[] family,
      byte[] qualifier, byte[] value, int numRows) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE)
        .withCompression(COMPRESSION)
        .build();
    HFile.Writer writer = HFile
        .getWriterFactory(conf, new CacheConfig(conf))
        .withPath(fs, path)
        .withFileContext(context)
        .create();
    long now = System.currentTimeMillis();
    try {
      // one cell per row, all sharing the same timestamp
      for (int i = 0; i < numRows; i++) {
        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
        writer.append(kv);
      }
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
    } finally {
      writer.close();
    }
  }

  /**
   * Thread that performs repeated bulk loads of HFiles into the table.
   *
   * Each iteration bulk loads one HFile per column family (NUM_CFS = 10 files),
   * and each loaded file keeps a file handle open on the region server, so
   * every fifth iteration the thread triggers a major compaction to bring the
   * number of open file handles back down.
   */
  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private TableName tableName;

    public AtomicHFileLoader(TableName tableName, TestContext ctx,
        byte[][] targetFamilies) throws IOException {
      super(ctx);
      this.tableName = tableName;
    }

    @Override
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
          iteration));

      // create HFiles for different column families
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        famPaths.add(new Pair<>(fam, hfile.toString()));
      }

      // bulk load HFiles
      final ClusterConnection conn = (ClusterConnection) UTIL.getConnection();
      Table table = conn.getTable(tableName);
      final String bulkToken = new SecureBulkLoadClient(UTIL.getConfiguration(), table)
          .prepareBulkLoad(conn);
      ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
          tableName, Bytes.toBytes("aaa"),
          new RpcControllerFactory(UTIL.getConfiguration()).newController(),
          HConstants.PRIORITY_UNSET) {
        @Override
        public Void rpcCall() throws Exception {
          LOG.debug("Going to connect to server " + getLocation() + " for row "
              + Bytes.toStringBinary(getRow()));
          SecureBulkLoadClient secureClient = null;
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          try (Table table = conn.getTable(getTableName())) {
            secureClient = new SecureBulkLoadClient(UTIL.getConfiguration(), table);
            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
                true, null, bulkToken);
          }
          return null;
        }
      };
      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
      caller.callWithRetries(callable, Integer.MAX_VALUE);

      // Periodically do compaction to reduce the number of open file handles.
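      // Each bulk load adds one new store file per column family, so open file
      // handles accumulate across iterations; the periodic major compaction
      // below collapses them back into one file per family.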
      if (numBulkLoads.get() % 5 == 0) {
        // 5 loads x 10 column families = up to 50 new store files since the
        // last compaction.
        callable = new ClientServiceCallable<Void>(conn,
            tableName, Bytes.toBytes("aaa"),
            new RpcControllerFactory(UTIL.getConfiguration()).newController(),
            HConstants.PRIORITY_UNSET) {
          @Override
          protected Void rpcCall() throws Exception {
            LOG.debug("compacting " + getLocation() + " for row "
                + Bytes.toStringBinary(getRow()));
            AdminProtos.AdminService.BlockingInterface server =
                conn.getAdmin(getLocation().getServerName());
            CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(
                getLocation().getRegionInfo().getRegionName(), true, null);
            server.compactRegion(null, request);
            numCompactions.incrementAndGet();
            return null;
          }
        };
        caller.callWithRetries(callable, Integer.MAX_VALUE);
      }
    }
  }

  public static class MyObserver implements RegionCoprocessor, RegionObserver {
    static int sleepDuration;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
        CompactionRequest request)
        throws IOException {
      // Delay the compaction so that bulk loads and scans race against an
      // in-flight compaction (sleepDuration is 0 or 30s, per the test parameters).
      try {
        Thread.sleep(sleepDuration);
      } catch (InterruptedException ie) {
        IOException ioe = new InterruptedIOException();
        ioe.initCause(ie);
        throw ioe;
      }
      return scanner;
    }
  }

  /**
   * Thread that does full scans of the table looking for any partially
   * completed rows.
   */
  public static class AtomicScanReader extends RepeatingTestThread {
    byte[][] targetFamilies;
    Table table;
    AtomicLong numScans = new AtomicLong();
    AtomicLong numRowsScanned = new AtomicLong();
    TableName TABLE_NAME;

    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx,
        byte[][] targetFamilies) throws IOException {
      super(ctx);
      this.TABLE_NAME = TABLE_NAME;
      this.targetFamilies = targetFamilies;
      table = UTIL.getConnection().getTable(TABLE_NAME);
    }

    @Override
    public void doAnAction() throws Exception {
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }
      ResultScanner scanner = table.getScanner(s);

      for (Result res : scanner) {
        byte[] lastRow = null, lastFam = null, lastQual = null;
        byte[] gotValue = null;
        for (byte[] family : targetFamilies) {
          byte[] qualifier = QUAL;
          byte[] thisValue = res.getValue(family, qualifier);
          if (gotValue != null && thisValue != null
              && !Bytes.equals(gotValue, thisValue)) {

            StringBuilder msg = new StringBuilder();
            msg.append("Failed on scan ").append(numScans)
                .append(" after scanning ").append(numRowsScanned)
                .append(" rows!\n");
            msg.append("Current was " + Bytes.toString(res.getRow()) + "/"
                + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
                + " = " + Bytes.toString(thisValue) + "\n");
            msg.append("Previous was " + Bytes.toString(lastRow) + "/"
                + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
                + " = " + Bytes.toString(gotValue));
            throw new RuntimeException(msg.toString());
          }

          lastFam = family;
          lastQual = qualifier;
          lastRow = res.getRow();
          gotValue = thisValue;
        }
        numRowsScanned.getAndIncrement();
      }
      numScans.getAndIncrement();
    }
  }

  /**
   * Creates a table with the given name and the specified number of column
   * families, if the table does not already exist.
   */
  public void setupTable(TableName table, int cfs) throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(table);
      htd.addCoprocessor(MyObserver.class.getName());
      MyObserver.sleepDuration = this.sleepDuration;
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }

      UTIL.getAdmin().createTable(htd);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  /**
   * Atomic bulk load.
   */
  @Test
  public void testAtomicBulkLoad() throws Exception {
    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");

    int millisToRun = 30000;
    int numScanners = 50;

    // Set createWALDir to true and use default values for other options.
    UTIL.startMiniCluster(StartMiniClusterOption.builder().createWALDir(true).build());
    try {
      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
      FindBulkHBaseListener listener = new FindBulkHBaseListener();
      log.registerWALActionsListener(listener);
      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
      assertThat(listener.isFound(), is(true));
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
      throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Loaders:");
    LOG.info("  loaded " + loader.numBulkLoads.get());
    LOG.info("  compactions " + loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }

  /**
   * Run the test on an HBase instance for 5 minutes. This assumes that the
   * table under test only has a single region.
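   * <p>
   * A sketch of one possible standalone invocation; the classpath details are
   * deployment-specific and assumed here:
   * <pre>
   *   java -cp "$(hbase classpath):hbase-server-tests.jar" \
   *       org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad
   * </pre>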
   */
  public static void main(String[] args) throws Exception {
    try {
      Configuration c = HBaseConfiguration.create();
      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
      test.setConf(c);
      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
    } finally {
      System.exit(0); // something hangs (believed to be the LRU thread pool)
    }
  }

  private void setConf(Configuration c) {
    UTIL = new HBaseTestingUtility(c);
  }

  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
    private boolean found = false;

    @Override
    public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
      // Look for the bulk-load marker among the cells of each WAL edit.
      for (Cell cell : logEdit.getCells()) {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        for (Map.Entry<String, Object> entry : kv.toStringMap().entrySet()) {
          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
            found = true;
          }
        }
      }
    }

    public boolean isFound() {
      return found;
    }
  }
}