001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.nio.ByteBuffer;
026import java.util.Arrays;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.stream.Collectors;
031import java.util.stream.IntStream;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.CompareOperator;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.KeyValueUtil;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.Append;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Durability;
045import org.apache.hadoop.hbase.client.Get;
046import org.apache.hadoop.hbase.client.Increment;
047import org.apache.hadoop.hbase.client.Put;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.filter.ByteArrayComparable;
052import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.wal.WALEdit;
056import org.junit.AfterClass;
057import org.junit.Before;
058import org.junit.BeforeClass;
059import org.junit.ClassRule;
060import org.junit.Rule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.junit.rules.TestName;
064
065@Category({ CoprocessorTests.class, MediumTests.class })
066public class TestPassCustomCellViaRegionObserver {
067
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070      HBaseClassTestRule.forClass(TestPassCustomCellViaRegionObserver.class);
071
072  @Rule
073  public TestName testName = new TestName();
074
075  private TableName tableName;
076  private Table table = null;
077
078  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
079
080  private static final byte[] ROW = Bytes.toBytes("ROW");
081  private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
082  private static final byte[] QUALIFIER = Bytes.toBytes("QUALIFIER");
083  private static final byte[] VALUE = Bytes.toBytes(10L);
084  private static final byte[] APPEND_VALUE = Bytes.toBytes("MB");
085
086  private static final byte[] QUALIFIER_FROM_CP = Bytes.toBytes("QUALIFIER_FROM_CP");
087
088  @BeforeClass
089  public static void setupBeforeClass() throws Exception {
090    // small retry number can speed up the failed tests.
091    UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
092    UTIL.startMiniCluster();
093  }
094
095  @AfterClass
096  public static void tearDownAfterClass() throws Exception {
097    UTIL.shutdownMiniCluster();
098  }
099
100  @Before
101  public void clearTable() throws IOException {
102    RegionObserverImpl.COUNT.set(0);
103    tableName = TableName.valueOf(testName.getMethodName());
104    if (table != null) {
105      table.close();
106    }
107    try (Admin admin = UTIL.getAdmin()) {
108      for (TableName name : admin.listTableNames()) {
109        try {
110          admin.disableTable(name);
111        } catch (IOException e) {
112        }
113        admin.deleteTable(name);
114      }
115      table = UTIL.createTable(TableDescriptorBuilder.newBuilder(tableName)
116        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
117        .setCoprocessor(RegionObserverImpl.class.getName())
118        .build(), null);
119    }
120  }
121
122  @Test
123  public void testMutation() throws Exception {
124
125    Put put = new Put(ROW);
126    put.addColumn(FAMILY, QUALIFIER, VALUE);
127    table.put(put);
128    byte[] value = VALUE;
129    assertResult(table.get(new Get(ROW)), value, value);
130    assertObserverHasExecuted();
131
132    Increment inc = new Increment(ROW);
133    inc.addColumn(FAMILY, QUALIFIER, 10L);
134    table.increment(inc);
135    // QUALIFIER -> 10 (put) + 10 (increment)
136    // QUALIFIER_FROM_CP -> 10 (from cp's put) + 10 (from cp's increment)
137    value = Bytes.toBytes(20L);
138    assertResult(table.get(new Get(ROW)), value, value);
139    assertObserverHasExecuted();
140
141    Append append = new Append(ROW);
142    append.addColumn(FAMILY, QUALIFIER, APPEND_VALUE);
143    table.append(append);
144    // 10L + "MB"
145    value = ByteBuffer.wrap(new byte[value.length + APPEND_VALUE.length])
146      .put(value)
147      .put(APPEND_VALUE)
148      .array();
149    assertResult(table.get(new Get(ROW)), value, value);
150    assertObserverHasExecuted();
151
152    Delete delete = new Delete(ROW);
153    delete.addColumns(FAMILY, QUALIFIER);
154    table.delete(delete);
155    assertTrue(Arrays.asList(table.get(new Get(ROW)).rawCells()).toString(),
156      table.get(new Get(ROW)).isEmpty());
157    assertObserverHasExecuted();
158
159    assertTrue(table.checkAndPut(ROW, FAMILY, QUALIFIER, null, put));
160    assertObserverHasExecuted();
161
162    assertTrue(table.checkAndDelete(ROW, FAMILY, QUALIFIER, VALUE, delete));
163    assertObserverHasExecuted();
164
165    assertTrue(table.get(new Get(ROW)).isEmpty());
166  }
167
168  @Test
169  public void testMultiPut() throws Exception {
170    List<Put> puts = IntStream.range(0, 10)
171      .mapToObj(i -> new Put(ROW).addColumn(FAMILY, Bytes.toBytes(i), VALUE))
172      .collect(Collectors.toList());
173    table.put(puts);
174    assertResult(table.get(new Get(ROW)), VALUE);
175    assertObserverHasExecuted();
176
177    List<Delete> deletes = IntStream.range(0, 10)
178      .mapToObj(i -> new Delete(ROW).addColumn(FAMILY, Bytes.toBytes(i)))
179      .collect(Collectors.toList());
180    table.delete(deletes);
181    assertTrue(table.get(new Get(ROW)).isEmpty());
182    assertObserverHasExecuted();
183  }
184
185  private static void assertObserverHasExecuted() {
186    assertTrue(RegionObserverImpl.COUNT.getAndSet(0) > 0);
187  }
188
189  private static void assertResult(Result result, byte[] expectedValue) {
190    assertFalse(result.isEmpty());
191    for (Cell c : result.rawCells()) {
192      assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
193      assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
194      assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
195    }
196  }
197
198  private static void assertResult(Result result, byte[] expectedValue, byte[] expectedFromCp) {
199    assertFalse(result.isEmpty());
200    for (Cell c : result.rawCells()) {
201      assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
202      assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
203      if (Bytes.equals(QUALIFIER, CellUtil.cloneQualifier(c))) {
204        assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
205      } else if (Bytes.equals(QUALIFIER_FROM_CP, CellUtil.cloneQualifier(c))) {
206        assertTrue(c.toString(), Bytes.equals(expectedFromCp, CellUtil.cloneValue(c)));
207      } else {
208        fail("No valid qualifier");
209      }
210    }
211  }
212
213  private static Cell createCustomCell(byte[] row, byte[] family, byte[] qualifier,
214    Cell.Type type, byte[] value) {
215    return new Cell() {
216
217      @Override
218      public long heapSize() {
219        return 0;
220      }
221
222      private byte[] getArray(byte[] array) {
223        return array == null ? HConstants.EMPTY_BYTE_ARRAY : array;
224      }
225
226      private int length(byte[] array) {
227        return array == null ? 0 : array.length;
228      }
229
230      @Override
231      public byte[] getRowArray() {
232        return getArray(row);
233      }
234
235      @Override
236      public int getRowOffset() {
237        return 0;
238      }
239
240      @Override
241      public short getRowLength() {
242        return (short) length(row);
243      }
244
245      @Override
246      public byte[] getFamilyArray() {
247        return getArray(family);
248      }
249
250      @Override
251      public int getFamilyOffset() {
252        return 0;
253      }
254
255      @Override
256      public byte getFamilyLength() {
257        return (byte) length(family);
258      }
259
260      @Override
261      public byte[] getQualifierArray() {
262        return getArray(qualifier);
263      }
264
265      @Override
266      public int getQualifierOffset() {
267        return 0;
268      }
269
270      @Override
271      public int getQualifierLength() {
272        return length(qualifier);
273      }
274
275      @Override
276      public long getTimestamp() {
277        return HConstants.LATEST_TIMESTAMP;
278      }
279
280      @Override
281      public byte getTypeByte() {
282        return type.getCode();
283      }
284
285      @Override
286      public long getSequenceId() {
287        return 0;
288      }
289
290      @Override
291      public byte[] getValueArray() {
292        return getArray(value);
293      }
294
295      @Override
296      public int getValueOffset() {
297        return 0;
298      }
299
300      @Override
301      public int getValueLength() {
302        return length(value);
303      }
304
305      @Override
306      public int getSerializedSize() {
307        return KeyValueUtil.getSerializedSize(this, true);
308      }
309
310      @Override
311      public byte[] getTagsArray() {
312        return getArray(null);
313      }
314
315      @Override
316      public int getTagsOffset() {
317        return 0;
318      }
319
320      @Override
321      public int getTagsLength() {
322        return length(null);
323      }
324
325      @Override
326      public Type getType() {
327        return type;
328      }
329    };
330  }
331
332  private static Cell createCustomCell(Put put) {
333    return createCustomCell(put.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE);
334  }
335
336  private static Cell createCustomCell(Append append) {
337    return createCustomCell(append.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put,
338      APPEND_VALUE);
339  }
340
341  private static Cell createCustomCell(Increment inc) {
342    return createCustomCell(inc.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE);
343  }
344
345  private static Cell createCustomCell(Delete delete) {
346    return createCustomCell(delete.getRow(), FAMILY, QUALIFIER_FROM_CP,
347      Cell.Type.DeleteColumn, null);
348  }
349
350  public static class RegionObserverImpl implements RegionCoprocessor, RegionObserver {
351    static final AtomicInteger COUNT = new AtomicInteger(0);
352
353    @Override
354    public Optional<RegionObserver> getRegionObserver() {
355      return Optional.of(this);
356    }
357
358    @Override
359    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
360      Durability durability) throws IOException {
361      put.add(createCustomCell(put));
362      COUNT.incrementAndGet();
363    }
364
365    @Override
366    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
367      WALEdit edit, Durability durability) throws IOException {
368      delete.add(createCustomCell(delete));
369      COUNT.incrementAndGet();
370    }
371
372    @Override
373    public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
374      byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
375      boolean result) throws IOException {
376      put.add(createCustomCell(put));
377      COUNT.incrementAndGet();
378      return result;
379    }
380
381    @Override
382    public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
383      byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
384      Delete delete, boolean result) throws IOException {
385      delete.add(createCustomCell(delete));
386      COUNT.incrementAndGet();
387      return result;
388    }
389
390    @Override
391    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
392      throws IOException {
393      append.add(createCustomCell(append));
394      COUNT.incrementAndGet();
395      return null;
396    }
397
398
399    @Override
400    public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
401      throws IOException {
402      increment.add(createCustomCell(increment));
403      COUNT.incrementAndGet();
404      return null;
405    }
406
407  }
408
409}