/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Verifies that a {@link RegionObserver} can add a hand-written {@link Cell} implementation to
 * the mutation it is handed in the pre-hooks.
 * <p>
 * Every test table is created with {@link RegionObserverImpl} installed. Each of its pre-hooks
 * adds one extra cell, addressed at {@link #QUALIFIER_FROM_CP}, to the incoming mutation and
 * bumps {@link RegionObserverImpl#COUNT}. The tests then check both the cell written by the
 * client and the cell injected by the coprocessor.
 */
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestPassCustomCellViaRegionObserver {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPassCustomCellViaRegionObserver.class);

  @Rule
  public TestName testName = new TestName();

  private TableName tableName;
  private Table table = null;

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static final byte[] ROW = Bytes.toBytes("ROW");
  private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
  private static final byte[] QUALIFIER = Bytes.toBytes("QUALIFIER");
  private static final byte[] VALUE = Bytes.toBytes(10L);
  private static final byte[] APPEND_VALUE = Bytes.toBytes("MB");

  /** Qualifier of the extra cell that {@link RegionObserverImpl} adds to every mutation. */
  private static final byte[] QUALIFIER_FROM_CP = Bytes.toBytes("QUALIFIER_FROM_CP");

  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    // small retry number can speed up the failed tests.
    UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    UTIL.startMiniCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Resets the observer counter, drops every existing table and creates a fresh table, named
   * after the running test method, with {@link RegionObserverImpl} attached.
   */
  @Before
  public void clearTable() throws IOException {
    RegionObserverImpl.COUNT.set(0);
    tableName = TableName.valueOf(testName.getMethodName());
    try (Admin admin = UTIL.getAdmin()) {
      for (TableName name : admin.listTableNames()) {
        try {
          admin.disableTable(name);
        } catch (IOException ignored) {
          // Best effort only (e.g. the table may already be disabled). Any real problem will
          // surface from deleteTable() right below.
        }
        admin.deleteTable(name);
      }
      table = UTIL.createTable(TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
        .setCoprocessor(RegionObserverImpl.class.getName())
        .build(), null);
    }
  }

  /**
   * Closes the table opened by {@link #clearTable()}.
   * <p>
   * JUnit 4 instantiates the test class once per test method, so {@code table} is always
   * {@code null} when {@code @Before} runs; the table therefore has to be released here rather
   * than at the start of the next test.
   */
  @After
  public void closeTable() throws IOException {
    if (table != null) {
      table.close();
      table = null;
    }
  }

  /**
   * Runs put, increment, append, delete, checkAndPut and checkAndDelete against a single row and
   * checks after each step that the observer ran and that both the client's cell and the
   * coprocessor's cell carry the expected value.
   */
  @Test
  public void testMutation() throws Exception {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, VALUE);
    table.put(put);
    byte[] value = VALUE;
    assertResult(table.get(new Get(ROW)), value, value);
    assertObserverHasExecuted();

    Increment inc = new Increment(ROW);
    inc.addColumn(FAMILY, QUALIFIER, 10L);
    table.increment(inc);
    // QUALIFIER -> 10 (put) + 10 (increment)
    // QUALIFIER_FROM_CP -> 10 (from cp's put) + 10 (from cp's increment)
    value = Bytes.toBytes(20L);
    assertResult(table.get(new Get(ROW)), value, value);
    assertObserverHasExecuted();

    Append append = new Append(ROW);
    append.addColumn(FAMILY, QUALIFIER, APPEND_VALUE);
    table.append(append);
    // 20L (the value after the increment) + "MB"
    value = ByteBuffer.wrap(new byte[value.length + APPEND_VALUE.length])
      .put(value)
      .put(APPEND_VALUE)
      .array();
    assertResult(table.get(new Get(ROW)), value, value);
    assertObserverHasExecuted();

    Delete delete = new Delete(ROW);
    delete.addColumns(FAMILY, QUALIFIER);
    table.delete(delete);
    assertTrue(Arrays.asList(table.get(new Get(ROW)).rawCells()).toString(),
      table.get(new Get(ROW)).isEmpty());
    assertObserverHasExecuted();

    assertTrue(table.checkAndPut(ROW, FAMILY, QUALIFIER, null, put));
    assertObserverHasExecuted();

    assertTrue(table.checkAndDelete(ROW, FAMILY, QUALIFIER, VALUE, delete));
    assertObserverHasExecuted();

    assertTrue(table.get(new Get(ROW)).isEmpty());
  }

  /**
   * Sends a batch of ten puts followed by a batch of ten deletes for the same row and checks
   * that the observer ran for each batch and that the row ends up empty.
   */
  @Test
  public void testMultiPut() throws Exception {
    List<Put> puts = IntStream.range(0, 10)
      .mapToObj(i -> new Put(ROW).addColumn(FAMILY, Bytes.toBytes(i), VALUE))
      .collect(Collectors.toList());
    table.put(puts);
    assertResult(table.get(new Get(ROW)), VALUE);
    assertObserverHasExecuted();

    List<Delete> deletes = IntStream.range(0, 10)
      .mapToObj(i -> new Delete(ROW).addColumn(FAMILY, Bytes.toBytes(i)))
      .collect(Collectors.toList());
    table.delete(deletes);
    assertTrue(table.get(new Get(ROW)).isEmpty());
    assertObserverHasExecuted();
  }

  /**
   * Asserts that at least one {@link RegionObserverImpl} hook ran since the last check, and
   * resets the counter for the next step.
   */
  private static void assertObserverHasExecuted() {
    assertTrue("RegionObserverImpl hooks were not invoked",
      RegionObserverImpl.COUNT.getAndSet(0) > 0);
  }

  /**
   * Asserts that {@code result} is non-empty and that every cell belongs to {@link #ROW} and
   * {@link #FAMILY} and holds {@code expectedValue}, whatever its qualifier.
   */
  private static void assertResult(Result result, byte[] expectedValue) {
    assertFalse(result.isEmpty());
    for (Cell c : result.rawCells()) {
      assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
      assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
      assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
    }
  }

  /**
   * Asserts that {@code result} is non-empty and contains only cells of {@link #ROW} and
   * {@link #FAMILY} whose qualifier is either {@link #QUALIFIER} (expected to hold
   * {@code expectedValue}) or {@link #QUALIFIER_FROM_CP} (expected to hold
   * {@code expectedFromCp}). Any other qualifier fails the test.
   */
  private static void assertResult(Result result, byte[] expectedValue, byte[] expectedFromCp) {
    assertFalse(result.isEmpty());
    for (Cell c : result.rawCells()) {
      assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
      assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
      if (Bytes.equals(QUALIFIER, CellUtil.cloneQualifier(c))) {
        assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
      } else if (Bytes.equals(QUALIFIER_FROM_CP, CellUtil.cloneQualifier(c))) {
        assertTrue(c.toString(), Bytes.equals(expectedFromCp, CellUtil.cloneValue(c)));
      } else {
        fail("No valid qualifier");
      }
    }
  }

  /**
   * Builds an anonymous {@link Cell} backed directly by the given arrays (offset 0, no copy).
   * A {@code null} array is exposed as an empty array of length 0. The cell has no tags, a
   * sequence id of 0 and a timestamp of {@link HConstants#LATEST_TIMESTAMP}.
   *
   * @param row       row key, may be {@code null}
   * @param family    column family, may be {@code null}
   * @param qualifier column qualifier, may be {@code null}
   * @param type      cell type reported by {@link Cell#getType()} and {@link Cell#getTypeByte()}
   * @param value     cell value, may be {@code null}
   * @return the custom cell
   */
  private static Cell createCustomCell(byte[] row, byte[] family, byte[] qualifier,
    Cell.Type type, byte[] value) {
    return new Cell() {

      private byte[] getArray(byte[] array) {
        return array == null ? HConstants.EMPTY_BYTE_ARRAY : array;
      }

      private int length(byte[] array) {
        return array == null ? 0 : array.length;
      }

      @Override
      public byte[] getRowArray() {
        return getArray(row);
      }

      @Override
      public int getRowOffset() {
        return 0;
      }

      @Override
      public short getRowLength() {
        return (short) length(row);
      }

      @Override
      public byte[] getFamilyArray() {
        return getArray(family);
      }

      @Override
      public int getFamilyOffset() {
        return 0;
      }

      @Override
      public byte getFamilyLength() {
        return (byte) length(family);
      }

      @Override
      public byte[] getQualifierArray() {
        return getArray(qualifier);
      }

      @Override
      public int getQualifierOffset() {
        return 0;
      }

      @Override
      public int getQualifierLength() {
        return length(qualifier);
      }

      @Override
      public long getTimestamp() {
        return HConstants.LATEST_TIMESTAMP;
      }

      @Override
      public byte getTypeByte() {
        return type.getCode();
      }

      @Override
      public long getSequenceId() {
        return 0;
      }

      @Override
      public byte[] getValueArray() {
        return getArray(value);
      }

      @Override
      public int getValueOffset() {
        return 0;
      }

      @Override
      public int getValueLength() {
        return length(value);
      }

      // This cell never carries tags.
      @Override
      public byte[] getTagsArray() {
        return HConstants.EMPTY_BYTE_ARRAY;
      }

      @Override
      public int getTagsOffset() {
        return 0;
      }

      @Override
      public int getTagsLength() {
        return 0;
      }

      @Override
      public Type getType() {
        return type;
      }
    };
  }

  /** Custom Put-type cell for the row of {@code put}, holding {@link #VALUE}. */
  private static Cell createCustomCell(Put put) {
    return createCustomCell(put.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE);
  }

  /** Custom Put-type cell for the row of {@code append}, holding {@link #APPEND_VALUE}. */
  private static Cell createCustomCell(Append append) {
    return createCustomCell(append.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put,
      APPEND_VALUE);
  }

  /** Custom Put-type cell for the row of {@code inc}, holding {@link #VALUE} (10L). */
  private static Cell createCustomCell(Increment inc) {
    return createCustomCell(inc.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE);
  }

  /** Custom DeleteColumn-type cell, without a value, for the row of {@code delete}. */
  private static Cell createCustomCell(Delete delete) {
    return createCustomCell(delete.getRow(), FAMILY, QUALIFIER_FROM_CP,
      Cell.Type.DeleteColumn, null);
  }

  /**
   * Region observer under test. Every pre-hook adds one custom cell addressed at
   * {@link #QUALIFIER_FROM_CP} to the incoming mutation and increments {@link #COUNT}, so the
   * tests can tell that the hook was actually invoked.
   */
  public static class RegionObserverImpl implements RegionCoprocessor, RegionObserver {
    /** Number of hook invocations since the tests last reset it. */
    static final AtomicInteger COUNT = new AtomicInteger(0);

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
      Durability durability) throws IOException {
      put.add(createCustomCell(put));
      COUNT.incrementAndGet();
    }

    @Override
    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
      WALEdit edit, Durability durability) throws IOException {
      delete.add(createCustomCell(delete));
      COUNT.incrementAndGet();
    }

    @Override
    public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
      byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
      boolean result) throws IOException {
      put.add(createCustomCell(put));
      COUNT.incrementAndGet();
      return result;
    }

    @Override
    public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
      byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
      Delete delete, boolean result) throws IOException {
      delete.add(createCustomCell(delete));
      COUNT.incrementAndGet();
      return result;
    }

    @Override
    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
      throws IOException {
      append.add(createCustomCell(append));
      COUNT.incrementAndGet();
      return null;
    }

    @Override
    public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
      throws IOException {
      increment.add(createCustomCell(increment));
      COUNT.incrementAndGet();
      return null;
    }
  }
}