/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test that a RegionObserver can bypass a Put from its prePut hook, and that the bypass also
 * skips any coprocessors stacked up behind the one that bypassed.
 */
@Category({CoprocessorTests.class, MediumTests.class})
public class TestRegionObserverBypass {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRegionObserverBypass.class);

  private static HBaseTestingUtility util;
  private static final TableName tableName = TableName.valueOf("test");
  private static final byte[] dummy = Bytes.toBytes("dummy");
  private static final byte[] row1 = Bytes.toBytes("r1");
  private static final byte[] row2 = Bytes.toBytes("r2");
  private static final byte[] row3 = Bytes.toBytes("r3");
  private static final byte[] test = Bytes.toBytes("test");

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Stack up three coprocessors so we can check that a bypass skips the subsequent calls.
    Configuration conf = HBaseConfiguration.create();
    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
        new String[] {TestCoprocessor.class.getName(),
            TestCoprocessor2.class.getName(),
            TestCoprocessor3.class.getName()});
    util = new HBaseTestingUtility(conf);
    util.startMiniCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    Admin admin = util.getAdmin();
    if (admin.tableExists(tableName)) {
      if (admin.isTableEnabled(tableName)) {
        admin.disableTable(tableName);
      }
      admin.deleteTable(tableName);
    }
    util.createTable(tableName, new byte[][] {dummy, test});
    TestCoprocessor.PREPUT_BYPASSES.set(0);
    TestCoprocessor.PREPUT_INVOCATIONS.set(0);
  }

  /**
   * Do a single put that is bypassed by a RegionObserver.
   */
  @Test
  public void testSimple() throws Exception {
    Table t = util.getConnection().getTable(tableName);
    Put p = new Put(row1);
    p.addColumn(test, dummy, dummy);
    // before HBASE-4331, this would throw an exception
    t.put(p);
    checkRowAndDelete(t, row1, 0);
    t.close();
  }

  /**
   * Test various multiput operations. If the column family is 'test', then bypass is invoked.
   */
  @Test
  public void testMulti() throws Exception {
    // Ensure that server time increments every time we do an operation, otherwise
    // previous deletes will eclipse successive puts having the same timestamp.
    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());

    Table t = util.getConnection().getTable(tableName);
    List<Put> puts = new ArrayList<>();
    Put p = new Put(row1);
    p.addColumn(dummy, dummy, dummy);
    puts.add(p);
    p = new Put(row2);
    p.addColumn(test, dummy, dummy);
    puts.add(p);
    p = new Put(row3);
    p.addColumn(test, dummy, dummy);
    puts.add(p);
    // before HBASE-4331, this would throw an exception
    t.put(puts);
    checkRowAndDelete(t, row1, 1);
    checkRowAndDelete(t, row2, 0);
    checkRowAndDelete(t, row3, 0);

    puts.clear();
    p = new Put(row1);
    p.addColumn(test, dummy, dummy);
    puts.add(p);
    p = new Put(row2);
    p.addColumn(test, dummy, dummy);
    puts.add(p);
    p = new Put(row3);
    p.addColumn(test, dummy, dummy);
    puts.add(p);
    // before HBASE-4331, this would throw an exception
    t.put(puts);
    checkRowAndDelete(t, row1, 0);
    checkRowAndDelete(t, row2, 0);
    checkRowAndDelete(t, row3, 0);

    puts.clear();
    p = new Put(row1);
    p.addColumn(test, dummy, dummy);
    puts.add(p);
    p = new Put(row2);
    p.addColumn(test, dummy, dummy);
    puts.add(p);
    p = new Put(row3);
    p.addColumn(dummy, dummy, dummy);
    puts.add(p);
    // this worked fine even before HBASE-4331
    t.put(puts);
    checkRowAndDelete(t, row1, 0);
    checkRowAndDelete(t, row2, 0);
    checkRowAndDelete(t, row3, 1);

    puts.clear();
    p = new Put(row1);
    p.addColumn(dummy, dummy, dummy);
    puts.add(p);
    p = new Put(row2);
    p.addColumn(test, dummy, dummy);
    puts.add(p);
    p = new Put(row3);
    p.addColumn(dummy, dummy, dummy);
    puts.add(p);
    // this worked fine even before HBASE-4331
    t.put(puts);
    checkRowAndDelete(t, row1, 1);
    checkRowAndDelete(t, row2, 0);
    checkRowAndDelete(t, row3, 1);

    puts.clear();
    p = new Put(row1);
    p.addColumn(test, dummy, dummy);
    puts.add(p);
    p = new Put(row2);
    p.addColumn(dummy, dummy, dummy);
    puts.add(p);
    p = new Put(row3);
    p.addColumn(test, dummy, dummy);
    puts.add(p);
    // before HBASE-4331, this would throw an exception
    t.put(puts);
    checkRowAndDelete(t, row1, 0);
    checkRowAndDelete(t, row2, 1);
    checkRowAndDelete(t, row3, 0);
    t.close();

    EnvironmentEdgeManager.reset();
  }

  private void checkRowAndDelete(Table t, byte[] row, int count) throws IOException {
    Get g = new Get(row);
    Result r = t.get(g);
    assertEquals(count, r.size());
    Delete d = new Delete(row);
    t.delete(d);
  }

  /**
   * Test that when bypass is called, we skip calling the prePut of any other coprocessors
   * stacked up behind the one that bypassed. If the column family is 'test', then bypass is
   * invoked.
   */
  @Test
  public void testBypassAlsoCompletes() throws IOException {
    // Ensure that server time increments every time we do an operation, otherwise
    // previous deletes will eclipse successive puts having the same timestamp.
    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());

    Table t = util.getConnection().getTable(tableName);
    List<Put> puts = new ArrayList<>();
    Put p = new Put(row1);
    p.addColumn(dummy, dummy, dummy);
    puts.add(p);
    p = new Put(row2);
    p.addColumn(test, dummy, dummy);
    puts.add(p);
    p = new Put(row3);
    p.addColumn(test, dummy, dummy);
    puts.add(p);
    t.put(puts);
    // Ensure expected result.
    checkRowAndDelete(t, row1, 1);
    checkRowAndDelete(t, row2, 0);
    checkRowAndDelete(t, row3, 0);
    // We have three coprocessors stacked up on prePut. See the beforeClass setup. We did three
    // puts above, two of which bypassed. A bypass means we do not call the remaining
    // coprocessors in the stack, so for the two 'test' puts above we should not have called
    // through to all three coprocessors in the chain. So we should have:
    // 3 invocations for the first put + 1 invocation + 1 bypass for the second put +
    // 1 invocation + 1 bypass for the last put. Assert.
    assertEquals("Total CP invocation count", 5, TestCoprocessor.PREPUT_INVOCATIONS.get());
    assertEquals("Total CP bypasses", 2, TestCoprocessor.PREPUT_BYPASSES.get());
  }

  public static class TestCoprocessor implements RegionCoprocessor, RegionObserver {
    static AtomicInteger PREPUT_INVOCATIONS = new AtomicInteger(0);
    static AtomicInteger PREPUT_BYPASSES = new AtomicInteger(0);

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Put put, final WALEdit edit, final Durability durability) throws IOException {
      PREPUT_INVOCATIONS.incrementAndGet();
      Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
      if (familyMap.containsKey(test)) {
        PREPUT_BYPASSES.incrementAndGet();
        e.bypass();
      }
    }
  }

  /**
   * Calls through to TestCoprocessor.
   */
  public static class TestCoprocessor2 extends TestRegionObserverBypass.TestCoprocessor {}

  /**
   * Calls through to TestCoprocessor.
   */
  public static class TestCoprocessor3 extends TestRegionObserverBypass.TestCoprocessor {}
}