/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Creates multiple threads that write key/values into the HBase table. */
public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterBase.class);

  /**
   * A temporary place to keep track of inserted/updated keys. This is written to by all writers and
   * is drained on a separate thread that populates {@link #wroteUpToKey}, the maximum key in the
   * contiguous range of keys being inserted/updated. This queue is supposed to stay small.
   */
  protected BlockingQueue<Long> wroteKeys;

  /**
   * This is the current key to be inserted/updated by any thread. Each thread does an atomic get
   * and increment operation and inserts the current value.
   */
  protected AtomicLong nextKeyToWrite = new AtomicLong();

  /**
   * The highest key in the contiguous range of keys inserted/updated so far.
   */
  protected AtomicLong wroteUpToKey = new AtomicLong();

  /** The sorted set of keys NOT inserted/updated by the writers */
  protected Set<Long> failedKeySet = new ConcurrentSkipListSet<>();

  /**
   * The total number of temporarily buffered inserted/updated keys that have not yet lined up in
   * our contiguous sequence starting from startKey. Supposed to stay small.
   */
  protected AtomicLong wroteKeyQueueSize = new AtomicLong();

  /** Enable this if used in conjunction with a concurrent reader. */
  protected boolean trackWroteKeys;
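
  /*
   * Illustrative writer-thread flow over the fields above. This is a sketch only; the actual
   * write logic lives in concrete subclasses (e.g. MultiThreadedWriter), and everything except
   * the fields of this class is assumed here:
   *
   *   long key = nextKeyToWrite.getAndIncrement();
   *   try {
   *     table.put(...); // build and issue the Put/mutation for `key`
   *   } catch (IOException e) {
   *     failedKeySet.add(key); // remembered so failures can be reported and skipped later
   *   }
   *   if (trackWroteKeys) {
   *     wroteKeys.add(key); // drained by WroteKeysTracker, which advances wroteUpToKey
   *   }
   */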

  public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
    TableName tableName, String actionLetter) throws IOException {
    super(dataGen, conf, tableName, actionLetter);
    this.wroteKeys = createWriteKeysQueue(conf);
  }

  protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
    return new ArrayBlockingQueue<>(10000);
  }
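
  /*
   * The queue size is fixed at 10000 entries here; a subclass could size it from the
   * configuration instead. A sketch (the "test.multi.writer.key.queue.size" key below is
   * hypothetical, not an existing setting):
   *
   *   @Override
   *   protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
   *     return new ArrayBlockingQueue<>(conf.getInt("test.multi.writer.key.queue.size", 10000));
   *   }
   */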

  @Override
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    super.start(startKey, endKey, numThreads);
    nextKeyToWrite.set(startKey);
    wroteUpToKey.set(startKey - 1);

    if (trackWroteKeys) {
      new Thread(new WroteKeysTracker(),
        "MultiThreadedWriterBase-WroteKeysTracker-" + EnvironmentEdgeManager.currentTime()).start();
      numThreadsWorking.incrementAndGet();
    }
  }
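
  /*
   * Typical driver sequence, sketched with assumed values (waitForFinish() is assumed to be
   * inherited from MultiThreadedAction):
   *
   *   MultiThreadedWriterBase writer = ...; // e.g. a MultiThreadedWriter
   *   writer.setTrackWroteKeys(true);       // only needed alongside a concurrent reader
   *   writer.start(0, 1000000, 20);         // keys [0, 1000000) spread across 20 writer threads
   *   writer.waitForFinish();
   *   LOG.info("Contiguously written up to key " + writer.wroteUpToKey());
   */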

  protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) {
    HRegionLocation cached = null, real = null;
    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
      cached = locator.getRegionLocation(rowKey, false);
      real = locator.getRegionLocation(rowKey, true);
    } catch (Throwable t) {
      // Could not obtain region information; this method is only used for error reporting, so
      // just return whatever we did manage to get.
    }
    String result = "no information can be obtained";
    if (cached != null) {
      result = "cached: " + cached.toString();
    }
    if (real != null && real.getServerName() != null) {
      if (cached != null && cached.getServerName() != null && real.equals(cached)) {
        result += "; cache is up to date";
      } else {
        result = (cached != null) ? (result + "; ") : "";
        result += "real: " + real.toString();
      }
    }
    return result;
  }
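
  /*
   * Intended for diagnostics when a write fails. A sketch of the expected call pattern from a
   * writer's error handling (the variables key, rowKey, table and e are illustrative):
   *
   *   failedKeySet.add(key);
   *   LOG.error("Failed to write key " + key + " (row " + Bytes.toStringBinary(rowKey) + "); "
   *     + "region info: " + getRegionDebugInfoSafe(table, rowKey), e);
   */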

  /**
   * A thread that keeps track of the highest key in the contiguous range of inserted/updated keys.
   */
  private class WroteKeysTracker implements Runnable {

    @Override
    public void run() {
      Thread.currentThread().setName(getClass().getSimpleName());
      try {
        long expectedKey = startKey;
        Queue<Long> sortedKeys = new PriorityQueue<>();
        while (expectedKey < endKey) {
          // Block until a new element is available.
          Long k;
          try {
            k = wroteKeys.poll(1, TimeUnit.SECONDS);
          } catch (InterruptedException e) {
            LOG.info("Inserted key tracker thread interrupted", e);
            break;
          }
          if (k == null) {
            continue;
          }
          if (k == expectedKey) {
            // Skip the "sorted key" queue and consume this key.
            wroteUpToKey.set(k);
            ++expectedKey;
          } else {
            sortedKeys.add(k);
          }

          // See if we have a sequence of contiguous keys lined up.
          while (!sortedKeys.isEmpty() && ((k = sortedKeys.peek()) == expectedKey)) {
            sortedKeys.poll();
            wroteUpToKey.set(k);
            ++expectedKey;
          }

          wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size());
        }
      } catch (Exception ex) {
        LOG.error("Error in inserted/updated key tracker", ex);
      } finally {
        numThreadsWorking.decrementAndGet();
      }
    }
  }
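
  /*
   * Example of how the tracker above lines keys up, assuming startKey = 0 and keys arriving out
   * of order as 0, 2, 3, 1:
   *
   *   poll() -> 0 : matches expectedKey, so wroteUpToKey = 0, expectedKey = 1
   *   poll() -> 2 : buffered in sortedKeys, wroteUpToKey stays 0
   *   poll() -> 3 : buffered in sortedKeys, wroteUpToKey stays 0
   *   poll() -> 1 : matches expectedKey (wroteUpToKey = 1), then 2 and 3 drain from sortedKeys;
   *                 wroteUpToKey ends at 3
   */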

  public int getNumWriteFailures() {
    return failedKeySet.size();
  }

  /**
   * The highest key up to which all keys have been inserted/updated (successfully or not).
   * @return the last key in the contiguous range of inserted/updated keys (inclusive)
   */
  public long wroteUpToKey() {
    return wroteUpToKey.get();
  }

  public boolean failedToWriteKey(long k) {
    return failedKeySet.contains(k);
  }
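
  /*
   * Sketch of how a concurrent reader is expected to consume this state (the loop below is an
   * assumption, not the actual reader implementation):
   *
   *   long upTo = writer.wroteUpToKey();
   *   for (long k = startKey; k <= upTo; ++k) {
   *     if (writer.failedToWriteKey(k)) {
   *       continue; // this key was never written successfully, so do not expect to read it
   *     }
   *     // read and verify the row for key k
   *   }
   */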

  @Override
  protected String progressInfo() {
    StringBuilder sb = new StringBuilder();
    appendToStatus(sb, "wroteUpTo", wroteUpToKey.get());
    appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get());
    return sb.toString();
  }

  /**
   * Used for a joint write/read workload. Enables tracking the last inserted/updated key, which
   * requires a blocking queue and a consumer thread.
   * @param enable whether to enable tracking the last inserted/updated key
   */
  public void setTrackWroteKeys(boolean enable) {
    trackWroteKeys = enable;
  }
}