001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.util.ArrayList;
023import java.util.concurrent.atomic.AtomicInteger;
024import org.apache.hadoop.hbase.util.Bytes;
025import org.junit.jupiter.api.TestTemplate;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029public class FromClientSideTestPutThenGetWithMultipleThreads extends FromClientSideTestBase {
030
031  private static final Logger LOG =
032    LoggerFactory.getLogger(FromClientSideTestPutThenGetWithMultipleThreads.class);
033
034  protected FromClientSideTestPutThenGetWithMultipleThreads(
035    Class<? extends ConnectionRegistry> registryImpl, int numHedgedReqs) {
036    super(registryImpl, numHedgedReqs);
037  }
038
039  @TestTemplate
040  public void testPutThenGetWithMultipleThreads() throws Exception {
041    final int numThreads = 20;
042    final int numRounds = 10;
043    for (int round = 0; round < numRounds; round++) {
044      ArrayList<Thread> threads = new ArrayList<>(numThreads);
045      final AtomicInteger successCnt = new AtomicInteger(0);
046      TEST_UTIL.createTable(tableName, FAMILY);
047      TEST_UTIL.waitTableAvailable(tableName, 10_000);
048      try (Connection conn = getConnection(); Table ht = conn.getTable(tableName)) {
049        for (int i = 0; i < numThreads; i++) {
050          final int index = i;
051          Thread t = new Thread(new Runnable() {
052
053            @Override
054            public void run() {
055              final byte[] row = Bytes.toBytes("row-" + index);
056              final byte[] value = Bytes.toBytes("v" + index);
057              try {
058                Put put = new Put(row);
059                put.addColumn(FAMILY, QUALIFIER, value);
060                ht.put(put);
061                Get get = new Get(row);
062                Result result = ht.get(get);
063                byte[] returnedValue = result.getValue(FAMILY, QUALIFIER);
064                if (Bytes.equals(value, returnedValue)) {
065                  successCnt.getAndIncrement();
066                } else {
067                  LOG.error("Should be equal but not, original value: " + Bytes.toString(value)
068                    + ", returned value: "
069                    + (returnedValue == null ? "null" : Bytes.toString(returnedValue)));
070                }
071              } catch (Exception e) {
072                throw new RuntimeException(e);
073              }
074            }
075          });
076          threads.add(t);
077        }
078        for (Thread t : threads) {
079          t.start();
080        }
081        for (Thread t : threads) {
082          t.join();
083        }
084        assertEquals(numThreads, successCnt.get(), "Not equal in round " + round);
085      } finally {
086        TEST_UTIL.deleteTable(tableName);
087      }
088    }
089  }
090}