001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.nio.ByteBuffer;
026import java.util.Arrays;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.stream.Collectors;
031import java.util.stream.IntStream;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.CompareOperator;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Admin;
040import org.apache.hadoop.hbase.client.Append;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Durability;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.Increment;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.Table;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.filter.ByteArrayComparable;
051import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
052import org.apache.hadoop.hbase.testclassification.MediumTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.wal.WALEdit;
055import org.junit.AfterClass;
056import org.junit.Before;
057import org.junit.BeforeClass;
058import org.junit.ClassRule;
059import org.junit.Rule;
060import org.junit.Test;
061import org.junit.experimental.categories.Category;
062import org.junit.rules.TestName;
063
064@Category({ CoprocessorTests.class, MediumTests.class })
065public class TestPassCustomCellViaRegionObserver {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069      HBaseClassTestRule.forClass(TestPassCustomCellViaRegionObserver.class);
070
071  @Rule
072  public TestName testName = new TestName();
073
074  private TableName tableName;
075  private Table table = null;
076
077  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
078
079  private static final byte[] ROW = Bytes.toBytes("ROW");
080  private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
081  private static final byte[] QUALIFIER = Bytes.toBytes("QUALIFIER");
082  private static final byte[] VALUE = Bytes.toBytes(10L);
083  private static final byte[] APPEND_VALUE = Bytes.toBytes("MB");
084
085  private static final byte[] QUALIFIER_FROM_CP = Bytes.toBytes("QUALIFIER_FROM_CP");
086
087  @BeforeClass
088  public static void setupBeforeClass() throws Exception {
089    // small retry number can speed up the failed tests.
090    UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
091    UTIL.startMiniCluster();
092  }
093
094  @AfterClass
095  public static void tearDownAfterClass() throws Exception {
096    UTIL.shutdownMiniCluster();
097  }
098
099  @Before
100  public void clearTable() throws IOException {
101    RegionObserverImpl.COUNT.set(0);
102    tableName = TableName.valueOf(testName.getMethodName());
103    if (table != null) {
104      table.close();
105    }
106    try (Admin admin = UTIL.getAdmin()) {
107      for (TableName name : admin.listTableNames()) {
108        try {
109          admin.disableTable(name);
110        } catch (IOException e) {
111        }
112        admin.deleteTable(name);
113      }
114      table = UTIL.createTable(TableDescriptorBuilder.newBuilder(tableName)
115        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
116        .setCoprocessor(RegionObserverImpl.class.getName())
117        .build(), null);
118    }
119  }
120
121  @Test
122  public void testMutation() throws Exception {
123
124    Put put = new Put(ROW);
125    put.addColumn(FAMILY, QUALIFIER, VALUE);
126    table.put(put);
127    byte[] value = VALUE;
128    assertResult(table.get(new Get(ROW)), value, value);
129    assertObserverHasExecuted();
130
131    Increment inc = new Increment(ROW);
132    inc.addColumn(FAMILY, QUALIFIER, 10L);
133    table.increment(inc);
134    // QUALIFIER -> 10 (put) + 10 (increment)
135    // QUALIFIER_FROM_CP -> 10 (from cp's put) + 10 (from cp's increment)
136    value = Bytes.toBytes(20L);
137    assertResult(table.get(new Get(ROW)), value, value);
138    assertObserverHasExecuted();
139
140    Append append = new Append(ROW);
141    append.addColumn(FAMILY, QUALIFIER, APPEND_VALUE);
142    table.append(append);
143    // 10L + "MB"
144    value = ByteBuffer.wrap(new byte[value.length + APPEND_VALUE.length])
145      .put(value)
146      .put(APPEND_VALUE)
147      .array();
148    assertResult(table.get(new Get(ROW)), value, value);
149    assertObserverHasExecuted();
150
151    Delete delete = new Delete(ROW);
152    delete.addColumns(FAMILY, QUALIFIER);
153    table.delete(delete);
154    assertTrue(Arrays.asList(table.get(new Get(ROW)).rawCells()).toString(),
155      table.get(new Get(ROW)).isEmpty());
156    assertObserverHasExecuted();
157
158    assertTrue(table.checkAndPut(ROW, FAMILY, QUALIFIER, null, put));
159    assertObserverHasExecuted();
160
161    assertTrue(table.checkAndDelete(ROW, FAMILY, QUALIFIER, VALUE, delete));
162    assertObserverHasExecuted();
163
164    assertTrue(table.get(new Get(ROW)).isEmpty());
165  }
166
167  @Test
168  public void testMultiPut() throws Exception {
169    List<Put> puts = IntStream.range(0, 10)
170      .mapToObj(i -> new Put(ROW).addColumn(FAMILY, Bytes.toBytes(i), VALUE))
171      .collect(Collectors.toList());
172    table.put(puts);
173    assertResult(table.get(new Get(ROW)), VALUE);
174    assertObserverHasExecuted();
175
176    List<Delete> deletes = IntStream.range(0, 10)
177      .mapToObj(i -> new Delete(ROW).addColumn(FAMILY, Bytes.toBytes(i)))
178      .collect(Collectors.toList());
179    table.delete(deletes);
180    assertTrue(table.get(new Get(ROW)).isEmpty());
181    assertObserverHasExecuted();
182  }
183
184  private static void assertObserverHasExecuted() {
185    assertTrue(RegionObserverImpl.COUNT.getAndSet(0) > 0);
186  }
187
188  private static void assertResult(Result result, byte[] expectedValue) {
189    assertFalse(result.isEmpty());
190    for (Cell c : result.rawCells()) {
191      assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
192      assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
193      assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
194    }
195  }
196
197  private static void assertResult(Result result, byte[] expectedValue, byte[] expectedFromCp) {
198    assertFalse(result.isEmpty());
199    for (Cell c : result.rawCells()) {
200      assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
201      assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
202      if (Bytes.equals(QUALIFIER, CellUtil.cloneQualifier(c))) {
203        assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
204      } else if (Bytes.equals(QUALIFIER_FROM_CP, CellUtil.cloneQualifier(c))) {
205        assertTrue(c.toString(), Bytes.equals(expectedFromCp, CellUtil.cloneValue(c)));
206      } else {
207        fail("No valid qualifier");
208      }
209    }
210  }
211
212  private static Cell createCustomCell(byte[] row, byte[] family, byte[] qualifier,
213    Cell.Type type, byte[] value) {
214    return new Cell() {
215
216      private byte[] getArray(byte[] array) {
217        return array == null ? HConstants.EMPTY_BYTE_ARRAY : array;
218      }
219
220      private int length(byte[] array) {
221        return array == null ? 0 : array.length;
222      }
223
224      @Override
225      public byte[] getRowArray() {
226        return getArray(row);
227      }
228
229      @Override
230      public int getRowOffset() {
231        return 0;
232      }
233
234      @Override
235      public short getRowLength() {
236        return (short) length(row);
237      }
238
239      @Override
240      public byte[] getFamilyArray() {
241        return getArray(family);
242      }
243
244      @Override
245      public int getFamilyOffset() {
246        return 0;
247      }
248
249      @Override
250      public byte getFamilyLength() {
251        return (byte) length(family);
252      }
253
254      @Override
255      public byte[] getQualifierArray() {
256        return getArray(qualifier);
257      }
258
259      @Override
260      public int getQualifierOffset() {
261        return 0;
262      }
263
264      @Override
265      public int getQualifierLength() {
266        return length(qualifier);
267      }
268
269      @Override
270      public long getTimestamp() {
271        return HConstants.LATEST_TIMESTAMP;
272      }
273
274      @Override
275      public byte getTypeByte() {
276        return type.getCode();
277      }
278
279      @Override
280      public long getSequenceId() {
281        return 0;
282      }
283
284      @Override
285      public byte[] getValueArray() {
286        return getArray(value);
287      }
288
289      @Override
290      public int getValueOffset() {
291        return 0;
292      }
293
294      @Override
295      public int getValueLength() {
296        return length(value);
297      }
298
299      @Override
300      public byte[] getTagsArray() {
301        return getArray(null);
302      }
303
304      @Override
305      public int getTagsOffset() {
306        return 0;
307      }
308
309      @Override
310      public int getTagsLength() {
311        return length(null);
312      }
313
314      @Override
315      public Type getType() {
316        return type;
317      }
318    };
319  }
320
321  private static Cell createCustomCell(Put put) {
322    return createCustomCell(put.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE);
323  }
324
325  private static Cell createCustomCell(Append append) {
326    return createCustomCell(append.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put,
327      APPEND_VALUE);
328  }
329
330  private static Cell createCustomCell(Increment inc) {
331    return createCustomCell(inc.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE);
332  }
333
334  private static Cell createCustomCell(Delete delete) {
335    return createCustomCell(delete.getRow(), FAMILY, QUALIFIER_FROM_CP,
336      Cell.Type.DeleteColumn, null);
337  }
338
339  public static class RegionObserverImpl implements RegionCoprocessor, RegionObserver {
340    static final AtomicInteger COUNT = new AtomicInteger(0);
341
342    @Override
343    public Optional<RegionObserver> getRegionObserver() {
344      return Optional.of(this);
345    }
346
347    @Override
348    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
349      Durability durability) throws IOException {
350      put.add(createCustomCell(put));
351      COUNT.incrementAndGet();
352    }
353
354    @Override
355    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
356      WALEdit edit, Durability durability) throws IOException {
357      delete.add(createCustomCell(delete));
358      COUNT.incrementAndGet();
359    }
360
361    @Override
362    public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
363      byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
364      boolean result) throws IOException {
365      put.add(createCustomCell(put));
366      COUNT.incrementAndGet();
367      return result;
368    }
369
370    @Override
371    public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
372      byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
373      Delete delete, boolean result) throws IOException {
374      delete.add(createCustomCell(delete));
375      COUNT.incrementAndGet();
376      return result;
377    }
378
379    @Override
380    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
381      throws IOException {
382      append.add(createCustomCell(append));
383      COUNT.incrementAndGet();
384      return null;
385    }
386
387
388    @Override
389    public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
390      throws IOException {
391      increment.add(createCustomCell(increment));
392      COUNT.incrementAndGet();
393      return null;
394    }
395
396  }
397
398}