001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Optional;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.ArrayBackedTag;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellScanner;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.KeyValue;
036import org.apache.hadoop.hbase.KeyValueUtil;
037import org.apache.hadoop.hbase.PrivateCellUtil;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.Tag;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.Append;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.CompactionState;
044import org.apache.hadoop.hbase.client.Connection;
045import org.apache.hadoop.hbase.client.ConnectionFactory;
046import org.apache.hadoop.hbase.client.Durability;
047import org.apache.hadoop.hbase.client.Increment;
048import org.apache.hadoop.hbase.client.Mutation;
049import org.apache.hadoop.hbase.client.Put;
050import org.apache.hadoop.hbase.client.Result;
051import org.apache.hadoop.hbase.client.ResultScanner;
052import org.apache.hadoop.hbase.client.Scan;
053import org.apache.hadoop.hbase.client.Table;
054import org.apache.hadoop.hbase.client.TableDescriptor;
055import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
056import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
057import org.apache.hadoop.hbase.coprocessor.ObserverContext;
058import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
059import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
060import org.apache.hadoop.hbase.coprocessor.RegionObserver;
061import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
062import org.apache.hadoop.hbase.testclassification.MediumTests;
063import org.apache.hadoop.hbase.testclassification.RegionServerTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.wal.WALEdit;
066import org.junit.After;
067import org.junit.AfterClass;
068import org.junit.BeforeClass;
069import org.junit.ClassRule;
070import org.junit.Rule;
071import org.junit.Test;
072import org.junit.experimental.categories.Category;
073import org.junit.rules.TestName;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077/**
078 * Class that test tags
079 */
080@Category({ RegionServerTests.class, MediumTests.class })
081public class TestTags {
082
083  @ClassRule
084  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestTags.class);
085
086  private static final Logger LOG = LoggerFactory.getLogger(TestTags.class);
087
088  static boolean useFilter = false;
089
090  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
091
092  @Rule
093  public final TestName TEST_NAME = new TestName();
094
095  @BeforeClass
096  public static void setUpBeforeClass() throws Exception {
097    Configuration conf = TEST_UTIL.getConfiguration();
098    conf.setInt("hfile.format.version", 3);
099    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
100      TestCoprocessorForTags.class.getName());
101    TEST_UTIL.startMiniCluster(2);
102  }
103
104  @AfterClass
105  public static void tearDownAfterClass() throws Exception {
106    TEST_UTIL.shutdownMiniCluster();
107  }
108
109  @After
110  public void tearDown() {
111    useFilter = false;
112  }
113
114  /**
115   * Test that we can do reverse scans when writing tags and using DataBlockEncoding. Fails with an
116   * exception for PREFIX, DIFF, and FAST_DIFF prior to HBASE-27580
117   */
118  @Test
119  public void testReverseScanWithDBE() throws IOException {
120    byte[] family = Bytes.toBytes("0");
121
122    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
123    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
124
125    try (Connection connection = ConnectionFactory.createConnection(conf)) {
126      for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
127        testReverseScanWithDBE(connection, encoding, family);
128      }
129    }
130  }
131
132  private void testReverseScanWithDBE(Connection conn, DataBlockEncoding encoding, byte[] family)
133    throws IOException {
134    LOG.info("Running test with DBE={}", encoding);
135    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName() + "-" + encoding);
136    TEST_UTIL.createTable(TableDescriptorBuilder.newBuilder(tableName)
137      .setColumnFamily(
138        ColumnFamilyDescriptorBuilder.newBuilder(family).setDataBlockEncoding(encoding).build())
139      .build(), null);
140
141    Table table = conn.getTable(tableName);
142
143    int maxRows = 10;
144    byte[] val1 = new byte[10];
145    byte[] val2 = new byte[10];
146    Bytes.random(val1);
147    Bytes.random(val2);
148
149    for (int i = 0; i < maxRows; i++) {
150      if (i == maxRows / 2) {
151        TEST_UTIL.flush(tableName);
152      }
153      table.put(new Put(Bytes.toBytes(i)).addColumn(family, Bytes.toBytes(1), val1)
154        .addColumn(family, Bytes.toBytes(2), val2).setTTL(600_000));
155    }
156
157    TEST_UTIL.flush(table.getName());
158
159    Scan scan = new Scan();
160    scan.setReversed(true);
161
162    try (ResultScanner scanner = table.getScanner(scan)) {
163      for (int i = maxRows - 1; i >= 0; i--) {
164        Result row = scanner.next();
165        assertEquals(2, row.size());
166
167        Cell cell1 = row.getColumnLatestCell(family, Bytes.toBytes(1));
168        assertTrue(CellUtil.matchingRows(cell1, Bytes.toBytes(i)));
169        assertTrue(CellUtil.matchingValue(cell1, val1));
170
171        Cell cell2 = row.getColumnLatestCell(family, Bytes.toBytes(2));
172        assertTrue(CellUtil.matchingRows(cell2, Bytes.toBytes(i)));
173        assertTrue(CellUtil.matchingValue(cell2, val2));
174      }
175    }
176
177  }
178
179  @Test
180  public void testTags() throws Exception {
181    Table table = null;
182    try {
183      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
184      byte[] fam = Bytes.toBytes("info");
185      byte[] row = Bytes.toBytes("rowa");
186      // column names
187      byte[] qual = Bytes.toBytes("qual");
188
189      byte[] row1 = Bytes.toBytes("rowb");
190
191      byte[] row2 = Bytes.toBytes("rowc");
192
193      TableDescriptor tableDescriptor =
194        TableDescriptorBuilder
195          .newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam)
196            .setBlockCacheEnabled(true).setDataBlockEncoding(DataBlockEncoding.NONE).build())
197          .build();
198      Admin admin = TEST_UTIL.getAdmin();
199      admin.createTable(tableDescriptor);
200      byte[] value = Bytes.toBytes("value");
201      table = TEST_UTIL.getConnection().getTable(tableName);
202      Put put = new Put(row);
203      put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
204      put.setAttribute("visibility", Bytes.toBytes("myTag"));
205      table.put(put);
206      admin.flush(tableName);
207      // We are lacking an API for confirming flush request compaction.
208      // Just sleep for a short time. We won't be able to confirm flush
209      // completion but the test won't hang now or in the future if
210      // default compaction policy causes compaction between flush and
211      // when we go to confirm it.
212      Thread.sleep(1000);
213
214      Put put1 = new Put(row1);
215      byte[] value1 = Bytes.toBytes("1000dfsdf");
216      put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
217      // put1.setAttribute("visibility", Bytes.toBytes("myTag3"));
218      table.put(put1);
219      admin.flush(tableName);
220      Thread.sleep(1000);
221
222      Put put2 = new Put(row2);
223      byte[] value2 = Bytes.toBytes("1000dfsdf");
224      put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
225      put2.setAttribute("visibility", Bytes.toBytes("myTag3"));
226      table.put(put2);
227      admin.flush(tableName);
228      Thread.sleep(1000);
229
230      result(fam, row, qual, row2, table, value, value2, row1, value1);
231
232      admin.compact(tableName);
233      while (admin.getCompactionState(tableName) != CompactionState.NONE) {
234        Thread.sleep(10);
235      }
236      result(fam, row, qual, row2, table, value, value2, row1, value1);
237    } finally {
238      if (table != null) {
239        table.close();
240      }
241    }
242  }
243
244  @Test
245  public void testFlushAndCompactionWithoutTags() throws Exception {
246    Table table = null;
247    try {
248      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
249      byte[] fam = Bytes.toBytes("info");
250      byte[] row = Bytes.toBytes("rowa");
251      // column names
252      byte[] qual = Bytes.toBytes("qual");
253
254      byte[] row1 = Bytes.toBytes("rowb");
255
256      byte[] row2 = Bytes.toBytes("rowc");
257
258      TableDescriptor tableDescriptor =
259        TableDescriptorBuilder.newBuilder(tableName)
260          .setColumnFamily(
261            ColumnFamilyDescriptorBuilder.newBuilder(fam).setBlockCacheEnabled(true).build())
262          .build();
263      Admin admin = TEST_UTIL.getAdmin();
264      admin.createTable(tableDescriptor);
265
266      table = TEST_UTIL.getConnection().getTable(tableName);
267      Put put = new Put(row);
268      byte[] value = Bytes.toBytes("value");
269      put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
270      table.put(put);
271      admin.flush(tableName);
272      // We are lacking an API for confirming flush request compaction.
273      // Just sleep for a short time. We won't be able to confirm flush
274      // completion but the test won't hang now or in the future if
275      // default compaction policy causes compaction between flush and
276      // when we go to confirm it.
277      Thread.sleep(1000);
278
279      Put put1 = new Put(row1);
280      byte[] value1 = Bytes.toBytes("1000dfsdf");
281      put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
282      table.put(put1);
283      admin.flush(tableName);
284      Thread.sleep(1000);
285
286      Put put2 = new Put(row2);
287      byte[] value2 = Bytes.toBytes("1000dfsdf");
288      put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
289      table.put(put2);
290      admin.flush(tableName);
291      Thread.sleep(1000);
292
293      Scan s = new Scan().withStartRow(row);
294      ResultScanner scanner = table.getScanner(s);
295      try {
296        Result[] next = scanner.next(3);
297        for (Result result : next) {
298          CellScanner cellScanner = result.cellScanner();
299          cellScanner.advance();
300          Cell current = cellScanner.current();
301          assertEquals(0, current.getTagsLength());
302        }
303      } finally {
304        if (scanner != null) scanner.close();
305      }
306      admin.compact(tableName);
307      while (admin.getCompactionState(tableName) != CompactionState.NONE) {
308        Thread.sleep(10);
309      }
310      s = new Scan().withStartRow(row);
311      scanner = table.getScanner(s);
312      try {
313        Result[] next = scanner.next(3);
314        for (Result result : next) {
315          CellScanner cellScanner = result.cellScanner();
316          cellScanner.advance();
317          Cell current = cellScanner.current();
318          assertEquals(0, current.getTagsLength());
319        }
320      } finally {
321        if (scanner != null) {
322          scanner.close();
323        }
324      }
325    } finally {
326      if (table != null) {
327        table.close();
328      }
329    }
330  }
331
332  @Test
333  public void testFlushAndCompactionwithCombinations() throws Exception {
334    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
335    byte[] fam = Bytes.toBytes("info");
336    byte[] row = Bytes.toBytes("rowa");
337    // column names
338    byte[] qual = Bytes.toBytes("qual");
339
340    byte[] row1 = Bytes.toBytes("rowb");
341
342    byte[] row2 = Bytes.toBytes("rowc");
343    byte[] rowd = Bytes.toBytes("rowd");
344    byte[] rowe = Bytes.toBytes("rowe");
345    Table table = null;
346    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
347      TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
348        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam).setBlockCacheEnabled(true)
349          .setDataBlockEncoding(encoding).build())
350        .build();
351      Admin admin = TEST_UTIL.getAdmin();
352      admin.createTable(tableDescriptor);
353      try {
354        table = TEST_UTIL.getConnection().getTable(tableName);
355        Put put = new Put(row);
356        byte[] value = Bytes.toBytes("value");
357        put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
358        int bigTagLen = Short.MAX_VALUE - 5;
359        put.setAttribute("visibility", new byte[bigTagLen]);
360        table.put(put);
361        Put put1 = new Put(row1);
362        byte[] value1 = Bytes.toBytes("1000dfsdf");
363        put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
364        table.put(put1);
365        admin.flush(tableName);
366        // We are lacking an API for confirming flush request compaction.
367        // Just sleep for a short time. We won't be able to confirm flush
368        // completion but the test won't hang now or in the future if
369        // default compaction policy causes compaction between flush and
370        // when we go to confirm it.
371        Thread.sleep(1000);
372
373        put1 = new Put(row2);
374        value1 = Bytes.toBytes("1000dfsdf");
375        put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
376        table.put(put1);
377        admin.flush(tableName);
378        Thread.sleep(1000);
379
380        Put put2 = new Put(rowd);
381        byte[] value2 = Bytes.toBytes("1000dfsdf");
382        put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
383        table.put(put2);
384        put2 = new Put(rowe);
385        value2 = Bytes.toBytes("1000dfsddfdf");
386        put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
387        put.setAttribute("visibility", Bytes.toBytes("ram"));
388        table.put(put2);
389        admin.flush(tableName);
390        Thread.sleep(1000);
391
392        TestCoprocessorForTags.checkTagPresence = true;
393        Scan s = new Scan().withStartRow(row);
394        s.setCaching(1);
395        ResultScanner scanner = table.getScanner(s);
396        try {
397          Result next = null;
398          while ((next = scanner.next()) != null) {
399            CellScanner cellScanner = next.cellScanner();
400            cellScanner.advance();
401            Cell current = cellScanner.current();
402            if (CellUtil.matchingRows(current, row)) {
403              assertEquals(1, TestCoprocessorForTags.tags.size());
404              Tag tag = TestCoprocessorForTags.tags.get(0);
405              assertEquals(bigTagLen, tag.getValueLength());
406            } else {
407              assertEquals(0, TestCoprocessorForTags.tags.size());
408            }
409          }
410        } finally {
411          if (scanner != null) {
412            scanner.close();
413          }
414          TestCoprocessorForTags.checkTagPresence = false;
415        }
416        while (admin.getCompactionState(tableName) != CompactionState.NONE) {
417          Thread.sleep(10);
418        }
419        TestCoprocessorForTags.checkTagPresence = true;
420        scanner = table.getScanner(s);
421        try {
422          Result next = null;
423          while ((next = scanner.next()) != null) {
424            CellScanner cellScanner = next.cellScanner();
425            cellScanner.advance();
426            Cell current = cellScanner.current();
427            if (CellUtil.matchingRows(current, row)) {
428              assertEquals(1, TestCoprocessorForTags.tags.size());
429              Tag tag = TestCoprocessorForTags.tags.get(0);
430              assertEquals(bigTagLen, tag.getValueLength());
431            } else {
432              assertEquals(0, TestCoprocessorForTags.tags.size());
433            }
434          }
435        } finally {
436          if (scanner != null) {
437            scanner.close();
438          }
439          TestCoprocessorForTags.checkTagPresence = false;
440        }
441      } finally {
442        if (table != null) {
443          table.close();
444        }
445        // delete the table
446        admin.disableTable(tableName);
447        admin.deleteTable(tableName);
448      }
449    }
450  }
451
452  @Test
453  public void testTagsWithAppendAndIncrement() throws Exception {
454    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
455    byte[] f = Bytes.toBytes("f");
456    byte[] q = Bytes.toBytes("q");
457    byte[] row1 = Bytes.toBytes("r1");
458    byte[] row2 = Bytes.toBytes("r2");
459
460    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
461      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f)).build();
462    TEST_UTIL.getAdmin().createTable(tableDescriptor);
463
464    Table table = null;
465    try {
466      table = TEST_UTIL.getConnection().getTable(tableName);
467      Put put = new Put(row1);
468      byte[] v = Bytes.toBytes(2L);
469      put.addColumn(f, q, v);
470      put.setAttribute("visibility", Bytes.toBytes("tag1"));
471      table.put(put);
472      Increment increment = new Increment(row1);
473      increment.addColumn(f, q, 1L);
474      table.increment(increment);
475      TestCoprocessorForTags.checkTagPresence = true;
476      ResultScanner scanner = table.getScanner(new Scan());
477      Result result = scanner.next();
478      KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
479      List<Tag> tags = TestCoprocessorForTags.tags;
480      assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
481      assertEquals(1, tags.size());
482      assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0))));
483      TestCoprocessorForTags.checkTagPresence = false;
484      TestCoprocessorForTags.tags = null;
485
486      increment = new Increment(row1);
487      increment.add(new KeyValue(row1, f, q, 1234L, v));
488      increment.setAttribute("visibility", Bytes.toBytes("tag2"));
489      table.increment(increment);
490      TestCoprocessorForTags.checkTagPresence = true;
491      scanner = table.getScanner(new Scan());
492      result = scanner.next();
493      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
494      tags = TestCoprocessorForTags.tags;
495      assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
496      assertEquals(2, tags.size());
497      // We cannot assume the ordering of tags
498      List<String> tagValues = new ArrayList<>();
499      for (Tag tag : tags) {
500        tagValues.add(Bytes.toString(Tag.cloneValue(tag)));
501      }
502      assertTrue(tagValues.contains("tag1"));
503      assertTrue(tagValues.contains("tag2"));
504      TestCoprocessorForTags.checkTagPresence = false;
505      TestCoprocessorForTags.tags = null;
506
507      put = new Put(row2);
508      v = Bytes.toBytes(2L);
509      put.addColumn(f, q, v);
510      table.put(put);
511      increment = new Increment(row2);
512      increment.add(new KeyValue(row2, f, q, 1234L, v));
513      increment.setAttribute("visibility", Bytes.toBytes("tag2"));
514      table.increment(increment);
515      TestCoprocessorForTags.checkTagPresence = true;
516      scanner = table.getScanner(new Scan().withStartRow(row2));
517      result = scanner.next();
518      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
519      tags = TestCoprocessorForTags.tags;
520      assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
521      assertEquals(1, tags.size());
522      assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0))));
523      TestCoprocessorForTags.checkTagPresence = false;
524      TestCoprocessorForTags.tags = null;
525
526      // Test Append
527      byte[] row3 = Bytes.toBytes("r3");
528      put = new Put(row3);
529      put.addColumn(f, q, Bytes.toBytes("a"));
530      put.setAttribute("visibility", Bytes.toBytes("tag1"));
531      table.put(put);
532      Append append = new Append(row3);
533      append.addColumn(f, q, Bytes.toBytes("b"));
534      table.append(append);
535      TestCoprocessorForTags.checkTagPresence = true;
536      scanner = table.getScanner(new Scan().withStartRow(row3));
537      result = scanner.next();
538      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
539      tags = TestCoprocessorForTags.tags;
540      assertEquals(1, tags.size());
541      assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0))));
542      TestCoprocessorForTags.checkTagPresence = false;
543      TestCoprocessorForTags.tags = null;
544
545      append = new Append(row3);
546      append.add(new KeyValue(row3, f, q, 1234L, v));
547      append.setAttribute("visibility", Bytes.toBytes("tag2"));
548      table.append(append);
549      TestCoprocessorForTags.checkTagPresence = true;
550      scanner = table.getScanner(new Scan().withStartRow(row3));
551      result = scanner.next();
552      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
553      tags = TestCoprocessorForTags.tags;
554      assertEquals(2, tags.size());
555      // We cannot assume the ordering of tags
556      tagValues.clear();
557      for (Tag tag : tags) {
558        tagValues.add(Bytes.toString(Tag.cloneValue(tag)));
559      }
560      assertTrue(tagValues.contains("tag1"));
561      assertTrue(tagValues.contains("tag2"));
562      TestCoprocessorForTags.checkTagPresence = false;
563      TestCoprocessorForTags.tags = null;
564
565      byte[] row4 = Bytes.toBytes("r4");
566      put = new Put(row4);
567      put.addColumn(f, q, Bytes.toBytes("a"));
568      table.put(put);
569      append = new Append(row4);
570      append.add(new KeyValue(row4, f, q, 1234L, v));
571      append.setAttribute("visibility", Bytes.toBytes("tag2"));
572      table.append(append);
573      TestCoprocessorForTags.checkTagPresence = true;
574      scanner = table.getScanner(new Scan().withStartRow(row4));
575      result = scanner.next();
576      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
577      tags = TestCoprocessorForTags.tags;
578      assertEquals(1, tags.size());
579      assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0))));
580    } finally {
581      TestCoprocessorForTags.checkTagPresence = false;
582      TestCoprocessorForTags.tags = null;
583      if (table != null) {
584        table.close();
585      }
586    }
587  }
588
589  private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, Table table, byte[] value,
590    byte[] value2, byte[] row1, byte[] value1) throws IOException {
591    Scan s = new Scan().withStartRow(row);
592    // If filters are used this attribute can be specifically check for in
593    // filterKV method and
594    // kvs can be filtered out if the tags of interest is not found in that kv
595    s.setAttribute("visibility", Bytes.toBytes("myTag"));
596    ResultScanner scanner = null;
597    try {
598      scanner = table.getScanner(s);
599      Result next = scanner.next();
600
601      assertTrue(Bytes.equals(next.getRow(), row));
602      assertTrue(Bytes.equals(next.getValue(fam, qual), value));
603
604      Result next2 = scanner.next();
605      assertTrue(next2 != null);
606      assertTrue(Bytes.equals(next2.getRow(), row1));
607      assertTrue(Bytes.equals(next2.getValue(fam, qual), value1));
608
609      next2 = scanner.next();
610      assertTrue(next2 != null);
611      assertTrue(Bytes.equals(next2.getRow(), row2));
612      assertTrue(Bytes.equals(next2.getValue(fam, qual), value2));
613
614    } finally {
615      if (scanner != null) scanner.close();
616    }
617  }
618
619  public static class TestCoprocessorForTags implements RegionCoprocessor, RegionObserver {
620
621    public static volatile boolean checkTagPresence = false;
622    public static List<Tag> tags = null;
623
624    @Override
625    public Optional<RegionObserver> getRegionObserver() {
626      return Optional.of(this);
627    }
628
629    @Override
630    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
631      final WALEdit edit, final Durability durability) throws IOException {
632      updateMutationAddingTags(put);
633    }
634
635    private void updateMutationAddingTags(final Mutation m) {
636      byte[] attribute = m.getAttribute("visibility");
637      byte[] cf = null;
638      List<Cell> updatedCells = new ArrayList<>();
639      if (attribute != null) {
640        for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
641          for (Cell cell : edits) {
642            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
643            if (cf == null) {
644              cf = CellUtil.cloneFamily(kv);
645            }
646            Tag tag = new ArrayBackedTag((byte) 1, attribute);
647            List<Tag> tagList = new ArrayList<>();
648            tagList.add(tag);
649
650            KeyValue newKV =
651              new KeyValue(CellUtil.cloneRow(kv), 0, kv.getRowLength(), CellUtil.cloneFamily(kv), 0,
652                kv.getFamilyLength(), CellUtil.cloneQualifier(kv), 0, kv.getQualifierLength(),
653                kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
654                CellUtil.cloneValue(kv), 0, kv.getValueLength(), tagList);
655            ((List<Cell>) updatedCells).add(newKV);
656          }
657        }
658        m.getFamilyCellMap().remove(cf);
659        // Update the family map
660        m.getFamilyCellMap().put(cf, updatedCells);
661      }
662    }
663
664    @Override
665    public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
666      throws IOException {
667      updateMutationAddingTags(increment);
668      return null;
669    }
670
671    @Override
672    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
673      throws IOException {
674      updateMutationAddingTags(append);
675      return null;
676    }
677
678    @Override
679    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
680      InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
681      if (checkTagPresence) {
682        if (results.size() > 0) {
683          // Check tag presence in the 1st cell in 1st Result
684          Result result = results.get(0);
685          CellScanner cellScanner = result.cellScanner();
686          if (cellScanner.advance()) {
687            Cell cell = cellScanner.current();
688            tags = PrivateCellUtil.getTags(cell);
689          }
690        }
691      }
692      return hasMore;
693    }
694  }
695}