001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.util.ArrayList; 023import java.util.List; 024import java.util.stream.Collectors; 025import java.util.stream.IntStream; 026import org.apache.hadoop.hbase.HBaseTestingUtil; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.testclassification.ClientTests; 030import org.apache.hadoop.hbase.testclassification.LargeTests; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.hbase.util.Threads; 033import org.junit.jupiter.api.AfterAll; 034import org.junit.jupiter.api.BeforeAll; 035import org.junit.jupiter.api.Tag; 036import org.junit.jupiter.api.Test; 037 038@Tag(LargeTests.TAG) 039@Tag(ClientTests.TAG) 040public class TestAsyncTableScanRenewLease { 041 042 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 043 044 private static TableName TABLE_NAME = TableName.valueOf("async"); 045 046 private static byte[] FAMILY = Bytes.toBytes("cf"); 047 048 private static byte[] CQ = Bytes.toBytes("cq"); 049 050 private static AsyncConnection CONN; 051 052 private static AsyncTable<AdvancedScanResultConsumer> TABLE; 053 054 private static int SCANNER_LEASE_TIMEOUT_PERIOD_MS = 5000; 055 056 @BeforeAll 057 public static void setUp() throws Exception { 058 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 059 SCANNER_LEASE_TIMEOUT_PERIOD_MS); 060 TEST_UTIL.startMiniCluster(1); 061 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 062 CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 063 TABLE = CONN.getTable(TABLE_NAME); 064 TABLE.putAll(IntStream.range(0, 10).mapToObj( 065 i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 066 .collect(Collectors.toList())).get(); 067 } 068 069 @AfterAll 070 public static void tearDown() throws Exception { 071 CONN.close(); 072 TEST_UTIL.shutdownMiniCluster(); 073 } 074 075 private static final class RenewLeaseConsumer implements AdvancedScanResultConsumer { 076 077 private final List<Result> results = new ArrayList<>(); 078 079 private Throwable error; 080 081 private boolean finished = false; 082 083 private boolean suspended = false; 084 085 @Override 086 public synchronized void onNext(Result[] results, ScanController controller) { 087 for (Result result : results) { 088 this.results.add(result); 089 } 090 if (!suspended) { 091 ScanResumer resumer = controller.suspend(); 092 new Thread(() -> { 093 Threads.sleep(2 * SCANNER_LEASE_TIMEOUT_PERIOD_MS); 094 try { 095 TABLE.put(new Put(Bytes.toBytes(String.format("%02d", 10))).addColumn(FAMILY, CQ, 096 Bytes.toBytes(10))).get(); 097 } catch (Exception e) { 098 onError(e); 099 } 100 resumer.resume(); 101 }).start(); 102 } 103 } 104 105 @Override 106 public synchronized void onError(Throwable error) { 107 this.finished = true; 108 this.error = error; 109 notifyAll(); 110 } 111 112 @Override 113 public synchronized void onComplete() { 114 this.finished = true; 115 notifyAll(); 116 } 117 118 public synchronized List<Result> get() throws Throwable { 119 while (!finished) { 120 wait(); 121 } 122 if (error != null) { 123 throw error; 124 } 125 return results; 126 } 127 } 128 129 @Test 130 public void test() throws Throwable { 131 RenewLeaseConsumer consumer = new RenewLeaseConsumer(); 132 TABLE.scan(new Scan(), consumer); 133 List<Result> results = consumer.get(); 134 // should not see the newly added value 135 assertEquals(10, results.size()); 136 IntStream.range(0, 10).forEach(i -> { 137 Result result = results.get(i); 138 assertEquals(String.format("%02d", i), Bytes.toString(result.getRow())); 139 assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ))); 140 }); 141 // now we can see the newly added value 142 List<Result> results2 = TABLE.scanAll(new Scan()).get(); 143 assertEquals(11, results2.size()); 144 IntStream.range(0, 11).forEach(i -> { 145 Result result = results2.get(i); 146 assertEquals(String.format("%02d", i), Bytes.toString(result.getRow())); 147 assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ))); 148 }); 149 } 150}