001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Random;
026import java.util.concurrent.atomic.AtomicInteger;
027import java.util.concurrent.atomic.AtomicReference;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.client.Append;
030import org.apache.hadoop.hbase.client.CheckAndMutate;
031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.ConnectionFactory;
034import org.apache.hadoop.hbase.client.Delete;
035import org.apache.hadoop.hbase.client.Get;
036import org.apache.hadoop.hbase.client.Increment;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.ResultScanner;
039import org.apache.hadoop.hbase.client.RowMutations;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.client.TableDescriptor;
043import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
044import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
045import org.apache.hadoop.hbase.ipc.HBaseRpcController;
046import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
047import org.apache.hadoop.hbase.ipc.RpcServer;
048import org.apache.hadoop.hbase.regionserver.HRegionServer;
049import org.apache.hadoop.hbase.regionserver.RSRpcServices;
050import org.apache.hadoop.hbase.testclassification.ClientTests;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057
058import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
059import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
060
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
062
063/**
064 * Tests that one can implement their own RpcControllerFactory and expect it to successfully pass
065 * custom priority values to the server for all HTable calls.
066 */
067@Category({ ClientTests.class, MediumTests.class })
068public class TestCustomPriorityRpcControllerFactory {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072    HBaseClassTestRule.forClass(TestCustomPriorityRpcControllerFactory.class);
073
074  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
075
076  private static final AtomicReference<State> STATE = new AtomicReference<>(State.SETUP);
077  private static final AtomicInteger EXPECTED_PRIORITY = new AtomicInteger();
078
079  private enum State {
080    SETUP,
081    WAITING,
082    SUCCESS
083  }
084
085  private static final TableName TABLE_NAME = TableName.valueOf("Timeout");
086  private static final byte[] FAMILY = Bytes.toBytes("family");
087  private static final byte[] ROW = Bytes.toBytes("row");
088  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
089  private static final byte[] VALUE = Bytes.toBytes(1L);
090
091  private static final int MIN_CUSTOM_PRIORITY = 201;
092
093  private static Connection CONN;
094  private static Table TABLE;
095
096  @BeforeClass
097  public static void setUpClass() throws Exception {
098    // Set RegionServer class and use default values for other options.
099    UTIL.startMiniCluster(
100      StartTestingClusterOption.builder().rsClass(PriorityRegionServer.class).build());
101    TableDescriptor descriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME)
102      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
103    UTIL.getAdmin().createTable(descriptor);
104
105    Configuration conf = new Configuration(UTIL.getConfiguration());
106    conf.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
107      PriorityRpcControllerFactory.class, RpcControllerFactory.class);
108    CONN = ConnectionFactory.createConnection(conf);
109    TABLE = CONN.getTable(TABLE_NAME);
110  }
111
112  @Test
113  public void tetGetPriority() throws Exception {
114    testForCall(new ThrowingCallable() {
115      @Override
116      public void call() throws IOException {
117        TABLE.get(new Get(ROW));
118      }
119    });
120  }
121
122  @Test
123  public void testDeletePriority() throws Exception {
124    testForCall(new ThrowingCallable() {
125      @Override
126      public void call() throws IOException {
127        TABLE.delete(new Delete(ROW));
128      }
129    });
130  }
131
132  @Test
133  public void testIncrementPriority() throws Exception {
134    testForCall(new ThrowingCallable() {
135      @Override
136      public void call() throws IOException {
137        TABLE.increment(new Increment(ROW).addColumn(FAMILY, QUALIFIER, 1));
138      }
139    });
140  }
141
142  @Test
143  public void testAppendPriority() throws Exception {
144    testForCall(new ThrowingCallable() {
145      @Override
146      public void call() throws IOException {
147        TABLE.append(new Append(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
148      }
149    });
150  }
151
152  @Test
153  public void testPutPriority() throws Exception {
154    testForCall(new ThrowingCallable() {
155      @Override
156      public void call() throws IOException {
157        Put put = new Put(ROW);
158        put.addColumn(FAMILY, QUALIFIER, VALUE);
159        TABLE.put(put);
160      }
161    });
162
163  }
164
165  @Test
166  public void testExistsPriority() throws Exception {
167    testForCall(new ThrowingCallable() {
168      @Override
169      public void call() throws IOException {
170        TABLE.exists(new Get(ROW));
171      }
172    });
173  }
174
175  @Test
176  public void testMutatePriority() throws Exception {
177    testForCall(new ThrowingCallable() {
178      @Override
179      public void call() throws IOException {
180        RowMutations mutation = new RowMutations(ROW);
181        mutation.add(new Delete(ROW));
182        mutation.add(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
183        TABLE.mutateRow(mutation);
184      }
185    });
186  }
187
188  @Test
189  public void testCheckAndMutatePriority() throws Exception {
190    testForCall(new ThrowingCallable() {
191      @Override
192      public void call() throws IOException {
193        RowMutations mutation = new RowMutations(ROW);
194        mutation.add(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
195        TABLE.checkAndMutate(
196          CheckAndMutate.newBuilder(ROW).ifNotExists(FAMILY, QUALIFIER).build(mutation));
197      }
198    });
199  }
200
201  @Test
202  public void testMultiGetsPriority() throws Exception {
203    testForCall(new ThrowingCallable() {
204      @Override
205      public void call() throws Exception {
206        Get get1 = new Get(ROW);
207        get1.addColumn(FAMILY, QUALIFIER);
208        Get get2 = new Get(ROW);
209        get2.addColumn(FAMILY, QUALIFIER);
210        List<Get> gets = new ArrayList<>();
211        gets.add(get1);
212        gets.add(get2);
213        TABLE.batch(gets, new Object[2]);
214      }
215    });
216  }
217
218  @Test
219  public void testMultiPutsPriority() throws Exception {
220    testForCall(new ThrowingCallable() {
221      @Override
222      public void call() throws Exception {
223        Put put1 = new Put(ROW);
224        put1.addColumn(FAMILY, QUALIFIER, VALUE);
225        Put put2 = new Put(ROW);
226        put2.addColumn(FAMILY, QUALIFIER, VALUE);
227        List<Put> puts = new ArrayList<>();
228        puts.add(put1);
229        puts.add(put2);
230        TABLE.batch(puts, new Object[2]);
231      }
232    });
233  }
234
235  @Test
236  public void testScanPriority() throws Exception {
237    testForCall(new ThrowingCallable() {
238      @Override
239      public void call() throws IOException {
240        ResultScanner scanner = TABLE.getScanner(new Scan());
241        scanner.next();
242      }
243    });
244  }
245
246  private void testForCall(ThrowingCallable callable) throws Exception {
247    STATE.set(State.WAITING);
248    // set it higher than MIN_CUSTOM_PRIORITY so we can ignore calls for meta, setup, etc
249    EXPECTED_PRIORITY.set(new Random().nextInt(MIN_CUSTOM_PRIORITY) + MIN_CUSTOM_PRIORITY);
250    callable.call();
251
252    assertEquals("Expected state to change to SUCCESS. Check for assertion error in logs",
253      STATE.get(), State.SUCCESS);
254  }
255
256  private interface ThrowingCallable {
257    void call() throws Exception;
258  }
259
260  public static class PriorityRegionServer
261    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
262    public PriorityRegionServer(Configuration conf) throws IOException, InterruptedException {
263      super(conf);
264    }
265
266    @Override
267    protected RSRpcServices createRpcServices() throws IOException {
268      return new PriorityRpcServices(this);
269    }
270  }
271
272  public static class PriorityRpcControllerFactory extends RpcControllerFactory {
273
274    public PriorityRpcControllerFactory(Configuration conf) {
275      super(conf);
276    }
277
278    @Override
279    public HBaseRpcController newController() {
280      return new PriorityController(EXPECTED_PRIORITY.get(), super.newController());
281    }
282
283    @Override
284    public HBaseRpcController newController(CellScanner cellScanner) {
285      return new PriorityController(EXPECTED_PRIORITY.get(), super.newController(cellScanner));
286    }
287
288    @Override
289    public HBaseRpcController newController(List<CellScannable> cellIterables) {
290      return new PriorityController(EXPECTED_PRIORITY.get(), super.newController(cellIterables));
291    }
292  }
293
294  private static class PriorityController extends DelegatingHBaseRpcController {
295    private final int priority;
296
297    public PriorityController(int priority, HBaseRpcController controller) {
298      super(controller);
299      this.priority = priority;
300    }
301
302    @Override
303    public int getPriority() {
304      return priority;
305    }
306  }
307
308  public static class PriorityRpcServices extends RSRpcServices {
309    PriorityRpcServices(HRegionServer rs) throws IOException {
310      super(rs);
311    }
312
313    private void checkPriorityIfWaiting() {
314      if (STATE.get() == State.WAITING) {
315        int priority = RpcServer.getCurrentCall().get().getPriority();
316        if (priority < MIN_CUSTOM_PRIORITY) {
317          return;
318        }
319        assertEquals(EXPECTED_PRIORITY.get(), priority);
320        STATE.set(State.SUCCESS);
321      }
322    }
323
324    @Override
325    public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
326      throws ServiceException {
327      checkPriorityIfWaiting();
328      return super.get(controller, request);
329    }
330
331    @Override
332    public ClientProtos.MutateResponse mutate(RpcController rpcc,
333      ClientProtos.MutateRequest request) throws ServiceException {
334      checkPriorityIfWaiting();
335      return super.mutate(rpcc, request);
336    }
337
338    @Override
339    public ClientProtos.ScanResponse scan(RpcController controller,
340      ClientProtos.ScanRequest request) throws ServiceException {
341      checkPriorityIfWaiting();
342      return super.scan(controller, request);
343    }
344
345    @Override
346    public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
347      throws ServiceException {
348      checkPriorityIfWaiting();
349      return super.multi(rpcc, request);
350    }
351  }
352}