001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Random;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestCase;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.HColumnDescriptor;
032import org.apache.hadoop.hbase.HTableDescriptor;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Durability;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.Scan;
037import org.apache.hadoop.hbase.testclassification.RegionServerTests;
038import org.apache.hadoop.hbase.testclassification.SmallTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046@Category({RegionServerTests.class, SmallTests.class})
047public class TestWideScanner extends HBaseTestCase {
048
049  @ClassRule
050  public static final HBaseClassTestRule CLASS_RULE =
051      HBaseClassTestRule.forClass(TestWideScanner.class);
052
053  private static final Logger LOG = LoggerFactory.getLogger(TestWideScanner.class);
054
055  static final byte[] A = Bytes.toBytes("A");
056  static final byte[] B = Bytes.toBytes("B");
057  static final byte[] C = Bytes.toBytes("C");
058  static byte[][] COLUMNS = { A, B, C };
059  static final Random rng = new Random();
060  static final HTableDescriptor TESTTABLEDESC =
061    new HTableDescriptor(TableName.valueOf("testwidescan"));
062  static {
063    for (byte[] cfName : new byte[][] { A, B, C }) {
064      TESTTABLEDESC.addFamily(new HColumnDescriptor(cfName)
065          // Keep versions to help debugging.
066          .setMaxVersions(100)
067          .setBlocksize(8 * 1024)
068      );
069    }
070  }
071
072  /** HRegionInfo for root region */
073  HRegion r;
074
075  private int addWideContent(HRegion region) throws IOException {
076    int count = 0;
077    for (char c = 'a'; c <= 'c'; c++) {
078      byte[] row = Bytes.toBytes("ab" + c);
079      int i, j;
080      long ts = System.currentTimeMillis();
081      for (i = 0; i < 100; i++) {
082        byte[] b = Bytes.toBytes(String.format("%10d", i));
083        for (j = 0; j < 100; j++) {
084          Put put = new Put(row);
085          put.setDurability(Durability.SKIP_WAL);
086          long ts1 = ++ts;
087          put.addColumn(COLUMNS[rng.nextInt(COLUMNS.length)], b, ts1, b);
088          region.put(put);
089          count++;
090        }
091      }
092    }
093    return count;
094  }
095
096  @Test
097  public void testWideScanBatching() throws IOException {
098    final int batch = 256;
099    try {
100      this.r = createNewHRegion(TESTTABLEDESC, null, null);
101      int inserted = addWideContent(this.r);
102      List<Cell> results = new ArrayList<>();
103      Scan scan = new Scan();
104      scan.addFamily(A);
105      scan.addFamily(B);
106      scan.addFamily(C);
107      scan.setMaxVersions(100);
108      scan.setBatch(batch);
109      InternalScanner s = r.getScanner(scan);
110      int total = 0;
111      int i = 0;
112      boolean more;
113      do {
114        more = s.next(results);
115        i++;
116        LOG.info("iteration #" + i + ", results.size=" + results.size());
117
118        // assert that the result set is no larger
119        assertTrue(results.size() <= batch);
120
121        total += results.size();
122
123        if (results.size() > 0) {
124          // assert that all results are from the same row
125          byte[] row = CellUtil.cloneRow(results.get(0));
126          for (Cell kv: results) {
127            assertTrue(Bytes.equals(row, CellUtil.cloneRow(kv)));
128          }
129        }
130
131        results.clear();
132
133        // trigger ChangedReadersObservers
134        Iterator<KeyValueScanner> scanners =
135          ((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
136        while (scanners.hasNext()) {
137          StoreScanner ss = (StoreScanner)scanners.next();
138          ss.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
139        }
140      } while (more);
141
142      // assert that the scanner returned all values
143      LOG.info("inserted " + inserted + ", scanned " + total);
144      assertEquals(total, inserted);
145
146      s.close();
147    } finally {
148      HBaseTestingUtility.closeRegionAndWAL(this.r);
149    }
150  }
151
152}
153