/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Verifies that a RegionObserver calling {@code ObserverContext.bypass()} in
 * {@code prePut} causes the operation to be skipped (HBASE-4331) and that the
 * bypass short-circuits the rest of the coprocessor chain. Three identical
 * coprocessors are stacked so the invocation counters can prove the
 * short-circuit happened.
 */
@Tag(CoprocessorTests.TAG)
@Tag(MediumTests.TAG)
public class TestRegionObserverBypass {

  private static HBaseTestingUtil util;
  private static final TableName tableName = TableName.valueOf("test");
  private static final byte[] dummy = Bytes.toBytes("dummy");
  private static final byte[] row1 = Bytes.toBytes("r1");
  private static final byte[] row2 = Bytes.toBytes("r2");
  private static final byte[] row3 = Bytes.toBytes("r3");
  private static final byte[] test = Bytes.toBytes("test");

  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    // Stack up three coprocessors just so I can check bypass skips subsequent calls.
    Configuration conf = HBaseConfiguration.create();
    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
      new String[] { TestCoprocessor.class.getName(), TestCoprocessor2.class.getName(),
        TestCoprocessor3.class.getName() });
    util = new HBaseTestingUtil(conf);
    util.startMiniCluster();
  }

  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Recreates the test table from scratch and zeroes the shared coprocessor
   * counters so each test starts from a known state.
   */
  @BeforeEach
  public void setUp() throws Exception {
    Admin admin = util.getAdmin();
    if (admin.tableExists(tableName)) {
      if (admin.isTableEnabled(tableName)) {
        admin.disableTable(tableName);
      }
      admin.deleteTable(tableName);
    }
    util.createTable(tableName, new byte[][] { dummy, test });
    TestCoprocessor.PREPUT_BYPASSES.set(0);
    TestCoprocessor.PREPUT_INVOCATIONS.set(0);
  }

  /**
   * do a single put that is bypassed by a RegionObserver
   */
  @Test
  public void testSimple() throws Exception {
    try (Table t = util.getConnection().getTable(tableName)) {
      Put p = new Put(row1);
      p.addColumn(test, dummy, dummy);
      // before HBASE-4331, this would throw an exception
      t.put(p);
      checkRowAndDelete(t, row1, 0);
    }
  }

  /**
   * Test various multiput operations. If the column family is 'test', then bypass is invoked.
   */
  @Test
  public void testMulti() throws Exception {
    // ensure that server time increments every time we do an operation, otherwise
    // previous deletes will eclipse successive puts having the same timestamp
    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
    // try/finally so a failed assertion cannot leak the injected edge (or the
    // table) into subsequent tests.
    try (Table t = util.getConnection().getTable(tableName)) {
      List<Put> puts = new ArrayList<>();
      Put p = new Put(row1);
      p.addColumn(dummy, dummy, dummy);
      puts.add(p);
      p = new Put(row2);
      p.addColumn(test, dummy, dummy);
      puts.add(p);
      p = new Put(row3);
      p.addColumn(test, dummy, dummy);
      puts.add(p);
      // before HBASE-4331, this would throw an exception
      t.put(puts);
      checkRowAndDelete(t, row1, 1);
      checkRowAndDelete(t, row2, 0);
      checkRowAndDelete(t, row3, 0);

      puts.clear();
      p = new Put(row1);
      p.addColumn(test, dummy, dummy);
      puts.add(p);
      p = new Put(row2);
      p.addColumn(test, dummy, dummy);
      puts.add(p);
      p = new Put(row3);
      p.addColumn(test, dummy, dummy);
      puts.add(p);
      // before HBASE-4331, this would throw an exception
      t.put(puts);
      checkRowAndDelete(t, row1, 0);
      checkRowAndDelete(t, row2, 0);
      checkRowAndDelete(t, row3, 0);

      puts.clear();
      p = new Put(row1);
      p.addColumn(test, dummy, dummy);
      puts.add(p);
      p = new Put(row2);
      p.addColumn(test, dummy, dummy);
      puts.add(p);
      p = new Put(row3);
      p.addColumn(dummy, dummy, dummy);
      puts.add(p);
      // this worked fine even before HBASE-4331
      t.put(puts);
      checkRowAndDelete(t, row1, 0);
      checkRowAndDelete(t, row2, 0);
      checkRowAndDelete(t, row3, 1);

      puts.clear();
      p = new Put(row1);
      p.addColumn(dummy, dummy, dummy);
      puts.add(p);
      p = new Put(row2);
      p.addColumn(test, dummy, dummy);
      puts.add(p);
      p = new Put(row3);
      p.addColumn(dummy, dummy, dummy);
      puts.add(p);
      // this worked fine even before HBASE-4331
      t.put(puts);
      checkRowAndDelete(t, row1, 1);
      checkRowAndDelete(t, row2, 0);
      checkRowAndDelete(t, row3, 1);

      puts.clear();
      p = new Put(row1);
      p.addColumn(test, dummy, dummy);
      puts.add(p);
      p = new Put(row2);
      p.addColumn(dummy, dummy, dummy);
      puts.add(p);
      p = new Put(row3);
      p.addColumn(test, dummy, dummy);
      puts.add(p);
      // before HBASE-4331, this would throw an exception
      t.put(puts);
      checkRowAndDelete(t, row1, 0);
      checkRowAndDelete(t, row2, 1);
      checkRowAndDelete(t, row3, 0);
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

  /**
   * Asserts the row currently holds {@code count} cells, then deletes it so the
   * next step of the test starts from an empty row.
   */
  private void checkRowAndDelete(Table t, byte[] row, int count) throws IOException {
    Get g = new Get(row);
    Result r = t.get(g);
    assertEquals(count, r.size());
    Delete d = new Delete(row);
    t.delete(d);
  }

  /**
   * Test that when bypass is called, we skip out calling any other coprocessors stacked up method,
   * in this case, a prePut. If the column family is 'test', then bypass is invoked.
   */
  @Test
  public void testBypassAlsoCompletes() throws IOException {
    // ensure that server time increments every time we do an operation, otherwise
    // previous deletes will eclipse successive puts having the same timestamp
    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
    // try/finally: the original version never reset the injected edge nor closed
    // the table, leaking the fake clock into later tests.
    try (Table t = util.getConnection().getTable(tableName)) {
      List<Put> puts = new ArrayList<>();
      Put p = new Put(row1);
      p.addColumn(dummy, dummy, dummy);
      puts.add(p);
      p = new Put(row2);
      p.addColumn(test, dummy, dummy);
      puts.add(p);
      p = new Put(row3);
      p.addColumn(test, dummy, dummy);
      puts.add(p);
      t.put(puts);
      // Ensure expected result.
      checkRowAndDelete(t, row1, 1);
      checkRowAndDelete(t, row2, 0);
      checkRowAndDelete(t, row3, 0);
      // We have three Coprocessors stacked up on the prePut. See the beforeClass setup. We did
      // three puts above two of which bypassed. A bypass means do not call the other coprocessors
      // in the stack so for the two 'test' calls in the above, we should not have call through to
      // all three coprocessors in the chain. So we should have:
      // 3 invocations for first put + 1 invocation + 1 bypass for second put + 1 invocation +
      // 1 bypass for the last put. Assert.
      assertEquals(5, TestCoprocessor.PREPUT_INVOCATIONS.get(), "Total CP invocation count");
      assertEquals(2, TestCoprocessor.PREPUT_BYPASSES.get(), "Total CP bypasses");
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

  /**
   * Observer that counts every prePut invocation and bypasses any Put touching
   * column family 'test'. Shared counters let tests verify the bypass
   * short-circuits the coprocessor chain.
   */
  public static class TestCoprocessor implements RegionCoprocessor, RegionObserver {
    static AtomicInteger PREPUT_INVOCATIONS = new AtomicInteger(0);
    static AtomicInteger PREPUT_BYPASSES = new AtomicInteger(0);

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Put put, final WALEdit edit, final Durability durability) throws IOException {
      PREPUT_INVOCATIONS.incrementAndGet();
      Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
      if (familyMap.containsKey(test)) {
        PREPUT_BYPASSES.incrementAndGet();
        e.bypass();
      }
    }
  }

  /**
   * Calls through to TestCoprocessor.
   */
  public static class TestCoprocessor2 extends TestRegionObserverBypass.TestCoprocessor {
  }

  /**
   * Calls through to TestCoprocessor.
   */
  public static class TestCoprocessor3 extends TestRegionObserverBypass.TestCoprocessor {
  }
}