001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Optional;
028import java.util.stream.Collectors;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellBuilderType;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.DoNotRetryIOException;
033import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.PrivateCellUtil;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.Tag;
039import org.apache.hadoop.hbase.TagBuilderFactory;
040import org.apache.hadoop.hbase.TagType;
041import org.apache.hadoop.hbase.client.Append;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Connection;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.Increment;
046import org.apache.hadoop.hbase.client.Mutation;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.Table;
049import org.apache.hadoop.hbase.client.TableDescriptor;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.client.TestFromClientSide;
052import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
053import org.apache.hadoop.hbase.security.access.AccessController;
054import org.apache.hadoop.hbase.security.access.Permission;
055import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.Pair;
059import org.junit.AfterClass;
060import org.junit.BeforeClass;
061import org.junit.ClassRule;
062import org.junit.Rule;
063import org.junit.Test;
064import org.junit.experimental.categories.Category;
065import org.junit.rules.TestName;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069/**
070 * Test coprocessor methods
071 * {@link RegionObserver#postIncrementBeforeWAL(ObserverContext, Mutation, List)} and
072 * {@link RegionObserver#postAppendBeforeWAL(ObserverContext, Mutation, List)}. These methods may
073 * change the cells which will be applied to memstore and WAL. So add unit test for the case which
074 * change the cell's column family and tags.
075 */
076@Category({ CoprocessorTests.class, MediumTests.class })
077public class TestPostIncrementAndAppendBeforeWAL {
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081    HBaseClassTestRule.forClass(TestPostIncrementAndAppendBeforeWAL.class);
082
083  @Rule
084  public TestName name = new TestName();
085
086  private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide.class);
087
088  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
089
090  private static Connection connection;
091
092  private static final byte[] ROW = Bytes.toBytes("row");
093  private static final String CF1 = "cf1";
094  private static final byte[] CF1_BYTES = Bytes.toBytes(CF1);
095  private static final String CF2 = "cf2";
096  private static final byte[] CF2_BYTES = Bytes.toBytes(CF2);
097  private static final String CF_NOT_EXIST = "cf_not_exist";
098  private static final byte[] CF_NOT_EXIST_BYTES = Bytes.toBytes(CF_NOT_EXIST);
099  private static final byte[] CQ1 = Bytes.toBytes("cq1");
100  private static final byte[] CQ2 = Bytes.toBytes("cq2");
101  private static final byte[] VALUE = Bytes.toBytes("value");
102  private static final byte[] VALUE2 = Bytes.toBytes("valuevalue");
103  private static final String USER = "User";
104  private static final Permission PERMS =
105    Permission.newBuilder().withActions(Permission.Action.READ).build();
106
107  @BeforeClass
108  public static void setupBeforeClass() throws Exception {
109    UTIL.startMiniCluster();
110    connection = UTIL.getConnection();
111  }
112
113  @AfterClass
114  public static void tearDownAfterClass() throws Exception {
115    connection.close();
116    UTIL.shutdownMiniCluster();
117  }
118
119  private void createTableWithCoprocessor(TableName tableName, String coprocessor)
120    throws IOException {
121    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
122      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF1_BYTES).build())
123      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2_BYTES).build())
124      .setCoprocessor(coprocessor).build();
125    connection.getAdmin().createTable(tableDesc);
126  }
127
128  @Test
129  public void testChangeCellWithDifferntColumnFamily() throws Exception {
130    TableName tableName = TableName.valueOf(name.getMethodName());
131    createTableWithCoprocessor(tableName,
132      ChangeCellWithDifferntColumnFamilyObserver.class.getName());
133
134    try (Table table = connection.getTable(tableName)) {
135      Increment increment = new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1);
136      table.increment(increment);
137      Get get = new Get(ROW).addColumn(CF2_BYTES, CQ1);
138      Result result = table.get(get);
139      assertEquals(1, result.size());
140      assertEquals(1, Bytes.toLong(result.getValue(CF2_BYTES, CQ1)));
141
142      Append append = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE);
143      table.append(append);
144      get = new Get(ROW).addColumn(CF2_BYTES, CQ2);
145      result = table.get(get);
146      assertEquals(1, result.size());
147      assertTrue(Bytes.equals(VALUE, result.getValue(CF2_BYTES, CQ2)));
148    }
149  }
150
151  @Test
152  public void testChangeCellWithNotExistColumnFamily() throws Exception {
153    TableName tableName = TableName.valueOf(name.getMethodName());
154    createTableWithCoprocessor(tableName,
155      ChangeCellWithNotExistColumnFamilyObserver.class.getName());
156
157    try (Table table = connection.getTable(tableName)) {
158      try {
159        Increment increment = new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1);
160        table.increment(increment);
161        fail("should throw NoSuchColumnFamilyException");
162      } catch (Exception e) {
163        assertTrue(e instanceof NoSuchColumnFamilyException);
164      }
165      try {
166        Append append = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE);
167        table.append(append);
168        fail("should throw NoSuchColumnFamilyException");
169      } catch (Exception e) {
170        assertTrue(e instanceof NoSuchColumnFamilyException);
171      }
172    }
173  }
174
175  @Test
176  public void testIncrementTTLWithACLTag() throws Exception {
177    TableName tableName = TableName.valueOf(name.getMethodName());
178    createTableWithCoprocessor(tableName, ChangeCellWithACLTagObserver.class.getName());
179    try (Table table = connection.getTable(tableName)) {
180      // Increment without TTL
181      Increment firstIncrement =
182        new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1).setACL(USER, PERMS);
183      Result result = table.increment(firstIncrement);
184      assertEquals(1, result.size());
185      assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));
186
187      // Check if the new cell can be read
188      Get get = new Get(ROW).addColumn(CF1_BYTES, CQ1);
189      result = table.get(get);
190      assertEquals(1, result.size());
191      assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));
192
193      // Increment with TTL
194      Increment secondIncrement =
195        new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1).setTTL(1000).setACL(USER, PERMS);
196      result = table.increment(secondIncrement);
197
198      // We should get value 2 here
199      assertEquals(1, result.size());
200      assertEquals(2, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));
201
202      // Wait 4s to let the second increment expire
203      Thread.sleep(4000);
204      get = new Get(ROW).addColumn(CF1_BYTES, CQ1);
205      result = table.get(get);
206
207      // The value should revert to 1
208      assertEquals(1, result.size());
209      assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));
210    }
211  }
212
213  @Test
214  public void testAppendTTLWithACLTag() throws Exception {
215    TableName tableName = TableName.valueOf(name.getMethodName());
216    createTableWithCoprocessor(tableName, ChangeCellWithACLTagObserver.class.getName());
217    try (Table table = connection.getTable(tableName)) {
218      // Append without TTL
219      Append firstAppend = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE).setACL(USER, PERMS);
220      Result result = table.append(firstAppend);
221      assertEquals(1, result.size());
222      assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2)));
223
224      // Check if the new cell can be read
225      Get get = new Get(ROW).addColumn(CF1_BYTES, CQ2);
226      result = table.get(get);
227      assertEquals(1, result.size());
228      assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2)));
229
230      // Append with TTL
231      Append secondAppend =
232        new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE).setTTL(1000).setACL(USER, PERMS);
233      result = table.append(secondAppend);
234
235      // We should get "valuevalue""
236      assertEquals(1, result.size());
237      assertTrue(Bytes.equals(VALUE2, result.getValue(CF1_BYTES, CQ2)));
238
239      // Wait 4s to let the second append expire
240      Thread.sleep(4000);
241      get = new Get(ROW).addColumn(CF1_BYTES, CQ2);
242      result = table.get(get);
243
244      // The value should revert to "value"
245      assertEquals(1, result.size());
246      assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2)));
247    }
248  }
249
250  private static boolean checkAclTag(byte[] acl, Cell cell) {
251    Iterator<Tag> iter = PrivateCellUtil.tagsIterator(cell);
252    while (iter.hasNext()) {
253      Tag tag = iter.next();
254      if (tag.getType() == TagType.ACL_TAG_TYPE) {
255        Tag temp =
256          TagBuilderFactory.create().setTagType(TagType.ACL_TAG_TYPE).setTagValue(acl).build();
257        return Tag.matchingValue(tag, temp);
258      }
259    }
260    return false;
261  }
262
263  public static class ChangeCellWithDifferntColumnFamilyObserver
264    implements RegionCoprocessor, RegionObserver {
265    @Override
266    public Optional<RegionObserver> getRegionObserver() {
267      return Optional.of(this);
268    }
269
270    @Override
271    public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
272      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
273      List<Pair<Cell, Cell>> cellPairs) throws IOException {
274      return cellPairs.stream()
275        .map(
276          pair -> new Pair<>(pair.getFirst(), newCellWithDifferentColumnFamily(pair.getSecond())))
277        .collect(Collectors.toList());
278    }
279
280    private Cell newCellWithDifferentColumnFamily(Cell cell) {
281      return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
282        .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
283        .setFamily(CF2_BYTES, 0, CF2_BYTES.length).setQualifier(CellUtil.cloneQualifier(cell))
284        .setTimestamp(cell.getTimestamp()).setType(cell.getType().getCode())
285        .setValue(CellUtil.cloneValue(cell)).build();
286    }
287
288    @Override
289    public List<Pair<Cell, Cell>> postAppendBeforeWAL(
290      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
291      List<Pair<Cell, Cell>> cellPairs) throws IOException {
292      return cellPairs.stream()
293        .map(
294          pair -> new Pair<>(pair.getFirst(), newCellWithDifferentColumnFamily(pair.getSecond())))
295        .collect(Collectors.toList());
296    }
297  }
298
299  public static class ChangeCellWithNotExistColumnFamilyObserver
300    implements RegionCoprocessor, RegionObserver {
301    @Override
302    public Optional<RegionObserver> getRegionObserver() {
303      return Optional.of(this);
304    }
305
306    @Override
307    public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
308      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
309      List<Pair<Cell, Cell>> cellPairs) throws IOException {
310      return cellPairs.stream()
311        .map(pair -> new Pair<>(pair.getFirst(), newCellWithNotExistColumnFamily(pair.getSecond())))
312        .collect(Collectors.toList());
313    }
314
315    private Cell newCellWithNotExistColumnFamily(Cell cell) {
316      return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
317        .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
318        .setFamily(CF_NOT_EXIST_BYTES, 0, CF_NOT_EXIST_BYTES.length)
319        .setQualifier(CellUtil.cloneQualifier(cell)).setTimestamp(cell.getTimestamp())
320        .setType(cell.getType().getCode()).setValue(CellUtil.cloneValue(cell)).build();
321    }
322
323    @Override
324    public List<Pair<Cell, Cell>> postAppendBeforeWAL(
325      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
326      List<Pair<Cell, Cell>> cellPairs) throws IOException {
327      return cellPairs.stream()
328        .map(pair -> new Pair<>(pair.getFirst(), newCellWithNotExistColumnFamily(pair.getSecond())))
329        .collect(Collectors.toList());
330    }
331  }
332
333  public static class ChangeCellWithACLTagObserver extends AccessController {
334    @Override
335    public Optional<RegionObserver> getRegionObserver() {
336      return Optional.of(this);
337    }
338
339    @Override
340    public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
341      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
342      List<Pair<Cell, Cell>> cellPairs) throws IOException {
343      List<Pair<Cell, Cell>> result = super.postIncrementBeforeWAL(ctx, mutation, cellPairs);
344      for (Pair<Cell, Cell> pair : result) {
345        if (mutation.getACL() != null && !checkAclTag(mutation.getACL(), pair.getSecond())) {
346          throw new DoNotRetryIOException("Unmatched ACL tag.");
347        }
348      }
349      return result;
350    }
351
352    @Override
353    public List<Pair<Cell, Cell>> postAppendBeforeWAL(
354      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
355      List<Pair<Cell, Cell>> cellPairs) throws IOException {
356      List<Pair<Cell, Cell>> result = super.postAppendBeforeWAL(ctx, mutation, cellPairs);
357      for (Pair<Cell, Cell> pair : result) {
358        if (mutation.getACL() != null && !checkAclTag(mutation.getACL(), pair.getSecond())) {
359          throw new DoNotRetryIOException("Unmatched ACL tag.");
360        }
361      }
362      return result;
363    }
364  }
365}