/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.TimestampsFilter;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

049/** 050 * Run tests related to {@link TimestampsFilter} using HBase client APIs. 051 * Sets up the HBase mini cluster once at start. Each creates a table 052 * named for the method and does its stuff against that. 053 */ 054@Category({MediumTests.class, ClientTests.class}) 055public class TestTimestampsFilter { 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestTimestampsFilter.class); 060 061 private static final Logger LOG = LoggerFactory.getLogger(TestTimestampsFilter.class); 062 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 063 064 @Rule 065 public TestName name = new TestName(); 066 067 /** 068 * @throws java.lang.Exception 069 */ 070 @BeforeClass 071 public static void setUpBeforeClass() throws Exception { 072 TEST_UTIL.startMiniCluster(); 073 } 074 075 /** 076 * @throws java.lang.Exception 077 */ 078 @AfterClass 079 public static void tearDownAfterClass() throws Exception { 080 TEST_UTIL.shutdownMiniCluster(); 081 } 082 083 /** 084 * @throws java.lang.Exception 085 */ 086 @Before 087 public void setUp() throws Exception { 088 // Nothing to do. 089 } 090 091 /** 092 * @throws java.lang.Exception 093 */ 094 @After 095 public void tearDown() throws Exception { 096 // Nothing to do. 097 } 098 099 /** 100 * Test from client side for TimestampsFilter. 101 * 102 * The TimestampsFilter provides the ability to request cells (KeyValues) 103 * whose timestamp/version is in the specified list of timestamps/version. 104 * 105 * @throws Exception 106 */ 107 @Test 108 public void testTimestampsFilter() throws Exception { 109 final byte [] TABLE = Bytes.toBytes(name.getMethodName()); 110 byte [] FAMILY = Bytes.toBytes("event_log"); 111 byte [][] FAMILIES = new byte[][] { FAMILY }; 112 Cell kvs[]; 113 114 // create table; set versions to max... 
115 Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE); 116 117 for (int rowIdx = 0; rowIdx < 5; rowIdx++) { 118 for (int colIdx = 0; colIdx < 5; colIdx++) { 119 // insert versions 201..300 120 putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300); 121 // insert versions 1..100 122 putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100); 123 } 124 } 125 126 // do some verification before flush 127 verifyInsertedValues(ht, FAMILY); 128 129 TEST_UTIL.flush(); 130 131 // do some verification after flush 132 verifyInsertedValues(ht, FAMILY); 133 134 // Insert some more versions after flush. These should be in memstore. 135 // After this we should have data in both memstore & HFiles. 136 for (int rowIdx = 0; rowIdx < 5; rowIdx++) { 137 for (int colIdx = 0; colIdx < 5; colIdx++) { 138 putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400); 139 putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200); 140 } 141 } 142 143 for (int rowIdx = 0; rowIdx < 5; rowIdx++) { 144 for (int colIdx = 0; colIdx < 5; colIdx++) { 145 kvs = getNVersions(ht, FAMILY, rowIdx, colIdx, 146 Arrays.asList(505L, 5L, 105L, 305L, 205L)); 147 assertEquals(4, kvs.length); 148 checkOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305); 149 checkOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205); 150 checkOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105); 151 checkOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5); 152 } 153 } 154 155 // Request an empty list of versions using the Timestamps filter; 156 // Should return none. 157 kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<>()); 158 assertEquals(0, kvs == null? 0: kvs.length); 159 160 // 161 // Test the filter using a Scan operation 162 // Scan rows 0..4. For each row, get all its columns, but only 163 // those versions of the columns with the specified timestamps. 
164 Result[] results = scanNVersions(ht, FAMILY, 0, 4, 165 Arrays.asList(6L, 106L, 306L)); 166 assertEquals("# of rows returned from scan", 5, results.length); 167 for (int rowIdx = 0; rowIdx < 5; rowIdx++) { 168 kvs = results[rowIdx].rawCells(); 169 // each row should have 5 columns. 170 // And we have requested 3 versions for each. 171 assertEquals("Number of KeyValues in result for row:" + rowIdx, 172 3*5, kvs.length); 173 for (int colIdx = 0; colIdx < 5; colIdx++) { 174 int offset = colIdx * 3; 175 checkOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306); 176 checkOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106); 177 checkOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6); 178 } 179 } 180 ht.close(); 181 } 182 183 @Test 184 public void testMultiColumns() throws Exception { 185 final byte [] TABLE = Bytes.toBytes(name.getMethodName()); 186 byte [] FAMILY = Bytes.toBytes("event_log"); 187 byte [][] FAMILIES = new byte[][] { FAMILY }; 188 189 // create table; set versions to max... 
190 Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE); 191 192 Put p = new Put(Bytes.toBytes("row")); 193 p.addColumn(FAMILY, Bytes.toBytes("column0"), 3L, Bytes.toBytes("value0-3")); 194 p.addColumn(FAMILY, Bytes.toBytes("column1"), 3L, Bytes.toBytes("value1-3")); 195 p.addColumn(FAMILY, Bytes.toBytes("column2"), 1L, Bytes.toBytes("value2-1")); 196 p.addColumn(FAMILY, Bytes.toBytes("column2"), 2L, Bytes.toBytes("value2-2")); 197 p.addColumn(FAMILY, Bytes.toBytes("column2"), 3L, Bytes.toBytes("value2-3")); 198 p.addColumn(FAMILY, Bytes.toBytes("column3"), 2L, Bytes.toBytes("value3-2")); 199 p.addColumn(FAMILY, Bytes.toBytes("column4"), 1L, Bytes.toBytes("value4-1")); 200 p.addColumn(FAMILY, Bytes.toBytes("column4"), 2L, Bytes.toBytes("value4-2")); 201 p.addColumn(FAMILY, Bytes.toBytes("column4"), 3L, Bytes.toBytes("value4-3")); 202 ht.put(p); 203 204 ArrayList<Long> timestamps = new ArrayList<>(); 205 timestamps.add(new Long(3)); 206 TimestampsFilter filter = new TimestampsFilter(timestamps); 207 208 Get g = new Get(Bytes.toBytes("row")); 209 g.setFilter(filter); 210 g.setMaxVersions(); 211 g.addColumn(FAMILY, Bytes.toBytes("column2")); 212 g.addColumn(FAMILY, Bytes.toBytes("column4")); 213 214 Result result = ht.get(g); 215 for (Cell kv : result.listCells()) { 216 System.out.println("found row " + Bytes.toString(CellUtil.cloneRow(kv)) + 217 ", column " + Bytes.toString(CellUtil.cloneQualifier(kv)) + ", value " 218 + Bytes.toString(CellUtil.cloneValue(kv))); 219 } 220 221 assertEquals(2, result.listCells().size()); 222 assertTrue(CellUtil.matchingValue(result.listCells().get(0), Bytes.toBytes("value2-3"))); 223 assertTrue(CellUtil.matchingValue(result.listCells().get(1), Bytes.toBytes("value4-3"))); 224 225 ht.close(); 226 } 227 228 /** 229 * Test TimestampsFilter in the presence of version deletes. 
230 * 231 * @throws Exception 232 */ 233 @Test 234 public void testWithVersionDeletes() throws Exception { 235 236 // first test from memstore (without flushing). 237 testWithVersionDeletes(false); 238 239 // run same test against HFiles (by forcing a flush). 240 testWithVersionDeletes(true); 241 } 242 243 private void testWithVersionDeletes(boolean flushTables) throws IOException { 244 final byte [] TABLE = Bytes.toBytes(name.getMethodName() + "_" + 245 (flushTables ? "flush" : "noflush")); 246 byte [] FAMILY = Bytes.toBytes("event_log"); 247 byte [][] FAMILIES = new byte[][] { FAMILY }; 248 249 // create table; set versions to max... 250 Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE); 251 252 // For row:0, col:0: insert versions 1 through 5. 253 putNVersions(ht, FAMILY, 0, 0, 1, 5); 254 255 // delete version 4. 256 deleteOneVersion(ht, FAMILY, 0, 0, 4); 257 258 if (flushTables) { 259 TEST_UTIL.flush(); 260 } 261 262 // request a bunch of versions including the deleted version. We should 263 // only get back entries for the versions that exist. 264 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L)); 265 assertEquals(3, kvs.length); 266 checkOneCell(kvs[0], FAMILY, 0, 0, 5); 267 checkOneCell(kvs[1], FAMILY, 0, 0, 3); 268 checkOneCell(kvs[2], FAMILY, 0, 0, 2); 269 270 ht.close(); 271 } 272 273 private void verifyInsertedValues(Table ht, byte[] cf) throws IOException { 274 for (int rowIdx = 0; rowIdx < 5; rowIdx++) { 275 for (int colIdx = 0; colIdx < 5; colIdx++) { 276 // ask for versions that exist. 277 Cell[] kvs = getNVersions(ht, cf, rowIdx, colIdx, 278 Arrays.asList(5L, 300L, 6L, 80L)); 279 assertEquals(4, kvs.length); 280 checkOneCell(kvs[0], cf, rowIdx, colIdx, 300); 281 checkOneCell(kvs[1], cf, rowIdx, colIdx, 80); 282 checkOneCell(kvs[2], cf, rowIdx, colIdx, 6); 283 checkOneCell(kvs[3], cf, rowIdx, colIdx, 5); 284 285 // ask for versions that do not exist. 
286 kvs = getNVersions(ht, cf, rowIdx, colIdx, 287 Arrays.asList(101L, 102L)); 288 assertEquals(0, kvs == null? 0: kvs.length); 289 290 // ask for some versions that exist and some that do not. 291 kvs = getNVersions(ht, cf, rowIdx, colIdx, 292 Arrays.asList(1L, 300L, 105L, 70L, 115L)); 293 assertEquals(3, kvs.length); 294 checkOneCell(kvs[0], cf, rowIdx, colIdx, 300); 295 checkOneCell(kvs[1], cf, rowIdx, colIdx, 70); 296 checkOneCell(kvs[2], cf, rowIdx, colIdx, 1); 297 } 298 } 299 } 300 301 /** 302 * Assert that the passed in KeyValue has expected contents for the 303 * specified row, column & timestamp. 304 */ 305 private void checkOneCell(Cell kv, byte[] cf, 306 int rowIdx, int colIdx, long ts) { 307 308 String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts; 309 310 assertEquals("Row mismatch which checking: " + ctx, 311 "row:"+ rowIdx, Bytes.toString(CellUtil.cloneRow(kv))); 312 313 assertEquals("ColumnFamily mismatch while checking: " + ctx, 314 Bytes.toString(cf), Bytes.toString(CellUtil.cloneFamily(kv))); 315 316 assertEquals("Column qualifier mismatch while checking: " + ctx, 317 "column:" + colIdx, 318 Bytes.toString(CellUtil.cloneQualifier(kv))); 319 320 assertEquals("Timestamp mismatch while checking: " + ctx, 321 ts, kv.getTimestamp()); 322 323 assertEquals("Value mismatch while checking: " + ctx, 324 "value-version-" + ts, Bytes.toString(CellUtil.cloneValue(kv))); 325 } 326 327 /** 328 * Uses the TimestampFilter on a Get to request a specified list of 329 * versions for the row/column specified by rowIdx & colIdx. 
330 * 331 */ 332 private Cell[] getNVersions(Table ht, byte[] cf, int rowIdx, 333 int colIdx, List<Long> versions) 334 throws IOException { 335 byte row[] = Bytes.toBytes("row:" + rowIdx); 336 byte column[] = Bytes.toBytes("column:" + colIdx); 337 Filter filter = new TimestampsFilter(versions); 338 Get get = new Get(row); 339 get.addColumn(cf, column); 340 get.setFilter(filter); 341 get.setMaxVersions(); 342 Result result = ht.get(get); 343 344 return result.rawCells(); 345 } 346 347 /** 348 * Uses the TimestampFilter on a Scan to request a specified list of 349 * versions for the rows from startRowIdx to endRowIdx (both inclusive). 350 */ 351 private Result[] scanNVersions(Table ht, byte[] cf, int startRowIdx, 352 int endRowIdx, List<Long> versions) 353 throws IOException { 354 byte startRow[] = Bytes.toBytes("row:" + startRowIdx); 355 byte endRow[] = Bytes.toBytes("row:" + endRowIdx + 1); // exclusive 356 Filter filter = new TimestampsFilter(versions); 357 Scan scan = new Scan(startRow, endRow); 358 scan.setFilter(filter); 359 scan.setMaxVersions(); 360 ResultScanner scanner = ht.getScanner(scan); 361 return scanner.next(endRowIdx - startRowIdx + 1); 362 } 363 364 /** 365 * Insert in specific row/column versions with timestamps 366 * versionStart..versionEnd. 367 */ 368 private void putNVersions(Table ht, byte[] cf, int rowIdx, int colIdx, 369 long versionStart, long versionEnd) 370 throws IOException { 371 byte row[] = Bytes.toBytes("row:" + rowIdx); 372 byte column[] = Bytes.toBytes("column:" + colIdx); 373 Put put = new Put(row); 374 put.setDurability(Durability.SKIP_WAL); 375 376 for (long idx = versionStart; idx <= versionEnd; idx++) { 377 put.addColumn(cf, column, idx, Bytes.toBytes("value-version-" + idx)); 378 } 379 380 ht.put(put); 381 } 382 383 /** 384 * For row/column specified by rowIdx/colIdx, delete the cell 385 * corresponding to the specified version. 
386 */ 387 private void deleteOneVersion(Table ht, byte[] cf, int rowIdx, 388 int colIdx, long version) 389 throws IOException { 390 byte row[] = Bytes.toBytes("row:" + rowIdx); 391 byte column[] = Bytes.toBytes("column:" + colIdx); 392 Delete del = new Delete(row); 393 del.addColumn(cf, column, version); 394 ht.delete(del); 395 } 396 397} 398 399