001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Optional; 028import java.util.stream.Collectors; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellBuilderType; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.DoNotRetryIOException; 033import org.apache.hadoop.hbase.ExtendedCell; 034import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.PrivateCellUtil; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.Tag; 040import org.apache.hadoop.hbase.TagBuilderFactory; 041import org.apache.hadoop.hbase.TagType; 042import org.apache.hadoop.hbase.client.Append; 043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.Get; 046import org.apache.hadoop.hbase.client.Increment; 047import org.apache.hadoop.hbase.client.Mutation; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.client.TableDescriptor; 051import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 052import org.apache.hadoop.hbase.client.TestFromClientSide; 053import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 054import org.apache.hadoop.hbase.security.access.AccessController; 055import org.apache.hadoop.hbase.security.access.Permission; 056import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 057import org.apache.hadoop.hbase.testclassification.MediumTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.Pair; 060import org.junit.AfterClass; 061import org.junit.BeforeClass; 062import org.junit.ClassRule; 063import org.junit.Rule; 064import org.junit.Test; 065import org.junit.experimental.categories.Category; 066import org.junit.rules.TestName; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070/** 071 * Test coprocessor methods 072 * {@link RegionObserver#postIncrementBeforeWAL(ObserverContext, Mutation, List)} and 073 * {@link RegionObserver#postAppendBeforeWAL(ObserverContext, Mutation, List)}. These methods may 074 * change the cells which will be applied to memstore and WAL. So add unit test for the case which 075 * change the cell's column family and tags. 076 */ 077@Category({ CoprocessorTests.class, MediumTests.class }) 078public class TestPostIncrementAndAppendBeforeWAL { 079 080 @ClassRule 081 public static final HBaseClassTestRule CLASS_RULE = 082 HBaseClassTestRule.forClass(TestPostIncrementAndAppendBeforeWAL.class); 083 084 @Rule 085 public TestName name = new TestName(); 086 087 private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide.class); 088 089 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 090 091 private static Connection connection; 092 093 private static final byte[] ROW = Bytes.toBytes("row"); 094 private static final String CF1 = "cf1"; 095 private static final byte[] CF1_BYTES = Bytes.toBytes(CF1); 096 private static final String CF2 = "cf2"; 097 private static final byte[] CF2_BYTES = Bytes.toBytes(CF2); 098 private static final String CF_NOT_EXIST = "cf_not_exist"; 099 private static final byte[] CF_NOT_EXIST_BYTES = Bytes.toBytes(CF_NOT_EXIST); 100 private static final byte[] CQ1 = Bytes.toBytes("cq1"); 101 private static final byte[] CQ2 = Bytes.toBytes("cq2"); 102 private static final byte[] VALUE = Bytes.toBytes("value"); 103 private static final byte[] VALUE2 = Bytes.toBytes("valuevalue"); 104 private static final String USER = "User"; 105 private static final Permission PERMS = 106 Permission.newBuilder().withActions(Permission.Action.READ).build(); 107 108 @BeforeClass 109 public static void setupBeforeClass() throws Exception { 110 UTIL.startMiniCluster(); 111 connection = UTIL.getConnection(); 112 } 113 114 @AfterClass 115 public static void tearDownAfterClass() throws Exception { 116 connection.close(); 117 UTIL.shutdownMiniCluster(); 118 } 119 120 private void createTableWithCoprocessor(TableName tableName, String coprocessor) 121 throws IOException { 122 TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName) 123 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF1_BYTES).build()) 124 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2_BYTES).build()) 125 .setCoprocessor(coprocessor).build(); 126 connection.getAdmin().createTable(tableDesc); 127 } 128 129 @Test 130 public void testChangeCellWithDifferntColumnFamily() throws Exception { 131 TableName tableName = TableName.valueOf(name.getMethodName()); 132 createTableWithCoprocessor(tableName, 133 ChangeCellWithDifferntColumnFamilyObserver.class.getName()); 134 135 try (Table table = connection.getTable(tableName)) { 136 Increment increment = new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1); 137 table.increment(increment); 138 Get get = new Get(ROW).addColumn(CF2_BYTES, CQ1); 139 Result result = table.get(get); 140 assertEquals(1, result.size()); 141 assertEquals(1, Bytes.toLong(result.getValue(CF2_BYTES, CQ1))); 142 143 Append append = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE); 144 table.append(append); 145 get = new Get(ROW).addColumn(CF2_BYTES, CQ2); 146 result = table.get(get); 147 assertEquals(1, result.size()); 148 assertTrue(Bytes.equals(VALUE, result.getValue(CF2_BYTES, CQ2))); 149 } 150 } 151 152 @Test 153 public void testChangeCellWithNotExistColumnFamily() throws Exception { 154 TableName tableName = TableName.valueOf(name.getMethodName()); 155 createTableWithCoprocessor(tableName, 156 ChangeCellWithNotExistColumnFamilyObserver.class.getName()); 157 158 try (Table table = connection.getTable(tableName)) { 159 try { 160 Increment increment = new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1); 161 table.increment(increment); 162 fail("should throw NoSuchColumnFamilyException"); 163 } catch (Exception e) { 164 assertTrue(e instanceof NoSuchColumnFamilyException); 165 } 166 try { 167 Append append = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE); 168 table.append(append); 169 fail("should throw NoSuchColumnFamilyException"); 170 } catch (Exception e) { 171 assertTrue(e instanceof NoSuchColumnFamilyException); 172 } 173 } 174 } 175 176 @Test 177 public void testIncrementTTLWithACLTag() throws Exception { 178 TableName tableName = TableName.valueOf(name.getMethodName()); 179 createTableWithCoprocessor(tableName, ChangeCellWithACLTagObserver.class.getName()); 180 try (Table table = connection.getTable(tableName)) { 181 // Increment without TTL 182 Increment firstIncrement = 183 new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1).setACL(USER, PERMS); 184 Result result = table.increment(firstIncrement); 185 assertEquals(1, result.size()); 186 assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1))); 187 188 // Check if the new cell can be read 189 Get get = new Get(ROW).addColumn(CF1_BYTES, CQ1); 190 result = table.get(get); 191 assertEquals(1, result.size()); 192 assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1))); 193 194 // Increment with TTL 195 Increment secondIncrement = 196 new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1).setTTL(1000).setACL(USER, PERMS); 197 result = table.increment(secondIncrement); 198 199 // We should get value 2 here 200 assertEquals(1, result.size()); 201 assertEquals(2, Bytes.toLong(result.getValue(CF1_BYTES, CQ1))); 202 203 // Wait 4s to let the second increment expire 204 Thread.sleep(4000); 205 get = new Get(ROW).addColumn(CF1_BYTES, CQ1); 206 result = table.get(get); 207 208 // The value should revert to 1 209 assertEquals(1, result.size()); 210 assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1))); 211 } 212 } 213 214 @Test 215 public void testAppendTTLWithACLTag() throws Exception { 216 TableName tableName = TableName.valueOf(name.getMethodName()); 217 createTableWithCoprocessor(tableName, ChangeCellWithACLTagObserver.class.getName()); 218 try (Table table = connection.getTable(tableName)) { 219 // Append without TTL 220 Append firstAppend = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE).setACL(USER, PERMS); 221 Result result = table.append(firstAppend); 222 assertEquals(1, result.size()); 223 assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2))); 224 225 // Check if the new cell can be read 226 Get get = new Get(ROW).addColumn(CF1_BYTES, CQ2); 227 result = table.get(get); 228 assertEquals(1, result.size()); 229 assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2))); 230 231 // Append with TTL 232 Append secondAppend = 233 new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE).setTTL(1000).setACL(USER, PERMS); 234 result = table.append(secondAppend); 235 236 // We should get "valuevalue"" 237 assertEquals(1, result.size()); 238 assertTrue(Bytes.equals(VALUE2, result.getValue(CF1_BYTES, CQ2))); 239 240 // Wait 4s to let the second append expire 241 Thread.sleep(4000); 242 get = new Get(ROW).addColumn(CF1_BYTES, CQ2); 243 result = table.get(get); 244 245 // The value should revert to "value" 246 assertEquals(1, result.size()); 247 assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2))); 248 } 249 } 250 251 private static boolean checkAclTag(byte[] acl, ExtendedCell cell) { 252 Iterator<Tag> iter = PrivateCellUtil.tagsIterator(cell); 253 while (iter.hasNext()) { 254 Tag tag = iter.next(); 255 if (tag.getType() == TagType.ACL_TAG_TYPE) { 256 Tag temp = 257 TagBuilderFactory.create().setTagType(TagType.ACL_TAG_TYPE).setTagValue(acl).build(); 258 return Tag.matchingValue(tag, temp); 259 } 260 } 261 return false; 262 } 263 264 public static class ChangeCellWithDifferntColumnFamilyObserver 265 implements RegionCoprocessor, RegionObserver { 266 @Override 267 public Optional<RegionObserver> getRegionObserver() { 268 return Optional.of(this); 269 } 270 271 @Override 272 public List<Pair<Cell, Cell>> postIncrementBeforeWAL( 273 ObserverContext<? extends RegionCoprocessorEnvironment> ctx, Mutation mutation, 274 List<Pair<Cell, Cell>> cellPairs) throws IOException { 275 return cellPairs.stream() 276 .map( 277 pair -> new Pair<>(pair.getFirst(), newCellWithDifferentColumnFamily(pair.getSecond()))) 278 .collect(Collectors.toList()); 279 } 280 281 private Cell newCellWithDifferentColumnFamily(Cell cell) { 282 return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 283 .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) 284 .setFamily(CF2_BYTES, 0, CF2_BYTES.length).setQualifier(CellUtil.cloneQualifier(cell)) 285 .setTimestamp(cell.getTimestamp()).setType(cell.getType().getCode()) 286 .setValue(CellUtil.cloneValue(cell)).build(); 287 } 288 289 @Override 290 public List<Pair<Cell, Cell>> postAppendBeforeWAL( 291 ObserverContext<? extends RegionCoprocessorEnvironment> ctx, Mutation mutation, 292 List<Pair<Cell, Cell>> cellPairs) throws IOException { 293 return cellPairs.stream() 294 .map( 295 pair -> new Pair<>(pair.getFirst(), newCellWithDifferentColumnFamily(pair.getSecond()))) 296 .collect(Collectors.toList()); 297 } 298 } 299 300 public static class ChangeCellWithNotExistColumnFamilyObserver 301 implements RegionCoprocessor, RegionObserver { 302 @Override 303 public Optional<RegionObserver> getRegionObserver() { 304 return Optional.of(this); 305 } 306 307 @Override 308 public List<Pair<Cell, Cell>> postIncrementBeforeWAL( 309 ObserverContext<? extends RegionCoprocessorEnvironment> ctx, Mutation mutation, 310 List<Pair<Cell, Cell>> cellPairs) throws IOException { 311 return cellPairs.stream() 312 .map(pair -> new Pair<>(pair.getFirst(), newCellWithNotExistColumnFamily(pair.getSecond()))) 313 .collect(Collectors.toList()); 314 } 315 316 private Cell newCellWithNotExistColumnFamily(Cell cell) { 317 return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 318 .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) 319 .setFamily(CF_NOT_EXIST_BYTES, 0, CF_NOT_EXIST_BYTES.length) 320 .setQualifier(CellUtil.cloneQualifier(cell)).setTimestamp(cell.getTimestamp()) 321 .setType(cell.getType().getCode()).setValue(CellUtil.cloneValue(cell)).build(); 322 } 323 324 @Override 325 public List<Pair<Cell, Cell>> postAppendBeforeWAL( 326 ObserverContext<? extends RegionCoprocessorEnvironment> ctx, Mutation mutation, 327 List<Pair<Cell, Cell>> cellPairs) throws IOException { 328 return cellPairs.stream() 329 .map(pair -> new Pair<>(pair.getFirst(), newCellWithNotExistColumnFamily(pair.getSecond()))) 330 .collect(Collectors.toList()); 331 } 332 } 333 334 public static class ChangeCellWithACLTagObserver extends AccessController { 335 @Override 336 public Optional<RegionObserver> getRegionObserver() { 337 return Optional.of(this); 338 } 339 340 @Override 341 public List<Pair<Cell, Cell>> postIncrementBeforeWAL( 342 ObserverContext<? extends RegionCoprocessorEnvironment> ctx, Mutation mutation, 343 List<Pair<Cell, Cell>> cellPairs) throws IOException { 344 List<Pair<Cell, Cell>> result = super.postIncrementBeforeWAL(ctx, mutation, cellPairs); 345 for (Pair<Cell, Cell> pair : result) { 346 if ( 347 mutation.getACL() != null 348 && !checkAclTag(mutation.getACL(), (ExtendedCell) pair.getSecond()) 349 ) { 350 throw new DoNotRetryIOException("Unmatched ACL tag."); 351 } 352 } 353 return result; 354 } 355 356 @Override 357 public List<Pair<Cell, Cell>> postAppendBeforeWAL( 358 ObserverContext<? extends RegionCoprocessorEnvironment> ctx, Mutation mutation, 359 List<Pair<Cell, Cell>> cellPairs) throws IOException { 360 List<Pair<Cell, Cell>> result = super.postAppendBeforeWAL(ctx, mutation, cellPairs); 361 for (Pair<Cell, Cell> pair : result) { 362 if ( 363 mutation.getACL() != null 364 && !checkAclTag(mutation.getACL(), (ExtendedCell) pair.getSecond()) 365 ) { 366 throw new DoNotRetryIOException("Unmatched ACL tag."); 367 } 368 } 369 return result; 370 } 371 } 372}