001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.nio.ByteBuffer; 026import java.util.Arrays; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.stream.Collectors; 031import java.util.stream.IntStream; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellUtil; 034import org.apache.hadoop.hbase.CompareOperator; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.KeyValueUtil; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.Append; 042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Durability; 045import org.apache.hadoop.hbase.client.Get; 046import org.apache.hadoop.hbase.client.Increment; 047import org.apache.hadoop.hbase.client.Put; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 051import org.apache.hadoop.hbase.filter.ByteArrayComparable; 052import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 053import org.apache.hadoop.hbase.testclassification.MediumTests; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.wal.WALEdit; 056import org.junit.AfterClass; 057import org.junit.Before; 058import org.junit.BeforeClass; 059import org.junit.ClassRule; 060import org.junit.Rule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.junit.rules.TestName; 064 065@Category({ CoprocessorTests.class, MediumTests.class }) 066public class TestPassCustomCellViaRegionObserver { 067 068 @ClassRule 069 public static final HBaseClassTestRule CLASS_RULE = 070 HBaseClassTestRule.forClass(TestPassCustomCellViaRegionObserver.class); 071 072 @Rule 073 public TestName testName = new TestName(); 074 075 private TableName tableName; 076 private Table table = null; 077 078 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 079 080 private static final byte[] ROW = Bytes.toBytes("ROW"); 081 private static final byte[] FAMILY = Bytes.toBytes("FAMILY"); 082 private static final byte[] QUALIFIER = Bytes.toBytes("QUALIFIER"); 083 private static final byte[] VALUE = Bytes.toBytes(10L); 084 private static final byte[] APPEND_VALUE = Bytes.toBytes("MB"); 085 086 private static final byte[] QUALIFIER_FROM_CP = Bytes.toBytes("QUALIFIER_FROM_CP"); 087 088 @BeforeClass 089 public static void setupBeforeClass() throws Exception { 090 // small retry number can speed up the failed tests. 091 UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 092 UTIL.startMiniCluster(); 093 } 094 095 @AfterClass 096 public static void tearDownAfterClass() throws Exception { 097 UTIL.shutdownMiniCluster(); 098 } 099 100 @Before 101 public void clearTable() throws IOException { 102 RegionObserverImpl.COUNT.set(0); 103 tableName = TableName.valueOf(testName.getMethodName()); 104 if (table != null) { 105 table.close(); 106 } 107 try (Admin admin = UTIL.getAdmin()) { 108 for (TableName name : admin.listTableNames()) { 109 try { 110 admin.disableTable(name); 111 } catch (IOException e) { 112 } 113 admin.deleteTable(name); 114 } 115 table = UTIL.createTable(TableDescriptorBuilder.newBuilder(tableName) 116 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 117 .setCoprocessor(RegionObserverImpl.class.getName()).build(), null); 118 } 119 } 120 121 @Test 122 public void testMutation() throws Exception { 123 124 Put put = new Put(ROW); 125 put.addColumn(FAMILY, QUALIFIER, VALUE); 126 table.put(put); 127 byte[] value = VALUE; 128 assertResult(table.get(new Get(ROW)), value, value); 129 assertObserverHasExecuted(); 130 131 Increment inc = new Increment(ROW); 132 inc.addColumn(FAMILY, QUALIFIER, 10L); 133 table.increment(inc); 134 // QUALIFIER -> 10 (put) + 10 (increment) 135 // QUALIFIER_FROM_CP -> 10 (from cp's put) + 10 (from cp's increment) 136 value = Bytes.toBytes(20L); 137 assertResult(table.get(new Get(ROW)), value, value); 138 assertObserverHasExecuted(); 139 140 Append append = new Append(ROW); 141 append.addColumn(FAMILY, QUALIFIER, APPEND_VALUE); 142 table.append(append); 143 // 10L + "MB" 144 value = ByteBuffer.wrap(new byte[value.length + APPEND_VALUE.length]).put(value) 145 .put(APPEND_VALUE).array(); 146 assertResult(table.get(new Get(ROW)), value, value); 147 assertObserverHasExecuted(); 148 149 Delete delete = new Delete(ROW); 150 delete.addColumns(FAMILY, QUALIFIER); 151 table.delete(delete); 152 assertTrue(Arrays.asList(table.get(new Get(ROW)).rawCells()).toString(), 153 table.get(new Get(ROW)).isEmpty()); 154 assertObserverHasExecuted(); 155 156 assertTrue(table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put)); 157 assertObserverHasExecuted(); 158 159 assertTrue( 160 table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenDelete(delete)); 161 assertObserverHasExecuted(); 162 163 assertTrue(table.get(new Get(ROW)).isEmpty()); 164 } 165 166 @Test 167 public void testMultiPut() throws Exception { 168 List<Put> puts = 169 IntStream.range(0, 10).mapToObj(i -> new Put(ROW).addColumn(FAMILY, Bytes.toBytes(i), VALUE)) 170 .collect(Collectors.toList()); 171 table.put(puts); 172 assertResult(table.get(new Get(ROW)), VALUE); 173 assertObserverHasExecuted(); 174 175 List<Delete> deletes = 176 IntStream.range(0, 10).mapToObj(i -> new Delete(ROW).addColumn(FAMILY, Bytes.toBytes(i))) 177 .collect(Collectors.toList()); 178 table.delete(deletes); 179 assertTrue(table.get(new Get(ROW)).isEmpty()); 180 assertObserverHasExecuted(); 181 } 182 183 private static void assertObserverHasExecuted() { 184 assertTrue(RegionObserverImpl.COUNT.getAndSet(0) > 0); 185 } 186 187 private static void assertResult(Result result, byte[] expectedValue) { 188 assertFalse(result.isEmpty()); 189 for (Cell c : result.rawCells()) { 190 assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c))); 191 assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c))); 192 assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c))); 193 } 194 } 195 196 private static void assertResult(Result result, byte[] expectedValue, byte[] expectedFromCp) { 197 assertFalse(result.isEmpty()); 198 for (Cell c : result.rawCells()) { 199 assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c))); 200 assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c))); 201 if (Bytes.equals(QUALIFIER, CellUtil.cloneQualifier(c))) { 202 assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c))); 203 } else if (Bytes.equals(QUALIFIER_FROM_CP, CellUtil.cloneQualifier(c))) { 204 assertTrue(c.toString(), Bytes.equals(expectedFromCp, CellUtil.cloneValue(c))); 205 } else { 206 fail("No valid qualifier"); 207 } 208 } 209 } 210 211 private static Cell createCustomCell(byte[] row, byte[] family, byte[] qualifier, Cell.Type type, 212 byte[] value) { 213 return new Cell() { 214 215 @Override 216 public long heapSize() { 217 return 0; 218 } 219 220 private byte[] getArray(byte[] array) { 221 return array == null ? HConstants.EMPTY_BYTE_ARRAY : array; 222 } 223 224 private int length(byte[] array) { 225 return array == null ? 0 : array.length; 226 } 227 228 @Override 229 public byte[] getRowArray() { 230 return getArray(row); 231 } 232 233 @Override 234 public int getRowOffset() { 235 return 0; 236 } 237 238 @Override 239 public short getRowLength() { 240 return (short) length(row); 241 } 242 243 @Override 244 public byte[] getFamilyArray() { 245 return getArray(family); 246 } 247 248 @Override 249 public int getFamilyOffset() { 250 return 0; 251 } 252 253 @Override 254 public byte getFamilyLength() { 255 return (byte) length(family); 256 } 257 258 @Override 259 public byte[] getQualifierArray() { 260 return getArray(qualifier); 261 } 262 263 @Override 264 public int getQualifierOffset() { 265 return 0; 266 } 267 268 @Override 269 public int getQualifierLength() { 270 return length(qualifier); 271 } 272 273 @Override 274 public long getTimestamp() { 275 return HConstants.LATEST_TIMESTAMP; 276 } 277 278 @Override 279 public byte getTypeByte() { 280 return type.getCode(); 281 } 282 283 @Override 284 public long getSequenceId() { 285 return 0; 286 } 287 288 @Override 289 public byte[] getValueArray() { 290 return getArray(value); 291 } 292 293 @Override 294 public int getValueOffset() { 295 return 0; 296 } 297 298 @Override 299 public int getValueLength() { 300 return length(value); 301 } 302 303 @Override 304 public int getSerializedSize() { 305 return KeyValueUtil.getSerializedSize(this, true); 306 } 307 308 @Override 309 public byte[] getTagsArray() { 310 return getArray(null); 311 } 312 313 @Override 314 public int getTagsOffset() { 315 return 0; 316 } 317 318 @Override 319 public int getTagsLength() { 320 return length(null); 321 } 322 323 @Override 324 public Type getType() { 325 return type; 326 } 327 }; 328 } 329 330 private static Cell createCustomCell(Put put) { 331 return createCustomCell(put.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE); 332 } 333 334 private static Cell createCustomCell(Append append) { 335 return createCustomCell(append.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, 336 APPEND_VALUE); 337 } 338 339 private static Cell createCustomCell(Increment inc) { 340 return createCustomCell(inc.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE); 341 } 342 343 private static Cell createCustomCell(Delete delete) { 344 return createCustomCell(delete.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.DeleteColumn, 345 null); 346 } 347 348 public static class RegionObserverImpl implements RegionCoprocessor, RegionObserver { 349 static final AtomicInteger COUNT = new AtomicInteger(0); 350 351 @Override 352 public Optional<RegionObserver> getRegionObserver() { 353 return Optional.of(this); 354 } 355 356 @Override 357 public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, 358 Durability durability) throws IOException { 359 put.add(createCustomCell(put)); 360 COUNT.incrementAndGet(); 361 } 362 363 @Override 364 public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, 365 WALEdit edit, Durability durability) throws IOException { 366 delete.add(createCustomCell(delete)); 367 COUNT.incrementAndGet(); 368 } 369 370 @Override 371 public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, 372 byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put, 373 boolean result) throws IOException { 374 put.add(createCustomCell(put)); 375 COUNT.incrementAndGet(); 376 return result; 377 } 378 379 @Override 380 public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, 381 byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, 382 Delete delete, boolean result) throws IOException { 383 delete.add(createCustomCell(delete)); 384 COUNT.incrementAndGet(); 385 return result; 386 } 387 388 @Override 389 public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append) 390 throws IOException { 391 append.add(createCustomCell(append)); 392 COUNT.incrementAndGet(); 393 return null; 394 } 395 396 @Override 397 public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment) 398 throws IOException { 399 increment.add(createCustomCell(increment)); 400 COUNT.incrementAndGet(); 401 return null; 402 } 403 404 } 405 406}