/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Exercises scan cursors through {@link AsyncTable#scan} with an
 * {@link AdvancedScanResultConsumer}.
 * <p>
 * Each test bridges the callback-driven scan to the test thread with a {@link CompletableFuture}:
 * a failed assertion inside a callback completes the future exceptionally, so the failure is
 * reported by {@code future.get()} on the test thread.
 * <p>
 * NOTE(review): the scan factories, {@code ROWS}, {@code NUM_ROWS}, {@code NUM_FAMILIES},
 * {@code NUM_QUALIFIERS}, {@code TABLE_NAME} and {@code TEST_UTIL} are not declared here;
 * presumably they are inherited from {@link AbstractTestScanCursor} -- confirm there for their
 * exact semantics.
 */
@Tag(LargeTests.TAG)
@Tag(ClientTests.TAG)
public class TestRawAsyncScanCursor extends AbstractTestScanCursor {

  /** Connection shared by all tests in this class; created once in {@link #setUpBeforeClass()}. */
  private static AsyncConnection CONN;

  /**
   * Starts the cluster and opens the shared asynchronous connection.
   * @throws Exception if the cluster cannot be started or the connection cannot be created
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    AbstractTestScanCursor.startCluster();
    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
  }

  /**
   * Closes the shared connection (if it was created) and stops the cluster.
   * @throws Exception if closing the connection or stopping the cluster fails
   */
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    try {
      if (CONN != null) {
        CONN.close();
      }
    } finally {
      // Always stop the cluster, even when closing the connection throws; otherwise the cluster
      // would be left running after this class finishes.
      stopCluster();
    }
  }

  /**
   * Runs a scan built by {@code createScanWithSparseFilter()} or
   * {@code createReversedScanWithSparseFilter()} and verifies the cursor reported on every
   * heartbeat as well as the single result delivered to {@code onNext}.
   * @param reversed whether to use the reversed scan; also flips the expected row order
   * @throws InterruptedException if the test thread is interrupted while waiting for the scan
   * @throws ExecutionException   if the scan failed or an assertion in a callback failed
   * @throws IOException          declared for the scan factory methods
   */
  private void doTest(boolean reversed)
    throws InterruptedException, ExecutionException, IOException {
    CompletableFuture<Void> future = new CompletableFuture<>();
    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
    table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(),
      new AdvancedScanResultConsumer() {

        /** Number of callbacks (heartbeats plus results) that passed their assertions. */
        private int count;

        @Override
        public void onHeartbeat(ScanController controller) {
          // The callback counter is mapped to a row index by dividing out families and
          // qualifiers (presumably one callback per cell -- confirm against the scan setup).
          int row = count / NUM_FAMILIES / NUM_QUALIFIERS;
          if (reversed) {
            row = NUM_ROWS - 1 - row;
          }
          try {
            // Fail with a clear assertion message rather than a NoSuchElementException from
            // Optional.get() if the heartbeat carries no cursor.
            assertTrue(controller.cursor().isPresent(), "heartbeat should carry a scan cursor");
            assertArrayEquals(ROWS[row], controller.cursor().get().getRow());
            count++;
          } catch (Throwable e) {
            // Surface the failure on the test thread, then rethrow it to the caller.
            future.completeExceptionally(e);
            throw e;
          }
        }

        @Override
        public void onNext(Result[] results, ScanController controller) {
          try {
            assertEquals(1, results.length);
            assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS);
            // we will always provide a scan cursor if time limit is reached.
            assertTrue(controller.cursor().isPresent());
            assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1],
              controller.cursor().get().getRow());
            assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow());
            count++;
          } catch (Throwable e) {
            future.completeExceptionally(e);
            throw e;
          }
        }

        @Override
        public void onError(Throwable error) {
          future.completeExceptionally(error);
        }

        @Override
        public void onComplete() {
          future.complete(null);
        }
      });
    // Block until onComplete/onError fires or an assertion failure was recorded.
    future.get();
  }

  /** Verifies heartbeat cursors for the forward scan with the sparse filter. */
  @Test
  public void testHeartbeatWithSparseFilter()
    throws IOException, InterruptedException, ExecutionException {
    doTest(false);
  }

  /** Verifies heartbeat cursors for the reversed scan with the sparse filter. */
  @Test
  public void testHeartbeatWithSparseFilterReversed()
    throws IOException, InterruptedException, ExecutionException {
    doTest(true);
  }

  /**
   * Runs the scan built by {@code createScanWithSizeLimit()} and verifies that heartbeats carry a
   * cursor for the expected row while results delivered to {@code onNext} carry no cursor.
   * @throws InterruptedException if the test thread is interrupted while waiting for the scan
   * @throws ExecutionException   if the scan failed or an assertion in a callback failed
   */
  @Test
  public void testSizeLimit() throws InterruptedException, ExecutionException {
    CompletableFuture<Void> future = new CompletableFuture<>();
    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
    table.scan(createScanWithSizeLimit(), new AdvancedScanResultConsumer() {

      /** Number of callbacks (heartbeats plus results) that passed their assertions. */
      private int count;

      @Override
      public void onHeartbeat(ScanController controller) {
        try {
          // Fail with a clear assertion message rather than a NoSuchElementException from
          // Optional.get() if the heartbeat carries no cursor.
          assertTrue(controller.cursor().isPresent(), "heartbeat should carry a scan cursor");
          assertArrayEquals(ROWS[count / NUM_FAMILIES / NUM_QUALIFIERS],
            controller.cursor().get().getRow());
          count++;
        } catch (Throwable e) {
          // Surface the failure on the test thread, then rethrow it to the caller.
          future.completeExceptionally(e);
          throw e;
        }
      }

      @Override
      public void onNext(Result[] results, ScanController controller) {
        try {
          // When a real result is delivered, no cursor is expected alongside it.
          assertFalse(controller.cursor().isPresent());
          assertEquals(1, results.length);
          assertArrayEquals(ROWS[count / NUM_FAMILIES / NUM_QUALIFIERS], results[0].getRow());
          count++;
        } catch (Throwable e) {
          future.completeExceptionally(e);
          throw e;
        }
      }

      @Override
      public void onError(Throwable error) {
        future.completeExceptionally(error);
      }

      @Override
      public void onComplete() {
        future.complete(null);
      }
    });
    // Block until onComplete/onError fires or an assertion failure was recorded.
    future.get();
  }
}