001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY; 021import static org.hamcrest.core.Is.is; 022import static org.junit.Assert.assertThat; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.List; 029import java.util.Map; 030import java.util.Optional; 031import java.util.concurrent.atomic.AtomicLong; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseConfiguration; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HColumnDescriptor; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.HTableDescriptor; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.KeyValueUtil; 044import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; 
045import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; 046import org.apache.hadoop.hbase.TableExistsException; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.ClientServiceCallable; 049import org.apache.hadoop.hbase.client.ClusterConnection; 050import org.apache.hadoop.hbase.client.Result; 051import org.apache.hadoop.hbase.client.ResultScanner; 052import org.apache.hadoop.hbase.client.RpcRetryingCaller; 053import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 054import org.apache.hadoop.hbase.client.Scan; 055import org.apache.hadoop.hbase.client.SecureBulkLoadClient; 056import org.apache.hadoop.hbase.client.Table; 057import org.apache.hadoop.hbase.coprocessor.ObserverContext; 058import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 059import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 060import org.apache.hadoop.hbase.coprocessor.RegionObserver; 061import org.apache.hadoop.hbase.io.compress.Compression; 062import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 063import org.apache.hadoop.hbase.io.hfile.CacheConfig; 064import org.apache.hadoop.hbase.io.hfile.HFile; 065import org.apache.hadoop.hbase.io.hfile.HFileContext; 066import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 067import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 068import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 069import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 070import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener; 071import org.apache.hadoop.hbase.testclassification.LargeTests; 072import org.apache.hadoop.hbase.testclassification.RegionServerTests; 073import org.apache.hadoop.hbase.util.Bytes; 074import org.apache.hadoop.hbase.util.Pair; 075import org.apache.hadoop.hbase.wal.WAL; 076import org.apache.hadoop.hbase.wal.WALEdit; 077import org.apache.hadoop.hbase.wal.WALKey; 078import 
org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;

/**
 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
 * the region server's bulkLoad functionality.
 */
@RunWith(Parameterized.class)
@Category({RegionServerTests.class, LargeTests.class})
public class TestHRegionServerBulkLoad {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
  protected static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  protected final static Configuration conf = UTIL.getConfiguration();
  protected final static byte[] QUAL = Bytes.toBytes("qual");
  protected final static int NUM_CFS = 10;
  // Sleep (ms) injected into each compaction via MyObserver; varies per parameterized run.
  private int sleepDuration;
  public static int BLOCKSIZE = 64 * 1024;
  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;

  protected final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  /**
   * Parameterized sleep durations (ms): one run with no compaction delay, one
   * with a 30s delay so bulk loads race against slow compactions.
   */
  @Parameters
  public static final Collection<Object[]> parameters() {
    int[] sleepDurations = new int[] { 0, 30000 };
    List<Object[]> configurations = new ArrayList<>();
    for (int i : sleepDurations) {
      configurations.add(new Object[] { i });
    }
    return configurations;
  }

  public TestHRegionServerBulkLoad(int duration) {
    this.sleepDuration = duration;
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf.setInt("hbase.rpc.timeout", 10 * 1000);
  }

