/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

@Category(MediumTests.class)
public class TestRegionObserverForAddingMutationsFromCoprocessors {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRegionObserverForAddingMutationsFromCoprocessors.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestRegionObserverForAddingMutationsFromCoprocessors.class);

  private static HBaseTestingUtility util;
  private static final byte[] dummy = Bytes.toBytes("dummy");
  private static final byte[] row1 = Bytes.toBytes("r1");
  private static final byte[] row2 = Bytes.toBytes("r2");
  private static final byte[] row3 = Bytes.toBytes("r3");
  private static final byte[] test = Bytes.toBytes("test");

  @Rule
  public TestName name = new TestName();
  private TableName tableName;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, TestWALObserver.class.getName());
    util = new HBaseTestingUtility(conf);
    util.startMiniCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    tableName = TableName.valueOf(name.getMethodName());
  }

  private void createTable(String coprocessor) throws IOException {
    HTableDescriptor htd = new HTableDescriptor(tableName)
        .addFamily(new HColumnDescriptor(dummy))
        .addFamily(new HColumnDescriptor(test))
        .addCoprocessor(coprocessor);
    util.getAdmin().createTable(htd);
  }

  /**
   * Test various multiput operations.
   * @throws Exception
   */
  @Test
  public void testMulti() throws Exception {
    createTable(TestMultiMutationCoprocessor.class.getName());

    try (Table t = util.getConnection().getTable(tableName)) {
      t.put(new Put(row1).addColumn(test, dummy, dummy));
      assertRowCount(t, 3);
    }
  }

  /**
   * Tests that added mutations from coprocessors end up in the WAL.
   */
  @Test
  public void testCPMutationsAreWrittenToWALEdit() throws Exception {
    createTable(TestMultiMutationCoprocessor.class.getName());

    try (Table t = util.getConnection().getTable(tableName)) {
      t.put(new Put(row1).addColumn(test, dummy, dummy));
      assertRowCount(t, 3);
    }

    assertNotNull(TestWALObserver.savedEdit);
    assertEquals(4, TestWALObserver.savedEdit.getCells().size());
  }

  private static void assertRowCount(Table t, int expected) throws IOException {
    try (ResultScanner scanner = t.getScanner(new Scan())) {
      int i = 0;
      for (Result r : scanner) {
        LOG.info(r.toString());
        i++;
      }
      assertEquals(expected, i);
    }
  }

  @Test
  public void testDeleteCell() throws Exception {
    createTable(TestDeleteCellCoprocessor.class.getName());

    try (Table t = util.getConnection().getTable(tableName)) {
      t.put(Lists.newArrayList(
          new Put(row1).addColumn(test, dummy, dummy),
          new Put(row2).addColumn(test, dummy, dummy),
          new Put(row3).addColumn(test, dummy, dummy)
      ));

      assertRowCount(t, 3);

      t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row
      assertRowCount(t, 1);
    }
  }

  @Test
  public void testDeleteFamily() throws Exception {
    createTable(TestDeleteFamilyCoprocessor.class.getName());

    try (Table t = util.getConnection().getTable(tableName)) {
      t.put(Lists.newArrayList(
          new Put(row1).addColumn(test, dummy, dummy),
          new Put(row2).addColumn(test, dummy, dummy),
          new Put(row3).addColumn(test, dummy, dummy)
      ));

      assertRowCount(t, 3);

      t.delete(new Delete(test).addFamily(test)); // delete non-existing row
      assertRowCount(t, 1);
    }
  }

  @Test
  public void testDeleteRow() throws Exception {
    createTable(TestDeleteRowCoprocessor.class.getName());

    try (Table t = util.getConnection().getTable(tableName)) {
      t.put(Lists.newArrayList(
          new Put(row1).addColumn(test, dummy, dummy),
          new Put(row2).addColumn(test, dummy, dummy),
          new Put(row3).addColumn(test, dummy, dummy)
      ));

      assertRowCount(t, 3);

      t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row
      assertRowCount(t, 1);
    }
  }

  @Test
  public void testPutWithTTL() throws Exception {
    createTable(TestPutWithTTLCoprocessor.class.getName());

    try (Table t = util.getConnection().getTable(tableName)) {
      t.put(new Put(row1).addColumn(test, dummy, dummy).setTTL(3000));
      assertRowCount(t, 2);
      // wait long enough for the TTL to expire
      Thread.sleep(5000);
      assertRowCount(t, 0);
    }
  }

  public static class TestPutWithTTLCoprocessor implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
        MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
      Mutation mut = miniBatchOp.getOperation(0);
      List<Cell> cells = mut.getFamilyCellMap().get(test);
      Put[] puts = new Put[] {
          new Put(Bytes.toBytes("cpPut")).addColumn(test, dummy, cells.get(0).getTimestamp(),
              Bytes.toBytes("cpdummy")).setTTL(mut.getTTL())
      };
      LOG.info("Putting:" + Arrays.toString(puts));
      miniBatchOp.addOperationsFromCP(0, puts);
    }
  }

  public static class TestMultiMutationCoprocessor implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
        MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
      Mutation mut = miniBatchOp.getOperation(0);
      List<Cell> cells = mut.getFamilyCellMap().get(test);
      Put[] puts = new Put[] {
          new Put(row1).addColumn(test, dummy, cells.get(0).getTimestamp(),
              Bytes.toBytes("cpdummy")),
          new Put(row2).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy),
          new Put(row3).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy),
      };
      LOG.info("Putting:" + Arrays.toString(puts));
      miniBatchOp.addOperationsFromCP(0, puts);
    }
  }

  public static class TestDeleteCellCoprocessor implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
        MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
      Mutation mut = miniBatchOp.getOperation(0);

      if (mut instanceof Delete) {
        List<Cell> cells = mut.getFamilyCellMap().get(test);
        Delete[] deletes = new Delete[] {
            // delete only 2 rows
            new Delete(row1).addColumns(test, dummy, cells.get(0).getTimestamp()),
            new Delete(row2).addColumns(test, dummy, cells.get(0).getTimestamp()),
        };
        LOG.info("Deleting:" + Arrays.toString(deletes));
        miniBatchOp.addOperationsFromCP(0, deletes);
      }
    }
  }

  public static class TestDeleteFamilyCoprocessor implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
        MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
      Mutation mut = miniBatchOp.getOperation(0);

      if (mut instanceof Delete) {
        List<Cell> cells = mut.getFamilyCellMap().get(test);
        Delete[] deletes = new Delete[] {
            // delete only 2 rows
            new Delete(row1).addFamily(test, cells.get(0).getTimestamp()),
            new Delete(row2).addFamily(test, cells.get(0).getTimestamp()),
        };
        LOG.info("Deleting:" + Arrays.toString(deletes));
        miniBatchOp.addOperationsFromCP(0, deletes);
      }
    }
  }

  public static class TestDeleteRowCoprocessor implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
        MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
      Mutation mut = miniBatchOp.getOperation(0);

      if (mut instanceof Delete) {
        List<Cell> cells = mut.getFamilyCellMap().get(test);
        Delete[] deletes = new Delete[] {
            // delete only 2 rows
            new Delete(row1, cells.get(0).getTimestamp()),
            new Delete(row2, cells.get(0).getTimestamp()),
        };
        LOG.info("Deleting:" + Arrays.toString(deletes));
        miniBatchOp.addOperationsFromCP(0, deletes);
      }
    }
  }

  public static class TestWALObserver implements WALCoprocessor, WALObserver {
    static WALEdit savedEdit = null;

    @Override
    public Optional<WALObserver> getWALObserver() {
      return Optional.of(this);
    }

    @Override
    public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
        RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
      if (info.getTable().equals(TableName.valueOf("testCPMutationsAreWrittenToWALEdit"))) {
        savedEdit = logEdit;
      }
    }
  }
}