001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023import static org.junit.jupiter.api.Assertions.fail; 024 025import java.io.IOException; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellUtil; 028import org.apache.hadoop.hbase.HBaseTestingUtil; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.HConstants.OperationStatusCode; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 033import org.apache.hadoop.hbase.client.Get; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.RegionInfo; 036import org.apache.hadoop.hbase.client.RegionInfoBuilder; 037import org.apache.hadoop.hbase.client.Result; 038import org.apache.hadoop.hbase.client.TableDescriptor; 039import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 040import org.apache.hadoop.hbase.testclassification.MediumTests; 041import org.apache.hadoop.hbase.testclassification.RegionServerTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 044import org.junit.jupiter.api.AfterEach; 045import org.junit.jupiter.api.BeforeAll; 046import org.junit.jupiter.api.BeforeEach; 047import org.junit.jupiter.api.Tag; 048import org.junit.jupiter.api.Test; 049import org.junit.jupiter.api.TestInfo; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * Testing of multiPut in parallel. 055 */ 056@Tag(RegionServerTests.TAG) 057@Tag(MediumTests.TAG) 058public class TestParallelPut { 059 060 private static final Logger LOG = LoggerFactory.getLogger(TestParallelPut.class); 061 062 private HRegion region = null; 063 private static HBaseTestingUtil HBTU = new HBaseTestingUtil(); 064 private static final int THREADS100 = 100; 065 private String methodName; 066 067 // Test names 068 static byte[] tableName; 069 static final byte[] qual1 = Bytes.toBytes("qual1"); 070 static final byte[] qual2 = Bytes.toBytes("qual2"); 071 static final byte[] qual3 = Bytes.toBytes("qual3"); 072 static final byte[] value1 = Bytes.toBytes("value1"); 073 static final byte[] value2 = Bytes.toBytes("value2"); 074 static final byte[] row = Bytes.toBytes("rowA"); 075 static final byte[] row2 = Bytes.toBytes("rowB"); 076 077 @BeforeAll 078 public static void beforeClass() { 079 // Make sure enough handlers. 080 HBTU.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREADS100); 081 } 082 083 @BeforeEach 084 public void setUp(TestInfo testInfo) throws Exception { 085 methodName = testInfo.getTestMethod().get().getName(); 086 tableName = Bytes.toBytes(methodName); 087 } 088 089 @AfterEach 090 public void tearDown() throws Exception { 091 EnvironmentEdgeManagerTestHelper.reset(); 092 if (region != null) { 093 region.close(true); 094 } 095 } 096 097 public String getMethodName() { 098 return methodName; 099 } 100 101 ////////////////////////////////////////////////////////////////////////////// 102 // New tests that don't spin up a mini cluster but rather just test the 103 // individual code pieces in the HRegion. 104 ////////////////////////////////////////////////////////////////////////////// 105 106 /** 107 * Test one put command. 108 */ 109 @Test 110 public void testPut() throws IOException { 111 LOG.info("Starting testPut"); 112 this.region = initHRegion(tableName, getMethodName(), fam1); 113 114 long value = 1L; 115 116 Put put = new Put(row); 117 put.addColumn(fam1, qual1, Bytes.toBytes(value)); 118 region.put(put); 119 120 assertGet(this.region, row, fam1, qual1, Bytes.toBytes(value)); 121 } 122 123 /** 124 * Test multi-threaded Puts. 125 */ 126 @Test 127 public void testParallelPuts() throws IOException { 128 129 LOG.info("Starting testParallelPuts"); 130 131 this.region = initHRegion(tableName, getMethodName(), fam1); 132 int numOps = 1000; // these many operations per thread 133 134 // create 100 threads, each will do its own puts 135 Putter[] all = new Putter[THREADS100]; 136 137 // create all threads 138 for (int i = 0; i < THREADS100; i++) { 139 all[i] = new Putter(region, i, numOps); 140 } 141 142 // run all threads 143 for (int i = 0; i < THREADS100; i++) { 144 all[i].start(); 145 } 146 147 // wait for all threads to finish 148 for (int i = 0; i < THREADS100; i++) { 149 try { 150 all[i].join(); 151 } catch (InterruptedException e) { 152 LOG.warn("testParallelPuts encountered InterruptedException." + " Ignoring....", e); 153 } 154 } 155 LOG 156 .info("testParallelPuts successfully verified " + (numOps * THREADS100) + " put operations."); 157 } 158 159 private static void assertGet(final HRegion region, byte[] row, byte[] familiy, byte[] qualifier, 160 byte[] value) throws IOException { 161 // run a get and see if the value matches 162 Get get = new Get(row); 163 get.addColumn(familiy, qualifier); 164 Result result = region.get(get); 165 assertEquals(1, result.size()); 166 167 Cell kv = result.rawCells()[0]; 168 byte[] r = CellUtil.cloneValue(kv); 169 assertTrue(Bytes.compareTo(r, value) == 0); 170 } 171 172 private HRegion initHRegion(byte[] tableName, String callingMethod, byte[]... families) 173 throws IOException { 174 TableDescriptorBuilder builder = 175 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); 176 for (byte[] family : families) { 177 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 178 } 179 TableDescriptor tableDescriptor = builder.build(); 180 RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(); 181 return HBTU.createLocalHRegion(info, tableDescriptor); 182 } 183 184 /** 185 * A thread that makes a few put calls 186 */ 187 public static class Putter extends Thread { 188 189 private final HRegion region; 190 private final int threadNumber; 191 private final int numOps; 192 byte[] rowkey = null; 193 194 public Putter(HRegion region, int threadNumber, int numOps) { 195 this.region = region; 196 this.threadNumber = threadNumber; 197 this.numOps = numOps; 198 this.rowkey = Bytes.toBytes((long) threadNumber); // unique rowid per thread 199 setDaemon(true); 200 } 201 202 @Override 203 public void run() { 204 byte[] value = new byte[100]; 205 Put[] in = new Put[1]; 206 207 // iterate for the specified number of operations 208 for (int i = 0; i < numOps; i++) { 209 // generate random bytes 210 Bytes.random(value); 211 212 // put the randombytes and verify that we can read it. This is one 213 // way of ensuring that rwcc manipulation in HRegion.put() is fine. 214 Put put = new Put(rowkey); 215 put.addColumn(fam1, qual1, value); 216 in[0] = put; 217 try { 218 OperationStatus[] ret = region.batchMutate(in); 219 assertEquals(1, ret.length); 220 assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode()); 221 assertGet(this.region, rowkey, fam1, qual1, value); 222 } catch (IOException e) { 223 fail("Thread id " + threadNumber + " operation " + i + " failed.", e); 224 } 225 } 226 } 227 } 228}