001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS; 022import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS; 023import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNotNull; 027import static org.junit.Assert.assertTrue; 028import static org.mockito.ArgumentMatchers.any; 029import static org.mockito.Mockito.argThat; 030import static org.mockito.Mockito.atLeast; 031import static org.mockito.Mockito.doAnswer; 032import static org.mockito.Mockito.mock; 033import static org.mockito.Mockito.times; 034import static org.mockito.Mockito.verify; 035 036import java.io.IOException; 037import java.util.Arrays; 038import java.util.Optional; 039import java.util.concurrent.ExecutorService; 040import java.util.concurrent.Executors; 041import java.util.concurrent.atomic.AtomicInteger; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.hbase.Cell; 044import org.apache.hadoop.hbase.CellBuilderFactory; 045import org.apache.hadoop.hbase.CellBuilderType; 046import org.apache.hadoop.hbase.HBaseClassTestRule; 047import org.apache.hadoop.hbase.HBaseConfiguration; 048import org.apache.hadoop.hbase.HRegionLocation; 049import org.apache.hadoop.hbase.RegionLocations; 050import org.apache.hadoop.hbase.ServerName; 051import org.apache.hadoop.hbase.TableName; 052import org.apache.hadoop.hbase.ipc.HBaseRpcController; 053import org.apache.hadoop.hbase.security.UserProvider; 054import org.apache.hadoop.hbase.testclassification.ClientTests; 055import org.apache.hadoop.hbase.testclassification.MediumTests; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.junit.Before; 058import org.junit.ClassRule; 059import org.junit.Rule; 060import org.junit.Test; 061import org.junit.experimental.categories.Category; 062import org.junit.rules.TestName; 063import org.mockito.ArgumentMatcher; 064import org.mockito.invocation.InvocationOnMock; 065import org.mockito.stubbing.Answer; 066 067import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 068 069import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 071 072/** 073 * Test that correct rpc priority is sent to server from blocking Table calls. Currently only 074 * implements checks for scans, but more could be added here. 075 */ 076@Category({ ClientTests.class, MediumTests.class }) 077public class TestTableRpcPriority { 078 079 @ClassRule 080 public static final HBaseClassTestRule CLASS_RULE = 081 HBaseClassTestRule.forClass(TestTableRpcPriority.class); 082 083 @Rule 084 public TestName name = new TestName(); 085 086 private ClientProtos.ClientService.BlockingInterface stub; 087 private Connection conn; 088 089 @Before 090 public void setUp() throws IOException, ServiceException { 091 stub = mock(ClientProtos.ClientService.BlockingInterface.class); 092 093 Configuration conf = HBaseConfiguration.create(); 094 095 ExecutorService executorService = Executors.newCachedThreadPool(); 096 conn = new ConnectionImplementation(conf, executorService, 097 UserProvider.instantiate(conf).getCurrent(), new DoNothingConnectionRegistry(conf)) { 098 099 @Override 100 public ClientProtos.ClientService.BlockingInterface getClient(ServerName serverName) 101 throws IOException { 102 return stub; 103 } 104 105 @Override 106 public RegionLocations relocateRegion(final TableName tableName, final byte[] row, 107 int replicaId) throws IOException { 108 return locateRegion(tableName, row, true, false, replicaId); 109 } 110 111 @Override 112 public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, 113 boolean retry, int replicaId) throws IOException { 114 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 115 ServerName serverName = ServerName.valueOf("rs", 16010, 12345); 116 HRegionLocation loc = new HRegionLocation(info, serverName); 117 return new RegionLocations(loc); 118 } 119 }; 120 } 121 122 @Test 123 public void testScan() throws Exception { 124 mockScan(19); 125 testForTable(TableName.valueOf(name.getMethodName()), Optional.of(19)); 126 } 127 128 /** 129 * This test verifies that our closeScanner request honors the original priority of the scan if 130 * it's greater than our expected HIGH_QOS for close calls. 131 */ 132 @Test 133 public void testScanSuperHighPriority() throws Exception { 134 mockScan(1000); 135 testForTable(TableName.valueOf(name.getMethodName()), Optional.of(1000)); 136 } 137 138 @Test 139 public void testScanNormalTable() throws Exception { 140 mockScan(NORMAL_QOS); 141 testForTable(TableName.valueOf(name.getMethodName()), Optional.of(NORMAL_QOS)); 142 } 143 144 @Test 145 public void testScanSystemTable() throws Exception { 146 mockScan(SYSTEMTABLE_QOS); 147 testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()), 148 Optional.empty()); 149 } 150 151 @Test 152 public void testScanMetaTable() throws Exception { 153 mockScan(SYSTEMTABLE_QOS); 154 testForTable(TableName.META_TABLE_NAME, Optional.empty()); 155 } 156 157 private void testForTable(TableName tableName, Optional<Integer> priority) throws Exception { 158 Scan scan = new Scan().setCaching(1); 159 priority.ifPresent(scan::setPriority); 160 161 try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) { 162 assertNotNull(scanner.next()); 163 assertNotNull(scanner.next()); 164 } 165 166 // just verify that the calls happened. verification of priority occurred in the mocking 167 // open, next, then several renew lease 168 verify(stub, atLeast(3)).scan(any(), any(ClientProtos.ScanRequest.class)); 169 verify(stub, times(1)).scan(assertControllerArgs(Math.max(priority.orElse(0), HIGH_QOS)), 170 assertScannerCloseRequest()); 171 } 172 173 private void mockScan(int scanPriority) throws ServiceException { 174 int scannerId = 1; 175 176 doAnswer(new Answer<ClientProtos.ScanResponse>() { 177 @Override 178 public ClientProtos.ScanResponse answer(InvocationOnMock invocation) throws Throwable { 179 throw new IllegalArgumentException( 180 "Call not covered by explicit mock for arguments controller=" + invocation.getArgument(0) 181 + ", request=" + invocation.getArgument(1)); 182 } 183 }).when(stub).scan(any(), any()); 184 185 AtomicInteger scanNextCalled = new AtomicInteger(0); 186 doAnswer(new Answer<ClientProtos.ScanResponse>() { 187 188 @Override 189 public ClientProtos.ScanResponse answer(InvocationOnMock invocation) throws Throwable { 190 ClientProtos.ScanRequest req = invocation.getArgument(1); 191 assertFalse("close scanner should not come in with scan priority " + scanPriority, 192 req.hasCloseScanner() && req.getCloseScanner()); 193 ClientProtos.ScanResponse.Builder builder = ClientProtos.ScanResponse.newBuilder(); 194 195 if (!req.hasScannerId()) { 196 builder.setScannerId(scannerId); 197 } else { 198 builder.setScannerId(req.getScannerId()); 199 } 200 201 Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) 202 .setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())).setFamily(Bytes.toBytes("cf")) 203 .setQualifier(Bytes.toBytes("cq")).setValue(Bytes.toBytes("v")).build(); 204 Result result = Result.create(Arrays.asList(cell)); 205 return builder.setTtl(800).setMoreResultsInRegion(true).setMoreResults(true) 206 .addResults(ProtobufUtil.toResult(result)).build(); 207 } 208 }).when(stub).scan(assertControllerArgs(scanPriority), any()); 209 210 doAnswer(new Answer<ClientProtos.ScanResponse>() { 211 212 @Override 213 public ClientProtos.ScanResponse answer(InvocationOnMock invocation) throws Throwable { 214 ClientProtos.ScanRequest req = invocation.getArgument(1); 215 assertTrue("close request should have scannerId", req.hasScannerId()); 216 assertEquals("close request's scannerId should match", scannerId, req.getScannerId()); 217 assertTrue("close request should have closerScanner set", 218 req.hasCloseScanner() && req.getCloseScanner()); 219 220 return ClientProtos.ScanResponse.getDefaultInstance(); 221 } 222 }).when(stub).scan(assertControllerArgs(Math.max(scanPriority, HIGH_QOS)), 223 assertScannerCloseRequest()); 224 } 225 226 private HBaseRpcController assertControllerArgs(int priority) { 227 return argThat(new ArgumentMatcher<HBaseRpcController>() { 228 229 @Override 230 public boolean matches(HBaseRpcController controller) { 231 // check specified priority, but also check that it has a timeout 232 // this ensures that our conversion from the base controller to the close-specific 233 // controller honored the original arguments. 234 return controller.getPriority() == priority && controller.hasCallTimeout(); 235 } 236 }); 237 } 238 239 private ClientProtos.ScanRequest assertScannerCloseRequest() { 240 return argThat(new ArgumentMatcher<ClientProtos.ScanRequest>() { 241 242 @Override 243 public boolean matches(ClientProtos.ScanRequest request) { 244 return request.hasCloseScanner() && request.getCloseScanner(); 245 } 246 }); 247 } 248}