001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
061
062/**
063 * Tests that one can implement their own RpcControllerFactory and expect it to successfully pass
064 * custom priority values to the server for all HTable calls.
065 */
066@Tag(ClientTests.TAG)
067@Tag(MediumTests.TAG)
068public class TestCustomPriorityRpcControllerFactory {
069
070  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
071
072  private static final AtomicReference<State> STATE = new AtomicReference<>(State.SETUP);
073  private static final AtomicInteger EXPECTED_PRIORITY = new AtomicInteger();
074
075  private enum State {
076    SETUP,
077    WAITING,
078    SUCCESS
079  }
080
081  private static final TableName TABLE_NAME = TableName.valueOf("Timeout");
082  private static final byte[] FAMILY = Bytes.toBytes("family");
083  private static final byte[] ROW = Bytes.toBytes("row");
084  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
085  private static final byte[] VALUE = Bytes.toBytes(1L);
086
087  private static final int MIN_CUSTOM_PRIORITY = 201;
088
089  private static Connection CONN;
090  private static Table TABLE;
091
092  @BeforeAll
093  public static void setUpClass() throws Exception {
094    // Set RegionServer class and use default values for other options.
095    UTIL.startMiniCluster(
096      StartTestingClusterOption.builder().rsClass(PriorityRegionServer.class).build());
097    TableDescriptor descriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME)
098      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
099    UTIL.getAdmin().createTable(descriptor);
100
101    Configuration conf = new Configuration(UTIL.getConfiguration());
102    conf.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
103      PriorityRpcControllerFactory.class, RpcControllerFactory.class);
104    CONN = ConnectionFactory.createConnection(conf);
105    TABLE = CONN.getTable(TABLE_NAME);
106  }
107
108  @Test
109  public void tetGetPriority() throws Exception {
110    testForCall(new ThrowingCallable() {
111      @Override
112      public void call() throws IOException {
113        TABLE.get(new Get(ROW));
114      }
115    });
116  }
117
118  @Test
119  public void testDeletePriority() throws Exception {
120    testForCall(new ThrowingCallable() {
121      @Override
122      public void call() throws IOException {
123        TABLE.delete(new Delete(ROW));
124      }
125    });
126  }
127
128  @Test
129  public void testIncrementPriority() throws Exception {
130    testForCall(new ThrowingCallable() {
131      @Override
132      public void call() throws IOException {
133        TABLE.increment(new Increment(ROW).addColumn(FAMILY, QUALIFIER, 1));
134      }
135    });
136  }
137
138  @Test
139  public void testAppendPriority() throws Exception {
140    testForCall(new ThrowingCallable() {
141      @Override
142      public void call() throws IOException {
143        TABLE.append(new Append(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
144      }
145    });
146  }
147
148  @Test
149  public void testPutPriority() throws Exception {
150    testForCall(new ThrowingCallable() {
151      @Override
152      public void call() throws IOException {
153        Put put = new Put(ROW);
154        put.addColumn(FAMILY, QUALIFIER, VALUE);
155        TABLE.put(put);
156      }
157    });
158
159  }
160
161  @Test
162  public void testExistsPriority() throws Exception {
163    testForCall(new ThrowingCallable() {
164      @Override
165      public void call() throws IOException {
166        TABLE.exists(new Get(ROW));
167      }
168    });
169  }
170
171  @Test
172  public void testMutatePriority() throws Exception {
173    testForCall(new ThrowingCallable() {
174      @Override
175      public void call() throws IOException {
176        RowMutations mutation = new RowMutations(ROW);
177        mutation.add(new Delete(ROW));
178        mutation.add(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
179        TABLE.mutateRow(mutation);
180      }
181    });
182  }
183
184  @Test
185  public void testCheckAndMutatePriority() throws Exception {
186    testForCall(new ThrowingCallable() {
187      @Override
188      public void call() throws IOException {
189        RowMutations mutation = new RowMutations(ROW);
190        mutation.add(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
191        TABLE.checkAndMutate(
192          CheckAndMutate.newBuilder(ROW).ifNotExists(FAMILY, QUALIFIER).build(mutation));
193      }
194    });
195  }
196
197  @Test
198  public void testMultiGetsPriority() throws Exception {
199    testForCall(new ThrowingCallable() {
200      @Override
201      public void call() throws Exception {
202        Get get1 = new Get(ROW);
203        get1.addColumn(FAMILY, QUALIFIER);
204        Get get2 = new Get(ROW);
205        get2.addColumn(FAMILY, QUALIFIER);
206        List<Get> gets = new ArrayList<>();
207        gets.add(get1);
208        gets.add(get2);
209        TABLE.batch(gets, new Object[2]);
210      }
211    });
212  }
213
214  @Test
215  public void testMultiPutsPriority() throws Exception {
216    testForCall(new ThrowingCallable() {
217      @Override
218      public void call() throws Exception {
219        Put put1 = new Put(ROW);
220        put1.addColumn(FAMILY, QUALIFIER, VALUE);
221        Put put2 = new Put(ROW);
222        put2.addColumn(FAMILY, QUALIFIER, VALUE);
223        List<Put> puts = new ArrayList<>();
224        puts.add(put1);
225        puts.add(put2);
226        TABLE.batch(puts, new Object[2]);
227      }
228    });
229  }
230
231  @Test
232  public void testScanPriority() throws Exception {
233    testForCall(new ThrowingCallable() {
234      @Override
235      public void call() throws IOException {
236        ResultScanner scanner = TABLE.getScanner(new Scan());
237        scanner.next();
238      }
239    });
240  }
241
242  private void testForCall(ThrowingCallable callable) throws Exception {
243    STATE.set(State.WAITING);
244    // set it higher than MIN_CUSTOM_PRIORITY so we can ignore calls for meta, setup, etc
245    EXPECTED_PRIORITY.set(new Random().nextInt(MIN_CUSTOM_PRIORITY) + MIN_CUSTOM_PRIORITY);
246    callable.call();
247
248    assertEquals(State.SUCCESS, STATE.get(),
249      "Expected state to change to SUCCESS. Check for assertion error in logs");
250  }
251
252  private interface ThrowingCallable {
253    void call() throws Exception;
254  }
255
256  public static class PriorityRegionServer
257    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
258    public PriorityRegionServer(Configuration conf) throws IOException, InterruptedException {
259      super(conf);
260    }
261
262    @Override
263    protected RSRpcServices createRpcServices() throws IOException {
264      return new PriorityRpcServices(this);
265    }
266  }
267
268  public static class PriorityRpcControllerFactory extends RpcControllerFactory {
269
270    public PriorityRpcControllerFactory(Configuration conf) {
271      super(conf);
272    }
273
274    @Override
275    public HBaseRpcController newController() {
276      return new PriorityController(EXPECTED_PRIORITY.get(), super.newController());
277    }
278
279    @Override
280    public HBaseRpcController newController(ExtendedCellScanner cellScanner) {
281      return new PriorityController(EXPECTED_PRIORITY.get(), super.newController(cellScanner));
282    }
283
284    @Override
285    public HBaseRpcController newController(List<ExtendedCellScannable> cellIterables) {
286      return new PriorityController(EXPECTED_PRIORITY.get(), super.newController(cellIterables));
287    }
288  }
289
290  private static class PriorityController extends DelegatingHBaseRpcController {
291    private final int priority;
292
293    public PriorityController(int priority, HBaseRpcController controller) {
294      super(controller);
295      this.priority = priority;
296    }
297
298    @Override
299    public int getPriority() {
300      return priority;
301    }
302  }
303
304  public static class PriorityRpcServices extends RSRpcServices {
305    PriorityRpcServices(HRegionServer rs) throws IOException {
306      super(rs);
307    }
308
309    private void checkPriorityIfWaiting() {
310      if (STATE.get() == State.WAITING) {
311        int priority = RpcServer.getCurrentCall().get().getPriority();
312        if (priority < MIN_CUSTOM_PRIORITY) {
313          return;
314        }
315        assertEquals(EXPECTED_PRIORITY.get(), priority);
316        STATE.set(State.SUCCESS);
317      }
318    }
319
320    @Override
321    public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
322      throws ServiceException {
323      checkPriorityIfWaiting();
324      return super.get(controller, request);
325    }
326
327    @Override
328    public ClientProtos.MutateResponse mutate(RpcController rpcc,
329      ClientProtos.MutateRequest request) throws ServiceException {
330      checkPriorityIfWaiting();
331      return super.mutate(rpcc, request);
332    }
333
334    @Override
335    public ClientProtos.ScanResponse scan(RpcController controller,
336      ClientProtos.ScanRequest request) throws ServiceException {
337      checkPriorityIfWaiting();
338      return super.scan(controller, request);
339    }
340
341    @Override
342    public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
343      throws ServiceException {
344      checkPriorityIfWaiting();
345      return super.multi(rpcc, request);
346    }
347  }
348}