001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.Optional; 026import java.util.UUID; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Executors; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.AuthUtil; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.ExtendedCellScannable; 033import org.apache.hadoop.hbase.ExtendedCellScanner; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.coprocessor.ObserverContext; 039import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 040import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 041import org.apache.hadoop.hbase.coprocessor.RegionObserver; 042import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController; 043import org.apache.hadoop.hbase.ipc.HBaseRpcController; 044import org.apache.hadoop.hbase.ipc.RpcCall; 045import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 046import org.apache.hadoop.hbase.ipc.RpcServer; 047import org.apache.hadoop.hbase.regionserver.InternalScanner; 048import org.apache.hadoop.hbase.testclassification.ClientTests; 049import org.apache.hadoop.hbase.testclassification.MediumTests; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.wal.WALEdit; 052import org.junit.jupiter.api.AfterAll; 053import org.junit.jupiter.api.BeforeAll; 054import org.junit.jupiter.api.Tag; 055import org.junit.jupiter.api.Test; 056 057@Tag(ClientTests.TAG) 058@Tag(MediumTests.TAG) 059public class TestRequestAttributes { 060 061 private static final byte[] ROW_KEY1 = Bytes.toBytes("1"); 062 private static final byte[] ROW_KEY2A = Bytes.toBytes("2A"); 063 private static final byte[] ROW_KEY2B = Bytes.toBytes("2B"); 064 private static final byte[] ROW_KEY3 = Bytes.toBytes("3"); 065 private static final byte[] ROW_KEY4 = Bytes.toBytes("4"); 066 private static final byte[] ROW_KEY5 = Bytes.toBytes("5"); 067 private static final byte[] ROW_KEY6 = Bytes.toBytes("6"); 068 private static final byte[] ROW_KEY7 = Bytes.toBytes("7"); 069 private static final byte[] ROW_KEY8 = Bytes.toBytes("8"); 070 private static final Map<String, byte[]> CONNECTION_ATTRIBUTES = new HashMap<>(); 071 private static final Map<String, byte[]> REQUEST_ATTRIBUTES_SCAN = addRandomRequestAttributes(); 072 private static final Map<byte[], Map<String, byte[]>> ROW_KEY_TO_REQUEST_ATTRIBUTES = 073 new HashMap<>(); 074 static { 075 CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo")); 076 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY1, addRandomRequestAttributes()); 077 Map<String, byte[]> requestAttributes2 = addRandomRequestAttributes(); 078 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY2A, requestAttributes2); 079 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY2B, requestAttributes2); 080 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY3, addRandomRequestAttributes()); 081 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY4, addRandomRequestAttributes()); 082 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY5, addRandomRequestAttributes()); 083 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY6, addRandomRequestAttributes()); 084 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY7, addRandomRequestAttributes()); 085 ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY8, new HashMap<String, byte[]>()); 086 } 087 private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100); 088 private static final byte[] FAMILY = Bytes.toBytes("0"); 089 private static final TableName TABLE_NAME = TableName.valueOf("testRequestAttributes"); 090 091 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 092 private static SingleProcessHBaseCluster cluster; 093 094 @BeforeAll 095 public static void setUp() throws Exception { 096 cluster = TEST_UTIL.startMiniCluster(1); 097 Table table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1, 098 HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName()); 099 table.close(); 100 } 101 102 @AfterAll 103 public static void afterClass() throws Exception { 104 cluster.close(); 105 TEST_UTIL.shutdownMiniCluster(); 106 } 107 108 @Test 109 public void testRequestAttributesGet() throws IOException { 110 Configuration conf = TEST_UTIL.getConfiguration(); 111 try ( 112 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 113 CONNECTION_ATTRIBUTES); 114 Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), 115 ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY1)).build()) { 116 117 table.get(new Get(ROW_KEY1)); 118 } 119 } 120 121 @Test 122 public void testRequestAttributesMultiGet() throws IOException { 123 Configuration conf = TEST_UTIL.getConfiguration(); 124 try ( 125 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 126 CONNECTION_ATTRIBUTES); 127 Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), 128 ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY2A)).build()) { 129 List<Get> gets = List.of(new Get(ROW_KEY2A), new Get(ROW_KEY2B)); 130 table.get(gets); 131 } 132 } 133 134 @Test 135 public void testRequestAttributesScan() throws IOException { 136 Configuration conf = TEST_UTIL.getConfiguration(); 137 try ( 138 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 139 CONNECTION_ATTRIBUTES); 140 Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), 141 REQUEST_ATTRIBUTES_SCAN).build()) { 142 ResultScanner scanner = table.getScanner(new Scan()); 143 scanner.next(); 144 } 145 } 146 147 @Test 148 public void testRequestAttributesPut() throws IOException { 149 Configuration conf = TEST_UTIL.getConfiguration(); 150 try ( 151 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 152 CONNECTION_ATTRIBUTES); 153 Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), 154 ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY3)).build()) { 155 Put put = new Put(ROW_KEY3); 156 put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); 157 table.put(put); 158 } 159 } 160 161 @Test 162 public void testRequestAttributesMultiPut() throws IOException { 163 Configuration conf = TEST_UTIL.getConfiguration(); 164 try ( 165 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 166 CONNECTION_ATTRIBUTES); 167 Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), 168 ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY4)).build()) { 169 Put put1 = new Put(ROW_KEY4); 170 put1.addColumn(FAMILY, Bytes.toBytes("c1"), Bytes.toBytes("v1")); 171 Put put2 = new Put(ROW_KEY4); 172 put2.addColumn(FAMILY, Bytes.toBytes("c2"), Bytes.toBytes("v2")); 173 table.put(List.of(put1, put2)); 174 } 175 } 176 177 @Test 178 public void testRequestAttributesBufferedMutate() throws IOException, InterruptedException { 179 Configuration conf = TEST_UTIL.getConfiguration(); 180 try ( 181 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 182 CONNECTION_ATTRIBUTES); 183 BufferedMutator bufferedMutator = 184 conn.getBufferedMutator(configureRequestAttributes(new BufferedMutatorParams(TABLE_NAME), 185 ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY5)));) { 186 Put put = new Put(ROW_KEY5); 187 put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); 188 bufferedMutator.mutate(put); 189 bufferedMutator.flush(); 190 } 191 } 192 193 @Test 194 public void testRequestAttributesExists() throws IOException { 195 Configuration conf = TEST_UTIL.getConfiguration(); 196 try ( 197 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 198 CONNECTION_ATTRIBUTES); 199 Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), 200 ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY6)).build()) { 201 202 table.exists(new Get(ROW_KEY6)); 203 } 204 } 205 206 @Test 207 public void testRequestAttributesFromRpcController() throws IOException, InterruptedException { 208 Configuration conf = TEST_UTIL.getConfiguration(); 209 conf.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, 210 RequestMetadataControllerFactory.class, RpcControllerFactory.class); 211 try ( 212 Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), 213 CONNECTION_ATTRIBUTES); 214 BufferedMutator bufferedMutator = conn.getBufferedMutator(TABLE_NAME);) { 215 Put put = new Put(ROW_KEY7); 216 put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); 217 bufferedMutator.mutate(put); 218 bufferedMutator.flush(); 219 } 220 conf.unset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY); 221 } 222 223 @Test 224 public void testNoRequestAttributes() throws IOException { 225 Configuration conf = TEST_UTIL.getConfiguration(); 226 try (Connection conn = ConnectionFactory.createConnection(conf, null, 227 AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) { 228 TableBuilder tableBuilder = conn.getTableBuilder(TABLE_NAME, null); 229 try (Table table = tableBuilder.build()) { 230 table.get(new Get(ROW_KEY8)); 231 } 232 } 233 } 234 235 private static Map<String, byte[]> addRandomRequestAttributes() { 236 Map<String, byte[]> requestAttributes = new HashMap<>(); 237 int j = Math.max(2, (int) (10 * Math.random())); 238 for (int i = 0; i < j; i++) { 239 requestAttributes.put(String.valueOf(i), Bytes.toBytes(UUID.randomUUID().toString())); 240 } 241 return requestAttributes; 242 } 243 244 private static TableBuilder configureRequestAttributes(TableBuilder tableBuilder, 245 Map<String, byte[]> requestAttributes) { 246 requestAttributes.forEach(tableBuilder::setRequestAttribute); 247 return tableBuilder; 248 } 249 250 private static BufferedMutatorParams configureRequestAttributes(BufferedMutatorParams params, 251 Map<String, byte[]> requestAttributes) { 252 requestAttributes.forEach(params::setRequestAttribute); 253 return params; 254 } 255 256 public static class RequestMetadataControllerFactory extends RpcControllerFactory { 257 258 public RequestMetadataControllerFactory(Configuration conf) { 259 super(conf); 260 } 261 262 @Override 263 public HBaseRpcController newController() { 264 return new RequestMetadataController(super.newController()); 265 } 266 267 @Override 268 public HBaseRpcController newController(ExtendedCellScanner cellScanner) { 269 return new RequestMetadataController(super.newController(null, cellScanner)); 270 } 271 272 @Override 273 public HBaseRpcController newController(RegionInfo regionInfo, 274 ExtendedCellScanner cellScanner) { 275 return new RequestMetadataController(super.newController(regionInfo, cellScanner)); 276 } 277 278 @Override 279 public HBaseRpcController newController(final List<ExtendedCellScannable> cellIterables) { 280 return new RequestMetadataController(super.newController(null, cellIterables)); 281 } 282 283 @Override 284 public HBaseRpcController newController(RegionInfo regionInfo, 285 final List<ExtendedCellScannable> cellIterables) { 286 return new RequestMetadataController(super.newController(regionInfo, cellIterables)); 287 } 288 289 public static class RequestMetadataController extends DelegatingHBaseRpcController { 290 private final Map<String, byte[]> requestAttributes; 291 292 RequestMetadataController(HBaseRpcController delegate) { 293 super(delegate); 294 this.requestAttributes = ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY7); 295 } 296 297 @Override 298 public Map<String, byte[]> getRequestAttributes() { 299 return requestAttributes; 300 } 301 } 302 } 303 304 public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor { 305 306 @Override 307 public Optional<RegionObserver> getRegionObserver() { 308 return Optional.of(this); 309 } 310 311 @Override 312 public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get, 313 List<Cell> result) throws IOException { 314 if (!isValidRequestAttributes(getRequestAttributesForRowKey(get.getRow()))) { 315 throw new IOException("Incorrect request attributes"); 316 } 317 } 318 319 @Override 320 public boolean preScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> c, 321 InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException { 322 if (!isValidRequestAttributes(REQUEST_ATTRIBUTES_SCAN)) { 323 throw new IOException("Incorrect request attributes"); 324 } 325 return hasNext; 326 } 327 328 @Override 329 public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put, 330 WALEdit edit) throws IOException { 331 if (!isValidRequestAttributes(getRequestAttributesForRowKey(put.getRow()))) { 332 throw new IOException("Incorrect request attributes"); 333 } 334 } 335 336 private Map<String, byte[]> getRequestAttributesForRowKey(byte[] rowKey) { 337 for (byte[] byteArray : ROW_KEY_TO_REQUEST_ATTRIBUTES.keySet()) { 338 if (Arrays.equals(byteArray, rowKey)) { 339 return ROW_KEY_TO_REQUEST_ATTRIBUTES.get(byteArray); 340 } 341 } 342 return null; 343 } 344 345 private boolean isValidRequestAttributes(Map<String, byte[]> requestAttributes) { 346 RpcCall rpcCall = RpcServer.getCurrentCall().get(); 347 Map<String, byte[]> attrs = rpcCall.getRequestAttributes(); 348 if (attrs.size() != requestAttributes.size()) { 349 return false; 350 } 351 for (Map.Entry<String, byte[]> attr : attrs.entrySet()) { 352 if (!requestAttributes.containsKey(attr.getKey())) { 353 return false; 354 } 355 if (!Arrays.equals(requestAttributes.get(attr.getKey()), attr.getValue())) { 356 return false; 357 } 358 } 359 return true; 360 } 361 } 362}