001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.Optional;
026import java.util.UUID;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.AuthUtil;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.ExtendedCellScannable;
033import org.apache.hadoop.hbase.ExtendedCellScanner;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.coprocessor.ObserverContext;
040import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
042import org.apache.hadoop.hbase.coprocessor.RegionObserver;
043import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
044import org.apache.hadoop.hbase.ipc.HBaseRpcController;
045import org.apache.hadoop.hbase.ipc.RpcCall;
046import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
047import org.apache.hadoop.hbase.ipc.RpcServer;
048import org.apache.hadoop.hbase.regionserver.InternalScanner;
049import org.apache.hadoop.hbase.testclassification.ClientTests;
050import org.apache.hadoop.hbase.testclassification.MediumTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.wal.WALEdit;
053import org.junit.AfterClass;
054import org.junit.BeforeClass;
055import org.junit.ClassRule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058
059@Category({ ClientTests.class, MediumTests.class })
060public class TestRequestAttributes {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestRequestAttributes.class);
065
066  private static final byte[] ROW_KEY1 = Bytes.toBytes("1");
067  private static final byte[] ROW_KEY2A = Bytes.toBytes("2A");
068  private static final byte[] ROW_KEY2B = Bytes.toBytes("2B");
069  private static final byte[] ROW_KEY3 = Bytes.toBytes("3");
070  private static final byte[] ROW_KEY4 = Bytes.toBytes("4");
071  private static final byte[] ROW_KEY5 = Bytes.toBytes("5");
072  private static final byte[] ROW_KEY6 = Bytes.toBytes("6");
073  private static final byte[] ROW_KEY7 = Bytes.toBytes("7");
074  private static final byte[] ROW_KEY8 = Bytes.toBytes("8");
075  private static final Map<String, byte[]> CONNECTION_ATTRIBUTES = new HashMap<>();
076  private static final Map<String, byte[]> REQUEST_ATTRIBUTES_SCAN = addRandomRequestAttributes();
077  private static final Map<byte[], Map<String, byte[]>> ROW_KEY_TO_REQUEST_ATTRIBUTES =
078    new HashMap<>();
079  static {
080    CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo"));
081    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY1, addRandomRequestAttributes());
082    Map<String, byte[]> requestAttributes2 = addRandomRequestAttributes();
083    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY2A, requestAttributes2);
084    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY2B, requestAttributes2);
085    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY3, addRandomRequestAttributes());
086    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY4, addRandomRequestAttributes());
087    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY5, addRandomRequestAttributes());
088    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY6, addRandomRequestAttributes());
089    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY7, addRandomRequestAttributes());
090    ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY8, new HashMap<String, byte[]>());
091  }
092  private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100);
093  private static final byte[] FAMILY = Bytes.toBytes("0");
094  private static final TableName TABLE_NAME = TableName.valueOf("testRequestAttributes");
095
096  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
097  private static SingleProcessHBaseCluster cluster;
098
099  @BeforeClass
100  public static void setUp() throws Exception {
101    cluster = TEST_UTIL.startMiniCluster(1);
102    Table table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1,
103      HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName());
104    table.close();
105  }
106
107  @AfterClass
108  public static void afterClass() throws Exception {
109    cluster.close();
110    TEST_UTIL.shutdownMiniCluster();
111  }
112
113  @Test
114  public void testRequestAttributesGet() throws IOException {
115    Configuration conf = TEST_UTIL.getConfiguration();
116    try (
117      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
118        CONNECTION_ATTRIBUTES);
119      Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
120        ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY1)).build()) {
121
122      table.get(new Get(ROW_KEY1));
123    }
124  }
125
126  @Test
127  public void testRequestAttributesMultiGet() throws IOException {
128    Configuration conf = TEST_UTIL.getConfiguration();
129    try (
130      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
131        CONNECTION_ATTRIBUTES);
132      Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
133        ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY2A)).build()) {
134      List<Get> gets = List.of(new Get(ROW_KEY2A), new Get(ROW_KEY2B));
135      table.get(gets);
136    }
137  }
138
139  @Test
140  public void testRequestAttributesScan() throws IOException {
141    Configuration conf = TEST_UTIL.getConfiguration();
142    try (
143      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
144        CONNECTION_ATTRIBUTES);
145      Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
146        REQUEST_ATTRIBUTES_SCAN).build()) {
147      ResultScanner scanner = table.getScanner(new Scan());
148      scanner.next();
149    }
150  }
151
152  @Test
153  public void testRequestAttributesPut() throws IOException {
154    Configuration conf = TEST_UTIL.getConfiguration();
155    try (
156      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
157        CONNECTION_ATTRIBUTES);
158      Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
159        ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY3)).build()) {
160      Put put = new Put(ROW_KEY3);
161      put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v"));
162      table.put(put);
163    }
164  }
165
166  @Test
167  public void testRequestAttributesMultiPut() throws IOException {
168    Configuration conf = TEST_UTIL.getConfiguration();
169    try (
170      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
171        CONNECTION_ATTRIBUTES);
172      Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
173        ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY4)).build()) {
174      Put put1 = new Put(ROW_KEY4);
175      put1.addColumn(FAMILY, Bytes.toBytes("c1"), Bytes.toBytes("v1"));
176      Put put2 = new Put(ROW_KEY4);
177      put2.addColumn(FAMILY, Bytes.toBytes("c2"), Bytes.toBytes("v2"));
178      table.put(List.of(put1, put2));
179    }
180  }
181
182  @Test
183  public void testRequestAttributesBufferedMutate() throws IOException, InterruptedException {
184    Configuration conf = TEST_UTIL.getConfiguration();
185    try (
186      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
187        CONNECTION_ATTRIBUTES);
188      BufferedMutator bufferedMutator =
189        conn.getBufferedMutator(configureRequestAttributes(new BufferedMutatorParams(TABLE_NAME),
190          ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY5)));) {
191      Put put = new Put(ROW_KEY5);
192      put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v"));
193      bufferedMutator.mutate(put);
194      bufferedMutator.flush();
195    }
196  }
197
198  @Test
199  public void testRequestAttributesExists() throws IOException {
200    Configuration conf = TEST_UTIL.getConfiguration();
201    try (
202      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
203        CONNECTION_ATTRIBUTES);
204      Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
205        ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY6)).build()) {
206
207      table.exists(new Get(ROW_KEY6));
208    }
209  }
210
211  @Test
212  public void testRequestAttributesFromRpcController() throws IOException, InterruptedException {
213    Configuration conf = TEST_UTIL.getConfiguration();
214    conf.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
215      RequestMetadataControllerFactory.class, RpcControllerFactory.class);
216    try (
217      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
218        CONNECTION_ATTRIBUTES);
219      BufferedMutator bufferedMutator = conn.getBufferedMutator(TABLE_NAME);) {
220      Put put = new Put(ROW_KEY7);
221      put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v"));
222      bufferedMutator.mutate(put);
223      bufferedMutator.flush();
224    }
225    conf.unset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY);
226  }
227
228  @Test
229  public void testNoRequestAttributes() throws IOException {
230    Configuration conf = TEST_UTIL.getConfiguration();
231    try (Connection conn = ConnectionFactory.createConnection(conf, null,
232      AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) {
233      TableBuilder tableBuilder = conn.getTableBuilder(TABLE_NAME, null);
234      try (Table table = tableBuilder.build()) {
235        table.get(new Get(ROW_KEY8));
236      }
237    }
238  }
239
240  private static Map<String, byte[]> addRandomRequestAttributes() {
241    Map<String, byte[]> requestAttributes = new HashMap<>();
242    int j = Math.max(2, (int) (10 * Math.random()));
243    for (int i = 0; i < j; i++) {
244      requestAttributes.put(String.valueOf(i), Bytes.toBytes(UUID.randomUUID().toString()));
245    }
246    return requestAttributes;
247  }
248
249  private static TableBuilder configureRequestAttributes(TableBuilder tableBuilder,
250    Map<String, byte[]> requestAttributes) {
251    requestAttributes.forEach(tableBuilder::setRequestAttribute);
252    return tableBuilder;
253  }
254
255  private static BufferedMutatorParams configureRequestAttributes(BufferedMutatorParams params,
256    Map<String, byte[]> requestAttributes) {
257    requestAttributes.forEach(params::setRequestAttribute);
258    return params;
259  }
260
261  public static class RequestMetadataControllerFactory extends RpcControllerFactory {
262
263    public RequestMetadataControllerFactory(Configuration conf) {
264      super(conf);
265    }
266
267    @Override
268    public HBaseRpcController newController() {
269      return new RequestMetadataController(super.newController());
270    }
271
272    @Override
273    public HBaseRpcController newController(ExtendedCellScanner cellScanner) {
274      return new RequestMetadataController(super.newController(null, cellScanner));
275    }
276
277    @Override
278    public HBaseRpcController newController(RegionInfo regionInfo,
279      ExtendedCellScanner cellScanner) {
280      return new RequestMetadataController(super.newController(regionInfo, cellScanner));
281    }
282
283    @Override
284    public HBaseRpcController newController(final List<ExtendedCellScannable> cellIterables) {
285      return new RequestMetadataController(super.newController(null, cellIterables));
286    }
287
288    @Override
289    public HBaseRpcController newController(RegionInfo regionInfo,
290      final List<ExtendedCellScannable> cellIterables) {
291      return new RequestMetadataController(super.newController(regionInfo, cellIterables));
292    }
293
294    public static class RequestMetadataController extends DelegatingHBaseRpcController {
295      private final Map<String, byte[]> requestAttributes;
296
297      RequestMetadataController(HBaseRpcController delegate) {
298        super(delegate);
299        this.requestAttributes = ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY7);
300      }
301
302      @Override
303      public Map<String, byte[]> getRequestAttributes() {
304        return requestAttributes;
305      }
306    }
307  }
308
309  public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor {
310
311    @Override
312    public Optional<RegionObserver> getRegionObserver() {
313      return Optional.of(this);
314    }
315
316    @Override
317    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get,
318      List<Cell> result) throws IOException {
319      if (!isValidRequestAttributes(getRequestAttributesForRowKey(get.getRow()))) {
320        throw new IOException("Incorrect request attributes");
321      }
322    }
323
324    @Override
325    public boolean preScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> c,
326      InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
327      if (!isValidRequestAttributes(REQUEST_ATTRIBUTES_SCAN)) {
328        throw new IOException("Incorrect request attributes");
329      }
330      return hasNext;
331    }
332
333    @Override
334    public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put,
335      WALEdit edit) throws IOException {
336      if (!isValidRequestAttributes(getRequestAttributesForRowKey(put.getRow()))) {
337        throw new IOException("Incorrect request attributes");
338      }
339    }
340
341    private Map<String, byte[]> getRequestAttributesForRowKey(byte[] rowKey) {
342      for (byte[] byteArray : ROW_KEY_TO_REQUEST_ATTRIBUTES.keySet()) {
343        if (Arrays.equals(byteArray, rowKey)) {
344          return ROW_KEY_TO_REQUEST_ATTRIBUTES.get(byteArray);
345        }
346      }
347      return null;
348    }
349
350    private boolean isValidRequestAttributes(Map<String, byte[]> requestAttributes) {
351      RpcCall rpcCall = RpcServer.getCurrentCall().get();
352      Map<String, byte[]> attrs = rpcCall.getRequestAttributes();
353      if (attrs.size() != requestAttributes.size()) {
354        return false;
355      }
356      for (Map.Entry<String, byte[]> attr : attrs.entrySet()) {
357        if (!requestAttributes.containsKey(attr.getKey())) {
358          return false;
359        }
360        if (!Arrays.equals(requestAttributes.get(attr.getKey()), attr.getValue())) {
361          return false;
362        }
363      }
364      return true;
365    }
366  }
367}