001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Optional;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.ArrayBackedTag;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellScanner;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.HColumnDescriptor;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.HTableDescriptor;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.KeyValueUtil;
039import org.apache.hadoop.hbase.PrivateCellUtil;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.Tag;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.client.Append;
044import org.apache.hadoop.hbase.client.CompactionState;
045import org.apache.hadoop.hbase.client.Durability;
046import org.apache.hadoop.hbase.client.Increment;
047import org.apache.hadoop.hbase.client.Mutation;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.client.Result;
050import org.apache.hadoop.hbase.client.ResultScanner;
051import org.apache.hadoop.hbase.client.Scan;
052import org.apache.hadoop.hbase.client.Table;
053import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
054import org.apache.hadoop.hbase.coprocessor.ObserverContext;
055import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
056import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
057import org.apache.hadoop.hbase.coprocessor.RegionObserver;
058import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
059import org.apache.hadoop.hbase.testclassification.MediumTests;
060import org.apache.hadoop.hbase.testclassification.RegionServerTests;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.wal.WALEdit;
063import org.junit.After;
064import org.junit.AfterClass;
065import org.junit.BeforeClass;
066import org.junit.ClassRule;
067import org.junit.Rule;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.junit.rules.TestName;
071
/**
 * Tests reading and writing of cell tags across flushes, compactions, increments and appends.
 */
