001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor.example;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.Optional;
023import org.apache.hadoop.hbase.Cell;
024import org.apache.hadoop.hbase.CellBuilder;
025import org.apache.hadoop.hbase.CellBuilderFactory;
026import org.apache.hadoop.hbase.CellBuilderType;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.CoprocessorEnvironment;
029import org.apache.hadoop.hbase.coprocessor.ObserverContext;
030import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
031import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
032import org.apache.hadoop.hbase.coprocessor.RegionObserver;
033import org.apache.hadoop.hbase.regionserver.InternalScanner;
034import org.apache.hadoop.hbase.regionserver.ScanType;
035import org.apache.hadoop.hbase.regionserver.ScannerContext;
036import org.apache.hadoop.hbase.regionserver.Store;
037import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
038import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.yetus.audience.InterfaceAudience;
041
042/**
043 * This RegionObserver replaces the values of Puts from one value to another on compaction.
044 */
045@InterfaceAudience.Private
046public class ValueRewritingObserver implements RegionObserver, RegionCoprocessor {
047  public static final String ORIGINAL_VALUE_KEY = "hbase.examples.coprocessor.value.rewrite.orig";
048  public static final String REPLACED_VALUE_KEY =
049    "hbase.examples.coprocessor.value.rewrite.replaced";
050
051  private byte[] sourceValue = null;
052  private byte[] replacedValue = null;
053  private Bytes.ByteArrayComparator comparator;
054  private CellBuilder cellBuilder;
055
056  @Override
057  public Optional<RegionObserver> getRegionObserver() {
058    // Extremely important to be sure that the coprocessor is invoked as a RegionObserver
059    return Optional.of(this);
060  }
061
062  @Override
063  public void start(@SuppressWarnings("rawtypes") CoprocessorEnvironment env) throws IOException {
064    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
065    sourceValue = Bytes.toBytes(renv.getConfiguration().get(ORIGINAL_VALUE_KEY));
066    replacedValue = Bytes.toBytes(renv.getConfiguration().get(REPLACED_VALUE_KEY));
067    comparator = new Bytes.ByteArrayComparator();
068    cellBuilder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
069  }
070
071  @Override
072  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
073    final InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
074    CompactionRequest request) {
075    InternalScanner modifyingScanner = new InternalScanner() {
076      @Override
077      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
078        boolean ret = scanner.next(result, scannerContext);
079        for (int i = 0; i < result.size(); i++) {
080          Cell c = result.get(i);
081          // Replace the Cell if the value is the one we're replacing
082          if (CellUtil.isPut(c) && comparator.compare(CellUtil.cloneValue(c), sourceValue) == 0) {
083            try {
084              cellBuilder.setRow(CellUtil.copyRow(c));
085              cellBuilder.setFamily(CellUtil.cloneFamily(c));
086              cellBuilder.setQualifier(CellUtil.cloneQualifier(c));
087              cellBuilder.setTimestamp(c.getTimestamp());
088              cellBuilder.setType(Cell.Type.Put);
089              // Make sure each cell gets a unique value
090              byte[] clonedValue = new byte[replacedValue.length];
091              System.arraycopy(replacedValue, 0, clonedValue, 0, replacedValue.length);
092              cellBuilder.setValue(clonedValue);
093              result.set(i, cellBuilder.build());
094            } finally {
095              cellBuilder.clear();
096            }
097          }
098        }
099        return ret;
100      }
101
102      @Override
103      public void close() throws IOException {
104        scanner.close();
105      }
106    };
107
108    return modifyingScanner;
109  }
110}