/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
044 045@Tag(MediumTests.TAG) 046@Tag(ClientTests.TAG) 047public class TestBufferedMutator { 048 049 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 050 051 private static TableName TABLE_NAME = TableName.valueOf("test"); 052 053 private static byte[] CF = Bytes.toBytes("cf"); 054 055 private static byte[] CQ = Bytes.toBytes("cq"); 056 057 private static byte[] VALUE = new byte[1024]; 058 059 @BeforeAll 060 public static void setUp() throws Exception { 061 TEST_UTIL.startMiniCluster(1); 062 ThreadLocalRandom.current().nextBytes(VALUE); 063 } 064 065 @AfterAll 066 public static void tearDown() throws Exception { 067 TEST_UTIL.shutdownMiniCluster(); 068 } 069 070 @BeforeEach 071 public void setUpBeforeTest() throws IOException { 072 TEST_UTIL.createTable(TABLE_NAME, CF); 073 } 074 075 @AfterEach 076 public void tearDownAfterTest() throws IOException { 077 TEST_UTIL.deleteTable(TABLE_NAME); 078 } 079 080 @Test 081 public void test() throws Exception { 082 int count = 1024; 083 try (BufferedMutator mutator = TEST_UTIL.getConnection() 084 .getBufferedMutator(new BufferedMutatorParams(TABLE_NAME).writeBufferSize(64 * 1024))) { 085 mutator.mutate(IntStream.range(0, count / 2) 086 .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)) 087 .collect(Collectors.toList())); 088 mutator.flush(); 089 mutator.mutate(IntStream.range(count / 2, count) 090 .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)) 091 .collect(Collectors.toList())); 092 mutator.close(); 093 verifyData(count); 094 } 095 } 096 097 @Test 098 public void testMultiThread() throws Exception { 099 ExecutorService executor = 100 Executors.newFixedThreadPool(16, new ThreadFactoryBuilder().setDaemon(true).build()); 101 // use a greater count and less write buffer size to trigger auto flush when mutate 102 int count = 16384; 103 try (BufferedMutator mutator = TEST_UTIL.getConnection() 104 .getBufferedMutator(new BufferedMutatorParams(TABLE_NAME).writeBufferSize(4 * 
1024))) { 105 IntStream.range(0, count / 2) 106 .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)) 107 .forEach(put -> executor.execute(() -> { 108 try { 109 mutator.mutate(put); 110 } catch (IOException e) { 111 fail("failed to mutate: " + e.getMessage()); 112 } 113 })); 114 mutator.flush(); 115 IntStream.range(count / 2, count) 116 .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)) 117 .forEach(put -> executor.execute(() -> { 118 try { 119 mutator.mutate(put); 120 } catch (IOException e) { 121 fail("failed to mutate: " + e.getMessage()); 122 } 123 })); 124 executor.shutdown(); 125 assertTrue(executor.awaitTermination(15, TimeUnit.SECONDS)); 126 mutator.close(); 127 } finally { 128 executor.shutdownNow(); 129 } 130 verifyData(count); 131 } 132 133 private void verifyData(int count) throws IOException { 134 try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { 135 for (int i = 0; i < count; i++) { 136 Result r = table.get(new Get(Bytes.toBytes(i))); 137 assertArrayEquals(VALUE, ((Result) r).getValue(CF, CQ)); 138 } 139 } 140 } 141}