001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.List; 031import java.util.Optional; 032import java.util.Random; 033import java.util.concurrent.CountDownLatch; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicInteger; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellUtil; 042import org.apache.hadoop.hbase.Coprocessor; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseTestingUtility; 045import org.apache.hadoop.hbase.HColumnDescriptor; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.HRegionLocation; 048import org.apache.hadoop.hbase.HTableDescriptor; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 051import org.apache.hadoop.hbase.coprocessor.ObserverContext; 052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 054import org.apache.hadoop.hbase.coprocessor.RegionObserver; 055import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 056import org.apache.hadoop.hbase.ipc.ServerRpcController; 057import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos; 058import org.apache.hadoop.hbase.regionserver.HRegion; 059import org.apache.hadoop.hbase.regionserver.HRegionServer; 060import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 061import org.apache.hadoop.hbase.regionserver.RegionScanner; 062import org.apache.hadoop.hbase.testclassification.ClientTests; 063import org.apache.hadoop.hbase.testclassification.LargeTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.Pair; 066import org.junit.After; 067import org.junit.AfterClass; 068import org.junit.Assert; 069import org.junit.Before; 070import org.junit.BeforeClass; 071import org.junit.ClassRule; 072import org.junit.Rule; 073import org.junit.Test; 074import org.junit.experimental.categories.Category; 075import org.junit.rules.TestName; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 081 082@Category({LargeTests.class, ClientTests.class}) 083public class TestFromClientSide3 { 084 085 @ClassRule 086 public static final HBaseClassTestRule CLASS_RULE = 087 HBaseClassTestRule.forClass(TestFromClientSide3.class); 088 089 private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class); 090 private final static HBaseTestingUtility TEST_UTIL 091 = new HBaseTestingUtility(); 092 private static final int WAITTABLE_MILLIS = 10000; 093 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 094 private static Random random = new Random(); 095 private static int SLAVES = 3; 096 private static final byte[] ROW = Bytes.toBytes("testRow"); 097 private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow"); 098 private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 099 private static final byte[] VALUE = Bytes.toBytes("testValue"); 100 private static final byte[] COL_QUAL = Bytes.toBytes("f1"); 101 private static final byte[] VAL_BYTES = Bytes.toBytes("v1"); 102 private static final byte[] ROW_BYTES = Bytes.toBytes("r1"); 103 104 @Rule 105 public TestName name = new TestName(); 106 private TableName tableName; 107 108 /** 109 * @throws java.lang.Exception 110 */ 111 @BeforeClass 112 public static void setUpBeforeClass() throws Exception { 113 TEST_UTIL.startMiniCluster(SLAVES); 114 } 115 116 /** 117 * @throws java.lang.Exception 118 */ 119 @AfterClass 120 public static void tearDownAfterClass() throws Exception { 121 TEST_UTIL.shutdownMiniCluster(); 122 } 123 124 /** 125 * @throws java.lang.Exception 126 */ 127 @Before 128 public void setUp() throws Exception { 129 tableName = TableName.valueOf(name.getMethodName()); 130 } 131 132 /** 133 * @throws java.lang.Exception 134 */ 135 @After 136 public void tearDown() throws Exception { 137 for (HTableDescriptor htd: TEST_UTIL.getAdmin().listTables()) { 138 LOG.info("Tear down, remove table=" + htd.getTableName()); 139 TEST_UTIL.deleteTable(htd.getTableName()); 140 } 141 } 142 143 private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts) 144 throws Exception { 145 Put put = new Put(row); 146 for (int i = 0; i < nPuts; i++) { 147 byte[] qualifier = Bytes.toBytes(random.nextInt()); 148 byte[] value = Bytes.toBytes(random.nextInt()); 149 put.addColumn(family, qualifier, value); 150 } 151 table.put(put); 152 } 153 154 private void performMultiplePutAndFlush(HBaseAdmin admin, Table table, 155 byte[] row, byte[] family, int nFlushes, int nPuts) 156 throws Exception { 157 158 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(table.getName())) { 159 // connection needed for poll-wait 160 HRegionLocation loc = locator.getRegionLocation(row, true); 161 AdminProtos.AdminService.BlockingInterface server = 162 ((ClusterConnection) admin.getConnection()).getAdmin(loc.getServerName()); 163 byte[] regName = loc.getRegionInfo().getRegionName(); 164 165 for (int i = 0; i < nFlushes; i++) { 166 randomCFPuts(table, row, family, nPuts); 167 List<String> sf = ProtobufUtil.getStoreFiles(server, regName, FAMILY); 168 int sfCount = sf.size(); 169 170 admin.flush(table.getName()); 171 } 172 } 173 } 174 175 private static List<Cell> toList(ResultScanner scanner) { 176 try { 177 List<Cell> cells = new ArrayList<>(); 178 for (Result r : scanner) { 179 cells.addAll(r.listCells()); 180 } 181 return cells; 182 } finally { 183 scanner.close(); 184 } 185 } 186 187 @Test 188 public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException { 189 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 190 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 191 byte[] row = Bytes.toBytes("SpecifiedRow"); 192 byte[] value0 = Bytes.toBytes("value_0"); 193 byte[] value1 = Bytes.toBytes("value_1"); 194 Put put = new Put(row); 195 put.addColumn(FAMILY, QUALIFIER, VALUE); 196 table.put(put); 197 Delete d = new Delete(row); 198 table.delete(d); 199 put = new Put(row); 200 put.addColumn(FAMILY, null, value0); 201 table.put(put); 202 put = new Put(row); 203 put.addColumn(FAMILY, null, value1); 204 table.put(put); 205 List<Cell> cells = toList(table.getScanner(new Scan())); 206 assertEquals(1, cells.size()); 207 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 208 209 cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); 210 assertEquals(1, cells.size()); 211 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 212 213 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 214 assertEquals(0, cells.size()); 215 216 TEST_UTIL.getAdmin().flush(tableName); 217 cells = toList(table.getScanner(new Scan())); 218 assertEquals(1, cells.size()); 219 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 220 221 cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); 222 assertEquals(1, cells.size()); 223 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 224 225 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 226 assertEquals(0, cells.size()); 227 } 228 } 229 230 @Test 231 public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException { 232 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 233 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 234 byte[] row = Bytes.toBytes("SpecifiedRow"); 235 byte[] qual0 = Bytes.toBytes("qual0"); 236 byte[] qual1 = Bytes.toBytes("qual1"); 237 long now = System.currentTimeMillis(); 238 Delete d = new Delete(row, now); 239 table.delete(d); 240 241 Put put = new Put(row); 242 put.addColumn(FAMILY, null, now + 1, VALUE); 243 table.put(put); 244 245 put = new Put(row); 246 put.addColumn(FAMILY, qual1, now + 2, qual1); 247 table.put(put); 248 249 put = new Put(row); 250 put.addColumn(FAMILY, qual0, now + 3, qual0); 251 table.put(put); 252 253 Result r = table.get(new Get(row)); 254 assertEquals(r.toString(), 3, r.size()); 255 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 256 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 257 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 258 259 TEST_UTIL.getAdmin().flush(tableName); 260 r = table.get(new Get(row)); 261 assertEquals(3, r.size()); 262 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 263 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 264 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 265 } 266 } 267 268 // override the config settings at the CF level and ensure priority 269 @Test 270 public void testAdvancedConfigOverride() throws Exception { 271 /* 272 * Overall idea: (1) create 3 store files and issue a compaction. config's 273 * compaction.min == 3, so should work. (2) Increase the compaction.min 274 * toggle in the HTD to 5 and modify table. If we use the HTD value instead 275 * of the default config value, adding 3 files and issuing a compaction 276 * SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2 277 * and modify table. The CF schema should override the Table schema and now 278 * cause a minor compaction. 279 */ 280 TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3); 281 282 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 283 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 284 try (Admin admin = TEST_UTIL.getAdmin()) { 285 ClusterConnection connection = (ClusterConnection) TEST_UTIL.getConnection(); 286 287 // Create 3 store files. 288 byte[] row = Bytes.toBytes(random.nextInt()); 289 performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 100); 290 291 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 292 // Verify we have multiple store files. 293 HRegionLocation loc = locator.getRegionLocation(row, true); 294 byte[] regionName = loc.getRegionInfo().getRegionName(); 295 AdminProtos.AdminService.BlockingInterface server = 296 connection.getAdmin(loc.getServerName()); 297 assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() > 1); 298 299 // Issue a compaction request 300 admin.compact(tableName); 301 302 // poll wait for the compactions to happen 303 for (int i = 0; i < 10 * 1000 / 40; ++i) { 304 // The number of store files after compaction should be lesser. 305 loc = locator.getRegionLocation(row, true); 306 if (!loc.getRegionInfo().isOffline()) { 307 regionName = loc.getRegionInfo().getRegionName(); 308 server = connection.getAdmin(loc.getServerName()); 309 if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1) { 310 break; 311 } 312 } 313 Thread.sleep(40); 314 } 315 // verify the compactions took place and that we didn't just time out 316 assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1); 317 318 // change the compaction.min config option for this table to 5 319 LOG.info("hbase.hstore.compaction.min should now be 5"); 320 HTableDescriptor htd = new HTableDescriptor(table.getTableDescriptor()); 321 htd.setValue("hbase.hstore.compaction.min", String.valueOf(5)); 322 admin.modifyTable(tableName, htd); 323 Pair<Integer, Integer> st = admin.getAlterStatus(tableName); 324 while (null != st && st.getFirst() > 0) { 325 LOG.debug(st.getFirst() + " regions left to update"); 326 Thread.sleep(40); 327 st = admin.getAlterStatus(tableName); 328 } 329 LOG.info("alter status finished"); 330 331 // Create 3 more store files. 332 performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 10); 333 334 // Issue a compaction request 335 admin.compact(tableName); 336 337 // This time, the compaction request should not happen 338 Thread.sleep(10 * 1000); 339 loc = locator.getRegionLocation(row, true); 340 regionName = loc.getRegionInfo().getRegionName(); 341 server = connection.getAdmin(loc.getServerName()); 342 int sfCount = ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size(); 343 assertTrue(sfCount > 1); 344 345 // change an individual CF's config option to 2 & online schema update 346 LOG.info("hbase.hstore.compaction.min should now be 2"); 347 HColumnDescriptor hcd = new HColumnDescriptor(htd.getFamily(FAMILY)); 348 hcd.setValue("hbase.hstore.compaction.min", String.valueOf(2)); 349 htd.modifyFamily(hcd); 350 admin.modifyTable(tableName, htd); 351 st = admin.getAlterStatus(tableName); 352 while (null != st && st.getFirst() > 0) { 353 LOG.debug(st.getFirst() + " regions left to update"); 354 Thread.sleep(40); 355 st = admin.getAlterStatus(tableName); 356 } 357 LOG.info("alter status finished"); 358 359 // Issue a compaction request 360 admin.compact(tableName); 361 362 // poll wait for the compactions to happen 363 for (int i = 0; i < 10 * 1000 / 40; ++i) { 364 loc = locator.getRegionLocation(row, true); 365 regionName = loc.getRegionInfo().getRegionName(); 366 try { 367 server = connection.getAdmin(loc.getServerName()); 368 if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() < sfCount) { 369 break; 370 } 371 } catch (Exception e) { 372 LOG.debug("Waiting for region to come online: " + Bytes.toString(regionName)); 373 } 374 Thread.sleep(40); 375 } 376 377 // verify the compaction took place and that we didn't just time out 378 assertTrue(ProtobufUtil.getStoreFiles( 379 server, regionName, FAMILY).size() < sfCount); 380 381 // Finally, ensure that we can remove a custom config value after we made it 382 LOG.info("Removing CF config value"); 383 LOG.info("hbase.hstore.compaction.min should now be 5"); 384 hcd = new HColumnDescriptor(htd.getFamily(FAMILY)); 385 hcd.setValue("hbase.hstore.compaction.min", null); 386 htd.modifyFamily(hcd); 387 admin.modifyTable(tableName, htd); 388 st = admin.getAlterStatus(tableName); 389 while (null != st && st.getFirst() > 0) { 390 LOG.debug(st.getFirst() + " regions left to update"); 391 Thread.sleep(40); 392 st = admin.getAlterStatus(tableName); 393 } 394 LOG.info("alter status finished"); 395 assertNull(table.getTableDescriptor().getFamily(FAMILY).getValue( 396 "hbase.hstore.compaction.min")); 397 } 398 } 399 } 400 } 401 402 @Test 403 public void testHTableBatchWithEmptyPut() throws IOException, InterruptedException { 404 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 405 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 406 List actions = (List) new ArrayList(); 407 Object[] results = new Object[2]; 408 // create an empty Put 409 Put put1 = new Put(ROW); 410 actions.add(put1); 411 412 Put put2 = new Put(ANOTHERROW); 413 put2.addColumn(FAMILY, QUALIFIER, VALUE); 414 actions.add(put2); 415 416 table.batch(actions, results); 417 fail("Empty Put should have failed the batch call"); 418 } catch (IllegalArgumentException iae) { 419 } 420 } 421 422 // Test Table.batch with large amount of mutations against the same key. 423 // It used to trigger read lock's "Maximum lock count exceeded" Error. 424 @Test 425 public void testHTableWithLargeBatch() throws IOException, InterruptedException { 426 int sixtyFourK = 64 * 1024; 427 List actions = new ArrayList(); 428 Object[] results = new Object[(sixtyFourK + 1) * 2]; 429 430 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 431 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 432 433 for (int i = 0; i < sixtyFourK + 1; i++) { 434 Put put1 = new Put(ROW); 435 put1.addColumn(FAMILY, QUALIFIER, VALUE); 436 actions.add(put1); 437 438 Put put2 = new Put(ANOTHERROW); 439 put2.addColumn(FAMILY, QUALIFIER, VALUE); 440 actions.add(put2); 441 } 442 443 table.batch(actions, results); 444 } 445 } 446 447 @Test 448 public void testBatchWithRowMutation() throws Exception { 449 LOG.info("Starting testBatchWithRowMutation"); 450 byte [][] QUALIFIERS = new byte [][] { 451 Bytes.toBytes("a"), Bytes.toBytes("b") 452 }; 453 454 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 455 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 456 457 RowMutations arm = RowMutations.of(Collections.singletonList( 458 new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE))); 459 Object[] batchResult = new Object[1]; 460 table.batch(Arrays.asList(arm), batchResult); 461 462 Get g = new Get(ROW); 463 Result r = table.get(g); 464 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0]))); 465 466 arm = RowMutations.of(Arrays.asList( 467 new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE), 468 new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0]))); 469 table.batch(Arrays.asList(arm), batchResult); 470 r = table.get(g); 471 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1]))); 472 assertNull(r.getValue(FAMILY, QUALIFIERS[0])); 473 474 // Test that we get the correct remote exception for RowMutations from batch() 475 try { 476 arm = RowMutations.of(Collections.singletonList( 477 new Put(ROW).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE))); 478 table.batch(Arrays.asList(arm), batchResult); 479 fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException"); 480 } catch (RetriesExhaustedWithDetailsException e) { 481 String msg = e.getMessage(); 482 assertTrue(msg.contains("NoSuchColumnFamilyException")); 483 } 484 } 485 } 486 487 @Test 488 public void testHTableExistsMethodSingleRegionSingleGet() 489 throws IOException, InterruptedException { 490 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 491 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 492 493 // Test with a single region table. 494 Put put = new Put(ROW); 495 put.addColumn(FAMILY, QUALIFIER, VALUE); 496 497 Get get = new Get(ROW); 498 499 boolean exist = table.exists(get); 500 assertFalse(exist); 501 502 table.put(put); 503 504 exist = table.exists(get); 505 assertTrue(exist); 506 } 507 } 508 509 @Test 510 public void testHTableExistsMethodSingleRegionMultipleGets() 511 throws IOException, InterruptedException { 512 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 513 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 514 515 Put put = new Put(ROW); 516 put.addColumn(FAMILY, QUALIFIER, VALUE); 517 table.put(put); 518 519 List<Get> gets = new ArrayList<>(); 520 gets.add(new Get(ROW)); 521 gets.add(new Get(ANOTHERROW)); 522 523 boolean[] results = table.exists(gets); 524 assertTrue(results[0]); 525 assertFalse(results[1]); 526 } 527 } 528 529 @Test 530 public void testHTableExistsBeforeGet() throws IOException, InterruptedException { 531 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 532 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 533 534 Put put = new Put(ROW); 535 put.addColumn(FAMILY, QUALIFIER, VALUE); 536 table.put(put); 537 538 Get get = new Get(ROW); 539 540 boolean exist = table.exists(get); 541 assertEquals(true, exist); 542 543 Result result = table.get(get); 544 assertEquals(false, result.isEmpty()); 545 assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER))); 546 } 547 } 548 549 @Test 550 public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException { 551 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 552 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 553 554 final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2")); 555 Put put = new Put(ROW); 556 put.addColumn(FAMILY, QUALIFIER, VALUE); 557 table.put(put); 558 put = new Put(ROW2); 559 put.addColumn(FAMILY, QUALIFIER, VALUE); 560 table.put(put); 561 562 Get get = new Get(ROW); 563 Get get2 = new Get(ROW2); 564 ArrayList<Get> getList = new ArrayList(2); 565 getList.add(get); 566 getList.add(get2); 567 568 boolean[] exists = table.existsAll(getList); 569 assertEquals(true, exists[0]); 570 assertEquals(true, exists[1]); 571 572 Result[] result = table.get(getList); 573 assertEquals(false, result[0].isEmpty()); 574 assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER))); 575 assertEquals(false, result[1].isEmpty()); 576 assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER))); 577 } 578 } 579 580 @Test 581 public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception { 582 try (Table table = TEST_UTIL.createTable( 583 tableName, new byte[][] { FAMILY }, 584 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) { 585 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 586 587 Put put = new Put(ROW); 588 put.addColumn(FAMILY, QUALIFIER, VALUE); 589 590 Get get = new Get(ROW); 591 592 boolean exist = table.exists(get); 593 assertFalse(exist); 594 595 table.put(put); 596 597 exist = table.exists(get); 598 assertTrue(exist); 599 } 600 } 601 602 @Test 603 public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception { 604 try (Table table = TEST_UTIL.createTable( 605 tableName, 606 new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) { 607 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 608 609 Put put = new Put(ROW); 610 put.addColumn(FAMILY, QUALIFIER, VALUE); 611 table.put(put); 612 613 List<Get> gets = new ArrayList<>(); 614 gets.add(new Get(ANOTHERROW)); 615 gets.add(new Get(Bytes.add(ROW, new byte[]{0x00}))); 616 gets.add(new Get(ROW)); 617 gets.add(new Get(Bytes.add(ANOTHERROW, new byte[]{0x00}))); 618 619 LOG.info("Calling exists"); 620 boolean[] results = table.existsAll(gets); 621 assertFalse(results[0]); 622 assertFalse(results[1]); 623 assertTrue(results[2]); 624 assertFalse(results[3]); 625 626 // Test with the first region. 627 put = new Put(new byte[]{0x00}); 628 put.addColumn(FAMILY, QUALIFIER, VALUE); 629 table.put(put); 630 631 gets = new ArrayList<>(); 632 gets.add(new Get(new byte[]{0x00})); 633 gets.add(new Get(new byte[]{0x00, 0x00})); 634 results = table.existsAll(gets); 635 assertTrue(results[0]); 636 assertFalse(results[1]); 637 638 // Test with the last region 639 put = new Put(new byte[]{(byte) 0xff, (byte) 0xff}); 640 put.addColumn(FAMILY, QUALIFIER, VALUE); 641 table.put(put); 642 643 gets = new ArrayList<>(); 644 gets.add(new Get(new byte[]{(byte) 0xff})); 645 gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff})); 646 gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff, (byte) 0xff})); 647 results = table.existsAll(gets); 648 assertFalse(results[0]); 649 assertTrue(results[1]); 650 assertFalse(results[2]); 651 } 652 } 653 654 @Test 655 public void testGetEmptyRow() throws Exception { 656 //Create a table and put in 1 row 657 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 658 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 659 660 Put put = new Put(ROW_BYTES); 661 put.addColumn(FAMILY, COL_QUAL, VAL_BYTES); 662 table.put(put); 663 664 //Try getting the row with an empty row key 665 Result res = null; 666 try { 667 res = table.get(new Get(new byte[0])); 668 fail(); 669 } catch (IllegalArgumentException e) { 670 // Expected. 671 } 672 assertTrue(res == null); 673 res = table.get(new Get(Bytes.toBytes("r1-not-exist"))); 674 assertTrue(res.isEmpty() == true); 675 res = table.get(new Get(ROW_BYTES)); 676 assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES)); 677 } 678 } 679 680 @Test 681 public void testConnectionDefaultUsesCodec() throws Exception { 682 ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection(); 683 assertTrue(con.hasCellBlockSupport()); 684 } 685 686 @Test 687 public void testPutWithPreBatchMutate() throws Exception { 688 testPreBatchMutate(tableName, () -> { 689 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 690 Put put = new Put(ROW); 691 put.addColumn(FAMILY, QUALIFIER, VALUE); 692 t.put(put); 693 } catch (IOException ex) { 694 throw new RuntimeException(ex); 695 } 696 }); 697 } 698 699 @Test 700 public void testRowMutationsWithPreBatchMutate() throws Exception { 701 testPreBatchMutate(tableName, () -> { 702 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 703 RowMutations rm = new RowMutations(ROW, 1); 704 Put put = new Put(ROW); 705 put.addColumn(FAMILY, QUALIFIER, VALUE); 706 rm.add(put); 707 t.mutateRow(rm); 708 } catch (IOException ex) { 709 throw new RuntimeException(ex); 710 } 711 }); 712 } 713 714 private void testPreBatchMutate(TableName tableName, Runnable rn)throws Exception { 715 HTableDescriptor desc = new HTableDescriptor(tableName); 716 desc.addCoprocessor(WaitingForScanObserver.class.getName()); 717 desc.addFamily(new HColumnDescriptor(FAMILY)); 718 TEST_UTIL.getAdmin().createTable(desc); 719 // Don't use waitTableAvailable(), because the scanner will mess up the co-processor 720 721 ExecutorService service = Executors.newFixedThreadPool(2); 722 service.execute(rn); 723 final List<Cell> cells = new ArrayList<>(); 724 service.execute(() -> { 725 try { 726 // waiting for update. 727 TimeUnit.SECONDS.sleep(3); 728 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 729 Scan scan = new Scan(); 730 try (ResultScanner scanner = t.getScanner(scan)) { 731 for (Result r : scanner) { 732 cells.addAll(Arrays.asList(r.rawCells())); 733 } 734 } 735 } 736 } catch (IOException | InterruptedException ex) { 737 throw new RuntimeException(ex); 738 } 739 }); 740 service.shutdown(); 741 service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 742 assertEquals("The write is blocking by RegionObserver#postBatchMutate" 743 + ", so the data is invisible to reader", 0, cells.size()); 744 TEST_UTIL.deleteTable(tableName); 745 } 746 747 @Test 748 public void testLockLeakWithDelta() throws Exception, Throwable { 749 HTableDescriptor desc = new HTableDescriptor(tableName); 750 desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName()); 751 desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); 752 desc.addFamily(new HColumnDescriptor(FAMILY)); 753 TEST_UTIL.getAdmin().createTable(desc); 754 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 755 756 // new a connection for lower retry number. 757 Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); 758 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 759 try (Connection con = ConnectionFactory.createConnection(copy)) { 760 HRegion region = (HRegion) find(tableName); 761 region.setTimeoutForWriteLock(10); 762 ExecutorService putService = Executors.newSingleThreadExecutor(); 763 putService.execute(() -> { 764 try (Table table = con.getTable(tableName)) { 765 Put put = new Put(ROW); 766 put.addColumn(FAMILY, QUALIFIER, VALUE); 767 // the put will be blocked by WaitingForMultiMutationsObserver. 768 table.put(put); 769 } catch (IOException ex) { 770 throw new RuntimeException(ex); 771 } 772 }); 773 ExecutorService appendService = Executors.newSingleThreadExecutor(); 774 appendService.execute(() -> { 775 Append append = new Append(ROW); 776 append.addColumn(FAMILY, QUALIFIER, VALUE); 777 try (Table table = con.getTable(tableName)) { 778 table.append(append); 779 fail("The APPEND should fail because the target lock is blocked by previous put"); 780 } catch (Exception ex) { 781 } 782 }); 783 appendService.shutdown(); 784 appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 785 WaitingForMultiMutationsObserver observer = 786 find(tableName, WaitingForMultiMutationsObserver.class); 787 observer.latch.countDown(); 788 putService.shutdown(); 789 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 790 try (Table table = con.getTable(tableName)) { 791 Result r = table.get(new Get(ROW)); 792 assertFalse(r.isEmpty()); 793 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE)); 794 } 795 } 796 HRegion region = (HRegion) find(tableName); 797 int readLockCount = region.getReadLockCount(); 798 LOG.info("readLockCount:" + readLockCount); 799 assertEquals(0, readLockCount); 800 } 801 802 @Test 803 public void testMultiRowMutations() throws Exception, Throwable { 804 HTableDescriptor desc = new HTableDescriptor(tableName); 805 desc.addCoprocessor(MultiRowMutationEndpoint.class.getName()); 806 desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName()); 807 desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); 808 desc.addFamily(new HColumnDescriptor(FAMILY)); 809 TEST_UTIL.getAdmin().createTable(desc); 810 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 811 812 // new a connection for lower retry number. 813 Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); 814 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 815 try (Connection con = ConnectionFactory.createConnection(copy)) { 816 byte[] row = Bytes.toBytes("ROW-0"); 817 byte[] rowLocked= Bytes.toBytes("ROW-1"); 818 byte[] value0 = Bytes.toBytes("VALUE-0"); 819 byte[] value1 = Bytes.toBytes("VALUE-1"); 820 byte[] value2 = Bytes.toBytes("VALUE-2"); 821 assertNoLocks(tableName); 822 ExecutorService putService = Executors.newSingleThreadExecutor(); 823 putService.execute(() -> { 824 try (Table table = con.getTable(tableName)) { 825 Put put0 = new Put(rowLocked); 826 put0.addColumn(FAMILY, QUALIFIER, value0); 827 // the put will be blocked by WaitingForMultiMutationsObserver. 828 table.put(put0); 829 } catch (IOException ex) { 830 throw new RuntimeException(ex); 831 } 832 }); 833 ExecutorService cpService = Executors.newSingleThreadExecutor(); 834 cpService.execute(() -> { 835 boolean threw; 836 Put put1 = new Put(row); 837 Put put2 = new Put(rowLocked); 838 put1.addColumn(FAMILY, QUALIFIER, value1); 839 put2.addColumn(FAMILY, QUALIFIER, value2); 840 try (Table table = con.getTable(tableName)) { 841 MultiRowMutationProtos.MutateRowsRequest request 842 = MultiRowMutationProtos.MutateRowsRequest.newBuilder() 843 .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( 844 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto 845 .MutationType.PUT, put1)) 846 .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( 847 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto 848 .MutationType.PUT, put2)) 849 .build(); 850 table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, 851 ROW, ROW, 852 (MultiRowMutationProtos.MultiRowMutationService exe) -> { 853 ServerRpcController controller = new ServerRpcController(); 854 CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse> 855 rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>(); 856 exe.mutateRows(controller, request, rpcCallback); 857 return rpcCallback.get(); 858 }); 859 threw = false; 860 } catch (Throwable ex) { 861 threw = true; 862 } 863 if (!threw) { 864 // Can't call fail() earlier because the catch would eat it. 865 fail("This cp should fail because the target lock is blocked by previous put"); 866 } 867 }); 868 cpService.shutdown(); 869 cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 870 WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class); 871 observer.latch.countDown(); 872 putService.shutdown(); 873 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 874 try (Table table = con.getTable(tableName)) { 875 Get g0 = new Get(row); 876 Get g1 = new Get(rowLocked); 877 Result r0 = table.get(g0); 878 Result r1 = table.get(g1); 879 assertTrue(r0.isEmpty()); 880 assertFalse(r1.isEmpty()); 881 assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0)); 882 } 883 assertNoLocks(tableName); 884 } 885 } 886 887 /** 888 * A test case for issue HBASE-17482 889 * After combile seqid with mvcc readpoint, seqid/mvcc is acquired and stamped 890 * onto cells in the append thread, a countdown latch is used to ensure that happened 891 * before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698) 892 * make the seqid/mvcc acquirement in handler thread and stamping in append thread 893 * No countdown latch to assure cells in memstore are stamped with seqid/mvcc. 894 * If cells without mvcc(A.K.A mvcc=0) are put into memstore, then a scanner 895 * with a smaller readpoint can see these data, which disobey the multi version 896 * concurrency control rules. 897 * This test case is to reproduce this scenario. 898 * @throws IOException 899 */ 900 @Test 901 public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException { 902 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 903 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 904 //put two row first to init the scanner 905 Put put = new Put(Bytes.toBytes("0")); 906 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); 907 table.put(put); 908 put = new Put(Bytes.toBytes("00")); 909 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); 910 table.put(put); 911 Scan scan = new Scan(); 912 scan.setTimeRange(0, Long.MAX_VALUE); 913 scan.setCaching(1); 914 ResultScanner scanner = table.getScanner(scan); 915 int rowNum = scanner.next() != null ? 1 : 0; 916 //the started scanner shouldn't see the rows put below 917 for (int i = 1; i < 1000; i++) { 918 put = new Put(Bytes.toBytes(String.valueOf(i))); 919 put.setDurability(Durability.ASYNC_WAL); 920 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i)); 921 table.put(put); 922 } 923 for (Result result : scanner) { 924 rowNum++; 925 } 926 //scanner should only see two rows 927 assertEquals(2, rowNum); 928 scanner = table.getScanner(scan); 929 rowNum = 0; 930 for (Result result : scanner) { 931 rowNum++; 932 } 933 // the new scanner should see all rows 934 assertEquals(1001, rowNum); 935 } 936 } 937 938 @Test 939 public void testPutThenGetWithMultipleThreads() throws Exception { 940 final int THREAD_NUM = 20; 941 final int ROUND_NUM = 10; 942 for (int round = 0; round < ROUND_NUM; round++) { 943 ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM); 944 final AtomicInteger successCnt = new AtomicInteger(0); 945 try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { 946 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 947 948 for (int i = 0; i < THREAD_NUM; i++) { 949 final int index = i; 950 Thread t = new Thread(new Runnable() { 951 952 @Override 953 public void run() { 954 final byte[] row = Bytes.toBytes("row-" + index); 955 final byte[] value = Bytes.toBytes("v" + index); 956 try { 957 Put put = new Put(row); 958 put.addColumn(FAMILY, QUALIFIER, value); 959 ht.put(put); 960 Get get = new Get(row); 961 Result result = ht.get(get); 962 byte[] returnedValue = result.getValue(FAMILY, QUALIFIER); 963 if (Bytes.equals(value, returnedValue)) { 964 successCnt.getAndIncrement(); 965 } else { 966 LOG.error("Should be equal but not, original value: " + Bytes.toString(value) 967 + ", returned value: " 968 + (returnedValue == null ? "null" : Bytes.toString(returnedValue))); 969 } 970 } catch (Throwable e) { 971 // do nothing 972 } 973 } 974 }); 975 threads.add(t); 976 } 977 for (Thread t : threads) { 978 t.start(); 979 } 980 for (Thread t : threads) { 981 t.join(); 982 } 983 assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get()); 984 } 985 TEST_UTIL.deleteTable(tableName); 986 } 987 } 988 989 private static void assertNoLocks(final TableName tableName) 990 throws IOException, InterruptedException { 991 HRegion region = (HRegion) find(tableName); 992 assertEquals(0, region.getLockedRows().size()); 993 } 994 private static HRegion find(final TableName tableName) 995 throws IOException, InterruptedException { 996 HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); 997 List<HRegion> regions = rs.getRegions(tableName); 998 assertEquals(1, regions.size()); 999 return regions.get(0); 1000 } 1001 1002 private static <T extends RegionObserver> T find(final TableName tableName, 1003 Class<T> clz) throws IOException, InterruptedException { 1004 HRegion region = find(tableName); 1005 Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName()); 1006 assertTrue("The cp instance should be " + clz.getName() 1007 + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp)); 1008 return clz.cast(cp); 1009 } 1010 1011 public static class WaitingForMultiMutationsObserver 1012 implements RegionCoprocessor, RegionObserver { 1013 final CountDownLatch latch = new CountDownLatch(1); 1014 1015 @Override 1016 public Optional<RegionObserver> getRegionObserver() { 1017 return Optional.of(this); 1018 } 1019 1020 @Override 1021 public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c, 1022 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1023 try { 1024 latch.await(); 1025 } catch (InterruptedException ex) { 1026 throw new IOException(ex); 1027 } 1028 } 1029 } 1030 1031 public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver { 1032 private final CountDownLatch latch = new CountDownLatch(1); 1033 1034 @Override 1035 public Optional<RegionObserver> getRegionObserver() { 1036 return Optional.of(this); 1037 } 1038 1039 @Override 1040 public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c, 1041 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1042 try { 1043 // waiting for scanner 1044 latch.await(); 1045 } catch (InterruptedException ex) { 1046 throw new IOException(ex); 1047 } 1048 } 1049 1050 @Override 1051 public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, 1052 final Scan scan, final RegionScanner s) throws IOException { 1053 latch.countDown(); 1054 return s; 1055 } 1056 } 1057 1058 static byte[] generateHugeValue(int size) { 1059 Random rand = ThreadLocalRandom.current(); 1060 byte[] value = new byte[size]; 1061 for (int i = 0; i < value.length; i++) { 1062 value[i] = (byte) rand.nextInt(256); 1063 } 1064 return value; 1065 } 1066 1067 @Test 1068 public void testScanWithBatchSizeReturnIncompleteCells() 1069 throws IOException, InterruptedException { 1070 TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName) 1071 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build()) 1072 .build(); 1073 try (Table table = TEST_UTIL.createTable(hd, null)) { 1074 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 1075 1076 Put put = new Put(ROW); 1077 put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024)); 1078 table.put(put); 1079 1080 put = new Put(ROW); 1081 put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024)); 1082 table.put(put); 1083 1084 for (int i = 2; i < 5; i++) { 1085 for (int version = 0; version < 2; version++) { 1086 put = new Put(ROW); 1087 put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024)); 1088 table.put(put); 1089 } 1090 } 1091 1092 Scan scan = new Scan(); 1093 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3) 1094 .setMaxResultSize(4 * 1024 * 1024); 1095 Result result; 1096 try (ResultScanner scanner = table.getScanner(scan)) { 1097 List<Result> list = new ArrayList<>(); 1098 /* 1099 * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; 1100 * The second scan rpc should return a result with 3 cells, because reach the batch limit 1101 * = 3; 1102 * The mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the 1103 * moreResultsInRegion also would be false. Finally, the client should collect all the cells 1104 * into two result: 2+3 -> 3+2; 1105 */ 1106 while ((result = scanner.next()) != null) { 1107 list.add(result); 1108 } 1109 1110 Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum()); 1111 Assert.assertEquals(2, list.size()); 1112 Assert.assertEquals(3, list.get(0).size()); 1113 Assert.assertEquals(2, list.get(1).size()); 1114 } 1115 1116 scan = new Scan(); 1117 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2) 1118 .setMaxResultSize(4 * 1024 * 1024); 1119 try (ResultScanner scanner = table.getScanner(scan)) { 1120 List<Result> list = new ArrayList<>(); 1121 while ((result = scanner.next()) != null) { 1122 list.add(result); 1123 } 1124 Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum()); 1125 Assert.assertEquals(3, list.size()); 1126 Assert.assertEquals(2, list.get(0).size()); 1127 Assert.assertEquals(2, list.get(1).size()); 1128 Assert.assertEquals(1, list.get(2).size()); 1129 } 1130 } 1131 } 1132}