001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor.example; 019 020import java.io.IOException; 021import java.math.RoundingMode; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Map; 025import java.util.NavigableMap; 026import java.util.Optional; 027import java.util.TreeMap; 028import java.util.stream.IntStream; 029import org.apache.commons.lang3.mutable.MutableLong; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellBuilderFactory; 032import org.apache.hadoop.hbase.CellBuilderType; 033import org.apache.hadoop.hbase.CellUtil; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.client.Get; 036import org.apache.hadoop.hbase.client.Increment; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.Result; 039import org.apache.hadoop.hbase.client.Scan; 040import org.apache.hadoop.hbase.coprocessor.ObserverContext; 041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 043import org.apache.hadoop.hbase.coprocessor.RegionObserver; 044import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 045import org.apache.hadoop.hbase.regionserver.InternalScanner; 046import org.apache.hadoop.hbase.regionserver.RegionScanner; 047import org.apache.hadoop.hbase.regionserver.ScanOptions; 048import org.apache.hadoop.hbase.regionserver.ScanType; 049import org.apache.hadoop.hbase.regionserver.ScannerContext; 050import org.apache.hadoop.hbase.regionserver.Store; 051import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 052import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.yetus.audience.InterfaceAudience; 055 056import org.apache.hbase.thirdparty.com.google.common.math.IntMath; 057 058/** 059 * An example for implementing a counter that reads is much less than writes, i.e, write heavy. 060 * <p> 061 * We will convert increment to put, and do aggregating when get. And of course the return value of 062 * increment is useless then. 063 * <p> 064 * Notice that this is only an example so we do not handle most corner cases, for example, you must 065 * provide a qualifier when doing a get. 066 */ 067@InterfaceAudience.Private 068public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObserver { 069 070 @Override 071 public Optional<RegionObserver> getRegionObserver() { 072 return Optional.of(this); 073 } 074 075 @Override 076 public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 077 ScanOptions options, FlushLifeCycleTracker tracker) throws IOException { 078 options.readAllVersions(); 079 } 080 081 private Cell createCell(byte[] row, byte[] family, byte[] qualifier, long ts, long value) { 082 return CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row) 083 .setType(Cell.Type.Put).setFamily(family).setQualifier(qualifier) 084 .setTimestamp(ts).setValue(Bytes.toBytes(value)).build(); 085 } 086 087 private InternalScanner wrap(byte[] family, InternalScanner scanner) { 088 return new InternalScanner() { 089 090 private List<Cell> srcResult = new ArrayList<>(); 091 092 private byte[] row; 093 094 private byte[] qualifier; 095 096 private long timestamp; 097 098 private long sum; 099 100 @Override 101 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { 102 boolean moreRows = scanner.next(srcResult, scannerContext); 103 if (srcResult.isEmpty()) { 104 if (!moreRows && row != null) { 105 result.add(createCell(row, family, qualifier, timestamp, sum)); 106 } 107 return moreRows; 108 } 109 Cell firstCell = srcResult.get(0); 110 // Check if there is a row change first. All the cells will come from the same row so just 111 // check the first one once is enough. 112 if (row == null) { 113 row = CellUtil.cloneRow(firstCell); 114 qualifier = CellUtil.cloneQualifier(firstCell); 115 } else if (!CellUtil.matchingRows(firstCell, row)) { 116 result.add(createCell(row, family, qualifier, timestamp, sum)); 117 row = CellUtil.cloneRow(firstCell); 118 qualifier = CellUtil.cloneQualifier(firstCell); 119 sum = 0; 120 } 121 srcResult.forEach(c -> { 122 if (CellUtil.matchingQualifier(c, qualifier)) { 123 sum += Bytes.toLong(c.getValueArray(), c.getValueOffset()); 124 } else { 125 result.add(createCell(row, family, qualifier, timestamp, sum)); 126 qualifier = CellUtil.cloneQualifier(c); 127 sum = Bytes.toLong(c.getValueArray(), c.getValueOffset()); 128 } 129 timestamp = c.getTimestamp(); 130 }); 131 if (!moreRows) { 132 result.add(createCell(row, family, qualifier, timestamp, sum)); 133 } 134 srcResult.clear(); 135 return moreRows; 136 } 137 138 @Override 139 public void close() throws IOException { 140 scanner.close(); 141 } 142 }; 143 } 144 145 @Override 146 public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 147 InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { 148 return wrap(store.getColumnFamilyDescriptor().getName(), scanner); 149 } 150 151 @Override 152 public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 153 ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker, 154 CompactionRequest request) throws IOException { 155 options.readAllVersions(); 156 } 157 158 @Override 159 public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 160 InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 161 CompactionRequest request) throws IOException { 162 return wrap(store.getColumnFamilyDescriptor().getName(), scanner); 163 } 164 165 @Override 166 public void preMemStoreCompactionCompactScannerOpen( 167 ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options) 168 throws IOException { 169 options.readAllVersions(); 170 } 171 172 @Override 173 public InternalScanner preMemStoreCompactionCompact( 174 ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner) 175 throws IOException { 176 return wrap(store.getColumnFamilyDescriptor().getName(), scanner); 177 } 178 179 @Override 180 public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result) 181 throws IOException { 182 Scan scan = 183 new Scan().withStartRow(get.getRow()).withStopRow(get.getRow(), true).readAllVersions(); 184 NavigableMap<byte[], NavigableMap<byte[], MutableLong>> sums = 185 new TreeMap<>(Bytes.BYTES_COMPARATOR); 186 get.getFamilyMap().forEach((cf, cqs) -> { 187 NavigableMap<byte[], MutableLong> ss = new TreeMap<>(Bytes.BYTES_COMPARATOR); 188 sums.put(cf, ss); 189 cqs.forEach(cq -> { 190 ss.put(cq, new MutableLong(0)); 191 scan.addColumn(cf, cq); 192 }); 193 }); 194 List<Cell> cells = new ArrayList<>(); 195 try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) { 196 boolean moreRows; 197 do { 198 moreRows = scanner.next(cells); 199 for (Cell cell : cells) { 200 byte[] family = CellUtil.cloneFamily(cell); 201 byte[] qualifier = CellUtil.cloneQualifier(cell); 202 long value = Bytes.toLong(cell.getValueArray(), cell.getValueOffset()); 203 sums.get(family).get(qualifier).add(value); 204 } 205 cells.clear(); 206 } while (moreRows); 207 } 208 sums.forEach((cf, m) -> m.forEach((cq, s) -> result 209 .add(createCell(get.getRow(), cf, cq, HConstants.LATEST_TIMESTAMP, s.longValue())))); 210 c.bypass(); 211 } 212 213 private final int mask; 214 private final MutableLong[] lastTimestamps; 215 { 216 int stripes = 217 1 << IntMath.log2(Runtime.getRuntime().availableProcessors(), RoundingMode.CEILING); 218 lastTimestamps = 219 IntStream.range(0, stripes).mapToObj(i -> new MutableLong()).toArray(MutableLong[]::new); 220 mask = stripes - 1; 221 } 222 223 // We need make sure the different put uses different timestamp otherwise we may lost some 224 // increments. This is a known issue for HBase. 225 private long getUniqueTimestamp(byte[] row) { 226 int slot = Bytes.hashCode(row) & mask; 227 MutableLong lastTimestamp = lastTimestamps[slot]; 228 long now = System.currentTimeMillis(); 229 synchronized (lastTimestamp) { 230 long pt = lastTimestamp.longValue() >> 10; 231 if (now > pt) { 232 lastTimestamp.setValue(now << 10); 233 } else { 234 lastTimestamp.increment(); 235 } 236 return lastTimestamp.longValue(); 237 } 238 } 239 240 @Override 241 public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment) 242 throws IOException { 243 byte[] row = increment.getRow(); 244 Put put = new Put(row); 245 long ts = getUniqueTimestamp(row); 246 for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) { 247 for (Cell cell : entry.getValue()) { 248 put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row) 249 .setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) 250 .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(), 251 cell.getQualifierLength()) 252 .setValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()) 253 .setType(Cell.Type.Put).setTimestamp(ts).build()); 254 } 255 } 256 c.getEnvironment().getRegion().put(put); 257 c.bypass(); 258 return Result.EMPTY_RESULT; 259 } 260 261 @Override 262 public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store store, 263 ScanOptions options) throws IOException { 264 options.readAllVersions(); 265 } 266}