001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.thrift2; 019 020import static java.nio.ByteBuffer.wrap; 021import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift; 022import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift; 023import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift; 024import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift; 025import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift; 026import static org.junit.Assert.assertArrayEquals; 027import static org.junit.Assert.assertEquals; 028import static org.junit.Assert.assertFalse; 029import static org.junit.Assert.assertNull; 030import static org.junit.Assert.assertTrue; 031import static org.junit.Assert.fail; 032 033import java.io.IOException; 034import java.io.InterruptedIOException; 035import java.nio.ByteBuffer; 036import java.util.ArrayList; 037import java.util.Arrays; 038import java.util.Collections; 039import java.util.Comparator; 040import java.util.HashMap; 041import java.util.List; 042import java.util.Map; 043import java.util.Optional; 044import java.util.concurrent.TimeUnit; 045import org.apache.hadoop.conf.Configuration; 046import org.apache.hadoop.hbase.Cell; 047import org.apache.hadoop.hbase.CompatibilityFactory; 048import org.apache.hadoop.hbase.CoprocessorEnvironment; 049import org.apache.hadoop.hbase.HBaseClassTestRule; 050import org.apache.hadoop.hbase.HBaseTestingUtility; 051import org.apache.hadoop.hbase.HColumnDescriptor; 052import org.apache.hadoop.hbase.HTableDescriptor; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.client.Admin; 055import org.apache.hadoop.hbase.client.Delete; 056import org.apache.hadoop.hbase.client.Durability; 057import org.apache.hadoop.hbase.client.Get; 058import org.apache.hadoop.hbase.client.Increment; 059import org.apache.hadoop.hbase.client.Put; 060import org.apache.hadoop.hbase.client.Scan; 061import org.apache.hadoop.hbase.client.Table; 062import org.apache.hadoop.hbase.coprocessor.ObserverContext; 063import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 064import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 065import org.apache.hadoop.hbase.coprocessor.RegionObserver; 066import org.apache.hadoop.hbase.filter.ParseFilter; 067import org.apache.hadoop.hbase.security.UserProvider; 068import org.apache.hadoop.hbase.test.MetricsAssertHelper; 069import org.apache.hadoop.hbase.testclassification.ClientTests; 070import org.apache.hadoop.hbase.testclassification.MediumTests; 071import org.apache.hadoop.hbase.thrift.ErrorThrowingGetObserver; 072import org.apache.hadoop.hbase.thrift.ThriftMetrics; 073import org.apache.hadoop.hbase.thrift2.generated.TAppend; 074import org.apache.hadoop.hbase.thrift2.generated.TColumn; 075import org.apache.hadoop.hbase.thrift2.generated.TColumnIncrement; 076import org.apache.hadoop.hbase.thrift2.generated.TColumnValue; 077import org.apache.hadoop.hbase.thrift2.generated.TCompareOp; 078import org.apache.hadoop.hbase.thrift2.generated.TDelete; 079import org.apache.hadoop.hbase.thrift2.generated.TDeleteType; 080import org.apache.hadoop.hbase.thrift2.generated.TDurability; 081import org.apache.hadoop.hbase.thrift2.generated.TGet; 082import org.apache.hadoop.hbase.thrift2.generated.THBaseService; 083import org.apache.hadoop.hbase.thrift2.generated.TIOError; 084import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument; 085import org.apache.hadoop.hbase.thrift2.generated.TIncrement; 086import org.apache.hadoop.hbase.thrift2.generated.TMutation; 087import org.apache.hadoop.hbase.thrift2.generated.TPut; 088import org.apache.hadoop.hbase.thrift2.generated.TReadType; 089import org.apache.hadoop.hbase.thrift2.generated.TResult; 090import org.apache.hadoop.hbase.thrift2.generated.TRowMutations; 091import org.apache.hadoop.hbase.thrift2.generated.TScan; 092import org.apache.hadoop.hbase.thrift2.generated.TTimeRange; 093import org.apache.hadoop.hbase.util.Bytes; 094import org.apache.thrift.TException; 095import org.junit.AfterClass; 096import org.junit.Before; 097import org.junit.BeforeClass; 098import org.junit.ClassRule; 099import org.junit.Rule; 100import org.junit.Test; 101import org.junit.experimental.categories.Category; 102import org.junit.rules.TestName; 103import org.slf4j.Logger; 104import org.slf4j.LoggerFactory; 105 106import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 107import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 108 109/** 110 * Unit testing for ThriftServer.HBaseHandler, a part of the org.apache.hadoop.hbase.thrift2 111 * package. 112 */ 113@Category({ClientTests.class, MediumTests.class}) 114public class TestThriftHBaseServiceHandler { 115 116 @ClassRule 117 public static final HBaseClassTestRule CLASS_RULE = 118 HBaseClassTestRule.forClass(TestThriftHBaseServiceHandler.class); 119 120 private static final Logger LOG = LoggerFactory.getLogger(TestThriftHBaseServiceHandler.class); 121 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 122 123 // Static names for tables, columns, rows, and values 124 private static byte[] tableAname = Bytes.toBytes("tableA"); 125 private static byte[] familyAname = Bytes.toBytes("familyA"); 126 private static byte[] familyBname = Bytes.toBytes("familyB"); 127 private static byte[] qualifierAname = Bytes.toBytes("qualifierA"); 128 private static byte[] qualifierBname = Bytes.toBytes("qualifierB"); 129 private static byte[] valueAname = Bytes.toBytes("valueA"); 130 private static byte[] valueBname = Bytes.toBytes("valueB"); 131 private static HColumnDescriptor[] families = new HColumnDescriptor[] { 132 new HColumnDescriptor(familyAname).setMaxVersions(3), 133 new HColumnDescriptor(familyBname).setMaxVersions(2) 134 }; 135 136 137 private static final MetricsAssertHelper metricsHelper = 138 CompatibilityFactory.getInstance(MetricsAssertHelper.class); 139 140 @Rule 141 public TestName name = new TestName(); 142 143 144 public void assertTColumnValuesEqual(List<TColumnValue> columnValuesA, 145 List<TColumnValue> columnValuesB) { 146 assertEquals(columnValuesA.size(), columnValuesB.size()); 147 Comparator<TColumnValue> comparator = new Comparator<TColumnValue>() { 148 @Override 149 public int compare(TColumnValue o1, TColumnValue o2) { 150 return Bytes.compareTo(Bytes.add(o1.getFamily(), o1.getQualifier()), 151 Bytes.add(o2.getFamily(), o2.getQualifier())); 152 } 153 }; 154 Collections.sort(columnValuesA, comparator); 155 Collections.sort(columnValuesB, comparator); 156 157 for (int i = 0; i < columnValuesA.size(); i++) { 158 TColumnValue a = columnValuesA.get(i); 159 TColumnValue b = columnValuesB.get(i); 160 assertTColumnValueEqual(a, b); 161 } 162 } 163 164 public void assertTColumnValueEqual(TColumnValue a, TColumnValue b) { 165 assertArrayEquals(a.getFamily(), b.getFamily()); 166 assertArrayEquals(a.getQualifier(), b.getQualifier()); 167 assertArrayEquals(a.getValue(), b.getValue()); 168 } 169 170 @BeforeClass 171 public static void beforeClass() throws Exception { 172 UTIL.getConfiguration().set("hbase.client.retries.number", "3"); 173 UTIL.startMiniCluster(); 174 Admin admin = UTIL.getAdmin(); 175 HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableAname)); 176 for (HColumnDescriptor family : families) { 177 tableDescriptor.addFamily(family); 178 } 179 admin.createTable(tableDescriptor); 180 admin.close(); 181 } 182 183 @AfterClass 184 public static void afterClass() throws Exception { 185 UTIL.shutdownMiniCluster(); 186 } 187 188 @Before 189 public void setup() throws Exception { 190 191 } 192 193 private ThriftHBaseServiceHandler createHandler() throws TException { 194 try { 195 Configuration conf = UTIL.getConfiguration(); 196 return new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf)); 197 } catch (IOException ie) { 198 throw new TException(ie); 199 } 200 } 201 202 @Test 203 public void testExists() throws TIOError, TException { 204 ThriftHBaseServiceHandler handler = createHandler(); 205 byte[] rowName = Bytes.toBytes("testExists"); 206 ByteBuffer table = wrap(tableAname); 207 208 TGet get = new TGet(wrap(rowName)); 209 assertFalse(handler.exists(table, get)); 210 211 List<TColumnValue> columnValues = new ArrayList<>(2); 212 columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname))); 213 columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname), wrap(valueBname))); 214 TPut put = new TPut(wrap(rowName), columnValues); 215 put.setColumnValues(columnValues); 216 217 handler.put(table, put); 218 219 assertTrue(handler.exists(table, get)); 220 } 221 222 @Test 223 public void testExistsAll() throws TIOError, TException { 224 ThriftHBaseServiceHandler handler = createHandler(); 225 byte[] rowName1 = Bytes.toBytes("testExistsAll1"); 226 byte[] rowName2 = Bytes.toBytes("testExistsAll2"); 227 ByteBuffer table = wrap(tableAname); 228 229 List<TGet> gets = new ArrayList<>(); 230 gets.add(new TGet(wrap(rowName2))); 231 gets.add(new TGet(wrap(rowName2))); 232 List<Boolean> existsResult1 = handler.existsAll(table, gets); 233 assertFalse(existsResult1.get(0)); 234 assertFalse(existsResult1.get(1)); 235 236 List<TColumnValue> columnValues = new ArrayList<TColumnValue>(); 237 columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname))); 238 columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname), wrap(valueBname))); 239 List<TPut> puts = new ArrayList<TPut>(); 240 puts.add(new TPut(wrap(rowName1), columnValues)); 241 puts.add(new TPut(wrap(rowName2), columnValues)); 242 243 handler.putMultiple(table, puts); 244 List<Boolean> existsResult2 = handler.existsAll(table,gets ); 245 246 assertTrue(existsResult2.get(0)); 247 assertTrue(existsResult2.get(1)); 248 } 249 250 @Test 251 public void testPutGet() throws Exception { 252 ThriftHBaseServiceHandler handler = createHandler(); 253 byte[] rowName = Bytes.toBytes("testPutGet"); 254 ByteBuffer table = wrap(tableAname); 255 256 List<TColumnValue> columnValues = new ArrayList<>(2); 257 columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname))); 258 columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname), wrap(valueBname))); 259 TPut put = new TPut(wrap(rowName), columnValues); 260 261 put.setColumnValues(columnValues); 262 263 handler.put(table, put); 264 265 TGet get = new TGet(wrap(rowName)); 266 267 TResult result = handler.get(table, get); 268 assertArrayEquals(rowName, result.getRow()); 269 List<TColumnValue> returnedColumnValues = result.getColumnValues(); 270 assertTColumnValuesEqual(columnValues, returnedColumnValues); 271 } 272 273 @Test 274 public void testPutGetMultiple() throws Exception { 275 ThriftHBaseServiceHandler handler = createHandler(); 276 ByteBuffer table = wrap(tableAname); 277 byte[] rowName1 = Bytes.toBytes("testPutGetMultiple1"); 278 byte[] rowName2 = Bytes.toBytes("testPutGetMultiple2"); 279 280 List<TColumnValue> columnValues = new ArrayList<>(2); 281 columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname))); 282 columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname), wrap(valueBname))); 283 List<TPut> puts = new ArrayList<>(2); 284 puts.add(new TPut(wrap(rowName1), columnValues)); 285 puts.add(new TPut(wrap(rowName2), columnValues)); 286 287 handler.putMultiple(table, puts); 288 289 List<TGet> gets = new ArrayList<>(2); 290 gets.add(new TGet(wrap(rowName1))); 291 gets.add(new TGet(wrap(rowName2))); 292 293 List<TResult> results = handler.getMultiple(table, gets); 294 assertEquals(2, results.size()); 295 296 assertArrayEquals(rowName1, results.get(0).getRow()); 297 assertTColumnValuesEqual(columnValues, results.get(0).getColumnValues()); 298 299 assertArrayEquals(rowName2, results.get(1).getRow()); 300 assertTColumnValuesEqual(columnValues, results.get(1).getColumnValues()); 301 } 302 303 @Test 304 public void testDeleteMultiple() throws Exception { 305 ThriftHBaseServiceHandler handler = createHandler(); 306 ByteBuffer table = wrap(tableAname); 307 byte[] rowName1 = Bytes.toBytes("testDeleteMultiple1"); 308 byte[] rowName2 = Bytes.toBytes("testDeleteMultiple2"); 309 310 List<TColumnValue> columnValues = new ArrayList<>(2); 311 columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname))); 312 columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname), wrap(valueBname))); 313 List<TPut> puts = new ArrayList<>(2); 314 puts.add(new TPut(wrap(rowName1), columnValues)); 315 puts.add(new TPut(wrap(rowName2), columnValues)); 316 317 handler.putMultiple(table, puts); 318 319 List<TDelete> deletes = new ArrayList<>(2); 320 deletes.add(new TDelete(wrap(rowName1))); 321 deletes.add(new TDelete(wrap(rowName2))); 322 323 List<TDelete> deleteResults = handler.deleteMultiple(table, deletes); 324 // 0 means they were all successfully applies 325 assertEquals(0, deleteResults.size()); 326 327 assertFalse(handler.exists(table, new TGet(wrap(rowName1)))); 328 assertFalse(handler.exists(table, new TGet(wrap(rowName2)))); 329 } 330 331 @Test 332 public void testDelete() throws Exception { 333 ThriftHBaseServiceHandler handler = createHandler(); 334 byte[] rowName = Bytes.toBytes("testDelete"); 335 ByteBuffer table = wrap(tableAname); 336 337 List<TColumnValue> columnValues = new ArrayList<>(2); 338 TColumnValue columnValueA = new TColumnValue(wrap(familyAname), wrap(qualifierAname), 339 wrap(valueAname)); 340 TColumnValue columnValueB = new TColumnValue(wrap(familyBname), wrap(qualifierBname), 341 wrap(valueBname)); 342 columnValues.add(columnValueA); 343 columnValues.add(columnValueB); 344 TPut put = new TPut(wrap(rowName), columnValues); 345 346 put.setColumnValues(columnValues); 347 348 handler.put(table, put); 349 350 TDelete delete = new TDelete(wrap(rowName)); 351 List<TColumn> deleteColumns = new ArrayList<>(1); 352 TColumn deleteColumn = new TColumn(wrap(familyAname)); 353 deleteColumn.setQualifier(qualifierAname); 354 deleteColumns.add(deleteColumn); 355 delete.setColumns(deleteColumns); 356 357 handler.deleteSingle(table, delete); 358 359 TGet get = new TGet(wrap(rowName)); 360 TResult result = handler.get(table, get); 361 assertArrayEquals(rowName, result.getRow()); 362 List<TColumnValue> returnedColumnValues = result.getColumnValues(); 363 List<TColumnValue> expectedColumnValues = new ArrayList<>(1); 364 expectedColumnValues.add(columnValueB); 365 assertTColumnValuesEqual(expectedColumnValues, returnedColumnValues); 366 } 367 368 @Test 369 public void testDeleteAllTimestamps() throws Exception { 370 ThriftHBaseServiceHandler handler = createHandler(); 371 byte[] rowName = Bytes.toBytes("testDeleteAllTimestamps"); 372 ByteBuffer table = wrap(tableAname); 373 374 List<TColumnValue> columnValues = new ArrayList<>(1); 375 TColumnValue columnValueA = new TColumnValue(wrap(familyAname), wrap(qualifierAname), 376 wrap(valueAname)); 377 columnValueA.setTimestamp(System.currentTimeMillis() - 10); 378 columnValues.add(columnValueA); 379 TPut put = new TPut(wrap(rowName), columnValues); 380 381 put.setColumnValues(columnValues); 382 383 handler.put(table, put); 384 columnValueA.setTimestamp(System.currentTimeMillis()); 385 handler.put(table, put); 386 387 TGet get = new TGet(wrap(rowName)); 388 get.setMaxVersions(2); 389 TResult result = handler.get(table, get); 390 assertEquals(2, result.getColumnValuesSize()); 391 392 TDelete delete = new TDelete(wrap(rowName)); 393 List<TColumn> deleteColumns = new ArrayList<>(1); 394 TColumn deleteColumn = new TColumn(wrap(familyAname)); 395 deleteColumn.setQualifier(qualifierAname); 396 deleteColumns.add(deleteColumn); 397 delete.setColumns(deleteColumns); 398 delete.setDeleteType(TDeleteType.DELETE_COLUMNS); // This is the default anyway. 399 400 handler.deleteSingle(table, delete); 401 402 get = new TGet(wrap(rowName)); 403 result = handler.get(table, get); 404 assertNull(result.getRow()); 405 assertEquals(0, result.getColumnValuesSize()); 406 } 407 408 @Test 409 public void testDeleteSingleTimestamp() throws Exception { 410 ThriftHBaseServiceHandler handler = createHandler(); 411 byte[] rowName = Bytes.toBytes("testDeleteSingleTimestamp"); 412 ByteBuffer table = wrap(tableAname); 413 414 long timestamp1 = System.currentTimeMillis() - 10; 415 long timestamp2 = System.currentTimeMillis(); 416 417 List<TColumnValue> columnValues = new ArrayList<>(1); 418 TColumnValue columnValueA = new TColumnValue(wrap(familyAname), wrap(qualifierAname), 419 wrap(valueAname)); 420 columnValueA.setTimestamp(timestamp1); 421 columnValues.add(columnValueA); 422 TPut put = new TPut(wrap(rowName), columnValues); 423 424 put.setColumnValues(columnValues); 425 426 handler.put(table, put); 427 columnValueA.setTimestamp(timestamp2); 428 handler.put(table, put); 429 430 TGet get = new TGet(wrap(rowName)); 431 get.setMaxVersions(2); 432 TResult result = handler.get(table, get); 433 assertEquals(2, result.getColumnValuesSize()); 434 435 TDelete delete = new TDelete(wrap(rowName)); 436 List<TColumn> deleteColumns = new ArrayList<>(1); 437 TColumn deleteColumn = new TColumn(wrap(familyAname)); 438 deleteColumn.setQualifier(qualifierAname); 439 deleteColumns.add(deleteColumn); 440 delete.setColumns(deleteColumns); 441 delete.setDeleteType(TDeleteType.DELETE_COLUMN); 442 443 handler.deleteSingle(table, delete); 444 445 get = new TGet(wrap(rowName)); 446 result = handler.get(table, get); 447 assertArrayEquals(rowName, result.getRow()); 448 assertEquals(1, result.getColumnValuesSize()); 449 // the older timestamp should remain. 450 assertEquals(timestamp1, result.getColumnValues().get(0).getTimestamp()); 451 } 452 453 @Test 454 public void testDeleteFamily() throws Exception { 455 ThriftHBaseServiceHandler handler = createHandler(); 456 byte[] rowName = Bytes.toBytes("testDeleteFamily"); 457 ByteBuffer table = wrap(tableAname); 458 459 long timestamp1 = System.currentTimeMillis() - 10; 460 long timestamp2 = System.currentTimeMillis(); 461 462 List<TColumnValue> columnValues = new ArrayList<>(); 463 TColumnValue columnValueA = 464 new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname)); 465 columnValueA.setTimestamp(timestamp1); 466 columnValues.add(columnValueA); 467 TPut put = new TPut(wrap(rowName), columnValues); 468 469 put.setColumnValues(columnValues); 470 471 handler.put(table, put); 472 columnValueA.setTimestamp(timestamp2); 473 handler.put(table, put); 474 475 TGet get = new TGet(wrap(rowName)); 476 get.setMaxVersions(2); 477 TResult result = handler.get(table, get); 478 assertEquals(2, result.getColumnValuesSize()); 479 480 TDelete delete = new TDelete(wrap(rowName)); 481 List<TColumn> deleteColumns = new ArrayList<>(); 482 TColumn deleteColumn = new TColumn(wrap(familyAname)); 483 deleteColumns.add(deleteColumn); 484 delete.setColumns(deleteColumns); 485 delete.setDeleteType(TDeleteType.DELETE_FAMILY); 486 487 handler.deleteSingle(table, delete); 488 489 get = new TGet(wrap(rowName)); 490 result = handler.get(table, get); 491 assertArrayEquals(null, result.getRow()); 492 assertEquals(0, result.getColumnValuesSize()); 493 } 494 495 @Test 496 public void testDeleteFamilyVersion() throws Exception { 497 ThriftHBaseServiceHandler handler = createHandler(); 498 byte[] rowName = Bytes.toBytes("testDeleteFamilyVersion"); 499 ByteBuffer table = wrap(tableAname); 500 501 long timestamp1 = System.currentTimeMillis() - 10; 502 long timestamp2 = System.currentTimeMillis(); 503 504 List<TColumnValue> columnValues = new ArrayList<>(); 505 TColumnValue columnValueA = 506 new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname)); 507 columnValueA.setTimestamp(timestamp1); 508 columnValues.add(columnValueA); 509 TPut put = new TPut(wrap(rowName), columnValues); 510 511 put.setColumnValues(columnValues); 512 513 handler.put(table, put); 514 columnValueA.setTimestamp(timestamp2); 515 handler.put(table, put); 516 517 TGet get = new TGet(wrap(rowName)); 518 get.setMaxVersions(2); 519 TResult result = handler.get(table, get); 520 assertEquals(2, result.getColumnValuesSize()); 521 522 TDelete delete = new TDelete(wrap(rowName)); 523 List<TColumn> deleteColumns = new ArrayList<>(); 524 TColumn deleteColumn = new TColumn(wrap(familyAname)); 525 deleteColumn.setTimestamp(timestamp1); 526 deleteColumns.add(deleteColumn); 527 delete.setColumns(deleteColumns); 528 delete.setDeleteType(TDeleteType.DELETE_FAMILY_VERSION); 529 530 handler.deleteSingle(table, delete); 531 532 get = new TGet(wrap(rowName)); 533 result = handler.get(table, get); 534 assertArrayEquals(rowName, result.getRow()); 535 assertEquals(1, result.getColumnValuesSize()); 536 assertEquals(timestamp2, result.getColumnValues().get(0).getTimestamp()); 537 } 538 539 @Test 540 public void testIncrement() throws Exception { 541 ThriftHBaseServiceHandler handler = createHandler(); 542 byte[] rowName = Bytes.toBytes("testIncrement"); 543 ByteBuffer table = wrap(tableAname); 544 545 List<TColumnValue> columnValues = new ArrayList<>(1); 546 columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), 547 wrap(Bytes.toBytes(1L)))); 548 TPut put = new TPut(wrap(rowName), columnValues); 549 put.setColumnValues(columnValues); 550 handler.put(table, put); 551 552 List<TColumnIncrement> incrementColumns = new ArrayList<>(1); 553 incrementColumns.add(new TColumnIncrement(wrap(familyAname), wrap(qualifierAname))); 554 TIncrement increment = new TIncrement(wrap(rowName), incrementColumns); 555 handler.increment(table, increment); 556 557 TGet get = new TGet(wrap(rowName)); 558 TResult result = handler.get(table, get); 559 560 assertArrayEquals(rowName, result.getRow()); 561 assertEquals(1, result.getColumnValuesSize()); 562 TColumnValue columnValue = result.getColumnValues().get(0); 563 assertArrayEquals(Bytes.toBytes(2L), columnValue.getValue()); 564 } 565 566 @Test 567 public void testAppend() throws Exception { 568 ThriftHBaseServiceHandler handler = createHandler(); 569 byte[] rowName = Bytes.toBytes("testAppend"); 570 ByteBuffer table = wrap(tableAname); 571 byte[] v1 = Bytes.toBytes("42"); 572 byte[] v2 = Bytes.toBytes("23"); 573 List<TColumnValue> columnValues = new ArrayList<>(1); 574 columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(v1))); 575 TPut put = new TPut(wrap(rowName), columnValues); 576 put.setColumnValues(columnValues); 577 handler.put(table, put); 578 579 List<TColumnValue> appendColumns = new ArrayList<>(1); 580 appendColumns.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(v2))); 581 TAppend append = new TAppend(wrap(rowName), appendColumns); 582 handler.append(table, append); 583 584 TGet get = new TGet(wrap(rowName)); 585 TResult result = handler.get(table, get); 586 587 assertArrayEquals(rowName, result.getRow()); 588 assertEquals(1, result.getColumnValuesSize()); 589 TColumnValue columnValue = result.getColumnValues().get(0); 590 assertArrayEquals(Bytes.add(v1, v2), columnValue.getValue()); 591 } 592 593 /** 594 * check that checkAndPut fails if the cell does not exist, then put in the cell, then check 595 * that the checkAndPut succeeds. 596 */ 597 @Test 598 public void testCheckAndPut() throws Exception { 599 ThriftHBaseServiceHandler handler = createHandler(); 600 byte[] rowName = Bytes.toBytes("testCheckAndPut"); 601 ByteBuffer table = wrap(tableAname); 602 603 List<TColumnValue> columnValuesA = new ArrayList<>(1); 604 TColumnValue columnValueA = new TColumnValue(wrap(familyAname), wrap(qualifierAname), 605 wrap(valueAname)); 606 columnValuesA.add(columnValueA); 607 TPut putA = new TPut(wrap(rowName), columnValuesA); 608 putA.setColumnValues(columnValuesA); 609 610 List<TColumnValue> columnValuesB = new ArrayList<>(1); 611 TColumnValue columnValueB = new TColumnValue(wrap(familyBname), wrap(qualifierBname), 612 wrap(valueBname)); 613 columnValuesB.add(columnValueB); 614 TPut putB = new TPut(wrap(rowName), columnValuesB); 615 putB.setColumnValues(columnValuesB); 616 617 assertFalse(handler.checkAndPut(table, wrap(rowName), wrap(familyAname), 618 wrap(qualifierAname), wrap(valueAname), putB)); 619 620 TGet get = new TGet(wrap(rowName)); 621 TResult result = handler.get(table, get); 622 assertEquals(0, result.getColumnValuesSize()); 623 624 handler.put(table, putA); 625 626 assertTrue(handler.checkAndPut(table, wrap(rowName), wrap(familyAname), 627 wrap(qualifierAname), wrap(valueAname), putB)); 628 629 result = handler.get(table, get); 630 assertArrayEquals(rowName, result.getRow()); 631 List<TColumnValue> returnedColumnValues = result.getColumnValues(); 632 List<TColumnValue> expectedColumnValues = new ArrayList<>(2); 633 expectedColumnValues.add(columnValueA); 634 expectedColumnValues.add(columnValueB); 635 assertTColumnValuesEqual(expectedColumnValues, returnedColumnValues); 636 } 637 638 /** 639 * check that checkAndDelete fails if the cell does not exist, then put in the cell, then 640 * check that the checkAndDelete succeeds. 641 */ 642 @Test 643 public void testCheckAndDelete() throws Exception { 644 ThriftHBaseServiceHandler handler = createHandler(); 645 byte[] rowName = Bytes.toBytes("testCheckAndDelete"); 646 ByteBuffer table = wrap(tableAname); 647 648 List<TColumnValue> columnValuesA = new ArrayList<>(1); 649 TColumnValue columnValueA = new TColumnValue(wrap(familyAname), wrap(qualifierAname), 650 wrap(valueAname)); 651 columnValuesA.add(columnValueA); 652 TPut putA = new TPut(wrap(rowName), columnValuesA); 653 putA.setColumnValues(columnValuesA); 654 655 List<TColumnValue> columnValuesB = new ArrayList<>(1); 656 TColumnValue columnValueB = new TColumnValue(wrap(familyBname), wrap(qualifierBname), 657 wrap(valueBname)); 658 columnValuesB.add(columnValueB); 659 TPut putB = new TPut(wrap(rowName), columnValuesB); 660 putB.setColumnValues(columnValuesB); 661 662 // put putB so that we know whether the row has been deleted or not 663 handler.put(table, putB); 664 665 TDelete delete = new TDelete(wrap(rowName)); 666 667 assertFalse(handler.checkAndDelete(table, wrap(rowName), wrap(familyAname), 668 wrap(qualifierAname), wrap(valueAname), delete)); 669 670 TGet get = new TGet(wrap(rowName)); 671 TResult result = handler.get(table, get); 672 assertArrayEquals(rowName, result.getRow()); 673 assertTColumnValuesEqual(columnValuesB, result.getColumnValues()); 674 675 handler.put(table, putA); 676 677 assertTrue(handler.checkAndDelete(table, wrap(rowName), wrap(familyAname), 678 wrap(qualifierAname), wrap(valueAname), delete)); 679 680 result = handler.get(table, get); 681 assertFalse(result.isSetRow()); 682 assertEquals(0, result.getColumnValuesSize()); 683 } 684 685 @Test 686 public void testScan() throws Exception { 687 ThriftHBaseServiceHandler handler = createHandler(); 688 ByteBuffer table = wrap(tableAname); 689 690 // insert data 691 TColumnValue columnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname), 692 wrap(valueAname)); 693 List<TColumnValue> columnValues = new ArrayList<>(1); 694 columnValues.add(columnValue); 695 for (int i = 0; i < 10; i++) { 696 TPut put = new TPut(wrap(Bytes.toBytes("testScan" + i)), columnValues); 697 handler.put(table, put); 698 } 699 700 // create scan instance 701 TScan scan = new TScan(); 702 List<TColumn> columns = new ArrayList<>(1); 703 TColumn column = new TColumn(); 704 column.setFamily(familyAname); 705 column.setQualifier(qualifierAname); 706 columns.add(column); 707 scan.setColumns(columns); 708 scan.setStartRow(Bytes.toBytes("testScan")); 709 scan.setStopRow(Bytes.toBytes("testScan\uffff")); 710 711 // get scanner and rows 712 int scanId = handler.openScanner(table, scan); 713 List<TResult> results = handler.getScannerRows(scanId, 10); 714 assertEquals(10, results.size()); 715 for (int i = 0; i < 10; i++) { 716 // check if the rows are returned and in order 717 assertArrayEquals(Bytes.toBytes("testScan" + i), results.get(i).getRow()); 718 } 719 720 // check that we are at the end of the scan 721 results = handler.getScannerRows(scanId, 10); 722 assertEquals(0, results.size()); 723 724 // close scanner and check that it was indeed closed 725 handler.closeScanner(scanId); 726 try { 727 handler.getScannerRows(scanId, 10); 728 fail("Scanner id should be invalid"); 729 } catch (TIllegalArgument e) { 730 } 731 } 732 733 /** 734 * Tests keeping a HBase scanner alive for long periods of time. Each call to getScannerRow() 735 * should reset the ConnectionCache timeout for the scanner's connection 736 */ 737 @Test 738 public void testLongLivedScan() throws Exception { 739 int numTrials = 6; 740 int trialPause = 1000; 741 int cleanUpInterval = 100; 742 Configuration conf = new Configuration(UTIL.getConfiguration()); 743 // Set the ConnectionCache timeout to trigger halfway through the trials 744 conf.setInt(ThriftHBaseServiceHandler.MAX_IDLETIME, (numTrials / 2) * trialPause); 745 conf.setInt(ThriftHBaseServiceHandler.CLEANUP_INTERVAL, cleanUpInterval); 746 ThriftHBaseServiceHandler handler = new ThriftHBaseServiceHandler(conf, 747 UserProvider.instantiate(conf)); 748 749 ByteBuffer table = wrap(tableAname); 750 // insert data 751 TColumnValue columnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname), 752 wrap(valueAname)); 753 List<TColumnValue> columnValues = new ArrayList<>(1); 754 columnValues.add(columnValue); 755 for (int i = 0; i < numTrials; i++) { 756 TPut put = new TPut(wrap(Bytes.toBytes("testScan" + i)), columnValues); 757 handler.put(table, put); 758 } 759 760 // create scan instance 761 TScan scan = new TScan(); 762 List<TColumn> columns = new ArrayList<>(1); 763 TColumn column = new TColumn(); 764 column.setFamily(familyAname); 765 column.setQualifier(qualifierAname); 766 columns.add(column); 767 scan.setColumns(columns); 768 scan.setStartRow(Bytes.toBytes("testScan")); 769 scan.setStopRow(Bytes.toBytes("testScan\uffff")); 770 // Prevent the scanner from caching results 771 scan.setCaching(1); 772 773 // get scanner and rows 774 int scanId = handler.openScanner(table, scan); 775 for (int i = 0; i < numTrials; i++) { 776 // Make sure that the Scanner doesn't throw an exception after the ConnectionCache timeout 777 List<TResult> results = handler.getScannerRows(scanId, 1); 778 assertArrayEquals(Bytes.toBytes("testScan" + i), results.get(0).getRow()); 779 Thread.sleep(trialPause); 780 } 781 } 782 783 @Test 784 public void testReverseScan() throws Exception { 785 ThriftHBaseServiceHandler handler = createHandler(); 786 ByteBuffer table = wrap(tableAname); 787 788 // insert data 789 TColumnValue columnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname), 790 wrap(valueAname)); 791 List<TColumnValue> columnValues = new ArrayList<>(1); 792 columnValues.add(columnValue); 793 for (int i = 0; i < 10; i++) { 794 TPut put = new TPut(wrap(Bytes.toBytes("testReverseScan" + i)), columnValues); 795 handler.put(table, put); 796 } 797 798 // create reverse scan instance 799 TScan scan = new TScan(); 800 scan.setReversed(true); 801 List<TColumn> columns = new ArrayList<>(1); 802 TColumn column = new TColumn(); 803 column.setFamily(familyAname); 804 column.setQualifier(qualifierAname); 805 columns.add(column); 806 scan.setColumns(columns); 807 scan.setStartRow(Bytes.toBytes("testReverseScan\uffff")); 808 scan.setStopRow(Bytes.toBytes("testReverseScan")); 809 810 // get scanner and rows 811 int scanId = handler.openScanner(table, scan); 812 List<TResult> results = handler.getScannerRows(scanId, 10); 813 assertEquals(10, results.size()); 814 for (int i = 0; i < 10; i++) { 815 // check if the rows are returned and in order 816 assertArrayEquals(Bytes.toBytes("testReverseScan" + (9 - i)), results.get(i).getRow()); 817 } 818 819 // check that we are at the end of the scan 820 results = handler.getScannerRows(scanId, 10); 821 assertEquals(0, results.size()); 822 823 // close scanner and check that it was indeed closed 824 handler.closeScanner(scanId); 825 try { 826 handler.getScannerRows(scanId, 10); 827 fail("Scanner id should be invalid"); 828 } catch (TIllegalArgument e) { 829 } 830 } 831 832 @Test 833 public void testScanWithFilter() throws Exception { 834 ThriftHBaseServiceHandler handler = createHandler(); 835 ByteBuffer table = wrap(tableAname); 836 837 // insert data 838 TColumnValue columnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname), 839 wrap(valueAname)); 840 List<TColumnValue> columnValues = new ArrayList<>(1); 841 columnValues.add(columnValue); 842 for (int i = 0; i < 10; i++) { 843 TPut put = new TPut(wrap(Bytes.toBytes("testScanWithFilter" + i)), columnValues); 844 handler.put(table, put); 845 } 846 847 // create scan instance with filter 848 TScan scan = new TScan(); 849 List<TColumn> columns = new ArrayList<>(1); 850 TColumn column = new TColumn(); 851 column.setFamily(familyAname); 852 column.setQualifier(qualifierAname); 853 columns.add(column); 854 scan.setColumns(columns); 855 scan.setStartRow(Bytes.toBytes("testScanWithFilter")); 856 scan.setStopRow(Bytes.toBytes("testScanWithFilter\uffff")); 857 // only get the key part 858 scan.setFilterString(wrap(Bytes.toBytes("KeyOnlyFilter()"))); 859 860 // get scanner and rows 861 int scanId = handler.openScanner(table, scan); 862 List<TResult> results = handler.getScannerRows(scanId, 10); 863 assertEquals(10, results.size()); 864 for (int i = 0; i < 10; i++) { 865 // check if the rows are returned and in order 866 assertArrayEquals(Bytes.toBytes("testScanWithFilter" + i), results.get(i).getRow()); 867 // check that the value is indeed stripped by the filter 868 assertEquals(0, results.get(i).getColumnValues().get(0).getValue().length); 869 } 870 871 // check that we are at the end of the scan 872 results = handler.getScannerRows(scanId, 10); 873 assertEquals(0, results.size()); 874 875 // close scanner and check that it was indeed closed 876 handler.closeScanner(scanId); 877 try { 878 handler.getScannerRows(scanId, 10); 879 fail("Scanner id should be invalid"); 880 } catch (TIllegalArgument e) { 881 } 882 } 883 884 @Test 885 public void testScanWithColumnFamilyTimeRange() throws Exception { 886 ThriftHBaseServiceHandler handler = createHandler(); 887 ByteBuffer table = wrap(tableAname); 888 889 // insert data 890 TColumnValue familyAColumnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname), 891 wrap(valueAname)); 892 TColumnValue familyBColumnValue = new TColumnValue(wrap(familyBname), wrap(qualifierBname), 893 wrap(valueBname)); 894 long minTimestamp = System.currentTimeMillis(); 895 for (int i = 0; i < 10; i++) { 896 familyAColumnValue.setTimestamp(minTimestamp + i); 897 familyBColumnValue.setTimestamp(minTimestamp + i); 898 List<TColumnValue> columnValues = new ArrayList<>(2); 899 columnValues.add(familyAColumnValue); 900 columnValues.add(familyBColumnValue); 901 TPut put = new TPut(wrap(Bytes.toBytes("testScanWithColumnFamilyTimeRange" + i)), 902 columnValues); 903 handler.put(table, put); 904 } 905 906 // create scan instance with column family time range 907 TScan scan = new TScan(); 908 Map<ByteBuffer,TTimeRange> colFamTimeRangeMap = new HashMap<>(2); 909 colFamTimeRangeMap.put(wrap(familyAname), new TTimeRange(minTimestamp + 3, minTimestamp + 5)); 910 colFamTimeRangeMap.put(wrap(familyBname), new TTimeRange(minTimestamp + 6, minTimestamp + 9)); 911 scan.setColFamTimeRangeMap(colFamTimeRangeMap); 912 913 // get scanner and rows 914 int scanId = handler.openScanner(table, scan); 915 List<TResult> results = handler.getScannerRows(scanId, 5); 916 assertEquals(5, results.size()); 917 int familyACount = 0; 918 int familyBCount = 0; 919 for (TResult result : results) { 920 List<TColumnValue> columnValues = result.getColumnValues(); 921 if (CollectionUtils.isNotEmpty(columnValues)) { 922 if (Bytes.equals(familyAname, columnValues.get(0).getFamily())) { 923 familyACount++; 924 } else if (Bytes.equals(familyBname, columnValues.get(0).getFamily())) { 925 familyBCount++; 926 } 927 } 928 } 929 assertEquals(2, familyACount); 930 assertEquals(3, familyBCount); 931 932 // check that we are at the end of the scan 933 results = handler.getScannerRows(scanId, 1); 934 assertEquals(0, results.size()); 935 936 // close scanner and check that it was indeed closed 937 handler.closeScanner(scanId); 938 try { 939 handler.getScannerRows(scanId, 1); 940 fail("Scanner id should be invalid"); 941 } catch (TIllegalArgument e) { 942 } 943 } 944 945 @Test 946 public void testSmallScan() throws Exception { 947 ThriftHBaseServiceHandler handler = createHandler(); 948 ByteBuffer table = wrap(tableAname); 949 950 // insert data 951 TColumnValue columnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname), 952 wrap(valueAname)); 953 List<TColumnValue> columnValues = new ArrayList<>(); 954 columnValues.add(columnValue); 955 for (int i = 0; i < 10; i++) { 956 TPut put = new TPut(wrap(Bytes.toBytes("testSmallScan" + i)), columnValues); 957 handler.put(table, put); 958 } 959 960 // small scan instance 961 TScan scan = new TScan(); 962 scan.setStartRow(Bytes.toBytes("testSmallScan")); 963 scan.setStopRow(Bytes.toBytes("testSmallScan\uffff")); 964 scan.setReadType(TReadType.PREAD); 965 scan.setCaching(2); 966 967 // get scanner and rows 968 int scanId = handler.openScanner(table, scan); 969 List<TResult> results = handler.getScannerRows(scanId, 10); 970 assertEquals(10, results.size()); 971 for (int i = 0; i < 10; i++) { 972 // check if the rows are returned and in order 973 assertArrayEquals(Bytes.toBytes("testSmallScan" + i), results.get(i).getRow()); 974 } 975 976 // check that we are at the end of the scan 977 results = handler.getScannerRows(scanId, 10); 978 assertEquals(0, results.size()); 979 980 // close scanner and check that it was indeed closed 981 handler.closeScanner(scanId); 982 try { 983 handler.getScannerRows(scanId, 10); 984 fail("Scanner id should be invalid"); 985 } catch (TIllegalArgument e) { 986 } 987 } 988 989 @Test 990 public void testPutTTL() throws Exception { 991 ThriftHBaseServiceHandler handler = createHandler(); 992 byte[] rowName = Bytes.toBytes("testPutTTL"); 993 ByteBuffer table = wrap(tableAname); 994 List<TColumnValue> columnValues = new ArrayList<>(1); 995 996 // Add some dummy data 997 columnValues.add( 998 new TColumnValue( 999 wrap(familyAname), 1000 wrap(qualifierAname), 1001 wrap(Bytes.toBytes(1L)))); 1002 1003 1004 TPut put = new TPut(wrap(rowName), columnValues); 1005 put.setColumnValues(columnValues); 1006 1007 Map<ByteBuffer, ByteBuffer> attributes = new HashMap<>(); 1008 1009 // Time in ms for the kv's to live. 1010 long ttlTimeMs = 2000L; 1011 1012 // the _ttl attribute is a number of ms ttl for key values in this put. 1013 attributes.put(wrap(Bytes.toBytes("_ttl")), wrap(Bytes.toBytes(ttlTimeMs))); 1014 // Attach the attributes 1015 put.setAttributes(attributes); 1016 // Send it. 1017 handler.put(table, put); 1018 1019 // Now get the data back 1020 TGet getOne = new TGet(wrap(rowName)); 1021 TResult resultOne = handler.get(table, getOne); 1022 1023 // It's there. 1024 assertArrayEquals(rowName, resultOne.getRow()); 1025 assertEquals(1, resultOne.getColumnValuesSize()); 1026 1027 // Sleep 30 seconds just to make 100% sure that the key value should be expired. 1028 Thread.sleep(ttlTimeMs * 15); 1029 1030 TGet getTwo = new TGet(wrap(rowName)); 1031 TResult resultTwo = handler.get(table, getTwo); 1032 1033 1034 // Nothing should be there since it's ttl'd out. 1035 assertNull(resultTwo.getRow()); 1036 assertEquals(0, resultTwo.getColumnValuesSize()); 1037 } 1038 1039 /** 1040 * Padding numbers to make comparison of sort order easier in a for loop 1041 * 1042 * @param n The number to pad. 1043 * @param pad The length to pad up to. 1044 * @return The padded number as a string. 1045 */ 1046 private String pad(int n, byte pad) { 1047 String res = Integer.toString(n); 1048 while (res.length() < pad) res = "0" + res; 1049 return res; 1050 } 1051 1052 @Test 1053 public void testScanWithBatchSize() throws Exception { 1054 ThriftHBaseServiceHandler handler = createHandler(); 1055 ByteBuffer table = wrap(tableAname); 1056 1057 // insert data 1058 List<TColumnValue> columnValues = new ArrayList<>(100); 1059 for (int i = 0; i < 100; i++) { 1060 String colNum = pad(i, (byte) 3); 1061 TColumnValue columnValue = new TColumnValue(wrap(familyAname), 1062 wrap(Bytes.toBytes("col" + colNum)), wrap(Bytes.toBytes("val" + colNum))); 1063 columnValues.add(columnValue); 1064 } 1065 TPut put = new TPut(wrap(Bytes.toBytes("testScanWithBatchSize")), columnValues); 1066 handler.put(table, put); 1067 1068 // create scan instance 1069 TScan scan = new TScan(); 1070 List<TColumn> columns = new ArrayList<>(1); 1071 TColumn column = new TColumn(); 1072 column.setFamily(familyAname); 1073 columns.add(column); 1074 scan.setColumns(columns); 1075 scan.setStartRow(Bytes.toBytes("testScanWithBatchSize")); 1076 scan.setStopRow(Bytes.toBytes("testScanWithBatchSize\uffff")); 1077 // set batch size to 10 columns per call 1078 scan.setBatchSize(10); 1079 1080 // get scanner 1081 int scanId = handler.openScanner(table, scan); 1082 List<TResult> results = null; 1083 for (int i = 0; i < 10; i++) { 1084 // get batch for single row (10x10 is what we expect) 1085 results = handler.getScannerRows(scanId, 1); 1086 assertEquals(1, results.size()); 1087 // check length of batch 1088 List<TColumnValue> cols = results.get(0).getColumnValues(); 1089 assertEquals(10, cols.size()); 1090 // check if the columns are returned and in order 1091 for (int y = 0; y < 10; y++) { 1092 int colNum = y + (10 * i); 1093 String colNumPad = pad(colNum, (byte) 3); 1094 assertArrayEquals(Bytes.toBytes("col" + colNumPad), cols.get(y).getQualifier()); 1095 } 1096 } 1097 1098 // check that we are at the end of the scan 1099 results = handler.getScannerRows(scanId, 1); 1100 assertEquals(0, results.size()); 1101 1102 // close scanner and check that it was indeed closed 1103 handler.closeScanner(scanId); 1104 try { 1105 handler.getScannerRows(scanId, 1); 1106 fail("Scanner id should be invalid"); 1107 } catch (TIllegalArgument e) { 1108 } 1109 } 1110 1111 @Test 1112 public void testGetScannerResults() throws Exception { 1113 ThriftHBaseServiceHandler handler = createHandler(); 1114 ByteBuffer table = wrap(tableAname); 1115 1116 // insert data 1117 TColumnValue columnValue = 1118 new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname)); 1119 List<TColumnValue> columnValues = new ArrayList<>(1); 1120 columnValues.add(columnValue); 1121 for (int i = 0; i < 20; i++) { 1122 TPut put = 1123 new TPut(wrap(Bytes.toBytes("testGetScannerResults" + pad(i, (byte) 2))), columnValues); 1124 handler.put(table, put); 1125 } 1126 1127 // create scan instance 1128 TScan scan = new TScan(); 1129 List<TColumn> columns = new ArrayList<>(1); 1130 TColumn column = new TColumn(); 1131 column.setFamily(familyAname); 1132 column.setQualifier(qualifierAname); 1133 columns.add(column); 1134 scan.setColumns(columns); 1135 scan.setStartRow(Bytes.toBytes("testGetScannerResults")); 1136 1137 // get 5 rows and check the returned results 1138 scan.setStopRow(Bytes.toBytes("testGetScannerResults05")); 1139 List<TResult> results = handler.getScannerResults(table, scan, 5); 1140 assertEquals(5, results.size()); 1141 for (int i = 0; i < 5; i++) { 1142 // check if the rows are returned and in order 1143 assertArrayEquals(Bytes.toBytes("testGetScannerResults" + pad(i, (byte) 2)), results.get(i) 1144 .getRow()); 1145 } 1146 1147 // get 10 rows and check the returned results 1148 scan.setStopRow(Bytes.toBytes("testGetScannerResults10")); 1149 results = handler.getScannerResults(table, scan, 10); 1150 assertEquals(10, results.size()); 1151 for (int i = 0; i < 10; i++) { 1152 // check if the rows are returned and in order 1153 assertArrayEquals(Bytes.toBytes("testGetScannerResults" + pad(i, (byte) 2)), results.get(i) 1154 .getRow()); 1155 } 1156 1157 // get 20 rows and check the returned results 1158 scan.setStopRow(Bytes.toBytes("testGetScannerResults20")); 1159 results = handler.getScannerResults(table, scan, 20); 1160 assertEquals(20, results.size()); 1161 for (int i = 0; i < 20; i++) { 1162 // check if the rows are returned and in order 1163 assertArrayEquals(Bytes.toBytes("testGetScannerResults" + pad(i, (byte) 2)), results.get(i) 1164 .getRow()); 1165 } 1166 1167 // reverse scan 1168 scan = new TScan(); 1169 scan.setColumns(columns); 1170 scan.setReversed(true); 1171 scan.setStartRow(Bytes.toBytes("testGetScannerResults20")); 1172 scan.setStopRow(Bytes.toBytes("testGetScannerResults")); 1173 results = handler.getScannerResults(table, scan, 20); 1174 assertEquals(20, results.size()); 1175 for (int i = 0; i < 20; i++) { 1176 // check if the rows are returned and in order 1177 assertArrayEquals(Bytes.toBytes("testGetScannerResults" + pad(19 - i, (byte) 2)), 1178 results.get(i).getRow()); 1179 } 1180 } 1181 1182 @Test 1183 public void testFilterRegistration() throws Exception { 1184 Configuration conf = UTIL.getConfiguration(); 1185 conf.set("hbase.thrift.filters", "MyFilter:filterclass"); 1186 ThriftServer.registerFilters(conf); 1187 Map<String, String> registeredFilters = ParseFilter.getAllFilters(); 1188 assertEquals("filterclass", registeredFilters.get("MyFilter")); 1189 } 1190 1191 @Test 1192 public void testMetrics() throws Exception { 1193 Configuration conf = UTIL.getConfiguration(); 1194 ThriftMetrics metrics = getMetrics(conf); 1195 ThriftHBaseServiceHandler hbaseHandler = createHandler(); 1196 THBaseService.Iface handler = 1197 ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics); 1198 byte[] rowName = Bytes.toBytes("testMetrics"); 1199 ByteBuffer table = wrap(tableAname); 1200 1201 TGet get = new TGet(wrap(rowName)); 1202 assertFalse(handler.exists(table, get)); 1203 1204 List<TColumnValue> columnValues = new ArrayList<>(2); 1205 columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname))); 1206 columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname), wrap(valueBname))); 1207 TPut put = new TPut(wrap(rowName), columnValues); 1208 put.setColumnValues(columnValues); 1209 1210 handler.put(table, put); 1211 1212 assertTrue(handler.exists(table, get)); 1213 metricsHelper.assertCounter("put_num_ops", 1, metrics.getSource()); 1214 metricsHelper.assertCounter( "exists_num_ops", 2, metrics.getSource()); 1215 } 1216 1217 private static ThriftMetrics getMetrics(Configuration conf) throws Exception { 1218 ThriftMetrics m = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO); 1219 m.getSource().init(); //Clear all the metrics 1220 return m; 1221 } 1222 1223 @Test 1224 public void testMetricsWithException() throws Exception { 1225 byte[] rowkey = Bytes.toBytes("row1"); 1226 byte[] family = Bytes.toBytes("f"); 1227 byte[] col = Bytes.toBytes("c"); 1228 // create a table which will throw exceptions for requests 1229 TableName tableName = TableName.valueOf(name.getMethodName()); 1230 HTableDescriptor tableDesc = new HTableDescriptor(tableName); 1231 tableDesc.addCoprocessor(ErrorThrowingGetObserver.class.getName()); 1232 tableDesc.addFamily(new HColumnDescriptor(family)); 1233 1234 Table table = UTIL.createTable(tableDesc, null); 1235 table.put(new Put(rowkey).addColumn(family, col, Bytes.toBytes("val1"))); 1236 1237 ThriftHBaseServiceHandler hbaseHandler = createHandler(); 1238 ThriftMetrics metrics = getMetrics(UTIL.getConfiguration()); 1239 THBaseService.Iface handler = 1240 ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics); 1241 ByteBuffer tTableName = wrap(tableName.getName()); 1242 1243 // check metrics increment with a successful get 1244 long preGetCounter = metricsHelper.checkCounterExists("get_num_ops", metrics.getSource()) ? 1245 metricsHelper.getCounter("get_num_ops", metrics.getSource()) : 1246 0; 1247 TGet tGet = new TGet(wrap(rowkey)); 1248 TResult tResult = handler.get(tTableName, tGet); 1249 1250 List<TColumnValue> expectedColumnValues = Lists.newArrayList( 1251 new TColumnValue(wrap(family), wrap(col), wrap(Bytes.toBytes("val1"))) 1252 ); 1253 assertArrayEquals(rowkey, tResult.getRow()); 1254 List<TColumnValue> returnedColumnValues = tResult.getColumnValues(); 1255 assertTColumnValuesEqual(expectedColumnValues, returnedColumnValues); 1256 1257 metricsHelper.assertCounter("get_num_ops", preGetCounter + 1, metrics.getSource()); 1258 1259 // check metrics increment when the get throws each exception type 1260 for (ErrorThrowingGetObserver.ErrorType type : ErrorThrowingGetObserver.ErrorType.values()) { 1261 testExceptionType(handler, metrics, tTableName, rowkey, type); 1262 } 1263 } 1264 1265 private void testExceptionType(THBaseService.Iface handler, ThriftMetrics metrics, 1266 ByteBuffer tTableName, byte[] rowkey, ErrorThrowingGetObserver.ErrorType errorType) { 1267 long preGetCounter = metricsHelper.getCounter("get_num_ops", metrics.getSource()); 1268 String exceptionKey = errorType.getMetricName(); 1269 long preExceptionCounter = metricsHelper.checkCounterExists(exceptionKey, metrics.getSource()) ? 1270 metricsHelper.getCounter(exceptionKey, metrics.getSource()) : 1271 0; 1272 TGet tGet = new TGet(wrap(rowkey)); 1273 Map<ByteBuffer, ByteBuffer> attributes = new HashMap<>(); 1274 attributes.put(wrap(Bytes.toBytes(ErrorThrowingGetObserver.SHOULD_ERROR_ATTRIBUTE)), 1275 wrap(Bytes.toBytes(errorType.name()))); 1276 tGet.setAttributes(attributes); 1277 try { 1278 TResult tResult = handler.get(tTableName, tGet); 1279 fail("Get with error attribute should have thrown an exception"); 1280 } catch (TException e) { 1281 LOG.info("Received exception: ", e); 1282 metricsHelper.assertCounter("get_num_ops", preGetCounter + 1, metrics.getSource()); 1283 metricsHelper.assertCounter(exceptionKey, preExceptionCounter + 1, metrics.getSource()); 1284 } 1285 1286 } 1287 1288 /** 1289 * See HBASE-17611 1290 * 1291 * Latency metrics were capped at ~ 2 seconds due to the use of an int variable to capture the 1292 * duration. 1293 */ 1294 @Test 1295 public void testMetricsPrecision() throws Exception { 1296 byte[] rowkey = Bytes.toBytes("row1"); 1297 byte[] family = Bytes.toBytes("f"); 1298 byte[] col = Bytes.toBytes("c"); 1299 // create a table which will throw exceptions for requests 1300 TableName tableName = TableName.valueOf("testMetricsPrecision"); 1301 HTableDescriptor tableDesc = new HTableDescriptor(tableName); 1302 tableDesc.addCoprocessor(DelayingRegionObserver.class.getName()); 1303 tableDesc.addFamily(new HColumnDescriptor(family)); 1304 1305 Table table = null; 1306 try { 1307 table = UTIL.createTable(tableDesc, null); 1308 1309 table.put(new Put(rowkey).addColumn(family, col, Bytes.toBytes("val1"))); 1310 1311 ThriftHBaseServiceHandler hbaseHandler = createHandler(); 1312 ThriftMetrics metrics = getMetrics(UTIL.getConfiguration()); 1313 THBaseService.Iface handler = 1314 ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics); 1315 ByteBuffer tTableName = wrap(tableName.getName()); 1316 1317 // check metrics latency with a successful get 1318 TGet tGet = new TGet(wrap(rowkey)); 1319 TResult tResult = handler.get(tTableName, tGet); 1320 1321 List<TColumnValue> expectedColumnValues = Lists.newArrayList( 1322 new TColumnValue(wrap(family), wrap(col), wrap(Bytes.toBytes("val1"))) 1323 ); 1324 assertArrayEquals(rowkey, tResult.getRow()); 1325 List<TColumnValue> returnedColumnValues = tResult.getColumnValues(); 1326 assertTColumnValuesEqual(expectedColumnValues, returnedColumnValues); 1327 1328 metricsHelper.assertGaugeGt("get_max", 3000L, metrics.getSource()); 1329 } finally { 1330 if (table != null) { 1331 try { 1332 table.close(); 1333 } catch (IOException ignored) { 1334 } 1335 UTIL.deleteTable(tableName); 1336 } 1337 } 1338 } 1339 1340 1341 @Test 1342 public void testAttribute() throws Exception { 1343 byte[] rowName = Bytes.toBytes("testAttribute"); 1344 byte[] attributeKey = Bytes.toBytes("attribute1"); 1345 byte[] attributeValue = Bytes.toBytes("value1"); 1346 Map<ByteBuffer, ByteBuffer> attributes = new HashMap<>(); 1347 attributes.put(wrap(attributeKey), wrap(attributeValue)); 1348 1349 TGet tGet = new TGet(wrap(rowName)); 1350 tGet.setAttributes(attributes); 1351 Get get = getFromThrift(tGet); 1352 assertArrayEquals(get.getAttribute("attribute1"), attributeValue); 1353 1354 List<TColumnValue> columnValues = new ArrayList<>(1); 1355 columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname))); 1356 TPut tPut = new TPut(wrap(rowName) , columnValues); 1357 tPut.setAttributes(attributes); 1358 Put put = putFromThrift(tPut); 1359 assertArrayEquals(put.getAttribute("attribute1"), attributeValue); 1360 1361 TScan tScan = new TScan(); 1362 tScan.setAttributes(attributes); 1363 Scan scan = scanFromThrift(tScan); 1364 assertArrayEquals(scan.getAttribute("attribute1"), attributeValue); 1365 1366 List<TColumnIncrement> incrementColumns = new ArrayList<>(1); 1367 incrementColumns.add(new TColumnIncrement(wrap(familyAname), wrap(qualifierAname))); 1368 TIncrement tIncrement = new TIncrement(wrap(rowName), incrementColumns); 1369 tIncrement.setAttributes(attributes); 1370 Increment increment = incrementFromThrift(tIncrement); 1371 assertArrayEquals(increment.getAttribute("attribute1"), attributeValue); 1372 1373 TDelete tDelete = new TDelete(wrap(rowName)); 1374 tDelete.setAttributes(attributes); 1375 Delete delete = deleteFromThrift(tDelete); 1376 assertArrayEquals(delete.getAttribute("attribute1"), attributeValue); 1377 } 1378 1379 /** 1380 * Put valueA to a row, make sure put has happened, then create a mutation object to put valueB 1381 * and delete ValueA, then check that the row value is only valueB. 1382 */ 1383 @Test 1384 public void testMutateRow() throws Exception { 1385 ThriftHBaseServiceHandler handler = createHandler(); 1386 byte[] rowName = Bytes.toBytes("testMutateRow"); 1387 ByteBuffer table = wrap(tableAname); 1388 1389 List<TColumnValue> columnValuesA = new ArrayList<>(1); 1390 TColumnValue columnValueA = new TColumnValue(wrap(familyAname), wrap(qualifierAname), 1391 wrap(valueAname)); 1392 columnValuesA.add(columnValueA); 1393 TPut putA = new TPut(wrap(rowName), columnValuesA); 1394 putA.setColumnValues(columnValuesA); 1395 1396 handler.put(table,putA); 1397 1398 TGet get = new TGet(wrap(rowName)); 1399 TResult result = handler.get(table, get); 1400 assertArrayEquals(rowName, result.getRow()); 1401 List<TColumnValue> returnedColumnValues = result.getColumnValues(); 1402 1403 List<TColumnValue> expectedColumnValues = new ArrayList<>(1); 1404 expectedColumnValues.add(columnValueA); 1405 assertTColumnValuesEqual(expectedColumnValues, returnedColumnValues); 1406 1407 List<TColumnValue> columnValuesB = new ArrayList<>(1); 1408 TColumnValue columnValueB = new TColumnValue(wrap(familyAname), wrap(qualifierBname), 1409 wrap(valueBname)); 1410 columnValuesB.add(columnValueB); 1411 TPut putB = new TPut(wrap(rowName), columnValuesB); 1412 putB.setColumnValues(columnValuesB); 1413 1414 TDelete delete = new TDelete(wrap(rowName)); 1415 List<TColumn> deleteColumns = new ArrayList<>(1); 1416 TColumn deleteColumn = new TColumn(wrap(familyAname)); 1417 deleteColumn.setQualifier(qualifierAname); 1418 deleteColumns.add(deleteColumn); 1419 delete.setColumns(deleteColumns); 1420 1421 List<TMutation> mutations = new ArrayList<>(2); 1422 TMutation mutationA = TMutation.put(putB); 1423 mutations.add(mutationA); 1424 1425 TMutation mutationB = TMutation.deleteSingle(delete); 1426 mutations.add(mutationB); 1427 1428 TRowMutations tRowMutations = new TRowMutations(wrap(rowName),mutations); 1429 handler.mutateRow(table,tRowMutations); 1430 1431 result = handler.get(table, get); 1432 assertArrayEquals(rowName, result.getRow()); 1433 returnedColumnValues = result.getColumnValues(); 1434 1435 expectedColumnValues = new ArrayList<>(1); 1436 expectedColumnValues.add(columnValueB); 1437 assertTColumnValuesEqual(expectedColumnValues, returnedColumnValues); 1438 } 1439 1440 /** 1441 * Create TPut, TDelete , TIncrement objects, set durability then call ThriftUtility 1442 * functions to get Put , Delete and Increment respectively. Use getDurability to make sure 1443 * the returned objects have the appropriate durability setting. 1444 */ 1445 @Test 1446 public void testDurability() throws Exception { 1447 byte[] rowName = Bytes.toBytes("testDurability"); 1448 List<TColumnValue> columnValues = new ArrayList<>(1); 1449 columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname))); 1450 1451 List<TColumnIncrement> incrementColumns = new ArrayList<>(1); 1452 incrementColumns.add(new TColumnIncrement(wrap(familyAname), wrap(qualifierAname))); 1453 1454 TDelete tDelete = new TDelete(wrap(rowName)); 1455 tDelete.setDurability(TDurability.SKIP_WAL); 1456 Delete delete = deleteFromThrift(tDelete); 1457 assertEquals(Durability.SKIP_WAL, delete.getDurability()); 1458 1459 tDelete.setDurability(TDurability.ASYNC_WAL); 1460 delete = deleteFromThrift(tDelete); 1461 assertEquals(Durability.ASYNC_WAL, delete.getDurability()); 1462 1463 tDelete.setDurability(TDurability.SYNC_WAL); 1464 delete = deleteFromThrift(tDelete); 1465 assertEquals(Durability.SYNC_WAL, delete.getDurability()); 1466 1467 tDelete.setDurability(TDurability.FSYNC_WAL); 1468 delete = deleteFromThrift(tDelete); 1469 assertEquals(Durability.FSYNC_WAL, delete.getDurability()); 1470 1471 TPut tPut = new TPut(wrap(rowName), columnValues); 1472 tPut.setDurability(TDurability.SKIP_WAL); 1473 Put put = putFromThrift(tPut); 1474 assertEquals(Durability.SKIP_WAL, put.getDurability()); 1475 1476 tPut.setDurability(TDurability.ASYNC_WAL); 1477 put = putFromThrift(tPut); 1478 assertEquals(Durability.ASYNC_WAL, put.getDurability()); 1479 1480 tPut.setDurability(TDurability.SYNC_WAL); 1481 put = putFromThrift(tPut); 1482 assertEquals(Durability.SYNC_WAL, put.getDurability()); 1483 1484 tPut.setDurability(TDurability.FSYNC_WAL); 1485 put = putFromThrift(tPut); 1486 assertEquals(Durability.FSYNC_WAL, put.getDurability()); 1487 1488 TIncrement tIncrement = new TIncrement(wrap(rowName), incrementColumns); 1489 1490 tIncrement.setDurability(TDurability.SKIP_WAL); 1491 Increment increment = incrementFromThrift(tIncrement); 1492 assertEquals(Durability.SKIP_WAL, increment.getDurability()); 1493 1494 tIncrement.setDurability(TDurability.ASYNC_WAL); 1495 increment = incrementFromThrift(tIncrement); 1496 assertEquals(Durability.ASYNC_WAL, increment.getDurability()); 1497 1498 tIncrement.setDurability(TDurability.SYNC_WAL); 1499 increment = incrementFromThrift(tIncrement); 1500 assertEquals(Durability.SYNC_WAL, increment.getDurability()); 1501 1502 tIncrement.setDurability(TDurability.FSYNC_WAL); 1503 increment = incrementFromThrift(tIncrement); 1504 assertEquals(Durability.FSYNC_WAL, increment.getDurability()); 1505 } 1506 1507 @Test 1508 public void testCheckAndMutate() throws Exception { 1509 ThriftHBaseServiceHandler handler = createHandler(); 1510 ByteBuffer table = wrap(tableAname); 1511 ByteBuffer row = wrap(Bytes.toBytes("row")); 1512 ByteBuffer family = wrap(familyAname); 1513 ByteBuffer qualifier = wrap(qualifierAname); 1514 ByteBuffer value = wrap(valueAname); 1515 1516 // Create a mutation to write to 'B', our "mutate" of "checkAndMutate" 1517 List<TColumnValue> columnValuesB = new ArrayList<>(1); 1518 TColumnValue columnValueB = new TColumnValue(family, wrap(qualifierBname), wrap(valueBname)); 1519 columnValuesB.add(columnValueB); 1520 TPut putB = new TPut(row, columnValuesB); 1521 putB.setColumnValues(columnValuesB); 1522 1523 TRowMutations tRowMutations = new TRowMutations(row, 1524 Arrays.<TMutation> asList(TMutation.put(putB))); 1525 1526 // Empty table when we begin 1527 TResult result = handler.get(table, new TGet(row)); 1528 assertEquals(0, result.getColumnValuesSize()); 1529 1530 // checkAndMutate -- condition should fail because the value doesn't exist. 1531 assertFalse("Expected condition to not pass", 1532 handler.checkAndMutate(table, row, family, qualifier, TCompareOp.EQUAL, value, 1533 tRowMutations)); 1534 1535 List<TColumnValue> columnValuesA = new ArrayList<>(1); 1536 TColumnValue columnValueA = new TColumnValue(family, qualifier, value); 1537 columnValuesA.add(columnValueA); 1538 1539 // Put an update 'A' 1540 handler.put(table, new TPut(row, columnValuesA)); 1541 1542 // Verify that the update is there 1543 result = handler.get(table, new TGet(row)); 1544 assertEquals(1, result.getColumnValuesSize()); 1545 assertTColumnValueEqual(columnValueA, result.getColumnValues().get(0)); 1546 1547 // checkAndMutate -- condition should pass since we added the value 1548 assertTrue("Expected condition to pass", 1549 handler.checkAndMutate(table, row, family, qualifier, TCompareOp.EQUAL, value, 1550 tRowMutations)); 1551 1552 result = handler.get(table, new TGet(row)); 1553 assertEquals(2, result.getColumnValuesSize()); 1554 assertTColumnValueEqual(columnValueA, result.getColumnValues().get(0)); 1555 assertTColumnValueEqual(columnValueB, result.getColumnValues().get(1)); 1556 } 1557 1558 public static class DelayingRegionObserver implements RegionCoprocessor, RegionObserver { 1559 private static final Logger LOG = LoggerFactory.getLogger(DelayingRegionObserver.class); 1560 // sleep time in msec 1561 private long delayMillis; 1562 1563 @Override 1564 public Optional<RegionObserver> getRegionObserver() { 1565 return Optional.of(this); 1566 } 1567 1568 @Override 1569 public void start(CoprocessorEnvironment e) throws IOException { 1570 this.delayMillis = e.getConfiguration() 1571 .getLong("delayingregionobserver.delay", 3000); 1572 } 1573 1574 @Override 1575 public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, 1576 List<Cell> results) throws IOException { 1577 try { 1578 long start = System.currentTimeMillis(); 1579 TimeUnit.MILLISECONDS.sleep(delayMillis); 1580 if (LOG.isTraceEnabled()) { 1581 LOG.trace("Slept for " + (System.currentTimeMillis() - start) + " msec"); 1582 } 1583 } catch (InterruptedException ie) { 1584 throw new InterruptedIOException("Interrupted while sleeping"); 1585 } 1586 } 1587 } 1588} 1589