001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.concurrent.CompletableFuture; 027import java.util.concurrent.ExecutionException; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.testclassification.ClientTests; 030import org.apache.hadoop.hbase.testclassification.LargeTests; 031import org.junit.BeforeClass; 032import org.junit.ClassRule; 033import org.junit.Test; 034import org.junit.experimental.categories.Category; 035 036@Category({ LargeTests.class, ClientTests.class }) 037public class TestRawAsyncScanCursor extends AbstractTestScanCursor { 038 039 @ClassRule 040 public static final HBaseClassTestRule CLASS_RULE = 041 HBaseClassTestRule.forClass(TestRawAsyncScanCursor.class); 042 043 private static AsyncConnection CONN; 044 045 @BeforeClass 046 public static void setUpBeforeClass() throws Exception { 047 AbstractTestScanCursor.setUpBeforeClass(); 048 CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 049 } 050 051 public static void tearDownAfterClass() throws Exception { 052 if (CONN != null) { 053 CONN.close(); 054 } 055 AbstractTestScanCursor.tearDownAfterClass(); 056 } 057 058 private void doTest(boolean reversed) 059 throws InterruptedException, ExecutionException, IOException { 060 CompletableFuture<Void> future = new CompletableFuture<>(); 061 AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME); 062 table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(), 063 new AdvancedScanResultConsumer() { 064 065 private int count; 066 067 @Override 068 public void onHeartbeat(ScanController controller) { 069 int row = count / NUM_FAMILIES / NUM_QUALIFIERS; 070 if (reversed) { 071 row = NUM_ROWS - 1 - row; 072 } 073 try { 074 assertArrayEquals(ROWS[row], controller.cursor().get().getRow()); 075 count++; 076 } catch (Throwable e) { 077 future.completeExceptionally(e); 078 throw e; 079 } 080 } 081 082 @Override 083 public void onNext(Result[] results, ScanController controller) { 084 try { 085 assertEquals(1, results.length); 086 assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS); 087 // we will always provide a scan cursor if time limit is reached. 088 assertTrue(controller.cursor().isPresent()); 089 assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], 090 controller.cursor().get().getRow()); 091 assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow()); 092 count++; 093 } catch (Throwable e) { 094 future.completeExceptionally(e); 095 throw e; 096 } 097 } 098 099 @Override 100 public void onError(Throwable error) { 101 future.completeExceptionally(error); 102 } 103 104 @Override 105 public void onComplete() { 106 future.complete(null); 107 } 108 }); 109 future.get(); 110 } 111 112 @Test 113 public void testHeartbeatWithSparseFilter() 114 throws IOException, InterruptedException, ExecutionException { 115 doTest(false); 116 } 117 118 @Test 119 public void testHeartbeatWithSparseFilterReversed() 120 throws IOException, InterruptedException, ExecutionException { 121 doTest(true); 122 } 123 124 @Test 125 public void testSizeLimit() throws InterruptedException, ExecutionException { 126 CompletableFuture<Void> future = new CompletableFuture<>(); 127 AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME); 128 table.scan(createScanWithSizeLimit(), new AdvancedScanResultConsumer() { 129 130 private int count; 131 132 @Override 133 public void onHeartbeat(ScanController controller) { 134 try { 135 assertArrayEquals(ROWS[count / NUM_FAMILIES / NUM_QUALIFIERS], 136 controller.cursor().get().getRow()); 137 count++; 138 } catch (Throwable e) { 139 future.completeExceptionally(e); 140 throw e; 141 } 142 } 143 144 @Override 145 public void onNext(Result[] results, ScanController controller) { 146 try { 147 assertFalse(controller.cursor().isPresent()); 148 assertEquals(1, results.length); 149 assertArrayEquals(ROWS[count / NUM_FAMILIES / NUM_QUALIFIERS], results[0].getRow()); 150 count++; 151 } catch (Throwable e) { 152 future.completeExceptionally(e); 153 throw e; 154 } 155 } 156 157 @Override 158 public void onError(Throwable error) { 159 future.completeExceptionally(error); 160 } 161 162 @Override 163 public void onComplete() { 164 future.complete(null); 165 } 166 }); 167 future.get(); 168 } 169}