001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.junit.jupiter.api.Assertions.fail;
024
025import java.io.IOException;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
033import org.apache.hadoop.hbase.client.Get;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.client.RegionInfoBuilder;
037import org.apache.hadoop.hbase.client.Result;
038import org.apache.hadoop.hbase.client.TableDescriptor;
039import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
040import org.apache.hadoop.hbase.testclassification.MediumTests;
041import org.apache.hadoop.hbase.testclassification.RegionServerTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
044import org.junit.jupiter.api.AfterEach;
045import org.junit.jupiter.api.BeforeAll;
046import org.junit.jupiter.api.BeforeEach;
047import org.junit.jupiter.api.Tag;
048import org.junit.jupiter.api.Test;
049import org.junit.jupiter.api.TestInfo;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * Testing of multiPut in parallel.
055 */
056@Tag(RegionServerTests.TAG)
057@Tag(MediumTests.TAG)
058public class TestParallelPut {
059
060  private static final Logger LOG = LoggerFactory.getLogger(TestParallelPut.class);
061
062  private HRegion region = null;
063  private static HBaseTestingUtil HBTU = new HBaseTestingUtil();
064  private static final int THREADS100 = 100;
065  private String methodName;
066
067  // Test names
068  static byte[] tableName;
069  static final byte[] qual1 = Bytes.toBytes("qual1");
070  static final byte[] qual2 = Bytes.toBytes("qual2");
071  static final byte[] qual3 = Bytes.toBytes("qual3");
072  static final byte[] value1 = Bytes.toBytes("value1");
073  static final byte[] value2 = Bytes.toBytes("value2");
074  static final byte[] row = Bytes.toBytes("rowA");
075  static final byte[] row2 = Bytes.toBytes("rowB");
076
077  @BeforeAll
078  public static void beforeClass() {
079    // Make sure enough handlers.
080    HBTU.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREADS100);
081  }
082
083  @BeforeEach
084  public void setUp(TestInfo testInfo) throws Exception {
085    methodName = testInfo.getTestMethod().get().getName();
086    tableName = Bytes.toBytes(methodName);
087  }
088
089  @AfterEach
090  public void tearDown() throws Exception {
091    EnvironmentEdgeManagerTestHelper.reset();
092    if (region != null) {
093      region.close(true);
094    }
095  }
096
097  public String getMethodName() {
098    return methodName;
099  }
100
101  //////////////////////////////////////////////////////////////////////////////
102  // New tests that don't spin up a mini cluster but rather just test the
103  // individual code pieces in the HRegion.
104  //////////////////////////////////////////////////////////////////////////////
105
106  /**
107   * Test one put command.
108   */
109  @Test
110  public void testPut() throws IOException {
111    LOG.info("Starting testPut");
112    this.region = initHRegion(tableName, getMethodName(), fam1);
113
114    long value = 1L;
115
116    Put put = new Put(row);
117    put.addColumn(fam1, qual1, Bytes.toBytes(value));
118    region.put(put);
119
120    assertGet(this.region, row, fam1, qual1, Bytes.toBytes(value));
121  }
122
123  /**
124   * Test multi-threaded Puts.
125   */
126  @Test
127  public void testParallelPuts() throws IOException {
128
129    LOG.info("Starting testParallelPuts");
130
131    this.region = initHRegion(tableName, getMethodName(), fam1);
132    int numOps = 1000; // these many operations per thread
133
134    // create 100 threads, each will do its own puts
135    Putter[] all = new Putter[THREADS100];
136
137    // create all threads
138    for (int i = 0; i < THREADS100; i++) {
139      all[i] = new Putter(region, i, numOps);
140    }
141
142    // run all threads
143    for (int i = 0; i < THREADS100; i++) {
144      all[i].start();
145    }
146
147    // wait for all threads to finish
148    for (int i = 0; i < THREADS100; i++) {
149      try {
150        all[i].join();
151      } catch (InterruptedException e) {
152        LOG.warn("testParallelPuts encountered InterruptedException." + " Ignoring....", e);
153      }
154    }
155    LOG
156      .info("testParallelPuts successfully verified " + (numOps * THREADS100) + " put operations.");
157  }
158
159  private static void assertGet(final HRegion region, byte[] row, byte[] familiy, byte[] qualifier,
160    byte[] value) throws IOException {
161    // run a get and see if the value matches
162    Get get = new Get(row);
163    get.addColumn(familiy, qualifier);
164    Result result = region.get(get);
165    assertEquals(1, result.size());
166
167    Cell kv = result.rawCells()[0];
168    byte[] r = CellUtil.cloneValue(kv);
169    assertTrue(Bytes.compareTo(r, value) == 0);
170  }
171
172  private HRegion initHRegion(byte[] tableName, String callingMethod, byte[]... families)
173    throws IOException {
174    TableDescriptorBuilder builder =
175      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
176    for (byte[] family : families) {
177      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
178    }
179    TableDescriptor tableDescriptor = builder.build();
180    RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
181    return HBTU.createLocalHRegion(info, tableDescriptor);
182  }
183
184  /**
185   * A thread that makes a few put calls
186   */
187  public static class Putter extends Thread {
188
189    private final HRegion region;
190    private final int threadNumber;
191    private final int numOps;
192    byte[] rowkey = null;
193
194    public Putter(HRegion region, int threadNumber, int numOps) {
195      this.region = region;
196      this.threadNumber = threadNumber;
197      this.numOps = numOps;
198      this.rowkey = Bytes.toBytes((long) threadNumber); // unique rowid per thread
199      setDaemon(true);
200    }
201
202    @Override
203    public void run() {
204      byte[] value = new byte[100];
205      Put[] in = new Put[1];
206
207      // iterate for the specified number of operations
208      for (int i = 0; i < numOps; i++) {
209        // generate random bytes
210        Bytes.random(value);
211
212        // put the randombytes and verify that we can read it. This is one
213        // way of ensuring that rwcc manipulation in HRegion.put() is fine.
214        Put put = new Put(rowkey);
215        put.addColumn(fam1, qual1, value);
216        in[0] = put;
217        try {
218          OperationStatus[] ret = region.batchMutate(in);
219          assertEquals(1, ret.length);
220          assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
221          assertGet(this.region, rowkey, fam1, qual1, value);
222        } catch (IOException e) {
223          fail("Thread id " + threadNumber + " operation " + i + " failed.", e);
224        }
225      }
226    }
227  }
228}