001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagBuilderFactory;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TestFromClientSide;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.security.access.AccessController;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
069
/**
 * Tests the coprocessor hooks
 * {@link RegionObserver#postIncrementBeforeWAL(ObserverContext, Mutation, List)} and
 * {@link RegionObserver#postAppendBeforeWAL(ObserverContext, Mutation, List)}. These hooks may
 * change the cells that will be applied to the memstore and the WAL, so the tests here cover the
 * cases where a hook changes a cell's column family or its tags.
 */
077@Category({ CoprocessorTests.class, MediumTests.class })
078public class TestPostIncrementAndAppendBeforeWAL {
079
080  @ClassRule
081  public static final HBaseClassTestRule CLASS_RULE =
082    HBaseClassTestRule.forClass(TestPostIncrementAndAppendBeforeWAL.class);
083
084  @Rule
085  public TestName name = new TestName();
086
087  private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide.class);
088
089  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
090
091  private static Connection connection;
092
093  private static final byte[] ROW = Bytes.toBytes("row");
094  private static final String CF1 = "cf1";
095  private static final byte[] CF1_BYTES = Bytes.toBytes(CF1);
096  private static final String CF2 = "cf2";
097  private static final byte[] CF2_BYTES = Bytes.toBytes(CF2);
098  private static final String CF_NOT_EXIST = "cf_not_exist";
099  private static final byte[] CF_NOT_EXIST_BYTES = Bytes.toBytes(CF_NOT_EXIST);
100  private static final byte[] CQ1 = Bytes.toBytes("cq1");
101  private static final byte[] CQ2 = Bytes.toBytes("cq2");
102  private static final byte[] VALUE = Bytes.toBytes("value");
103  private static final byte[] VALUE2 = Bytes.toBytes("valuevalue");
104  private static final String USER = "User";
105  private static final Permission PERMS =
106    Permission.newBuilder().withActions(Permission.Action.READ).build();
107
  /**
   * Starts the mini cluster once for the whole class and stores the testing utility's connection
   * in the static {@code connection} field used by the tests.
   * @throws Exception if the mini cluster cannot be started or the connection cannot be obtained
   */
  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    UTIL.startMiniCluster();
    connection = UTIL.getConnection();
  }
