001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.List; 026import java.util.Optional; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.ArrayBackedTag; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellScanner; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HColumnDescriptor; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.HTableDescriptor; 037import org.apache.hadoop.hbase.KeyValue; 038import org.apache.hadoop.hbase.KeyValueUtil; 039import org.apache.hadoop.hbase.PrivateCellUtil; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.Tag; 042import org.apache.hadoop.hbase.client.Admin; 043import org.apache.hadoop.hbase.client.Append; 044import org.apache.hadoop.hbase.client.CompactionState; 045import org.apache.hadoop.hbase.client.Durability; 046import org.apache.hadoop.hbase.client.Increment; 047import org.apache.hadoop.hbase.client.Mutation; 048import org.apache.hadoop.hbase.client.Put; 049import org.apache.hadoop.hbase.client.Result; 050import org.apache.hadoop.hbase.client.ResultScanner; 051import org.apache.hadoop.hbase.client.Scan; 052import org.apache.hadoop.hbase.client.Table; 053import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 054import org.apache.hadoop.hbase.coprocessor.ObserverContext; 055import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 056import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 057import org.apache.hadoop.hbase.coprocessor.RegionObserver; 058import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 059import org.apache.hadoop.hbase.testclassification.MediumTests; 060import org.apache.hadoop.hbase.testclassification.RegionServerTests; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.wal.WALEdit; 063import org.junit.After; 064import org.junit.AfterClass; 065import org.junit.BeforeClass; 066import org.junit.ClassRule; 067import org.junit.Rule; 068import org.junit.Test; 069import org.junit.experimental.categories.Category; 070import org.junit.rules.TestName; 071 072/** 073 * Class that test tags 074 */ 075@Category({ RegionServerTests.class, MediumTests.class }) 076public class TestTags { 077 078 @ClassRule 079 public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestTags.class); 080 081 static boolean useFilter = false; 082 083 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 084 085 @Rule 086 public final TestName TEST_NAME = new TestName(); 087 088 @BeforeClass 089 public static void setUpBeforeClass() throws Exception { 090 Configuration conf = TEST_UTIL.getConfiguration(); 091 conf.setInt("hfile.format.version", 3); 092 conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 093 TestCoprocessorForTags.class.getName()); 094 TEST_UTIL.startMiniCluster(2); 095 } 096 097 @AfterClass 098 public static void tearDownAfterClass() throws Exception { 099 TEST_UTIL.shutdownMiniCluster(); 100 } 101 102 @After 103 public void tearDown() { 104 useFilter = false; 105 } 106 107 @Test 108 public void testTags() throws Exception { 109 Table table = null; 110 try { 111 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); 112 byte[] fam = Bytes.toBytes("info"); 113 byte[] row = Bytes.toBytes("rowa"); 114 // column names 115 byte[] qual = Bytes.toBytes("qual"); 116 117 byte[] row1 = Bytes.toBytes("rowb"); 118 119 byte[] row2 = Bytes.toBytes("rowc"); 120 121 HTableDescriptor desc = new HTableDescriptor(tableName); 122 HColumnDescriptor colDesc = new HColumnDescriptor(fam); 123 colDesc.setBlockCacheEnabled(true); 124 colDesc.setDataBlockEncoding(DataBlockEncoding.NONE); 125 desc.addFamily(colDesc); 126 Admin admin = TEST_UTIL.getAdmin(); 127 admin.createTable(desc); 128 byte[] value = Bytes.toBytes("value"); 129 table = TEST_UTIL.getConnection().getTable(tableName); 130 Put put = new Put(row); 131 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value); 132 put.setAttribute("visibility", Bytes.toBytes("myTag")); 133 table.put(put); 134 admin.flush(tableName); 135 // We are lacking an API for confirming flush request compaction. 136 // Just sleep for a short time. We won't be able to confirm flush 137 // completion but the test won't hang now or in the future if 138 // default compaction policy causes compaction between flush and 139 // when we go to confirm it. 140 Thread.sleep(1000); 141 142 Put put1 = new Put(row1); 143 byte[] value1 = Bytes.toBytes("1000dfsdf"); 144 put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1); 145 // put1.setAttribute("visibility", Bytes.toBytes("myTag3")); 146 table.put(put1); 147 admin.flush(tableName); 148 Thread.sleep(1000); 149 150 Put put2 = new Put(row2); 151 byte[] value2 = Bytes.toBytes("1000dfsdf"); 152 put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2); 153 put2.setAttribute("visibility", Bytes.toBytes("myTag3")); 154 table.put(put2); 155 admin.flush(tableName); 156 Thread.sleep(1000); 157 158 result(fam, row, qual, row2, table, value, value2, row1, value1); 159 160 admin.compact(tableName); 161 while (admin.getCompactionState(tableName) != CompactionState.NONE) { 162 Thread.sleep(10); 163 } 164 result(fam, row, qual, row2, table, value, value2, row1, value1); 165 } finally { 166 if (table != null) { 167 table.close(); 168 } 169 } 170 } 171 172 @Test 173 public void testFlushAndCompactionWithoutTags() throws Exception { 174 Table table = null; 175 try { 176 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); 177 byte[] fam = Bytes.toBytes("info"); 178 byte[] row = Bytes.toBytes("rowa"); 179 // column names 180 byte[] qual = Bytes.toBytes("qual"); 181 182 byte[] row1 = Bytes.toBytes("rowb"); 183 184 byte[] row2 = Bytes.toBytes("rowc"); 185 186 HTableDescriptor desc = new HTableDescriptor(tableName); 187 HColumnDescriptor colDesc = new HColumnDescriptor(fam); 188 colDesc.setBlockCacheEnabled(true); 189 // colDesc.setDataBlockEncoding(DataBlockEncoding.NONE); 190 // colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE); 191 desc.addFamily(colDesc); 192 Admin admin = TEST_UTIL.getAdmin(); 193 admin.createTable(desc); 194 195 table = TEST_UTIL.getConnection().getTable(tableName); 196 Put put = new Put(row); 197 byte[] value = Bytes.toBytes("value"); 198 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value); 199 table.put(put); 200 admin.flush(tableName); 201 // We are lacking an API for confirming flush request compaction. 202 // Just sleep for a short time. We won't be able to confirm flush 203 // completion but the test won't hang now or in the future if 204 // default compaction policy causes compaction between flush and 205 // when we go to confirm it. 206 Thread.sleep(1000); 207 208 Put put1 = new Put(row1); 209 byte[] value1 = Bytes.toBytes("1000dfsdf"); 210 put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1); 211 table.put(put1); 212 admin.flush(tableName); 213 Thread.sleep(1000); 214 215 Put put2 = new Put(row2); 216 byte[] value2 = Bytes.toBytes("1000dfsdf"); 217 put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2); 218 table.put(put2); 219 admin.flush(tableName); 220 Thread.sleep(1000); 221 222 Scan s = new Scan(row); 223 ResultScanner scanner = table.getScanner(s); 224 try { 225 Result[] next = scanner.next(3); 226 for (Result result : next) { 227 CellScanner cellScanner = result.cellScanner(); 228 cellScanner.advance(); 229 Cell current = cellScanner.current(); 230 assertEquals(0, current.getTagsLength()); 231 } 232 } finally { 233 if (scanner != null) scanner.close(); 234 } 235 admin.compact(tableName); 236 while (admin.getCompactionState(tableName) != CompactionState.NONE) { 237 Thread.sleep(10); 238 } 239 s = new Scan(row); 240 scanner = table.getScanner(s); 241 try { 242 Result[] next = scanner.next(3); 243 for (Result result : next) { 244 CellScanner cellScanner = result.cellScanner(); 245 cellScanner.advance(); 246 Cell current = cellScanner.current(); 247 assertEquals(0, current.getTagsLength()); 248 } 249 } finally { 250 if (scanner != null) { 251 scanner.close(); 252 } 253 } 254 } finally { 255 if (table != null) { 256 table.close(); 257 } 258 } 259 } 260 261 @Test 262 public void testFlushAndCompactionwithCombinations() throws Exception { 263 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); 264 byte[] fam = Bytes.toBytes("info"); 265 byte[] row = Bytes.toBytes("rowa"); 266 // column names 267 byte[] qual = Bytes.toBytes("qual"); 268 269 byte[] row1 = Bytes.toBytes("rowb"); 270 271 byte[] row2 = Bytes.toBytes("rowc"); 272 byte[] rowd = Bytes.toBytes("rowd"); 273 byte[] rowe = Bytes.toBytes("rowe"); 274 Table table = null; 275 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 276 HTableDescriptor desc = new HTableDescriptor(tableName); 277 HColumnDescriptor colDesc = new HColumnDescriptor(fam); 278 colDesc.setBlockCacheEnabled(true); 279 colDesc.setDataBlockEncoding(encoding); 280 desc.addFamily(colDesc); 281 Admin admin = TEST_UTIL.getAdmin(); 282 admin.createTable(desc); 283 try { 284 table = TEST_UTIL.getConnection().getTable(tableName); 285 Put put = new Put(row); 286 byte[] value = Bytes.toBytes("value"); 287 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value); 288 int bigTagLen = Short.MAX_VALUE - 5; 289 put.setAttribute("visibility", new byte[bigTagLen]); 290 table.put(put); 291 Put put1 = new Put(row1); 292 byte[] value1 = Bytes.toBytes("1000dfsdf"); 293 put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1); 294 table.put(put1); 295 admin.flush(tableName); 296 // We are lacking an API for confirming flush request compaction. 297 // Just sleep for a short time. We won't be able to confirm flush 298 // completion but the test won't hang now or in the future if 299 // default compaction policy causes compaction between flush and 300 // when we go to confirm it. 301 Thread.sleep(1000); 302 303 put1 = new Put(row2); 304 value1 = Bytes.toBytes("1000dfsdf"); 305 put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1); 306 table.put(put1); 307 admin.flush(tableName); 308 Thread.sleep(1000); 309 310 Put put2 = new Put(rowd); 311 byte[] value2 = Bytes.toBytes("1000dfsdf"); 312 put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2); 313 table.put(put2); 314 put2 = new Put(rowe); 315 value2 = Bytes.toBytes("1000dfsddfdf"); 316 put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2); 317 put.setAttribute("visibility", Bytes.toBytes("ram")); 318 table.put(put2); 319 admin.flush(tableName); 320 Thread.sleep(1000); 321 322 TestCoprocessorForTags.checkTagPresence = true; 323 Scan s = new Scan(row); 324 s.setCaching(1); 325 ResultScanner scanner = table.getScanner(s); 326 try { 327 Result next = null; 328 while ((next = scanner.next()) != null) { 329 CellScanner cellScanner = next.cellScanner(); 330 cellScanner.advance(); 331 Cell current = cellScanner.current(); 332 if (CellUtil.matchingRows(current, row)) { 333 assertEquals(1, TestCoprocessorForTags.tags.size()); 334 Tag tag = TestCoprocessorForTags.tags.get(0); 335 assertEquals(bigTagLen, tag.getValueLength()); 336 } else { 337 assertEquals(0, TestCoprocessorForTags.tags.size()); 338 } 339 } 340 } finally { 341 if (scanner != null) { 342 scanner.close(); 343 } 344 TestCoprocessorForTags.checkTagPresence = false; 345 } 346 while (admin.getCompactionState(tableName) != CompactionState.NONE) { 347 Thread.sleep(10); 348 } 349 TestCoprocessorForTags.checkTagPresence = true; 350 scanner = table.getScanner(s); 351 try { 352 Result next = null; 353 while ((next = scanner.next()) != null) { 354 CellScanner cellScanner = next.cellScanner(); 355 cellScanner.advance(); 356 Cell current = cellScanner.current(); 357 if (CellUtil.matchingRows(current, row)) { 358 assertEquals(1, TestCoprocessorForTags.tags.size()); 359 Tag tag = TestCoprocessorForTags.tags.get(0); 360 assertEquals(bigTagLen, tag.getValueLength()); 361 } else { 362 assertEquals(0, TestCoprocessorForTags.tags.size()); 363 } 364 } 365 } finally { 366 if (scanner != null) { 367 scanner.close(); 368 } 369 TestCoprocessorForTags.checkTagPresence = false; 370 } 371 } finally { 372 if (table != null) { 373 table.close(); 374 } 375 // delete the table 376 admin.disableTable(tableName); 377 admin.deleteTable(tableName); 378 } 379 } 380 } 381 382 @Test 383 public void testTagsWithAppendAndIncrement() throws Exception { 384 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); 385 byte[] f = Bytes.toBytes("f"); 386 byte[] q = Bytes.toBytes("q"); 387 byte[] row1 = Bytes.toBytes("r1"); 388 byte[] row2 = Bytes.toBytes("r2"); 389 390 HTableDescriptor desc = new HTableDescriptor(tableName); 391 HColumnDescriptor colDesc = new HColumnDescriptor(f); 392 desc.addFamily(colDesc); 393 TEST_UTIL.getAdmin().createTable(desc); 394 395 Table table = null; 396 try { 397 table = TEST_UTIL.getConnection().getTable(tableName); 398 Put put = new Put(row1); 399 byte[] v = Bytes.toBytes(2L); 400 put.addColumn(f, q, v); 401 put.setAttribute("visibility", Bytes.toBytes("tag1")); 402 table.put(put); 403 Increment increment = new Increment(row1); 404 increment.addColumn(f, q, 1L); 405 table.increment(increment); 406 TestCoprocessorForTags.checkTagPresence = true; 407 ResultScanner scanner = table.getScanner(new Scan()); 408 Result result = scanner.next(); 409 KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); 410 List<Tag> tags = TestCoprocessorForTags.tags; 411 assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); 412 assertEquals(1, tags.size()); 413 assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0)))); 414 TestCoprocessorForTags.checkTagPresence = false; 415 TestCoprocessorForTags.tags = null; 416 417 increment = new Increment(row1); 418 increment.add(new KeyValue(row1, f, q, 1234L, v)); 419 increment.setAttribute("visibility", Bytes.toBytes("tag2")); 420 table.increment(increment); 421 TestCoprocessorForTags.checkTagPresence = true; 422 scanner = table.getScanner(new Scan()); 423 result = scanner.next(); 424 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); 425 tags = TestCoprocessorForTags.tags; 426 assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); 427 assertEquals(2, tags.size()); 428 // We cannot assume the ordering of tags 429 List<String> tagValues = new ArrayList<>(); 430 for (Tag tag : tags) { 431 tagValues.add(Bytes.toString(Tag.cloneValue(tag))); 432 } 433 assertTrue(tagValues.contains("tag1")); 434 assertTrue(tagValues.contains("tag2")); 435 TestCoprocessorForTags.checkTagPresence = false; 436 TestCoprocessorForTags.tags = null; 437 438 put = new Put(row2); 439 v = Bytes.toBytes(2L); 440 put.addColumn(f, q, v); 441 table.put(put); 442 increment = new Increment(row2); 443 increment.add(new KeyValue(row2, f, q, 1234L, v)); 444 increment.setAttribute("visibility", Bytes.toBytes("tag2")); 445 table.increment(increment); 446 TestCoprocessorForTags.checkTagPresence = true; 447 scanner = table.getScanner(new Scan().setStartRow(row2)); 448 result = scanner.next(); 449 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); 450 tags = TestCoprocessorForTags.tags; 451 assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); 452 assertEquals(1, tags.size()); 453 assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0)))); 454 TestCoprocessorForTags.checkTagPresence = false; 455 TestCoprocessorForTags.tags = null; 456 457 // Test Append 458 byte[] row3 = Bytes.toBytes("r3"); 459 put = new Put(row3); 460 put.addColumn(f, q, Bytes.toBytes("a")); 461 put.setAttribute("visibility", Bytes.toBytes("tag1")); 462 table.put(put); 463 Append append = new Append(row3); 464 append.addColumn(f, q, Bytes.toBytes("b")); 465 table.append(append); 466 TestCoprocessorForTags.checkTagPresence = true; 467 scanner = table.getScanner(new Scan().setStartRow(row3)); 468 result = scanner.next(); 469 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); 470 tags = TestCoprocessorForTags.tags; 471 assertEquals(1, tags.size()); 472 assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0)))); 473 TestCoprocessorForTags.checkTagPresence = false; 474 TestCoprocessorForTags.tags = null; 475 476 append = new Append(row3); 477 append.add(new KeyValue(row3, f, q, 1234L, v)); 478 append.setAttribute("visibility", Bytes.toBytes("tag2")); 479 table.append(append); 480 TestCoprocessorForTags.checkTagPresence = true; 481 scanner = table.getScanner(new Scan().setStartRow(row3)); 482 result = scanner.next(); 483 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); 484 tags = TestCoprocessorForTags.tags; 485 assertEquals(2, tags.size()); 486 // We cannot assume the ordering of tags 487 tagValues.clear(); 488 for (Tag tag : tags) { 489 tagValues.add(Bytes.toString(Tag.cloneValue(tag))); 490 } 491 assertTrue(tagValues.contains("tag1")); 492 assertTrue(tagValues.contains("tag2")); 493 TestCoprocessorForTags.checkTagPresence = false; 494 TestCoprocessorForTags.tags = null; 495 496 byte[] row4 = Bytes.toBytes("r4"); 497 put = new Put(row4); 498 put.addColumn(f, q, Bytes.toBytes("a")); 499 table.put(put); 500 append = new Append(row4); 501 append.add(new KeyValue(row4, f, q, 1234L, v)); 502 append.setAttribute("visibility", Bytes.toBytes("tag2")); 503 table.append(append); 504 TestCoprocessorForTags.checkTagPresence = true; 505 scanner = table.getScanner(new Scan().setStartRow(row4)); 506 result = scanner.next(); 507 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); 508 tags = TestCoprocessorForTags.tags; 509 assertEquals(1, tags.size()); 510 assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0)))); 511 } finally { 512 TestCoprocessorForTags.checkTagPresence = false; 513 TestCoprocessorForTags.tags = null; 514 if (table != null) { 515 table.close(); 516 } 517 } 518 } 519 520 private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, Table table, byte[] value, 521 byte[] value2, byte[] row1, byte[] value1) throws IOException { 522 Scan s = new Scan(row); 523 // If filters are used this attribute can be specifically check for in 524 // filterKV method and 525 // kvs can be filtered out if the tags of interest is not found in that kv 526 s.setAttribute("visibility", Bytes.toBytes("myTag")); 527 ResultScanner scanner = null; 528 try { 529 scanner = table.getScanner(s); 530 Result next = scanner.next(); 531 532 assertTrue(Bytes.equals(next.getRow(), row)); 533 assertTrue(Bytes.equals(next.getValue(fam, qual), value)); 534 535 Result next2 = scanner.next(); 536 assertTrue(next2 != null); 537 assertTrue(Bytes.equals(next2.getRow(), row1)); 538 assertTrue(Bytes.equals(next2.getValue(fam, qual), value1)); 539 540 next2 = scanner.next(); 541 assertTrue(next2 != null); 542 assertTrue(Bytes.equals(next2.getRow(), row2)); 543 assertTrue(Bytes.equals(next2.getValue(fam, qual), value2)); 544 545 } finally { 546 if (scanner != null) scanner.close(); 547 } 548 } 549 550 public static class TestCoprocessorForTags implements RegionCoprocessor, RegionObserver { 551 552 public static volatile boolean checkTagPresence = false; 553 public static List<Tag> tags = null; 554 555 @Override 556 public Optional<RegionObserver> getRegionObserver() { 557 return Optional.of(this); 558 } 559 560 @Override 561 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, 562 final WALEdit edit, final Durability durability) throws IOException { 563 updateMutationAddingTags(put); 564 } 565 566 private void updateMutationAddingTags(final Mutation m) { 567 byte[] attribute = m.getAttribute("visibility"); 568 byte[] cf = null; 569 List<Cell> updatedCells = new ArrayList<>(); 570 if (attribute != null) { 571 for (List<? extends Cell> edits : m.getFamilyCellMap().values()) { 572 for (Cell cell : edits) { 573 KeyValue kv = KeyValueUtil.ensureKeyValue(cell); 574 if (cf == null) { 575 cf = CellUtil.cloneFamily(kv); 576 } 577 Tag tag = new ArrayBackedTag((byte) 1, attribute); 578 List<Tag> tagList = new ArrayList<>(); 579 tagList.add(tag); 580 581 KeyValue newKV = 582 new KeyValue(CellUtil.cloneRow(kv), 0, kv.getRowLength(), CellUtil.cloneFamily(kv), 0, 583 kv.getFamilyLength(), CellUtil.cloneQualifier(kv), 0, kv.getQualifierLength(), 584 kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()), 585 CellUtil.cloneValue(kv), 0, kv.getValueLength(), tagList); 586 ((List<Cell>) updatedCells).add(newKV); 587 } 588 } 589 m.getFamilyCellMap().remove(cf); 590 // Update the family map 591 m.getFamilyCellMap().put(cf, updatedCells); 592 } 593 } 594 595 @Override 596 public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment) 597 throws IOException { 598 updateMutationAddingTags(increment); 599 return null; 600 } 601 602 @Override 603 public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append) 604 throws IOException { 605 updateMutationAddingTags(append); 606 return null; 607 } 608 609 @Override 610 public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e, 611 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException { 612 if (checkTagPresence) { 613 if (results.size() > 0) { 614 // Check tag presence in the 1st cell in 1st Result 615 Result result = results.get(0); 616 CellScanner cellScanner = result.cellScanner(); 617 if (cellScanner.advance()) { 618 Cell cell = cellScanner.current(); 619 tags = PrivateCellUtil.getTags(cell); 620 } 621 } 622 } 623 return hasMore; 624 } 625 } 626}