001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import org.apache.hadoop.hbase.Cell; 026import org.apache.hadoop.hbase.CellUtil; 027import org.apache.hadoop.hbase.HBaseClassTestRule; 028import org.apache.hadoop.hbase.HBaseTestingUtility; 029import org.apache.hadoop.hbase.HColumnDescriptor; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.HConstants.OperationStatusCode; 032import org.apache.hadoop.hbase.HRegionInfo; 033import org.apache.hadoop.hbase.HTableDescriptor; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Get; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.Result; 038import org.apache.hadoop.hbase.testclassification.MediumTests; 039import org.apache.hadoop.hbase.testclassification.RegionServerTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 042import org.junit.After; 043import org.junit.Before; 044import org.junit.BeforeClass; 045import org.junit.ClassRule; 046import org.junit.Rule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.junit.rules.TestName; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * Testing of multiPut in parallel. 055 */ 056@Category({ RegionServerTests.class, MediumTests.class }) 057public class TestParallelPut { 058 059 @ClassRule 060 public static final HBaseClassTestRule CLASS_RULE = 061 HBaseClassTestRule.forClass(TestParallelPut.class); 062 063 private static final Logger LOG = LoggerFactory.getLogger(TestParallelPut.class); 064 @Rule 065 public TestName name = new TestName(); 066 067 private HRegion region = null; 068 private static HBaseTestingUtility HBTU = new HBaseTestingUtility(); 069 private static final int THREADS100 = 100; 070 071 // Test names 072 static byte[] tableName; 073 static final byte[] qual1 = Bytes.toBytes("qual1"); 074 static final byte[] qual2 = Bytes.toBytes("qual2"); 075 static final byte[] qual3 = Bytes.toBytes("qual3"); 076 static final byte[] value1 = Bytes.toBytes("value1"); 077 static final byte[] value2 = Bytes.toBytes("value2"); 078 static final byte[] row = Bytes.toBytes("rowA"); 079 static final byte[] row2 = Bytes.toBytes("rowB"); 080 081 @BeforeClass 082 public static void beforeClass() { 083 // Make sure enough handlers. 084 HBTU.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREADS100); 085 } 086 087 /** 088 * @see org.apache.hadoop.hbase.HBaseTestCase#setUp() 089 */ 090 @Before 091 public void setUp() throws Exception { 092 tableName = Bytes.toBytes(name.getMethodName()); 093 } 094 095 @After 096 public void tearDown() throws Exception { 097 EnvironmentEdgeManagerTestHelper.reset(); 098 if (region != null) { 099 region.close(true); 100 } 101 } 102 103 public String getName() { 104 return name.getMethodName(); 105 } 106 107 ////////////////////////////////////////////////////////////////////////////// 108 // New tests that don't spin up a mini cluster but rather just test the 109 // individual code pieces in the HRegion. 110 ////////////////////////////////////////////////////////////////////////////// 111 112 /** 113 * Test one put command. 114 */ 115 @Test 116 public void testPut() throws IOException { 117 LOG.info("Starting testPut"); 118 this.region = initHRegion(tableName, getName(), fam1); 119 120 long value = 1L; 121 122 Put put = new Put(row); 123 put.addColumn(fam1, qual1, Bytes.toBytes(value)); 124 region.put(put); 125 126 assertGet(this.region, row, fam1, qual1, Bytes.toBytes(value)); 127 } 128 129 /** 130 * Test multi-threaded Puts. 131 */ 132 @Test 133 public void testParallelPuts() throws IOException { 134 135 LOG.info("Starting testParallelPuts"); 136 137 this.region = initHRegion(tableName, getName(), fam1); 138 int numOps = 1000; // these many operations per thread 139 140 // create 100 threads, each will do its own puts 141 Putter[] all = new Putter[THREADS100]; 142 143 // create all threads 144 for (int i = 0; i < THREADS100; i++) { 145 all[i] = new Putter(region, i, numOps); 146 } 147 148 // run all threads 149 for (int i = 0; i < THREADS100; i++) { 150 all[i].start(); 151 } 152 153 // wait for all threads to finish 154 for (int i = 0; i < THREADS100; i++) { 155 try { 156 all[i].join(); 157 } catch (InterruptedException e) { 158 LOG.warn("testParallelPuts encountered InterruptedException." + " Ignoring....", e); 159 } 160 } 161 LOG 162 .info("testParallelPuts successfully verified " + (numOps * THREADS100) + " put operations."); 163 } 164 165 private static void assertGet(final HRegion region, byte[] row, byte[] familiy, byte[] qualifier, 166 byte[] value) throws IOException { 167 // run a get and see if the value matches 168 Get get = new Get(row); 169 get.addColumn(familiy, qualifier); 170 Result result = region.get(get); 171 assertEquals(1, result.size()); 172 173 Cell kv = result.rawCells()[0]; 174 byte[] r = CellUtil.cloneValue(kv); 175 assertTrue(Bytes.compareTo(r, value) == 0); 176 } 177 178 private HRegion initHRegion(byte[] tableName, String callingMethod, byte[]... families) 179 throws IOException { 180 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)); 181 for (byte[] family : families) { 182 htd.addFamily(new HColumnDescriptor(family)); 183 } 184 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 185 return HBTU.createLocalHRegion(info, htd); 186 } 187 188 /** 189 * A thread that makes a few put calls 190 */ 191 public static class Putter extends Thread { 192 193 private final HRegion region; 194 private final int threadNumber; 195 private final int numOps; 196 byte[] rowkey = null; 197 198 public Putter(HRegion region, int threadNumber, int numOps) { 199 this.region = region; 200 this.threadNumber = threadNumber; 201 this.numOps = numOps; 202 this.rowkey = Bytes.toBytes((long) threadNumber); // unique rowid per thread 203 setDaemon(true); 204 } 205 206 @Override 207 public void run() { 208 byte[] value = new byte[100]; 209 Put[] in = new Put[1]; 210 211 // iterate for the specified number of operations 212 for (int i = 0; i < numOps; i++) { 213 // generate random bytes 214 Bytes.random(value); 215 216 // put the randombytes and verify that we can read it. This is one 217 // way of ensuring that rwcc manipulation in HRegion.put() is fine. 218 Put put = new Put(rowkey); 219 put.addColumn(fam1, qual1, value); 220 in[0] = put; 221 try { 222 OperationStatus[] ret = region.batchMutate(in); 223 assertEquals(1, ret.length); 224 assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode()); 225 assertGet(this.region, rowkey, fam1, qual1, value); 226 } catch (IOException e) { 227 assertTrue("Thread id " + threadNumber + " operation " + i + " failed.", false); 228 } 229 } 230 } 231 } 232}