001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.nio.ByteBuffer;
026import java.util.Arrays;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.stream.Collectors;
031import java.util.stream.IntStream;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.CompareOperator;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.KeyValueUtil;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.Append;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Durability;
045import org.apache.hadoop.hbase.client.Get;
046import org.apache.hadoop.hbase.client.Increment;
047import org.apache.hadoop.hbase.client.Put;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.filter.ByteArrayComparable;
052import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.wal.WALEdit;
056import org.junit.AfterClass;
057import org.junit.Before;
058import org.junit.BeforeClass;
059import org.junit.ClassRule;
060import org.junit.Rule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.junit.rules.TestName;
064
065@Category({ CoprocessorTests.class, MediumTests.class })
066public class TestPassCustomCellViaRegionObserver {
067
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070    HBaseClassTestRule.forClass(TestPassCustomCellViaRegionObserver.class);
071
072  @Rule
073  public TestName testName = new TestName();
074
075  private TableName tableName;
076  private Table table = null;
077
078  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
079
080  private static final byte[] ROW = Bytes.toBytes("ROW");
081  private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
082  private static final byte[] QUALIFIER = Bytes.toBytes("QUALIFIER");
083  private static final byte[] VALUE = Bytes.toBytes(10L);
084  private static final byte[] APPEND_VALUE = Bytes.toBytes("MB");
085
086  private static final byte[] QUALIFIER_FROM_CP = Bytes.toBytes("QUALIFIER_FROM_CP");
087
088  @BeforeClass
089  public static void setupBeforeClass() throws Exception {
090    // small retry number can speed up the failed tests.
091    UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
092    UTIL.startMiniCluster();
093  }
094
095  @AfterClass
096  public static void tearDownAfterClass() throws Exception {
097    UTIL.shutdownMiniCluster();
098  }
099
100  @Before
101  public void clearTable() throws IOException {
102    RegionObserverImpl.COUNT.set(0);
103    tableName = TableName.valueOf(testName.getMethodName());
104    if (table != null) {
105      table.close();
106    }
107    try (Admin admin = UTIL.getAdmin()) {
108      for (TableName name : admin.listTableNames()) {
109        try {
110          admin.disableTable(name);
111        } catch (IOException e) {
112        }
113        admin.deleteTable(name);
114      }
115      table = UTIL.createTable(TableDescriptorBuilder.newBuilder(tableName)
116        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
117        .setCoprocessor(RegionObserverImpl.class.getName()).build(), null);
118    }
119  }
120
121  @Test
122  public void testMutation() throws Exception {
123
124    Put put = new Put(ROW);
125    put.addColumn(FAMILY, QUALIFIER, VALUE);
126    table.put(put);
127    byte[] value = VALUE;
128    assertResult(table.get(new Get(ROW)), value, value);
129    assertObserverHasExecuted();
130
131    Increment inc = new Increment(ROW);
132    inc.addColumn(FAMILY, QUALIFIER, 10L);
133    table.increment(inc);
134    // QUALIFIER -> 10 (put) + 10 (increment)
135    // QUALIFIER_FROM_CP -> 10 (from cp's put) + 10 (from cp's increment)
136    value = Bytes.toBytes(20L);
137    assertResult(table.get(new Get(ROW)), value, value);
138    assertObserverHasExecuted();
139
140    Append append = new Append(ROW);
141    append.addColumn(FAMILY, QUALIFIER, APPEND_VALUE);
142    table.append(append);
143    // 10L + "MB"
144    value = ByteBuffer.wrap(new byte[value.length + APPEND_VALUE.length]).put(value)
145      .put(APPEND_VALUE).array();
146    assertResult(table.get(new Get(ROW)), value, value);
147    assertObserverHasExecuted();
148
149    Delete delete = new Delete(ROW);
150    delete.addColumns(FAMILY, QUALIFIER);
151    table.delete(delete);
152    assertTrue(Arrays.asList(table.get(new Get(ROW)).rawCells()).toString(),
153      table.get(new Get(ROW)).isEmpty());
154    assertObserverHasExecuted();
155
156    assertTrue(table.checkAndPut(ROW, FAMILY, QUALIFIER, null, put));
157    assertObserverHasExecuted();
158
159    assertTrue(table.checkAndDelete(ROW, FAMILY, QUALIFIER, VALUE, delete));
160    assertObserverHasExecuted();
161
162    assertTrue(table.get(new Get(ROW)).isEmpty());
163  }
164
165  @Test
166  public void testMultiPut() throws Exception {
167    List<Put> puts =
168      IntStream.range(0, 10).mapToObj(i -> new Put(ROW).addColumn(FAMILY, Bytes.toBytes(i), VALUE))
169        .collect(Collectors.toList());
170    table.put(puts);
171    assertResult(table.get(new Get(ROW)), VALUE);
172    assertObserverHasExecuted();
173
174    List<Delete> deletes =
175      IntStream.range(0, 10).mapToObj(i -> new Delete(ROW).addColumn(FAMILY, Bytes.toBytes(i)))
176        .collect(Collectors.toList());
177    table.delete(deletes);
178    assertTrue(table.get(new Get(ROW)).isEmpty());
179    assertObserverHasExecuted();
180  }
181
182  private static void assertObserverHasExecuted() {
183    assertTrue(RegionObserverImpl.COUNT.getAndSet(0) > 0);
184  }
185
186  private static void assertResult(Result result, byte[] expectedValue) {
187    assertFalse(result.isEmpty());
188    for (Cell c : result.rawCells()) {
189      assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
190      assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
191      assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
192    }
193  }
194
195  private static void assertResult(Result result, byte[] expectedValue, byte[] expectedFromCp) {
196    assertFalse(result.isEmpty());
197    for (Cell c : result.rawCells()) {
198      assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
199      assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
200      if (Bytes.equals(QUALIFIER, CellUtil.cloneQualifier(c))) {
201        assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
202      } else if (Bytes.equals(QUALIFIER_FROM_CP, CellUtil.cloneQualifier(c))) {
203        assertTrue(c.toString(), Bytes.equals(expectedFromCp, CellUtil.cloneValue(c)));
204      } else {
205        fail("No valid qualifier");
206      }
207    }
208  }
209
210  private static Cell createCustomCell(byte[] row, byte[] family, byte[] qualifier, Cell.Type type,
211    byte[] value) {
212    return new Cell() {
213
214      @Override
215      public long heapSize() {
216        return 0;
217      }
218
219      private byte[] getArray(byte[] array) {
220        return array == null ? HConstants.EMPTY_BYTE_ARRAY : array;
221      }
222
223      private int length(byte[] array) {
224        return array == null ? 0 : array.length;
225      }
226
227      @Override
228      public byte[] getRowArray() {
229        return getArray(row);
230      }
231
232      @Override
233      public int getRowOffset() {
234        return 0;
235      }
236
237      @Override
238      public short getRowLength() {
239        return (short) length(row);
240      }
241
242      @Override
243      public byte[] getFamilyArray() {
244        return getArray(family);
245      }
246
247      @Override
248      public int getFamilyOffset() {
249        return 0;
250      }
251
252      @Override
253      public byte getFamilyLength() {
254        return (byte) length(family);
255      }
256
257      @Override
258      public byte[] getQualifierArray() {
259        return getArray(qualifier);
260      }
261
262      @Override
263      public int getQualifierOffset() {
264        return 0;
265      }
266
267      @Override
268      public int getQualifierLength() {
269        return length(qualifier);
270      }
271
272      @Override
273      public long getTimestamp() {
274        return HConstants.LATEST_TIMESTAMP;
275      }
276
277      @Override
278      public byte getTypeByte() {
279        return type.getCode();
280      }
281
282      @Override
283      public long getSequenceId() {
284        return 0;
285      }
286
287      @Override
288      public byte[] getValueArray() {
289        return getArray(value);
290      }
291
292      @Override
293      public int getValueOffset() {
294        return 0;
295      }
296
297      @Override
298      public int getValueLength() {
299        return length(value);
300      }
301
302      @Override
303      public int getSerializedSize() {
304        return KeyValueUtil.getSerializedSize(this, true);
305      }
306
307      @Override
308      public byte[] getTagsArray() {
309        return getArray(null);
310      }
311
312      @Override
313      public int getTagsOffset() {
314        return 0;
315      }
316
317      @Override
318      public int getTagsLength() {
319        return length(null);
320      }
321
322      @Override
323      public Type getType() {
324        return type;
325      }
326    };
327  }
328
329  private static Cell createCustomCell(Put put) {
330    return createCustomCell(put.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE);
331  }
332
333  private static Cell createCustomCell(Append append) {
334    return createCustomCell(append.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put,
335      APPEND_VALUE);
336  }
337
338  private static Cell createCustomCell(Increment inc) {
339    return createCustomCell(inc.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE);
340  }
341
342  private static Cell createCustomCell(Delete delete) {
343    return createCustomCell(delete.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.DeleteColumn,
344      null);
345  }
346
347  public static class RegionObserverImpl implements RegionCoprocessor, RegionObserver {
348    static final AtomicInteger COUNT = new AtomicInteger(0);
349
350    @Override
351    public Optional<RegionObserver> getRegionObserver() {
352      return Optional.of(this);
353    }
354
355    @Override
356    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
357      Durability durability) throws IOException {
358      put.add(createCustomCell(put));
359      COUNT.incrementAndGet();
360    }
361
362    @Override
363    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
364      WALEdit edit, Durability durability) throws IOException {
365      delete.add(createCustomCell(delete));
366      COUNT.incrementAndGet();
367    }
368
369    @Override
370    public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
371      byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
372      boolean result) throws IOException {
373      put.add(createCustomCell(put));
374      COUNT.incrementAndGet();
375      return result;
376    }
377
378    @Override
379    public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
380      byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
381      Delete delete, boolean result) throws IOException {
382      delete.add(createCustomCell(delete));
383      COUNT.incrementAndGet();
384      return result;
385    }
386
387    @Override
388    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
389      throws IOException {
390      append.add(createCustomCell(append));
391      COUNT.incrementAndGet();
392      return null;
393    }
394
395    @Override
396    public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
397      throws IOException {
398      increment.add(createCustomCell(increment));
399      COUNT.incrementAndGet();
400      return null;
401    }
402
403  }
404
405}