/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * Creates multiple threads that update key/value pairs in an existing table, optionally running
 * in lock-step behind a linked {@link MultiThreadedWriterBase} that writes the keys first.
 */
@InterfaceAudience.Private
public class MultiThreadedUpdater extends MultiThreadedWriterBase {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedUpdater.class);

  protected Set<HBaseUpdaterThread> updaters = new HashSet<>();

  private MultiThreadedWriterBase writer = null;
  private boolean isBatchUpdate = false;
  private boolean ignoreNonceConflicts = false;
  private final double updatePercent;

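  /**
   * @param updatePercent percentage (0-100) of handed-out keys that are actually updated; the
   *   remainder are skipped. A minimal usage sketch follows; the table name, generator and
   *   configuration are hypothetical stand-ins, not part of this class:
   *
   *   <pre>
   *   MultiThreadedUpdater updater =
   *     new MultiThreadedUpdater(dataGen, conf, TableName.valueOf("usertable"), 20);
   *   updater.setBatchUpdate(true);
   *   updater.start(0, 100000, 8); // keys [0, 100000), 8 updater threads
   *   updater.waitForFinish();
   *   </pre>
   */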
  public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
    TableName tableName, double updatePercent) throws IOException {
    super(dataGen, conf, tableName, "U");
    this.updatePercent = updatePercent;
  }

  /** Whether to use one batched update per row instead of a separate update for every column. */
  public void setBatchUpdate(boolean isBatchUpdate) {
    this.isBatchUpdate = isBatchUpdate;
  }

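  /**
   * Links this updater to a writer so that updates trail writes: getNextKeyToUpdate() will only
   * hand out a key once the linked writer reports having written up to it.
   */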
  public synchronized void linkToWriter(MultiThreadedWriterBase writer) {
    this.writer = writer;
    writer.setTrackWroteKeys(true);
  }

  @Override
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    super.start(startKey, endKey, numThreads);

    if (verbose) {
      LOG.debug("Updating keys [" + startKey + ", " + endKey + ")");
    }

    addUpdaterThreads(numThreads);

    startThreads(updaters);
  }

  protected void addUpdaterThreads(int numThreads) throws IOException {
    for (int i = 0; i < numThreads; ++i) {
      HBaseUpdaterThread updater = new HBaseUpdaterThread(i);
      updaters.add(updater);
    }
  }

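  /**
   * Returns the next key to update, or endKey once the whole range has been handed out. When a
   * writer is linked, this blocks until the writer has written past the key, and recursively
   * skips keys the writer failed on (recording them in failedKeySet).
   */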
  private long getNextKeyToUpdate() {
    if (writer == null) {
      return nextKeyToWrite.getAndIncrement();
    }
    synchronized (this) {
      if (nextKeyToWrite.get() >= endKey) {
        // Finished the whole key range
        return endKey;
      }
      while (nextKeyToWrite.get() > writer.wroteUpToKey()) {
        Threads.sleepWithoutInterrupt(100);
      }
      long k = nextKeyToWrite.getAndIncrement();
      if (writer.failedToWriteKey(k)) {
        failedKeySet.add(k);
        return getNextKeyToUpdate();
      }
      return k;
    }
  }

  protected class HBaseUpdaterThread extends Thread {
    protected final Table table;

    public HBaseUpdaterThread(int updaterId) throws IOException {
      setName(getClass().getSimpleName() + "_" + updaterId);
      table = createTable();
    }

    protected Table createTable() throws IOException {
      return connection.getTable(tableName);
    }

    @Override
    public void run() {
      try {
        Random rand = ThreadLocalRandom.current();
        long rowKeyBase;
        StringBuilder buf = new StringBuilder();
        byte[][] columnFamilies = dataGenerator.getColumnFamilies();
        while ((rowKeyBase = getNextKeyToUpdate()) < endKey) {
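          // Only update this key updatePercent% of the time; skipped keys are still recorded
          // below as handed out when key tracking is enabled.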
          if (rand.nextInt(100) < updatePercent) {
            byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
            Increment inc = new Increment(rowKey);
            Append app = new Append(rowKey);
            numKeys.addAndGet(1);
            int columnCount = 0;
            for (byte[] cf : columnFamilies) {
              long cfHash = Arrays.hashCode(cf);
              inc.addColumn(cf, INCREMENT, cfHash);
              buf.setLength(0); // Clear the buffer
              buf.append("#").append(Bytes.toString(INCREMENT));
              buf.append(":").append(MutationType.INCREMENT.getNumber());
              app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
              ++columnCount;
              if (!isBatchUpdate) {
                mutate(table, inc, rowKeyBase);
                numCols.addAndGet(1);
                inc = new Increment(rowKey);
                mutate(table, app, rowKeyBase);
                numCols.addAndGet(1);
                app = new Append(rowKey);
              }
              Get get = new Get(rowKey);
              get.addFamily(cf);
              try {
                get = dataGenerator.beforeGet(rowKeyBase, get);
              } catch (Exception e) {
                // Ideally won't happen
                LOG.warn("Failed to modify the get from the load generator, row = ["
                  + Bytes.toString(get.getRow()) + "], column family = [" + Bytes.toString(cf)
                  + "]", e);
              }
              Result result = getRow(get, rowKeyBase, cf);
              Map<byte[], byte[]> columnValues = result != null ? result.getFamilyMap(cf) : null;
              if (columnValues == null) {
                int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]);
                if ((int) rowKeyBase % specialPermCellInsertionFactor == 0) {
                  LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey));
                } else {
                  failedKeySet.add(rowKeyBase);
                  LOG.error("Failed to update the row with key = [" + Bytes.toString(rowKey)
                    + "], since we could not get the original row");
                }
              }
              if (columnValues != null) {
                for (byte[] column : columnValues.keySet()) {
                  if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) {
                    continue;
                  }
                  MutationType mt =
                    MutationType.values()[rand.nextInt(MutationType.values().length)];
                  long columnHash = Arrays.hashCode(column);
                  long hashCode = cfHash + columnHash;
                  byte[] hashCodeBytes = Bytes.toBytes(hashCode);
                  byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY;
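                  // For columns whose combined hash is even, read back the current value so the
                  // checkAndMutate guard in mutate() can verify it before applying Put/Delete.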
                  if (hashCode % 2 == 0) {
                    Cell kv = result.getColumnLatestCell(cf, column);
                    checkedValue = kv != null ? CellUtil.cloneValue(kv) : null;
                    Preconditions.checkNotNull(checkedValue,
                      "Column value to be checked should not be null");
                  }
                  buf.setLength(0); // Clear the buffer
                  buf.append("#").append(Bytes.toString(column)).append(":");
                  ++columnCount;
                  switch (mt) {
                    case PUT:
                      Put put = new Put(rowKey);
                      put.addColumn(cf, column, hashCodeBytes);
                      mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue);
                      buf.append(MutationType.PUT.getNumber());
                      break;
                    case DELETE:
                      Delete delete = new Delete(rowKey);
                      // Delete all versions, since a Put could be applied more than once
                      // if ChaosMonkey (CM) is running and operations get retried
                      delete.addColumns(cf, column);
                      mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue);
                      buf.append(MutationType.DELETE.getNumber());
                      break;
                    default:
                      buf.append(MutationType.APPEND.getNumber());
                      app.addColumn(cf, column, hashCodeBytes);
                  }
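                  // Record what was done to this column in the MUTATE_INFO audit column (format
                  // "#<qualifier>:<mutation type number>") so a verifier can reconcile it later.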
                  app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
                  if (!isBatchUpdate) {
                    mutate(table, app, rowKeyBase);
                    numCols.addAndGet(1);
                    app = new Append(rowKey);
                  }
                }
              }
            }
            if (isBatchUpdate) {
              if (verbose) {
                LOG.debug("Preparing increment and append for key = [" + Bytes.toString(rowKey)
                  + "], " + columnCount + " columns");
              }
              mutate(table, inc, rowKeyBase);
              mutate(table, app, rowKeyBase);
              numCols.addAndGet(columnCount);
            }
          }
          if (trackWroteKeys) {
            wroteKeys.add(rowKeyBase);
          }
        }
      } finally {
        closeHTable();
        numThreadsWorking.decrementAndGet();
      }
    }

    protected void closeHTable() {
      try {
        if (table != null) {
          table.close();
        }
      } catch (IOException e) {
        LOG.error("Error closing table", e);
      }
    }

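    /**
     * Fetches the current row for the given Get, returning null (after logging a warning)
     * instead of propagating the IOException.
     */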
    protected Result getRow(Get get, long rowKeyBase, byte[] cf) {
      Result result = null;
      try {
        result = table.get(get);
      } catch (IOException ie) {
        LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow())
          + "], column family = [" + Bytes.toString(cf) + "]", ie);
      }
      return result;
    }

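    /** Applies an Increment or Append with no checkAndMutate guard values. */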
    public void mutate(Table table, Mutation m, long keyBase) {
      mutate(table, m, keyBase, null, null, null, null);
    }

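    /**
     * Applies the mutation: Increment and Append go directly to the table, while Put and Delete
     * run through checkAndMutate, guarded on column cf:q holding the expected value v. On
     * IOException the key is marked failed, unless nonce conflicts are being ignored.
     */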
    public void mutate(Table table, Mutation m, long keyBase, byte[] row, byte[] cf, byte[] q,
      byte[] v) {
      long start = EnvironmentEdgeManager.currentTime();
      try {
        m = dataGenerator.beforeMutate(keyBase, m);
        if (m instanceof Increment) {
          table.increment((Increment) m);
        } else if (m instanceof Append) {
          table.append((Append) m);
        } else if (m instanceof Put) {
          table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put) m);
        } else if (m instanceof Delete) {
          table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete) m);
        } else {
          throw new IllegalArgumentException(
            "unsupported mutation " + m.getClass().getSimpleName());
        }
        totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start);
      } catch (IOException e) {
        if (ignoreNonceConflicts) {
          LOG.info("Detected nonce conflict, ignoring: " + e.getMessage());
          totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start);
          return;
        }
        failedKeySet.add(keyBase);
        String exceptionInfo;
        if (e instanceof RetriesExhaustedWithDetailsException) {
          RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
          exceptionInfo = aggEx.getExhaustiveDescription();
        } else {
          exceptionInfo = StringUtils.stringifyException(e);
        }
        LOG.error("Failed to mutate: " + keyBase + " after "
          + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: "
          + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo);
      }
    }
  }

  @Override
  public void waitForFinish() {
    super.waitForFinish();
    System.out.println("Failed to update keys: " + failedKeySet.size());
    for (Long key : failedKeySet) {
      System.out.println("Failed to update key: " + key);
    }
  }

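  /**
   * Applies an Increment or Append with no checkAndMutate guard values. Note: unlike
   * HBaseUpdaterThread#mutate, these outer variants do not honor ignoreNonceConflicts.
   */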
  public void mutate(Table table, Mutation m, long keyBase) {
    mutate(table, m, keyBase, null, null, null, null);
  }

  public void mutate(Table table, Mutation m, long keyBase, byte[] row, byte[] cf, byte[] q,
    byte[] v) {
    long start = EnvironmentEdgeManager.currentTime();
    try {
      m = dataGenerator.beforeMutate(keyBase, m);
      if (m instanceof Increment) {
        table.increment((Increment) m);
      } else if (m instanceof Append) {
        table.append((Append) m);
      } else if (m instanceof Put) {
        table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put) m);
      } else if (m instanceof Delete) {
        table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete) m);
      } else {
        throw new IllegalArgumentException("unsupported mutation " + m.getClass().getSimpleName());
      }
      totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start);
    } catch (IOException e) {
      failedKeySet.add(keyBase);
      String exceptionInfo;
      if (e instanceof RetriesExhaustedWithDetailsException) {
        RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
        exceptionInfo = aggEx.getExhaustiveDescription();
      } else {
        exceptionInfo = StringUtils.stringifyException(e);
      }
      LOG.error("Failed to mutate: " + keyBase + " after "
        + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: "
        + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo);
    }
  }

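  /**
   * If set, any IOException from a mutation is treated as a nonce conflict (e.g. an increment
   * or append retried with the same nonce) and is logged rather than counted as a failed key.
   */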
  public void setIgnoreNonceConflicts(boolean value) {
    this.ignoreNonceConflicts = value;
  }
}