  /**
   * Create a rowkey compatible with
   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
   */
  public static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  /**
   * Create an HFile with the given number of rows with a specified value.
   *
   * @param fs        filesystem to write to
   * @param path      destination path of the HFile
   * @param family    column family for every cell
   * @param qualifier column qualifier for every cell
   * @param value     value written into every cell
   * @param numRows   number of rows (keys come from {@link #rowkey(int)})
   */
  public static void createHFile(FileSystem fs, Path path, byte[] family,
      byte[] qualifier, byte[] value, int numRows) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE)
        .withCompression(COMPRESSION)
        .build();
    HFile.Writer writer = HFile
        .getWriterFactory(conf, new CacheConfig(conf))
        .withPath(fs, path)
        .withFileContext(context)
        .create();
    long now = System.currentTimeMillis();
    try {
      // All cells share one timestamp so a scanner can verify a bulk load
      // landed atomically across families.
      for (int i = 0; i < numRows; i++) {
        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
        writer.append(kv);
      }
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
    } finally {
      writer.close();
    }
  }

  /**
   * Thread that repeatedly bulk loads one HFile per column family into the
   * table, and periodically (every 5th load) issues a region compaction to
   * keep the number of open store-file handles bounded.
   */
  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private TableName tableName;

    public AtomicHFileLoader(TableName tableName, TestContext ctx,
        byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.tableName = tableName;
    }

    @Override
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
          iteration));

      // create HFiles for different column families
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        famPaths.add(new Pair<>(fam, hfile.toString()));
      }

      // bulk load HFiles
      final ClusterConnection conn = (ClusterConnection) UTIL.getConnection();
      // The table handle is only needed to obtain the bulk-load token; close
      // it promptly rather than leaking it (it was never closed before).
      final String bulkToken;
      try (Table table = conn.getTable(tableName)) {
        bulkToken = new SecureBulkLoadClient(UTIL.getConfiguration(), table).prepareBulkLoad(conn);
      }
      ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
          tableName, Bytes.toBytes("aaa"),
          new RpcControllerFactory(UTIL.getConfiguration()).newController(),
          HConstants.PRIORITY_UNSET) {
        @Override
        public Void rpcCall() throws Exception {
          LOG.debug("Going to connect to server " + getLocation() + " for row "
              + Bytes.toStringBinary(getRow()));
          SecureBulkLoadClient secureClient = null;
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          try (Table table = conn.getTable(getTableName())) {
            secureClient = new SecureBulkLoadClient(UTIL.getConfiguration(), table);
            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
                true, null, bulkToken);
          }
          return null;
        }
      };
      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
      caller.callWithRetries(callable, Integer.MAX_VALUE);

      // Periodically do compaction to reduce the number of open file handles.
      if (numBulkLoads.get() % 5 == 0) {
        // 5 * 50 = 250 open file handles!
        callable = new ClientServiceCallable<Void>(conn,
            tableName, Bytes.toBytes("aaa"),
            new RpcControllerFactory(UTIL.getConfiguration()).newController(),
            HConstants.PRIORITY_UNSET) {
          @Override
          protected Void rpcCall() throws Exception {
            LOG.debug("compacting " + getLocation() + " for row "
                + Bytes.toStringBinary(getRow()));
            AdminProtos.AdminService.BlockingInterface server =
                conn.getAdmin(getLocation().getServerName());
            CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(
                getLocation().getRegionInfo().getRegionName(), true, null);
            server.compactRegion(null, request);
            numCompactions.incrementAndGet();
            return null;
          }
        };
        caller.callWithRetries(callable, Integer.MAX_VALUE);
      }
    }
  }

  /**
   * Coprocessor that delays every compaction by {@link #sleepDuration} ms,
   * widening the window in which scans can observe a bulk load in progress.
   */
  public static class MyObserver implements RegionCoprocessor, RegionObserver {
    static int sleepDuration;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
        CompactionRequest request)
        throws IOException {
      try {
        Thread.sleep(sleepDuration);
      } catch (InterruptedException ie) {
        // Restore the interrupt status so callers up the stack still see it.
        Thread.currentThread().interrupt();
        IOException ioe = new InterruptedIOException();
        ioe.initCause(ie);
        throw ioe;
      }
      return scanner;
    }
  }

  /**
   * Thread that does full scans of the table looking for any partially
   * completed rows: every family of a row must carry the same value, or a
   * bulk load was observed non-atomically.
   */
  public static class AtomicScanReader extends RepeatingTestThread {
    byte targetFamilies[][];
    Table table;
    AtomicLong numScans = new AtomicLong();
    AtomicLong numRowsScanned = new AtomicLong();
    TableName TABLE_NAME;

    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx,
        byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.TABLE_NAME = TABLE_NAME;
      this.targetFamilies = targetFamilies;
      table = UTIL.getConnection().getTable(TABLE_NAME);
    }

    @Override
    public void doAnAction() throws Exception {
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }
      ResultScanner scanner = table.getScanner(s);

      for (Result res : scanner) {
        byte[] lastRow = null, lastFam = null, lastQual = null;
        byte[] gotValue = null;
        for (byte[] family : targetFamilies) {
          byte qualifier[] = QUAL;
          byte thisValue[] = res.getValue(family, qualifier);
          if (gotValue != null && thisValue != null
              && !Bytes.equals(gotValue, thisValue)) {

            StringBuilder msg = new StringBuilder();
            msg.append("Failed on scan ").append(numScans)
                .append(" after scanning ").append(numRowsScanned)
                .append(" rows!\n");
            msg.append("Current was " + Bytes.toString(res.getRow()) + "/"
                + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
                + " = " + Bytes.toString(thisValue) + "\n");
            msg.append("Previous was " + Bytes.toString(lastRow) + "/"
                + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
                + " = " + Bytes.toString(gotValue));
            throw new RuntimeException(msg.toString());
          }

          lastFam = family;
          lastQual = qualifier;
          lastRow = res.getRow();
          gotValue = thisValue;
        }
        numRowsScanned.getAndIncrement();
      }
      numScans.getAndIncrement();
    }
  }

  /**
   * Creates a table with given table name and specified number of column
   * families if the table does not already exist.
   *
   * @param table table to create
   * @param cfs   number of column families to create (named via {@link #family(int)})
   */
  public void setupTable(TableName table, int cfs) throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(table);
      htd.addCoprocessor(MyObserver.class.getName());
      MyObserver.sleepDuration = this.sleepDuration;
      // Honor the cfs parameter; it was previously ignored in favor of a
      // hard-coded 10 (callers all passed 10, so behavior is unchanged).
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }

      UTIL.getAdmin().createTable(htd);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  /**
   * Atomic bulk load.
   */
  @Test
  public void testAtomicBulkLoad() throws Exception {
    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");

    int millisToRun = 30000;
    int numScanners = 50;

    // Set createWALDir to true and use default values for other options.
    UTIL.startMiniCluster(1, false, true);
    try {
      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
      FindBulkHBaseListener listener = new FindBulkHBaseListener();
      log.registerWALActionsListener(listener);
      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
      assertThat(listener.isFound(), is(true));
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Runs one loader thread against {@code numScanners} scanner threads for
   * {@code millisToRun} ms; any scanner that sees a partial row throws.
   */
  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
      throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Loaders:");
    LOG.info("  loaded " + loader.numBulkLoads.get());
    LOG.info("  compactions " + loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }

  /**
   * Run test on an HBase instance for 5 minutes. This assumes that the table
   * under test only has a single region.
   */
  public static void main(String args[]) throws Exception {
    try {
      Configuration c = HBaseConfiguration.create();
      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
      test.setConf(c);
      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
    } finally {
      System.exit(0); // something hangs (believe it is lru threadpool)
    }
  }

  private void setConf(Configuration c) {
    UTIL = new HBaseTestingUtility(c);
  }

  /**
   * WAL listener that records whether any WAL entry carried a bulk-load
   * marker, proving the bulk load was written to the WAL.
   */
  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
    private boolean found = false;

    @Override
    public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
      for (Cell cell : logEdit.getCells()) {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        for (Map.Entry entry : kv.toStringMap().entrySet()) {
          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
            found = true;
          }
        }
      }
    }

    public boolean isFound() {
      return found;
    }
  }
}