001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021 022import java.util.ArrayList; 023import java.util.List; 024import java.util.stream.Collectors; 025import java.util.stream.IntStream; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HBaseTestingUtil; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.testclassification.ClientTests; 031import org.apache.hadoop.hbase.testclassification.LargeTests; 032import org.apache.hadoop.hbase.util.Bytes; 033import org.apache.hadoop.hbase.util.Threads; 034import org.junit.AfterClass; 035import org.junit.BeforeClass; 036import org.junit.ClassRule; 037import org.junit.Test; 038import org.junit.experimental.categories.Category; 039 040@Category({ LargeTests.class, ClientTests.class }) 041public class TestAsyncTableScanRenewLease { 042 043 @ClassRule 044 public static final HBaseClassTestRule CLASS_RULE = 045 HBaseClassTestRule.forClass(TestAsyncTableScanRenewLease.class); 046 047 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 048 049 private static TableName TABLE_NAME = TableName.valueOf("async"); 050 051 private static byte[] FAMILY = Bytes.toBytes("cf"); 052 053 private static byte[] CQ = Bytes.toBytes("cq"); 054 055 private static AsyncConnection CONN; 056 057 private static AsyncTable<AdvancedScanResultConsumer> TABLE; 058 059 private static int SCANNER_LEASE_TIMEOUT_PERIOD_MS = 5000; 060 061 @BeforeClass 062 public static void setUp() throws Exception { 063 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 064 SCANNER_LEASE_TIMEOUT_PERIOD_MS); 065 TEST_UTIL.startMiniCluster(1); 066 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 067 CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 068 TABLE = CONN.getTable(TABLE_NAME); 069 TABLE.putAll(IntStream.range(0, 10).mapToObj( 070 i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 071 .collect(Collectors.toList())).get(); 072 } 073 074 @AfterClass 075 public static void tearDown() throws Exception { 076 CONN.close(); 077 TEST_UTIL.shutdownMiniCluster(); 078 } 079 080 private static final class RenewLeaseConsumer implements AdvancedScanResultConsumer { 081 082 private final List<Result> results = new ArrayList<>(); 083 084 private Throwable error; 085 086 private boolean finished = false; 087 088 private boolean suspended = false; 089 090 @Override 091 public synchronized void onNext(Result[] results, ScanController controller) { 092 for (Result result : results) { 093 this.results.add(result); 094 } 095 if (!suspended) { 096 ScanResumer resumer = controller.suspend(); 097 new Thread(() -> { 098 Threads.sleep(2 * SCANNER_LEASE_TIMEOUT_PERIOD_MS); 099 try { 100 TABLE.put(new Put(Bytes.toBytes(String.format("%02d", 10))).addColumn(FAMILY, CQ, 101 Bytes.toBytes(10))).get(); 102 } catch (Exception e) { 103 onError(e); 104 } 105 resumer.resume(); 106 }).start(); 107 } 108 } 109 110 @Override 111 public synchronized void onError(Throwable error) { 112 this.finished = true; 113 this.error = error; 114 notifyAll(); 115 } 116 117 @Override 118 public synchronized void onComplete() { 119 this.finished = true; 120 notifyAll(); 121 } 122 123 public synchronized List<Result> get() throws Throwable { 124 while (!finished) { 125 wait(); 126 } 127 if (error != null) { 128 throw error; 129 } 130 return results; 131 } 132 } 133 134 @Test 135 public void test() throws Throwable { 136 RenewLeaseConsumer consumer = new RenewLeaseConsumer(); 137 TABLE.scan(new Scan(), consumer); 138 List<Result> results = consumer.get(); 139 // should not see the newly added value 140 assertEquals(10, results.size()); 141 IntStream.range(0, 10).forEach(i -> { 142 Result result = results.get(i); 143 assertEquals(String.format("%02d", i), Bytes.toString(result.getRow())); 144 assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ))); 145 }); 146 // now we can see the newly added value 147 List<Result> results2 = TABLE.scanAll(new Scan()).get(); 148 assertEquals(11, results2.size()); 149 IntStream.range(0, 11).forEach(i -> { 150 Result result = results2.get(i); 151 assertEquals(String.format("%02d", i), Bytes.toString(result.getRow())); 152 assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ))); 153 }); 154 } 155}