075@Category({RegionServerTests.class, MediumTests.class})
076public class TestTags {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080      HBaseClassTestRule.forClass(TestTags.class);
081
082  static boolean useFilter = false;
083
084  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
085
086  @Rule
087  public final TestName TEST_NAME = new TestName();
088
089  @BeforeClass
090  public static void setUpBeforeClass() throws Exception {
091    Configuration conf = TEST_UTIL.getConfiguration();
092    conf.setInt("hfile.format.version", 3);
093    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
094        TestCoprocessorForTags.class.getName());
095    TEST_UTIL.startMiniCluster(2);
096  }
097
098  @AfterClass
099  public static void tearDownAfterClass() throws Exception {
100    TEST_UTIL.shutdownMiniCluster();
101  }
102
103  @After
104  public void tearDown() {
105    useFilter = false;
106  }
107
108  @Test
109  public void testTags() throws Exception {
110    Table table = null;
111    try {
112      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
113      byte[] fam = Bytes.toBytes("info");
114      byte[] row = Bytes.toBytes("rowa");
115      // column names
116      byte[] qual = Bytes.toBytes("qual");
117
118      byte[] row1 = Bytes.toBytes("rowb");
119
120      byte[] row2 = Bytes.toBytes("rowc");
121
122      HTableDescriptor desc = new HTableDescriptor(tableName);
123      HColumnDescriptor colDesc = new HColumnDescriptor(fam);
124      colDesc.setBlockCacheEnabled(true);
125      colDesc.setDataBlockEncoding(DataBlockEncoding.NONE);
126      desc.addFamily(colDesc);
127      Admin admin = TEST_UTIL.getAdmin();
128      admin.createTable(desc);
129      byte[] value = Bytes.toBytes("value");
130      table = TEST_UTIL.getConnection().getTable(tableName);
131      Put put = new Put(row);
132      put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
133      put.setAttribute("visibility", Bytes.toBytes("myTag"));
134      table.put(put);
135      admin.flush(tableName);
136      // We are lacking an API for confirming flush request compaction.
137      // Just sleep for a short time. We won't be able to confirm flush
138      // completion but the test won't hang now or in the future if
139      // default compaction policy causes compaction between flush and
140      // when we go to confirm it.
141      Thread.sleep(1000);
142
143      Put put1 = new Put(row1);
144      byte[] value1 = Bytes.toBytes("1000dfsdf");
145      put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
146      // put1.setAttribute("visibility", Bytes.toBytes("myTag3"));
147      table.put(put1);
148      admin.flush(tableName);
149      Thread.sleep(1000);
150
151      Put put2 = new Put(row2);
152      byte[] value2 = Bytes.toBytes("1000dfsdf");
153      put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
154      put2.setAttribute("visibility", Bytes.toBytes("myTag3"));
155      table.put(put2);
156      admin.flush(tableName);
157      Thread.sleep(1000);
158
159      result(fam, row, qual, row2, table, value, value2, row1, value1);
160
161      admin.compact(tableName);
162      while (admin.getCompactionState(tableName) != CompactionState.NONE) {
163        Thread.sleep(10);
164      }
165      result(fam, row, qual, row2, table, value, value2, row1, value1);
166    } finally {
167      if (table != null) {
168        table.close();
169      }
170    }
171  }
172
173  @Test
174  public void testFlushAndCompactionWithoutTags() throws Exception {
175    Table table = null;
176    try {
177      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
178      byte[] fam = Bytes.toBytes("info");
179      byte[] row = Bytes.toBytes("rowa");
180      // column names
181      byte[] qual = Bytes.toBytes("qual");
182
183      byte[] row1 = Bytes.toBytes("rowb");
184
185      byte[] row2 = Bytes.toBytes("rowc");
186
187      HTableDescriptor desc = new HTableDescriptor(tableName);
188      HColumnDescriptor colDesc = new HColumnDescriptor(fam);
189      colDesc.setBlockCacheEnabled(true);
190      // colDesc.setDataBlockEncoding(DataBlockEncoding.NONE);
191      // colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
192      desc.addFamily(colDesc);
193      Admin admin = TEST_UTIL.getAdmin();
194      admin.createTable(desc);
195
196      table = TEST_UTIL.getConnection().getTable(tableName);
197      Put put = new Put(row);
198      byte[] value = Bytes.toBytes("value");
199      put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
200      table.put(put);
201      admin.flush(tableName);
202      // We are lacking an API for confirming flush request compaction.
203      // Just sleep for a short time. We won't be able to confirm flush
204      // completion but the test won't hang now or in the future if
205      // default compaction policy causes compaction between flush and
206      // when we go to confirm it.
207      Thread.sleep(1000);
208
209      Put put1 = new Put(row1);
210      byte[] value1 = Bytes.toBytes("1000dfsdf");
211      put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
212      table.put(put1);
213      admin.flush(tableName);
214      Thread.sleep(1000);
215
216      Put put2 = new Put(row2);
217      byte[] value2 = Bytes.toBytes("1000dfsdf");
218      put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
219      table.put(put2);
220      admin.flush(tableName);
221      Thread.sleep(1000);
222
223      Scan s = new Scan(row);
224      ResultScanner scanner = table.getScanner(s);
225      try {
226        Result[] next = scanner.next(3);
227        for (Result result : next) {
228          CellScanner cellScanner = result.cellScanner();
229          cellScanner.advance();
230          Cell current = cellScanner.current();
231          assertEquals(0, current.getTagsLength());
232        }
233      } finally {
234        if (scanner != null)
235          scanner.close();
236      }
237      admin.compact(tableName);
238      while (admin.getCompactionState(tableName) != CompactionState.NONE) {
239        Thread.sleep(10);
240      }
241      s = new Scan(row);
242      scanner = table.getScanner(s);
243      try {
244        Result[] next = scanner.next(3);
245        for (Result result : next) {
246          CellScanner cellScanner = result.cellScanner();
247          cellScanner.advance();
248          Cell current = cellScanner.current();
249          assertEquals(0, current.getTagsLength());
250        }
251      } finally {
252        if (scanner != null) {
253          scanner.close();
254        }
255      }
256    } finally {
257      if (table != null) {
258        table.close();
259      }
260    }
261  }
262
263  @Test
264  public void testFlushAndCompactionwithCombinations() throws Exception {
265    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
266    byte[] fam = Bytes.toBytes("info");
267    byte[] row = Bytes.toBytes("rowa");
268    // column names
269    byte[] qual = Bytes.toBytes("qual");
270
271    byte[] row1 = Bytes.toBytes("rowb");
272
273    byte[] row2 = Bytes.toBytes("rowc");
274    byte[] rowd = Bytes.toBytes("rowd");
275    byte[] rowe = Bytes.toBytes("rowe");
276    Table table = null;
277    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
278      HTableDescriptor desc = new HTableDescriptor(tableName);
279      HColumnDescriptor colDesc = new HColumnDescriptor(fam);
280      colDesc.setBlockCacheEnabled(true);
281      colDesc.setDataBlockEncoding(encoding);
282      desc.addFamily(colDesc);
283      Admin admin = TEST_UTIL.getAdmin();
284      admin.createTable(desc);
285      try {
286        table = TEST_UTIL.getConnection().getTable(tableName);
287        Put put = new Put(row);
288        byte[] value = Bytes.toBytes("value");
289        put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
290        int bigTagLen = Short.MAX_VALUE - 5;
291        put.setAttribute("visibility", new byte[bigTagLen]);
292        table.put(put);
293        Put put1 = new Put(row1);
294        byte[] value1 = Bytes.toBytes("1000dfsdf");
295        put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
296        table.put(put1);
297        admin.flush(tableName);
298        // We are lacking an API for confirming flush request compaction.
299        // Just sleep for a short time. We won't be able to confirm flush
300        // completion but the test won't hang now or in the future if
301        // default compaction policy causes compaction between flush and
302        // when we go to confirm it.
303        Thread.sleep(1000);
304
305        put1 = new Put(row2);
306        value1 = Bytes.toBytes("1000dfsdf");
307        put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
308        table.put(put1);
309        admin.flush(tableName);
310        Thread.sleep(1000);
311
312        Put put2 = new Put(rowd);
313        byte[] value2 = Bytes.toBytes("1000dfsdf");
314        put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
315        table.put(put2);
316        put2 = new Put(rowe);
317        value2 = Bytes.toBytes("1000dfsddfdf");
318        put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
319        put.setAttribute("visibility", Bytes.toBytes("ram"));
320        table.put(put2);
321        admin.flush(tableName);
322        Thread.sleep(1000);
323
324        TestCoprocessorForTags.checkTagPresence = true;
325        Scan s = new Scan(row);
326        s.setCaching(1);
327        ResultScanner scanner = table.getScanner(s);
328        try {
329          Result next = null;
330          while ((next = scanner.next()) != null) {
331            CellScanner cellScanner = next.cellScanner();
332            cellScanner.advance();
333            Cell current = cellScanner.current();
334            if (CellUtil.matchingRows(current, row)) {
335              assertEquals(1, TestCoprocessorForTags.tags.size());
336              Tag tag = TestCoprocessorForTags.tags.get(0);
337              assertEquals(bigTagLen, tag.getValueLength());
338            } else {
339              assertEquals(0, TestCoprocessorForTags.tags.size());
340            }
341          }
342        } finally {
343          if (scanner != null) {
344            scanner.close();
345          }
346          TestCoprocessorForTags.checkTagPresence = false;
347        }
348        while (admin.getCompactionState(tableName) != CompactionState.NONE) {
349          Thread.sleep(10);
350        }
351        TestCoprocessorForTags.checkTagPresence = true;
352        scanner = table.getScanner(s);
353        try {
354          Result next = null;
355          while ((next = scanner.next()) != null) {
356            CellScanner cellScanner = next.cellScanner();
357            cellScanner.advance();
358            Cell current = cellScanner.current();
359            if (CellUtil.matchingRows(current, row)) {
360              assertEquals(1, TestCoprocessorForTags.tags.size());
361              Tag tag = TestCoprocessorForTags.tags.get(0);
362              assertEquals(bigTagLen, tag.getValueLength());
363            } else {
364              assertEquals(0, TestCoprocessorForTags.tags.size());
365            }
366          }
367        } finally {
368          if (scanner != null) {
369            scanner.close();
370          }
371          TestCoprocessorForTags.checkTagPresence = false;
372        }
373      } finally {
374        if (table != null) {
375          table.close();
376        }
377        // delete the table
378        admin.disableTable(tableName);
379        admin.deleteTable(tableName);
380      }
381    }
382  }
383
384  @Test
385  public void testTagsWithAppendAndIncrement() throws Exception {
386    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
387    byte[] f = Bytes.toBytes("f");
388    byte[] q = Bytes.toBytes("q");
389    byte[] row1 = Bytes.toBytes("r1");
390    byte[] row2 = Bytes.toBytes("r2");
391
392    HTableDescriptor desc = new HTableDescriptor(tableName);
393    HColumnDescriptor colDesc = new HColumnDescriptor(f);
394    desc.addFamily(colDesc);
395    TEST_UTIL.getAdmin().createTable(desc);
396
397    Table table = null;
398    try {
399      table = TEST_UTIL.getConnection().getTable(tableName);
400      Put put = new Put(row1);
401      byte[] v = Bytes.toBytes(2L);
402      put.addColumn(f, q, v);
403      put.setAttribute("visibility", Bytes.toBytes("tag1"));
404      table.put(put);
405      Increment increment = new Increment(row1);
406      increment.addColumn(f, q, 1L);
407      table.increment(increment);
408      TestCoprocessorForTags.checkTagPresence = true;
409      ResultScanner scanner = table.getScanner(new Scan());
410      Result result = scanner.next();
411      KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
412      List<Tag> tags = TestCoprocessorForTags.tags;
413      assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
414      assertEquals(1, tags.size());
415      assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0))));
416      TestCoprocessorForTags.checkTagPresence = false;
417      TestCoprocessorForTags.tags = null;
418
419      increment = new Increment(row1);
420      increment.add(new KeyValue(row1, f, q, 1234L, v));
421      increment.setAttribute("visibility", Bytes.toBytes("tag2"));
422      table.increment(increment);
423      TestCoprocessorForTags.checkTagPresence = true;
424      scanner = table.getScanner(new Scan());
425      result = scanner.next();
426      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
427      tags = TestCoprocessorForTags.tags;
428      assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
429      assertEquals(2, tags.size());
430      // We cannot assume the ordering of tags
431      List<String> tagValues = new ArrayList<>();
432      for (Tag tag: tags) {
433        tagValues.add(Bytes.toString(Tag.cloneValue(tag)));
434      }
435      assertTrue(tagValues.contains("tag1"));
436      assertTrue(tagValues.contains("tag2"));
437      TestCoprocessorForTags.checkTagPresence = false;
438      TestCoprocessorForTags.tags = null;
439
440      put = new Put(row2);
441      v = Bytes.toBytes(2L);
442      put.addColumn(f, q, v);
443      table.put(put);
444      increment = new Increment(row2);
445      increment.add(new KeyValue(row2, f, q, 1234L, v));
446      increment.setAttribute("visibility", Bytes.toBytes("tag2"));
447      table.increment(increment);
448      TestCoprocessorForTags.checkTagPresence = true;
449      scanner = table.getScanner(new Scan().setStartRow(row2));
450      result = scanner.next();
451      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
452      tags = TestCoprocessorForTags.tags;
453      assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
454      assertEquals(1, tags.size());
455      assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0))));
456      TestCoprocessorForTags.checkTagPresence = false;
457      TestCoprocessorForTags.tags = null;
458
459      // Test Append
460      byte[] row3 = Bytes.toBytes("r3");
461      put = new Put(row3);
462      put.addColumn(f, q, Bytes.toBytes("a"));
463      put.setAttribute("visibility", Bytes.toBytes("tag1"));
464      table.put(put);
465      Append append = new Append(row3);
466      append.addColumn(f, q, Bytes.toBytes("b"));
467      table.append(append);
468      TestCoprocessorForTags.checkTagPresence = true;
469      scanner = table.getScanner(new Scan().setStartRow(row3));
470      result = scanner.next();
471      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
472      tags = TestCoprocessorForTags.tags;
473      assertEquals(1, tags.size());
474      assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0))));
475      TestCoprocessorForTags.checkTagPresence = false;
476      TestCoprocessorForTags.tags = null;
477
478      append = new Append(row3);
479      append.add(new KeyValue(row3, f, q, 1234L, v));
480      append.setAttribute("visibility", Bytes.toBytes("tag2"));
481      table.append(append);
482      TestCoprocessorForTags.checkTagPresence = true;
483      scanner = table.getScanner(new Scan().setStartRow(row3));
484      result = scanner.next();
485      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
486      tags = TestCoprocessorForTags.tags;
487      assertEquals(2, tags.size());
488      // We cannot assume the ordering of tags
489      tagValues.clear();
490      for (Tag tag: tags) {
491        tagValues.add(Bytes.toString(Tag.cloneValue(tag)));
492      }
493      assertTrue(tagValues.contains("tag1"));
494      assertTrue(tagValues.contains("tag2"));
495      TestCoprocessorForTags.checkTagPresence = false;
496      TestCoprocessorForTags.tags = null;
497
498      byte[] row4 = Bytes.toBytes("r4");
499      put = new Put(row4);
500      put.addColumn(f, q, Bytes.toBytes("a"));
501      table.put(put);
502      append = new Append(row4);
503      append.add(new KeyValue(row4, f, q, 1234L, v));
504      append.setAttribute("visibility", Bytes.toBytes("tag2"));
505      table.append(append);
506      TestCoprocessorForTags.checkTagPresence = true;
507      scanner = table.getScanner(new Scan().setStartRow(row4));
508      result = scanner.next();
509      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
510      tags = TestCoprocessorForTags.tags;
511      assertEquals(1, tags.size());
512      assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0))));
513    } finally {
514      TestCoprocessorForTags.checkTagPresence = false;
515      TestCoprocessorForTags.tags = null;
516      if (table != null) {
517        table.close();
518      }
519    }
520  }
521
522  private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, Table table, byte[] value,
523      byte[] value2, byte[] row1, byte[] value1) throws IOException {
524    Scan s = new Scan(row);
525    // If filters are used this attribute can be specifically check for in
526    // filterKV method and
527    // kvs can be filtered out if the tags of interest is not found in that kv
528    s.setAttribute("visibility", Bytes.toBytes("myTag"));
529    ResultScanner scanner = null;
530    try {
531      scanner = table.getScanner(s);
532      Result next = scanner.next();
533
534      assertTrue(Bytes.equals(next.getRow(), row));
535      assertTrue(Bytes.equals(next.getValue(fam, qual), value));
536
537      Result next2 = scanner.next();
538      assertTrue(next2 != null);
539      assertTrue(Bytes.equals(next2.getRow(), row1));
540      assertTrue(Bytes.equals(next2.getValue(fam, qual), value1));
541
542      next2 = scanner.next();
543      assertTrue(next2 != null);
544      assertTrue(Bytes.equals(next2.getRow(), row2));
545      assertTrue(Bytes.equals(next2.getValue(fam, qual), value2));
546
547    } finally {
548      if (scanner != null)
549        scanner.close();
550    }
551  }
552
553  public static class TestCoprocessorForTags implements RegionCoprocessor, RegionObserver {
554
555    public static volatile boolean checkTagPresence = false;
556    public static List<Tag> tags = null;
557
558    @Override
559    public Optional<RegionObserver> getRegionObserver() {
560      return Optional.of(this);
561    }
562
563    @Override
564    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
565        final WALEdit edit, final Durability durability) throws IOException {
566      updateMutationAddingTags(put);
567    }
568
569    private void updateMutationAddingTags(final Mutation m) {
570      byte[] attribute = m.getAttribute("visibility");
571      byte[] cf = null;
572      List<Cell> updatedCells = new ArrayList<>();
573      if (attribute != null) {
574        for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
575          for (Cell cell : edits) {
576            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
577            if (cf == null) {
578              cf = CellUtil.cloneFamily(kv);
579            }
580            Tag tag = new ArrayBackedTag((byte) 1, attribute);
581            List<Tag> tagList = new ArrayList<>();
582            tagList.add(tag);
583
584            KeyValue newKV = new KeyValue(CellUtil.cloneRow(kv), 0, kv.getRowLength(),
585                CellUtil.cloneFamily(kv), 0, kv.getFamilyLength(), CellUtil.cloneQualifier(kv), 0,
586                kv.getQualifierLength(), kv.getTimestamp(),
587                KeyValue.Type.codeToType(kv.getTypeByte()), CellUtil.cloneValue(kv), 0,
588                kv.getValueLength(), tagList);
589            ((List<Cell>) updatedCells).add(newKV);
590          }
591        }
592        m.getFamilyCellMap().remove(cf);
593        // Update the family map
594        m.getFamilyCellMap().put(cf, updatedCells);
595      }
596    }
597
598    @Override
599    public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
600        throws IOException {
601      updateMutationAddingTags(increment);
602      return null;
603    }
604
605    @Override
606    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
607        throws IOException {
608      updateMutationAddingTags(append);
609      return null;
610    }
611
612    @Override
613    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
614        InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
615      if (checkTagPresence) {
616        if (results.size() > 0) {
617          // Check tag presence in the 1st cell in 1st Result
618          Result result = results.get(0);
619          CellScanner cellScanner = result.cellScanner();
620          if (cellScanner.advance()) {
621            Cell cell = cellScanner.current();
622            tags = PrivateCellUtil.getTags(cell);
623          }
624        }
625      }
626      return hasMore;
627    }
628  }
629}