001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Random; 026import java.util.concurrent.atomic.AtomicInteger; 027import java.util.concurrent.atomic.AtomicReference; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.client.Append; 030import org.apache.hadoop.hbase.client.CheckAndMutate; 031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 032import org.apache.hadoop.hbase.client.Connection; 033import org.apache.hadoop.hbase.client.ConnectionFactory; 034import org.apache.hadoop.hbase.client.Delete; 035import org.apache.hadoop.hbase.client.Get; 036import org.apache.hadoop.hbase.client.Increment; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.ResultScanner; 039import org.apache.hadoop.hbase.client.RowMutations; 040import org.apache.hadoop.hbase.client.Scan; 041import org.apache.hadoop.hbase.client.Table; 042import org.apache.hadoop.hbase.client.TableDescriptor; 043import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 044import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController; 045import org.apache.hadoop.hbase.ipc.HBaseRpcController; 046import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 047import org.apache.hadoop.hbase.ipc.RpcServer; 048import org.apache.hadoop.hbase.regionserver.HRegionServer; 049import org.apache.hadoop.hbase.regionserver.RSRpcServices; 050import org.apache.hadoop.hbase.testclassification.ClientTests; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.junit.jupiter.api.BeforeAll; 054import org.junit.jupiter.api.Tag; 055import org.junit.jupiter.api.Test; 056 057import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 058import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 059 060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 061 062/** 063 * Tests that one can implement their own RpcControllerFactory and expect it to successfully pass 064 * custom priority values to the server for all HTable calls. 065 */ 066@Tag(ClientTests.TAG) 067@Tag(MediumTests.TAG) 068public class TestCustomPriorityRpcControllerFactory { 069 070 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 071 072 private static final AtomicReference<State> STATE = new AtomicReference<>(State.SETUP); 073 private static final AtomicInteger EXPECTED_PRIORITY = new AtomicInteger(); 074 075 private enum State { 076 SETUP, 077 WAITING, 078 SUCCESS 079 } 080 081 private static final TableName TABLE_NAME = TableName.valueOf("Timeout"); 082 private static final byte[] FAMILY = Bytes.toBytes("family"); 083 private static final byte[] ROW = Bytes.toBytes("row"); 084 private static final byte[] QUALIFIER = Bytes.toBytes("qualifier"); 085 private static final byte[] VALUE = Bytes.toBytes(1L); 086 087 private static final int MIN_CUSTOM_PRIORITY = 201; 088 089 private static Connection CONN; 090 private static Table TABLE; 091 092 @BeforeAll 093 public static void setUpClass() throws Exception { 094 // Set RegionServer class and use default values for other options. 095 UTIL.startMiniCluster( 096 StartTestingClusterOption.builder().rsClass(PriorityRegionServer.class).build()); 097 TableDescriptor descriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME) 098 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 099 UTIL.getAdmin().createTable(descriptor); 100 101 Configuration conf = new Configuration(UTIL.getConfiguration()); 102 conf.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, 103 PriorityRpcControllerFactory.class, RpcControllerFactory.class); 104 CONN = ConnectionFactory.createConnection(conf); 105 TABLE = CONN.getTable(TABLE_NAME); 106 } 107 108 @Test 109 public void tetGetPriority() throws Exception { 110 testForCall(new ThrowingCallable() { 111 @Override 112 public void call() throws IOException { 113 TABLE.get(new Get(ROW)); 114 } 115 }); 116 } 117 118 @Test 119 public void testDeletePriority() throws Exception { 120 testForCall(new ThrowingCallable() { 121 @Override 122 public void call() throws IOException { 123 TABLE.delete(new Delete(ROW)); 124 } 125 }); 126 } 127 128 @Test 129 public void testIncrementPriority() throws Exception { 130 testForCall(new ThrowingCallable() { 131 @Override 132 public void call() throws IOException { 133 TABLE.increment(new Increment(ROW).addColumn(FAMILY, QUALIFIER, 1)); 134 } 135 }); 136 } 137 138 @Test 139 public void testAppendPriority() throws Exception { 140 testForCall(new ThrowingCallable() { 141 @Override 142 public void call() throws IOException { 143 TABLE.append(new Append(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); 144 } 145 }); 146 } 147 148 @Test 149 public void testPutPriority() throws Exception { 150 testForCall(new ThrowingCallable() { 151 @Override 152 public void call() throws IOException { 153 Put put = new Put(ROW); 154 put.addColumn(FAMILY, QUALIFIER, VALUE); 155 TABLE.put(put); 156 } 157 }); 158 159 } 160 161 @Test 162 public void testExistsPriority() throws Exception { 163 testForCall(new ThrowingCallable() { 164 @Override 165 public void call() throws IOException { 166 TABLE.exists(new Get(ROW)); 167 } 168 }); 169 } 170 171 @Test 172 public void testMutatePriority() throws Exception { 173 testForCall(new ThrowingCallable() { 174 @Override 175 public void call() throws IOException { 176 RowMutations mutation = new RowMutations(ROW); 177 mutation.add(new Delete(ROW)); 178 mutation.add(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); 179 TABLE.mutateRow(mutation); 180 } 181 }); 182 } 183 184 @Test 185 public void testCheckAndMutatePriority() throws Exception { 186 testForCall(new ThrowingCallable() { 187 @Override 188 public void call() throws IOException { 189 RowMutations mutation = new RowMutations(ROW); 190 mutation.add(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); 191 TABLE.checkAndMutate( 192 CheckAndMutate.newBuilder(ROW).ifNotExists(FAMILY, QUALIFIER).build(mutation)); 193 } 194 }); 195 } 196 197 @Test 198 public void testMultiGetsPriority() throws Exception { 199 testForCall(new ThrowingCallable() { 200 @Override 201 public void call() throws Exception { 202 Get get1 = new Get(ROW); 203 get1.addColumn(FAMILY, QUALIFIER); 204 Get get2 = new Get(ROW); 205 get2.addColumn(FAMILY, QUALIFIER); 206 List<Get> gets = new ArrayList<>(); 207 gets.add(get1); 208 gets.add(get2); 209 TABLE.batch(gets, new Object[2]); 210 } 211 }); 212 } 213 214 @Test 215 public void testMultiPutsPriority() throws Exception { 216 testForCall(new ThrowingCallable() { 217 @Override 218 public void call() throws Exception { 219 Put put1 = new Put(ROW); 220 put1.addColumn(FAMILY, QUALIFIER, VALUE); 221 Put put2 = new Put(ROW); 222 put2.addColumn(FAMILY, QUALIFIER, VALUE); 223 List<Put> puts = new ArrayList<>(); 224 puts.add(put1); 225 puts.add(put2); 226 TABLE.batch(puts, new Object[2]); 227 } 228 }); 229 } 230 231 @Test 232 public void testScanPriority() throws Exception { 233 testForCall(new ThrowingCallable() { 234 @Override 235 public void call() throws IOException { 236 ResultScanner scanner = TABLE.getScanner(new Scan()); 237 scanner.next(); 238 } 239 }); 240 } 241 242 private void testForCall(ThrowingCallable callable) throws Exception { 243 STATE.set(State.WAITING); 244 // set it higher than MIN_CUSTOM_PRIORITY so we can ignore calls for meta, setup, etc 245 EXPECTED_PRIORITY.set(new Random().nextInt(MIN_CUSTOM_PRIORITY) + MIN_CUSTOM_PRIORITY); 246 callable.call(); 247 248 assertEquals(State.SUCCESS, STATE.get(), 249 "Expected state to change to SUCCESS. Check for assertion error in logs"); 250 } 251 252 private interface ThrowingCallable { 253 void call() throws Exception; 254 } 255 256 public static class PriorityRegionServer 257 extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer { 258 public PriorityRegionServer(Configuration conf) throws IOException, InterruptedException { 259 super(conf); 260 } 261 262 @Override 263 protected RSRpcServices createRpcServices() throws IOException { 264 return new PriorityRpcServices(this); 265 } 266 } 267 268 public static class PriorityRpcControllerFactory extends RpcControllerFactory { 269 270 public PriorityRpcControllerFactory(Configuration conf) { 271 super(conf); 272 } 273 274 @Override 275 public HBaseRpcController newController() { 276 return new PriorityController(EXPECTED_PRIORITY.get(), super.newController()); 277 } 278 279 @Override 280 public HBaseRpcController newController(ExtendedCellScanner cellScanner) { 281 return new PriorityController(EXPECTED_PRIORITY.get(), super.newController(cellScanner)); 282 } 283 284 @Override 285 public HBaseRpcController newController(List<ExtendedCellScannable> cellIterables) { 286 return new PriorityController(EXPECTED_PRIORITY.get(), super.newController(cellIterables)); 287 } 288 } 289 290 private static class PriorityController extends DelegatingHBaseRpcController { 291 private final int priority; 292 293 public PriorityController(int priority, HBaseRpcController controller) { 294 super(controller); 295 this.priority = priority; 296 } 297 298 @Override 299 public int getPriority() { 300 return priority; 301 } 302 } 303 304 public static class PriorityRpcServices extends RSRpcServices { 305 PriorityRpcServices(HRegionServer rs) throws IOException { 306 super(rs); 307 } 308 309 private void checkPriorityIfWaiting() { 310 if (STATE.get() == State.WAITING) { 311 int priority = RpcServer.getCurrentCall().get().getPriority(); 312 if (priority < MIN_CUSTOM_PRIORITY) { 313 return; 314 } 315 assertEquals(EXPECTED_PRIORITY.get(), priority); 316 STATE.set(State.SUCCESS); 317 } 318 } 319 320 @Override 321 public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request) 322 throws ServiceException { 323 checkPriorityIfWaiting(); 324 return super.get(controller, request); 325 } 326 327 @Override 328 public ClientProtos.MutateResponse mutate(RpcController rpcc, 329 ClientProtos.MutateRequest request) throws ServiceException { 330 checkPriorityIfWaiting(); 331 return super.mutate(rpcc, request); 332 } 333 334 @Override 335 public ClientProtos.ScanResponse scan(RpcController controller, 336 ClientProtos.ScanRequest request) throws ServiceException { 337 checkPriorityIfWaiting(); 338 return super.scan(controller, request); 339 } 340 341 @Override 342 public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request) 343 throws ServiceException { 344 checkPriorityIfWaiting(); 345 return super.multi(rpcc, request); 346 } 347 } 348}