001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor.example; 019 020import java.io.IOException; 021import java.math.RoundingMode; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Map; 025import java.util.NavigableMap; 026import java.util.Optional; 027import java.util.TreeMap; 028import java.util.stream.IntStream; 029import org.apache.commons.lang3.mutable.MutableLong; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellBuilderFactory; 032import org.apache.hadoop.hbase.CellBuilderType; 033import org.apache.hadoop.hbase.CellUtil; 034import org.apache.hadoop.hbase.ExtendedCell; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.client.Get; 037import org.apache.hadoop.hbase.client.Increment; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.client.Result; 040import org.apache.hadoop.hbase.client.Scan; 041import org.apache.hadoop.hbase.coprocessor.ObserverContext; 042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 044import org.apache.hadoop.hbase.coprocessor.RegionObserver; 045import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 046import org.apache.hadoop.hbase.regionserver.InternalScanner; 047import org.apache.hadoop.hbase.regionserver.RegionScanner; 048import org.apache.hadoop.hbase.regionserver.ScanOptions; 049import org.apache.hadoop.hbase.regionserver.ScanType; 050import org.apache.hadoop.hbase.regionserver.ScannerContext; 051import org.apache.hadoop.hbase.regionserver.Store; 052import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 053import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 056import org.apache.yetus.audience.InterfaceAudience; 057 058import org.apache.hbase.thirdparty.com.google.common.math.IntMath; 059 060/** 061 * An example for implementing a counter that reads is much less than writes, i.e, write heavy. 062 * <p> 063 * We will convert increment to put, and do aggregating when get. And of course the return value of 064 * increment is useless then. 065 * <p> 066 * Notice that this is only an example so we do not handle most corner cases, for example, you must 067 * provide a qualifier when doing a get. 068 */ 069@InterfaceAudience.Private 070public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObserver { 071 072 @Override 073 public Optional<RegionObserver> getRegionObserver() { 074 return Optional.of(this); 075 } 076 077 @Override 078 public void preFlushScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c, 079 Store store, ScanOptions options, FlushLifeCycleTracker tracker) throws IOException { 080 options.readAllVersions(); 081 } 082 083 private Cell createCell(byte[] row, byte[] family, byte[] qualifier, long ts, long value) { 084 return CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row) 085 .setType(Cell.Type.Put).setFamily(family).setQualifier(qualifier).setTimestamp(ts) 086 .setValue(Bytes.toBytes(value)).build(); 087 } 088 089 private InternalScanner wrap(byte[] family, InternalScanner scanner) { 090 return new InternalScanner() { 091 092 private List<Cell> srcResult = new ArrayList<>(); 093 094 private byte[] row; 095 096 private byte[] qualifier; 097 098 private long timestamp; 099 100 private long sum; 101 102 @Override 103 public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext) 104 throws IOException { 105 boolean moreRows = scanner.next(srcResult, scannerContext); 106 if (srcResult.isEmpty()) { 107 if (!moreRows && row != null) { 108 // all cells in HBase are ExtendedCells, so you are fine to cast it to ExtendedCell, 109 // just do not use its methods since it may change without any deprecation cycle 110 result.add((ExtendedCell) createCell(row, family, qualifier, timestamp, sum)); 111 } 112 return moreRows; 113 } 114 Cell firstCell = srcResult.get(0); 115 // Check if there is a row change first. All the cells will come from the same row so just 116 // check the first one once is enough. 117 if (row == null) { 118 row = CellUtil.cloneRow(firstCell); 119 qualifier = CellUtil.cloneQualifier(firstCell); 120 } else if (!CellUtil.matchingRows(firstCell, row)) { 121 // all cells in HBase are ExtendedCells, so you are fine to cast it to ExtendedCell, 122 // just do not use its methods since it may change without any deprecation cycle 123 result.add((ExtendedCell) createCell(row, family, qualifier, timestamp, sum)); 124 row = CellUtil.cloneRow(firstCell); 125 qualifier = CellUtil.cloneQualifier(firstCell); 126 sum = 0; 127 } 128 srcResult.forEach(c -> { 129 if (CellUtil.matchingQualifier(c, qualifier)) { 130 sum += Bytes.toLong(c.getValueArray(), c.getValueOffset()); 131 } else { 132 // all cells in HBase are ExtendedCells, so you are fine to cast it to ExtendedCell, 133 // just do not use its methods since it may change without any deprecation cycle 134 result.add((ExtendedCell) createCell(row, family, qualifier, timestamp, sum)); 135 qualifier = CellUtil.cloneQualifier(c); 136 sum = Bytes.toLong(c.getValueArray(), c.getValueOffset()); 137 } 138 timestamp = c.getTimestamp(); 139 }); 140 if (!moreRows) { 141 // all cells in HBase are ExtendedCells, so you are fine to cast it to ExtendedCell, 142 // just do not use its methods since it may change without any deprecation cycle 143 result.add((ExtendedCell) createCell(row, family, qualifier, timestamp, sum)); 144 } 145 srcResult.clear(); 146 return moreRows; 147 } 148 149 @Override 150 public void close() throws IOException { 151 scanner.close(); 152 } 153 }; 154 } 155 156 @Override 157 public InternalScanner preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c, 158 Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { 159 return wrap(store.getColumnFamilyDescriptor().getName(), scanner); 160 } 161 162 @Override 163 public void preCompactScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c, 164 Store store, ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker, 165 CompactionRequest request) throws IOException { 166 options.readAllVersions(); 167 } 168 169 @Override 170 public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c, 171 Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 172 CompactionRequest request) throws IOException { 173 return wrap(store.getColumnFamilyDescriptor().getName(), scanner); 174 } 175 176 @Override 177 public void preMemStoreCompactionCompactScannerOpen( 178 ObserverContext<? extends RegionCoprocessorEnvironment> c, Store store, ScanOptions options) 179 throws IOException { 180 options.readAllVersions(); 181 } 182 183 @Override 184 public InternalScanner preMemStoreCompactionCompact( 185 ObserverContext<? extends RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner) 186 throws IOException { 187 return wrap(store.getColumnFamilyDescriptor().getName(), scanner); 188 } 189 190 @Override 191 public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get, 192 List<Cell> result) throws IOException { 193 Scan scan = 194 new Scan().withStartRow(get.getRow()).withStopRow(get.getRow(), true).readAllVersions(); 195 NavigableMap<byte[], NavigableMap<byte[], MutableLong>> sums = 196 new TreeMap<>(Bytes.BYTES_COMPARATOR); 197 get.getFamilyMap().forEach((cf, cqs) -> { 198 NavigableMap<byte[], MutableLong> ss = new TreeMap<>(Bytes.BYTES_COMPARATOR); 199 sums.put(cf, ss); 200 cqs.forEach(cq -> { 201 ss.put(cq, new MutableLong(0)); 202 scan.addColumn(cf, cq); 203 }); 204 }); 205 List<Cell> cells = new ArrayList<>(); 206 try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) { 207 boolean moreRows; 208 do { 209 moreRows = scanner.next(cells); 210 for (Cell cell : cells) { 211 byte[] family = CellUtil.cloneFamily(cell); 212 byte[] qualifier = CellUtil.cloneQualifier(cell); 213 long value = Bytes.toLong(cell.getValueArray(), cell.getValueOffset()); 214 sums.get(family).get(qualifier).add(value); 215 } 216 cells.clear(); 217 } while (moreRows); 218 } 219 sums.forEach((cf, m) -> m.forEach((cq, s) -> result 220 .add(createCell(get.getRow(), cf, cq, HConstants.LATEST_TIMESTAMP, s.longValue())))); 221 c.bypass(); 222 } 223 224 private final int mask; 225 private final MutableLong[] lastTimestamps; 226 { 227 int stripes = 228 1 << IntMath.log2(Runtime.getRuntime().availableProcessors(), RoundingMode.CEILING); 229 lastTimestamps = 230 IntStream.range(0, stripes).mapToObj(i -> new MutableLong()).toArray(MutableLong[]::new); 231 mask = stripes - 1; 232 } 233 234 // We need make sure the different put uses different timestamp otherwise we may lost some 235 // increments. This is a known issue for HBase. 236 private long getUniqueTimestamp(byte[] row) { 237 int slot = Bytes.hashCode(row) & mask; 238 MutableLong lastTimestamp = lastTimestamps[slot]; 239 long now = EnvironmentEdgeManager.currentTime(); 240 synchronized (lastTimestamp) { 241 long pt = lastTimestamp.longValue() >> 10; 242 if (now > pt) { 243 lastTimestamp.setValue(now << 10); 244 } else { 245 lastTimestamp.increment(); 246 } 247 return lastTimestamp.longValue(); 248 } 249 } 250 251 @Override 252 public Result preIncrement(ObserverContext<? extends RegionCoprocessorEnvironment> c, 253 Increment increment) throws IOException { 254 byte[] row = increment.getRow(); 255 Put put = new Put(row); 256 long ts = getUniqueTimestamp(row); 257 for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) { 258 for (Cell cell : entry.getValue()) { 259 put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row) 260 .setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) 261 .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(), 262 cell.getQualifierLength()) 263 .setValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()) 264 .setType(Cell.Type.Put).setTimestamp(ts).build()); 265 } 266 } 267 c.getEnvironment().getRegion().put(put); 268 c.bypass(); 269 return Result.EMPTY_RESULT; 270 } 271 272 @Override 273 public void preStoreScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, 274 Store store, ScanOptions options) throws IOException { 275 options.readAllVersions(); 276 } 277}