001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
072
073/**
074 * Test that correct rpc priority is sent to server from blocking Table calls. Currently only
075 * implements checks for scans, but more could be added here.
076 */
077@Category({ ClientTests.class, MediumTests.class })
078public class TestTableRpcPriority {
079
080  @ClassRule
081  public static final HBaseClassTestRule CLASS_RULE =
082    HBaseClassTestRule.forClass(TestTableRpcPriority.class);
083
084  @Rule
085  public TestName name = new TestName();
086
087  private ClientProtos.ClientService.BlockingInterface stub;
088  private Connection conn;
089
090  @Before
091  public void setUp() throws IOException, ServiceException {
092    stub = mock(ClientProtos.ClientService.BlockingInterface.class);
093
094    Configuration conf = HBaseConfiguration.create();
095
096    ExecutorService executorService = Executors.newCachedThreadPool();
097    User user = UserProvider.instantiate(conf).getCurrent();
098    conn = new ConnectionImplementation(conf, executorService, user,
099      new DoNothingConnectionRegistry(conf, user)) {
100
101      @Override
102      public ClientProtos.ClientService.BlockingInterface getClient(ServerName serverName)
103        throws IOException {
104        return stub;
105      }
106
107      @Override
108      public RegionLocations relocateRegion(final TableName tableName, final byte[] row,
109        int replicaId) throws IOException {
110        return locateRegion(tableName, row, true, false, replicaId);
111      }
112
113      @Override
114      public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
115        boolean retry, int replicaId) throws IOException {
116        RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
117        ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
118        HRegionLocation loc = new HRegionLocation(info, serverName);
119        return new RegionLocations(loc);
120      }
121    };
122  }
123
124  @Test
125  public void testScan() throws Exception {
126    mockScan(19);
127    testForTable(TableName.valueOf(name.getMethodName()), Optional.of(19));
128  }
129
130  /**
131   * This test verifies that our closeScanner request honors the original priority of the scan if
132   * it's greater than our expected HIGH_QOS for close calls.
133   */
134  @Test
135  public void testScanSuperHighPriority() throws Exception {
136    mockScan(1000);
137    testForTable(TableName.valueOf(name.getMethodName()), Optional.of(1000));
138  }
139
140  @Test
141  public void testScanNormalTable() throws Exception {
142    mockScan(NORMAL_QOS);
143    testForTable(TableName.valueOf(name.getMethodName()), Optional.of(NORMAL_QOS));
144  }
145
146  @Test
147  public void testScanSystemTable() throws Exception {
148    mockScan(SYSTEMTABLE_QOS);
149    testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()),
150      Optional.empty());
151  }
152
153  @Test
154  public void testScanMetaTable() throws Exception {
155    mockScan(SYSTEMTABLE_QOS);
156    testForTable(TableName.META_TABLE_NAME, Optional.empty());
157  }
158
159  private void testForTable(TableName tableName, Optional<Integer> priority) throws Exception {
160    Scan scan = new Scan().setCaching(1);
161    priority.ifPresent(scan::setPriority);
162
163    try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
164      assertNotNull(scanner.next());
165      assertNotNull(scanner.next());
166    }
167
168    // just verify that the calls happened. verification of priority occurred in the mocking
169    // open, next, then several renew lease
170    verify(stub, atLeast(3)).scan(any(), any(ClientProtos.ScanRequest.class));
171    verify(stub, times(1)).scan(assertControllerArgs(Math.max(priority.orElse(0), HIGH_QOS)),
172      assertScannerCloseRequest());
173  }
174
175  private void mockScan(int scanPriority) throws ServiceException {
176    int scannerId = 1;
177
178    doAnswer(new Answer<ClientProtos.ScanResponse>() {
179      @Override
180      public ClientProtos.ScanResponse answer(InvocationOnMock invocation) throws Throwable {
181        throw new IllegalArgumentException(
182          "Call not covered by explicit mock for arguments controller=" + invocation.getArgument(0)
183            + ", request=" + invocation.getArgument(1));
184      }
185    }).when(stub).scan(any(), any());
186
187    AtomicInteger scanNextCalled = new AtomicInteger(0);
188    doAnswer(new Answer<ClientProtos.ScanResponse>() {
189
190      @Override
191      public ClientProtos.ScanResponse answer(InvocationOnMock invocation) throws Throwable {
192        ClientProtos.ScanRequest req = invocation.getArgument(1);
193        assertFalse("close scanner should not come in with scan priority " + scanPriority,
194          req.hasCloseScanner() && req.getCloseScanner());
195        ClientProtos.ScanResponse.Builder builder = ClientProtos.ScanResponse.newBuilder();
196
197        if (!req.hasScannerId()) {
198          builder.setScannerId(scannerId);
199        } else {
200          builder.setScannerId(req.getScannerId());
201        }
202
203        Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
204          .setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())).setFamily(Bytes.toBytes("cf"))
205          .setQualifier(Bytes.toBytes("cq")).setValue(Bytes.toBytes("v")).build();
206        Result result = Result.create(Arrays.asList(cell));
207        return builder.setTtl(800).setMoreResultsInRegion(true).setMoreResults(true)
208          .addResults(ProtobufUtil.toResult(result)).build();
209      }
210    }).when(stub).scan(assertControllerArgs(scanPriority), any());
211
212    doAnswer(new Answer<ClientProtos.ScanResponse>() {
213
214      @Override
215      public ClientProtos.ScanResponse answer(InvocationOnMock invocation) throws Throwable {
216        ClientProtos.ScanRequest req = invocation.getArgument(1);
217        assertTrue("close request should have scannerId", req.hasScannerId());
218        assertEquals("close request's scannerId should match", scannerId, req.getScannerId());
219        assertTrue("close request should have closerScanner set",
220          req.hasCloseScanner() && req.getCloseScanner());
221
222        return ClientProtos.ScanResponse.getDefaultInstance();
223      }
224    }).when(stub).scan(assertControllerArgs(Math.max(scanPriority, HIGH_QOS)),
225      assertScannerCloseRequest());
226  }
227
228  private HBaseRpcController assertControllerArgs(int priority) {
229    return argThat(new ArgumentMatcher<HBaseRpcController>() {
230
231      @Override
232      public boolean matches(HBaseRpcController controller) {
233        // check specified priority, but also check that it has a timeout
234        // this ensures that our conversion from the base controller to the close-specific
235        // controller honored the original arguments.
236        return controller.getPriority() == priority && controller.hasCallTimeout();
237      }
238    });
239  }
240
241  private ClientProtos.ScanRequest assertScannerCloseRequest() {
242    return argThat(new ArgumentMatcher<ClientProtos.ScanRequest>() {
243
244      @Override
245      public boolean matches(ClientProtos.ScanRequest request) {
246        return request.hasCloseScanner() && request.getCloseScanner();
247      }
248    });
249  }
250}