001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Optional;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.ArrayBackedTag;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellScanner;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.ExtendedCell;
033import org.apache.hadoop.hbase.ExtendedCellScanner;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.KeyValueUtil;
039import org.apache.hadoop.hbase.PrivateCellUtil;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.Tag;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.client.Append;
044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
045import org.apache.hadoop.hbase.client.CompactionState;
046import org.apache.hadoop.hbase.client.Connection;
047import org.apache.hadoop.hbase.client.ConnectionFactory;
048import org.apache.hadoop.hbase.client.Durability;
049import org.apache.hadoop.hbase.client.Increment;
050import org.apache.hadoop.hbase.client.Mutation;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.Result;
053import org.apache.hadoop.hbase.client.ResultScanner;
054import org.apache.hadoop.hbase.client.Scan;
055import org.apache.hadoop.hbase.client.Table;
056import org.apache.hadoop.hbase.client.TableDescriptor;
057import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
058import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
059import org.apache.hadoop.hbase.coprocessor.ObserverContext;
060import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
062import org.apache.hadoop.hbase.coprocessor.RegionObserver;
063import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
064import org.apache.hadoop.hbase.testclassification.MediumTests;
065import org.apache.hadoop.hbase.testclassification.RegionServerTests;
066import org.apache.hadoop.hbase.util.Bytes;
067import org.apache.hadoop.hbase.wal.WALEdit;
068import org.junit.After;
069import org.junit.AfterClass;
070import org.junit.BeforeClass;
071import org.junit.ClassRule;
072import org.junit.Rule;
073import org.junit.Test;
074import org.junit.experimental.categories.Category;
075import org.junit.rules.TestName;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
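/**
 * Tests that cell tags attached by a coprocessor are handled correctly across flushes,
 * compactions, data block encodings, and Increment/Append operations.
 */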
082@Category({ RegionServerTests.class, MediumTests.class })
083public class TestTags {
084
085  @ClassRule
086  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestTags.class);
087
088  private static final Logger LOG = LoggerFactory.getLogger(TestTags.class);
089
090  static boolean useFilter = false;
091
092  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
093
094  @Rule
095  public final TestName TEST_NAME = new TestName();
096
097  @BeforeClass
098  public static void setUpBeforeClass() throws Exception {
099    Configuration conf = TEST_UTIL.getConfiguration();
100    conf.setInt("hfile.format.version", 3);
101    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
102      TestCoprocessorForTags.class.getName());
103    TEST_UTIL.startMiniCluster(2);
104  }
105
106  @AfterClass
107  public static void tearDownAfterClass() throws Exception {
108    TEST_UTIL.shutdownMiniCluster();
109  }
110
111  @After
112  public void tearDown() {
113    useFilter = false;
114  }
115
116  /**
117   * Test that we can do reverse scans when writing tags and using DataBlockEncoding. Fails with an
118   * exception for PREFIX, DIFF, and FAST_DIFF prior to HBASE-27580
119   */
120  @Test
121  public void testReverseScanWithDBE() throws IOException {
122    byte[] family = Bytes.toBytes("0");
123
124    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
125    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
126
127    try (Connection connection = ConnectionFactory.createConnection(conf)) {
128      for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
129        testReverseScanWithDBE(connection, encoding, family, HConstants.DEFAULT_BLOCKSIZE, 10);
130      }
131    }
132  }
133
134  /**
135   * Test that we can do reverse scans when writing tags and using DataBlockEncoding. Fails with an
136   * exception for PREFIX, DIFF, and FAST_DIFF
137   */
138  @Test
139  public void testReverseScanWithDBEWhenCurrentBlockUpdates() throws IOException {
140    byte[] family = Bytes.toBytes("0");
141
142    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
143    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
144
145    try (Connection connection = ConnectionFactory.createConnection(conf)) {
146      for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
147        testReverseScanWithDBE(connection, encoding, family, 1024, 30000);
148      }
149    }
150  }
151
152  private void testReverseScanWithDBE(Connection conn, DataBlockEncoding encoding, byte[] family,
153    int blockSize, int maxRows) throws IOException {
154    LOG.info("Running test with DBE={}", encoding);
155    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName() + "-" + encoding);
156    TEST_UTIL.createTable(
157      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
158        .newBuilder(family).setDataBlockEncoding(encoding).setBlocksize(blockSize).build()).build(),
159      null);
160
161    Table table = conn.getTable(tableName);
162
163    byte[] val1 = new byte[10];
164    byte[] val2 = new byte[10];
165    Bytes.random(val1);
166    Bytes.random(val2);
167
168    for (int i = 0; i < maxRows; i++) {
169      table.put(new Put(Bytes.toBytes(i)).addColumn(family, Bytes.toBytes(1), val1)
170        .addColumn(family, Bytes.toBytes(2), val2).setTTL(600_000));
171    }
172
173    TEST_UTIL.flush(table.getName());
174
175    Scan scan = new Scan();
176    scan.setReversed(true);
177
178    try (ResultScanner scanner = table.getScanner(scan)) {
179      for (int i = maxRows - 1; i >= 0; i--) {
180        Result row = scanner.next();
181        assertEquals(2, row.size());
182
183        Cell cell1 = row.getColumnLatestCell(family, Bytes.toBytes(1));
184        assertTrue(CellUtil.matchingRows(cell1, Bytes.toBytes(i)));
185        assertTrue(CellUtil.matchingValue(cell1, val1));
186
187        Cell cell2 = row.getColumnLatestCell(family, Bytes.toBytes(2));
188        assertTrue(CellUtil.matchingRows(cell2, Bytes.toBytes(i)));
189        assertTrue(CellUtil.matchingValue(cell2, val2));
190      }
191    }
192
193  }
194
195  @Test
196  public void testTags() throws Exception {
197    Table table = null;
198    try {
199      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
200      byte[] fam = Bytes.toBytes("info");
201      byte[] row = Bytes.toBytes("rowa");
202      // column names
203      byte[] qual = Bytes.toBytes("qual");
204
205      byte[] row1 = Bytes.toBytes("rowb");
206
207      byte[] row2 = Bytes.toBytes("rowc");
208
209      TableDescriptor tableDescriptor =
210        TableDescriptorBuilder
211          .newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam)
212            .setBlockCacheEnabled(true).setDataBlockEncoding(DataBlockEncoding.NONE).build())
213          .build();
214      Admin admin = TEST_UTIL.getAdmin();
215      admin.createTable(tableDescriptor);
216      byte[] value = Bytes.toBytes("value");
217      table = TEST_UTIL.getConnection().getTable(tableName);
218      Put put = new Put(row);
219      put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
220      put.setAttribute("visibility", Bytes.toBytes("myTag"));
221      table.put(put);
222      admin.flush(tableName);
223      // We are lacking an API for confirming flush request compaction.
224      // Just sleep for a short time. We won't be able to confirm flush
225      // completion but the test won't hang now or in the future if
226      // default compaction policy causes compaction between flush and
227      // when we go to confirm it.
228      Thread.sleep(1000);
229
230      Put put1 = new Put(row1);
231      byte[] value1 = Bytes.toBytes("1000dfsdf");
232      put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
233      // put1.setAttribute("visibility", Bytes.toBytes("myTag3"));
234      table.put(put1);
235      admin.flush(tableName);
236      Thread.sleep(1000);
237
238      Put put2 = new Put(row2);
239      byte[] value2 = Bytes.toBytes("1000dfsdf");
240      put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
241      put2.setAttribute("visibility", Bytes.toBytes("myTag3"));
242      table.put(put2);
243      admin.flush(tableName);
244      Thread.sleep(1000);
245
246      result(fam, row, qual, row2, table, value, value2, row1, value1);
247
248      admin.compact(tableName);
249      while (admin.getCompactionState(tableName) != CompactionState.NONE) {
250        Thread.sleep(10);
251      }
252      result(fam, row, qual, row2, table, value, value2, row1, value1);
253    } finally {
254      if (table != null) {
255        table.close();
256      }
257    }
258  }
259
260  @Test
261  public void testFlushAndCompactionWithoutTags() throws Exception {
262    Table table = null;
263    try {
264      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
265      byte[] fam = Bytes.toBytes("info");
266      byte[] row = Bytes.toBytes("rowa");
267      // column names
268      byte[] qual = Bytes.toBytes("qual");
269
270      byte[] row1 = Bytes.toBytes("rowb");
271
272      byte[] row2 = Bytes.toBytes("rowc");
273
274      TableDescriptor tableDescriptor =
275        TableDescriptorBuilder.newBuilder(tableName)
276          .setColumnFamily(
277            ColumnFamilyDescriptorBuilder.newBuilder(fam).setBlockCacheEnabled(true).build())
278          .build();
279      Admin admin = TEST_UTIL.getAdmin();
280      admin.createTable(tableDescriptor);
281
282      table = TEST_UTIL.getConnection().getTable(tableName);
283      Put put = new Put(row);
284      byte[] value = Bytes.toBytes("value");
285      put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
286      table.put(put);
287      admin.flush(tableName);
288      // We are lacking an API for confirming flush request compaction.
289      // Just sleep for a short time. We won't be able to confirm flush
290      // completion but the test won't hang now or in the future if
291      // default compaction policy causes compaction between flush and
292      // when we go to confirm it.
293      Thread.sleep(1000);
294
295      Put put1 = new Put(row1);
296      byte[] value1 = Bytes.toBytes("1000dfsdf");
297      put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
298      table.put(put1);
299      admin.flush(tableName);
300      Thread.sleep(1000);
301
302      Put put2 = new Put(row2);
303      byte[] value2 = Bytes.toBytes("1000dfsdf");
304      put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
305      table.put(put2);
306      admin.flush(tableName);
307      Thread.sleep(1000);
308
309      Scan s = new Scan().withStartRow(row);
310      ResultScanner scanner = table.getScanner(s);
311      try {
312        Result[] next = scanner.next(3);
313        for (Result result : next) {
314          ExtendedCellScanner cellScanner = result.cellScanner();
315          cellScanner.advance();
316          ExtendedCell current = cellScanner.current();
317          assertEquals(0, current.getTagsLength());
318        }
319      } finally {
320        if (scanner != null) scanner.close();
321      }
322      admin.compact(tableName);
323      while (admin.getCompactionState(tableName) != CompactionState.NONE) {
324        Thread.sleep(10);
325      }
326      s = new Scan().withStartRow(row);
327      scanner = table.getScanner(s);
328      try {
329        Result[] next = scanner.next(3);
330        for (Result result : next) {
331          ExtendedCellScanner cellScanner = result.cellScanner();
332          cellScanner.advance();
333          ExtendedCell current = cellScanner.current();
334          assertEquals(0, current.getTagsLength());
335        }
336      } finally {
337        if (scanner != null) {
338          scanner.close();
339        }
340      }
341    } finally {
342      if (table != null) {
343        table.close();
344      }
345    }
346  }
347
348  @Test
349  public void testFlushAndCompactionwithCombinations() throws Exception {
350    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
351    byte[] fam = Bytes.toBytes("info");
352    byte[] row = Bytes.toBytes("rowa");
353    // column names
354    byte[] qual = Bytes.toBytes("qual");
355
356    byte[] row1 = Bytes.toBytes("rowb");
357
358    byte[] row2 = Bytes.toBytes("rowc");
359    byte[] rowd = Bytes.toBytes("rowd");
360    byte[] rowe = Bytes.toBytes("rowe");
361    Table table = null;
362    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
363      TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
364        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam).setBlockCacheEnabled(true)
365          .setDataBlockEncoding(encoding).build())
366        .build();
367      Admin admin = TEST_UTIL.getAdmin();
368      admin.createTable(tableDescriptor);
369      try {
370        table = TEST_UTIL.getConnection().getTable(tableName);
371        Put put = new Put(row);
372        byte[] value = Bytes.toBytes("value");
373        put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
374        int bigTagLen = Short.MAX_VALUE - 5;
375        put.setAttribute("visibility", new byte[bigTagLen]);
376        table.put(put);
377        Put put1 = new Put(row1);
378        byte[] value1 = Bytes.toBytes("1000dfsdf");
379        put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
380        table.put(put1);
381        admin.flush(tableName);
382        // We are lacking an API for confirming flush request compaction.
383        // Just sleep for a short time. We won't be able to confirm flush
384        // completion but the test won't hang now or in the future if
385        // default compaction policy causes compaction between flush and
386        // when we go to confirm it.
387        Thread.sleep(1000);
388
389        put1 = new Put(row2);
390        value1 = Bytes.toBytes("1000dfsdf");
391        put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
392        table.put(put1);
393        admin.flush(tableName);
394        Thread.sleep(1000);
395
396        Put put2 = new Put(rowd);
397        byte[] value2 = Bytes.toBytes("1000dfsdf");
398        put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
399        table.put(put2);
400        put2 = new Put(rowe);
401        value2 = Bytes.toBytes("1000dfsddfdf");
402        put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
403        put.setAttribute("visibility", Bytes.toBytes("ram"));
404        table.put(put2);
405        admin.flush(tableName);
406        Thread.sleep(1000);
407
408        TestCoprocessorForTags.checkTagPresence = true;
409        Scan s = new Scan().withStartRow(row);
410        s.setCaching(1);
411        ResultScanner scanner = table.getScanner(s);
412        try {
413          Result next = null;
414          while ((next = scanner.next()) != null) {
415            CellScanner cellScanner = next.cellScanner();
416            cellScanner.advance();
417            Cell current = cellScanner.current();
418            if (CellUtil.matchingRows(current, row)) {
419              assertEquals(1, TestCoprocessorForTags.tags.size());
420              Tag tag = TestCoprocessorForTags.tags.get(0);
421              assertEquals(bigTagLen, tag.getValueLength());
422            } else {
423              assertEquals(0, TestCoprocessorForTags.tags.size());
424            }
425          }
426        } finally {
427          if (scanner != null) {
428            scanner.close();
429          }
430          TestCoprocessorForTags.checkTagPresence = false;
431        }
432        while (admin.getCompactionState(tableName) != CompactionState.NONE) {
433          Thread.sleep(10);
434        }
435        TestCoprocessorForTags.checkTagPresence = true;
436        scanner = table.getScanner(s);
437        try {
438          Result next = null;
439          while ((next = scanner.next()) != null) {
440            CellScanner cellScanner = next.cellScanner();
441            cellScanner.advance();
442            Cell current = cellScanner.current();
443            if (CellUtil.matchingRows(current, row)) {
444              assertEquals(1, TestCoprocessorForTags.tags.size());
445              Tag tag = TestCoprocessorForTags.tags.get(0);
446              assertEquals(bigTagLen, tag.getValueLength());
447            } else {
448              assertEquals(0, TestCoprocessorForTags.tags.size());
449            }
450          }
451        } finally {
452          if (scanner != null) {
453            scanner.close();
454          }
455          TestCoprocessorForTags.checkTagPresence = false;
456        }
457      } finally {
458        if (table != null) {
459          table.close();
460        }
461        // delete the table
462        admin.disableTable(tableName);
463        admin.deleteTable(tableName);
464      }
465    }
466  }
467
468  @Test
469  public void testTagsWithAppendAndIncrement() throws Exception {
470    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
471    byte[] f = Bytes.toBytes("f");
472    byte[] q = Bytes.toBytes("q");
473    byte[] row1 = Bytes.toBytes("r1");
474    byte[] row2 = Bytes.toBytes("r2");
475
476    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
477      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f)).build();
478    TEST_UTIL.getAdmin().createTable(tableDescriptor);
479
480    Table table = null;
481    try {
482      table = TEST_UTIL.getConnection().getTable(tableName);
483      Put put = new Put(row1);
484      byte[] v = Bytes.toBytes(2L);
485      put.addColumn(f, q, v);
486      put.setAttribute("visibility", Bytes.toBytes("tag1"));
487      table.put(put);
488      Increment increment = new Increment(row1);
489      increment.addColumn(f, q, 1L);
490      table.increment(increment);
491      TestCoprocessorForTags.checkTagPresence = true;
492      ResultScanner scanner = table.getScanner(new Scan());
493      Result result = scanner.next();
494      KeyValue kv = KeyValueUtil.ensureKeyValue((ExtendedCell) result.getColumnLatestCell(f, q));
495      List<Tag> tags = TestCoprocessorForTags.tags;
496      assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
497      assertEquals(1, tags.size());
498      assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0))));
499      TestCoprocessorForTags.checkTagPresence = false;
500      TestCoprocessorForTags.tags = null;
501
502      increment = new Increment(row1);
503      increment.add(new KeyValue(row1, f, q, 1234L, v));
504      increment.setAttribute("visibility", Bytes.toBytes("tag2"));
505      table.increment(increment);
506      TestCoprocessorForTags.checkTagPresence = true;
507      scanner = table.getScanner(new Scan());
508      result = scanner.next();
509      kv = KeyValueUtil.ensureKeyValue((ExtendedCell) result.getColumnLatestCell(f, q));
510      tags = TestCoprocessorForTags.tags;
511      assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
512      assertEquals(2, tags.size());
513      // We cannot assume the ordering of tags
514      List<String> tagValues = new ArrayList<>();
515      for (Tag tag : tags) {
516        tagValues.add(Bytes.toString(Tag.cloneValue(tag)));
517      }
518      assertTrue(tagValues.contains("tag1"));
519      assertTrue(tagValues.contains("tag2"));
520      TestCoprocessorForTags.checkTagPresence = false;
521      TestCoprocessorForTags.tags = null;
522
523      put = new Put(row2);
524      v = Bytes.toBytes(2L);
525      put.addColumn(f, q, v);
526      table.put(put);
527      increment = new Increment(row2);
528      increment.add(new KeyValue(row2, f, q, 1234L, v));
529      increment.setAttribute("visibility", Bytes.toBytes("tag2"));
530      table.increment(increment);
531      TestCoprocessorForTags.checkTagPresence = true;
532      scanner = table.getScanner(new Scan().withStartRow(row2));
533      result = scanner.next();
534      kv = KeyValueUtil.ensureKeyValue((ExtendedCell) result.getColumnLatestCell(f, q));
535      tags = TestCoprocessorForTags.tags;
536      assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
537      assertEquals(1, tags.size());
538      assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0))));
539      TestCoprocessorForTags.checkTagPresence = false;
540      TestCoprocessorForTags.tags = null;
541
542      // Test Append
543      byte[] row3 = Bytes.toBytes("r3");
544      put = new Put(row3);
545      put.addColumn(f, q, Bytes.toBytes("a"));
546      put.setAttribute("visibility", Bytes.toBytes("tag1"));
547      table.put(put);
548      Append append = new Append(row3);
549      append.addColumn(f, q, Bytes.toBytes("b"));
550      table.append(append);
551      TestCoprocessorForTags.checkTagPresence = true;
552      scanner = table.getScanner(new Scan().withStartRow(row3));
553      result = scanner.next();
554      kv = KeyValueUtil.ensureKeyValue((ExtendedCell) result.getColumnLatestCell(f, q));
555      tags = TestCoprocessorForTags.tags;
556      assertEquals(1, tags.size());
557      assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0))));
558      TestCoprocessorForTags.checkTagPresence = false;
559      TestCoprocessorForTags.tags = null;
560
561      append = new Append(row3);
562      append.add(new KeyValue(row3, f, q, 1234L, v));
563      append.setAttribute("visibility", Bytes.toBytes("tag2"));
564      table.append(append);
565      TestCoprocessorForTags.checkTagPresence = true;
566      scanner = table.getScanner(new Scan().withStartRow(row3));
567      result = scanner.next();
568      kv = KeyValueUtil.ensureKeyValue((ExtendedCell) result.getColumnLatestCell(f, q));
569      tags = TestCoprocessorForTags.tags;
570      assertEquals(2, tags.size());
571      // We cannot assume the ordering of tags
572      tagValues.clear();
573      for (Tag tag : tags) {
574        tagValues.add(Bytes.toString(Tag.cloneValue(tag)));
575      }
576      assertTrue(tagValues.contains("tag1"));
577      assertTrue(tagValues.contains("tag2"));
578      TestCoprocessorForTags.checkTagPresence = false;
579      TestCoprocessorForTags.tags = null;
580
581      byte[] row4 = Bytes.toBytes("r4");
582      put = new Put(row4);
583      put.addColumn(f, q, Bytes.toBytes("a"));
584      table.put(put);
585      append = new Append(row4);
586      append.add(new KeyValue(row4, f, q, 1234L, v));
587      append.setAttribute("visibility", Bytes.toBytes("tag2"));
588      table.append(append);
589      TestCoprocessorForTags.checkTagPresence = true;
590      scanner = table.getScanner(new Scan().withStartRow(row4));
591      result = scanner.next();
592      kv = KeyValueUtil.ensureKeyValue((ExtendedCell) result.getColumnLatestCell(f, q));
593      tags = TestCoprocessorForTags.tags;
594      assertEquals(1, tags.size());
595      assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0))));
596    } finally {
597      TestCoprocessorForTags.checkTagPresence = false;
598      TestCoprocessorForTags.tags = null;
599      if (table != null) {
600        table.close();
601      }
602    }
603  }
604
605  private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, Table table, byte[] value,
606    byte[] value2, byte[] row1, byte[] value1) throws IOException {
607    Scan s = new Scan().withStartRow(row);
608    // If filters are used this attribute can be specifically check for in
609    // filterKV method and
610    // kvs can be filtered out if the tags of interest is not found in that kv
611    s.setAttribute("visibility", Bytes.toBytes("myTag"));
612    ResultScanner scanner = null;
613    try {
614      scanner = table.getScanner(s);
615      Result next = scanner.next();
616
617      assertTrue(Bytes.equals(next.getRow(), row));
618      assertTrue(Bytes.equals(next.getValue(fam, qual), value));
619
620      Result next2 = scanner.next();
621      assertTrue(next2 != null);
622      assertTrue(Bytes.equals(next2.getRow(), row1));
623      assertTrue(Bytes.equals(next2.getValue(fam, qual), value1));
624
625      next2 = scanner.next();
626      assertTrue(next2 != null);
627      assertTrue(Bytes.equals(next2.getRow(), row2));
628      assertTrue(Bytes.equals(next2.getValue(fam, qual), value2));
629
630    } finally {
631      if (scanner != null) scanner.close();
632    }
633  }
634
635  public static class TestCoprocessorForTags implements RegionCoprocessor, RegionObserver {
636
637    public static volatile boolean checkTagPresence = false;
638    public static List<Tag> tags = null;
639
640    @Override
641    public Optional<RegionObserver> getRegionObserver() {
642      return Optional.of(this);
643    }
644
645    @Override
646    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
647      final WALEdit edit, final Durability durability) throws IOException {
648      updateMutationAddingTags(put);
649    }
650
651    private void updateMutationAddingTags(final Mutation m) {
652      byte[] attribute = m.getAttribute("visibility");
653      byte[] cf = null;
654      List<Cell> updatedCells = new ArrayList<>();
655      if (attribute != null) {
656        for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
657          for (Cell cell : edits) {
658            KeyValue kv = KeyValueUtil.ensureKeyValue((ExtendedCell) cell);
659            if (cf == null) {
660              cf = CellUtil.cloneFamily(kv);
661            }
662            Tag tag = new ArrayBackedTag((byte) 1, attribute);
663            List<Tag> tagList = new ArrayList<>();
664            tagList.add(tag);
665
666            KeyValue newKV =
667              new KeyValue(CellUtil.cloneRow(kv), 0, kv.getRowLength(), CellUtil.cloneFamily(kv), 0,
668                kv.getFamilyLength(), CellUtil.cloneQualifier(kv), 0, kv.getQualifierLength(),
669                kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
670                CellUtil.cloneValue(kv), 0, kv.getValueLength(), tagList);
671            ((List<Cell>) updatedCells).add(newKV);
672          }
673        }
674        m.getFamilyCellMap().remove(cf);
675        // Update the family map
676        m.getFamilyCellMap().put(cf, updatedCells);
677      }
678    }
679
680    @Override
681    public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
682      throws IOException {
683      updateMutationAddingTags(increment);
684      return null;
685    }
686
687    @Override
688    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
689      throws IOException {
690      updateMutationAddingTags(append);
691      return null;
692    }
693
694    @Override
695    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
696      InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
697      if (checkTagPresence) {
698        if (results.size() > 0) {
699          // Check tag presence in the 1st cell in 1st Result
700          Result result = results.get(0);
701          ExtendedCellScanner cellScanner = result.cellScanner();
702          if (cellScanner.advance()) {
703            ExtendedCell cell = cellScanner.current();
704            tags = PrivateCellUtil.getTags(cell);
705          }
706        }
707      }
708      return hasMore;
709    }
710  }
711}