001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.Iterator;
027import java.util.List;
028import java.util.concurrent.ThreadLocalRandom;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
036import org.apache.hadoop.hbase.client.Durability;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.RegionInfoBuilder;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.testclassification.RegionServerTests;
044import org.apache.hadoop.hbase.testclassification.SmallTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.junit.AfterClass;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055@Category({ RegionServerTests.class, SmallTests.class })
056public class TestWideScanner {
057
058  @ClassRule
059  public static final HBaseClassTestRule CLASS_RULE =
060    HBaseClassTestRule.forClass(TestWideScanner.class);
061
062  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
063
064  private static final Logger LOG = LoggerFactory.getLogger(TestWideScanner.class);
065
066  private static final byte[] A = Bytes.toBytes("A");
067  private static final byte[] B = Bytes.toBytes("B");
068  private static final byte[] C = Bytes.toBytes("C");
069  private static byte[][] COLUMNS = { A, B, C };
070
071  private static final TableDescriptor TESTTABLEDESC;
072  static {
073    TableDescriptorBuilder builder =
074      TableDescriptorBuilder.newBuilder(TableName.valueOf("testwidescan"));
075    for (byte[] cfName : new byte[][] { A, B, C }) {
076      // Keep versions to help debugging.
077      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cfName).setMaxVersions(100)
078        .setBlocksize(8 * 1024).build());
079    }
080    TESTTABLEDESC = builder.build();
081  }
082
083  /** HRegionInfo for root region */
084  private static HRegion REGION;
085
086  @BeforeClass
087  public static void setUp() throws IOException {
088    Path testDir = UTIL.getDataTestDir();
089    RegionInfo hri = RegionInfoBuilder.newBuilder(TESTTABLEDESC.getTableName()).build();
090    REGION =
091      HBaseTestingUtil.createRegionAndWAL(hri, testDir, UTIL.getConfiguration(), TESTTABLEDESC);
092  }
093
094  @AfterClass
095  public static void tearDown() throws IOException {
096    if (REGION != null) {
097      HBaseTestingUtil.closeRegionAndWAL(REGION);
098      REGION = null;
099    }
100    UTIL.cleanupTestDir();
101  }
102
103  private int addWideContent(HRegion region) throws IOException {
104    int count = 0;
105    for (char c = 'a'; c <= 'c'; c++) {
106      byte[] row = Bytes.toBytes("ab" + c);
107      int i, j;
108      long ts = EnvironmentEdgeManager.currentTime();
109      for (i = 0; i < 100; i++) {
110        byte[] b = Bytes.toBytes(String.format("%10d", i));
111        for (j = 0; j < 100; j++) {
112          Put put = new Put(row);
113          put.setDurability(Durability.SKIP_WAL);
114          long ts1 = ++ts;
115          put.addColumn(COLUMNS[ThreadLocalRandom.current().nextInt(COLUMNS.length)], b, ts1, b);
116          region.put(put);
117          count++;
118        }
119      }
120    }
121    return count;
122  }
123
124  @Test
125  public void testWideScanBatching() throws IOException {
126    final int batch = 256;
127    int inserted = addWideContent(REGION);
128    List<Cell> results = new ArrayList<>();
129    Scan scan = new Scan();
130    scan.addFamily(A);
131    scan.addFamily(B);
132    scan.addFamily(C);
133    scan.readVersions(100);
134    scan.setBatch(batch);
135    try (InternalScanner s = REGION.getScanner(scan)) {
136      int total = 0;
137      int i = 0;
138      boolean more;
139      do {
140        more = s.next(results);
141        i++;
142        LOG.info("iteration #" + i + ", results.size=" + results.size());
143
144        // assert that the result set is no larger
145        assertTrue(results.size() <= batch);
146
147        total += results.size();
148
149        if (results.size() > 0) {
150          // assert that all results are from the same row
151          byte[] row = CellUtil.cloneRow(results.get(0));
152          for (Cell kv : results) {
153            assertTrue(Bytes.equals(row, CellUtil.cloneRow(kv)));
154          }
155        }
156
157        results.clear();
158
159        // trigger ChangedReadersObservers
160        Iterator<KeyValueScanner> scanners = ((RegionScannerImpl) s).storeHeap.getHeap().iterator();
161        while (scanners.hasNext()) {
162          StoreScanner ss = (StoreScanner) scanners.next();
163          ss.updateReaders(Collections.emptyList(), Collections.emptyList());
164        }
165      } while (more);
166
167      // assert that the scanner returned all values
168      LOG.info("inserted " + inserted + ", scanned " + total);
169      assertEquals(total, inserted);
170    }
171  }
172}