001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor.example;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.Optional;
023import org.apache.hadoop.hbase.Cell;
024import org.apache.hadoop.hbase.CellBuilder;
025import org.apache.hadoop.hbase.CellBuilderFactory;
026import org.apache.hadoop.hbase.CellBuilderType;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.CoprocessorEnvironment;
029import org.apache.hadoop.hbase.ExtendedCell;
030import org.apache.hadoop.hbase.coprocessor.ObserverContext;
031import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
032import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
033import org.apache.hadoop.hbase.coprocessor.RegionObserver;
034import org.apache.hadoop.hbase.regionserver.InternalScanner;
035import org.apache.hadoop.hbase.regionserver.ScanType;
036import org.apache.hadoop.hbase.regionserver.ScannerContext;
037import org.apache.hadoop.hbase.regionserver.Store;
038import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
039import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.yetus.audience.InterfaceAudience;
042
043/**
044 * This RegionObserver replaces the values of Puts from one value to another on compaction.
045 */
046@InterfaceAudience.Private
047public class ValueRewritingObserver implements RegionObserver, RegionCoprocessor {
048  public static final String ORIGINAL_VALUE_KEY = "hbase.examples.coprocessor.value.rewrite.orig";
049  public static final String REPLACED_VALUE_KEY =
050    "hbase.examples.coprocessor.value.rewrite.replaced";
051
052  private byte[] sourceValue = null;
053  private byte[] replacedValue = null;
054  private Bytes.ByteArrayComparator comparator;
055  private CellBuilder cellBuilder;
056
057  @Override
058  public Optional<RegionObserver> getRegionObserver() {
059    // Extremely important to be sure that the coprocessor is invoked as a RegionObserver
060    return Optional.of(this);
061  }
062
063  @Override
064  public void start(@SuppressWarnings("rawtypes") CoprocessorEnvironment env) throws IOException {
065    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
066    sourceValue = Bytes.toBytes(renv.getConfiguration().get(ORIGINAL_VALUE_KEY));
067    replacedValue = Bytes.toBytes(renv.getConfiguration().get(REPLACED_VALUE_KEY));
068    comparator = new Bytes.ByteArrayComparator();
069    cellBuilder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
070  }
071
072  @Override
073  public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c,
074    Store store, final InternalScanner scanner, ScanType scanType,
075    CompactionLifeCycleTracker tracker, CompactionRequest request) {
076    InternalScanner modifyingScanner = new InternalScanner() {
077      @Override
078      public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
079        throws IOException {
080        boolean ret = scanner.next(result, scannerContext);
081        for (int i = 0; i < result.size(); i++) {
082          Cell c = (Cell) result.get(i);
083          // Replace the Cell if the value is the one we're replacing
084          if (CellUtil.isPut(c) && comparator.compare(CellUtil.cloneValue(c), sourceValue) == 0) {
085            try {
086              cellBuilder.setRow(CellUtil.copyRow(c));
087              cellBuilder.setFamily(CellUtil.cloneFamily(c));
088              cellBuilder.setQualifier(CellUtil.cloneQualifier(c));
089              cellBuilder.setTimestamp(c.getTimestamp());
090              cellBuilder.setType(Cell.Type.Put);
091              // Make sure each cell gets a unique value
092              byte[] clonedValue = new byte[replacedValue.length];
093              System.arraycopy(replacedValue, 0, clonedValue, 0, replacedValue.length);
094              cellBuilder.setValue(clonedValue);
095              // all cells in HBase are ExtendedCells, so you are fine to cast it to ExtendedCell,
096              // just do not use its methods since it may change without any deprecation cycle
097              result.set(i, (ExtendedCell) cellBuilder.build());
098            } finally {
099              cellBuilder.clear();
100            }
101          }
102        }
103        return ret;
104      }
105
106      @Override
107      public void close() throws IOException {
108        scanner.close();
109      }
110    };
111
112    return modifyingScanner;
113  }
114}