001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.hamcrest.core.Is.is; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.List; 029import java.util.Map; 030import java.util.Optional; 031import java.util.concurrent.atomic.AtomicLong; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseConfiguration; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HColumnDescriptor; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.HTableDescriptor; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.KeyValueUtil; 044import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; 045import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; 046import org.apache.hadoop.hbase.StartMiniClusterOption; 047import org.apache.hadoop.hbase.TableExistsException; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.ClientServiceCallable; 050import org.apache.hadoop.hbase.client.ClusterConnection; 051import org.apache.hadoop.hbase.client.RegionInfo; 052import org.apache.hadoop.hbase.client.Result; 053import org.apache.hadoop.hbase.client.ResultScanner; 054import org.apache.hadoop.hbase.client.RpcRetryingCaller; 055import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 056import org.apache.hadoop.hbase.client.Scan; 057import org.apache.hadoop.hbase.client.SecureBulkLoadClient; 058import org.apache.hadoop.hbase.client.Table; 059import org.apache.hadoop.hbase.coprocessor.ObserverContext; 060import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 062import org.apache.hadoop.hbase.coprocessor.RegionObserver; 063import org.apache.hadoop.hbase.io.compress.Compression; 064import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 065import org.apache.hadoop.hbase.io.hfile.CacheConfig; 066import org.apache.hadoop.hbase.io.hfile.HFile; 067import org.apache.hadoop.hbase.io.hfile.HFileContext; 068import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 069import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 070import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 071import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 072import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener; 073import org.apache.hadoop.hbase.testclassification.LargeTests; 074import org.apache.hadoop.hbase.testclassification.RegionServerTests; 075import org.apache.hadoop.hbase.util.Bytes; 076import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 077import org.apache.hadoop.hbase.util.Pair; 078import org.apache.hadoop.hbase.wal.WAL; 079import org.apache.hadoop.hbase.wal.WALEdit; 080import org.apache.hadoop.hbase.wal.WALKey; 081import org.junit.BeforeClass; 082import org.junit.ClassRule; 083import org.junit.Test; 084import org.junit.experimental.categories.Category; 085import org.junit.runner.RunWith; 086import org.junit.runners.Parameterized; 087import org.junit.runners.Parameterized.Parameters; 088import org.slf4j.Logger; 089import org.slf4j.LoggerFactory; 090 091import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 092 093import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 095import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 096 097/** 098 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of the region server's 099 * bullkLoad functionality. 100 */ 101@RunWith(Parameterized.class) 102@Category({ RegionServerTests.class, LargeTests.class }) 103public class TestHRegionServerBulkLoad { 104 105 @ClassRule 106 public static final HBaseClassTestRule CLASS_RULE = 107 HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class); 108 109 private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class); 110 protected static HBaseTestingUtility UTIL = new HBaseTestingUtility(); 111 protected final static Configuration conf = UTIL.getConfiguration(); 112 protected final static byte[] QUAL = Bytes.toBytes("qual"); 113 protected final static int NUM_CFS = 10; 114 private int sleepDuration; 115 public static int BLOCKSIZE = 64 * 1024; 116 public static Algorithm COMPRESSION = Compression.Algorithm.NONE; 117 118 protected final static byte[][] families = new byte[NUM_CFS][]; 119 static { 120 for (int i = 0; i < NUM_CFS; i++) { 121 families[i] = Bytes.toBytes(family(i)); 122 } 123 } 124 125 @Parameters 126 public static final Collection<Object[]> parameters() { 127 int[] sleepDurations = new int[] { 0, 30000 }; 128 List<Object[]> configurations = new ArrayList<>(); 129 for (int i : sleepDurations) { 130 configurations.add(new Object[] { i }); 131 } 132 return configurations; 133 } 134 135 public TestHRegionServerBulkLoad(int duration) { 136 this.sleepDuration = duration; 137 } 138 139 @BeforeClass 140 public static void setUpBeforeClass() throws Exception { 141 conf.setInt("hbase.rpc.timeout", 10 * 1000); 142 } 143 144 /** 145 * Create a rowkey compatible with 146 * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}. 147 */ 148 public static byte[] rowkey(int i) { 149 return Bytes.toBytes(String.format("row_%08d", i)); 150 } 151 152 static String family(int i) { 153 return String.format("family_%04d", i); 154 } 155 156 /** 157 * Create an HFile with the given number of rows with a specified value. 158 */ 159 public static void createHFile(FileSystem fs, Path path, byte[] family, byte[] qualifier, 160 byte[] value, int numRows) throws IOException { 161 HFileContext context = 162 new HFileContextBuilder().withBlockSize(BLOCKSIZE).withCompression(COMPRESSION).build(); 163 HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path) 164 .withFileContext(context).create(); 165 long now = EnvironmentEdgeManager.currentTime(); 166 try { 167 // subtract 2 since iterateOnSplits doesn't include boundary keys 168 for (int i = 0; i < numRows; i++) { 169 KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value); 170 writer.append(kv); 171 } 172 writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now)); 173 } finally { 174 writer.close(); 175 } 176 } 177 178 /** 179 * Thread that does full scans of the table looking for any partially completed rows. Each 180 * iteration of this loads 10 hdfs files, which occupies 5 file open file handles. So every 10 181 * iterations (500 file handles) it does a region compaction to reduce the number of open file 182 * handles. 183 */ 184 public static class AtomicHFileLoader extends RepeatingTestThread { 185 final AtomicLong numBulkLoads = new AtomicLong(); 186 final AtomicLong numCompactions = new AtomicLong(); 187 private TableName tableName; 188 189 public AtomicHFileLoader(TableName tableName, TestContext ctx, byte targetFamilies[][]) 190 throws IOException { 191 super(ctx); 192 this.tableName = tableName; 193 } 194 195 @Override 196 public void doAnAction() throws Exception { 197 long iteration = numBulkLoads.getAndIncrement(); 198 Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration)); 199 200 // create HFiles for different column families 201 FileSystem fs = UTIL.getTestFileSystem(); 202 byte[] val = Bytes.toBytes(String.format("%010d", iteration)); 203 final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS); 204 for (int i = 0; i < NUM_CFS; i++) { 205 Path hfile = new Path(dir, family(i)); 206 byte[] fam = Bytes.toBytes(family(i)); 207 createHFile(fs, hfile, fam, QUAL, val, 1000); 208 famPaths.add(new Pair<>(fam, hfile.toString())); 209 } 210 211 // bulk load HFiles 212 final ClusterConnection conn = (ClusterConnection) UTIL.getConnection(); 213 Table table = conn.getTable(tableName); 214 final String bulkToken = 215 new SecureBulkLoadClient(UTIL.getConfiguration(), table).prepareBulkLoad(conn); 216 ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, tableName, 217 Bytes.toBytes("aaa"), new RpcControllerFactory(UTIL.getConfiguration()).newController(), 218 HConstants.PRIORITY_UNSET) { 219 @Override 220 public Void rpcCall() throws Exception { 221 LOG.debug("Going to connect to server " + getLocation() + " for row " 222 + Bytes.toStringBinary(getRow())); 223 SecureBulkLoadClient secureClient = null; 224 byte[] regionName = getLocation().getRegionInfo().getRegionName(); 225 try (Table table = conn.getTable(getTableName())) { 226 secureClient = new SecureBulkLoadClient(UTIL.getConfiguration(), table); 227 secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, true, null, 228 bulkToken); 229 } 230 return null; 231 } 232 }; 233 RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); 234 RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); 235 caller.callWithRetries(callable, Integer.MAX_VALUE); 236 237 // Periodically do compaction to reduce the number of open file handles. 238 if (numBulkLoads.get() % 5 == 0) { 239 // 5 * 50 = 250 open file handles! 240 callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"), 241 new RpcControllerFactory(UTIL.getConfiguration()).newController(), 242 HConstants.PRIORITY_UNSET) { 243 @Override 244 protected Void rpcCall() throws Exception { 245 LOG.debug("compacting " + getLocation() + " for row " + Bytes.toStringBinary(getRow())); 246 AdminProtos.AdminService.BlockingInterface server = 247 conn.getAdmin(getLocation().getServerName()); 248 CompactRegionRequest request = RequestConverter 249 .buildCompactRegionRequest(getLocation().getRegionInfo().getRegionName(), true, null); 250 server.compactRegion(null, request); 251 numCompactions.incrementAndGet(); 252 return null; 253 } 254 }; 255 caller.callWithRetries(callable, Integer.MAX_VALUE); 256 } 257 } 258 } 259 260 public static class MyObserver implements RegionCoprocessor, RegionObserver { 261 static int sleepDuration; 262 263 @Override 264 public Optional<RegionObserver> getRegionObserver() { 265 return Optional.of(this); 266 } 267 268 @Override 269 public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, 270 InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 271 CompactionRequest request) throws IOException { 272 try { 273 Thread.sleep(sleepDuration); 274 } catch (InterruptedException ie) { 275 IOException ioe = new InterruptedIOException(); 276 ioe.initCause(ie); 277 throw ioe; 278 } 279 return scanner; 280 } 281 } 282 283 /** 284 * Thread that does full scans of the table looking for any partially completed rows. 285 */ 286 public static class AtomicScanReader extends RepeatingTestThread { 287 byte targetFamilies[][]; 288 Table table; 289 AtomicLong numScans = new AtomicLong(); 290 AtomicLong numRowsScanned = new AtomicLong(); 291 TableName TABLE_NAME; 292 293 public AtomicScanReader(TableName TABLE_NAME, TestContext ctx, byte targetFamilies[][]) 294 throws IOException { 295 super(ctx); 296 this.TABLE_NAME = TABLE_NAME; 297 this.targetFamilies = targetFamilies; 298 table = UTIL.getConnection().getTable(TABLE_NAME); 299 } 300 301 @Override 302 public void doAnAction() throws Exception { 303 Scan s = new Scan(); 304 for (byte[] family : targetFamilies) { 305 s.addFamily(family); 306 } 307 ResultScanner scanner = table.getScanner(s); 308 309 for (Result res : scanner) { 310 byte[] lastRow = null, lastFam = null, lastQual = null; 311 byte[] gotValue = null; 312 for (byte[] family : targetFamilies) { 313 byte qualifier[] = QUAL; 314 byte thisValue[] = res.getValue(family, qualifier); 315 if (gotValue != null && thisValue != null && !Bytes.equals(gotValue, thisValue)) { 316 317 StringBuilder msg = new StringBuilder(); 318 msg.append("Failed on scan ").append(numScans).append(" after scanning ") 319 .append(numRowsScanned).append(" rows!\n"); 320 msg.append("Current was " + Bytes.toString(res.getRow()) + "/" + Bytes.toString(family) 321 + ":" + Bytes.toString(qualifier) + " = " + Bytes.toString(thisValue) + "\n"); 322 msg.append("Previous was " + Bytes.toString(lastRow) + "/" + Bytes.toString(lastFam) 323 + ":" + Bytes.toString(lastQual) + " = " + Bytes.toString(gotValue)); 324 throw new RuntimeException(msg.toString()); 325 } 326 327 lastFam = family; 328 lastQual = qualifier; 329 lastRow = res.getRow(); 330 gotValue = thisValue; 331 } 332 numRowsScanned.getAndIncrement(); 333 } 334 numScans.getAndIncrement(); 335 } 336 } 337 338 /** 339 * Creates a table with given table name and specified number of column families if the table does 340 * not already exist. 341 */ 342 public void setupTable(TableName table, int cfs) throws IOException { 343 try { 344 LOG.info("Creating table " + table); 345 HTableDescriptor htd = new HTableDescriptor(table); 346 htd.addCoprocessor(MyObserver.class.getName()); 347 MyObserver.sleepDuration = this.sleepDuration; 348 for (int i = 0; i < 10; i++) { 349 htd.addFamily(new HColumnDescriptor(family(i))); 350 } 351 352 UTIL.getAdmin().createTable(htd); 353 } catch (TableExistsException tee) { 354 LOG.info("Table " + table + " already exists"); 355 } 356 } 357 358 /** 359 * Atomic bulk load. 360 */ 361 @Test 362 public void testAtomicBulkLoad() throws Exception { 363 TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad"); 364 365 int millisToRun = 30000; 366 int numScanners = 50; 367 368 // Set createWALDir to true and use default values for other options. 369 UTIL.startMiniCluster(StartMiniClusterOption.builder().createWALDir(true).build()); 370 try { 371 WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null); 372 FindBulkHBaseListener listener = new FindBulkHBaseListener(); 373 log.registerWALActionsListener(listener); 374 runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners); 375 assertThat(listener.isFound(), is(true)); 376 } finally { 377 UTIL.shutdownMiniCluster(); 378 } 379 } 380 381 void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners) 382 throws Exception { 383 setupTable(tableName, 10); 384 385 TestContext ctx = new TestContext(UTIL.getConfiguration()); 386 387 AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null); 388 ctx.addThread(loader); 389 390 List<AtomicScanReader> scanners = Lists.newArrayList(); 391 for (int i = 0; i < numScanners; i++) { 392 AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families); 393 scanners.add(scanner); 394 ctx.addThread(scanner); 395 } 396 397 ctx.startThreads(); 398 ctx.waitFor(millisToRun); 399 ctx.stop(); 400 401 LOG.info("Loaders:"); 402 LOG.info(" loaded " + loader.numBulkLoads.get()); 403 LOG.info(" compations " + loader.numCompactions.get()); 404 405 LOG.info("Scanners:"); 406 for (AtomicScanReader scanner : scanners) { 407 LOG.info(" scanned " + scanner.numScans.get()); 408 LOG.info(" verified " + scanner.numRowsScanned.get() + " rows"); 409 } 410 } 411 412 /** 413 * Run test on an HBase instance for 5 minutes. This assumes that the table under test only has a 414 * single region. 415 */ 416 public static void main(String args[]) throws Exception { 417 try { 418 Configuration c = HBaseConfiguration.create(); 419 TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0); 420 test.setConf(c); 421 test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50); 422 } finally { 423 System.exit(0); // something hangs (believe it is lru threadpool) 424 } 425 } 426 427 private void setConf(Configuration c) { 428 UTIL = new HBaseTestingUtility(c); 429 } 430 431 static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener { 432 private boolean found = false; 433 434 @Override 435 public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { 436 for (Cell cell : logEdit.getCells()) { 437 KeyValue kv = KeyValueUtil.ensureKeyValue(cell); 438 for (Map.Entry entry : kv.toStringMap().entrySet()) { 439 if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) { 440 found = true; 441 } 442 } 443 } 444 } 445 446 public boolean isFound() { 447 return found; 448 } 449 } 450}