001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.nio.ByteBuffer;
026import java.util.Arrays;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.stream.Collectors;
031import java.util.stream.IntStream;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.CompareOperator;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.KeyValueUtil;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.Append;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Durability;
045import org.apache.hadoop.hbase.client.Get;
046import org.apache.hadoop.hbase.client.Increment;
047import org.apache.hadoop.hbase.client.Put;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.filter.ByteArrayComparable;
052import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.wal.WALEdit;
056import org.junit.AfterClass;
057import org.junit.Before;
058import org.junit.BeforeClass;
059import org.junit.ClassRule;
060import org.junit.Rule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.junit.rules.TestName;
064
065@Category({ CoprocessorTests.class, MediumTests.class })
066public class TestPassCustomCellViaRegionObserver {
067
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070    HBaseClassTestRule.forClass(TestPassCustomCellViaRegionObserver.class);
071
072  @Rule
073  public TestName testName = new TestName();
074
075  private TableName tableName;
076  private Table table = null;
077
078  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
079
080  private static final byte[] ROW = Bytes.toBytes("ROW");
081  private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
082  private static final byte[] QUALIFIER = Bytes.toBytes("QUALIFIER");
083  private static final byte[] VALUE = Bytes.toBytes(10L);
084  private static final byte[] APPEND_VALUE = Bytes.toBytes("MB");
085
086  private static final byte[] QUALIFIER_FROM_CP = Bytes.toBytes("QUALIFIER_FROM_CP");
087
088  @BeforeClass
089  public static void setupBeforeClass() throws Exception {
090    // small retry number can speed up the failed tests.
091    UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
092    UTIL.startMiniCluster();
093  }
094
095  @AfterClass
096  public static void tearDownAfterClass() throws Exception {
097    UTIL.shutdownMiniCluster();
098  }
099
100  @Before
101  public void clearTable() throws IOException {
102    RegionObserverImpl.COUNT.set(0);
103    tableName = TableName.valueOf(testName.getMethodName());
104    if (table != null) {
105      table.close();
106    }
107    try (Admin admin = UTIL.getAdmin()) {
108      for (TableName name : admin.listTableNames()) {
109        try {
110          admin.disableTable(name);
111        } catch (IOException e) {
112        }
113        admin.deleteTable(name);
114      }
115      table = UTIL.createTable(TableDescriptorBuilder.newBuilder(tableName)
116        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
117        .setCoprocessor(RegionObserverImpl.class.getName()).build(), null);
118    }
119  }
120
121  @Test
122  public void testMutation() throws Exception {
123
124    Put put = new Put(ROW);
125    put.addColumn(FAMILY, QUALIFIER, VALUE);
126    table.put(put);
127    byte[] value = VALUE;
128    assertResult(table.get(new Get(ROW)), value, value);
129    assertObserverHasExecuted();
130
131    Increment inc = new Increment(ROW);
132    inc.addColumn(FAMILY, QUALIFIER, 10L);
133    table.increment(inc);
134    // QUALIFIER -> 10 (put) + 10 (increment)
135    // QUALIFIER_FROM_CP -> 10 (from cp's put) + 10 (from cp's increment)
136    value = Bytes.toBytes(20L);
137    assertResult(table.get(new Get(ROW)), value, value);
138    assertObserverHasExecuted();
139
140    Append append = new Append(ROW);
141    append.addColumn(FAMILY, QUALIFIER, APPEND_VALUE);
142    table.append(append);
143    // 10L + "MB"
144    value = ByteBuffer.wrap(new byte[value.length + APPEND_VALUE.length]).put(value)
145      .put(APPEND_VALUE).array();
146    assertResult(table.get(new Get(ROW)), value, value);
147    assertObserverHasExecuted();
148
149    Delete delete = new Delete(ROW);
150    delete.addColumns(FAMILY, QUALIFIER);
151    table.delete(delete);
152    assertTrue(Arrays.asList(table.get(new Get(ROW)).rawCells()).toString(),
153      table.get(new Get(ROW)).isEmpty());
154    assertObserverHasExecuted();
155
156    assertTrue(table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put));
157    assertObserverHasExecuted();
158
159    assertTrue(
160      table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenDelete(delete));
161    assertObserverHasExecuted();
162
163    assertTrue(table.get(new Get(ROW)).isEmpty());
164  }
165
166  @Test
167  public void testMultiPut() throws Exception {
168    List<Put> puts =
169      IntStream.range(0, 10).mapToObj(i -> new Put(ROW).addColumn(FAMILY, Bytes.toBytes(i), VALUE))
170        .collect(Collectors.toList());
171    table.put(puts);
172    assertResult(table.get(new Get(ROW)), VALUE);
173    assertObserverHasExecuted();
174
175    List<Delete> deletes =
176      IntStream.range(0, 10).mapToObj(i -> new Delete(ROW).addColumn(FAMILY, Bytes.toBytes(i)))
177        .collect(Collectors.toList());
178    table.delete(deletes);
179    assertTrue(table.get(new Get(ROW)).isEmpty());
180    assertObserverHasExecuted();
181  }
182
183  private static void assertObserverHasExecuted() {
184    assertTrue(RegionObserverImpl.COUNT.getAndSet(0) > 0);
185  }
186
187  private static void assertResult(Result result, byte[] expectedValue) {
188    assertFalse(result.isEmpty());
189    for (Cell c : result.rawCells()) {
190      assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
191      assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
192      assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
193    }
194  }
195
196  private static void assertResult(Result result, byte[] expectedValue, byte[] expectedFromCp) {
197    assertFalse(result.isEmpty());
198    for (Cell c : result.rawCells()) {
199      assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
200      assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
201      if (Bytes.equals(QUALIFIER, CellUtil.cloneQualifier(c))) {
202        assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
203      } else if (Bytes.equals(QUALIFIER_FROM_CP, CellUtil.cloneQualifier(c))) {
204        assertTrue(c.toString(), Bytes.equals(expectedFromCp, CellUtil.cloneValue(c)));
205      } else {
206        fail("No valid qualifier");
207      }
208    }
209  }
210
211  private static Cell createCustomCell(byte[] row, byte[] family, byte[] qualifier, Cell.Type type,
212    byte[] value) {
213    return new Cell() {
214
215      @Override
216      public long heapSize() {
217        return 0;
218      }
219
220      private byte[] getArray(byte[] array) {
221        return array == null ? HConstants.EMPTY_BYTE_ARRAY : array;
222      }
223
224      private int length(byte[] array) {
225        return array == null ? 0 : array.length;
226      }
227
228      @Override
229      public byte[] getRowArray() {
230        return getArray(row);
231      }
232
233      @Override
234      public int getRowOffset() {
235        return 0;
236      }
237
238      @Override
239      public short getRowLength() {
240        return (short) length(row);
241      }
242
243      @Override
244      public byte[] getFamilyArray() {
245        return getArray(family);
246      }
247
248      @Override
249      public int getFamilyOffset() {
250        return 0;
251      }
252
253      @Override
254      public byte getFamilyLength() {
255        return (byte) length(family);
256      }
257
258      @Override
259      public byte[] getQualifierArray() {
260        return getArray(qualifier);
261      }
262
263      @Override
264      public int getQualifierOffset() {
265        return 0;
266      }
267
268      @Override
269      public int getQualifierLength() {
270        return length(qualifier);
271      }
272
273      @Override
274      public long getTimestamp() {
275        return HConstants.LATEST_TIMESTAMP;
276      }
277
278      @Override
279      public byte getTypeByte() {
280        return type.getCode();
281      }
282
283      @Override
284      public long getSequenceId() {
285        return 0;
286      }
287
288      @Override
289      public byte[] getValueArray() {
290        return getArray(value);
291      }
292
293      @Override
294      public int getValueOffset() {
295        return 0;
296      }
297
298      @Override
299      public int getValueLength() {
300        return length(value);
301      }
302
303      @Override
304      public int getSerializedSize() {
305        return KeyValueUtil.getSerializedSize(this, true);
306      }
307
308      @Override
309      public byte[] getTagsArray() {
310        return getArray(null);
311      }
312
313      @Override
314      public int getTagsOffset() {
315        return 0;
316      }
317
318      @Override
319      public int getTagsLength() {
320        return length(null);
321      }
322
323      @Override
324      public Type getType() {
325        return type;
326      }
327    };
328  }
329
330  private static Cell createCustomCell(Put put) {
331    return createCustomCell(put.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE);
332  }
333
334  private static Cell createCustomCell(Append append) {
335    return createCustomCell(append.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put,
336      APPEND_VALUE);
337  }
338
339  private static Cell createCustomCell(Increment inc) {
340    return createCustomCell(inc.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE);
341  }
342
343  private static Cell createCustomCell(Delete delete) {
344    return createCustomCell(delete.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.DeleteColumn,
345      null);
346  }
347
348  public static class RegionObserverImpl implements RegionCoprocessor, RegionObserver {
349    static final AtomicInteger COUNT = new AtomicInteger(0);
350
351    @Override
352    public Optional<RegionObserver> getRegionObserver() {
353      return Optional.of(this);
354    }
355
356    @Override
357    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
358      Durability durability) throws IOException {
359      put.add(createCustomCell(put));
360      COUNT.incrementAndGet();
361    }
362
363    @Override
364    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
365      WALEdit edit, Durability durability) throws IOException {
366      delete.add(createCustomCell(delete));
367      COUNT.incrementAndGet();
368    }
369
370    @Override
371    public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
372      byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
373      boolean result) throws IOException {
374      put.add(createCustomCell(put));
375      COUNT.incrementAndGet();
376      return result;
377    }
378
379    @Override
380    public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
381      byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
382      Delete delete, boolean result) throws IOException {
383      delete.add(createCustomCell(delete));
384      COUNT.incrementAndGet();
385      return result;
386    }
387
388    @Override
389    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
390      throws IOException {
391      append.add(createCustomCell(append));
392      COUNT.incrementAndGet();
393      return null;
394    }
395
396    @Override
397    public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
398      throws IOException {
399      increment.add(createCustomCell(increment));
400      COUNT.incrementAndGet();
401      return null;
402    }
403
404  }
405
406}