001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Optional;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.ArrayBackedTag;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellScanner;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.HColumnDescriptor;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.HTableDescriptor;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.KeyValueUtil;
039import org.apache.hadoop.hbase.PrivateCellUtil;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.Tag;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.client.Append;
044import org.apache.hadoop.hbase.client.CompactionState;
045import org.apache.hadoop.hbase.client.Durability;
046import org.apache.hadoop.hbase.client.Increment;
047import org.apache.hadoop.hbase.client.Mutation;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.client.Result;
050import org.apache.hadoop.hbase.client.ResultScanner;
051import org.apache.hadoop.hbase.client.Scan;
052import org.apache.hadoop.hbase.client.Table;
053import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
054import org.apache.hadoop.hbase.coprocessor.ObserverContext;
055import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
056import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
057import org.apache.hadoop.hbase.coprocessor.RegionObserver;
058import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
059import org.apache.hadoop.hbase.testclassification.MediumTests;
060import org.apache.hadoop.hbase.testclassification.RegionServerTests;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.wal.WALEdit;
063import org.junit.After;
064import org.junit.AfterClass;
065import org.junit.BeforeClass;
066import org.junit.ClassRule;
067import org.junit.Rule;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.junit.rules.TestName;
071
072/**
073 * Class that test tags
074 */
075@Category({ RegionServerTests.class, MediumTests.class })
076public class TestTags {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestTags.class);
080
081  static boolean useFilter = false;
082
083  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
084
085  @Rule
086  public final TestName TEST_NAME = new TestName();
087
088  @BeforeClass
089  public static void setUpBeforeClass() throws Exception {
090    Configuration conf = TEST_UTIL.getConfiguration();
091    conf.setInt("hfile.format.version", 3);
092    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
093      TestCoprocessorForTags.class.getName());
094    TEST_UTIL.startMiniCluster(2);
095  }
096
097  @AfterClass
098  public static void tearDownAfterClass() throws Exception {
099    TEST_UTIL.shutdownMiniCluster();
100  }
101
102  @After
103  public void tearDown() {
104    useFilter = false;
105  }
106
107  @Test
108  public void testTags() throws Exception {
109    Table table = null;
110    try {
111      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
112      byte[] fam = Bytes.toBytes("info");
113      byte[] row = Bytes.toBytes("rowa");
114      // column names
115      byte[] qual = Bytes.toBytes("qual");
116
117      byte[] row1 = Bytes.toBytes("rowb");
118
119      byte[] row2 = Bytes.toBytes("rowc");
120
121      HTableDescriptor desc = new HTableDescriptor(tableName);
122      HColumnDescriptor colDesc = new HColumnDescriptor(fam);
123      colDesc.setBlockCacheEnabled(true);
124      colDesc.setDataBlockEncoding(DataBlockEncoding.NONE);
125      desc.addFamily(colDesc);
126      Admin admin = TEST_UTIL.getAdmin();
127      admin.createTable(desc);
128      byte[] value = Bytes.toBytes("value");
129      table = TEST_UTIL.getConnection().getTable(tableName);
130      Put put = new Put(row);
131      put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
132      put.setAttribute("visibility", Bytes.toBytes("myTag"));
133      table.put(put);
134      admin.flush(tableName);
135      // We are lacking an API for confirming flush request compaction.
136      // Just sleep for a short time. We won't be able to confirm flush
137      // completion but the test won't hang now or in the future if
138      // default compaction policy causes compaction between flush and
139      // when we go to confirm it.
140      Thread.sleep(1000);
141
142      Put put1 = new Put(row1);
143      byte[] value1 = Bytes.toBytes("1000dfsdf");
144      put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
145      // put1.setAttribute("visibility", Bytes.toBytes("myTag3"));
146      table.put(put1);
147      admin.flush(tableName);
148      Thread.sleep(1000);
149
150      Put put2 = new Put(row2);
151      byte[] value2 = Bytes.toBytes("1000dfsdf");
152      put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
153      put2.setAttribute("visibility", Bytes.toBytes("myTag3"));
154      table.put(put2);
155      admin.flush(tableName);
156      Thread.sleep(1000);
157
158      result(fam, row, qual, row2, table, value, value2, row1, value1);
159
160      admin.compact(tableName);
161      while (admin.getCompactionState(tableName) != CompactionState.NONE) {
162        Thread.sleep(10);
163      }
164      result(fam, row, qual, row2, table, value, value2, row1, value1);
165    } finally {
166      if (table != null) {
167        table.close();
168      }
169    }
170  }
171
172  @Test
173  public void testFlushAndCompactionWithoutTags() throws Exception {
174    Table table = null;
175    try {
176      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
177      byte[] fam = Bytes.toBytes("info");
178      byte[] row = Bytes.toBytes("rowa");
179      // column names
180      byte[] qual = Bytes.toBytes("qual");
181
182      byte[] row1 = Bytes.toBytes("rowb");
183
184      byte[] row2 = Bytes.toBytes("rowc");
185
186      HTableDescriptor desc = new HTableDescriptor(tableName);
187      HColumnDescriptor colDesc = new HColumnDescriptor(fam);
188      colDesc.setBlockCacheEnabled(true);
189      // colDesc.setDataBlockEncoding(DataBlockEncoding.NONE);
190      // colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
191      desc.addFamily(colDesc);
192      Admin admin = TEST_UTIL.getAdmin();
193      admin.createTable(desc);
194
195      table = TEST_UTIL.getConnection().getTable(tableName);
196      Put put = new Put(row);
197      byte[] value = Bytes.toBytes("value");
198      put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
199      table.put(put);
200      admin.flush(tableName);
201      // We are lacking an API for confirming flush request compaction.
202      // Just sleep for a short time. We won't be able to confirm flush
203      // completion but the test won't hang now or in the future if
204      // default compaction policy causes compaction between flush and
205      // when we go to confirm it.
206      Thread.sleep(1000);
207
208      Put put1 = new Put(row1);
209      byte[] value1 = Bytes.toBytes("1000dfsdf");
210      put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
211      table.put(put1);
212      admin.flush(tableName);
213      Thread.sleep(1000);
214
215      Put put2 = new Put(row2);
216      byte[] value2 = Bytes.toBytes("1000dfsdf");
217      put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
218      table.put(put2);
219      admin.flush(tableName);
220      Thread.sleep(1000);
221
222      Scan s = new Scan(row);
223      ResultScanner scanner = table.getScanner(s);
224      try {
225        Result[] next = scanner.next(3);
226        for (Result result : next) {
227          CellScanner cellScanner = result.cellScanner();
228          cellScanner.advance();
229          Cell current = cellScanner.current();
230          assertEquals(0, current.getTagsLength());
231        }
232      } finally {
233        if (scanner != null) scanner.close();
234      }
235      admin.compact(tableName);
236      while (admin.getCompactionState(tableName) != CompactionState.NONE) {
237        Thread.sleep(10);
238      }
239      s = new Scan(row);
240      scanner = table.getScanner(s);
241      try {
242        Result[] next = scanner.next(3);
243        for (Result result : next) {
244          CellScanner cellScanner = result.cellScanner();
245          cellScanner.advance();
246          Cell current = cellScanner.current();
247          assertEquals(0, current.getTagsLength());
248        }
249      } finally {
250        if (scanner != null) {
251          scanner.close();
252        }
253      }
254    } finally {
255      if (table != null) {
256        table.close();
257      }
258    }
259  }
260
261  @Test
262  public void testFlushAndCompactionwithCombinations() throws Exception {
263    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
264    byte[] fam = Bytes.toBytes("info");
265    byte[] row = Bytes.toBytes("rowa");
266    // column names
267    byte[] qual = Bytes.toBytes("qual");
268
269    byte[] row1 = Bytes.toBytes("rowb");
270
271    byte[] row2 = Bytes.toBytes("rowc");
272    byte[] rowd = Bytes.toBytes("rowd");
273    byte[] rowe = Bytes.toBytes("rowe");
274    Table table = null;
275    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
276      HTableDescriptor desc = new HTableDescriptor(tableName);
277      HColumnDescriptor colDesc = new HColumnDescriptor(fam);
278      colDesc.setBlockCacheEnabled(true);
279      colDesc.setDataBlockEncoding(encoding);
280      desc.addFamily(colDesc);
281      Admin admin = TEST_UTIL.getAdmin();
282      admin.createTable(desc);
283      try {
284        table = TEST_UTIL.getConnection().getTable(tableName);
285        Put put = new Put(row);
286        byte[] value = Bytes.toBytes("value");
287        put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
288        int bigTagLen = Short.MAX_VALUE - 5;
289        put.setAttribute("visibility", new byte[bigTagLen]);
290        table.put(put);
291        Put put1 = new Put(row1);
292        byte[] value1 = Bytes.toBytes("1000dfsdf");
293        put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
294        table.put(put1);
295        admin.flush(tableName);
296        // We are lacking an API for confirming flush request compaction.
297        // Just sleep for a short time. We won't be able to confirm flush
298        // completion but the test won't hang now or in the future if
299        // default compaction policy causes compaction between flush and
300        // when we go to confirm it.
301        Thread.sleep(1000);
302
303        put1 = new Put(row2);
304        value1 = Bytes.toBytes("1000dfsdf");
305        put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
306        table.put(put1);
307        admin.flush(tableName);
308        Thread.sleep(1000);
309
310        Put put2 = new Put(rowd);
311        byte[] value2 = Bytes.toBytes("1000dfsdf");
312        put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
313        table.put(put2);
314        put2 = new Put(rowe);
315        value2 = Bytes.toBytes("1000dfsddfdf");
316        put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
317        put.setAttribute("visibility", Bytes.toBytes("ram"));
318        table.put(put2);
319        admin.flush(tableName);
320        Thread.sleep(1000);
321
322        TestCoprocessorForTags.checkTagPresence = true;
323        Scan s = new Scan(row);
324        s.setCaching(1);
325        ResultScanner scanner = table.getScanner(s);
326        try {
327          Result next = null;
328          while ((next = scanner.next()) != null) {
329            CellScanner cellScanner = next.cellScanner();
330            cellScanner.advance();
331            Cell current = cellScanner.current();
332            if (CellUtil.matchingRows(current, row)) {
333              assertEquals(1, TestCoprocessorForTags.tags.size());
334              Tag tag = TestCoprocessorForTags.tags.get(0);
335              assertEquals(bigTagLen, tag.getValueLength());
336            } else {
337              assertEquals(0, TestCoprocessorForTags.tags.size());
338            }
339          }
340        } finally {
341          if (scanner != null) {
342            scanner.close();
343          }
344          TestCoprocessorForTags.checkTagPresence = false;
345        }
346        while (admin.getCompactionState(tableName) != CompactionState.NONE) {
347          Thread.sleep(10);
348        }
349        TestCoprocessorForTags.checkTagPresence = true;
350        scanner = table.getScanner(s);
351        try {
352          Result next = null;
353          while ((next = scanner.next()) != null) {
354            CellScanner cellScanner = next.cellScanner();
355            cellScanner.advance();
356            Cell current = cellScanner.current();
357            if (CellUtil.matchingRows(current, row)) {
358              assertEquals(1, TestCoprocessorForTags.tags.size());
359              Tag tag = TestCoprocessorForTags.tags.get(0);
360              assertEquals(bigTagLen, tag.getValueLength());
361            } else {
362              assertEquals(0, TestCoprocessorForTags.tags.size());
363            }
364          }
365        } finally {
366          if (scanner != null) {
367            scanner.close();
368          }
369          TestCoprocessorForTags.checkTagPresence = false;
370        }
371      } finally {
372        if (table != null) {
373          table.close();
374        }
375        // delete the table
376        admin.disableTable(tableName);
377        admin.deleteTable(tableName);
378      }
379    }
380  }
381
382  @Test
383  public void testTagsWithAppendAndIncrement() throws Exception {
384    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
385    byte[] f = Bytes.toBytes("f");
386    byte[] q = Bytes.toBytes("q");
387    byte[] row1 = Bytes.toBytes("r1");
388    byte[] row2 = Bytes.toBytes("r2");
389
390    HTableDescriptor desc = new HTableDescriptor(tableName);
391    HColumnDescriptor colDesc = new HColumnDescriptor(f);
392    desc.addFamily(colDesc);
393    TEST_UTIL.getAdmin().createTable(desc);
394
395    Table table = null;
396    try {
397      table = TEST_UTIL.getConnection().getTable(tableName);
398      Put put = new Put(row1);
399      byte[] v = Bytes.toBytes(2L);
400      put.addColumn(f, q, v);
401      put.setAttribute("visibility", Bytes.toBytes("tag1"));
402      table.put(put);
403      Increment increment = new Increment(row1);
404      increment.addColumn(f, q, 1L);
405      table.increment(increment);
406      TestCoprocessorForTags.checkTagPresence = true;
407      ResultScanner scanner = table.getScanner(new Scan());
408      Result result = scanner.next();
409      KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
410      List<Tag> tags = TestCoprocessorForTags.tags;
411      assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
412      assertEquals(1, tags.size());
413      assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0))));
414      TestCoprocessorForTags.checkTagPresence = false;
415      TestCoprocessorForTags.tags = null;
416
417      increment = new Increment(row1);
418      increment.add(new KeyValue(row1, f, q, 1234L, v));
419      increment.setAttribute("visibility", Bytes.toBytes("tag2"));
420      table.increment(increment);
421      TestCoprocessorForTags.checkTagPresence = true;
422      scanner = table.getScanner(new Scan());
423      result = scanner.next();
424      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
425      tags = TestCoprocessorForTags.tags;
426      assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
427      assertEquals(2, tags.size());
428      // We cannot assume the ordering of tags
429      List<String> tagValues = new ArrayList<>();
430      for (Tag tag : tags) {
431        tagValues.add(Bytes.toString(Tag.cloneValue(tag)));
432      }
433      assertTrue(tagValues.contains("tag1"));
434      assertTrue(tagValues.contains("tag2"));
435      TestCoprocessorForTags.checkTagPresence = false;
436      TestCoprocessorForTags.tags = null;
437
438      put = new Put(row2);
439      v = Bytes.toBytes(2L);
440      put.addColumn(f, q, v);
441      table.put(put);
442      increment = new Increment(row2);
443      increment.add(new KeyValue(row2, f, q, 1234L, v));
444      increment.setAttribute("visibility", Bytes.toBytes("tag2"));
445      table.increment(increment);
446      TestCoprocessorForTags.checkTagPresence = true;
447      scanner = table.getScanner(new Scan().setStartRow(row2));
448      result = scanner.next();
449      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
450      tags = TestCoprocessorForTags.tags;
451      assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
452      assertEquals(1, tags.size());
453      assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0))));
454      TestCoprocessorForTags.checkTagPresence = false;
455      TestCoprocessorForTags.tags = null;
456
457      // Test Append
458      byte[] row3 = Bytes.toBytes("r3");
459      put = new Put(row3);
460      put.addColumn(f, q, Bytes.toBytes("a"));
461      put.setAttribute("visibility", Bytes.toBytes("tag1"));
462      table.put(put);
463      Append append = new Append(row3);
464      append.addColumn(f, q, Bytes.toBytes("b"));
465      table.append(append);
466      TestCoprocessorForTags.checkTagPresence = true;
467      scanner = table.getScanner(new Scan().setStartRow(row3));
468      result = scanner.next();
469      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
470      tags = TestCoprocessorForTags.tags;
471      assertEquals(1, tags.size());
472      assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0))));
473      TestCoprocessorForTags.checkTagPresence = false;
474      TestCoprocessorForTags.tags = null;
475
476      append = new Append(row3);
477      append.add(new KeyValue(row3, f, q, 1234L, v));
478      append.setAttribute("visibility", Bytes.toBytes("tag2"));
479      table.append(append);
480      TestCoprocessorForTags.checkTagPresence = true;
481      scanner = table.getScanner(new Scan().setStartRow(row3));
482      result = scanner.next();
483      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
484      tags = TestCoprocessorForTags.tags;
485      assertEquals(2, tags.size());
486      // We cannot assume the ordering of tags
487      tagValues.clear();
488      for (Tag tag : tags) {
489        tagValues.add(Bytes.toString(Tag.cloneValue(tag)));
490      }
491      assertTrue(tagValues.contains("tag1"));
492      assertTrue(tagValues.contains("tag2"));
493      TestCoprocessorForTags.checkTagPresence = false;
494      TestCoprocessorForTags.tags = null;
495
496      byte[] row4 = Bytes.toBytes("r4");
497      put = new Put(row4);
498      put.addColumn(f, q, Bytes.toBytes("a"));
499      table.put(put);
500      append = new Append(row4);
501      append.add(new KeyValue(row4, f, q, 1234L, v));
502      append.setAttribute("visibility", Bytes.toBytes("tag2"));
503      table.append(append);
504      TestCoprocessorForTags.checkTagPresence = true;
505      scanner = table.getScanner(new Scan().setStartRow(row4));
506      result = scanner.next();
507      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
508      tags = TestCoprocessorForTags.tags;
509      assertEquals(1, tags.size());
510      assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0))));
511    } finally {
512      TestCoprocessorForTags.checkTagPresence = false;
513      TestCoprocessorForTags.tags = null;
514      if (table != null) {
515        table.close();
516      }
517    }
518  }
519
520  private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, Table table, byte[] value,
521    byte[] value2, byte[] row1, byte[] value1) throws IOException {
522    Scan s = new Scan(row);
523    // If filters are used this attribute can be specifically check for in
524    // filterKV method and
525    // kvs can be filtered out if the tags of interest is not found in that kv
526    s.setAttribute("visibility", Bytes.toBytes("myTag"));
527    ResultScanner scanner = null;
528    try {
529      scanner = table.getScanner(s);
530      Result next = scanner.next();
531
532      assertTrue(Bytes.equals(next.getRow(), row));
533      assertTrue(Bytes.equals(next.getValue(fam, qual), value));
534
535      Result next2 = scanner.next();
536      assertTrue(next2 != null);
537      assertTrue(Bytes.equals(next2.getRow(), row1));
538      assertTrue(Bytes.equals(next2.getValue(fam, qual), value1));
539
540      next2 = scanner.next();
541      assertTrue(next2 != null);
542      assertTrue(Bytes.equals(next2.getRow(), row2));
543      assertTrue(Bytes.equals(next2.getValue(fam, qual), value2));
544
545    } finally {
546      if (scanner != null) scanner.close();
547    }
548  }
549
550  public static class TestCoprocessorForTags implements RegionCoprocessor, RegionObserver {
551
552    public static volatile boolean checkTagPresence = false;
553    public static List<Tag> tags = null;
554
555    @Override
556    public Optional<RegionObserver> getRegionObserver() {
557      return Optional.of(this);
558    }
559
560    @Override
561    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
562      final WALEdit edit, final Durability durability) throws IOException {
563      updateMutationAddingTags(put);
564    }
565
566    private void updateMutationAddingTags(final Mutation m) {
567      byte[] attribute = m.getAttribute("visibility");
568      byte[] cf = null;
569      List<Cell> updatedCells = new ArrayList<>();
570      if (attribute != null) {
571        for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
572          for (Cell cell : edits) {
573            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
574            if (cf == null) {
575              cf = CellUtil.cloneFamily(kv);
576            }
577            Tag tag = new ArrayBackedTag((byte) 1, attribute);
578            List<Tag> tagList = new ArrayList<>();
579            tagList.add(tag);
580
581            KeyValue newKV =
582              new KeyValue(CellUtil.cloneRow(kv), 0, kv.getRowLength(), CellUtil.cloneFamily(kv), 0,
583                kv.getFamilyLength(), CellUtil.cloneQualifier(kv), 0, kv.getQualifierLength(),
584                kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
585                CellUtil.cloneValue(kv), 0, kv.getValueLength(), tagList);
586            ((List<Cell>) updatedCells).add(newKV);
587          }
588        }
589        m.getFamilyCellMap().remove(cf);
590        // Update the family map
591        m.getFamilyCellMap().put(cf, updatedCells);
592      }
593    }
594
595    @Override
596    public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
597      throws IOException {
598      updateMutationAddingTags(increment);
599      return null;
600    }
601
602    @Override
603    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
604      throws IOException {
605      updateMutationAddingTags(append);
606      return null;
607    }
608
609    @Override
610    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
611      InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
612      if (checkTagPresence) {
613        if (results.size() > 0) {
614          // Check tag presence in the 1st cell in 1st Result
615          Result result = results.get(0);
616          CellScanner cellScanner = result.cellScanner();
617          if (cellScanner.advance()) {
618            Cell cell = cellScanner.current();
619            tags = PrivateCellUtil.getTags(cell);
620          }
621        }
622      }
623      return hasMore;
624    }
625  }
626}