001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
044
045@Tag(MediumTests.TAG)
046@Tag(ClientTests.TAG)
047public class TestBufferedMutator {
048
049  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
050
051  private static TableName TABLE_NAME = TableName.valueOf("test");
052
053  private static byte[] CF = Bytes.toBytes("cf");
054
055  private static byte[] CQ = Bytes.toBytes("cq");
056
057  private static byte[] VALUE = new byte[1024];
058
059  @BeforeAll
060  public static void setUp() throws Exception {
061    TEST_UTIL.startMiniCluster(1);
062    ThreadLocalRandom.current().nextBytes(VALUE);
063  }
064
065  @AfterAll
066  public static void tearDown() throws Exception {
067    TEST_UTIL.shutdownMiniCluster();
068  }
069
070  @BeforeEach
071  public void setUpBeforeTest() throws IOException {
072    TEST_UTIL.createTable(TABLE_NAME, CF);
073  }
074
075  @AfterEach
076  public void tearDownAfterTest() throws IOException {
077    TEST_UTIL.deleteTable(TABLE_NAME);
078  }
079
080  @Test
081  public void test() throws Exception {
082    int count = 1024;
083    try (BufferedMutator mutator = TEST_UTIL.getConnection()
084      .getBufferedMutator(new BufferedMutatorParams(TABLE_NAME).writeBufferSize(64 * 1024))) {
085      mutator.mutate(IntStream.range(0, count / 2)
086        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
087        .collect(Collectors.toList()));
088      mutator.flush();
089      mutator.mutate(IntStream.range(count / 2, count)
090        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
091        .collect(Collectors.toList()));
092      mutator.close();
093      verifyData(count);
094    }
095  }
096
097  @Test
098  public void testMultiThread() throws Exception {
099    ExecutorService executor =
100      Executors.newFixedThreadPool(16, new ThreadFactoryBuilder().setDaemon(true).build());
101    // use a greater count and less write buffer size to trigger auto flush when mutate
102    int count = 16384;
103    try (BufferedMutator mutator = TEST_UTIL.getConnection()
104      .getBufferedMutator(new BufferedMutatorParams(TABLE_NAME).writeBufferSize(4 * 1024))) {
105      IntStream.range(0, count / 2)
106        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
107        .forEach(put -> executor.execute(() -> {
108          try {
109            mutator.mutate(put);
110          } catch (IOException e) {
111            fail("failed to mutate: " + e.getMessage());
112          }
113        }));
114      mutator.flush();
115      IntStream.range(count / 2, count)
116        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
117        .forEach(put -> executor.execute(() -> {
118          try {
119            mutator.mutate(put);
120          } catch (IOException e) {
121            fail("failed to mutate: " + e.getMessage());
122          }
123        }));
124      executor.shutdown();
125      assertTrue(executor.awaitTermination(15, TimeUnit.SECONDS));
126      mutator.close();
127    } finally {
128      executor.shutdownNow();
129    }
130    verifyData(count);
131  }
132
133  private void verifyData(int count) throws IOException {
134    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
135      for (int i = 0; i < count; i++) {
136        Result r = table.get(new Get(Bytes.toBytes(i)));
137        assertArrayEquals(VALUE, ((Result) r).getValue(CF, CQ));
138      }
139    }
140  }
141}