001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.List; 026import java.util.Optional; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.ArrayBackedTag; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellScanner; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtil; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.KeyValue; 036import org.apache.hadoop.hbase.KeyValueUtil; 037import org.apache.hadoop.hbase.PrivateCellUtil; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.Tag; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.Append; 042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 043import org.apache.hadoop.hbase.client.CompactionState; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.ConnectionFactory; 046import org.apache.hadoop.hbase.client.Durability; 047import org.apache.hadoop.hbase.client.Increment; 048import org.apache.hadoop.hbase.client.Mutation; 049import org.apache.hadoop.hbase.client.Put; 050import org.apache.hadoop.hbase.client.Result; 051import org.apache.hadoop.hbase.client.ResultScanner; 052import org.apache.hadoop.hbase.client.Scan; 053import org.apache.hadoop.hbase.client.Table; 054import org.apache.hadoop.hbase.client.TableDescriptor; 055import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 056import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 057import org.apache.hadoop.hbase.coprocessor.ObserverContext; 058import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 059import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 060import org.apache.hadoop.hbase.coprocessor.RegionObserver; 061import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 062import org.apache.hadoop.hbase.testclassification.MediumTests; 063import org.apache.hadoop.hbase.testclassification.RegionServerTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.wal.WALEdit; 066import org.junit.After; 067import org.junit.AfterClass; 068import org.junit.BeforeClass; 069import org.junit.ClassRule; 070import org.junit.Rule; 071import org.junit.Test; 072import org.junit.experimental.categories.Category; 073import org.junit.rules.TestName; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077/** 078 * Class that test tags 079 */ 080@Category({ RegionServerTests.class, MediumTests.class }) 081public class TestTags { 082 083 @ClassRule 084 public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestTags.class); 085 086 private static final Logger LOG = LoggerFactory.getLogger(TestTags.class); 087 088 static boolean useFilter = false; 089 090 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 091 092 @Rule 093 public final TestName TEST_NAME = new TestName(); 094 095 @BeforeClass 096 public static void setUpBeforeClass() throws Exception { 097 Configuration conf = TEST_UTIL.getConfiguration(); 098 conf.setInt("hfile.format.version", 3); 099 conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 100 TestCoprocessorForTags.class.getName()); 101 TEST_UTIL.startMiniCluster(2); 102 } 103 104 @AfterClass 105 public static void tearDownAfterClass() throws Exception { 106 TEST_UTIL.shutdownMiniCluster(); 107 } 108 109 @After 110 public void tearDown() { 111 useFilter = false; 112 } 113 114 /** 115 * Test that we can do reverse scans when writing tags and using DataBlockEncoding. Fails with an 116 * exception for PREFIX, DIFF, and FAST_DIFF prior to HBASE-27580 117 */ 118 @Test 119 public void testReverseScanWithDBE() throws IOException { 120 byte[] family = Bytes.toBytes("0"); 121 122 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 123 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 124 125 try (Connection connection = ConnectionFactory.createConnection(conf)) { 126 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 127 testReverseScanWithDBE(connection, encoding, family); 128 } 129 } 130 } 131 132 private void testReverseScanWithDBE(Connection conn, DataBlockEncoding encoding, byte[] family) 133 throws IOException { 134 LOG.info("Running test with DBE={}", encoding); 135 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName() + "-" + encoding); 136 TEST_UTIL.createTable(TableDescriptorBuilder.newBuilder(tableName) 137 .setColumnFamily( 138 ColumnFamilyDescriptorBuilder.newBuilder(family).setDataBlockEncoding(encoding).build()) 139 .build(), null); 140 141 Table table = conn.getTable(tableName); 142 143 int maxRows = 10; 144 byte[] val1 = new byte[10]; 145 byte[] val2 = new byte[10]; 146 Bytes.random(val1); 147 Bytes.random(val2); 148 149 for (int i = 0; i < maxRows; i++) { 150 if (i == maxRows / 2) { 151 TEST_UTIL.flush(tableName); 152 } 153 table.put(new Put(Bytes.toBytes(i)).addColumn(family, Bytes.toBytes(1), val1) 154 .addColumn(family, Bytes.toBytes(2), val2).setTTL(600_000)); 155 } 156 157 TEST_UTIL.flush(table.getName()); 158 159 Scan scan = new Scan(); 160 scan.setReversed(true); 161 162 try (ResultScanner scanner = table.getScanner(scan)) { 163 for (int i = maxRows - 1; i >= 0; i--) { 164 Result row = scanner.next(); 165 assertEquals(2, row.size()); 166 167 Cell cell1 = row.getColumnLatestCell(family, Bytes.toBytes(1)); 168 assertTrue(CellUtil.matchingRows(cell1, Bytes.toBytes(i))); 169 assertTrue(CellUtil.matchingValue(cell1, val1)); 170 171 Cell cell2 = row.getColumnLatestCell(family, Bytes.toBytes(2)); 172 assertTrue(CellUtil.matchingRows(cell2, Bytes.toBytes(i))); 173 assertTrue(CellUtil.matchingValue(cell2, val2)); 174 } 175 } 176 177 } 178 179 @Test 180 public void testTags() throws Exception { 181 Table table = null; 182 try { 183 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); 184 byte[] fam = Bytes.toBytes("info"); 185 byte[] row = Bytes.toBytes("rowa"); 186 // column names 187 byte[] qual = Bytes.toBytes("qual"); 188 189 byte[] row1 = Bytes.toBytes("rowb"); 190 191 byte[] row2 = Bytes.toBytes("rowc"); 192 193 TableDescriptor tableDescriptor = 194 TableDescriptorBuilder 195 .newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam) 196 .setBlockCacheEnabled(true).setDataBlockEncoding(DataBlockEncoding.NONE).build()) 197 .build(); 198 Admin admin = TEST_UTIL.getAdmin(); 199 admin.createTable(tableDescriptor); 200 byte[] value = Bytes.toBytes("value"); 201 table = TEST_UTIL.getConnection().getTable(tableName); 202 Put put = new Put(row); 203 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value); 204 put.setAttribute("visibility", Bytes.toBytes("myTag")); 205 table.put(put); 206 admin.flush(tableName); 207 // We are lacking an API for confirming flush request compaction. 208 // Just sleep for a short time. We won't be able to confirm flush 209 // completion but the test won't hang now or in the future if 210 // default compaction policy causes compaction between flush and 211 // when we go to confirm it. 212 Thread.sleep(1000); 213 214 Put put1 = new Put(row1); 215 byte[] value1 = Bytes.toBytes("1000dfsdf"); 216 put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1); 217 // put1.setAttribute("visibility", Bytes.toBytes("myTag3")); 218 table.put(put1); 219 admin.flush(tableName); 220 Thread.sleep(1000); 221 222 Put put2 = new Put(row2); 223 byte[] value2 = Bytes.toBytes("1000dfsdf"); 224 put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2); 225 put2.setAttribute("visibility", Bytes.toBytes("myTag3")); 226 table.put(put2); 227 admin.flush(tableName); 228 Thread.sleep(1000); 229 230 result(fam, row, qual, row2, table, value, value2, row1, value1); 231 232 admin.compact(tableName); 233 while (admin.getCompactionState(tableName) != CompactionState.NONE) { 234 Thread.sleep(10); 235 } 236 result(fam, row, qual, row2, table, value, value2, row1, value1); 237 } finally { 238 if (table != null) { 239 table.close(); 240 } 241 } 242 } 243 244 @Test 245 public void testFlushAndCompactionWithoutTags() throws Exception { 246 Table table = null; 247 try { 248 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); 249 byte[] fam = Bytes.toBytes("info"); 250 byte[] row = Bytes.toBytes("rowa"); 251 // column names 252 byte[] qual = Bytes.toBytes("qual"); 253 254 byte[] row1 = Bytes.toBytes("rowb"); 255 256 byte[] row2 = Bytes.toBytes("rowc"); 257 258 TableDescriptor tableDescriptor = 259 TableDescriptorBuilder.newBuilder(tableName) 260 .setColumnFamily( 261 ColumnFamilyDescriptorBuilder.newBuilder(fam).setBlockCacheEnabled(true).build()) 262 .build(); 263 Admin admin = TEST_UTIL.getAdmin(); 264 admin.createTable(tableDescriptor); 265 266 table = TEST_UTIL.getConnection().getTable(tableName); 267 Put put = new Put(row); 268 byte[] value = Bytes.toBytes("value"); 269 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value); 270 table.put(put); 271 admin.flush(tableName); 272 // We are lacking an API for confirming flush request compaction. 273 // Just sleep for a short time. We won't be able to confirm flush 274 // completion but the test won't hang now or in the future if 275 // default compaction policy causes compaction between flush and 276 // when we go to confirm it. 277 Thread.sleep(1000); 278 279 Put put1 = new Put(row1); 280 byte[] value1 = Bytes.toBytes("1000dfsdf"); 281 put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1); 282 table.put(put1); 283 admin.flush(tableName); 284 Thread.sleep(1000); 285 286 Put put2 = new Put(row2); 287 byte[] value2 = Bytes.toBytes("1000dfsdf"); 288 put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2); 289 table.put(put2); 290 admin.flush(tableName); 291 Thread.sleep(1000); 292 293 Scan s = new Scan().withStartRow(row); 294 ResultScanner scanner = table.getScanner(s); 295 try { 296 Result[] next = scanner.next(3); 297 for (Result result : next) { 298 CellScanner cellScanner = result.cellScanner(); 299 cellScanner.advance(); 300 Cell current = cellScanner.current(); 301 assertEquals(0, current.getTagsLength()); 302 } 303 } finally { 304 if (scanner != null) scanner.close(); 305 } 306 admin.compact(tableName); 307 while (admin.getCompactionState(tableName) != CompactionState.NONE) { 308 Thread.sleep(10); 309 } 310 s = new Scan().withStartRow(row); 311 scanner = table.getScanner(s); 312 try { 313 Result[] next = scanner.next(3); 314 for (Result result : next) { 315 CellScanner cellScanner = result.cellScanner(); 316 cellScanner.advance(); 317 Cell current = cellScanner.current(); 318 assertEquals(0, current.getTagsLength()); 319 } 320 } finally { 321 if (scanner != null) { 322 scanner.close(); 323 } 324 } 325 } finally { 326 if (table != null) { 327 table.close(); 328 } 329 } 330 } 331 332 @Test 333 public void testFlushAndCompactionwithCombinations() throws Exception { 334 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); 335 byte[] fam = Bytes.toBytes("info"); 336 byte[] row = Bytes.toBytes("rowa"); 337 // column names 338 byte[] qual = Bytes.toBytes("qual"); 339 340 byte[] row1 = Bytes.toBytes("rowb"); 341 342 byte[] row2 = Bytes.toBytes("rowc"); 343 byte[] rowd = Bytes.toBytes("rowd"); 344 byte[] rowe = Bytes.toBytes("rowe"); 345 Table table = null; 346 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 347 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 348 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam).setBlockCacheEnabled(true) 349 .setDataBlockEncoding(encoding).build()) 350 .build(); 351 Admin admin = TEST_UTIL.getAdmin(); 352 admin.createTable(tableDescriptor); 353 try { 354 table = TEST_UTIL.getConnection().getTable(tableName); 355 Put put = new Put(row); 356 byte[] value = Bytes.toBytes("value"); 357 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value); 358 int bigTagLen = Short.MAX_VALUE - 5; 359 put.setAttribute("visibility", new byte[bigTagLen]); 360 table.put(put); 361 Put put1 = new Put(row1); 362 byte[] value1 = Bytes.toBytes("1000dfsdf"); 363 put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1); 364 table.put(put1); 365 admin.flush(tableName); 366 // We are lacking an API for confirming flush request compaction. 367 // Just sleep for a short time. We won't be able to confirm flush 368 // completion but the test won't hang now or in the future if 369 // default compaction policy causes compaction between flush and 370 // when we go to confirm it. 371 Thread.sleep(1000); 372 373 put1 = new Put(row2); 374 value1 = Bytes.toBytes("1000dfsdf"); 375 put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1); 376 table.put(put1); 377 admin.flush(tableName); 378 Thread.sleep(1000); 379 380 Put put2 = new Put(rowd); 381 byte[] value2 = Bytes.toBytes("1000dfsdf"); 382 put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2); 383 table.put(put2); 384 put2 = new Put(rowe); 385 value2 = Bytes.toBytes("1000dfsddfdf"); 386 put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2); 387 put.setAttribute("visibility", Bytes.toBytes("ram")); 388 table.put(put2); 389 admin.flush(tableName); 390 Thread.sleep(1000); 391 392 TestCoprocessorForTags.checkTagPresence = true; 393 Scan s = new Scan().withStartRow(row); 394 s.setCaching(1); 395 ResultScanner scanner = table.getScanner(s); 396 try { 397 Result next = null; 398 while ((next = scanner.next()) != null) { 399 CellScanner cellScanner = next.cellScanner(); 400 cellScanner.advance(); 401 Cell current = cellScanner.current(); 402 if (CellUtil.matchingRows(current, row)) { 403 assertEquals(1, TestCoprocessorForTags.tags.size()); 404 Tag tag = TestCoprocessorForTags.tags.get(0); 405 assertEquals(bigTagLen, tag.getValueLength()); 406 } else { 407 assertEquals(0, TestCoprocessorForTags.tags.size()); 408 } 409 } 410 } finally { 411 if (scanner != null) { 412 scanner.close(); 413 } 414 TestCoprocessorForTags.checkTagPresence = false; 415 } 416 while (admin.getCompactionState(tableName) != CompactionState.NONE) { 417 Thread.sleep(10); 418 } 419 TestCoprocessorForTags.checkTagPresence = true; 420 scanner = table.getScanner(s); 421 try { 422 Result next = null; 423 while ((next = scanner.next()) != null) { 424 CellScanner cellScanner = next.cellScanner(); 425 cellScanner.advance(); 426 Cell current = cellScanner.current(); 427 if (CellUtil.matchingRows(current, row)) { 428 assertEquals(1, TestCoprocessorForTags.tags.size()); 429 Tag tag = TestCoprocessorForTags.tags.get(0); 430 assertEquals(bigTagLen, tag.getValueLength()); 431 } else { 432 assertEquals(0, TestCoprocessorForTags.tags.size()); 433 } 434 } 435 } finally { 436 if (scanner != null) { 437 scanner.close(); 438 } 439 TestCoprocessorForTags.checkTagPresence = false; 440 } 441 } finally { 442 if (table != null) { 443 table.close(); 444 } 445 // delete the table 446 admin.disableTable(tableName); 447 admin.deleteTable(tableName); 448 } 449 } 450 } 451 452 @Test 453 public void testTagsWithAppendAndIncrement() throws Exception { 454 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); 455 byte[] f = Bytes.toBytes("f"); 456 byte[] q = Bytes.toBytes("q"); 457 byte[] row1 = Bytes.toBytes("r1"); 458 byte[] row2 = Bytes.toBytes("r2"); 459 460 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 461 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f)).build(); 462 TEST_UTIL.getAdmin().createTable(tableDescriptor); 463 464 Table table = null; 465 try { 466 table = TEST_UTIL.getConnection().getTable(tableName); 467 Put put = new Put(row1); 468 byte[] v = Bytes.toBytes(2L); 469 put.addColumn(f, q, v); 470 put.setAttribute("visibility", Bytes.toBytes("tag1")); 471 table.put(put); 472 Increment increment = new Increment(row1); 473 increment.addColumn(f, q, 1L); 474 table.increment(increment); 475 TestCoprocessorForTags.checkTagPresence = true; 476 ResultScanner scanner = table.getScanner(new Scan()); 477 Result result = scanner.next(); 478 KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); 479 List<Tag> tags = TestCoprocessorForTags.tags; 480 assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); 481 assertEquals(1, tags.size()); 482 assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0)))); 483 TestCoprocessorForTags.checkTagPresence = false; 484 TestCoprocessorForTags.tags = null; 485 486 increment = new Increment(row1); 487 increment.add(new KeyValue(row1, f, q, 1234L, v)); 488 increment.setAttribute("visibility", Bytes.toBytes("tag2")); 489 table.increment(increment); 490 TestCoprocessorForTags.checkTagPresence = true; 491 scanner = table.getScanner(new Scan()); 492 result = scanner.next(); 493 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); 494 tags = TestCoprocessorForTags.tags; 495 assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); 496 assertEquals(2, tags.size()); 497 // We cannot assume the ordering of tags 498 List<String> tagValues = new ArrayList<>(); 499 for (Tag tag : tags) { 500 tagValues.add(Bytes.toString(Tag.cloneValue(tag))); 501 } 502 assertTrue(tagValues.contains("tag1")); 503 assertTrue(tagValues.contains("tag2")); 504 TestCoprocessorForTags.checkTagPresence = false; 505 TestCoprocessorForTags.tags = null; 506 507 put = new Put(row2); 508 v = Bytes.toBytes(2L); 509 put.addColumn(f, q, v); 510 table.put(put); 511 increment = new Increment(row2); 512 increment.add(new KeyValue(row2, f, q, 1234L, v)); 513 increment.setAttribute("visibility", Bytes.toBytes("tag2")); 514 table.increment(increment); 515 TestCoprocessorForTags.checkTagPresence = true; 516 scanner = table.getScanner(new Scan().withStartRow(row2)); 517 result = scanner.next(); 518 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); 519 tags = TestCoprocessorForTags.tags; 520 assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); 521 assertEquals(1, tags.size()); 522 assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0)))); 523 TestCoprocessorForTags.checkTagPresence = false; 524 TestCoprocessorForTags.tags = null; 525 526 // Test Append 527 byte[] row3 = Bytes.toBytes("r3"); 528 put = new Put(row3); 529 put.addColumn(f, q, Bytes.toBytes("a")); 530 put.setAttribute("visibility", Bytes.toBytes("tag1")); 531 table.put(put); 532 Append append = new Append(row3); 533 append.addColumn(f, q, Bytes.toBytes("b")); 534 table.append(append); 535 TestCoprocessorForTags.checkTagPresence = true; 536 scanner = table.getScanner(new Scan().withStartRow(row3)); 537 result = scanner.next(); 538 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); 539 tags = TestCoprocessorForTags.tags; 540 assertEquals(1, tags.size()); 541 assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0)))); 542 TestCoprocessorForTags.checkTagPresence = false; 543 TestCoprocessorForTags.tags = null; 544 545 append = new Append(row3); 546 append.add(new KeyValue(row3, f, q, 1234L, v)); 547 append.setAttribute("visibility", Bytes.toBytes("tag2")); 548 table.append(append); 549 TestCoprocessorForTags.checkTagPresence = true; 550 scanner = table.getScanner(new Scan().withStartRow(row3)); 551 result = scanner.next(); 552 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); 553 tags = TestCoprocessorForTags.tags; 554 assertEquals(2, tags.size()); 555 // We cannot assume the ordering of tags 556 tagValues.clear(); 557 for (Tag tag : tags) { 558 tagValues.add(Bytes.toString(Tag.cloneValue(tag))); 559 } 560 assertTrue(tagValues.contains("tag1")); 561 assertTrue(tagValues.contains("tag2")); 562 TestCoprocessorForTags.checkTagPresence = false; 563 TestCoprocessorForTags.tags = null; 564 565 byte[] row4 = Bytes.toBytes("r4"); 566 put = new Put(row4); 567 put.addColumn(f, q, Bytes.toBytes("a")); 568 table.put(put); 569 append = new Append(row4); 570 append.add(new KeyValue(row4, f, q, 1234L, v)); 571 append.setAttribute("visibility", Bytes.toBytes("tag2")); 572 table.append(append); 573 TestCoprocessorForTags.checkTagPresence = true; 574 scanner = table.getScanner(new Scan().withStartRow(row4)); 575 result = scanner.next(); 576 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); 577 tags = TestCoprocessorForTags.tags; 578 assertEquals(1, tags.size()); 579 assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0)))); 580 } finally { 581 TestCoprocessorForTags.checkTagPresence = false; 582 TestCoprocessorForTags.tags = null; 583 if (table != null) { 584 table.close(); 585 } 586 } 587 } 588 589 private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, Table table, byte[] value, 590 byte[] value2, byte[] row1, byte[] value1) throws IOException { 591 Scan s = new Scan().withStartRow(row); 592 // If filters are used this attribute can be specifically check for in 593 // filterKV method and 594 // kvs can be filtered out if the tags of interest is not found in that kv 595 s.setAttribute("visibility", Bytes.toBytes("myTag")); 596 ResultScanner scanner = null; 597 try { 598 scanner = table.getScanner(s); 599 Result next = scanner.next(); 600 601 assertTrue(Bytes.equals(next.getRow(), row)); 602 assertTrue(Bytes.equals(next.getValue(fam, qual), value)); 603 604 Result next2 = scanner.next(); 605 assertTrue(next2 != null); 606 assertTrue(Bytes.equals(next2.getRow(), row1)); 607 assertTrue(Bytes.equals(next2.getValue(fam, qual), value1)); 608 609 next2 = scanner.next(); 610 assertTrue(next2 != null); 611 assertTrue(Bytes.equals(next2.getRow(), row2)); 612 assertTrue(Bytes.equals(next2.getValue(fam, qual), value2)); 613 614 } finally { 615 if (scanner != null) scanner.close(); 616 } 617 } 618 619 public static class TestCoprocessorForTags implements RegionCoprocessor, RegionObserver { 620 621 public static volatile boolean checkTagPresence = false; 622 public static List<Tag> tags = null; 623 624 @Override 625 public Optional<RegionObserver> getRegionObserver() { 626 return Optional.of(this); 627 } 628 629 @Override 630 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, 631 final WALEdit edit, final Durability durability) throws IOException { 632 updateMutationAddingTags(put); 633 } 634 635 private void updateMutationAddingTags(final Mutation m) { 636 byte[] attribute = m.getAttribute("visibility"); 637 byte[] cf = null; 638 List<Cell> updatedCells = new ArrayList<>(); 639 if (attribute != null) { 640 for (List<? extends Cell> edits : m.getFamilyCellMap().values()) { 641 for (Cell cell : edits) { 642 KeyValue kv = KeyValueUtil.ensureKeyValue(cell); 643 if (cf == null) { 644 cf = CellUtil.cloneFamily(kv); 645 } 646 Tag tag = new ArrayBackedTag((byte) 1, attribute); 647 List<Tag> tagList = new ArrayList<>(); 648 tagList.add(tag); 649 650 KeyValue newKV = 651 new KeyValue(CellUtil.cloneRow(kv), 0, kv.getRowLength(), CellUtil.cloneFamily(kv), 0, 652 kv.getFamilyLength(), CellUtil.cloneQualifier(kv), 0, kv.getQualifierLength(), 653 kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()), 654 CellUtil.cloneValue(kv), 0, kv.getValueLength(), tagList); 655 ((List<Cell>) updatedCells).add(newKV); 656 } 657 } 658 m.getFamilyCellMap().remove(cf); 659 // Update the family map 660 m.getFamilyCellMap().put(cf, updatedCells); 661 } 662 } 663 664 @Override 665 public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment) 666 throws IOException { 667 updateMutationAddingTags(increment); 668 return null; 669 } 670 671 @Override 672 public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append) 673 throws IOException { 674 updateMutationAddingTags(append); 675 return null; 676 } 677 678 @Override 679 public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e, 680 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException { 681 if (checkTagPresence) { 682 if (results.size() > 0) { 683 // Check tag presence in the 1st cell in 1st Result 684 Result result = results.get(0); 685 CellScanner cellScanner = result.cellScanner(); 686 if (cellScanner.advance()) { 687 Cell cell = cellScanner.current(); 688 tags = PrivateCellUtil.getTags(cell); 689 } 690 } 691 } 692 return hasMore; 693 } 694 } 695}