001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.Random;
031import java.util.UUID;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.atomic.AtomicBoolean;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.AuthUtil;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.CellComparator;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.coprocessor.ObserverContext;
044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
046import org.apache.hadoop.hbase.coprocessor.RegionObserver;
047import org.apache.hadoop.hbase.ipc.RpcCall;
048import org.apache.hadoop.hbase.ipc.RpcServer;
049import org.apache.hadoop.hbase.regionserver.InternalScanner;
050import org.apache.hadoop.hbase.testclassification.ClientTests;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.wal.WALEdit;
054import org.junit.AfterClass;
055import org.junit.Before;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060
061import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
062
063@Category({ ClientTests.class, MediumTests.class })
064public class TestRequestAndConnectionAttributes {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestRequestAndConnectionAttributes.class);
069
070  private static final Map<String, byte[]> CONNECTION_ATTRIBUTES = new HashMap<>();
071  static {
072    CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo"));
073  }
074  private static final Map<String, byte[]> REQUEST_ATTRIBUTES = new HashMap<>();
075  private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100);
076  private static final AtomicBoolean REQUEST_ATTRIBUTES_VALIDATED = new AtomicBoolean(false);
077  private static final byte[] REQUEST_ATTRIBUTES_TEST_TABLE_CF = Bytes.toBytes("0");
078  private static final TableName REQUEST_ATTRIBUTES_TEST_TABLE =
079    TableName.valueOf("testRequestAttributes");
080
081  private static HBaseTestingUtil TEST_UTIL = null;
082
083  @BeforeClass
084  public static void setUp() throws Exception {
085    TEST_UTIL = new HBaseTestingUtil();
086    TEST_UTIL.startMiniCluster(1);
087    TEST_UTIL.createTable(REQUEST_ATTRIBUTES_TEST_TABLE,
088      new byte[][] { REQUEST_ATTRIBUTES_TEST_TABLE_CF }, 1, HConstants.DEFAULT_BLOCKSIZE,
089      AttributesCoprocessor.class.getName());
090  }
091
092  @AfterClass
093  public static void afterClass() throws Exception {
094    TEST_UTIL.shutdownMiniCluster();
095  }
096
097  @Before
098  public void setup() {
099    REQUEST_ATTRIBUTES_VALIDATED.getAndSet(false);
100  }
101
102  @Test
103  public void testConnectionHeaderOverwrittenAttributesRemain() throws IOException {
104    TableName tableName = TableName.valueOf("testConnectionAttributes");
105    byte[] cf = Bytes.toBytes("0");
106    TEST_UTIL.createTable(tableName, new byte[][] { cf }, 1, HConstants.DEFAULT_BLOCKSIZE,
107      AttributesCoprocessor.class.getName());
108
109    Configuration conf = TEST_UTIL.getConfiguration();
110    try (Connection conn = ConnectionFactory.createConnection(conf, null,
111      AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) {
112
113      // submit a 300 byte rowkey here to encourage netty's allocator to overwrite the connection
114      // header
115      byte[] bytes = new byte[300];
116      new Random().nextBytes(bytes);
117      Result result = table.get(new Get(bytes));
118
119      assertEquals(CONNECTION_ATTRIBUTES.size(), result.size());
120      for (Map.Entry<String, byte[]> attr : CONNECTION_ATTRIBUTES.entrySet()) {
121        byte[] val = result.getValue(Bytes.toBytes("c"), Bytes.toBytes(attr.getKey()));
122        assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val));
123      }
124    }
125  }
126
127  @Test
128  public void testRequestAttributesGet() throws IOException {
129    addRandomRequestAttributes();
130
131    Configuration conf = TEST_UTIL.getConfiguration();
132    try (
133      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
134        CONNECTION_ATTRIBUTES);
135      Table table = configureRequestAttributes(
136        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
137
138      table.get(new Get(Bytes.toBytes(0)));
139    }
140
141    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
142  }
143
144  @Test
145  public void testRequestAttributesMultiGet() throws IOException {
146    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
147    addRandomRequestAttributes();
148
149    Configuration conf = TEST_UTIL.getConfiguration();
150    try (
151      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
152        CONNECTION_ATTRIBUTES);
153      Table table = configureRequestAttributes(
154        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
155      List<Get> gets = ImmutableList.of(new Get(Bytes.toBytes(0)), new Get(Bytes.toBytes(1)));
156      table.get(gets);
157    }
158
159    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
160  }
161
162  @Test
163  public void testRequestAttributesExists() throws IOException {
164    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
165    addRandomRequestAttributes();
166
167    Configuration conf = TEST_UTIL.getConfiguration();
168    try (
169      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
170        CONNECTION_ATTRIBUTES);
171      Table table = configureRequestAttributes(
172        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
173
174      table.exists(new Get(Bytes.toBytes(0)));
175    }
176
177    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
178  }
179
180  @Test
181  public void testRequestAttributesScan() throws IOException {
182    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
183    addRandomRequestAttributes();
184
185    Configuration conf = TEST_UTIL.getConfiguration();
186    try (
187      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
188        CONNECTION_ATTRIBUTES);
189      Table table = configureRequestAttributes(
190        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
191      ResultScanner scanner = table.getScanner(new Scan());
192      scanner.next();
193    }
194    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
195  }
196
197  @Test
198  public void testRequestAttributesPut() throws IOException {
199    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
200    addRandomRequestAttributes();
201
202    Configuration conf = TEST_UTIL.getConfiguration();
203    try (
204      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
205        CONNECTION_ATTRIBUTES);
206      Table table = configureRequestAttributes(
207        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
208      Put put = new Put(Bytes.toBytes("a"));
209      put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v"));
210      table.put(put);
211    }
212    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
213  }
214
215  @Test
216  public void testRequestAttributesMultiPut() throws IOException {
217    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
218    addRandomRequestAttributes();
219
220    Configuration conf = TEST_UTIL.getConfiguration();
221    try (
222      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
223        CONNECTION_ATTRIBUTES);
224      Table table = configureRequestAttributes(
225        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
226      Put put = new Put(Bytes.toBytes("a"));
227      put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v"));
228      table.put(put);
229    }
230    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
231  }
232
233  @Test
234  public void testNoRequestAttributes() throws IOException {
235    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
236    TableName tableName = TableName.valueOf("testNoRequestAttributesScan");
237    TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1,
238      HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName());
239
240    REQUEST_ATTRIBUTES.clear();
241    Configuration conf = TEST_UTIL.getConfiguration();
242    try (Connection conn = ConnectionFactory.createConnection(conf, null,
243      AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) {
244      TableBuilder tableBuilder = conn.getTableBuilder(tableName, null);
245      try (Table table = tableBuilder.build()) {
246        table.get(new Get(Bytes.toBytes(0)));
247        assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
248      }
249    }
250  }
251
252  private void addRandomRequestAttributes() {
253    REQUEST_ATTRIBUTES.clear();
254    int j = Math.max(2, (int) (10 * Math.random()));
255    for (int i = 0; i < j; i++) {
256      REQUEST_ATTRIBUTES.put(String.valueOf(i), Bytes.toBytes(UUID.randomUUID().toString()));
257    }
258  }
259
260  private static TableBuilder configureRequestAttributes(TableBuilder tableBuilder) {
261    REQUEST_ATTRIBUTES.forEach(tableBuilder::setRequestAttribute);
262    return tableBuilder;
263  }
264
265  public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor {
266
267    @Override
268    public Optional<RegionObserver> getRegionObserver() {
269      return Optional.of(this);
270    }
271
272    @Override
273    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
274      List<Cell> result) throws IOException {
275      validateRequestAttributes();
276
277      // for connection attrs test
278      RpcCall rpcCall = RpcServer.getCurrentCall().get();
279      for (Map.Entry<String, byte[]> attr : rpcCall.getRequestAttributes().entrySet()) {
280        result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
281          .setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getKey()))
282          .setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build());
283      }
284      for (Map.Entry<String, byte[]> attr : rpcCall.getConnectionAttributes().entrySet()) {
285        result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
286          .setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getKey()))
287          .setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build());
288      }
289      result.sort(CellComparator.getInstance());
290      c.bypass();
291    }
292
293    @Override
294    public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
295      InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
296      validateRequestAttributes();
297      return hasNext;
298    }
299
300    @Override
301    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit)
302      throws IOException {
303      validateRequestAttributes();
304    }
305
306    private void validateRequestAttributes() {
307      RpcCall rpcCall = RpcServer.getCurrentCall().get();
308      Map<String, byte[]> attrs = rpcCall.getRequestAttributes();
309      if (attrs.size() != REQUEST_ATTRIBUTES.size()) {
310        return;
311      }
312      for (Map.Entry<String, byte[]> attr : attrs.entrySet()) {
313        if (!REQUEST_ATTRIBUTES.containsKey(attr.getKey())) {
314          return;
315        }
316        if (!Arrays.equals(REQUEST_ATTRIBUTES.get(attr.getKey()), attr.getValue())) {
317          return;
318        }
319      }
320      REQUEST_ATTRIBUTES_VALIDATED.getAndSet(true);
321    }
322  }
323}