001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.Optional;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.KeyValue;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.hbase.wal.WALEdit;
031import org.apache.hadoop.hbase.wal.WALKey;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * Class for testing WALObserver coprocessor. It will monitor WAL writing and restoring, and modify
037 * passed-in WALEdit, i.e, ignore specified columns when writing, or add a KeyValue. On the other
038 * side, it checks whether the ignored column is still in WAL when Restoreed at region reconstruct.
039 */
040public class SampleRegionWALCoprocessor
041  implements WALCoprocessor, RegionCoprocessor, WALObserver, RegionObserver {
042
043  private static final Logger LOG = LoggerFactory.getLogger(SampleRegionWALCoprocessor.class);
044
045  private byte[] tableName;
046  private byte[] row;
047  private byte[] ignoredFamily;
048  private byte[] ignoredQualifier;
049  private byte[] addedFamily;
050  private byte[] addedQualifier;
051  private byte[] changedFamily;
052  private byte[] changedQualifier;
053
054  private boolean preWALWriteCalled = false;
055  private boolean postWALWriteCalled = false;
056  private boolean preWALRestoreCalled = false;
057  private boolean postWALRestoreCalled = false;
058  private boolean preWALRollCalled = false;
059  private boolean postWALRollCalled = false;
060  private boolean preReplayWALsCalled = false;
061  private boolean postReplayWALsCalled = false;
062
063  /**
064   * Set values: with a table name, a column name which will be ignored, and a column name which
065   * will be added to WAL.
066   */
067  public void setTestValues(byte[] tableName, byte[] row, byte[] igf, byte[] igq, byte[] chf,
068    byte[] chq, byte[] addf, byte[] addq) {
069    this.row = row;
070    this.tableName = tableName;
071    this.ignoredFamily = igf;
072    this.ignoredQualifier = igq;
073    this.addedFamily = addf;
074    this.addedQualifier = addq;
075    this.changedFamily = chf;
076    this.changedQualifier = chq;
077    preWALWriteCalled = false;
078    postWALWriteCalled = false;
079    preWALRestoreCalled = false;
080    postWALRestoreCalled = false;
081    preWALRollCalled = false;
082    postWALRollCalled = false;
083  }
084
085  @Override
086  public Optional<WALObserver> getWALObserver() {
087    return Optional.of(this);
088  }
089
090  @Override
091  public Optional<RegionObserver> getRegionObserver() {
092    return Optional.of(this);
093  }
094
095  @Override
096  public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env,
097    RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
098    postWALWriteCalled = true;
099  }
100
101  @Override
102  public void preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env, RegionInfo info,
103    WALKey logKey, WALEdit logEdit) throws IOException {
104    // check table name matches or not.
105    if (!Bytes.equals(info.getTable().toBytes(), this.tableName)) {
106      return;
107    }
108    preWALWriteCalled = true;
109    // here we're going to remove one keyvalue from the WALEdit, and add
110    // another one to it.
111    List<Cell> cells = logEdit.getCells();
112    Cell deletedCell = null;
113    for (Cell cell : cells) {
114      // assume only one kv from the WALEdit matches.
115      byte[] family = CellUtil.cloneFamily(cell);
116      byte[] qulifier = CellUtil.cloneQualifier(cell);
117
118      if (Arrays.equals(family, ignoredFamily) && Arrays.equals(qulifier, ignoredQualifier)) {
119        LOG.debug("Found the KeyValue from WALEdit which should be ignored.");
120        deletedCell = cell;
121      }
122      if (Arrays.equals(family, changedFamily) && Arrays.equals(qulifier, changedQualifier)) {
123        LOG.debug("Found the KeyValue from WALEdit which should be changed.");
124        cell.getValueArray()[cell.getValueOffset()] =
125          (byte) (cell.getValueArray()[cell.getValueOffset()] + 1);
126      }
127    }
128    if (null != row) {
129      cells.add(new KeyValue(row, addedFamily, addedQualifier));
130    }
131    if (deletedCell != null) {
132      LOG.debug("About to delete a KeyValue from WALEdit.");
133      cells.remove(deletedCell);
134    }
135  }
136
137  /**
138   * Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is Restoreed.
139   */
140  @Override
141  public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
142    RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
143    preWALRestoreCalled = true;
144  }
145
146  @Override
147  public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx, Path oldPath,
148    Path newPath) throws IOException {
149    preWALRollCalled = true;
150  }
151
152  @Override
153  public void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx, Path oldPath,
154    Path newPath) throws IOException {
155    postWALRollCalled = true;
156  }
157
158  /**
159   * Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is Restoreed.
160   */
161  @Override
162  public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
163    RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
164    postWALRestoreCalled = true;
165  }
166
167  @Override
168  public void preReplayWALs(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
169    RegionInfo info, Path edits) throws IOException {
170    preReplayWALsCalled = true;
171  }
172
173  @Override
174  public void postReplayWALs(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
175    RegionInfo info, Path edits) throws IOException {
176    postReplayWALsCalled = true;
177  }
178
179  public boolean isPreWALWriteCalled() {
180    return preWALWriteCalled;
181  }
182
183  public boolean isPostWALWriteCalled() {
184    return postWALWriteCalled;
185  }
186
187  public boolean isPreWALRestoreCalled() {
188    LOG.debug(SampleRegionWALCoprocessor.class.getName() + ".isPreWALRestoreCalled is called.");
189    return preWALRestoreCalled;
190  }
191
192  public boolean isPostWALRestoreCalled() {
193    LOG.debug(SampleRegionWALCoprocessor.class.getName() + ".isPostWALRestoreCalled is called.");
194    return postWALRestoreCalled;
195  }
196
197  public boolean isPreWALRollCalled() {
198    return preWALRollCalled;
199  }
200
201  public boolean isPostWALRollCalled() {
202    return postWALRollCalled;
203  }
204
205  public boolean isPreReplayWALsCalled() {
206    return preReplayWALsCalled;
207  }
208
209  public boolean isPostReplayWALsCalled() {
210    return postReplayWALsCalled;
211  }
212}