001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.Optional; 026import java.util.UUID; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Executors; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.AuthUtil; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.ExtendedCellScannable; 033import org.apache.hadoop.hbase.ExtendedCellScanner; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.coprocessor.ObserverContext; 040import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 042import org.apache.hadoop.hbase.coprocessor.RegionObserver; 043import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController; 044import org.apache.hadoop.hbase.ipc.HBaseRpcController; 045import org.apache.hadoop.hbase.ipc.RpcCall; 046import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 047import org.apache.hadoop.hbase.ipc.RpcServer; 048import org.apache.hadoop.hbase.regionserver.InternalScanner; 049import org.apache.hadoop.hbase.testclassification.ClientTests; 050import org.apache.hadoop.hbase.testclassification.MediumTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.wal.WALEdit; 053import org.junit.AfterClass; 054import org.junit.BeforeClass; 055import org.junit.ClassRule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058 059@Category({ ClientTests.class, MediumTests.class }) 060public class TestRequestAttributes { 061 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestRequestAttributes.class); 065 066 private static final byte[] ROW_KEY1 = Bytes.toBytes("1"); 067 private static final byte[] ROW_KEY2A = Bytes.toBytes("2A"); 068 private static final byte[] ROW_KEY2B = Bytes.toBytes("2B"); 069 private static final byte[] ROW_KEY3 = Bytes.toBytes("3"); 070 private static final byte[] ROW_KEY4 = Bytes.toBytes("4"); 071 private static final byte[] ROW_KEY5 = Bytes.toBytes("5"); 072 private static final byte[] ROW_KEY6 = Bytes.toBytes("6"); 073 private static final byte[] ROW_KEY7 = Bytes.toBytes("7"); 074 private static final byte[] ROW_KEY8 = Bytes.toBytes("8"); 075 private static final Map<String, byte[]> CONNECTION_ATTRIBUTES = new HashMap<>(); 076 private static final Map<String, byte[]> REQUEST_ATTRIBUTES_SCAN = addRandomRequestAttributes(); 077 private static final Map<byte[], Map<String, byte[]>> ROW_KEY_TO_REQUEST_ATTRIBUTES = 078 new HashMap<>(); 079 static { 080 CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo")); 081 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY1, addRandomRequestAttributes()); 082 Map<String, byte[]> requestAttributes2 = addRandomRequestAttributes(); 083 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY2A, requestAttributes2); 084 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY2B, requestAttributes2); 085 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY3, addRandomRequestAttributes()); 086 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY4, addRandomRequestAttributes()); 087 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY5, addRandomRequestAttributes()); 088 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY6, addRandomRequestAttributes()); 089 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY7, addRandomRequestAttributes()); 090 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY8, new HashMap<String, byte[]>()); 091 } 092 private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100); 093 private static final byte[] FAMILY = Bytes.toBytes("0"); 094 private static final TableName TABLE_NAME = TableName.valueOf("testRequestAttributes"); 095 096 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 097 private static SingleProcessHBaseCluster cluster; 098 099 @BeforeClass 100 public static void setUp() throws Exception { 101 cluster = TEST_UTIL.startMiniCluster(1); 102 Table table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1, 103 HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName()); 104 table.close(); 105 } 106 107 @AfterClass 108 public static void afterClass() throws Exception { 109 cluster.close(); 110 TEST_UTIL.shutdownMiniCluster(); 111 } 112 113 @Test 114 public void testRequestAttributesGet() throws IOException { 115 Configuration conf = TEST_UTIL.getConfiguration(); 116 try ( 117 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 118 CONNECTION_ATTRIBUTES); 119 Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), 120 ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY1)).build()) { 121 122 table.get(new Get(ROW_KEY1)); 123 } 124 } 125 126 @Test 127 public void testRequestAttributesMultiGet() throws IOException { 128 Configuration conf = TEST_UTIL.getConfiguration(); 129 try ( 130 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 131 CONNECTION_ATTRIBUTES); 132 Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), 133 ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY2A)).build()) { 134 List<Get> gets = List.of(new Get(ROW_KEY2A), new Get(ROW_KEY2B)); 135 table.get(gets); 136 } 137 } 138 139 @Test 140 public void testRequestAttributesScan() throws IOException { 141 Configuration conf = TEST_UTIL.getConfiguration(); 142 try ( 143 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 144 CONNECTION_ATTRIBUTES); 145 Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), 146 REQUEST_ATTRIBUTES_SCAN).build()) { 147 ResultScanner scanner = table.getScanner(new Scan()); 148 scanner.next(); 149 } 150 } 151 152 @Test 153 public void testRequestAttributesPut() throws IOException { 154 Configuration conf = TEST_UTIL.getConfiguration(); 155 try ( 156 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 157 CONNECTION_ATTRIBUTES); 158 Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), 159 ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY3)).build()) { 160 Put put = new Put(ROW_KEY3); 161 put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); 162 table.put(put); 163 } 164 } 165 166 @Test 167 public void testRequestAttributesMultiPut() throws IOException { 168 Configuration conf = TEST_UTIL.getConfiguration(); 169 try ( 170 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 171 CONNECTION_ATTRIBUTES); 172 Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), 173 ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY4)).build()) { 174 Put put1 = new Put(ROW_KEY4); 175 put1.addColumn(FAMILY, Bytes.toBytes("c1"), Bytes.toBytes("v1")); 176 Put put2 = new Put(ROW_KEY4); 177 put2.addColumn(FAMILY, Bytes.toBytes("c2"), Bytes.toBytes("v2")); 178 table.put(List.of(put1, put2)); 179 } 180 } 181 182 @Test 183 public void testRequestAttributesBufferedMutate() throws IOException, InterruptedException { 184 Configuration conf = TEST_UTIL.getConfiguration(); 185 try ( 186 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 187 CONNECTION_ATTRIBUTES); 188 BufferedMutator bufferedMutator = 189 conn.getBufferedMutator(configureRequestAttributes(new BufferedMutatorParams(TABLE_NAME), 190 ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY5)));) { 191 Put put = new Put(ROW_KEY5); 192 put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); 193 bufferedMutator.mutate(put); 194 bufferedMutator.flush(); 195 } 196 } 197 198 @Test 199 public void testRequestAttributesExists() throws IOException { 200 Configuration conf = TEST_UTIL.getConfiguration(); 201 try ( 202 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 203 CONNECTION_ATTRIBUTES); 204 Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), 205 ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY6)).build()) { 206 207 table.exists(new Get(ROW_KEY6)); 208 } 209 } 210 211 @Test 212 public void testRequestAttributesFromRpcController() throws IOException, InterruptedException { 213 Configuration conf = TEST_UTIL.getConfiguration(); 214 conf.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, 215 RequestMetadataControllerFactory.class, RpcControllerFactory.class); 216 try ( 217 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 218 CONNECTION_ATTRIBUTES); 219 BufferedMutator bufferedMutator = conn.getBufferedMutator(TABLE_NAME);) { 220 Put put = new Put(ROW_KEY7); 221 put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); 222 bufferedMutator.mutate(put); 223 bufferedMutator.flush(); 224 } 225 conf.unset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY); 226 } 227 228 @Test 229 public void testNoRequestAttributes() throws IOException { 230 Configuration conf = TEST_UTIL.getConfiguration(); 231 try (Connection conn = ConnectionFactory.createConnection(conf, null, 232 AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) { 233 TableBuilder tableBuilder = conn.getTableBuilder(TABLE_NAME, null); 234 try (Table table = tableBuilder.build()) { 235 table.get(new Get(ROW_KEY8)); 236 } 237 } 238 } 239 240 private static Map<String, byte[]> addRandomRequestAttributes() { 241 Map<String, byte[]> requestAttributes = new HashMap<>(); 242 int j = Math.max(2, (int) (10 * Math.random())); 243 for (int i = 0; i < j; i++) { 244 requestAttributes.put(String.valueOf(i), Bytes.toBytes(UUID.randomUUID().toString())); 245 } 246 return requestAttributes; 247 } 248 249 private static TableBuilder configureRequestAttributes(TableBuilder tableBuilder, 250 Map<String, byte[]> requestAttributes) { 251 requestAttributes.forEach(tableBuilder::setRequestAttribute); 252 return tableBuilder; 253 } 254 255 private static BufferedMutatorParams configureRequestAttributes(BufferedMutatorParams params, 256 Map<String, byte[]> requestAttributes) { 257 requestAttributes.forEach(params::setRequestAttribute); 258 return params; 259 } 260 261 public static class RequestMetadataControllerFactory extends RpcControllerFactory { 262 263 public RequestMetadataControllerFactory(Configuration conf) { 264 super(conf); 265 } 266 267 @Override 268 public HBaseRpcController newController() { 269 return new RequestMetadataController(super.newController()); 270 } 271 272 @Override 273 public HBaseRpcController newController(ExtendedCellScanner cellScanner) { 274 return new RequestMetadataController(super.newController(null, cellScanner)); 275 } 276 277 @Override 278 public HBaseRpcController newController(RegionInfo regionInfo, 279 ExtendedCellScanner cellScanner) { 280 return new RequestMetadataController(super.newController(regionInfo, cellScanner)); 281 } 282 283 @Override 284 public HBaseRpcController newController(final List<ExtendedCellScannable> cellIterables) { 285 return new RequestMetadataController(super.newController(null, cellIterables)); 286 } 287 288 @Override 289 public HBaseRpcController newController(RegionInfo regionInfo, 290 final List<ExtendedCellScannable> cellIterables) { 291 return new RequestMetadataController(super.newController(regionInfo, cellIterables)); 292 } 293 294 public static class RequestMetadataController extends DelegatingHBaseRpcController { 295 private final Map<String, byte[]> requestAttributes; 296 297 RequestMetadataController(HBaseRpcController delegate) { 298 super(delegate); 299 this.requestAttributes = ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY7); 300 } 301 302 @Override 303 public Map<String, byte[]> getRequestAttributes() { 304 return requestAttributes; 305 } 306 } 307 } 308 309 public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor { 310 311 @Override 312 public Optional<RegionObserver> getRegionObserver() { 313 return Optional.of(this); 314 } 315 316 @Override 317 public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get, 318 List<Cell> result) throws IOException { 319 if (!isValidRequestAttributes(getRequestAttributesForRowKey(get.getRow()))) { 320 throw new IOException("Incorrect request attributes"); 321 } 322 } 323 324 @Override 325 public boolean preScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> c, 326 InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException { 327 if (!isValidRequestAttributes(REQUEST_ATTRIBUTES_SCAN)) { 328 throw new IOException("Incorrect request attributes"); 329 } 330 return hasNext; 331 } 332 333 @Override 334 public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put, 335 WALEdit edit) throws IOException { 336 if (!isValidRequestAttributes(getRequestAttributesForRowKey(put.getRow()))) { 337 throw new IOException("Incorrect request attributes"); 338 } 339 } 340 341 private Map<String, byte[]> getRequestAttributesForRowKey(byte[] rowKey) { 342 for (byte[] byteArray : ROW_KEY_TO_REQUEST_ATTRIBUTES.keySet()) { 343 if (Arrays.equals(byteArray, rowKey)) { 344 return ROW_KEY_TO_REQUEST_ATTRIBUTES.get(byteArray); 345 } 346 } 347 return null; 348 } 349 350 private boolean isValidRequestAttributes(Map<String, byte[]> requestAttributes) { 351 RpcCall rpcCall = RpcServer.getCurrentCall().get(); 352 Map<String, byte[]> attrs = rpcCall.getRequestAttributes(); 353 if (attrs.size() != requestAttributes.size()) { 354 return false; 355 } 356 for (Map.Entry<String, byte[]> attr : attrs.entrySet()) { 357 if (!requestAttributes.containsKey(attr.getKey())) { 358 return false; 359 } 360 if (!Arrays.equals(requestAttributes.get(attr.getKey()), attr.getValue())) { 361 return false; 362 } 363 } 364 return true; 365 } 366 } 367}