/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor.example;

import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.IntStream;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.math.IntMath;

/**
 * An example of implementing a write-heavy counter, i.e., one where reads are much less frequent
 * than writes.
 * <p>
 * Each increment is converted into a put, and the aggregation is done at read time. As a
 * consequence, the return value of increment is meaningless.
 * <p>
 * Notice that this is only an example, so most corner cases are not handled; for example, you must
 * provide a qualifier when doing a get.
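 * <p>
 * A minimal sketch of attaching the observer to a new table (the table and family names here are
 * made up for illustration, and {@code admin} is assumed to be an existing
 * {@link org.apache.hadoop.hbase.client.Admin} instance):
 *
 * <pre>
 * TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf("counters"))
 *   .setCoprocessor(WriteHeavyIncrementObserver.class.getName())
 *   .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).build();
 * admin.createTable(desc);
 * </pre>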
 */
@InterfaceAudience.Private
public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObserver {

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

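  // The delta cells for a counter all live under the same row/family/qualifier, differing only in
  // timestamp, so the flush scanner must see every version rather than just the newest one.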
  @Override
  public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
    options.readAllVersions();
  }

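  // Builds the synthesized cell that carries an aggregated value in place of the raw delta cells.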
  private Cell createCell(byte[] row, byte[] family, byte[] qualifier, long ts, long value) {
    return CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
      .setType(Cell.Type.Put).setFamily(family).setQualifier(qualifier).setTimestamp(ts)
      .setValue(Bytes.toBytes(value)).build();
  }

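  // Wraps a flush/compaction scanner so that, instead of writing out every raw delta cell, we emit
  // a single cell per row/qualifier holding the running sum (the timestamp is taken from the
  // source deltas). Each flush and compaction therefore physically folds the deltas together.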
  private InternalScanner wrap(byte[] family, InternalScanner scanner) {
    return new InternalScanner() {

      private List<Cell> srcResult = new ArrayList<>();

      private byte[] row;

      private byte[] qualifier;

      private long timestamp;

      private long sum;

      @Override
      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
        boolean moreRows = scanner.next(srcResult, scannerContext);
        if (srcResult.isEmpty()) {
          if (!moreRows && row != null) {
            result.add(createCell(row, family, qualifier, timestamp, sum));
          }
          return moreRows;
        }
        Cell firstCell = srcResult.get(0);
        // Check for a row change first. All the cells come from the same row, so checking the
        // first one is enough.
        if (row == null) {
          row = CellUtil.cloneRow(firstCell);
          qualifier = CellUtil.cloneQualifier(firstCell);
        } else if (!CellUtil.matchingRows(firstCell, row)) {
          result.add(createCell(row, family, qualifier, timestamp, sum));
          row = CellUtil.cloneRow(firstCell);
          qualifier = CellUtil.cloneQualifier(firstCell);
          sum = 0;
        }
        srcResult.forEach(c -> {
          if (CellUtil.matchingQualifier(c, qualifier)) {
            sum += Bytes.toLong(c.getValueArray(), c.getValueOffset());
          } else {
            result.add(createCell(row, family, qualifier, timestamp, sum));
            qualifier = CellUtil.cloneQualifier(c);
            sum = Bytes.toLong(c.getValueArray(), c.getValueOffset());
          }
          timestamp = c.getTimestamp();
        });
        if (!moreRows) {
          result.add(createCell(row, family, qualifier, timestamp, sum));
        }
        srcResult.clear();
        return moreRows;
      }

      @Override
      public void close() throws IOException {
        scanner.close();
      }
    };
  }

  @Override
  public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
  }

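  // Compactions, including in-memory compactions, aggregate the same way as flushes: read all
  // versions and fold the deltas together.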
  @Override
  public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
    CompactionRequest request) throws IOException {
    options.readAllVersions();
  }

  @Override
  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
    CompactionRequest request) throws IOException {
    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
  }

  @Override
  public void preMemStoreCompactionCompactScannerOpen(
    ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
    throws IOException {
    options.readAllVersions();
  }

  @Override
  public InternalScanner preMemStoreCompactionCompact(
    ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner)
    throws IOException {
    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
  }

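  // Gets aggregate on the fly: scan every version of the requested columns for the row, sum the
  // deltas per qualifier, and return synthesized cells instead of the stored ones.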
  @Override
  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result)
    throws IOException {
    Scan scan =
      new Scan().withStartRow(get.getRow()).withStopRow(get.getRow(), true).readAllVersions();
    NavigableMap<byte[], NavigableMap<byte[], MutableLong>> sums =
      new TreeMap<>(Bytes.BYTES_COMPARATOR);
    get.getFamilyMap().forEach((cf, cqs) -> {
      NavigableMap<byte[], MutableLong> ss = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      sums.put(cf, ss);
      cqs.forEach(cq -> {
        ss.put(cq, new MutableLong(0));
        scan.addColumn(cf, cq);
      });
    });
    List<Cell> cells = new ArrayList<>();
    try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
      boolean moreRows;
      do {
        moreRows = scanner.next(cells);
        for (Cell cell : cells) {
          byte[] family = CellUtil.cloneFamily(cell);
          byte[] qualifier = CellUtil.cloneQualifier(cell);
          long value = Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
          sums.get(family).get(qualifier).add(value);
        }
        cells.clear();
      } while (moreRows);
    }
    sums.forEach((cf, m) -> m.forEach((cq, s) -> result
      .add(createCell(get.getRow(), cf, cq, HConstants.LATEST_TIMESTAMP, s.longValue()))));
    c.bypass();
  }

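  // Timestamp allocation is striped to reduce lock contention: the stripe count is the number of
  // available processors rounded up to a power of two, so a row hash ANDed with the mask picks a
  // stripe.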
  private final int mask;
  private final MutableLong[] lastTimestamps;
  {
    int stripes =
      1 << IntMath.log2(Runtime.getRuntime().availableProcessors(), RoundingMode.CEILING);
    lastTimestamps =
      IntStream.range(0, stripes).mapToObj(i -> new MutableLong()).toArray(MutableLong[]::new);
    mask = stripes - 1;
  }

  // We need to make sure that different puts use different timestamps, otherwise we may lose some
  // increments. This is a known issue for HBase.
  private long getUniqueTimestamp(byte[] row) {
    int slot = Bytes.hashCode(row) & mask;
    MutableLong lastTimestamp = lastTimestamps[slot];
    long now = EnvironmentEdgeManager.currentTime();
    synchronized (lastTimestamp) {
      // The stored value keeps the wall-clock time in the upper bits and a per-millisecond
      // sequence number in the lower 10 bits, so up to 1024 distinct timestamps can be handed
      // out for the same millisecond before spilling into the next one.
      long pt = lastTimestamp.longValue() >> 10;
      if (now > pt) {
        lastTimestamp.setValue(now << 10);
      } else {
        lastTimestamp.increment();
      }
      return lastTimestamp.longValue();
    }
  }

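  // Rewrites an increment as a plain put of the delta value, stamped with a unique timestamp so
  // concurrent deltas for the same cell never overwrite each other. The normal increment is
  // bypassed, which is why the returned Result is empty.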
  @Override
  public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
    throws IOException {
    byte[] row = increment.getRow();
    Put put = new Put(row);
    long ts = getUniqueTimestamp(row);
    for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) {
      for (Cell cell : entry.getValue()) {
        put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
          .setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
          .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(),
            cell.getQualifierLength())
          .setValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
          .setType(Cell.Type.Put).setTimestamp(ts).build());
      }
    }
    c.getEnvironment().getRegion().put(put);
    c.bypass();
    return Result.EMPTY_RESULT;
  }

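  // User scans must also see every delta version, otherwise only the newest delta per cell would
  // be visible to the aggregation in preGetOp.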
  @Override
  public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store store,
    ScanOptions options) throws IOException {
    options.readAllVersions();
  }
}