001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.List; 026import java.util.Optional; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.ArrayBackedTag; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellScanner; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.ExtendedCell; 033import org.apache.hadoop.hbase.ExtendedCellScanner; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.KeyValue; 038import org.apache.hadoop.hbase.KeyValueUtil; 039import org.apache.hadoop.hbase.PrivateCellUtil; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.Tag; 042import org.apache.hadoop.hbase.client.Admin; 043import org.apache.hadoop.hbase.client.Append; 044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 045import org.apache.hadoop.hbase.client.CompactionState; 046import org.apache.hadoop.hbase.client.Connection; 047import org.apache.hadoop.hbase.client.ConnectionFactory; 048import org.apache.hadoop.hbase.client.Durability; 049import org.apache.hadoop.hbase.client.Increment; 050import org.apache.hadoop.hbase.client.Mutation; 051import org.apache.hadoop.hbase.client.Put; 052import org.apache.hadoop.hbase.client.Result; 053import org.apache.hadoop.hbase.client.ResultScanner; 054import org.apache.hadoop.hbase.client.Scan; 055import org.apache.hadoop.hbase.client.Table; 056import org.apache.hadoop.hbase.client.TableDescriptor; 057import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 058import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 059import org.apache.hadoop.hbase.coprocessor.ObserverContext; 060import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 062import org.apache.hadoop.hbase.coprocessor.RegionObserver; 063import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 064import org.apache.hadoop.hbase.testclassification.MediumTests; 065import org.apache.hadoop.hbase.testclassification.RegionServerTests; 066import org.apache.hadoop.hbase.util.Bytes; 067import org.apache.hadoop.hbase.wal.WALEdit; 068import org.junit.After; 069import org.junit.AfterClass; 070import org.junit.BeforeClass; 071import org.junit.ClassRule; 072import org.junit.Rule; 073import org.junit.Test; 074import org.junit.experimental.categories.Category; 075import org.junit.rules.TestName; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079/** 080 * Class that test tags 081 */ 082@Category({ RegionServerTests.class, MediumTests.class }) 083public class TestTags { 084 085 @ClassRule 086 public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestTags.class); 087 088 private static final Logger LOG = LoggerFactory.getLogger(TestTags.class); 089 090 static boolean useFilter = false; 091 092 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 093 094 @Rule 095 public final TestName TEST_NAME = new TestName(); 096 097 @BeforeClass 098 public static void setUpBeforeClass() throws Exception { 099 Configuration conf = TEST_UTIL.getConfiguration(); 100 conf.setInt("hfile.format.version", 3); 101 conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 102 TestCoprocessorForTags.class.getName()); 103 TEST_UTIL.startMiniCluster(2); 104 } 105 106 @AfterClass 107 public static void tearDownAfterClass() throws Exception { 108 TEST_UTIL.shutdownMiniCluster(); 109 } 110 111 @After 112 public void tearDown() { 113 useFilter = false; 114 } 115 116 /** 117 * Test that we can do reverse scans when writing tags and using DataBlockEncoding. Fails with an 118 * exception for PREFIX, DIFF, and FAST_DIFF prior to HBASE-27580 119 */ 120 @Test 121 public void testReverseScanWithDBE() throws IOException { 122 byte[] family = Bytes.toBytes("0"); 123 124 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 125 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 126 127 try (Connection connection = ConnectionFactory.createConnection(conf)) { 128 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 129 testReverseScanWithDBE(connection, encoding, family, HConstants.DEFAULT_BLOCKSIZE, 10); 130 } 131 } 132 } 133 134 /** 135 * Test that we can do reverse scans when writing tags and using DataBlockEncoding. Fails with an 136 * exception for PREFIX, DIFF, and FAST_DIFF 137 */ 138 @Test 139 public void testReverseScanWithDBEWhenCurrentBlockUpdates() throws IOException { 140 byte[] family = Bytes.toBytes("0"); 141 142 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 143 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 144 145 try (Connection connection = ConnectionFactory.createConnection(conf)) { 146 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 147 testReverseScanWithDBE(connection, encoding, family, 1024, 30000); 148 } 149 } 150 } 151 152 private void testReverseScanWithDBE(Connection conn, DataBlockEncoding encoding, byte[] family, 153 int blockSize, int maxRows) throws IOException { 154 LOG.info("Running test with DBE={}", encoding); 155 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName() + "-" + encoding); 156 TEST_UTIL.createTable( 157 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder 158 .newBuilder(family).setDataBlockEncoding(encoding).setBlocksize(blockSize).build()).build(), 159 null); 160 161 Table table = conn.getTable(tableName); 162 163 byte[] val1 = new byte[10]; 164 byte[] val2 = new byte[10]; 165 Bytes.random(val1); 166 Bytes.random(val2); 167 168 for (int i = 0; i < maxRows; i++) { 169 table.put(new Put(Bytes.toBytes(i)).addColumn(family, Bytes.toBytes(1), val1) 170 .addColumn(family, Bytes.toBytes(2), val2).setTTL(600_000)); 171 } 172 173 TEST_UTIL.flush(table.getName()); 174 175 Scan scan = new Scan(); 176 scan.setReversed(true); 177 178 try (ResultScanner scanner = table.getScanner(scan)) { 179 for (int i = maxRows - 1; i >= 0; i--) { 180 Result row = scanner.next(); 181 assertEquals(2, row.size()); 182 183 Cell cell1 = row.getColumnLatestCell(family, Bytes.toBytes(1)); 184 assertTrue(CellUtil.matchingRows(cell1, Bytes.toBytes(i))); 185 assertTrue(CellUtil.matchingValue(cell1, val1)); 186 187 Cell cell2 = row.getColumnLatestCell(family, Bytes.toBytes(2)); 188 assertTrue(CellUtil.matchingRows(cell2, Bytes.toBytes(i))); 189 assertTrue(CellUtil.matchingValue(cell2, val2)); 190 } 191 } 192 193 } 194 195 @Test 196 public void testTags() throws Exception { 197 Table table = null; 198 try { 199 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); 200 byte[] fam = Bytes.toBytes("info"); 201 byte[] row = Bytes.toBytes("rowa"); 202 // column names 203 byte[] qual = Bytes.toBytes("qual"); 204 205 byte[] row1 = Bytes.toBytes("rowb"); 206 207 byte[] row2 = Bytes.toBytes("rowc"); 208 209 TableDescriptor tableDescriptor = 210 TableDescriptorBuilder 211 .newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam) 212 .setBlockCacheEnabled(true).setDataBlockEncoding(DataBlockEncoding.NONE).build()) 213 .build(); 214 Admin admin = TEST_UTIL.getAdmin(); 215 admin.createTable(tableDescriptor); 216 byte[] value = Bytes.toBytes("value"); 217 table = TEST_UTIL.getConnection().getTable(tableName); 218 Put put = new Put(row); 219 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value); 220 put.setAttribute("visibility", Bytes.toBytes("myTag")); 221 table.put(put); 222 admin.flush(tableName); 223 // We are lacking an API for confirming flush request compaction. 224 // Just sleep for a short time. We won't be able to confirm flush 225 // completion but the test won't hang now or in the future if 226 // default compaction policy causes compaction between flush and 227 // when we go to confirm it. 228 Thread.sleep(1000); 229 230 Put put1 = new Put(row1); 231 byte[] value1 = Bytes.toBytes("1000dfsdf"); 232 put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1); 233 // put1.setAttribute("visibility", Bytes.toBytes("myTag3")); 234 table.put(put1); 235 admin.flush(tableName); 236 Thread.sleep(1000); 237 238 Put put2 = new Put(row2); 239 byte[] value2 = Bytes.toBytes("1000dfsdf"); 240 put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2); 241 put2.setAttribute("visibility", Bytes.toBytes("myTag3")); 242 table.put(put2); 243 admin.flush(tableName); 244 Thread.sleep(1000); 245 246 result(fam, row, qual, row2, table, value, value2, row1, value1); 247 248 admin.compact(tableName); 249 while (admin.getCompactionState(tableName) != CompactionState.NONE) { 250 Thread.sleep(10); 251 } 252 result(fam, row, qual, row2, table, value, value2, row1, value1); 253 } finally { 254 if (table != null) { 255 table.close(); 256 } 257 } 258 } 259 260 @Test 261 public void testFlushAndCompactionWithoutTags() throws Exception { 262 Table table = null; 263 try { 264 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); 265 byte[] fam = Bytes.toBytes("info"); 266 byte[] row = Bytes.toBytes("rowa"); 267 // column names 268 byte[] qual = Bytes.toBytes("qual"); 269 270 byte[] row1 = Bytes.toBytes("rowb"); 271 272 byte[] row2 = Bytes.toBytes("rowc"); 273 274 TableDescriptor tableDescriptor = 275 TableDescriptorBuilder.newBuilder(tableName) 276 .setColumnFamily( 277 ColumnFamilyDescriptorBuilder.newBuilder(fam).setBlockCacheEnabled(true).build()) 278 .build(); 279 Admin admin = TEST_UTIL.getAdmin(); 280 admin.createTable(tableDescriptor); 281 282 table = TEST_UTIL.getConnection().getTable(tableName); 283 Put put = new Put(row); 284 byte[] value = Bytes.toBytes("value"); 285 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value); 286 table.put(put); 287 admin.flush(tableName); 288 // We are lacking an API for confirming flush request compaction. 289 // Just sleep for a short time. We won't be able to confirm flush 290 // completion but the test won't hang now or in the future if 291 // default compaction policy causes compaction between flush and 292 // when we go to confirm it. 293 Thread.sleep(1000); 294 295 Put put1 = new Put(row1); 296 byte[] value1 = Bytes.toBytes("1000dfsdf"); 297 put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1); 298 table.put(put1); 299 admin.flush(tableName); 300 Thread.sleep(1000); 301 302 Put put2 = new Put(row2); 303 byte[] value2 = Bytes.toBytes("1000dfsdf"); 304 put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2); 305 table.put(put2); 306 admin.flush(tableName); 307 Thread.sleep(1000); 308 309 Scan s = new Scan().withStartRow(row); 310 ResultScanner scanner = table.getScanner(s); 311 try { 312 Result[] next = scanner.next(3); 313 for (Result result : next) { 314 ExtendedCellScanner cellScanner = result.cellScanner(); 315 cellScanner.advance(); 316 ExtendedCell current = cellScanner.current(); 317 assertEquals(0, current.getTagsLength()); 318 } 319 } finally { 320 if (scanner != null) scanner.close(); 321 } 322 admin.compact(tableName); 323 while (admin.getCompactionState(tableName) != CompactionState.NONE) { 324 Thread.sleep(10); 325 } 326 s = new Scan().withStartRow(row); 327 scanner = table.getScanner(s); 328 try { 329 Result[] next = scanner.next(3); 330 for (Result result : next) { 331 ExtendedCellScanner cellScanner = result.cellScanner(); 332 cellScanner.advance(); 333 ExtendedCell current = cellScanner.current(); 334 assertEquals(0, current.getTagsLength()); 335 } 336 } finally { 337 if (scanner != null) { 338 scanner.close(); 339 } 340 } 341 } finally { 342 if (table != null) { 343 table.close(); 344 } 345 } 346 } 347 348 @Test 349 public void testFlushAndCompactionwithCombinations() throws Exception { 350 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); 351 byte[] fam = Bytes.toBytes("info"); 352 byte[] row = Bytes.toBytes("rowa"); 353 // column names 354 byte[] qual = Bytes.toBytes("qual"); 355 356 byte[] row1 = Bytes.toBytes("rowb"); 357 358 byte[] row2 = Bytes.toBytes("rowc"); 359 byte[] rowd = Bytes.toBytes("rowd"); 360 byte[] rowe = Bytes.toBytes("rowe"); 361 Table table = null; 362 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 363 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 364 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam).setBlockCacheEnabled(true) 365 .setDataBlockEncoding(encoding).build()) 366 .build(); 367 Admin admin = TEST_UTIL.getAdmin(); 368 admin.createTable(tableDescriptor); 369 try { 370 table = TEST_UTIL.getConnection().getTable(tableName); 371 Put put = new Put(row); 372 byte[] value = Bytes.toBytes("value"); 373 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value); 374 int bigTagLen = Short.MAX_VALUE - 5; 375 put.setAttribute("visibility", new byte[bigTagLen]); 376 table.put(put); 377 Put put1 = new Put(row1); 378 byte[] value1 = Bytes.toBytes("1000dfsdf"); 379 put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1); 380 table.put(put1); 381 admin.flush(tableName); 382 // We are lacking an API for confirming flush request compaction. 383 // Just sleep for a short time. We won't be able to confirm flush 384 // completion but the test won't hang now or in the future if 385 // default compaction policy causes compaction between flush and 386 // when we go to confirm it. 387 Thread.sleep(1000); 388 389 put1 = new Put(row2); 390 value1 = Bytes.toBytes("1000dfsdf"); 391 put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1); 392 table.put(put1); 393 admin.flush(tableName); 394 Thread.sleep(1000); 395 396 Put put2 = new Put(rowd); 397 byte[] value2 = Bytes.toBytes("1000dfsdf"); 398 put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2); 399 table.put(put2); 400 put2 = new Put(rowe); 401 value2 = Bytes.toBytes("1000dfsddfdf"); 402 put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2); 403 put.setAttribute("visibility", Bytes.toBytes("ram")); 404 table.put(put2); 405 admin.flush(tableName); 406 Thread.sleep(1000); 407 408 TestCoprocessorForTags.checkTagPresence = true; 409 Scan s = new Scan().withStartRow(row); 410 s.setCaching(1); 411 ResultScanner scanner = table.getScanner(s); 412 try { 413 Result next = null; 414 while ((next = scanner.next()) != null) { 415 CellScanner cellScanner = next.cellScanner(); 416 cellScanner.advance(); 417 Cell current = cellScanner.current(); 418 if (CellUtil.matchingRows(current, row)) { 419 assertEquals(1, TestCoprocessorForTags.tags.size()); 420 Tag tag = TestCoprocessorForTags.tags.get(0); 421 assertEquals(bigTagLen, tag.getValueLength()); 422 } else { 423 assertEquals(0, TestCoprocessorForTags.tags.size()); 424 } 425 } 426 } finally { 427 if (scanner != null) { 428 scanner.close(); 429 } 430 TestCoprocessorForTags.checkTagPresence = false; 431 } 432 while (admin.getCompactionState(tableName) != CompactionState.NONE) { 433 Thread.sleep(10); 434 } 435 TestCoprocessorForTags.checkTagPresence = true; 436 scanner = table.getScanner(s); 437 try { 438 Result next = null; 439 while ((next = scanner.next()) != null) { 440 CellScanner cellScanner = next.cellScanner(); 441 cellScanner.advance(); 442 Cell current = cellScanner.current(); 443 if (CellUtil.matchingRows(current, row)) { 444 assertEquals(1, TestCoprocessorForTags.tags.size()); 445 Tag tag = TestCoprocessorForTags.tags.get(0); 446 assertEquals(bigTagLen, tag.getValueLength()); 447 } else { 448 assertEquals(0, TestCoprocessorForTags.tags.size()); 449 } 450 } 451 } finally { 452 if (scanner != null) { 453 scanner.close(); 454 } 455 TestCoprocessorForTags.checkTagPresence = false; 456 } 457 } finally { 458 if (table != null) { 459 table.close(); 460 } 461 // delete the table 462 admin.disableTable(tableName); 463 admin.deleteTable(tableName); 464 } 465 } 466 } 467 468 @Test 469 public void testTagsWithAppendAndIncrement() throws Exception { 470 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); 471 byte[] f = Bytes.toBytes("f"); 472 byte[] q = Bytes.toBytes("q"); 473 byte[] row1 = Bytes.toBytes("r1"); 474 byte[] row2 = Bytes.toBytes("r2"); 475 476 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 477 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f)).build(); 478 TEST_UTIL.getAdmin().createTable(tableDescriptor); 479 480 Table table = null; 481 try { 482 table = TEST_UTIL.getConnection().getTable(tableName); 483 Put put = new Put(row1); 484 byte[] v = Bytes.toBytes(2L); 485 put.addColumn(f, q, v); 486 put.setAttribute("visibility", Bytes.toBytes("tag1")); 487 table.put(put); 488 Increment increment = new Increment(row1); 489 increment.addColumn(f, q, 1L); 490 table.increment(increment); 491 TestCoprocessorForTags.checkTagPresence = true; 492 ResultScanner scanner = table.getScanner(new Scan()); 493 Result result = scanner.next(); 494 KeyValue kv = KeyValueUtil.ensureKeyValue((ExtendedCell) result.getColumnLatestCell(f, q)); 495 List<Tag> tags = TestCoprocessorForTags.tags; 496 assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); 497 assertEquals(1, tags.size()); 498 assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0)))); 499 TestCoprocessorForTags.checkTagPresence = false; 500 TestCoprocessorForTags.tags = null; 501 502 increment = new Increment(row1); 503 increment.add(new KeyValue(row1, f, q, 1234L, v)); 504 increment.setAttribute("visibility", Bytes.toBytes("tag2")); 505 table.increment(increment); 506 TestCoprocessorForTags.checkTagPresence = true; 507 scanner = table.getScanner(new Scan()); 508 result = scanner.next(); 509 kv = KeyValueUtil.ensureKeyValue((ExtendedCell) result.getColumnLatestCell(f, q)); 510 tags = TestCoprocessorForTags.tags; 511 assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); 512 assertEquals(2, tags.size()); 513 // We cannot assume the ordering of tags 514 List<String> tagValues = new ArrayList<>(); 515 for (Tag tag : tags) { 516 tagValues.add(Bytes.toString(Tag.cloneValue(tag))); 517 } 518 assertTrue(tagValues.contains("tag1")); 519 assertTrue(tagValues.contains("tag2")); 520 TestCoprocessorForTags.checkTagPresence = false; 521 TestCoprocessorForTags.tags = null; 522 523 put = new Put(row2); 524 v = Bytes.toBytes(2L); 525 put.addColumn(f, q, v); 526 table.put(put); 527 increment = new Increment(row2); 528 increment.add(new KeyValue(row2, f, q, 1234L, v)); 529 increment.setAttribute("visibility", Bytes.toBytes("tag2")); 530 table.increment(increment); 531 TestCoprocessorForTags.checkTagPresence = true; 532 scanner = table.getScanner(new Scan().withStartRow(row2)); 533 result = scanner.next(); 534 kv = KeyValueUtil.ensureKeyValue((ExtendedCell) result.getColumnLatestCell(f, q)); 535 tags = TestCoprocessorForTags.tags; 536 assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); 537 assertEquals(1, tags.size()); 538 assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0)))); 539 TestCoprocessorForTags.checkTagPresence = false; 540 TestCoprocessorForTags.tags = null; 541 542 // Test Append 543 byte[] row3 = Bytes.toBytes("r3"); 544 put = new Put(row3); 545 put.addColumn(f, q, Bytes.toBytes("a")); 546 put.setAttribute("visibility", Bytes.toBytes("tag1")); 547 table.put(put); 548 Append append = new Append(row3); 549 append.addColumn(f, q, Bytes.toBytes("b")); 550 table.append(append); 551 TestCoprocessorForTags.checkTagPresence = true; 552 scanner = table.getScanner(new Scan().withStartRow(row3)); 553 result = scanner.next(); 554 kv = KeyValueUtil.ensureKeyValue((ExtendedCell) result.getColumnLatestCell(f, q)); 555 tags = TestCoprocessorForTags.tags; 556 assertEquals(1, tags.size()); 557 assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0)))); 558 TestCoprocessorForTags.checkTagPresence = false; 559 TestCoprocessorForTags.tags = null; 560 561 append = new Append(row3); 562 append.add(new KeyValue(row3, f, q, 1234L, v)); 563 append.setAttribute("visibility", Bytes.toBytes("tag2")); 564 table.append(append); 565 TestCoprocessorForTags.checkTagPresence = true; 566 scanner = table.getScanner(new Scan().withStartRow(row3)); 567 result = scanner.next(); 568 kv = KeyValueUtil.ensureKeyValue((ExtendedCell) result.getColumnLatestCell(f, q)); 569 tags = TestCoprocessorForTags.tags; 570 assertEquals(2, tags.size()); 571 // We cannot assume the ordering of tags 572 tagValues.clear(); 573 for (Tag tag : tags) { 574 tagValues.add(Bytes.toString(Tag.cloneValue(tag))); 575 } 576 assertTrue(tagValues.contains("tag1")); 577 assertTrue(tagValues.contains("tag2")); 578 TestCoprocessorForTags.checkTagPresence = false; 579 TestCoprocessorForTags.tags = null; 580 581 byte[] row4 = Bytes.toBytes("r4"); 582 put = new Put(row4); 583 put.addColumn(f, q, Bytes.toBytes("a")); 584 table.put(put); 585 append = new Append(row4); 586 append.add(new KeyValue(row4, f, q, 1234L, v)); 587 append.setAttribute("visibility", Bytes.toBytes("tag2")); 588 table.append(append); 589 TestCoprocessorForTags.checkTagPresence = true; 590 scanner = table.getScanner(new Scan().withStartRow(row4)); 591 result = scanner.next(); 592 kv = KeyValueUtil.ensureKeyValue((ExtendedCell) result.getColumnLatestCell(f, q)); 593 tags = TestCoprocessorForTags.tags; 594 assertEquals(1, tags.size()); 595 assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0)))); 596 } finally { 597 TestCoprocessorForTags.checkTagPresence = false; 598 TestCoprocessorForTags.tags = null; 599 if (table != null) { 600 table.close(); 601 } 602 } 603 } 604 605 private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, Table table, byte[] value, 606 byte[] value2, byte[] row1, byte[] value1) throws IOException { 607 Scan s = new Scan().withStartRow(row); 608 // If filters are used this attribute can be specifically check for in 609 // filterKV method and 610 // kvs can be filtered out if the tags of interest is not found in that kv 611 s.setAttribute("visibility", Bytes.toBytes("myTag")); 612 ResultScanner scanner = null; 613 try { 614 scanner = table.getScanner(s); 615 Result next = scanner.next(); 616 617 assertTrue(Bytes.equals(next.getRow(), row)); 618 assertTrue(Bytes.equals(next.getValue(fam, qual), value)); 619 620 Result next2 = scanner.next(); 621 assertTrue(next2 != null); 622 assertTrue(Bytes.equals(next2.getRow(), row1)); 623 assertTrue(Bytes.equals(next2.getValue(fam, qual), value1)); 624 625 next2 = scanner.next(); 626 assertTrue(next2 != null); 627 assertTrue(Bytes.equals(next2.getRow(), row2)); 628 assertTrue(Bytes.equals(next2.getValue(fam, qual), value2)); 629 630 } finally { 631 if (scanner != null) scanner.close(); 632 } 633 } 634 635 public static class TestCoprocessorForTags implements RegionCoprocessor, RegionObserver { 636 637 public static volatile boolean checkTagPresence = false; 638 public static List<Tag> tags = null; 639 640 @Override 641 public Optional<RegionObserver> getRegionObserver() { 642 return Optional.of(this); 643 } 644 645 @Override 646 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, 647 final WALEdit edit, final Durability durability) throws IOException { 648 updateMutationAddingTags(put); 649 } 650 651 private void updateMutationAddingTags(final Mutation m) { 652 byte[] attribute = m.getAttribute("visibility"); 653 byte[] cf = null; 654 List<Cell> updatedCells = new ArrayList<>(); 655 if (attribute != null) { 656 for (List<? extends Cell> edits : m.getFamilyCellMap().values()) { 657 for (Cell cell : edits) { 658 KeyValue kv = KeyValueUtil.ensureKeyValue((ExtendedCell) cell); 659 if (cf == null) { 660 cf = CellUtil.cloneFamily(kv); 661 } 662 Tag tag = new ArrayBackedTag((byte) 1, attribute); 663 List<Tag> tagList = new ArrayList<>(); 664 tagList.add(tag); 665 666 KeyValue newKV = 667 new KeyValue(CellUtil.cloneRow(kv), 0, kv.getRowLength(), CellUtil.cloneFamily(kv), 0, 668 kv.getFamilyLength(), CellUtil.cloneQualifier(kv), 0, kv.getQualifierLength(), 669 kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()), 670 CellUtil.cloneValue(kv), 0, kv.getValueLength(), tagList); 671 ((List<Cell>) updatedCells).add(newKV); 672 } 673 } 674 m.getFamilyCellMap().remove(cf); 675 // Update the family map 676 m.getFamilyCellMap().put(cf, updatedCells); 677 } 678 } 679 680 @Override 681 public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment) 682 throws IOException { 683 updateMutationAddingTags(increment); 684 return null; 685 } 686 687 @Override 688 public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append) 689 throws IOException { 690 updateMutationAddingTags(append); 691 return null; 692 } 693 694 @Override 695 public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e, 696 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException { 697 if (checkTagPresence) { 698 if (results.size() > 0) { 699 // Check tag presence in the 1st cell in 1st Result 700 Result result = results.get(0); 701 ExtendedCellScanner cellScanner = result.cellScanner(); 702 if (cellScanner.advance()) { 703 ExtendedCell cell = cellScanner.current(); 704 tags = PrivateCellUtil.getTags(cell); 705 } 706 } 707 } 708 return hasMore; 709 } 710 } 711}