001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.List; 023import java.util.Optional; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.hbase.Cell; 026import org.apache.hadoop.hbase.CellUtil; 027import org.apache.hadoop.hbase.KeyValue; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.hadoop.hbase.util.Bytes; 030import org.apache.hadoop.hbase.wal.WALEdit; 031import org.apache.hadoop.hbase.wal.WALKey; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * Class for testing WALObserver coprocessor. It will monitor WAL writing and restoring, and modify 037 * passed-in WALEdit, i.e, ignore specified columns when writing, or add a KeyValue. On the other 038 * side, it checks whether the ignored column is still in WAL when Restoreed at region reconstruct. 039 */ 040public class SampleRegionWALCoprocessor 041 implements WALCoprocessor, RegionCoprocessor, WALObserver, RegionObserver { 042 043 private static final Logger LOG = LoggerFactory.getLogger(SampleRegionWALCoprocessor.class); 044 045 private byte[] tableName; 046 private byte[] row; 047 private byte[] ignoredFamily; 048 private byte[] ignoredQualifier; 049 private byte[] addedFamily; 050 private byte[] addedQualifier; 051 private byte[] changedFamily; 052 private byte[] changedQualifier; 053 054 private boolean preWALWriteCalled = false; 055 private boolean postWALWriteCalled = false; 056 private boolean preWALRestoreCalled = false; 057 private boolean postWALRestoreCalled = false; 058 private boolean preWALRollCalled = false; 059 private boolean postWALRollCalled = false; 060 private boolean preReplayWALsCalled = false; 061 private boolean postReplayWALsCalled = false; 062 063 /** 064 * Set values: with a table name, a column name which will be ignored, and a column name which 065 * will be added to WAL. 066 */ 067 public void setTestValues(byte[] tableName, byte[] row, byte[] igf, byte[] igq, byte[] chf, 068 byte[] chq, byte[] addf, byte[] addq) { 069 this.row = row; 070 this.tableName = tableName; 071 this.ignoredFamily = igf; 072 this.ignoredQualifier = igq; 073 this.addedFamily = addf; 074 this.addedQualifier = addq; 075 this.changedFamily = chf; 076 this.changedQualifier = chq; 077 preWALWriteCalled = false; 078 postWALWriteCalled = false; 079 preWALRestoreCalled = false; 080 postWALRestoreCalled = false; 081 preWALRollCalled = false; 082 postWALRollCalled = false; 083 } 084 085 @Override 086 public Optional<WALObserver> getWALObserver() { 087 return Optional.of(this); 088 } 089 090 @Override 091 public Optional<RegionObserver> getRegionObserver() { 092 return Optional.of(this); 093 } 094 095 @Override 096 public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env, 097 RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { 098 postWALWriteCalled = true; 099 } 100 101 @Override 102 public void preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env, RegionInfo info, 103 WALKey logKey, WALEdit logEdit) throws IOException { 104 // check table name matches or not. 105 if (!Bytes.equals(info.getTable().toBytes(), this.tableName)) { 106 return; 107 } 108 preWALWriteCalled = true; 109 // here we're going to remove one keyvalue from the WALEdit, and add 110 // another one to it. 111 List<Cell> cells = logEdit.getCells(); 112 Cell deletedCell = null; 113 for (Cell cell : cells) { 114 // assume only one kv from the WALEdit matches. 115 byte[] family = CellUtil.cloneFamily(cell); 116 byte[] qulifier = CellUtil.cloneQualifier(cell); 117 118 if (Arrays.equals(family, ignoredFamily) && Arrays.equals(qulifier, ignoredQualifier)) { 119 LOG.debug("Found the KeyValue from WALEdit which should be ignored."); 120 deletedCell = cell; 121 } 122 if (Arrays.equals(family, changedFamily) && Arrays.equals(qulifier, changedQualifier)) { 123 LOG.debug("Found the KeyValue from WALEdit which should be changed."); 124 cell.getValueArray()[cell.getValueOffset()] = 125 (byte) (cell.getValueArray()[cell.getValueOffset()] + 1); 126 } 127 } 128 if (null != row) { 129 cells.add(new KeyValue(row, addedFamily, addedQualifier)); 130 } 131 if (deletedCell != null) { 132 LOG.debug("About to delete a KeyValue from WALEdit."); 133 cells.remove(deletedCell); 134 } 135 } 136 137 /** 138 * Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is Restoreed. 139 */ 140 @Override 141 public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env, 142 RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { 143 preWALRestoreCalled = true; 144 } 145 146 @Override 147 public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx, Path oldPath, 148 Path newPath) throws IOException { 149 preWALRollCalled = true; 150 } 151 152 @Override 153 public void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx, Path oldPath, 154 Path newPath) throws IOException { 155 postWALRollCalled = true; 156 } 157 158 /** 159 * Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is Restoreed. 160 */ 161 @Override 162 public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env, 163 RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { 164 postWALRestoreCalled = true; 165 } 166 167 @Override 168 public void preReplayWALs(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, 169 RegionInfo info, Path edits) throws IOException { 170 preReplayWALsCalled = true; 171 } 172 173 @Override 174 public void postReplayWALs(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, 175 RegionInfo info, Path edits) throws IOException { 176 postReplayWALsCalled = true; 177 } 178 179 public boolean isPreWALWriteCalled() { 180 return preWALWriteCalled; 181 } 182 183 public boolean isPostWALWriteCalled() { 184 return postWALWriteCalled; 185 } 186 187 public boolean isPreWALRestoreCalled() { 188 LOG.debug(SampleRegionWALCoprocessor.class.getName() + ".isPreWALRestoreCalled is called."); 189 return preWALRestoreCalled; 190 } 191 192 public boolean isPostWALRestoreCalled() { 193 LOG.debug(SampleRegionWALCoprocessor.class.getName() + ".isPostWALRestoreCalled is called."); 194 return postWALRestoreCalled; 195 } 196 197 public boolean isPreWALRollCalled() { 198 return preWALRollCalled; 199 } 200 201 public boolean isPostWALRollCalled() { 202 return postWALRollCalled; 203 } 204 205 public boolean isPreReplayWALsCalled() { 206 return preReplayWALsCalled; 207 } 208 209 public boolean isPostReplayWALsCalled() { 210 return postReplayWALsCalled; 211 } 212}