/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagBuilderFactory;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TestFromClientSide;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.security.access.AccessController;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests the coprocessor hooks
 * {@link RegionObserver#postIncrementBeforeWAL(ObserverContext, Mutation, List)} and
 * {@link RegionObserver#postAppendBeforeWAL(ObserverContext, Mutation, List)}. These hooks may
 * change the cells that will be applied to the memstore and the WAL, so the tests here cover
 * observers that change a cell's column family and observers that inspect a cell's tags.
 */
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestPostIncrementAndAppendBeforeWAL {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPostIncrementAndAppendBeforeWAL.class);

  /** Supplies the running test method's name, which is used as the table name. */
  @Rule
  public TestName name = new TestName();

  private static final Logger LOG =
    LoggerFactory.getLogger(TestPostIncrementAndAppendBeforeWAL.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static Connection connection;

  private static final byte[] ROW = Bytes.toBytes("row");
  private static final String CF1 = "cf1";
  private static final byte[] CF1_BYTES = Bytes.toBytes(CF1);
  private static final String CF2 = "cf2";
  private static final byte[] CF2_BYTES = Bytes.toBytes(CF2);
  private static final String CF_NOT_EXIST = "cf_not_exist";
  private static final byte[] CF_NOT_EXIST_BYTES = Bytes.toBytes(CF_NOT_EXIST);
  private static final byte[] CQ1 = Bytes.toBytes("cq1");
  private static final byte[] CQ2 = Bytes.toBytes("cq2");
  private static final byte[] VALUE = Bytes.toBytes("value");
  // VALUE appended to itself; the expected content after two appends of VALUE.
  private static final byte[] VALUE2 = Bytes.toBytes("valuevalue");
  private static final String USER = "User";
  private static final Permission PERMS =
    Permission.newBuilder().withActions(Permission.Action.READ).build();

  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    UTIL.startMiniCluster();
    connection = UTIL.getConnection();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    connection.close();
    UTIL.shutdownMiniCluster();
  }

  /**
   * Creates a table with the two column families {@code cf1} and {@code cf2} and attaches the
   * given coprocessor to it.
   * @param tableName   name of the table to create
   * @param coprocessor fully qualified class name of the coprocessor to attach
   * @throws IOException if the table cannot be created
   */
  private void createTableWithCoprocessor(TableName tableName, String coprocessor)
    throws IOException {
    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF1_BYTES).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2_BYTES).build())
      .setCoprocessor(coprocessor).build();
    // Admin is Closeable; release it as soon as the table has been created.
    try (Admin admin = connection.getAdmin()) {
      admin.createTable(tableDesc);
    }
  }

  /**
   * Increments and appends against {@code cf1} while the observer rewrites the resulting cells
   * into {@code cf2}; the data must then be readable from {@code cf2}.
   */
  @Test
  public void testChangeCellWithDifferntColumnFamily() throws Exception {
    TableName tableName = TableName.valueOf(name.getMethodName());
    createTableWithCoprocessor(tableName,
      ChangeCellWithDifferntColumnFamilyObserver.class.getName());

    try (Table table = connection.getTable(tableName)) {
      Increment increment = new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1);
      table.increment(increment);
      Get get = new Get(ROW).addColumn(CF2_BYTES, CQ1);
      Result result = table.get(get);
      assertEquals(1, result.size());
      assertEquals(1, Bytes.toLong(result.getValue(CF2_BYTES, CQ1)));

      Append append = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE);
      table.append(append);
      get = new Get(ROW).addColumn(CF2_BYTES, CQ2);
      result = table.get(get);
      assertEquals(1, result.size());
      assertTrue(Bytes.equals(VALUE, result.getValue(CF2_BYTES, CQ2)));
    }
  }

  /**
   * Increments and appends while the observer rewrites the resulting cells into a column family
   * the table does not have; both operations must fail with
   * {@link NoSuchColumnFamilyException}.
   */
  @Test
  public void testChangeCellWithNotExistColumnFamily() throws Exception {
    TableName tableName = TableName.valueOf(name.getMethodName());
    createTableWithCoprocessor(tableName,
      ChangeCellWithNotExistColumnFamilyObserver.class.getName());

    try (Table table = connection.getTable(tableName)) {
      // Only the expected exception type is caught, so any other failure surfaces with its
      // original stack trace instead of a bare assertion error.
      try {
        Increment increment = new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1);
        table.increment(increment);
        fail("should throw NoSuchColumnFamilyException");
      } catch (NoSuchColumnFamilyException e) {
        LOG.debug("Increment was rejected as expected", e);
      }
      try {
        Append append = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE);
        table.append(append);
        fail("should throw NoSuchColumnFamilyException");
      } catch (NoSuchColumnFamilyException e) {
        LOG.debug("Append was rejected as expected", e);
      }
    }
  }

  /**
   * Applies one increment without TTL and one with a TTL, both carrying an ACL, and checks that
   * the value falls back to the first increment once the second one has expired.
   */
  @Test
  public void testIncrementTTLWithACLTag() throws Exception {
    TableName tableName = TableName.valueOf(name.getMethodName());
    createTableWithCoprocessor(tableName, ChangeCellWithACLTagObserver.class.getName());
    try (Table table = connection.getTable(tableName)) {
      // Increment without TTL
      Increment firstIncrement =
        new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1).setACL(USER, PERMS);
      Result result = table.increment(firstIncrement);
      assertEquals(1, result.size());
      assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));

      // Check if the new cell can be read
      Get get = new Get(ROW).addColumn(CF1_BYTES, CQ1);
      result = table.get(get);
      assertEquals(1, result.size());
      assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));

      // Increment with TTL
      Increment secondIncrement =
        new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1).setTTL(1000).setACL(USER, PERMS);
      result = table.increment(secondIncrement);

      // We should get value 2 here
      assertEquals(1, result.size());
      assertEquals(2, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));

      // Wait 4s to let the second increment expire
      Thread.sleep(4000);
      get = new Get(ROW).addColumn(CF1_BYTES, CQ1);
      result = table.get(get);

      // The value should revert to 1
      assertEquals(1, result.size());
      assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));
    }
  }

  /**
   * Applies one append without TTL and one with a TTL, both carrying an ACL, and checks that the
   * value falls back to the first append once the second one has expired.
   */
  @Test
  public void testAppendTTLWithACLTag() throws Exception {
    TableName tableName = TableName.valueOf(name.getMethodName());
    createTableWithCoprocessor(tableName, ChangeCellWithACLTagObserver.class.getName());
    try (Table table = connection.getTable(tableName)) {
      // Append without TTL
      Append firstAppend = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE).setACL(USER, PERMS);
      Result result = table.append(firstAppend);
      assertEquals(1, result.size());
      assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2)));

      // Check if the new cell can be read
      Get get = new Get(ROW).addColumn(CF1_BYTES, CQ2);
      result = table.get(get);
      assertEquals(1, result.size());
      assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2)));

      // Append with TTL
      Append secondAppend =
        new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE).setTTL(1000).setACL(USER, PERMS);
      result = table.append(secondAppend);

      // We should get "valuevalue"
      assertEquals(1, result.size());
      assertTrue(Bytes.equals(VALUE2, result.getValue(CF1_BYTES, CQ2)));

      // Wait 4s to let the second append expire
      Thread.sleep(4000);
      get = new Get(ROW).addColumn(CF1_BYTES, CQ2);
      result = table.get(get);

      // The value should revert to "value"
      assertEquals(1, result.size());
      assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2)));
    }
  }

  /**
   * Checks whether the first ACL tag found on {@code cell} carries the given value.
   * @param acl  expected ACL tag value
   * @param cell cell whose tags are inspected
   * @return {@code true} if the first ACL tag on the cell has a value matching {@code acl};
   *         {@code false} if the values differ or the cell has no ACL tag
   */
  private static boolean checkAclTag(byte[] acl, Cell cell) {
    Iterator<Tag> iter = PrivateCellUtil.tagsIterator(cell);
    while (iter.hasNext()) {
      Tag tag = iter.next();
      if (tag.getType() == TagType.ACL_TAG_TYPE) {
        Tag temp =
          TagBuilderFactory.create().setTagType(TagType.ACL_TAG_TYPE).setTagValue(acl).build();
        return Tag.matchingValue(tag, temp);
      }
    }
    return false;
  }

  /**
   * Observer that replaces the second cell of every pair with a copy whose column family is
   * {@code cf2}. Row, qualifier, timestamp, type and value are carried over from the original.
   */
  public static class ChangeCellWithDifferntColumnFamilyObserver
    implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
      List<Pair<Cell, Cell>> cellPairs) throws IOException {
      return moveToDifferentColumnFamily(cellPairs);
    }

    @Override
    public List<Pair<Cell, Cell>> postAppendBeforeWAL(
      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
      List<Pair<Cell, Cell>> cellPairs) throws IOException {
      return moveToDifferentColumnFamily(cellPairs);
    }

    /** Returns a new list in which the second cell of each pair is rewritten into cf2. */
    private List<Pair<Cell, Cell>> moveToDifferentColumnFamily(List<Pair<Cell, Cell>> cellPairs) {
      return cellPairs.stream()
        .map(
          pair -> new Pair<>(pair.getFirst(), newCellWithDifferentColumnFamily(pair.getSecond())))
        .collect(Collectors.toList());
    }

    /** Builds a copy of {@code cell} that differs only in its column family (cf2). */
    private Cell newCellWithDifferentColumnFamily(Cell cell) {
      return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
        .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
        .setFamily(CF2_BYTES, 0, CF2_BYTES.length).setQualifier(CellUtil.cloneQualifier(cell))
        .setTimestamp(cell.getTimestamp()).setType(cell.getType().getCode())
        .setValue(CellUtil.cloneValue(cell)).build();
    }
  }

  /**
   * Observer that replaces the second cell of every pair with a copy whose column family is
   * {@code cf_not_exist}, a family the test tables are not created with.
   */
  public static class ChangeCellWithNotExistColumnFamilyObserver
    implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
      List<Pair<Cell, Cell>> cellPairs) throws IOException {
      return moveToNotExistColumnFamily(cellPairs);
    }

    @Override
    public List<Pair<Cell, Cell>> postAppendBeforeWAL(
      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
      List<Pair<Cell, Cell>> cellPairs) throws IOException {
      return moveToNotExistColumnFamily(cellPairs);
    }

    /** Returns a new list in which the second cell of each pair is rewritten into cf_not_exist. */
    private List<Pair<Cell, Cell>> moveToNotExistColumnFamily(List<Pair<Cell, Cell>> cellPairs) {
      return cellPairs.stream()
        .map(pair -> new Pair<>(pair.getFirst(), newCellWithNotExistColumnFamily(pair.getSecond())))
        .collect(Collectors.toList());
    }

    /** Builds a copy of {@code cell} that differs only in its column family (cf_not_exist). */
    private Cell newCellWithNotExistColumnFamily(Cell cell) {
      return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
        .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
        .setFamily(CF_NOT_EXIST_BYTES, 0, CF_NOT_EXIST_BYTES.length)
        .setQualifier(CellUtil.cloneQualifier(cell)).setTimestamp(cell.getTimestamp())
        .setType(cell.getType().getCode()).setValue(CellUtil.cloneValue(cell)).build();
    }
  }

  /**
   * {@link AccessController} subclass that, after the parent hook has run, verifies that the
   * second cell of every returned pair carries an ACL tag matching the mutation's ACL. The
   * operation is failed with a {@link DoNotRetryIOException} when a cell does not.
   */
  public static class ChangeCellWithACLTagObserver extends AccessController {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
      List<Pair<Cell, Cell>> cellPairs) throws IOException {
      List<Pair<Cell, Cell>> result = super.postIncrementBeforeWAL(ctx, mutation, cellPairs);
      verifyAclTags(mutation, result);
      return result;
    }

    @Override
    public List<Pair<Cell, Cell>> postAppendBeforeWAL(
      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
      List<Pair<Cell, Cell>> cellPairs) throws IOException {
      List<Pair<Cell, Cell>> result = super.postAppendBeforeWAL(ctx, mutation, cellPairs);
      verifyAclTags(mutation, result);
      return result;
    }

    /**
     * Verifies the ACL tag of the second cell of every pair against the mutation's ACL. Does
     * nothing when the mutation has no ACL.
     * @throws DoNotRetryIOException if a cell's ACL tag does not match the mutation's ACL
     */
    private static void verifyAclTags(Mutation mutation, List<Pair<Cell, Cell>> cellPairs)
      throws DoNotRetryIOException {
      byte[] acl = mutation.getACL();
      if (acl == null) {
        return;
      }
      for (Pair<Cell, Cell> pair : cellPairs) {
        if (!checkAclTag(acl, pair.getSecond())) {
          throw new DoNotRetryIOException("Unmatched ACL tag.");
        }
      }
    }
  }
}