001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.List; 027import java.util.Map; 028import java.util.NavigableMap; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.Get; 034import org.apache.hadoop.hbase.client.Increment; 035import org.apache.hadoop.hbase.client.Put; 036import org.apache.hadoop.hbase.client.Result; 037import org.apache.hadoop.hbase.client.Row; 038import org.apache.hadoop.hbase.client.Table; 039import org.apache.hadoop.hbase.io.TimeRange; 040import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 041import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 042import org.apache.hadoop.hbase.testclassification.MediumTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 045import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 046import org.junit.After; 047import org.junit.AfterClass; 048import org.junit.Before; 049import org.junit.BeforeClass; 050import org.junit.ClassRule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053 054/** 055 * This test runs batch mutation with Increments which have custom TimeRange. 056 * Custom Observer records the TimeRange. 057 * We then verify that the recorded TimeRange has same bounds as the initial TimeRange. 058 * See HBASE-15698 059 */ 060@Category({CoprocessorTests.class, MediumTests.class}) 061public class TestIncrementTimeRange { 062 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestIncrementTimeRange.class); 066 067 private static final HBaseTestingUtility util = new HBaseTestingUtility(); 068 private static ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); 069 070 private static final TableName TEST_TABLE = TableName.valueOf("test"); 071 private static final byte[] TEST_FAMILY = Bytes.toBytes("f1"); 072 073 private static final byte[] ROW_A = Bytes.toBytes("aaa"); 074 private static final byte[] ROW_B = Bytes.toBytes("bbb"); 075 private static final byte[] ROW_C = Bytes.toBytes("ccc"); 076 077 private static final byte[] qualifierCol1 = Bytes.toBytes("col1"); 078 079 private static final byte[] bytes1 = Bytes.toBytes(1); 080 private static final byte[] bytes2 = Bytes.toBytes(2); 081 private static final byte[] bytes3 = Bytes.toBytes(3); 082 083 private Table hTableInterface; 084 private Table table; 085 086 @BeforeClass 087 public static void setupBeforeClass() throws Exception { 088 util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 089 MyObserver.class.getName()); 090 // Make general delay zero rather than default. Timing is off in this 091 // test that depends on an evironment edge that is manually moved forward. 092 util.getConfiguration().setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 0); 093 util.startMiniCluster(); 094 EnvironmentEdgeManager.injectEdge(mee); 095 } 096 097 @AfterClass 098 public static void tearDownAfterClass() throws Exception { 099 util.shutdownMiniCluster(); 100 } 101 102 @Before 103 public void before() throws Exception { 104 table = util.createTable(TEST_TABLE, TEST_FAMILY); 105 106 Put puta = new Put(ROW_A); 107 puta.addColumn(TEST_FAMILY, qualifierCol1, bytes1); 108 table.put(puta); 109 110 Put putb = new Put(ROW_B); 111 putb.addColumn(TEST_FAMILY, qualifierCol1, bytes2); 112 table.put(putb); 113 114 Put putc = new Put(ROW_C); 115 putc.addColumn(TEST_FAMILY, qualifierCol1, bytes3); 116 table.put(putc); 117 } 118 119 @After 120 public void after() throws Exception { 121 try { 122 if (table != null) { 123 table.close(); 124 } 125 } finally { 126 try { 127 util.deleteTable(TEST_TABLE); 128 } catch (IOException ioe) { 129 } 130 } 131 } 132 133 public static class MyObserver extends SimpleRegionObserver { 134 static TimeRange tr10 = null, tr2 = null; 135 @Override 136 public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e, 137 final Increment increment) throws IOException { 138 NavigableMap<byte [], List<Cell>> map = increment.getFamilyCellMap(); 139 for (Map.Entry<byte [], List<Cell>> entry : map.entrySet()) { 140 for (Cell cell : entry.getValue()) { 141 long incr = Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), 142 cell.getValueLength()); 143 if (incr == 10) { 144 tr10 = increment.getTimeRange(); 145 } else if (incr == 2 && !increment.getTimeRange().isAllTime()) { 146 tr2 = increment.getTimeRange(); 147 } 148 } 149 } 150 return super.preIncrement(e, increment); 151 } 152 } 153 154 @Test 155 public void testHTableInterfaceMethods() throws Exception { 156 hTableInterface = util.getConnection().getTable(TEST_TABLE); 157 checkHTableInterfaceMethods(); 158 } 159 160 private void checkHTableInterfaceMethods() throws Exception { 161 long time = EnvironmentEdgeManager.currentTime(); 162 mee.setValue(time); 163 hTableInterface.put(new Put(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, Bytes.toBytes(1L))); 164 checkRowValue(ROW_A, Bytes.toBytes(1L)); 165 166 time = EnvironmentEdgeManager.currentTime(); 167 mee.setValue(time); 168 TimeRange range10 = new TimeRange(1, time+10); 169 hTableInterface.increment(new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 10L) 170 .setTimeRange(range10.getMin(), range10.getMax())); 171 checkRowValue(ROW_A, Bytes.toBytes(11L)); 172 assertEquals(MyObserver.tr10.getMin(), range10.getMin()); 173 assertEquals(MyObserver.tr10.getMax(), range10.getMax()); 174 175 time = EnvironmentEdgeManager.currentTime(); 176 mee.setValue(time); 177 TimeRange range2 = new TimeRange(1, time+20); 178 List<Row> actions = 179 Arrays.asList(new Row[] { new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L) 180 .setTimeRange(range2.getMin(), range2.getMax()), 181 new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L) 182 .setTimeRange(range2.getMin(), range2.getMax()) }); 183 Object[] results3 = new Object[actions.size()]; 184 Object[] results1 = results3; 185 hTableInterface.batch(actions, results1); 186 assertEquals(MyObserver.tr2.getMin(), range2.getMin()); 187 assertEquals(MyObserver.tr2.getMax(), range2.getMax()); 188 for (Object r2 : results1) { 189 assertTrue(r2 instanceof Result); 190 } 191 checkRowValue(ROW_A, Bytes.toBytes(15L)); 192 193 hTableInterface.close(); 194 } 195 196 private void checkRowValue(byte[] row, byte[] expectedValue) throws IOException { 197 Get get = new Get(row).addColumn(TEST_FAMILY, qualifierCol1); 198 Result result = hTableInterface.get(get); 199 byte[] actualValue = result.getValue(TEST_FAMILY, qualifierCol1); 200 assertArrayEquals(expectedValue, actualValue); 201 } 202}