001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.List; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.CellUtil; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.filter.Filter; 033import org.apache.hadoop.hbase.filter.TimestampsFilter; 034import org.apache.hadoop.hbase.testclassification.ClientTests; 035import org.apache.hadoop.hbase.testclassification.MediumTests; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.junit.After; 038import org.junit.AfterClass; 039import org.junit.Before; 040import org.junit.BeforeClass; 041import org.junit.ClassRule; 042import org.junit.Rule; 043import org.junit.Test; 044import org.junit.experimental.categories.Category; 045import org.junit.rules.TestName; 046 047/** 048 * Run tests related to {@link TimestampsFilter} using HBase client APIs. Sets up the HBase mini 049 * cluster once at start. Each creates a table named for the method and does its stuff against that. 050 */ 051@Category({ MediumTests.class, ClientTests.class }) 052public class TestTimestampsFilter { 053 054 @ClassRule 055 public static final HBaseClassTestRule CLASS_RULE = 056 HBaseClassTestRule.forClass(TestTimestampsFilter.class); 057 058 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 059 060 @Rule 061 public TestName name = new TestName(); 062 063 /** 064 * @throws java.lang.Exception 065 */ 066 @BeforeClass 067 public static void setUpBeforeClass() throws Exception { 068 TEST_UTIL.startMiniCluster(); 069 } 070 071 /** 072 * @throws java.lang.Exception 073 */ 074 @AfterClass 075 public static void tearDownAfterClass() throws Exception { 076 TEST_UTIL.shutdownMiniCluster(); 077 } 078 079 /** 080 * @throws java.lang.Exception 081 */ 082 @Before 083 public void setUp() throws Exception { 084 // Nothing to do. 085 } 086 087 /** 088 * @throws java.lang.Exception 089 */ 090 @After 091 public void tearDown() throws Exception { 092 // Nothing to do. 093 } 094 095 /** 096 * Test from client side for TimestampsFilter. The TimestampsFilter provides the ability to 097 * request cells (KeyValues) whose timestamp/version is in the specified list of 098 * timestamps/version. 099 */ 100 @Test 101 public void testTimestampsFilter() throws Exception { 102 final byte[] TABLE = Bytes.toBytes(name.getMethodName()); 103 byte[] FAMILY = Bytes.toBytes("event_log"); 104 byte[][] FAMILIES = new byte[][] { FAMILY }; 105 Cell kvs[]; 106 107 // create table; set versions to max... 108 Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE); 109 110 for (int rowIdx = 0; rowIdx < 5; rowIdx++) { 111 for (int colIdx = 0; colIdx < 5; colIdx++) { 112 // insert versions 201..300 113 putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300); 114 // insert versions 1..100 115 putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100); 116 } 117 } 118 119 // do some verification before flush 120 verifyInsertedValues(ht, FAMILY); 121 122 TEST_UTIL.flush(); 123 124 // do some verification after flush 125 verifyInsertedValues(ht, FAMILY); 126 127 // Insert some more versions after flush. These should be in memstore. 128 // After this we should have data in both memstore & HFiles. 129 for (int rowIdx = 0; rowIdx < 5; rowIdx++) { 130 for (int colIdx = 0; colIdx < 5; colIdx++) { 131 putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400); 132 putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200); 133 } 134 } 135 136 for (int rowIdx = 0; rowIdx < 5; rowIdx++) { 137 for (int colIdx = 0; colIdx < 5; colIdx++) { 138 kvs = getNVersions(ht, FAMILY, rowIdx, colIdx, Arrays.asList(505L, 5L, 105L, 305L, 205L)); 139 assertEquals(4, kvs.length); 140 checkOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305); 141 checkOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205); 142 checkOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105); 143 checkOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5); 144 } 145 } 146 147 // Request an empty list of versions using the Timestamps filter; 148 // Should return none. 149 kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<>()); 150 assertEquals(0, kvs == null ? 0 : kvs.length); 151 152 // 153 // Test the filter using a Scan operation 154 // Scan rows 0..4. For each row, get all its columns, but only 155 // those versions of the columns with the specified timestamps. 156 Result[] results = scanNVersions(ht, FAMILY, 0, 4, Arrays.asList(6L, 106L, 306L)); 157 assertEquals("# of rows returned from scan", 5, results.length); 158 for (int rowIdx = 0; rowIdx < 5; rowIdx++) { 159 kvs = results[rowIdx].rawCells(); 160 // each row should have 5 columns. 161 // And we have requested 3 versions for each. 162 assertEquals("Number of KeyValues in result for row:" + rowIdx, 3 * 5, kvs.length); 163 for (int colIdx = 0; colIdx < 5; colIdx++) { 164 int offset = colIdx * 3; 165 checkOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306); 166 checkOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106); 167 checkOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6); 168 } 169 } 170 ht.close(); 171 } 172 173 @Test 174 public void testMultiColumns() throws Exception { 175 final byte[] TABLE = Bytes.toBytes(name.getMethodName()); 176 byte[] FAMILY = Bytes.toBytes("event_log"); 177 byte[][] FAMILIES = new byte[][] { FAMILY }; 178 179 // create table; set versions to max... 180 Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE); 181 182 Put p = new Put(Bytes.toBytes("row")); 183 p.addColumn(FAMILY, Bytes.toBytes("column0"), 3L, Bytes.toBytes("value0-3")); 184 p.addColumn(FAMILY, Bytes.toBytes("column1"), 3L, Bytes.toBytes("value1-3")); 185 p.addColumn(FAMILY, Bytes.toBytes("column2"), 1L, Bytes.toBytes("value2-1")); 186 p.addColumn(FAMILY, Bytes.toBytes("column2"), 2L, Bytes.toBytes("value2-2")); 187 p.addColumn(FAMILY, Bytes.toBytes("column2"), 3L, Bytes.toBytes("value2-3")); 188 p.addColumn(FAMILY, Bytes.toBytes("column3"), 2L, Bytes.toBytes("value3-2")); 189 p.addColumn(FAMILY, Bytes.toBytes("column4"), 1L, Bytes.toBytes("value4-1")); 190 p.addColumn(FAMILY, Bytes.toBytes("column4"), 2L, Bytes.toBytes("value4-2")); 191 p.addColumn(FAMILY, Bytes.toBytes("column4"), 3L, Bytes.toBytes("value4-3")); 192 ht.put(p); 193 194 ArrayList<Long> timestamps = new ArrayList<>(); 195 timestamps.add(new Long(3)); 196 TimestampsFilter filter = new TimestampsFilter(timestamps); 197 198 Get g = new Get(Bytes.toBytes("row")); 199 g.setFilter(filter); 200 g.readAllVersions(); 201 g.addColumn(FAMILY, Bytes.toBytes("column2")); 202 g.addColumn(FAMILY, Bytes.toBytes("column4")); 203 204 Result result = ht.get(g); 205 for (Cell kv : result.listCells()) { 206 System.out.println("found row " + Bytes.toString(CellUtil.cloneRow(kv)) + ", column " 207 + Bytes.toString(CellUtil.cloneQualifier(kv)) + ", value " 208 + Bytes.toString(CellUtil.cloneValue(kv))); 209 } 210 211 assertEquals(2, result.listCells().size()); 212 assertTrue(CellUtil.matchingValue(result.listCells().get(0), Bytes.toBytes("value2-3"))); 213 assertTrue(CellUtil.matchingValue(result.listCells().get(1), Bytes.toBytes("value4-3"))); 214 215 ht.close(); 216 } 217 218 /** 219 * Test TimestampsFilter in the presence of version deletes. 220 */ 221 @Test 222 public void testWithVersionDeletes() throws Exception { 223 224 // first test from memstore (without flushing). 225 testWithVersionDeletes(false); 226 227 // run same test against HFiles (by forcing a flush). 228 testWithVersionDeletes(true); 229 } 230 231 private void testWithVersionDeletes(boolean flushTables) throws IOException { 232 final byte[] TABLE = 233 Bytes.toBytes(name.getMethodName() + "_" + (flushTables ? "flush" : "noflush")); 234 byte[] FAMILY = Bytes.toBytes("event_log"); 235 byte[][] FAMILIES = new byte[][] { FAMILY }; 236 237 // create table; set versions to max... 238 Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE); 239 240 // For row:0, col:0: insert versions 1 through 5. 241 putNVersions(ht, FAMILY, 0, 0, 1, 5); 242 243 // delete version 4. 244 deleteOneVersion(ht, FAMILY, 0, 0, 4); 245 246 if (flushTables) { 247 TEST_UTIL.flush(); 248 } 249 250 // request a bunch of versions including the deleted version. We should 251 // only get back entries for the versions that exist. 252 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L)); 253 assertEquals(3, kvs.length); 254 checkOneCell(kvs[0], FAMILY, 0, 0, 5); 255 checkOneCell(kvs[1], FAMILY, 0, 0, 3); 256 checkOneCell(kvs[2], FAMILY, 0, 0, 2); 257 258 ht.close(); 259 } 260 261 private void verifyInsertedValues(Table ht, byte[] cf) throws IOException { 262 for (int rowIdx = 0; rowIdx < 5; rowIdx++) { 263 for (int colIdx = 0; colIdx < 5; colIdx++) { 264 // ask for versions that exist. 265 Cell[] kvs = getNVersions(ht, cf, rowIdx, colIdx, Arrays.asList(5L, 300L, 6L, 80L)); 266 assertEquals(4, kvs.length); 267 checkOneCell(kvs[0], cf, rowIdx, colIdx, 300); 268 checkOneCell(kvs[1], cf, rowIdx, colIdx, 80); 269 checkOneCell(kvs[2], cf, rowIdx, colIdx, 6); 270 checkOneCell(kvs[3], cf, rowIdx, colIdx, 5); 271 272 // ask for versions that do not exist. 273 kvs = getNVersions(ht, cf, rowIdx, colIdx, Arrays.asList(101L, 102L)); 274 assertEquals(0, kvs == null ? 0 : kvs.length); 275 276 // ask for some versions that exist and some that do not. 277 kvs = getNVersions(ht, cf, rowIdx, colIdx, Arrays.asList(1L, 300L, 105L, 70L, 115L)); 278 assertEquals(3, kvs.length); 279 checkOneCell(kvs[0], cf, rowIdx, colIdx, 300); 280 checkOneCell(kvs[1], cf, rowIdx, colIdx, 70); 281 checkOneCell(kvs[2], cf, rowIdx, colIdx, 1); 282 } 283 } 284 } 285 286 /** 287 * Assert that the passed in KeyValue has expected contents for the specified row, column & 288 * timestamp. 289 */ 290 private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) { 291 292 String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts; 293 294 assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx, 295 Bytes.toString(CellUtil.cloneRow(kv))); 296 297 assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf), 298 Bytes.toString(CellUtil.cloneFamily(kv))); 299 300 assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx, 301 Bytes.toString(CellUtil.cloneQualifier(kv))); 302 303 assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp()); 304 305 assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts, 306 Bytes.toString(CellUtil.cloneValue(kv))); 307 } 308 309 /** 310 * Uses the TimestampFilter on a Get to request a specified list of versions for the row/column 311 * specified by rowIdx & colIdx. 312 */ 313 private Cell[] getNVersions(Table ht, byte[] cf, int rowIdx, int colIdx, List<Long> versions) 314 throws IOException { 315 byte row[] = Bytes.toBytes("row:" + rowIdx); 316 byte column[] = Bytes.toBytes("column:" + colIdx); 317 Filter filter = new TimestampsFilter(versions); 318 Get get = new Get(row); 319 get.addColumn(cf, column); 320 get.setFilter(filter); 321 get.readAllVersions(); 322 Result result = ht.get(get); 323 324 return result.rawCells(); 325 } 326 327 /** 328 * Uses the TimestampFilter on a Scan to request a specified list of versions for the rows from 329 * startRowIdx to endRowIdx (both inclusive). 330 */ 331 private Result[] scanNVersions(Table ht, byte[] cf, int startRowIdx, int endRowIdx, 332 List<Long> versions) throws IOException { 333 byte startRow[] = Bytes.toBytes("row:" + startRowIdx); 334 byte endRow[] = Bytes.toBytes("row:" + endRowIdx + 1); // exclusive 335 Filter filter = new TimestampsFilter(versions); 336 Scan scan = new Scan().withStartRow(startRow).withStopRow(endRow); 337 scan.setFilter(filter); 338 scan.readAllVersions(); 339 ResultScanner scanner = ht.getScanner(scan); 340 return scanner.next(endRowIdx - startRowIdx + 1); 341 } 342 343 /** 344 * Insert in specific row/column versions with timestamps versionStart..versionEnd. 345 */ 346 private void putNVersions(Table ht, byte[] cf, int rowIdx, int colIdx, long versionStart, 347 long versionEnd) throws IOException { 348 byte row[] = Bytes.toBytes("row:" + rowIdx); 349 byte column[] = Bytes.toBytes("column:" + colIdx); 350 Put put = new Put(row); 351 put.setDurability(Durability.SKIP_WAL); 352 353 for (long idx = versionStart; idx <= versionEnd; idx++) { 354 put.addColumn(cf, column, idx, Bytes.toBytes("value-version-" + idx)); 355 } 356 357 ht.put(put); 358 } 359 360 /** 361 * For row/column specified by rowIdx/colIdx, delete the cell corresponding to the specified 362 * version. 363 */ 364 private void deleteOneVersion(Table ht, byte[] cf, int rowIdx, int colIdx, long version) 365 throws IOException { 366 byte row[] = Bytes.toBytes("row:" + rowIdx); 367 byte column[] = Bytes.toBytes("column:" + colIdx); 368 Delete del = new Delete(row); 369 del.addColumn(cf, column, version); 370 ht.delete(del); 371 } 372 373}