001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.nio.ByteBuffer; 026import java.util.Arrays; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.stream.Collectors; 031import java.util.stream.IntStream; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellUtil; 034import org.apache.hadoop.hbase.CompareOperator; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtility; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.KeyValueUtil; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.Append; 042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Durability; 045import org.apache.hadoop.hbase.client.Get; 046import org.apache.hadoop.hbase.client.Increment; 047import org.apache.hadoop.hbase.client.Put; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 051import org.apache.hadoop.hbase.filter.ByteArrayComparable; 052import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 053import org.apache.hadoop.hbase.testclassification.MediumTests; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.wal.WALEdit; 056import org.junit.AfterClass; 057import org.junit.Before; 058import org.junit.BeforeClass; 059import org.junit.ClassRule; 060import org.junit.Rule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.junit.rules.TestName; 064 065@Category({ CoprocessorTests.class, MediumTests.class }) 066public class TestPassCustomCellViaRegionObserver { 067 068 @ClassRule 069 public static final HBaseClassTestRule CLASS_RULE = 070 HBaseClassTestRule.forClass(TestPassCustomCellViaRegionObserver.class); 071 072 @Rule 073 public TestName testName = new TestName(); 074 075 private TableName tableName; 076 private Table table = null; 077 078 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 079 080 private static final byte[] ROW = Bytes.toBytes("ROW"); 081 private static final byte[] FAMILY = Bytes.toBytes("FAMILY"); 082 private static final byte[] QUALIFIER = Bytes.toBytes("QUALIFIER"); 083 private static final byte[] VALUE = Bytes.toBytes(10L); 084 private static final byte[] APPEND_VALUE = Bytes.toBytes("MB"); 085 086 private static final byte[] QUALIFIER_FROM_CP = Bytes.toBytes("QUALIFIER_FROM_CP"); 087 088 @BeforeClass 089 public static void setupBeforeClass() throws Exception { 090 // small retry number can speed up the failed tests. 091 UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 092 UTIL.startMiniCluster(); 093 } 094 095 @AfterClass 096 public static void tearDownAfterClass() throws Exception { 097 UTIL.shutdownMiniCluster(); 098 } 099 100 @Before 101 public void clearTable() throws IOException { 102 RegionObserverImpl.COUNT.set(0); 103 tableName = TableName.valueOf(testName.getMethodName()); 104 if (table != null) { 105 table.close(); 106 } 107 try (Admin admin = UTIL.getAdmin()) { 108 for (TableName name : admin.listTableNames()) { 109 try { 110 admin.disableTable(name); 111 } catch (IOException e) { 112 } 113 admin.deleteTable(name); 114 } 115 table = UTIL.createTable(TableDescriptorBuilder.newBuilder(tableName) 116 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 117 .setCoprocessor(RegionObserverImpl.class.getName()) 118 .build(), null); 119 } 120 } 121 122 @Test 123 public void testMutation() throws Exception { 124 125 Put put = new Put(ROW); 126 put.addColumn(FAMILY, QUALIFIER, VALUE); 127 table.put(put); 128 byte[] value = VALUE; 129 assertResult(table.get(new Get(ROW)), value, value); 130 assertObserverHasExecuted(); 131 132 Increment inc = new Increment(ROW); 133 inc.addColumn(FAMILY, QUALIFIER, 10L); 134 table.increment(inc); 135 // QUALIFIER -> 10 (put) + 10 (increment) 136 // QUALIFIER_FROM_CP -> 10 (from cp's put) + 10 (from cp's increment) 137 value = Bytes.toBytes(20L); 138 assertResult(table.get(new Get(ROW)), value, value); 139 assertObserverHasExecuted(); 140 141 Append append = new Append(ROW); 142 append.addColumn(FAMILY, QUALIFIER, APPEND_VALUE); 143 table.append(append); 144 // 10L + "MB" 145 value = ByteBuffer.wrap(new byte[value.length + APPEND_VALUE.length]) 146 .put(value) 147 .put(APPEND_VALUE) 148 .array(); 149 assertResult(table.get(new Get(ROW)), value, value); 150 assertObserverHasExecuted(); 151 152 Delete delete = new Delete(ROW); 153 delete.addColumns(FAMILY, QUALIFIER); 154 table.delete(delete); 155 assertTrue(Arrays.asList(table.get(new Get(ROW)).rawCells()).toString(), 156 table.get(new Get(ROW)).isEmpty()); 157 assertObserverHasExecuted(); 158 159 assertTrue(table.checkAndPut(ROW, FAMILY, QUALIFIER, null, put)); 160 assertObserverHasExecuted(); 161 162 assertTrue(table.checkAndDelete(ROW, FAMILY, QUALIFIER, VALUE, delete)); 163 assertObserverHasExecuted(); 164 165 assertTrue(table.get(new Get(ROW)).isEmpty()); 166 } 167 168 @Test 169 public void testMultiPut() throws Exception { 170 List<Put> puts = IntStream.range(0, 10) 171 .mapToObj(i -> new Put(ROW).addColumn(FAMILY, Bytes.toBytes(i), VALUE)) 172 .collect(Collectors.toList()); 173 table.put(puts); 174 assertResult(table.get(new Get(ROW)), VALUE); 175 assertObserverHasExecuted(); 176 177 List<Delete> deletes = IntStream.range(0, 10) 178 .mapToObj(i -> new Delete(ROW).addColumn(FAMILY, Bytes.toBytes(i))) 179 .collect(Collectors.toList()); 180 table.delete(deletes); 181 assertTrue(table.get(new Get(ROW)).isEmpty()); 182 assertObserverHasExecuted(); 183 } 184 185 private static void assertObserverHasExecuted() { 186 assertTrue(RegionObserverImpl.COUNT.getAndSet(0) > 0); 187 } 188 189 private static void assertResult(Result result, byte[] expectedValue) { 190 assertFalse(result.isEmpty()); 191 for (Cell c : result.rawCells()) { 192 assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c))); 193 assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c))); 194 assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c))); 195 } 196 } 197 198 private static void assertResult(Result result, byte[] expectedValue, byte[] expectedFromCp) { 199 assertFalse(result.isEmpty()); 200 for (Cell c : result.rawCells()) { 201 assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c))); 202 assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c))); 203 if (Bytes.equals(QUALIFIER, CellUtil.cloneQualifier(c))) { 204 assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c))); 205 } else if (Bytes.equals(QUALIFIER_FROM_CP, CellUtil.cloneQualifier(c))) { 206 assertTrue(c.toString(), Bytes.equals(expectedFromCp, CellUtil.cloneValue(c))); 207 } else { 208 fail("No valid qualifier"); 209 } 210 } 211 } 212 213 private static Cell createCustomCell(byte[] row, byte[] family, byte[] qualifier, 214 Cell.Type type, byte[] value) { 215 return new Cell() { 216 217 @Override 218 public long heapSize() { 219 return 0; 220 } 221 222 private byte[] getArray(byte[] array) { 223 return array == null ? HConstants.EMPTY_BYTE_ARRAY : array; 224 } 225 226 private int length(byte[] array) { 227 return array == null ? 0 : array.length; 228 } 229 230 @Override 231 public byte[] getRowArray() { 232 return getArray(row); 233 } 234 235 @Override 236 public int getRowOffset() { 237 return 0; 238 } 239 240 @Override 241 public short getRowLength() { 242 return (short) length(row); 243 } 244 245 @Override 246 public byte[] getFamilyArray() { 247 return getArray(family); 248 } 249 250 @Override 251 public int getFamilyOffset() { 252 return 0; 253 } 254 255 @Override 256 public byte getFamilyLength() { 257 return (byte) length(family); 258 } 259 260 @Override 261 public byte[] getQualifierArray() { 262 return getArray(qualifier); 263 } 264 265 @Override 266 public int getQualifierOffset() { 267 return 0; 268 } 269 270 @Override 271 public int getQualifierLength() { 272 return length(qualifier); 273 } 274 275 @Override 276 public long getTimestamp() { 277 return HConstants.LATEST_TIMESTAMP; 278 } 279 280 @Override 281 public byte getTypeByte() { 282 return type.getCode(); 283 } 284 285 @Override 286 public long getSequenceId() { 287 return 0; 288 } 289 290 @Override 291 public byte[] getValueArray() { 292 return getArray(value); 293 } 294 295 @Override 296 public int getValueOffset() { 297 return 0; 298 } 299 300 @Override 301 public int getValueLength() { 302 return length(value); 303 } 304 305 @Override 306 public int getSerializedSize() { 307 return KeyValueUtil.getSerializedSize(this, true); 308 } 309 310 @Override 311 public byte[] getTagsArray() { 312 return getArray(null); 313 } 314 315 @Override 316 public int getTagsOffset() { 317 return 0; 318 } 319 320 @Override 321 public int getTagsLength() { 322 return length(null); 323 } 324 325 @Override 326 public Type getType() { 327 return type; 328 } 329 }; 330 } 331 332 private static Cell createCustomCell(Put put) { 333 return createCustomCell(put.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE); 334 } 335 336 private static Cell createCustomCell(Append append) { 337 return createCustomCell(append.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, 338 APPEND_VALUE); 339 } 340 341 private static Cell createCustomCell(Increment inc) { 342 return createCustomCell(inc.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE); 343 } 344 345 private static Cell createCustomCell(Delete delete) { 346 return createCustomCell(delete.getRow(), FAMILY, QUALIFIER_FROM_CP, 347 Cell.Type.DeleteColumn, null); 348 } 349 350 public static class RegionObserverImpl implements RegionCoprocessor, RegionObserver { 351 static final AtomicInteger COUNT = new AtomicInteger(0); 352 353 @Override 354 public Optional<RegionObserver> getRegionObserver() { 355 return Optional.of(this); 356 } 357 358 @Override 359 public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, 360 Durability durability) throws IOException { 361 put.add(createCustomCell(put)); 362 COUNT.incrementAndGet(); 363 } 364 365 @Override 366 public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, 367 WALEdit edit, Durability durability) throws IOException { 368 delete.add(createCustomCell(delete)); 369 COUNT.incrementAndGet(); 370 } 371 372 @Override 373 public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, 374 byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put, 375 boolean result) throws IOException { 376 put.add(createCustomCell(put)); 377 COUNT.incrementAndGet(); 378 return result; 379 } 380 381 @Override 382 public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, 383 byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, 384 Delete delete, boolean result) throws IOException { 385 delete.add(createCustomCell(delete)); 386 COUNT.incrementAndGet(); 387 return result; 388 } 389 390 @Override 391 public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append) 392 throws IOException { 393 append.add(createCustomCell(append)); 394 COUNT.incrementAndGet(); 395 return null; 396 } 397 398 399 @Override 400 public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment) 401 throws IOException { 402 increment.add(createCustomCell(increment)); 403 COUNT.incrementAndGet(); 404 return null; 405 } 406 407 } 408 409}