001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.nio.ByteBuffer;
026import java.util.Arrays;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.stream.Collectors;
031import java.util.stream.IntStream;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.CompareOperator;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.KeyValueUtil;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.Append;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Durability;
045import org.apache.hadoop.hbase.client.Get;
046import org.apache.hadoop.hbase.client.Increment;
047import org.apache.hadoop.hbase.client.Put;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.filter.ByteArrayComparable;
052import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.wal.WALEdit;
056import org.junit.AfterClass;
057import org.junit.Before;
058import org.junit.BeforeClass;
059import org.junit.ClassRule;
060import org.junit.Rule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.junit.rules.TestName;
064
065@Category({ CoprocessorTests.class, MediumTests.class })
066public class TestPassCustomCellViaRegionObserver {
067
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070      HBaseClassTestRule.forClass(TestPassCustomCellViaRegionObserver.class);
071
072  @Rule
073  public TestName testName = new TestName();
074
075  private TableName tableName;
076  private Table table = null;
077
078  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
079
080  private static final byte[] ROW = Bytes.toBytes("ROW");
081  private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
082  private static final byte[] QUALIFIER = Bytes.toBytes("QUALIFIER");
083  private static final byte[] VALUE = Bytes.toBytes(10L);
084  private static final byte[] APPEND_VALUE = Bytes.toBytes("MB");
085
086  private static final byte[] QUALIFIER_FROM_CP = Bytes.toBytes("QUALIFIER_FROM_CP");
087
088  @BeforeClass
089  public static void setupBeforeClass() throws Exception {
090    // small retry number can speed up the failed tests.
091    UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
092    UTIL.startMiniCluster();
093  }
094
095  @AfterClass
096  public static void tearDownAfterClass() throws Exception {
097    UTIL.shutdownMiniCluster();
098  }
099
100  @Before
101  public void clearTable() throws IOException {
102    RegionObserverImpl.COUNT.set(0);
103    tableName = TableName.valueOf(testName.getMethodName());
104    if (table != null) {
105      table.close();
106    }
107    try (Admin admin = UTIL.getAdmin()) {
108      for (TableName name : admin.listTableNames()) {
109        try {
110          admin.disableTable(name);
111        } catch (IOException e) {
112        }
113        admin.deleteTable(name);
114      }
115      table = UTIL.createTable(TableDescriptorBuilder.newBuilder(tableName)
116        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
117        .setCoprocessor(RegionObserverImpl.class.getName())
118        .build(), null);
119    }
120  }
121
122  @Test
123  public void testMutation() throws Exception {
124
125    Put put = new Put(ROW);
126    put.addColumn(FAMILY, QUALIFIER, VALUE);
127    table.put(put);
128    byte[] value = VALUE;
129    assertResult(table.get(new Get(ROW)), value, value);
130    assertObserverHasExecuted();
131
132    Increment inc = new Increment(ROW);
133    inc.addColumn(FAMILY, QUALIFIER, 10L);
134    table.increment(inc);
135    // QUALIFIER -> 10 (put) + 10 (increment)
136    // QUALIFIER_FROM_CP -> 10 (from cp's put) + 10 (from cp's increment)
137    value = Bytes.toBytes(20L);
138    assertResult(table.get(new Get(ROW)), value, value);
139    assertObserverHasExecuted();
140
141    Append append = new Append(ROW);
142    append.addColumn(FAMILY, QUALIFIER, APPEND_VALUE);
143    table.append(append);
144    // 10L + "MB"
145    value = ByteBuffer.wrap(new byte[value.length + APPEND_VALUE.length])
146      .put(value)
147      .put(APPEND_VALUE)
148      .array();
149    assertResult(table.get(new Get(ROW)), value, value);
150    assertObserverHasExecuted();
151
152    Delete delete = new Delete(ROW);
153    delete.addColumns(FAMILY, QUALIFIER);
154    table.delete(delete);
155    assertTrue(Arrays.asList(table.get(new Get(ROW)).rawCells()).toString(),
156      table.get(new Get(ROW)).isEmpty());
157    assertObserverHasExecuted();
158
159    assertTrue(table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put));
160    assertObserverHasExecuted();
161
162    assertTrue(
163      table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenDelete(delete));
164    assertObserverHasExecuted();
165
166    assertTrue(table.get(new Get(ROW)).isEmpty());
167  }
168
169  @Test
170  public void testMultiPut() throws Exception {
171    List<Put> puts = IntStream.range(0, 10)
172      .mapToObj(i -> new Put(ROW).addColumn(FAMILY, Bytes.toBytes(i), VALUE))
173      .collect(Collectors.toList());
174    table.put(puts);
175    assertResult(table.get(new Get(ROW)), VALUE);
176    assertObserverHasExecuted();
177
178    List<Delete> deletes = IntStream.range(0, 10)
179      .mapToObj(i -> new Delete(ROW).addColumn(FAMILY, Bytes.toBytes(i)))
180      .collect(Collectors.toList());
181    table.delete(deletes);
182    assertTrue(table.get(new Get(ROW)).isEmpty());
183    assertObserverHasExecuted();
184  }
185
186  private static void assertObserverHasExecuted() {
187    assertTrue(RegionObserverImpl.COUNT.getAndSet(0) > 0);
188  }
189
190  private static void assertResult(Result result, byte[] expectedValue) {
191    assertFalse(result.isEmpty());
192    for (Cell c : result.rawCells()) {
193      assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
194      assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
195      assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
196    }
197  }
198
199  private static void assertResult(Result result, byte[] expectedValue, byte[] expectedFromCp) {
200    assertFalse(result.isEmpty());
201    for (Cell c : result.rawCells()) {
202      assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
203      assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
204      if (Bytes.equals(QUALIFIER, CellUtil.cloneQualifier(c))) {
205        assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
206      } else if (Bytes.equals(QUALIFIER_FROM_CP, CellUtil.cloneQualifier(c))) {
207        assertTrue(c.toString(), Bytes.equals(expectedFromCp, CellUtil.cloneValue(c)));
208      } else {
209        fail("No valid qualifier");
210      }
211    }
212  }
213
214  private static Cell createCustomCell(byte[] row, byte[] family, byte[] qualifier,
215    Cell.Type type, byte[] value) {
216    return new Cell() {
217
218      @Override
219      public long heapSize() {
220        return 0;
221      }
222
223      private byte[] getArray(byte[] array) {
224        return array == null ? HConstants.EMPTY_BYTE_ARRAY : array;
225      }
226
227      private int length(byte[] array) {
228        return array == null ? 0 : array.length;
229      }
230
231      @Override
232      public byte[] getRowArray() {
233        return getArray(row);
234      }
235
236      @Override
237      public int getRowOffset() {
238        return 0;
239      }
240
241      @Override
242      public short getRowLength() {
243        return (short) length(row);
244      }
245
246      @Override
247      public byte[] getFamilyArray() {
248        return getArray(family);
249      }
250
251      @Override
252      public int getFamilyOffset() {
253        return 0;
254      }
255
256      @Override
257      public byte getFamilyLength() {
258        return (byte) length(family);
259      }
260
261      @Override
262      public byte[] getQualifierArray() {
263        return getArray(qualifier);
264      }
265
266      @Override
267      public int getQualifierOffset() {
268        return 0;
269      }
270
271      @Override
272      public int getQualifierLength() {
273        return length(qualifier);
274      }
275
276      @Override
277      public long getTimestamp() {
278        return HConstants.LATEST_TIMESTAMP;
279      }
280
281      @Override
282      public byte getTypeByte() {
283        return type.getCode();
284      }
285
286      @Override
287      public long getSequenceId() {
288        return 0;
289      }
290
291      @Override
292      public byte[] getValueArray() {
293        return getArray(value);
294      }
295
296      @Override
297      public int getValueOffset() {
298        return 0;
299      }
300
301      @Override
302      public int getValueLength() {
303        return length(value);
304      }
305
306      @Override
307      public int getSerializedSize() {
308        return KeyValueUtil.getSerializedSize(this, true);
309      }
310
311      @Override
312      public byte[] getTagsArray() {
313        return getArray(null);
314      }
315
316      @Override
317      public int getTagsOffset() {
318        return 0;
319      }
320
321      @Override
322      public int getTagsLength() {
323        return length(null);
324      }
325
326      @Override
327      public Type getType() {
328        return type;
329      }
330    };
331  }
332
333  private static Cell createCustomCell(Put put) {
334    return createCustomCell(put.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE);
335  }
336
337  private static Cell createCustomCell(Append append) {
338    return createCustomCell(append.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put,
339      APPEND_VALUE);
340  }
341
342  private static Cell createCustomCell(Increment inc) {
343    return createCustomCell(inc.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE);
344  }
345
346  private static Cell createCustomCell(Delete delete) {
347    return createCustomCell(delete.getRow(), FAMILY, QUALIFIER_FROM_CP,
348      Cell.Type.DeleteColumn, null);
349  }
350
351  public static class RegionObserverImpl implements RegionCoprocessor, RegionObserver {
352    static final AtomicInteger COUNT = new AtomicInteger(0);
353
354    @Override
355    public Optional<RegionObserver> getRegionObserver() {
356      return Optional.of(this);
357    }
358
359    @Override
360    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
361      Durability durability) throws IOException {
362      put.add(createCustomCell(put));
363      COUNT.incrementAndGet();
364    }
365
366    @Override
367    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
368      WALEdit edit, Durability durability) throws IOException {
369      delete.add(createCustomCell(delete));
370      COUNT.incrementAndGet();
371    }
372
373    @Override
374    public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
375      byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
376      boolean result) throws IOException {
377      put.add(createCustomCell(put));
378      COUNT.incrementAndGet();
379      return result;
380    }
381
382    @Override
383    public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
384      byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
385      Delete delete, boolean result) throws IOException {
386      delete.add(createCustomCell(delete));
387      COUNT.incrementAndGet();
388      return result;
389    }
390
391    @Override
392    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
393      throws IOException {
394      append.add(createCustomCell(append));
395      COUNT.incrementAndGet();
396      return null;
397    }
398
399
400    @Override
401    public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
402      throws IOException {
403      increment.add(createCustomCell(increment));
404      COUNT.incrementAndGet();
405      return null;
406    }
407
408  }
409
410}