001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
071
072/**
073 * Test that correct rpc priority is sent to server from blocking Table calls. Currently only
074 * implements checks for scans, but more could be added here.
075 */
076@Category({ ClientTests.class, MediumTests.class })
077public class TestTableRpcPriority {
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081    HBaseClassTestRule.forClass(TestTableRpcPriority.class);
082
083  @Rule
084  public TestName name = new TestName();
085
086  private ClientProtos.ClientService.BlockingInterface stub;
087  private Connection conn;
088
089  @Before
090  public void setUp() throws IOException, ServiceException {
091    stub = mock(ClientProtos.ClientService.BlockingInterface.class);
092
093    Configuration conf = HBaseConfiguration.create();
094
095    ExecutorService executorService = Executors.newCachedThreadPool();
096    conn = new ConnectionImplementation(conf, executorService,
097      UserProvider.instantiate(conf).getCurrent(), new DoNothingConnectionRegistry(conf)) {
098
099      @Override
100      public ClientProtos.ClientService.BlockingInterface getClient(ServerName serverName)
101        throws IOException {
102        return stub;
103      }
104
105      @Override
106      public RegionLocations relocateRegion(final TableName tableName, final byte[] row,
107        int replicaId) throws IOException {
108        return locateRegion(tableName, row, true, false, replicaId);
109      }
110
111      @Override
112      public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
113        boolean retry, int replicaId) throws IOException {
114        RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
115        ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
116        HRegionLocation loc = new HRegionLocation(info, serverName);
117        return new RegionLocations(loc);
118      }
119    };
120  }
121
122  @Test
123  public void testScan() throws Exception {
124    mockScan(19);
125    testForTable(TableName.valueOf(name.getMethodName()), Optional.of(19));
126  }
127
128  /**
129   * This test verifies that our closeScanner request honors the original priority of the scan if
130   * it's greater than our expected HIGH_QOS for close calls.
131   */
132  @Test
133  public void testScanSuperHighPriority() throws Exception {
134    mockScan(1000);
135    testForTable(TableName.valueOf(name.getMethodName()), Optional.of(1000));
136  }
137
138  @Test
139  public void testScanNormalTable() throws Exception {
140    mockScan(NORMAL_QOS);
141    testForTable(TableName.valueOf(name.getMethodName()), Optional.of(NORMAL_QOS));
142  }
143
144  @Test
145  public void testScanSystemTable() throws Exception {
146    mockScan(SYSTEMTABLE_QOS);
147    testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()),
148      Optional.empty());
149  }
150
151  @Test
152  public void testScanMetaTable() throws Exception {
153    mockScan(SYSTEMTABLE_QOS);
154    testForTable(TableName.META_TABLE_NAME, Optional.empty());
155  }
156
157  private void testForTable(TableName tableName, Optional<Integer> priority) throws Exception {
158    Scan scan = new Scan().setCaching(1);
159    priority.ifPresent(scan::setPriority);
160
161    try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
162      assertNotNull(scanner.next());
163      assertNotNull(scanner.next());
164    }
165
166    // just verify that the calls happened. verification of priority occurred in the mocking
167    // open, next, then several renew lease
168    verify(stub, atLeast(3)).scan(any(), any(ClientProtos.ScanRequest.class));
169    verify(stub, times(1)).scan(assertControllerArgs(Math.max(priority.orElse(0), HIGH_QOS)),
170      assertScannerCloseRequest());
171  }
172
173  private void mockScan(int scanPriority) throws ServiceException {
174    int scannerId = 1;
175
176    doAnswer(new Answer<ClientProtos.ScanResponse>() {
177      @Override
178      public ClientProtos.ScanResponse answer(InvocationOnMock invocation) throws Throwable {
179        throw new IllegalArgumentException(
180          "Call not covered by explicit mock for arguments controller=" + invocation.getArgument(0)
181            + ", request=" + invocation.getArgument(1));
182      }
183    }).when(stub).scan(any(), any());
184
185    AtomicInteger scanNextCalled = new AtomicInteger(0);
186    doAnswer(new Answer<ClientProtos.ScanResponse>() {
187
188      @Override
189      public ClientProtos.ScanResponse answer(InvocationOnMock invocation) throws Throwable {
190        ClientProtos.ScanRequest req = invocation.getArgument(1);
191        assertFalse("close scanner should not come in with scan priority " + scanPriority,
192          req.hasCloseScanner() && req.getCloseScanner());
193        ClientProtos.ScanResponse.Builder builder = ClientProtos.ScanResponse.newBuilder();
194
195        if (!req.hasScannerId()) {
196          builder.setScannerId(scannerId);
197        } else {
198          builder.setScannerId(req.getScannerId());
199        }
200
201        Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
202          .setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())).setFamily(Bytes.toBytes("cf"))
203          .setQualifier(Bytes.toBytes("cq")).setValue(Bytes.toBytes("v")).build();
204        Result result = Result.create(Arrays.asList(cell));
205        return builder.setTtl(800).setMoreResultsInRegion(true).setMoreResults(true)
206          .addResults(ProtobufUtil.toResult(result)).build();
207      }
208    }).when(stub).scan(assertControllerArgs(scanPriority), any());
209
210    doAnswer(new Answer<ClientProtos.ScanResponse>() {
211
212      @Override
213      public ClientProtos.ScanResponse answer(InvocationOnMock invocation) throws Throwable {
214        ClientProtos.ScanRequest req = invocation.getArgument(1);
215        assertTrue("close request should have scannerId", req.hasScannerId());
216        assertEquals("close request's scannerId should match", scannerId, req.getScannerId());
217        assertTrue("close request should have closerScanner set",
218          req.hasCloseScanner() && req.getCloseScanner());
219
220        return ClientProtos.ScanResponse.getDefaultInstance();
221      }
222    }).when(stub).scan(assertControllerArgs(Math.max(scanPriority, HIGH_QOS)),
223      assertScannerCloseRequest());
224  }
225
226  private HBaseRpcController assertControllerArgs(int priority) {
227    return argThat(new ArgumentMatcher<HBaseRpcController>() {
228
229      @Override
230      public boolean matches(HBaseRpcController controller) {
231        // check specified priority, but also check that it has a timeout
232        // this ensures that our conversion from the base controller to the close-specific
233        // controller honored the original arguments.
234        return controller.getPriority() == priority && controller.hasCallTimeout();
235      }
236    });
237  }
238
239  private ClientProtos.ScanRequest assertScannerCloseRequest() {
240    return argThat(new ArgumentMatcher<ClientProtos.ScanRequest>() {
241
242      @Override
243      public boolean matches(ClientProtos.ScanRequest request) {
244        return request.hasCloseScanner() && request.getCloseScanner();
245      }
246    });
247  }
248}