001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNotNull; 022 023import java.io.IOException; 024import java.util.Arrays; 025import java.util.List; 026import java.util.Optional; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 033import org.apache.hadoop.hbase.client.Delete; 034import org.apache.hadoop.hbase.client.Mutation; 035import org.apache.hadoop.hbase.client.Put; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.Result; 038import org.apache.hadoop.hbase.client.ResultScanner; 039import org.apache.hadoop.hbase.client.Scan; 040import org.apache.hadoop.hbase.client.Table; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.wal.WALEdit; 047import org.apache.hadoop.hbase.wal.WALKey; 048import org.junit.jupiter.api.AfterAll; 049import org.junit.jupiter.api.BeforeAll; 050import org.junit.jupiter.api.BeforeEach; 051import org.junit.jupiter.api.Tag; 052import org.junit.jupiter.api.Test; 053import org.junit.jupiter.api.TestInfo; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 058 059@Tag(MediumTests.TAG) 060public class TestRegionObserverForAddingMutationsFromCoprocessors { 061 062 private static final Logger LOG = 063 LoggerFactory.getLogger(TestRegionObserverForAddingMutationsFromCoprocessors.class); 064 065 private static HBaseTestingUtil util; 066 private static final byte[] dummy = Bytes.toBytes("dummy"); 067 private static final byte[] row1 = Bytes.toBytes("r1"); 068 private static final byte[] row2 = Bytes.toBytes("r2"); 069 private static final byte[] row3 = Bytes.toBytes("r3"); 070 private static final byte[] test = Bytes.toBytes("test"); 071 072 private String currentTestName; 073 private TableName tableName; 074 075 @BeforeAll 076 public static void setUpBeforeClass() throws Exception { 077 Configuration conf = HBaseConfiguration.create(); 078 conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, TestWALObserver.class.getName()); 079 util = new HBaseTestingUtil(conf); 080 util.startMiniCluster(); 081 } 082 083 @AfterAll 084 public static void tearDownAfterClass() throws Exception { 085 util.shutdownMiniCluster(); 086 } 087 088 @BeforeEach 089 public void setUp(TestInfo testInfo) throws Exception { 090 currentTestName = testInfo.getTestMethod().get().getName(); 091 tableName = TableName.valueOf(currentTestName); 092 } 093 094 private void createTable(String coprocessor) throws IOException { 095 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 096 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(dummy)) 097 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(test)).setCoprocessor(coprocessor).build(); 098 util.getAdmin().createTable(tableDescriptor); 099 } 100 101 /** 102 * Test various multiput operations. 103 */ 104 @Test 105 public void testMulti() throws Exception { 106 createTable(TestMultiMutationCoprocessor.class.getName()); 107 108 try (Table t = util.getConnection().getTable(tableName)) { 109 t.put(new Put(row1).addColumn(test, dummy, dummy)); 110 assertRowCount(t, 3); 111 } 112 } 113 114 /** 115 * Tests that added mutations from coprocessors end up in the WAL. 116 */ 117 @Test 118 public void testCPMutationsAreWrittenToWALEdit() throws Exception { 119 createTable(TestMultiMutationCoprocessor.class.getName()); 120 121 try (Table t = util.getConnection().getTable(tableName)) { 122 t.put(new Put(row1).addColumn(test, dummy, dummy)); 123 assertRowCount(t, 3); 124 } 125 126 assertNotNull(TestWALObserver.savedEdit); 127 assertEquals(4, TestWALObserver.savedEdit.getCells().size()); 128 } 129 130 private static void assertRowCount(Table t, int expected) throws IOException { 131 try (ResultScanner scanner = t.getScanner(new Scan())) { 132 int i = 0; 133 for (Result r : scanner) { 134 LOG.info(r.toString()); 135 i++; 136 } 137 assertEquals(expected, i); 138 } 139 } 140 141 @Test 142 public void testDeleteCell() throws Exception { 143 createTable(TestDeleteCellCoprocessor.class.getName()); 144 145 try (Table t = util.getConnection().getTable(tableName)) { 146 t.put(Lists.newArrayList(new Put(row1).addColumn(test, dummy, dummy), 147 new Put(row2).addColumn(test, dummy, dummy), new Put(row3).addColumn(test, dummy, dummy))); 148 149 assertRowCount(t, 3); 150 151 t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row 152 assertRowCount(t, 1); 153 } 154 } 155 156 @Test 157 public void testDeleteFamily() throws Exception { 158 createTable(TestDeleteFamilyCoprocessor.class.getName()); 159 160 try (Table t = util.getConnection().getTable(tableName)) { 161 t.put(Lists.newArrayList(new Put(row1).addColumn(test, dummy, dummy), 162 new Put(row2).addColumn(test, dummy, dummy), new Put(row3).addColumn(test, dummy, dummy))); 163 164 assertRowCount(t, 3); 165 166 t.delete(new Delete(test).addFamily(test)); // delete non-existing row 167 assertRowCount(t, 1); 168 } 169 } 170 171 @Test 172 public void testDeleteRow() throws Exception { 173 createTable(TestDeleteRowCoprocessor.class.getName()); 174 175 try (Table t = util.getConnection().getTable(tableName)) { 176 t.put(Lists.newArrayList(new Put(row1).addColumn(test, dummy, dummy), 177 new Put(row2).addColumn(test, dummy, dummy), new Put(row3).addColumn(test, dummy, dummy))); 178 179 assertRowCount(t, 3); 180 181 t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row 182 assertRowCount(t, 1); 183 } 184 } 185 186 @Test 187 public void testPutWithTTL() throws Exception { 188 createTable(TestPutWithTTLCoprocessor.class.getName()); 189 190 try (Table t = util.getConnection().getTable(tableName)) { 191 t.put(new Put(row1).addColumn(test, dummy, dummy).setTTL(3000)); 192 assertRowCount(t, 2); 193 // wait long enough for the TTL to expire 194 Thread.sleep(5000); 195 assertRowCount(t, 0); 196 } 197 } 198 199 public static class TestPutWithTTLCoprocessor implements RegionCoprocessor, RegionObserver { 200 @Override 201 public Optional<RegionObserver> getRegionObserver() { 202 return Optional.of(this); 203 } 204 205 @Override 206 public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 207 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 208 Mutation mut = miniBatchOp.getOperation(0); 209 List<Cell> cells = mut.getFamilyCellMap().get(test); 210 Put[] puts = new Put[] { new Put(Bytes.toBytes("cpPut")) 211 .addColumn(test, dummy, cells.get(0).getTimestamp(), Bytes.toBytes("cpdummy")) 212 .setTTL(mut.getTTL()) }; 213 LOG.info("Putting:" + Arrays.toString(puts)); 214 miniBatchOp.addOperationsFromCP(0, puts); 215 } 216 } 217 218 public static class TestMultiMutationCoprocessor implements RegionCoprocessor, RegionObserver { 219 @Override 220 public Optional<RegionObserver> getRegionObserver() { 221 return Optional.of(this); 222 } 223 224 @Override 225 public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 226 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 227 Mutation mut = miniBatchOp.getOperation(0); 228 List<Cell> cells = mut.getFamilyCellMap().get(test); 229 Put[] puts = new Put[] { 230 new Put(row1).addColumn(test, dummy, cells.get(0).getTimestamp(), Bytes.toBytes("cpdummy")), 231 new Put(row2).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy), 232 new Put(row3).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy), }; 233 LOG.info("Putting:" + Arrays.toString(puts)); 234 miniBatchOp.addOperationsFromCP(0, puts); 235 } 236 } 237 238 public static class TestDeleteCellCoprocessor implements RegionCoprocessor, RegionObserver { 239 @Override 240 public Optional<RegionObserver> getRegionObserver() { 241 return Optional.of(this); 242 } 243 244 @Override 245 public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 246 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 247 Mutation mut = miniBatchOp.getOperation(0); 248 249 if (mut instanceof Delete) { 250 List<Cell> cells = mut.getFamilyCellMap().get(test); 251 Delete[] deletes = new Delete[] { 252 // delete only 2 rows 253 new Delete(row1).addColumns(test, dummy, cells.get(0).getTimestamp()), 254 new Delete(row2).addColumns(test, dummy, cells.get(0).getTimestamp()), }; 255 LOG.info("Deleting:" + Arrays.toString(deletes)); 256 miniBatchOp.addOperationsFromCP(0, deletes); 257 } 258 } 259 } 260 261 public static class TestDeleteFamilyCoprocessor implements RegionCoprocessor, RegionObserver { 262 @Override 263 public Optional<RegionObserver> getRegionObserver() { 264 return Optional.of(this); 265 } 266 267 @Override 268 public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 269 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 270 Mutation mut = miniBatchOp.getOperation(0); 271 272 if (mut instanceof Delete) { 273 List<Cell> cells = mut.getFamilyCellMap().get(test); 274 Delete[] deletes = new Delete[] { 275 // delete only 2 rows 276 new Delete(row1).addFamily(test, cells.get(0).getTimestamp()), 277 new Delete(row2).addFamily(test, cells.get(0).getTimestamp()), }; 278 LOG.info("Deleting:" + Arrays.toString(deletes)); 279 miniBatchOp.addOperationsFromCP(0, deletes); 280 } 281 } 282 } 283 284 public static class TestDeleteRowCoprocessor implements RegionCoprocessor, RegionObserver { 285 @Override 286 public Optional<RegionObserver> getRegionObserver() { 287 return Optional.of(this); 288 } 289 290 @Override 291 public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 292 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 293 Mutation mut = miniBatchOp.getOperation(0); 294 295 if (mut instanceof Delete) { 296 List<Cell> cells = mut.getFamilyCellMap().get(test); 297 Delete[] deletes = new Delete[] { 298 // delete only 2 rows 299 new Delete(row1, cells.get(0).getTimestamp()), 300 new Delete(row2, cells.get(0).getTimestamp()), }; 301 LOG.info("Deleting:" + Arrays.toString(deletes)); 302 miniBatchOp.addOperationsFromCP(0, deletes); 303 } 304 } 305 } 306 307 public static class TestWALObserver implements WALCoprocessor, WALObserver { 308 static WALEdit savedEdit = null; 309 310 @Override 311 public Optional<WALObserver> getWALObserver() { 312 return Optional.of(this); 313 } 314 315 @Override 316 public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx, 317 RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { 318 if (info.getTable().equals(TableName.valueOf("testCPMutationsAreWrittenToWALEdit"))) { 319 savedEdit = logEdit; 320 } 321 } 322 } 323}