001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to you under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.hadoop.hbase.coprocessor.example;
018
019import java.io.IOException;
020import java.util.List;
021import java.util.Optional;
022import org.apache.hadoop.hbase.Cell;
023import org.apache.hadoop.hbase.CellBuilder;
024import org.apache.hadoop.hbase.CellBuilderFactory;
025import org.apache.hadoop.hbase.CellBuilderType;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.CoprocessorEnvironment;
028import org.apache.hadoop.hbase.coprocessor.ObserverContext;
029import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
030import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
031import org.apache.hadoop.hbase.coprocessor.RegionObserver;
032import org.apache.hadoop.hbase.regionserver.InternalScanner;
033import org.apache.hadoop.hbase.regionserver.ScanType;
034import org.apache.hadoop.hbase.regionserver.ScannerContext;
035import org.apache.hadoop.hbase.regionserver.Store;
036import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
037import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.yetus.audience.InterfaceAudience;
040
041/**
042 * This RegionObserver replaces the values of Puts from one value to another on compaction.
043 */
044@InterfaceAudience.Private
045public class ValueRewritingObserver implements RegionObserver, RegionCoprocessor {
046  public static final String ORIGINAL_VALUE_KEY =
047      "hbase.examples.coprocessor.value.rewrite.orig";
048  public static final String REPLACED_VALUE_KEY =
049      "hbase.examples.coprocessor.value.rewrite.replaced";
050
051  private byte[] sourceValue = null;
052  private byte[] replacedValue = null;
053  private Bytes.ByteArrayComparator comparator;
054  private CellBuilder cellBuilder;
055
056
057  @Override
058  public Optional<RegionObserver> getRegionObserver() {
059    // Extremely important to be sure that the coprocessor is invoked as a RegionObserver
060    return Optional.of(this);
061  }
062
063  @Override
064  public void start(
065      @SuppressWarnings("rawtypes") CoprocessorEnvironment env) throws IOException {
066    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
067    sourceValue = Bytes.toBytes(renv.getConfiguration().get(ORIGINAL_VALUE_KEY));
068    replacedValue = Bytes.toBytes(renv.getConfiguration().get(REPLACED_VALUE_KEY));
069    comparator = new Bytes.ByteArrayComparator();
070    cellBuilder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
071  }
072
073  @Override
074  public InternalScanner preCompact(
075      ObserverContext<RegionCoprocessorEnvironment> c, Store store,
076      final InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
077      CompactionRequest request) {
078    InternalScanner modifyingScanner = new InternalScanner() {
079      @Override
080      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
081        boolean ret = scanner.next(result, scannerContext);
082        for (int i = 0; i < result.size(); i++) {
083          Cell c = result.get(i);
084          // Replace the Cell if the value is the one we're replacing
085          if (CellUtil.isPut(c) &&
086              comparator.compare(CellUtil.cloneValue(c), sourceValue) == 0) {
087            try {
088              cellBuilder.setRow(CellUtil.copyRow(c));
089              cellBuilder.setFamily(CellUtil.cloneFamily(c));
090              cellBuilder.setQualifier(CellUtil.cloneQualifier(c));
091              cellBuilder.setTimestamp(c.getTimestamp());
092              cellBuilder.setType(Cell.Type.Put);
093              // Make sure each cell gets a unique value
094              byte[] clonedValue = new byte[replacedValue.length];
095              System.arraycopy(replacedValue, 0, clonedValue, 0, replacedValue.length);
096              cellBuilder.setValue(clonedValue);
097              result.set(i, cellBuilder.build());
098            } finally {
099              cellBuilder.clear();
100            }
101          }
102        }
103        return ret;
104      }
105
106      @Override
107      public void close() throws IOException {
108        scanner.close();
109      }
110    };
111
112    return modifyingScanner;
113  }
114}