001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertArrayEquals;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.List;
033import java.util.concurrent.CompletableFuture;
034import java.util.concurrent.ExecutionException;
035import java.util.concurrent.TimeUnit;
036import java.util.stream.Collectors;
037import java.util.stream.IntStream;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.junit.AfterClass;
045import org.junit.BeforeClass;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
051import org.apache.hbase.thirdparty.io.netty.util.Timeout;
052
053@Category({ MediumTests.class, ClientTests.class })
054public class TestAsyncBufferMutator {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
059
060  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
061
062  private static TableName TABLE_NAME = TableName.valueOf("async");
063
064  private static TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("async-multi-region");
065
066  private static byte[] CF = Bytes.toBytes("cf");
067
068  private static byte[] CQ = Bytes.toBytes("cq");
069
070  private static int COUNT = 100;
071
072  private static byte[] VALUE = new byte[1024];
073
074  private static AsyncConnection CONN;
075
076  @BeforeClass
077  public static void setUp() throws Exception {
078    TEST_UTIL.startMiniCluster(1);
079    TEST_UTIL.createTable(TABLE_NAME, CF);
080    TEST_UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, CF);
081    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
082    Bytes.random(VALUE);
083  }
084
085  @AfterClass
086  public static void tearDown() throws Exception {
087    CONN.close();
088    TEST_UTIL.shutdownMiniCluster();
089  }
090
091  @Test
092  public void testWithMultiRegionTable() throws InterruptedException {
093    test(MULTI_REGION_TABLE_NAME);
094  }
095
096  @Test
097  public void testWithSingleRegionTable() throws InterruptedException {
098    test(TABLE_NAME);
099  }
100
101  private void test(TableName tableName) throws InterruptedException {
102    List<CompletableFuture<Void>> futures = new ArrayList<>();
103    try (AsyncBufferedMutator mutator =
104      CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
105      List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
106        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
107        .collect(Collectors.toList()));
108      // exceeded the write buffer size, a flush will be called directly
109      fs.forEach(f -> f.join());
110      IntStream.range(COUNT / 2, COUNT).forEach(i -> {
111        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
112      });
113      // the first future should have been sent out.
114      futures.get(0).join();
115      Thread.sleep(2000);
116      // the last one should still be in write buffer
117      assertFalse(futures.get(futures.size() - 1).isDone());
118    }
119    // mutator.close will call mutator.flush automatically so all tasks should have been done.
120    futures.forEach(f -> f.join());
121    AsyncTable<?> table = CONN.getTable(tableName);
122    IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
123      .forEach(r -> {
124        assertArrayEquals(VALUE, r.getValue(CF, CQ));
125      });
126  }
127
128  @Test
129  public void testClosedMutate() throws InterruptedException {
130    AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME);
131    mutator.close();
132    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
133    try {
134      mutator.mutate(put).get();
135      fail("Close check failed");
136    } catch (ExecutionException e) {
137      assertThat(e.getCause(), instanceOf(IOException.class));
138      assertTrue(e.getCause().getMessage().startsWith("Already closed"));
139    }
140    for (CompletableFuture<Void> f : mutator.mutate(Arrays.asList(put))) {
141      try {
142        f.get();
143        fail("Close check failed");
144      } catch (ExecutionException e) {
145        assertThat(e.getCause(), instanceOf(IOException.class));
146        assertTrue(e.getCause().getMessage().startsWith("Already closed"));
147      }
148    }
149  }
150
151  @Test
152  public void testNoPeriodicFlush() throws InterruptedException, ExecutionException {
153    try (AsyncBufferedMutator mutator =
154      CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) {
155      Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
156      CompletableFuture<?> future = mutator.mutate(put);
157      Thread.sleep(2000);
158      // assert that we have not flushed it out
159      assertFalse(future.isDone());
160      mutator.flush();
161      future.get();
162    }
163    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
164    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
165  }
166
167  @Test
168  public void testPeriodicFlush() throws InterruptedException, ExecutionException {
169    AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
170      .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build();
171    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
172    CompletableFuture<?> future = mutator.mutate(put);
173    future.get();
174    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
175    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
176  }
177
178  @Test
179  public void testMaxMutationsFlush() throws InterruptedException, ExecutionException {
180    AsyncBufferedMutator mutator =
181      CONN.getBufferedMutatorBuilder(TABLE_NAME).setMaxMutations(3).build();
182    CompletableFuture<?> future1 =
183      mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
184    CompletableFuture<?> future2 =
185      mutator.mutate(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, VALUE));
186    CompletableFuture<?> future3 =
187      mutator.mutate(new Put(Bytes.toBytes(2)).addColumn(CF, CQ, VALUE));
188    CompletableFuture.allOf(future1, future2, future3).join();
189    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
190    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
191    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(1))).get().getValue(CF, CQ));
192    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(2))).get().getValue(CF, CQ));
193  }
194
195  // a bit deep into the implementation
196  @Test
197  public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
198    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
199    try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN
200      .getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS)
201      .setWriteBufferSize(10 * put.heapSize()).build()) {
202      List<CompletableFuture<?>> futures = new ArrayList<>();
203      futures.add(mutator.mutate(put));
204      Timeout task = mutator.periodicFlushTask;
205      // we should have scheduled a periodic flush task
206      assertNotNull(task);
207      for (int i = 1;; i++) {
208        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
209        if (mutator.periodicFlushTask == null) {
210          break;
211        }
212      }
213      assertTrue(task.isCancelled());
214      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
215      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
216      for (int i = 0; i < futures.size(); i++) {
217        assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(i))).get().getValue(CF, CQ));
218      }
219    }
220  }
221
222  @Test
223  public void testCancelPeriodicFlushByManuallyFlush()
224    throws InterruptedException, ExecutionException {
225    try (AsyncBufferedMutatorImpl mutator =
226      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
227        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
228      CompletableFuture<?> future =
229        mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
230      Timeout task = mutator.periodicFlushTask;
231      // we should have scheduled a periodic flush task
232      assertNotNull(task);
233      mutator.flush();
234      assertTrue(task.isCancelled());
235      future.get();
236      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
237      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
238    }
239  }
240
241  @Test
242  public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException {
243    CompletableFuture<?> future;
244    Timeout task;
245    try (AsyncBufferedMutatorImpl mutator =
246      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
247        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
248      future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
249      task = mutator.periodicFlushTask;
250      // we should have scheduled a periodic flush task
251      assertNotNull(task);
252    }
253    assertTrue(task.isCancelled());
254    future.get();
255    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
256    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
257  }
258
259  private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {
260
261    private int flushCount;
262
263    AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
264      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutation) {
265      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize,
266        maxMutation);
267    }
268
269    @Override
270    protected void internalFlush() {
271      flushCount++;
272      super.internalFlush();
273    }
274  }
275
276  @Test
277  public void testRaceBetweenNormalFlushAndPeriodicFlush()
278    throws InterruptedException, ExecutionException {
279    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
280    try (AsyncBufferMutatorForTest mutator =
281      new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
282        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024, 100)) {
283      CompletableFuture<?> future = mutator.mutate(put);
284      Timeout task = mutator.periodicFlushTask;
285      // we should have scheduled a periodic flush task
286      assertNotNull(task);
287      synchronized (mutator) {
288        // synchronized on mutator to prevent periodic flush to be executed
289        Thread.sleep(500);
290        // the timeout should be issued
291        assertTrue(task.isExpired());
292        // but no flush is issued as we hold the lock
293        assertEquals(0, mutator.flushCount);
294        assertFalse(future.isDone());
295        // manually flush, then release the lock
296        mutator.flush();
297      }
298      // this is a bit deep into the implementation in netty but anyway let's add a check here to
299      // confirm that an issued timeout can not be canceled by netty framework.
300      assertFalse(task.isCancelled());
301      // and the mutation is done
302      future.get();
303      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
304      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
305      // only the manual flush, the periodic flush should have been canceled by us
306      assertEquals(1, mutator.flushCount);
307    }
308  }
309}