001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.Optional;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.KeyValue;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.hbase.wal.WALEdit;
031import org.apache.hadoop.hbase.wal.WALKey;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * Class for testing WALObserver coprocessor. It will monitor WAL writing and restoring, and modify
037 * passed-in WALEdit, i.e, ignore specified columns when writing, or add a KeyValue. On the other
038 * side, it checks whether the ignored column is still in WAL when Restoreed at region reconstruct.
039 */
040public class SampleRegionWALCoprocessor
041  implements WALCoprocessor, RegionCoprocessor, WALObserver, RegionObserver {
042
043  private static final Logger LOG = LoggerFactory.getLogger(SampleRegionWALCoprocessor.class);
044
045  private byte[] tableName;
046  private byte[] row;
047  private byte[] ignoredFamily;
048  private byte[] ignoredQualifier;
049  private byte[] addedFamily;
050  private byte[] addedQualifier;
051  private byte[] changedFamily;
052  private byte[] changedQualifier;
053
054  private boolean preWALWriteCalled = false;
055  private boolean postWALWriteCalled = false;
056  private boolean preWALRestoreCalled = false;
057  private boolean postWALRestoreCalled = false;
058  private boolean preWALRollCalled = false;
059  private boolean postWALRollCalled = false;
060
061  /**
062   * Set values: with a table name, a column name which will be ignored, and a column name which
063   * will be added to WAL.
064   */
065  public void setTestValues(byte[] tableName, byte[] row, byte[] igf, byte[] igq, byte[] chf,
066    byte[] chq, byte[] addf, byte[] addq) {
067    this.row = row;
068    this.tableName = tableName;
069    this.ignoredFamily = igf;
070    this.ignoredQualifier = igq;
071    this.addedFamily = addf;
072    this.addedQualifier = addq;
073    this.changedFamily = chf;
074    this.changedQualifier = chq;
075    preWALWriteCalled = false;
076    postWALWriteCalled = false;
077    preWALRestoreCalled = false;
078    postWALRestoreCalled = false;
079    preWALRollCalled = false;
080    postWALRollCalled = false;
081  }
082
083  @Override
084  public Optional<WALObserver> getWALObserver() {
085    return Optional.of(this);
086  }
087
088  @Override
089  public Optional<RegionObserver> getRegionObserver() {
090    return Optional.of(this);
091  }
092
093  @Override
094  public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env,
095    RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
096    postWALWriteCalled = true;
097  }
098
099  @Override
100  public void preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env, RegionInfo info,
101    WALKey logKey, WALEdit logEdit) throws IOException {
102    // check table name matches or not.
103    if (!Bytes.equals(info.getTable().toBytes(), this.tableName)) {
104      return;
105    }
106    preWALWriteCalled = true;
107    // here we're going to remove one keyvalue from the WALEdit, and add
108    // another one to it.
109    List<Cell> cells = logEdit.getCells();
110    Cell deletedCell = null;
111    for (Cell cell : cells) {
112      // assume only one kv from the WALEdit matches.
113      byte[] family = CellUtil.cloneFamily(cell);
114      byte[] qulifier = CellUtil.cloneQualifier(cell);
115
116      if (Arrays.equals(family, ignoredFamily) && Arrays.equals(qulifier, ignoredQualifier)) {
117        LOG.debug("Found the KeyValue from WALEdit which should be ignored.");
118        deletedCell = cell;
119      }
120      if (Arrays.equals(family, changedFamily) && Arrays.equals(qulifier, changedQualifier)) {
121        LOG.debug("Found the KeyValue from WALEdit which should be changed.");
122        cell.getValueArray()[cell.getValueOffset()] =
123          (byte) (cell.getValueArray()[cell.getValueOffset()] + 1);
124      }
125    }
126    if (null != row) {
127      cells.add(new KeyValue(row, addedFamily, addedQualifier));
128    }
129    if (deletedCell != null) {
130      LOG.debug("About to delete a KeyValue from WALEdit.");
131      cells.remove(deletedCell);
132    }
133  }
134
135  /**
136   * Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is Restoreed.
137   */
138  @Override
139  public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
140    RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
141    preWALRestoreCalled = true;
142  }
143
144  @Override
145  public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx, Path oldPath,
146    Path newPath) throws IOException {
147    preWALRollCalled = true;
148  }
149
150  @Override
151  public void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx, Path oldPath,
152    Path newPath) throws IOException {
153    postWALRollCalled = true;
154  }
155
156  /**
157   * Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is Restoreed.
158   */
159  @Override
160  public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
161    RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
162    postWALRestoreCalled = true;
163  }
164
165  public boolean isPreWALWriteCalled() {
166    return preWALWriteCalled;
167  }
168
169  public boolean isPostWALWriteCalled() {
170    return postWALWriteCalled;
171  }
172
173  public boolean isPreWALRestoreCalled() {
174    LOG.debug(SampleRegionWALCoprocessor.class.getName() + ".isPreWALRestoreCalled is called.");
175    return preWALRestoreCalled;
176  }
177
178  public boolean isPostWALRestoreCalled() {
179    LOG.debug(SampleRegionWALCoprocessor.class.getName() + ".isPostWALRestoreCalled is called.");
180    return postWALRestoreCalled;
181  }
182
183  public boolean isPreWALRollCalled() {
184    return preWALRollCalled;
185  }
186
187  public boolean isPostWALRollCalled() {
188    return postWALRollCalled;
189  }
190}