001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.List; 027import java.util.concurrent.atomic.AtomicInteger; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.CellScannable; 030import org.apache.hadoop.hbase.CellScanner; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 036import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService; 037import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController; 038import org.apache.hadoop.hbase.ipc.HBaseRpcController; 039import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 040import org.apache.hadoop.hbase.testclassification.ClientTests; 041import org.apache.hadoop.hbase.testclassification.MediumTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.junit.AfterClass; 044import org.junit.BeforeClass; 045import org.junit.ClassRule; 046import org.junit.Rule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.junit.rules.TestName; 050 051import org.apache.hbase.thirdparty.com.google.common.collect.ConcurrentHashMultiset; 052import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 053import org.apache.hbase.thirdparty.com.google.common.collect.Multiset; 054 055@Category({MediumTests.class, ClientTests.class}) 056public class TestRpcControllerFactory { 057 058 @ClassRule 059 public static final HBaseClassTestRule CLASS_RULE = 060 HBaseClassTestRule.forClass(TestRpcControllerFactory.class); 061 062 public static class StaticRpcControllerFactory extends RpcControllerFactory { 063 064 public StaticRpcControllerFactory(Configuration conf) { 065 super(conf); 066 } 067 068 @Override 069 public HBaseRpcController newController() { 070 return new CountingRpcController(super.newController()); 071 } 072 073 @Override 074 public HBaseRpcController newController(final CellScanner cellScanner) { 075 return new CountingRpcController(super.newController(cellScanner)); 076 } 077 078 @Override 079 public HBaseRpcController newController(final List<CellScannable> cellIterables) { 080 return new CountingRpcController(super.newController(cellIterables)); 081 } 082 } 083 084 public static class CountingRpcController extends DelegatingHBaseRpcController { 085 086 private static Multiset<Integer> GROUPED_PRIORITY = ConcurrentHashMultiset.create(); 087 private static AtomicInteger INT_PRIORITY = new AtomicInteger(); 088 private static AtomicInteger TABLE_PRIORITY = new AtomicInteger(); 089 090 public CountingRpcController(HBaseRpcController delegate) { 091 super(delegate); 092 } 093 094 @Override 095 public void setPriority(int priority) { 096 int oldPriority = getPriority(); 097 super.setPriority(priority); 098 int newPriority = getPriority(); 099 if (newPriority != oldPriority) { 100 INT_PRIORITY.incrementAndGet(); 101 GROUPED_PRIORITY.add(priority); 102 } 103 } 104 105 @Override 106 public void setPriority(TableName tn) { 107 super.setPriority(tn); 108 // ignore counts for system tables - it could change and we really only want to check on what 109 // the client should change 110 if (tn != null && !tn.isSystemTable()) { 111 TABLE_PRIORITY.incrementAndGet(); 112 } 113 114 } 115 } 116 117 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 118 119 @Rule 120 public TestName name = new TestName(); 121 122 @BeforeClass 123 public static void setup() throws Exception { 124 // load an endpoint so we have an endpoint to test - it doesn't matter which one, but 125 // this is already in tests, so we can just use it. 126 Configuration conf = UTIL.getConfiguration(); 127 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 128 ProtobufCoprocessorService.class.getName()); 129 130 UTIL.startMiniCluster(); 131 } 132 133 @AfterClass 134 public static void teardown() throws Exception { 135 UTIL.shutdownMiniCluster(); 136 } 137 138 /** 139 * check some of the methods and make sure we are incrementing each time. Its a bit tediuous to 140 * cover all methods here and really is a bit brittle since we can always add new methods but 141 * won't be sure to add them here. So we just can cover the major ones. 142 * @throws Exception on failure 143 */ 144 @Test 145 public void testCountController() throws Exception { 146 Configuration conf = new Configuration(UTIL.getConfiguration()); 147 // setup our custom controller 148 conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, 149 StaticRpcControllerFactory.class.getName()); 150 151 final TableName tableName = TableName.valueOf(name.getMethodName()); 152 UTIL.createTable(tableName, fam1).close(); 153 154 // change one of the connection properties so we get a new Connection with our configuration 155 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1); 156 157 Connection connection = ConnectionFactory.createConnection(conf); 158 Table table = connection.getTable(tableName); 159 byte[] row = Bytes.toBytes("row"); 160 Put p = new Put(row); 161 p.addColumn(fam1, fam1, Bytes.toBytes("val0")); 162 table.put(p); 163 164 Integer counter = 1; 165 counter = verifyCount(counter); 166 167 Delete d = new Delete(row); 168 d.addColumn(fam1, fam1); 169 table.delete(d); 170 counter = verifyCount(counter); 171 172 Put p2 = new Put(row); 173 p2.addColumn(fam1, Bytes.toBytes("qual"), Bytes.toBytes("val1")); 174 table.batch(Lists.newArrayList(p, p2), null); 175 // this only goes to a single server, so we don't need to change the count here 176 counter = verifyCount(counter); 177 178 Append append = new Append(row); 179 append.addColumn(fam1, fam1, Bytes.toBytes("val2")); 180 table.append(append); 181 counter = verifyCount(counter); 182 183 // and check the major lookup calls as well 184 Get g = new Get(row); 185 table.get(g); 186 counter = verifyCount(counter); 187 188 ResultScanner scan = table.getScanner(fam1); 189 scan.next(); 190 scan.close(); 191 counter = verifyCount(counter + 1); 192 193 Get g2 = new Get(row); 194 table.get(Lists.newArrayList(g, g2)); 195 // same server, so same as above for not changing count 196 counter = verifyCount(counter); 197 198 // make sure all the scanner types are covered 199 Scan scanInfo = new Scan(row); 200 // regular small 201 scanInfo.setSmall(true); 202 counter = doScan(table, scanInfo, counter); 203 204 // reversed, small 205 scanInfo.setReversed(true); 206 counter = doScan(table, scanInfo, counter); 207 208 // reversed, regular 209 scanInfo.setSmall(false); 210 counter = doScan(table, scanInfo, counter + 1); 211 212 // make sure we have no priority count 213 verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0); 214 // lets set a custom priority on a get 215 Get get = new Get(row); 216 get.setPriority(HConstants.ADMIN_QOS); 217 table.get(get); 218 verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1); 219 220 table.close(); 221 connection.close(); 222 } 223 224 int doScan(Table table, Scan scan, int expectedCount) throws IOException { 225 ResultScanner results = table.getScanner(scan); 226 results.next(); 227 results.close(); 228 return verifyCount(expectedCount); 229 } 230 231 int verifyCount(Integer counter) { 232 assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter); 233 assertEquals(0, CountingRpcController.INT_PRIORITY.get()); 234 return CountingRpcController.TABLE_PRIORITY.get() + 1; 235 } 236 237 void verifyPriorityGroupCount(int priorityLevel, int count) { 238 assertEquals(count, CountingRpcController.GROUPED_PRIORITY.count(priorityLevel)); 239 } 240 241 @Test 242 public void testFallbackToDefaultRpcControllerFactory() { 243 Configuration conf = new Configuration(UTIL.getConfiguration()); 244 conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, "foo.bar.Baz"); 245 246 // Should not fail 247 RpcControllerFactory factory = RpcControllerFactory.instantiate(conf); 248 assertNotNull(factory); 249 assertEquals(factory.getClass(), RpcControllerFactory.class); 250 } 251}