/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.ConcurrentHashMultiset;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Multiset;

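/**
 * Test that a custom {@link RpcControllerFactory} can be plugged in through configuration and
 * that the controllers it produces are actually used for client RPCs.
 */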
@Category({MediumTests.class, ClientTests.class})
public class TestRpcControllerFactory {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRpcControllerFactory.class);

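  /** Factory that wraps every controller it creates in a {@link CountingRpcController}. */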
  public static class StaticRpcControllerFactory extends RpcControllerFactory {

    public StaticRpcControllerFactory(Configuration conf) {
      super(conf);
    }

    @Override
    public HBaseRpcController newController() {
      return new CountingRpcController(super.newController());
    }

    @Override
    public HBaseRpcController newController(final CellScanner cellScanner) {
      return new CountingRpcController(super.newController(cellScanner));
    }

    @Override
    public HBaseRpcController newController(final List<CellScannable> cellIterables) {
      return new CountingRpcController(super.newController(cellIterables));
    }
  }

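  /**
   * Controller that counts how often its priority is set, so the tests can verify that client
   * operations actually go through the custom factory.
   */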
  public static class CountingRpcController extends DelegatingHBaseRpcController {

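    // GROUPED_PRIORITY records each distinct numeric priority that gets set, INT_PRIORITY counts
    // how many times the numeric priority actually changes, and TABLE_PRIORITY counts priority
    // updates derived from a non-system table name.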
    private static Multiset<Integer> GROUPED_PRIORITY = ConcurrentHashMultiset.create();
    private static AtomicInteger INT_PRIORITY = new AtomicInteger();
    private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();

    public CountingRpcController(HBaseRpcController delegate) {
      super(delegate);
    }

    @Override
    public void setPriority(int priority) {
      int oldPriority = getPriority();
      super.setPriority(priority);
      int newPriority = getPriority();
      if (newPriority != oldPriority) {
        INT_PRIORITY.incrementAndGet();
        GROUPED_PRIORITY.add(priority);
      }
    }

    @Override
    public void setPriority(TableName tn) {
      super.setPriority(tn);
      // ignore counts for system tables - their priority handling could change, and we only want
      // to check what the client explicitly sets
      if (tn != null && !tn.isSystemTable()) {
        TABLE_PRIORITY.incrementAndGet();
      }
    }
  }

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setup() throws Exception {
    // load a coprocessor endpoint so we have something to exercise - it doesn't matter which one,
    // and this one is already used in tests, so we can just reuse it.
    Configuration conf = UTIL.getConfiguration();
    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      ProtobufCoprocessorService.class.getName());

    UTIL.startMiniCluster();
  }

  @AfterClass
  public static void teardown() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Check some of the methods and make sure we are incrementing each time. It's a bit tedious to
   * cover all methods here, and the test would be brittle anyway since new methods can be added
   * without being covered, so we just cover the major ones.
   * @throws Exception on failure
   */
  @Test
  public void testCountController() throws Exception {
    Configuration conf = new Configuration(UTIL.getConfiguration());
    // setup our custom controller
    conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
      StaticRpcControllerFactory.class.getName());

    final TableName tableName = TableName.valueOf(name.getMethodName());
    UTIL.createTable(tableName, fam1).close();

    // change one of the connection properties so we get a new Connection with our configuration
    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);

    Connection connection = ConnectionFactory.createConnection(conf);
    Table table = connection.getTable(tableName);
    byte[] row = Bytes.toBytes("row");
    Put p = new Put(row);
    p.addColumn(fam1, fam1, Bytes.toBytes("val0"));
    table.put(p);

    Integer counter = 1;
    counter = verifyCount(counter);

    Delete d = new Delete(row);
    d.addColumn(fam1, fam1);
    table.delete(d);
    counter = verifyCount(counter);

    Put p2 = new Put(row);
    p2.addColumn(fam1, Bytes.toBytes("qual"), Bytes.toBytes("val1"));
    table.batch(Lists.newArrayList(p, p2), null);
    // this only goes to a single server, so we don't need to change the count here
    counter = verifyCount(counter);

    Append append = new Append(row);
    append.addColumn(fam1, fam1, Bytes.toBytes("val2"));
    table.append(append);
    counter = verifyCount(counter);

    // and check the major lookup calls as well
    Get g = new Get(row);
    table.get(g);
    counter = verifyCount(counter);

    ResultScanner scan = table.getScanner(fam1);
    scan.next();
    scan.close();
    counter = verifyCount(counter + 1);

    Get g2 = new Get(row);
    table.get(Lists.newArrayList(g, g2));
    // same server, so as above the count does not change
    counter = verifyCount(counter);

    // make sure all the scanner types are covered
    Scan scanInfo = new Scan(row);
    // regular small
    scanInfo.setSmall(true);
    counter = doScan(table, scanInfo, counter);

    // reversed, small
    scanInfo.setReversed(true);
    counter = doScan(table, scanInfo, counter);

    // reversed, regular
    scanInfo.setSmall(false);
    counter = doScan(table, scanInfo, counter + 1);

    // make sure we have no priority count
    verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0);
    // let's set a custom priority on a get
    Get get = new Get(row);
    get.setPriority(HConstants.ADMIN_QOS);
    table.get(get);
    verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1);

    table.close();
    connection.close();
  }

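  /** Runs the scan, pulls a single result, and then verifies the expected controller count. */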
  int doScan(Table table, Scan scan, int expectedCount) throws IOException {
    ResultScanner results = table.getScanner(scan);
    results.next();
    results.close();
    return verifyCount(expectedCount);
  }

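  /**
   * Asserts that the table-based priority has been bumped at least {@code counter} times and that
   * the plain numeric priority has not changed, then returns the next expected count.
   */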
  int verifyCount(Integer counter) {
    assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter);
    assertEquals(0, CountingRpcController.INT_PRIORITY.get());
    return CountingRpcController.TABLE_PRIORITY.get() + 1;
  }

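  /** Asserts how many times controllers saw their priority changed to the given level. */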
  void verifyPriorityGroupCount(int priorityLevel, int count) {
    assertEquals(count, CountingRpcController.GROUPED_PRIORITY.count(priorityLevel));
  }
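
  // A minimal sketch, not exercised by the tests above, of how a client would plug the counting
  // factory in purely through configuration; only RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY
  // and the standard ConnectionFactory API are assumed here.
  @SuppressWarnings("unused")
  private static Connection connectionWithCountingControllers(Configuration base)
      throws IOException {
    Configuration conf = new Configuration(base);
    conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
      StaticRpcControllerFactory.class.getName());
    // every RPC issued through this connection is now wrapped in a CountingRpcController
    return ConnectionFactory.createConnection(conf);
  }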

  @Test
  public void testFallbackToDefaultRpcControllerFactory() {
    Configuration conf = new Configuration(UTIL.getConfiguration());
    conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, "foo.bar.Baz");

    // Should not fail
    RpcControllerFactory factory = RpcControllerFactory.instantiate(conf);
    assertNotNull(factory);
    assertEquals(RpcControllerFactory.class, factory.getClass());
  }
}