001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.jupiter.api.Assertions.assertArrayEquals;
023import static org.junit.jupiter.api.Assertions.assertEquals;
024import static org.junit.jupiter.api.Assertions.assertFalse;
025import static org.junit.jupiter.api.Assertions.assertNotNull;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027import static org.junit.jupiter.api.Assertions.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.List;
033import java.util.concurrent.CompletableFuture;
034import java.util.concurrent.ExecutionException;
035import java.util.concurrent.TimeUnit;
036import java.util.stream.Collectors;
037import java.util.stream.IntStream;
038import org.apache.hadoop.hbase.HBaseTestingUtil;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.testclassification.ClientTests;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.junit.jupiter.api.AfterAll;
044import org.junit.jupiter.api.BeforeAll;
045import org.junit.jupiter.api.Tag;
046import org.junit.jupiter.api.Test;
047
048import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
049import org.apache.hbase.thirdparty.io.netty.util.Timeout;
050
051@Tag(MediumTests.TAG)
052@Tag(ClientTests.TAG)
053public class TestAsyncBufferMutator {
054
055  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
056
057  private static TableName TABLE_NAME = TableName.valueOf("async");
058
059  private static TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("async-multi-region");
060
061  private static byte[] CF = Bytes.toBytes("cf");
062
063  private static byte[] CQ = Bytes.toBytes("cq");
064
065  private static int COUNT = 100;
066
067  private static byte[] VALUE = new byte[1024];
068
069  private static AsyncConnection CONN;
070
071  @BeforeAll
072  public static void setUp() throws Exception {
073    TEST_UTIL.startMiniCluster(1);
074    TEST_UTIL.createTable(TABLE_NAME, CF);
075    TEST_UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, CF);
076    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
077    Bytes.random(VALUE);
078  }
079
080  @AfterAll
081  public static void tearDown() throws Exception {
082    CONN.close();
083    TEST_UTIL.shutdownMiniCluster();
084  }
085
086  @Test
087  public void testWithMultiRegionTable() throws InterruptedException {
088    test(MULTI_REGION_TABLE_NAME);
089  }
090
091  @Test
092  public void testWithSingleRegionTable() throws InterruptedException {
093    test(TABLE_NAME);
094  }
095
096  private void test(TableName tableName) throws InterruptedException {
097    List<CompletableFuture<Void>> futures = new ArrayList<>();
098    try (AsyncBufferedMutator mutator =
099      CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
100      List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
101        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
102        .collect(Collectors.toList()));
103      // exceeded the write buffer size, a flush will be called directly
104      fs.forEach(f -> f.join());
105      IntStream.range(COUNT / 2, COUNT).forEach(i -> {
106        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
107      });
108      // the first future should have been sent out.
109      futures.get(0).join();
110      Thread.sleep(2000);
111      // the last one should still be in write buffer
112      assertFalse(futures.get(futures.size() - 1).isDone());
113    }
114    // mutator.close will call mutator.flush automatically so all tasks should have been done.
115    futures.forEach(f -> f.join());
116    AsyncTable<?> table = CONN.getTable(tableName);
117    IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
118      .forEach(r -> {
119        assertArrayEquals(VALUE, r.getValue(CF, CQ));
120      });
121  }
122
123  @Test
124  public void testClosedMutate() throws InterruptedException {
125    AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME);
126    mutator.close();
127    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
128    try {
129      mutator.mutate(put).get();
130      fail("Close check failed");
131    } catch (ExecutionException e) {
132      assertThat(e.getCause(), instanceOf(IOException.class));
133      assertTrue(e.getCause().getMessage().startsWith("Already closed"));
134    }
135    for (CompletableFuture<Void> f : mutator.mutate(Arrays.asList(put))) {
136      try {
137        f.get();
138        fail("Close check failed");
139      } catch (ExecutionException e) {
140        assertThat(e.getCause(), instanceOf(IOException.class));
141        assertTrue(e.getCause().getMessage().startsWith("Already closed"));
142      }
143    }
144  }
145
146  @Test
147  public void testNoPeriodicFlush() throws InterruptedException, ExecutionException {
148    try (AsyncBufferedMutator mutator =
149      CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) {
150      Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
151      CompletableFuture<?> future = mutator.mutate(put);
152      Thread.sleep(2000);
153      // assert that we have not flushed it out
154      assertFalse(future.isDone());
155      mutator.flush();
156      future.get();
157    }
158    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
159    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
160  }
161
162  @Test
163  public void testPeriodicFlush() throws InterruptedException, ExecutionException {
164    AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
165      .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build();
166    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
167    CompletableFuture<?> future = mutator.mutate(put);
168    future.get();
169    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
170    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
171  }
172
173  @Test
174  public void testMaxMutationsFlush() throws InterruptedException, ExecutionException {
175    AsyncBufferedMutator mutator =
176      CONN.getBufferedMutatorBuilder(TABLE_NAME).setMaxMutations(3).build();
177    CompletableFuture<?> future1 =
178      mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
179    CompletableFuture<?> future2 =
180      mutator.mutate(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, VALUE));
181    CompletableFuture<?> future3 =
182      mutator.mutate(new Put(Bytes.toBytes(2)).addColumn(CF, CQ, VALUE));
183    CompletableFuture.allOf(future1, future2, future3).join();
184    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
185    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
186    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(1))).get().getValue(CF, CQ));
187    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(2))).get().getValue(CF, CQ));
188  }
189
190  // a bit deep into the implementation
191  @Test
192  public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
193    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
194    try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN
195      .getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS)
196      .setWriteBufferSize(10 * put.heapSize()).build()) {
197      List<CompletableFuture<?>> futures = new ArrayList<>();
198      futures.add(mutator.mutate(put));
199      Timeout task = mutator.periodicFlushTask;
200      // we should have scheduled a periodic flush task
201      assertNotNull(task);
202      for (int i = 1;; i++) {
203        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
204        if (mutator.periodicFlushTask == null) {
205          break;
206        }
207      }
208      assertTrue(task.isCancelled());
209      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
210      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
211      for (int i = 0; i < futures.size(); i++) {
212        assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(i))).get().getValue(CF, CQ));
213      }
214    }
215  }
216
217  @Test
218  public void testCancelPeriodicFlushByManuallyFlush()
219    throws InterruptedException, ExecutionException {
220    try (AsyncBufferedMutatorImpl mutator =
221      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
222        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
223      CompletableFuture<?> future =
224        mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
225      Timeout task = mutator.periodicFlushTask;
226      // we should have scheduled a periodic flush task
227      assertNotNull(task);
228      mutator.flush();
229      assertTrue(task.isCancelled());
230      future.get();
231      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
232      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
233    }
234  }
235
236  @Test
237  public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException {
238    CompletableFuture<?> future;
239    Timeout task;
240    try (AsyncBufferedMutatorImpl mutator =
241      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
242        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
243      future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
244      task = mutator.periodicFlushTask;
245      // we should have scheduled a periodic flush task
246      assertNotNull(task);
247    }
248    assertTrue(task.isCancelled());
249    future.get();
250    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
251    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
252  }
253
254  private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {
255
256    private int drainCount;
257
258    AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
259      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutation) {
260      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize,
261        maxMutation);
262    }
263
264    @Override
265    protected Batch drainBatch() {
266      drainCount++;
267      return super.drainBatch();
268    }
269  }
270
271  @Test
272  public void testRaceBetweenNormalFlushAndPeriodicFlush()
273    throws InterruptedException, ExecutionException {
274    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
275    try (AsyncBufferMutatorForTest mutator =
276      new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
277        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024, 100)) {
278      CompletableFuture<?> future = mutator.mutate(put);
279      Timeout task = mutator.periodicFlushTask;
280      // we should have scheduled a periodic flush task
281      assertNotNull(task);
282      // get the lock toprevent periodic flush to be executed
283      mutator.lock.lock();
284      try {
285        Thread.sleep(500);
286        // the timeout should be issued
287        assertTrue(task.isExpired());
288        // but no drain is issued as we hold the lock
289        assertEquals(0, mutator.drainCount);
290        assertFalse(future.isDone());
291        // manually flush and drain, then release the lock
292        mutator.flush();
293      } finally {
294        mutator.lock.unlock();
295      }
296      // this is a bit deep into the implementation in netty but anyway let's add a check here to
297      // confirm that an issued timeout can not be canceled by netty framework.
298      assertFalse(task.isCancelled());
299      // and the mutation is done
300      future.get();
301      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
302      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
303      // only the manual flush, the periodic flush should have been canceled by us
304      assertEquals(1, mutator.drainCount);
305    }
306  }
307}