001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertThat;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.List;
033import java.util.concurrent.CompletableFuture;
034import java.util.concurrent.ExecutionException;
035import java.util.concurrent.ThreadLocalRandom;
036import java.util.concurrent.TimeUnit;
037import java.util.stream.Collectors;
038import java.util.stream.IntStream;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtility;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.testclassification.ClientTests;
043import org.apache.hadoop.hbase.testclassification.MediumTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.junit.AfterClass;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050
051import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
052import org.apache.hbase.thirdparty.io.netty.util.Timeout;
053
054@Category({ MediumTests.class, ClientTests.class })
055public class TestAsyncBufferMutator {
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059    HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
060
061  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
062
063  private static TableName TABLE_NAME = TableName.valueOf("async");
064
065  private static TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("async-multi-region");
066
067  private static byte[] CF = Bytes.toBytes("cf");
068
069  private static byte[] CQ = Bytes.toBytes("cq");
070
071  private static int COUNT = 100;
072
073  private static byte[] VALUE = new byte[1024];
074
075  private static AsyncConnection CONN;
076
077  @BeforeClass
078  public static void setUp() throws Exception {
079    TEST_UTIL.startMiniCluster(1);
080    TEST_UTIL.createTable(TABLE_NAME, CF);
081    TEST_UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, CF);
082    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
083    ThreadLocalRandom.current().nextBytes(VALUE);
084  }
085
086  @AfterClass
087  public static void tearDown() throws Exception {
088    CONN.close();
089    TEST_UTIL.shutdownMiniCluster();
090  }
091
092  @Test
093  public void testWithMultiRegionTable() throws InterruptedException {
094    test(MULTI_REGION_TABLE_NAME);
095  }
096
097  @Test
098  public void testWithSingleRegionTable() throws InterruptedException {
099    test(TABLE_NAME);
100  }
101
102  private void test(TableName tableName) throws InterruptedException {
103    List<CompletableFuture<Void>> futures = new ArrayList<>();
104    try (AsyncBufferedMutator mutator =
105      CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
106      List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
107        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
108        .collect(Collectors.toList()));
109      // exceeded the write buffer size, a flush will be called directly
110      fs.forEach(f -> f.join());
111      IntStream.range(COUNT / 2, COUNT).forEach(i -> {
112        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
113      });
114      // the first future should have been sent out.
115      futures.get(0).join();
116      Thread.sleep(2000);
117      // the last one should still be in write buffer
118      assertFalse(futures.get(futures.size() - 1).isDone());
119    }
120    // mutator.close will call mutator.flush automatically so all tasks should have been done.
121    futures.forEach(f -> f.join());
122    AsyncTable<?> table = CONN.getTable(tableName);
123    IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
124      .forEach(r -> {
125        assertArrayEquals(VALUE, r.getValue(CF, CQ));
126      });
127  }
128
129  @Test
130  public void testClosedMutate() throws InterruptedException {
131    AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME);
132    mutator.close();
133    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
134    try {
135      mutator.mutate(put).get();
136      fail("Close check failed");
137    } catch (ExecutionException e) {
138      assertThat(e.getCause(), instanceOf(IOException.class));
139      assertTrue(e.getCause().getMessage().startsWith("Already closed"));
140    }
141    for (CompletableFuture<Void> f : mutator.mutate(Arrays.asList(put))) {
142      try {
143        f.get();
144        fail("Close check failed");
145      } catch (ExecutionException e) {
146        assertThat(e.getCause(), instanceOf(IOException.class));
147        assertTrue(e.getCause().getMessage().startsWith("Already closed"));
148      }
149    }
150  }
151
152  @Test
153  public void testNoPeriodicFlush() throws InterruptedException, ExecutionException {
154    try (AsyncBufferedMutator mutator =
155      CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) {
156      Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
157      CompletableFuture<?> future = mutator.mutate(put);
158      Thread.sleep(2000);
159      // assert that we have not flushed it out
160      assertFalse(future.isDone());
161      mutator.flush();
162      future.get();
163    }
164    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
165    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
166  }
167
168  @Test
169  public void testPeriodicFlush() throws InterruptedException, ExecutionException {
170    AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
171      .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build();
172    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
173    CompletableFuture<?> future = mutator.mutate(put);
174    future.get();
175    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
176    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
177  }
178
179  // a bit deep into the implementation
180  @Test
181  public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
182    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
183    try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN
184      .getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS)
185      .setWriteBufferSize(10 * put.heapSize()).build()) {
186      List<CompletableFuture<?>> futures = new ArrayList<>();
187      futures.add(mutator.mutate(put));
188      Timeout task = mutator.periodicFlushTask;
189      // we should have scheduled a periodic flush task
190      assertNotNull(task);
191      for (int i = 1;; i++) {
192        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
193        if (mutator.periodicFlushTask == null) {
194          break;
195        }
196      }
197      assertTrue(task.isCancelled());
198      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
199      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
200      for (int i = 0; i < futures.size(); i++) {
201        assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(i))).get().getValue(CF, CQ));
202      }
203    }
204  }
205
206  @Test
207  public void testCancelPeriodicFlushByManuallyFlush()
208      throws InterruptedException, ExecutionException {
209    try (AsyncBufferedMutatorImpl mutator =
210      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
211        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
212      CompletableFuture<?> future =
213        mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
214      Timeout task = mutator.periodicFlushTask;
215      // we should have scheduled a periodic flush task
216      assertNotNull(task);
217      mutator.flush();
218      assertTrue(task.isCancelled());
219      future.get();
220      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
221      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
222    }
223  }
224
225  @Test
226  public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException {
227    CompletableFuture<?> future;
228    Timeout task;
229    try (AsyncBufferedMutatorImpl mutator =
230      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
231        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
232      future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
233      task = mutator.periodicFlushTask;
234      // we should have scheduled a periodic flush task
235      assertNotNull(task);
236    }
237    assertTrue(task.isCancelled());
238    future.get();
239    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
240    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
241  }
242
243  private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {
244
245    private int flushCount;
246
247    AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
248        long writeBufferSize, long periodicFlushTimeoutNs) {
249      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs);
250    }
251
252    @Override
253    protected void internalFlush() {
254      flushCount++;
255      super.internalFlush();
256    }
257  }
258
259  @Test
260  public void testRaceBetweenNormalFlushAndPeriodicFlush()
261      throws InterruptedException, ExecutionException {
262    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
263    try (AsyncBufferMutatorForTest mutator =
264      new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
265        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200))) {
266      CompletableFuture<?> future = mutator.mutate(put);
267      Timeout task = mutator.periodicFlushTask;
268      // we should have scheduled a periodic flush task
269      assertNotNull(task);
270      synchronized (mutator) {
271        // synchronized on mutator to prevent periodic flush to be executed
272        Thread.sleep(500);
273        // the timeout should be issued
274        assertTrue(task.isExpired());
275        // but no flush is issued as we hold the lock
276        assertEquals(0, mutator.flushCount);
277        assertFalse(future.isDone());
278        // manually flush, then release the lock
279        mutator.flush();
280      }
281      // this is a bit deep into the implementation in netty but anyway let's add a check here to
282      // confirm that an issued timeout can not be canceled by netty framework.
283      assertFalse(task.isCancelled());
284      // and the mutation is done
285      future.get();
286      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
287      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
288      // only the manual flush, the periodic flush should have been canceled by us
289      assertEquals(1, mutator.flushCount);
290    }
291  }
292}