/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.Optional;
026import java.util.UUID;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.AuthUtil;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.ExtendedCellScannable;
033import org.apache.hadoop.hbase.ExtendedCellScanner;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.coprocessor.ObserverContext;
039import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
040import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
041import org.apache.hadoop.hbase.coprocessor.RegionObserver;
042import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
043import org.apache.hadoop.hbase.ipc.HBaseRpcController;
044import org.apache.hadoop.hbase.ipc.RpcCall;
045import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
046import org.apache.hadoop.hbase.ipc.RpcServer;
047import org.apache.hadoop.hbase.regionserver.InternalScanner;
048import org.apache.hadoop.hbase.testclassification.ClientTests;
049import org.apache.hadoop.hbase.testclassification.MediumTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.wal.WALEdit;
052import org.junit.jupiter.api.AfterAll;
053import org.junit.jupiter.api.BeforeAll;
054import org.junit.jupiter.api.Tag;
055import org.junit.jupiter.api.Test;
056
057@Tag(ClientTests.TAG)
058@Tag(MediumTests.TAG)
059public class TestRequestAttributes {
060
061  private static final byte[] ROW_KEY1 = Bytes.toBytes("1");
062  private static final byte[] ROW_KEY2A = Bytes.toBytes("2A");
063  private static final byte[] ROW_KEY2B = Bytes.toBytes("2B");
064  private static final byte[] ROW_KEY3 = Bytes.toBytes("3");
065  private static final byte[] ROW_KEY4 = Bytes.toBytes("4");
066  private static final byte[] ROW_KEY5 = Bytes.toBytes("5");
067  private static final byte[] ROW_KEY6 = Bytes.toBytes("6");
068  private static final byte[] ROW_KEY7 = Bytes.toBytes("7");
069  private static final byte[] ROW_KEY8 = Bytes.toBytes("8");
070  private static final Map<String, byte[]> CONNECTION_ATTRIBUTES = new HashMap<>();
071  private static final Map<String, byte[]> REQUEST_ATTRIBUTES_SCAN = addRandomRequestAttributes();
072  private static final Map<byte[], Map<String, byte[]>> ROW_KEY_TO_REQUEST_ATTRIBUTES =
073    new HashMap<>();
074  static {
075    CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo"));
076    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY1, addRandomRequestAttributes());
077    Map<String, byte[]> requestAttributes2 = addRandomRequestAttributes();
078    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY2A, requestAttributes2);
079    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY2B, requestAttributes2);
080    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY3, addRandomRequestAttributes());
081    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY4, addRandomRequestAttributes());
082    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY5, addRandomRequestAttributes());
083    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY6, addRandomRequestAttributes());
084    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY7, addRandomRequestAttributes());
085    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY8, new HashMap<String, byte[]>());
086  }
087  private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100);
088  private static final byte[] FAMILY = Bytes.toBytes("0");
089  private static final TableName TABLE_NAME = TableName.valueOf("testRequestAttributes");
090
091  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
092  private static SingleProcessHBaseCluster cluster;
093
094  @BeforeAll
095  public static void setUp() throws Exception {
096    cluster = TEST_UTIL.startMiniCluster(1);
097    Table table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1,
098      HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName());
099    table.close();
100  }
101
102  @AfterAll
103  public static void afterClass() throws Exception {
104    cluster.close();
105    TEST_UTIL.shutdownMiniCluster();
106  }
107
108  @Test
109  public void testRequestAttributesGet() throws IOException {
110    Configuration conf = TEST_UTIL.getConfiguration();
111    try (
112      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
113        CONNECTION_ATTRIBUTES);
114      Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
115        ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY1)).build()) {
116
117      table.get(new Get(ROW_KEY1));
118    }
119  }
120
121  @Test
122  public void testRequestAttributesMultiGet() throws IOException {
123    Configuration conf = TEST_UTIL.getConfiguration();
124    try (
125      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
126        CONNECTION_ATTRIBUTES);
127      Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
128        ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY2A)).build()) {
129      List<Get> gets = List.of(new Get(ROW_KEY2A), new Get(ROW_KEY2B));
130      table.get(gets);
131    }
132  }
133
134  @Test
135  public void testRequestAttributesScan() throws IOException {
136    Configuration conf = TEST_UTIL.getConfiguration();
137    try (
138      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
139        CONNECTION_ATTRIBUTES);
140      Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
141        REQUEST_ATTRIBUTES_SCAN).build()) {
142      ResultScanner scanner = table.getScanner(new Scan());
143      scanner.next();
144    }
145  }
146
147  @Test
148  public void testRequestAttributesPut() throws IOException {
149    Configuration conf = TEST_UTIL.getConfiguration();
150    try (
151      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
152        CONNECTION_ATTRIBUTES);
153      Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
154        ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY3)).build()) {
155      Put put = new Put(ROW_KEY3);
156      put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v"));
157      table.put(put);
158    }
159  }
160
161  @Test
162  public void testRequestAttributesMultiPut() throws IOException {
163    Configuration conf = TEST_UTIL.getConfiguration();
164    try (
165      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
166        CONNECTION_ATTRIBUTES);
167      Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
168        ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY4)).build()) {
169      Put put1 = new Put(ROW_KEY4);
170      put1.addColumn(FAMILY, Bytes.toBytes("c1"), Bytes.toBytes("v1"));
171      Put put2 = new Put(ROW_KEY4);
172      put2.addColumn(FAMILY, Bytes.toBytes("c2"), Bytes.toBytes("v2"));
173      table.put(List.of(put1, put2));
174    }
175  }
176
177  @Test
178  public void testRequestAttributesBufferedMutate() throws IOException, InterruptedException {
179    Configuration conf = TEST_UTIL.getConfiguration();
180    try (
181      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
182        CONNECTION_ATTRIBUTES);
183      BufferedMutator bufferedMutator =
184        conn.getBufferedMutator(configureRequestAttributes(new BufferedMutatorParams(TABLE_NAME),
185          ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY5)));) {
186      Put put = new Put(ROW_KEY5);
187      put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v"));
188      bufferedMutator.mutate(put);
189      bufferedMutator.flush();
190    }
191  }
192
193  @Test
194  public void testRequestAttributesExists() throws IOException {
195    Configuration conf = TEST_UTIL.getConfiguration();
196    try (
197      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
198        CONNECTION_ATTRIBUTES);
199      Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
200        ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY6)).build()) {
201
202      table.exists(new Get(ROW_KEY6));
203    }
204  }
205
206  @Test
207  public void testRequestAttributesFromRpcController() throws IOException, InterruptedException {
208    Configuration conf = TEST_UTIL.getConfiguration();
209    conf.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
210      RequestMetadataControllerFactory.class, RpcControllerFactory.class);
211    try (
212      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
213        CONNECTION_ATTRIBUTES);
214      BufferedMutator bufferedMutator = conn.getBufferedMutator(TABLE_NAME);) {
215      Put put = new Put(ROW_KEY7);
216      put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v"));
217      bufferedMutator.mutate(put);
218      bufferedMutator.flush();
219    }
220    conf.unset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY);
221  }
222
223  @Test
224  public void testNoRequestAttributes() throws IOException {
225    Configuration conf = TEST_UTIL.getConfiguration();
226    try (Connection conn = ConnectionFactory.createConnection(conf, null,
227      AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) {
228      TableBuilder tableBuilder = conn.getTableBuilder(TABLE_NAME, null);
229      try (Table table = tableBuilder.build()) {
230        table.get(new Get(ROW_KEY8));
231      }
232    }
233  }
234
235  private static Map<String, byte[]> addRandomRequestAttributes() {
236    Map<String, byte[]> requestAttributes = new HashMap<>();
237    int j = Math.max(2, (int) (10 * Math.random()));
238    for (int i = 0; i < j; i++) {
239      requestAttributes.put(String.valueOf(i), Bytes.toBytes(UUID.randomUUID().toString()));
240    }
241    return requestAttributes;
242  }
243
244  private static TableBuilder configureRequestAttributes(TableBuilder tableBuilder,
245    Map<String, byte[]> requestAttributes) {
246    requestAttributes.forEach(tableBuilder::setRequestAttribute);
247    return tableBuilder;
248  }
249
250  private static BufferedMutatorParams configureRequestAttributes(BufferedMutatorParams params,
251    Map<String, byte[]> requestAttributes) {
252    requestAttributes.forEach(params::setRequestAttribute);
253    return params;
254  }
255
256  public static class RequestMetadataControllerFactory extends RpcControllerFactory {
257
258    public RequestMetadataControllerFactory(Configuration conf) {
259      super(conf);
260    }
261
262    @Override
263    public HBaseRpcController newController() {
264      return new RequestMetadataController(super.newController());
265    }
266
267    @Override
268    public HBaseRpcController newController(ExtendedCellScanner cellScanner) {
269      return new RequestMetadataController(super.newController(null, cellScanner));
270    }
271
272    @Override
273    public HBaseRpcController newController(RegionInfo regionInfo,
274      ExtendedCellScanner cellScanner) {
275      return new RequestMetadataController(super.newController(regionInfo, cellScanner));
276    }
277
278    @Override
279    public HBaseRpcController newController(final List<ExtendedCellScannable> cellIterables) {
280      return new RequestMetadataController(super.newController(null, cellIterables));
281    }
282
283    @Override
284    public HBaseRpcController newController(RegionInfo regionInfo,
285      final List<ExtendedCellScannable> cellIterables) {
286      return new RequestMetadataController(super.newController(regionInfo, cellIterables));
287    }
288
289    public static class RequestMetadataController extends DelegatingHBaseRpcController {
290      private final Map<String, byte[]> requestAttributes;
291
292      RequestMetadataController(HBaseRpcController delegate) {
293        super(delegate);
294        this.requestAttributes = ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY7);
295      }
296
297      @Override
298      public Map<String, byte[]> getRequestAttributes() {
299        return requestAttributes;
300      }
301    }
302  }
303
304  public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor {
305
306    @Override
307    public Optional<RegionObserver> getRegionObserver() {
308      return Optional.of(this);
309    }
310
311    @Override
312    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get,
313      List<Cell> result) throws IOException {
314      if (!isValidRequestAttributes(getRequestAttributesForRowKey(get.getRow()))) {
315        throw new IOException("Incorrect request attributes");
316      }
317    }
318
319    @Override
320    public boolean preScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> c,
321      InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
322      if (!isValidRequestAttributes(REQUEST_ATTRIBUTES_SCAN)) {
323        throw new IOException("Incorrect request attributes");
324      }
325      return hasNext;
326    }
327
328    @Override
329    public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put,
330      WALEdit edit) throws IOException {
331      if (!isValidRequestAttributes(getRequestAttributesForRowKey(put.getRow()))) {
332        throw new IOException("Incorrect request attributes");
333      }
334    }
335
336    private Map<String, byte[]> getRequestAttributesForRowKey(byte[] rowKey) {
337      for (byte[] byteArray : ROW_KEY_TO_REQUEST_ATTRIBUTES.keySet()) {
338        if (Arrays.equals(byteArray, rowKey)) {
339          return ROW_KEY_TO_REQUEST_ATTRIBUTES.get(byteArray);
340        }
341      }
342      return null;
343    }
344
345    private boolean isValidRequestAttributes(Map<String, byte[]> requestAttributes) {
346      RpcCall rpcCall = RpcServer.getCurrentCall().get();
347      Map<String, byte[]> attrs = rpcCall.getRequestAttributes();
348      if (attrs.size() != requestAttributes.size()) {
349        return false;
350      }
351      for (Map.Entry<String, byte[]> attr : attrs.entrySet()) {
352        if (!requestAttributes.containsKey(attr.getKey())) {
353          return false;
354        }
355        if (!Arrays.equals(requestAttributes.get(attr.getKey()), attr.getValue())) {
356          return false;
357        }
358      }
359      return true;
360    }
361  }
362}