001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertFalse;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.IOException;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ExecutionException;
028import org.apache.hadoop.hbase.testclassification.ClientTests;
029import org.apache.hadoop.hbase.testclassification.LargeTests;
030import org.junit.jupiter.api.AfterAll;
031import org.junit.jupiter.api.BeforeAll;
032import org.junit.jupiter.api.Tag;
033import org.junit.jupiter.api.Test;
034
035@Tag(LargeTests.TAG)
036@Tag(ClientTests.TAG)
037public class TestRawAsyncScanCursor extends AbstractTestScanCursor {
038
039  private static AsyncConnection CONN;
040
041  @BeforeAll
042  public static void setUpBeforeClass() throws Exception {
043    AbstractTestScanCursor.startCluster();
044    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
045  }
046
047  @AfterAll
048  public static void tearDownAfterClass() throws Exception {
049    if (CONN != null) {
050      CONN.close();
051    }
052    stopCluster();
053  }
054
055  private void doTest(boolean reversed)
056    throws InterruptedException, ExecutionException, IOException {
057    CompletableFuture<Void> future = new CompletableFuture<>();
058    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
059    table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(),
060      new AdvancedScanResultConsumer() {
061
062        private int count;
063
064        @Override
065        public void onHeartbeat(ScanController controller) {
066          int row = count / NUM_FAMILIES / NUM_QUALIFIERS;
067          if (reversed) {
068            row = NUM_ROWS - 1 - row;
069          }
070          try {
071            assertArrayEquals(ROWS[row], controller.cursor().get().getRow());
072            count++;
073          } catch (Throwable e) {
074            future.completeExceptionally(e);
075            throw e;
076          }
077        }
078
079        @Override
080        public void onNext(Result[] results, ScanController controller) {
081          try {
082            assertEquals(1, results.length);
083            assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS);
084            // we will always provide a scan cursor if time limit is reached.
085            assertTrue(controller.cursor().isPresent());
086            assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1],
087              controller.cursor().get().getRow());
088            assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow());
089            count++;
090          } catch (Throwable e) {
091            future.completeExceptionally(e);
092            throw e;
093          }
094        }
095
096        @Override
097        public void onError(Throwable error) {
098          future.completeExceptionally(error);
099        }
100
101        @Override
102        public void onComplete() {
103          future.complete(null);
104        }
105      });
106    future.get();
107  }
108
109  @Test
110  public void testHeartbeatWithSparseFilter()
111    throws IOException, InterruptedException, ExecutionException {
112    doTest(false);
113  }
114
115  @Test
116  public void testHeartbeatWithSparseFilterReversed()
117    throws IOException, InterruptedException, ExecutionException {
118    doTest(true);
119  }
120
121  @Test
122  public void testSizeLimit() throws InterruptedException, ExecutionException {
123    CompletableFuture<Void> future = new CompletableFuture<>();
124    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
125    table.scan(createScanWithSizeLimit(), new AdvancedScanResultConsumer() {
126
127      private int count;
128
129      @Override
130      public void onHeartbeat(ScanController controller) {
131        try {
132          assertArrayEquals(ROWS[count / NUM_FAMILIES / NUM_QUALIFIERS],
133            controller.cursor().get().getRow());
134          count++;
135        } catch (Throwable e) {
136          future.completeExceptionally(e);
137          throw e;
138        }
139      }
140
141      @Override
142      public void onNext(Result[] results, ScanController controller) {
143        try {
144          assertFalse(controller.cursor().isPresent());
145          assertEquals(1, results.length);
146          assertArrayEquals(ROWS[count / NUM_FAMILIES / NUM_QUALIFIERS], results[0].getRow());
147          count++;
148        } catch (Throwable e) {
149          future.completeExceptionally(e);
150          throw e;
151        }
152      }
153
154      @Override
155      public void onError(Throwable error) {
156        future.completeExceptionally(error);
157      }
158
159      @Override
160      public void onComplete() {
161        future.complete(null);
162      }
163    });
164    future.get();
165  }
166}