001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.List; 027import java.util.Map; 028import java.util.NavigableMap; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Get; 033import org.apache.hadoop.hbase.client.Increment; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.Result; 036import org.apache.hadoop.hbase.client.Row; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.io.TimeRange; 039import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 040import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 041import org.apache.hadoop.hbase.testclassification.MediumTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 044import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 045import org.junit.jupiter.api.AfterAll; 046import org.junit.jupiter.api.AfterEach; 047import org.junit.jupiter.api.BeforeAll; 048import org.junit.jupiter.api.BeforeEach; 049import org.junit.jupiter.api.Tag; 050import org.junit.jupiter.api.Test; 051 052/** 053 * This test runs batch mutation with Increments which have custom TimeRange. Custom Observer 054 * records the TimeRange. We then verify that the recorded TimeRange has same bounds as the initial 055 * TimeRange. See HBASE-15698 056 */ 057@Tag(CoprocessorTests.TAG) 058@Tag(MediumTests.TAG) 059public class TestIncrementTimeRange { 060 061 private static final HBaseTestingUtil util = new HBaseTestingUtil(); 062 private static ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); 063 064 private static final TableName TEST_TABLE = TableName.valueOf("test"); 065 private static final byte[] TEST_FAMILY = Bytes.toBytes("f1"); 066 067 private static final byte[] ROW_A = Bytes.toBytes("aaa"); 068 private static final byte[] ROW_B = Bytes.toBytes("bbb"); 069 private static final byte[] ROW_C = Bytes.toBytes("ccc"); 070 071 private static final byte[] qualifierCol1 = Bytes.toBytes("col1"); 072 073 private static final byte[] bytes1 = Bytes.toBytes(1); 074 private static final byte[] bytes2 = Bytes.toBytes(2); 075 private static final byte[] bytes3 = Bytes.toBytes(3); 076 077 private Table hTableInterface; 078 private Table table; 079 080 @BeforeAll 081 public static void setupBeforeClass() throws Exception { 082 util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 083 MyObserver.class.getName()); 084 // Make general delay zero rather than default. Timing is off in this 085 // test that depends on an evironment edge that is manually moved forward. 086 util.getConfiguration().setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 0); 087 util.startMiniCluster(); 088 EnvironmentEdgeManager.injectEdge(mee); 089 } 090 091 @AfterAll 092 public static void tearDownAfterClass() throws Exception { 093 util.shutdownMiniCluster(); 094 } 095 096 @BeforeEach 097 public void before() throws Exception { 098 table = util.createTable(TEST_TABLE, TEST_FAMILY); 099 100 Put puta = new Put(ROW_A); 101 puta.addColumn(TEST_FAMILY, qualifierCol1, bytes1); 102 table.put(puta); 103 104 Put putb = new Put(ROW_B); 105 putb.addColumn(TEST_FAMILY, qualifierCol1, bytes2); 106 table.put(putb); 107 108 Put putc = new Put(ROW_C); 109 putc.addColumn(TEST_FAMILY, qualifierCol1, bytes3); 110 table.put(putc); 111 } 112 113 @AfterEach 114 public void after() throws Exception { 115 try { 116 if (table != null) { 117 table.close(); 118 } 119 } finally { 120 try { 121 util.deleteTable(TEST_TABLE); 122 } catch (IOException ioe) { 123 } 124 } 125 } 126 127 public static class MyObserver extends SimpleRegionObserver { 128 static TimeRange tr10 = null, tr2 = null; 129 130 @Override 131 public Result preIncrement(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 132 final Increment increment) throws IOException { 133 NavigableMap<byte[], List<Cell>> map = increment.getFamilyCellMap(); 134 for (Map.Entry<byte[], List<Cell>> entry : map.entrySet()) { 135 for (Cell cell : entry.getValue()) { 136 long incr = 137 Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); 138 if (incr == 10) { 139 tr10 = increment.getTimeRange(); 140 } else if (incr == 2 && !increment.getTimeRange().isAllTime()) { 141 tr2 = increment.getTimeRange(); 142 } 143 } 144 } 145 return super.preIncrement(e, increment); 146 } 147 } 148 149 @Test 150 public void testHTableInterfaceMethods() throws Exception { 151 hTableInterface = util.getConnection().getTable(TEST_TABLE); 152 checkHTableInterfaceMethods(); 153 } 154 155 private void checkHTableInterfaceMethods() throws Exception { 156 long time = EnvironmentEdgeManager.currentTime(); 157 mee.setValue(time); 158 hTableInterface.put(new Put(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, Bytes.toBytes(1L))); 159 checkRowValue(ROW_A, Bytes.toBytes(1L)); 160 161 time = EnvironmentEdgeManager.currentTime(); 162 mee.setValue(time); 163 TimeRange range10 = TimeRange.between(1, time + 10); 164 hTableInterface.increment(new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 10L) 165 .setTimeRange(range10.getMin(), range10.getMax())); 166 checkRowValue(ROW_A, Bytes.toBytes(11L)); 167 assertEquals(MyObserver.tr10.getMin(), range10.getMin()); 168 assertEquals(MyObserver.tr10.getMax(), range10.getMax()); 169 170 time = EnvironmentEdgeManager.currentTime(); 171 mee.setValue(time); 172 TimeRange range2 = TimeRange.between(1, time + 20); 173 List<Row> actions = Arrays.asList(new Row[] { 174 new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L).setTimeRange(range2.getMin(), 175 range2.getMax()), 176 new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L).setTimeRange(range2.getMin(), 177 range2.getMax()) }); 178 Object[] results3 = new Object[actions.size()]; 179 Object[] results1 = results3; 180 hTableInterface.batch(actions, results1); 181 assertEquals(MyObserver.tr2.getMin(), range2.getMin()); 182 assertEquals(MyObserver.tr2.getMax(), range2.getMax()); 183 for (Object r2 : results1) { 184 assertTrue(r2 instanceof Result); 185 } 186 checkRowValue(ROW_A, Bytes.toBytes(15L)); 187 188 hTableInterface.close(); 189 } 190 191 private void checkRowValue(byte[] row, byte[] expectedValue) throws IOException { 192 Get get = new Get(row).addColumn(TEST_FAMILY, qualifierCol1); 193 Result result = hTableInterface.get(get); 194 byte[] actualValue = result.getValue(TEST_FAMILY, qualifierCol1); 195 assertArrayEquals(expectedValue, actualValue); 196 } 197}