113
114  @AfterClass
115  public static void tearDownAfterClass() throws Exception {
116    connection.close();
117    UTIL.shutdownMiniCluster();
118  }
119
120  private void createTableWithCoprocessor(TableName tableName, String coprocessor)
121    throws IOException {
122    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
123      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF1_BYTES).build())
124      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2_BYTES).build())
125      .setCoprocessor(coprocessor).build();
126    connection.getAdmin().createTable(tableDesc);
127  }
128
129  @Test
130  public void testChangeCellWithDifferntColumnFamily() throws Exception {
131    TableName tableName = TableName.valueOf(name.getMethodName());
132    createTableWithCoprocessor(tableName,
133      ChangeCellWithDifferntColumnFamilyObserver.class.getName());
134
135    try (Table table = connection.getTable(tableName)) {
136      Increment increment = new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1);
137      table.increment(increment);
138      Get get = new Get(ROW).addColumn(CF2_BYTES, CQ1);
139      Result result = table.get(get);
140      assertEquals(1, result.size());
141      assertEquals(1, Bytes.toLong(result.getValue(CF2_BYTES, CQ1)));
142
143      Append append = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE);
144      table.append(append);
145      get = new Get(ROW).addColumn(CF2_BYTES, CQ2);
146      result = table.get(get);
147      assertEquals(1, result.size());
148      assertTrue(Bytes.equals(VALUE, result.getValue(CF2_BYTES, CQ2)));
149    }
150  }
151
152  @Test
153  public void testChangeCellWithNotExistColumnFamily() throws Exception {
154    TableName tableName = TableName.valueOf(name.getMethodName());
155    createTableWithCoprocessor(tableName,
156      ChangeCellWithNotExistColumnFamilyObserver.class.getName());
157
158    try (Table table = connection.getTable(tableName)) {
159      try {
160        Increment increment = new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1);
161        table.increment(increment);
162        fail("should throw NoSuchColumnFamilyException");
163      } catch (Exception e) {
164        assertTrue(e instanceof NoSuchColumnFamilyException);
165      }
166      try {
167        Append append = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE);
168        table.append(append);
169        fail("should throw NoSuchColumnFamilyException");
170      } catch (Exception e) {
171        assertTrue(e instanceof NoSuchColumnFamilyException);
172      }
173    }
174  }
175
176  @Test
177  public void testIncrementTTLWithACLTag() throws Exception {
178    TableName tableName = TableName.valueOf(name.getMethodName());
179    createTableWithCoprocessor(tableName, ChangeCellWithACLTagObserver.class.getName());
180    try (Table table = connection.getTable(tableName)) {
181      // Increment without TTL
182      Increment firstIncrement =
183        new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1).setACL(USER, PERMS);
184      Result result = table.increment(firstIncrement);
185      assertEquals(1, result.size());
186      assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));
187
188      // Check if the new cell can be read
189      Get get = new Get(ROW).addColumn(CF1_BYTES, CQ1);
190      result = table.get(get);
191      assertEquals(1, result.size());
192      assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));
193
194      // Increment with TTL
195      Increment secondIncrement =
196        new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1).setTTL(1000).setACL(USER, PERMS);
197      result = table.increment(secondIncrement);
198
199      // We should get value 2 here
200      assertEquals(1, result.size());
201      assertEquals(2, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));
202
203      // Wait 4s to let the second increment expire
204      Thread.sleep(4000);
205      get = new Get(ROW).addColumn(CF1_BYTES, CQ1);
206      result = table.get(get);
207
208      // The value should revert to 1
209      assertEquals(1, result.size());
210      assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));
211    }
212  }
213
214  @Test
215  public void testAppendTTLWithACLTag() throws Exception {
216    TableName tableName = TableName.valueOf(name.getMethodName());
217    createTableWithCoprocessor(tableName, ChangeCellWithACLTagObserver.class.getName());
218    try (Table table = connection.getTable(tableName)) {
219      // Append without TTL
220      Append firstAppend = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE).setACL(USER, PERMS);
221      Result result = table.append(firstAppend);
222      assertEquals(1, result.size());
223      assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2)));
224
225      // Check if the new cell can be read
226      Get get = new Get(ROW).addColumn(CF1_BYTES, CQ2);
227      result = table.get(get);
228      assertEquals(1, result.size());
229      assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2)));
230
231      // Append with TTL
232      Append secondAppend =
233        new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE).setTTL(1000).setACL(USER, PERMS);
234      result = table.append(secondAppend);
235
236      // We should get "valuevalue""
237      assertEquals(1, result.size());
238      assertTrue(Bytes.equals(VALUE2, result.getValue(CF1_BYTES, CQ2)));
239
240      // Wait 4s to let the second append expire
241      Thread.sleep(4000);
242      get = new Get(ROW).addColumn(CF1_BYTES, CQ2);
243      result = table.get(get);
244
245      // The value should revert to "value"
246      assertEquals(1, result.size());
247      assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2)));
248    }
249  }
250
251  private static boolean checkAclTag(byte[] acl, ExtendedCell cell) {
252    Iterator<Tag> iter = PrivateCellUtil.tagsIterator(cell);
253    while (iter.hasNext()) {
254      Tag tag = iter.next();
255      if (tag.getType() == TagType.ACL_TAG_TYPE) {
256        Tag temp =
257          TagBuilderFactory.create().setTagType(TagType.ACL_TAG_TYPE).setTagValue(acl).build();
258        return Tag.matchingValue(tag, temp);
259      }
260    }
261    return false;
262  }
263
264  public static class ChangeCellWithDifferntColumnFamilyObserver
265    implements RegionCoprocessor, RegionObserver {
266    @Override
267    public Optional<RegionObserver> getRegionObserver() {
268      return Optional.of(this);
269    }
270
271    @Override
272    public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
273      ObserverContext<? extends RegionCoprocessorEnvironment> ctx, Mutation mutation,
274      List<Pair<Cell, Cell>> cellPairs) throws IOException {
275      return cellPairs.stream()
276        .map(
277          pair -> new Pair<>(pair.getFirst(), newCellWithDifferentColumnFamily(pair.getSecond())))
278        .collect(Collectors.toList());
279    }
280
281    private Cell newCellWithDifferentColumnFamily(Cell cell) {
282      return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
283        .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
284        .setFamily(CF2_BYTES, 0, CF2_BYTES.length).setQualifier(CellUtil.cloneQualifier(cell))
285        .setTimestamp(cell.getTimestamp()).setType(cell.getType().getCode())
286        .setValue(CellUtil.cloneValue(cell)).build();
287    }
288
289    @Override
290    public List<Pair<Cell, Cell>> postAppendBeforeWAL(
291      ObserverContext<? extends RegionCoprocessorEnvironment> ctx, Mutation mutation,
292      List<Pair<Cell, Cell>> cellPairs) throws IOException {
293      return cellPairs.stream()
294        .map(
295          pair -> new Pair<>(pair.getFirst(), newCellWithDifferentColumnFamily(pair.getSecond())))
296        .collect(Collectors.toList());
297    }
298  }
299
300  public static class ChangeCellWithNotExistColumnFamilyObserver
301    implements RegionCoprocessor, RegionObserver {
302    @Override
303    public Optional<RegionObserver> getRegionObserver() {
304      return Optional.of(this);
305    }
306
307    @Override
308    public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
309      ObserverContext<? extends RegionCoprocessorEnvironment> ctx, Mutation mutation,
310      List<Pair<Cell, Cell>> cellPairs) throws IOException {
311      return cellPairs.stream()
312        .map(pair -> new Pair<>(pair.getFirst(), newCellWithNotExistColumnFamily(pair.getSecond())))
313        .collect(Collectors.toList());
314    }
315
316    private Cell newCellWithNotExistColumnFamily(Cell cell) {
317      return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
318        .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
319        .setFamily(CF_NOT_EXIST_BYTES, 0, CF_NOT_EXIST_BYTES.length)
320        .setQualifier(CellUtil.cloneQualifier(cell)).setTimestamp(cell.getTimestamp())
321        .setType(cell.getType().getCode()).setValue(CellUtil.cloneValue(cell)).build();
322    }
323
324    @Override
325    public List<Pair<Cell, Cell>> postAppendBeforeWAL(
326      ObserverContext<? extends RegionCoprocessorEnvironment> ctx, Mutation mutation,
327      List<Pair<Cell, Cell>> cellPairs) throws IOException {
328      return cellPairs.stream()
329        .map(pair -> new Pair<>(pair.getFirst(), newCellWithNotExistColumnFamily(pair.getSecond())))
330        .collect(Collectors.toList());
331    }
332  }
333
334  public static class ChangeCellWithACLTagObserver extends AccessController {
335    @Override
336    public Optional<RegionObserver> getRegionObserver() {
337      return Optional.of(this);
338    }
339
340    @Override
341    public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
342      ObserverContext<? extends RegionCoprocessorEnvironment> ctx, Mutation mutation,
343      List<Pair<Cell, Cell>> cellPairs) throws IOException {
344      List<Pair<Cell, Cell>> result = super.postIncrementBeforeWAL(ctx, mutation, cellPairs);
345      for (Pair<Cell, Cell> pair : result) {
346        if (
347          mutation.getACL() != null
348            && !checkAclTag(mutation.getACL(), (ExtendedCell) pair.getSecond())
349        ) {
350          throw new DoNotRetryIOException("Unmatched ACL tag.");
351        }
352      }
353      return result;
354    }
355
356    @Override
357    public List<Pair<Cell, Cell>> postAppendBeforeWAL(
358      ObserverContext<? extends RegionCoprocessorEnvironment> ctx, Mutation mutation,
359      List<Pair<Cell, Cell>> cellPairs) throws IOException {
360      List<Pair<Cell, Cell>> result = super.postAppendBeforeWAL(ctx, mutation, cellPairs);
361      for (Pair<Cell, Cell> pair : result) {
362        if (
363          mutation.getACL() != null
364            && !checkAclTag(mutation.getACL(), (ExtendedCell) pair.getSecond())
365        ) {
366          throw new DoNotRetryIOException("Unmatched ACL tag.");
367        }
368      }
369      return result;
370    }
371  }
372}