001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.lang.reflect.Constructor;
023import java.lang.reflect.Modifier;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.util.NonceKey;
028import org.apache.hadoop.hbase.util.RetryCounter;
029import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
030import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
031import org.apache.yetus.audience.InterfaceAudience;
032
033import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
034import org.apache.hbase.thirdparty.com.google.protobuf.Any;
035import org.apache.hbase.thirdparty.com.google.protobuf.Internal;
036import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
037import org.apache.hbase.thirdparty.com.google.protobuf.Message;
038import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
039import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
040
041import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
043
044/**
045 * Helper to convert to/from ProcedureProtos
046 */
047@InterfaceAudience.Private
048public final class ProcedureUtil {
049
050  private ProcedureUtil() {
051  }
052
053  // ==========================================================================
054  // Reflection helpers to create/validate a Procedure object
055  // ==========================================================================
056  private static Procedure<?> newProcedure(String className) throws BadProcedureException {
057    try {
058      Class<?> clazz = Class.forName(className);
059      if (!Modifier.isPublic(clazz.getModifiers())) {
060        throw new Exception("the " + clazz + " class is not public");
061      }
062
063      @SuppressWarnings("rawtypes")
064      Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor();
065      assert ctor != null : "no constructor found";
066      if (!Modifier.isPublic(ctor.getModifiers())) {
067        throw new Exception("the " + clazz + " constructor is not public");
068      }
069      return ctor.newInstance();
070    } catch (Exception e) {
071      throw new BadProcedureException(
072        "The procedure class " + className + " must be accessible and have an empty constructor",
073        e);
074    }
075  }
076
077  static void validateClass(Procedure<?> proc) throws BadProcedureException {
078    try {
079      Class<?> clazz = proc.getClass();
080      if (!Modifier.isPublic(clazz.getModifiers())) {
081        throw new Exception("the " + clazz + " class is not public");
082      }
083
084      Constructor<?> ctor = clazz.getConstructor();
085      assert ctor != null;
086      if (!Modifier.isPublic(ctor.getModifiers())) {
087        throw new Exception("the " + clazz + " constructor is not public");
088      }
089    } catch (Exception e) {
090      throw new BadProcedureException("The procedure class " + proc.getClass().getName()
091        + " must be accessible and have an empty constructor", e);
092    }
093  }
094
095  // ==========================================================================
096  // convert to and from Procedure object
097  // ==========================================================================
098
099  /**
100   * A serializer for our Procedures. Instead of the previous serializer, it uses the stateMessage
101   * list to store the internal state of the Procedures.
102   */
103  private static class StateSerializer implements ProcedureStateSerializer {
104    private final ProcedureProtos.Procedure.Builder builder;
105    private int deserializeIndex;
106
107    public StateSerializer(ProcedureProtos.Procedure.Builder builder) {
108      this.builder = builder;
109    }
110
111    @Override
112    public void serialize(Message message) throws IOException {
113      Any packedMessage = Any.pack(message);
114      builder.addStateMessage(packedMessage);
115    }
116
117    @Override
118    public <M extends Message> M deserialize(Class<M> clazz) throws IOException {
119      if (deserializeIndex >= builder.getStateMessageCount()) {
120        throw new IOException("Invalid state message index: " + deserializeIndex);
121      }
122
123      try {
124        Any packedMessage = builder.getStateMessage(deserializeIndex++);
125        return packedMessage.unpack(clazz);
126      } catch (InvalidProtocolBufferException e) {
127        throw e.unwrapIOException();
128      }
129    }
130  }
131
132  /**
133   * A serializer (deserializer) for those Procedures which were serialized before this patch. It
134   * deserializes the old, binary stateData field.
135   */
136  private static class CompatStateSerializer implements ProcedureStateSerializer {
137    private InputStream inputStream;
138
139    public CompatStateSerializer(InputStream inputStream) {
140      this.inputStream = inputStream;
141    }
142
143    @Override
144    public void serialize(Message message) throws IOException {
145      throw new UnsupportedOperationException();
146    }
147
148    @SuppressWarnings("unchecked")
149    @Override
150    public <M extends Message> M deserialize(Class<M> clazz) throws IOException {
151      Parser<M> parser = (Parser<M>) Internal.getDefaultInstance(clazz).getParserForType();
152      try {
153        return parser.parseDelimitedFrom(inputStream);
154      } catch (InvalidProtocolBufferException e) {
155        throw e.unwrapIOException();
156      }
157    }
158  }
159
160  /**
161   * Helper to convert the procedure to protobuf.
162   * <p/>
163   * Used by ProcedureStore implementations.
164   */
165  public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc)
166    throws IOException {
167    Preconditions.checkArgument(proc != null);
168    validateClass(proc);
169
170    final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
171      .setClassName(proc.getClass().getName()).setProcId(proc.getProcId()).setState(proc.getState())
172      .setSubmittedTime(proc.getSubmittedTime()).setLastUpdate(proc.getLastUpdate());
173
174    if (proc.hasParent()) {
175      builder.setParentId(proc.getParentProcId());
176    }
177
178    if (proc.isCriticalSystemTable()) {
179      builder.setIsCryticalSystemTable(true);
180    }
181
182    if (proc.hasTimeout()) {
183      builder.setTimeout(proc.getTimeout());
184    }
185
186    if (proc.hasOwner()) {
187      builder.setOwner(proc.getOwner());
188    }
189
190    final int[] stackIds = proc.getStackIndexes();
191    if (stackIds != null) {
192      for (int i = 0; i < stackIds.length; ++i) {
193        builder.addStackId(stackIds[i]);
194      }
195    }
196    builder.setExecuted(proc.wasExecuted());
197
198    if (proc.hasException()) {
199      RemoteProcedureException exception = proc.getException();
200      builder.setException(
201        RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
202    }
203
204    final byte[] result = proc.getResult();
205    if (result != null) {
206      builder.setResult(UnsafeByteOperations.unsafeWrap(result));
207    }
208
209    ProcedureStateSerializer serializer = new StateSerializer(builder);
210    proc.serializeStateData(serializer);
211
212    if (proc.getNonceKey() != null) {
213      builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
214      builder.setNonce(proc.getNonceKey().getNonce());
215    }
216
217    if (proc.hasLock()) {
218      builder.setLocked(true);
219    }
220
221    if (proc.isBypass()) {
222      builder.setBypass(true);
223    }
224    return builder.build();
225  }
226
227  /**
228   * Helper to convert the protobuf procedure.
229   * <p/>
230   * Used by ProcedureStore implementations.
231   * <p/>
232   * TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className,
233   * procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of
234   * it by storing the data only on insert().
235   */
236  public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto)
237    throws IOException {
238    // Procedure from class name
239    Procedure<?> proc = newProcedure(proto.getClassName());
240
241    // set fields
242    proc.setProcId(proto.getProcId());
243    proc.setState(proto.getState());
244    proc.setSubmittedTime(proto.getSubmittedTime());
245    proc.setLastUpdate(proto.getLastUpdate());
246
247    if (proto.hasParentId()) {
248      proc.setParentProcId(proto.getParentId());
249    }
250
251    if (proto.hasIsCryticalSystemTable()) {
252      proc.setCriticalSystemTable(proto.getIsCryticalSystemTable());
253    }
254
255    if (proto.hasOwner()) {
256      proc.setOwner(proto.getOwner());
257    }
258
259    if (proto.hasTimeout()) {
260      proc.setTimeout(proto.getTimeout());
261    }
262
263    if (proto.getStackIdCount() > 0) {
264      proc.setStackIndexes(proto.getStackIdList());
265    }
266    if (proto.getExecuted()) {
267      proc.setExecuted();
268    }
269
270    if (proto.hasException()) {
271      assert proc.getState() == ProcedureProtos.ProcedureState.FAILED
272        || proc.getState() == ProcedureProtos.ProcedureState.ROLLEDBACK
273        : "The procedure must be failed (waiting to rollback) or rolledback";
274      proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
275    }
276
277    if (proto.hasResult()) {
278      proc.setResult(proto.getResult().toByteArray());
279    }
280
281    if (proto.getNonce() != HConstants.NO_NONCE) {
282      proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce()));
283    }
284
285    if (proto.getLocked()) {
286      proc.lockedWhenLoading();
287    }
288
289    if (proto.getBypass()) {
290      proc.bypass(null);
291    }
292
293    ProcedureStateSerializer serializer = null;
294
295    if (proto.getStateMessageCount() > 0) {
296      serializer = new StateSerializer(proto.toBuilder());
297    } else if (proto.hasStateData()) {
298      InputStream inputStream = proto.getStateData().newInput();
299      serializer = new CompatStateSerializer(inputStream);
300    }
301
302    if (serializer != null) {
303      proc.deserializeStateData(serializer);
304    }
305
306    return proc;
307  }
308
309  // ==========================================================================
310  // convert from LockedResource object
311  // ==========================================================================
312
313  public static LockServiceProtos.LockedResourceType
314    convertToProtoResourceType(LockedResourceType resourceType) {
315    return LockServiceProtos.LockedResourceType.valueOf(resourceType.name());
316  }
317
318  public static LockServiceProtos.LockType convertToProtoLockType(LockType lockType) {
319    return LockServiceProtos.LockType.valueOf(lockType.name());
320  }
321
322  public static LockServiceProtos.LockedResource
323    convertToProtoLockedResource(LockedResource lockedResource) throws IOException {
324    LockServiceProtos.LockedResource.Builder builder =
325      LockServiceProtos.LockedResource.newBuilder();
326
327    builder.setResourceType(convertToProtoResourceType(lockedResource.getResourceType()))
328      .setResourceName(lockedResource.getResourceName())
329      .setLockType(convertToProtoLockType(lockedResource.getLockType()));
330
331    Procedure<?> exclusiveLockOwnerProcedure = lockedResource.getExclusiveLockOwnerProcedure();
332
333    if (exclusiveLockOwnerProcedure != null) {
334      ProcedureProtos.Procedure exclusiveLockOwnerProcedureProto =
335        convertToProtoProcedure(exclusiveLockOwnerProcedure);
336      builder.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedureProto);
337    }
338
339    builder.setSharedLockCount(lockedResource.getSharedLockCount());
340
341    for (Procedure<?> waitingProcedure : lockedResource.getWaitingProcedures()) {
342      ProcedureProtos.Procedure waitingProcedureProto = convertToProtoProcedure(waitingProcedure);
343      builder.addWaitingProcedures(waitingProcedureProto);
344    }
345
346    return builder.build();
347  }
348
349  public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS =
350    "hbase.procedure.retry.sleep.interval.ms";
351
352  // default to 1 second
353  public static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 1000;
354
355  public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
356    "hbase.procedure.retry.max.sleep.time.ms";
357
358  // default to 10 minutes
359  public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
360    TimeUnit.MINUTES.toMillis(10);
361
362  /**
363   * Get a retry counter for getting the backoff time. We will use the
364   * {@link ExponentialBackoffPolicyWithLimit} policy, and the base unit is 1 second, max sleep time
365   * is 10 minutes by default.
366   * <p/>
367   * For UTs, you can set the {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and
368   * {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} to make more frequent retry so your UT will not
369   * timeout.
370   */
371  public static RetryCounter createRetryCounter(Configuration conf) {
372    long sleepIntervalMs =
373      conf.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS);
374    long maxSleepTimeMs =
375      conf.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS);
376    RetryConfig retryConfig = new RetryConfig().setSleepInterval(sleepIntervalMs)
377      .setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit());
378    return new RetryCounter(retryConfig);
379  }
380
381  public static boolean isFinished(ProcedureProtos.Procedure proc) {
382    if (!proc.hasParentId()) {
383      switch (proc.getState()) {
384        case ROLLEDBACK:
385        case SUCCESS:
386          return true;
387        default:
388          break;
389      }
390    }
391    return false;
392  }
393}