package com.nazdaq.workflow.engine.core.processor;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.UnmodifiableIterator;
import com.nazdaq.core.helpers.TextHelper;
import com.nazdaq.noms.app.modules.SequenceGeneratorLong;
import com.nazdaq.workflow.engine.core.events.StopEvent;
import com.nazdaq.workflow.engine.core.exceptions.LicenseExpiredException;
import com.nazdaq.workflow.engine.core.exceptions.NodeAbortException;
import com.nazdaq.workflow.engine.core.exceptions.WorkFlowDataInvalidException;
import com.nazdaq.workflow.engine.core.manager.WorkFlowExecutionManager;
import com.nazdaq.workflow.engine.core.manager.WorkFlowIterationThread;
import com.nazdaq.workflow.engine.core.models.connections.AbstractNodeConnectionData;
import com.nazdaq.workflow.engine.core.models.node.AbstractNodeConfigurationData;
import com.nazdaq.workflow.engine.core.models.node.NodeConfiguration;
import com.nazdaq.workflow.engine.core.plugins.PluginsSystemService;
import com.nazdaq.workflow.engine.core.plugins.models.nodes.RegisteredNodeType;
import com.nazdaq.workflow.engine.core.processor.annotations.NodeProcessor;
import com.nazdaq.workflow.engine.core.processor.annotations.NodeProcessorProperty;
import com.nazdaq.workflow.engine.core.storage.ExecutionStorage;
import com.nazdaq.workflow.engine.core.storage.models.inout.NodeDataDirection;
import com.nazdaq.workflow.engine.core.storage.models.inout.NodeDataKey;
import com.nazdaq.workflow.engine.core.storage.models.inout.NodeDataWrap;
import com.nazdaq.workflow.engine.core.storage.models.inout.NodeInputs;
import com.nazdaq.workflow.engine.core.storage.models.inout.NodeOutputs;
import com.nazdaq.workflow.engine.core.storage.models.inout.NodePortType;
import com.nazdaq.workflow.engine.core.storage.models.inout.datatypes.AbstractData;
import com.nazdaq.workflow.engine.core.storage.models.messages.NodeProcessorMessage;
import com.nazdaq.workflow.engine.core.storage.models.properties.NodePropertyDefinition;
import com.nazdaq.workflow.engine.core.storage.models.properties.NodePropertyValue;
import com.nazdaq.workflow.engine.dag.task.ExecutionResult;
import com.nazdaq.workflow.engine.dag.task.ExecutionResults;
import com.nazdaq.workflow.engine.dag.task.Task;
import com.nazdaq.workflow.engine.helpers.NodeGraphData;
import com.nazdaq.workflow.graphql.models.execution.ExecutionStartType;
import com.nazdaq.workflow.graphql.models.execution.iteration.IterationStartInput;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import models.workflow.executions.iterations.nodes.ExecutionNodeStatus;
import models.workflow.executions.iterations.nodes.WorkFlowExecutionNode;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/nazdaq/workflow/engine/core/processor/AbstractNodeProcessor.class */
public abstract class AbstractNodeProcessor<IN extends AbstractData, OUT extends AbstractData, CF extends AbstractNodeConfigurationData, CN extends AbstractNodeConnectionData> extends Task<String, NodeResults> {
    private static final Logger log = LoggerFactory.getLogger(AbstractNodeProcessor.class);
    public static final String NODE = "node";
    private final NodeProcessor annotation;
    private final NodePortType inputPortType;

    @JsonIgnore
    protected final ProcessorContext context;
    private final Class<IN> inputDataType;
    private final Class<OUT> outputDataType;
    private final Class<? extends AbstractNodeConfigurationData> configDataType;
    private int totalToProcess;

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public AbstractNodeProcessor(@NotNull ProcessorContext processorContext, NodePortType nodePortType) {
        this.totalToProcess = 1;
        this.annotation = (NodeProcessor) getClass().getAnnotation(NodeProcessor.class);
        this.inputPortType = nodePortType;
        setId(processorContext.getProcessorId());
        this.context = processorContext;
        RegisteredNodeType nodeType = getNodeType();
        this.inputDataType = (Class<IN>) nodeType.getInputDataType();
        this.outputDataType = (Class<OUT>) nodeType.getOutputDataType();
        this.configDataType = nodeType.getConfigClassType();
        if (iterationThread() != null && !processorContext.getNode().isTrigger() && (!getContext().getInput().hasIterationId() || !storage().getInputOutputStorage().hasInputOutput((String) getId(), getIterationIndex()))) {
            processorContext.getIterationTransaction().initProcessorInputOutput((String) getId(), this.inputDataType, this.inputPortType, this.outputDataType);
        }
        if (nodeType.trigger()) {
            return;
        }
        init(getConfigs().getData());
    }

    private AbstractNodeProcessor() {
        this.totalToProcess = 1;
        this.annotation = null;
        this.context = null;
        this.inputPortType = null;
        this.inputDataType = null;
        this.outputDataType = null;
        this.configDataType = null;
    }

    public Optional<NodeProcessorProperty> findProperty(String str) {
        return Arrays.stream(this.annotation.properties()).filter(nodeProcessorProperty -> {
            return nodeProcessorProperty.id().equals(str);
        }).findFirst();
    }

    public RegisteredNodeType getNodeType() {
        return PluginsSystemService.getNodeTypeById(getAnnotation().id());
    }

    public String transactionId() {
        if (getExecutionNode() != null) {
            return getExecutionNode().getState().getTransactionId();
        }
        return null;
    }

    public void setThreadName() {
        try {
            WorkFlowExecutionNode executionNode = getExecutionNode();
            if (executionNode == null) {
                setTriggerThreadName(this.context.getProcessorId());
            } else {
                setNodeThreadName(executionNode, "");
            }
        } catch (Exception e) {
            logger().warn("Failure while setting thread name", e);
        }
    }

    public void setNodeThreadName(WorkFlowExecutionNode workFlowExecutionNode, String str) {
        try {
            MDC.put("node", workFlowExecutionNode.getProcessId());
            MDC.put(WorkFlowIterationThread.LOG_MDC_TRIGGER_TRANSACTION, str);
            MDC.put(WorkFlowIterationThread.LOG_MDC_ITERATION, String.valueOf(getIterationIndex()));
        } catch (Exception e) {
            logger().warn("Failure while setting thread name", e);
        }
    }

    private void setTriggerThreadName(String str) {
        try {
            MDC.put("node", str);
            MDC.put(WorkFlowIterationThread.LOG_MDC_TRIGGER_TRANSACTION, SequenceGeneratorLong.nextIdString());
        } catch (Exception e) {
            logger().warn("Failure while setting thread name", e);
        }
    }

    public void unSetThreadName() {
        MDC.remove("node");
        MDC.remove(WorkFlowIterationThread.LOG_MDC_TRIGGER_TRANSACTION);
    }

    public Logger logger() {
        try {
            return getContext().logger();
        } catch (Exception e) {
            return log;
        }
    }

    public final WorkFlowExecutionManager manager() {
        return this.context.getManager();
    }

    private WorkFlowIterationThread iterationThread() {
        return this.context.getIterationThread();
    }

    public final ExecutionStorage storage() {
        return this.context.storage();
    }

    public WorkFlowExecutionNode getExecutionNode() {
        return this.context.getNode();
    }

    public final NodeConfiguration<CF> getConfigs() {
        return this.context.getConfigs();
    }

    public long getIterationIndex() {
        if (this.context.getIterationThread() != null) {
            return this.context.getIterationThread().getIterationIndex();
        }
        return 0L;
    }

    public boolean isForceStopping() {
        return this.context.isStopping() || manager().isForceStopping();
    }

    public void updateState() {
        try {
            if (getExecutionNode() != null) {
                getExecutionNode().update();
            }
        } catch (Exception e) {
            logger().error("Failed while updating state {}", getId(), e);
        }
    }

    @Override // com.nazdaq.workflow.engine.dag.task.Task
    public Duration getTimeout() {
        if (getConfigs().getExecution().isTimeOutCheck()) {
            return getConfigs().getExecution().getExecutionTimeout();
        }
        return null;
    }

    @Override // com.nazdaq.workflow.engine.dag.task.Task
    public String toString() {
        return getId();
    }

    private void hasDeletedInputs(@NotNull NodeInputs<IN> nodeInputs, @NotNull ExecutionResults<String, NodeResults> executionResults, boolean z) {
        if (nodeInputs.hasInputs()) {
            HashSet<String> deletedSources = getDeletedSources(nodeInputs, executionResults);
            if (deletedSources.isEmpty()) {
                return;
            }
            Iterator<String> it = deletedSources.iterator();
            while (it.hasNext()) {
                String next = it.next();
                nodeInputs.deleteInputFrom(this.context, next);
                logger().warn("hasDeletedInputs {} delete input from source {}", getId(), next);
            }
            storage().getInputOutputStorage().setInputs(getId(), getIterationIndex(), nodeInputs);
            if (z) {
                NodeOutputs outputs = storage().getInputOutputStorage().getOutputs(getId(), getIterationIndex());
                outputs.getOutputs().clear();
                storage().getInputOutputStorage().setOutputs(getId(), getIterationIndex(), outputs);
                logger().info("hasDeletedInputs {} deleted all outputs.", getId());
            }
        }
    }

    @NotNull
    private HashSet<String> getDeletedSources(@NotNull NodeInputs<IN> nodeInputs, @NotNull ExecutionResults<String, NodeResults> executionResults) {
        HashSet<String> hashSet = new HashSet<>();
        Iterator<NodeDataWrap<IN>> it = nodeInputs.getInputs().values().iterator();
        while (it.hasNext()) {
            String sourceId = it.next().getSourceId();
            boolean z = false;
            Iterator<ExecutionResult<String, NodeResults>> it2 = executionResults.getAll().iterator();
            while (true) {
                if (!it2.hasNext()) {
                    break;
                }
                if (it2.next().getId().equals(sourceId)) {
                    z = true;
                    break;
                }
            }
            if (!z && !sourceId.equals("init")) {
                hashSet.add(sourceId);
            }
        }
        return hashSet;
    }

    private NodeInputs<IN> initInputsFill() {
        NodeInputs<IN> inputs = storage().getInputOutputStorage().getInputs(getId(), getIterationIndex());
        if (manager().getNodeGraphData(getId()).getDependencies().stream().findFirst().isEmpty()) {
            NodeDataKey nodeDataKey = new NodeDataKey(this.context, NodeDataDirection.INPUT);
            if (!inputs.hasInputs()) {
                inputs.updateInitValue(getContext(), NodeDataWrap.createInstance(this.context, nodeDataKey, null, null, inputs.getClassType(), null, false));
                storage().getInputOutputStorage().setInputs(getId(), getIterationIndex(), inputs);
            }
            logger().debug("Updating first value with the init input");
        }
        return inputs;
    }

    private boolean isDestinationAlreadyFilledByAnotherNode() {
        NodeGraphData nodeGraphData = manager().getNodeGraphData(getId());
        String triggerProcessId = this.context.triggerProcessId();
        if (!nodeGraphData.getDependencies().isEmpty() || triggerProcessId == null) {
            return false;
        }
        UnmodifiableIterator it = manager().getNodeGraphData(triggerProcessId).getSuccessors().iterator();
        while (it.hasNext()) {
            String str = (String) it.next();
            if (nodeGraphData.getSuccessors().contains(str) && storage().getInputOutputStorage().getInputs(str, getIterationIndex()).getInputPortType().equals(NodePortType.SINGLE)) {
                logger().debug("Both trigger {} and {} output to the same nodeId {} which is not a Multiple ", new Object[]{triggerProcessId, getId(), str});
                return true;
            }
        }
        return false;
    }

    private ShouldExecuteResult reSendOutput() {
        UnmodifiableIterator it = manager().getNodeGraphData(getId()).getSuccessors().iterator();
        while (it.hasNext()) {
            String str = (String) it.next();
            NodeInputs inputs = storage().getInputOutputStorage().getInputs(str, getIterationIndex());
            if (inputs == null) {
                return ShouldExecuteResult.Resend();
            }
            if (inputs.hasInputFrom(getId()) == null) {
                logger().warn("Input for {} from {} doesn't exists, will re-execute to send the data to it.", str, getId());
                return ShouldExecuteResult.Resend();
            }
        }
        return ShouldExecuteResult.Skip();
    }

    public ShouldExecuteResult shouldExecuteWrapper(ExecutionResults<String, NodeResults> executionResults) throws Exception {
        IterationStartInput input = getContext().getInput();
        if (getExecutionNode().isTrigger()) {
            logger().debug("Node {} is a trigger already executed, proceeding out to mark it completed.", getId());
            return ShouldExecuteResult.Resend();
        }
        if (!getConfigs().getUi().isValid() && !input.isForPreview()) {
            if (input.getStartType().equals(ExecutionStartType.UNTIL_NODE)) {
                clearNodeData();
                return ShouldExecuteResult.Skip();
            }
            logger().debug("Node {} is not valid for execution. (Cause: {})", getId(), getConfigs().getUi().getValidationCause() != null ? getConfigs().getUi().getValidationCause().getType().toString() : "Unknown");
            addError("Configuration Error", "Node is not configured", null);
            return ShouldExecuteResult.Skip();
        }
        if (getNodeType().isTriggerOnly()) {
            logger().debug("Node {} skipped, Its a trigger only node can't run manually", getId());
            updateProgress(1);
            getExecutionNode().setStatus(ExecutionNodeStatus.SKIPPED);
            return ShouldExecuteResult.Skip();
        }
        if (isDestinationAlreadyFilledByAnotherNode()) {
            updateProgress(1);
            getExecutionNode().setStatus(ExecutionNodeStatus.SKIPPED);
            return ShouldExecuteResult.Skip();
        }
        NodeInputs<IN> initInputsFill = initInputsFill();
        ExecutionStartType startType = input.getStartType();
        if (input.isRunAll()) {
            if (!executionResults.anySkipped()) {
                return ShouldExecuteResult.Full();
            }
            if (logger().isDebugEnabled()) {
                logger().debug("Node {} skipped, one of the previous has been skipped {}", getId(), executionResults.getSkipped());
            }
            updateProgress(1);
            getExecutionNode().setStatus(ExecutionNodeStatus.SKIPPED);
            return ShouldExecuteResult.Skip();
        }
        String processId = input.getProcessId();
        boolean equals = getId().equals(processId);
        if (startType.equals(ExecutionStartType.START_FROM)) {
            if (equals) {
                return ShouldExecuteResult.Full();
            }
            if (!(!executionResults.getSuccess().isEmpty())) {
                return ShouldExecuteResult.Skip();
            }
            logger().info("Found success previous nodes then we proceed to run this {}", getId());
            return ShouldExecuteResult.Full();
        }
        if (equals) {
            boolean equals2 = startType.equals(ExecutionStartType.UNTIL_BEFORE_NODE);
            hasDeletedInputs(initInputsFill, executionResults, equals2);
            return ShouldExecuteResult.builder().execute(!equals2).build();
        }
        if (!manager().isConnectedNodes(getId(), processId)) {
            if (manager().isConnectedNodes(processId, getId())) {
                clearNodeData();
            }
            return ShouldExecuteResult.Skip();
        }
        if (hasParentFailed(executionResults)) {
            clearNodeData();
            return ShouldExecuteResult.Skip();
        }
        if (hasUpdatedConfigData() && !hasParentBeenExecuted(executionResults) && getExecutionNode().getStatus().equals(ExecutionNodeStatus.COMPLETED)) {
            return reSendOutput();
        }
        return ShouldExecuteResult.Full();
    }

    private void clearNodeData() {
        WorkFlowExecutionNode executionNode = getExecutionNode();
        if (executionNode.getStatus().equals(ExecutionNodeStatus.NEW)) {
            return;
        }
        logger().debug("Clearing node {} data ...", getId());
        manager().messagesStore().clearNodeMessages(this.context.iterationId(), getId());
        executionNode.setStartedAt(null);
        executionNode.setEndAt(null);
        executionNode.setStatus(ExecutionNodeStatus.NEW);
        updateState();
        storage().getInputOutputStorage().clearNodeData(getId(), getIterationIndex(), this.inputDataType, this.inputPortType, this.outputDataType);
        logger().info("Clearing node {} data finished.", getId());
    }

    @Override // com.nazdaq.workflow.engine.dag.task.Task
    public ShouldExecuteResult shouldExecute(ExecutionResults<String, NodeResults> executionResults) {
        setThreadName();
        if (isForceStopping()) {
            logger().warn("Iteration or node was aborted, skipping {} execution.", getId());
            return ShouldExecuteResult.Skip();
        }
        long startTime = TextHelper.startTime();
        WorkFlowExecutionNode executionNode = getExecutionNode();
        try {
            try {
                IterationStartInput input = getContext().getInput();
                if (input.isFillProperties()) {
                    ShouldExecuteResult FillProperties = ShouldExecuteResult.FillProperties();
                    updateState();
                    if (logger().isDebugEnabled()) {
                        logger().debug("Finished shouldExecute {} took {}", getId(), TextHelper.endTime(startTime));
                    }
                    return FillProperties;
                }
                ShouldExecuteResult shouldExecuteWrapper = shouldExecuteWrapper(executionResults);
                if (shouldExecuteWrapper.isExecute() && input.isRunAll() && executionNode.isTrigger()) {
                    updateState();
                    if (logger().isDebugEnabled()) {
                        logger().debug("Finished shouldExecute {} took {}", getId(), TextHelper.endTime(startTime));
                    }
                    return shouldExecuteWrapper;
                }
                updateState();
                if (logger().isDebugEnabled()) {
                    logger().debug("Finished shouldExecute {} took {}", getId(), TextHelper.endTime(startTime));
                }
                return shouldExecuteWrapper;
            } catch (Throwable th) {
                executionNode.setStatus(ExecutionNodeStatus.FAILED);
                addError("Pre-Node Check", th.getMessage(), th);
                updateProgressMessage("Node is not valid for execution: " + th.getMessage());
                logger().error("Process check execute {} Failed", getId(), th);
                ShouldExecuteResult Skip = ShouldExecuteResult.Skip();
                updateState();
                if (logger().isDebugEnabled()) {
                    logger().debug("Finished shouldExecute {} took {}", getId(), TextHelper.endTime(startTime));
                }
                return Skip;
            }
        } catch (Throwable th2) {
            updateState();
            if (logger().isDebugEnabled()) {
                logger().debug("Finished shouldExecute {} took {}", getId(), TextHelper.endTime(startTime));
            }
            throw th2;
        }
    }

    private boolean hasUpdatedConfigData() {
        int revision = getExecutionNode().getRevision();
        return revision > 0 && revision == getConfigs().getRevision();
    }

    private boolean hasParentFailed(@NotNull ExecutionResults<String, NodeResults> executionResults) {
        return !executionResults.getErrored().isEmpty();
    }

    private boolean hasParentBeenExecuted(@NotNull ExecutionResults<String, NodeResults> executionResults) {
        Iterator<ExecutionResult<String, NodeResults>> it = executionResults.getSuccess().iterator();
        while (it.hasNext()) {
            if (it.next().getResult().isExecuted()) {
                return true;
            }
        }
        return false;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.nazdaq.workflow.engine.dag.task.Task
    public NodeResults execute(ShouldExecuteResult shouldExecuteResult) {
        try {
            setThreadName();
            try {
                boolean z = false;
                if (isForceStopping()) {
                    throw new NodeAbortException();
                }
                if (!getNodeType().licensed()) {
                    throw new LicenseExpiredException("You don't have a license to use " + this.annotation.id());
                }
                if (logger().isTraceEnabled()) {
                    logger().trace("Processor {} got execution index #{}", getId(), Integer.valueOf(getExecutionNode().getExecutionIndex()));
                }
                if (shouldExecuteResult.isFillProps()) {
                    List<NodePropertyDefinition> definedProperties = definedProperties(getConfigs());
                    if (!definedProperties.isEmpty()) {
                        iterationThread().appendToDefinedProperties(definedProperties);
                        logger().debug("Filling defined properties for node {}, Total: {}", getId(), Integer.valueOf(definedProperties.size()));
                    }
                    if (!getConfigs().getProperties().isEmpty()) {
                        List<NodePropertyDefinition> definedProperties2 = NodePropertyDefinition.definedProperties(getId(), getConfigs().getProperties());
                        iterationThread().appendToDefinedProperties(definedProperties2);
                        logger().debug("Filling config properties for node {}, Total: {}", getId(), Integer.valueOf(definedProperties2.size()));
                    }
                } else if (shouldExecuteResult.isExecute()) {
                    OutputDispatcher<OUT> outputDispatcher = new OutputDispatcher<>(getContext(), this.inputDataType, this.outputDataType, getNodeType().firstOutputPortName());
                    if (shouldExecuteResult.isReSend()) {
                        outputDispatcher.reSend();
                        logger().info("Resend outputs to destinations only.");
                    } else {
                        NodeInputs<IN> inputs = storage().getInputOutputStorage().getInputs(getId(), getIterationIndex());
                        if (inputs.hasInputs()) {
                            executeAndSend(inputs, outputDispatcher);
                            z = true;
                        } else {
                            getExecutionNode().setStatus(ExecutionNodeStatus.SKIPPED);
                            addInformation("Node Inputs", "No inputs arrived to this node, skipping execution.");
                            updateProgressMessage("No inputs arrived to this node, skipping execution.");
                        }
                    }
                }
                if (isForceStopping()) {
                    throw new NodeAbortException();
                }
                NodeResults nodeResults = new NodeResults(shouldExecuteResult, true, z);
                updateState();
                this.context.free();
                unSetThreadName();
                logger().debug("Process {} Completed, Should Check: {}", getId(), shouldExecuteResult);
                return nodeResults;
            } catch (Throwable th) {
                onExecutionFailed(th);
                throw new ExecutionException(th);
            }
        } catch (Throwable th2) {
            updateState();
            this.context.free();
            unSetThreadName();
            logger().debug("Process {} Completed, Should Check: {}", getId(), shouldExecuteResult);
            throw th2;
        }
    }

    private void executeAndSend(NodeInputs<IN> nodeInputs, OutputDispatcher<OUT> outputDispatcher) throws Exception {
        long startTime = TextHelper.startTime();
        validate(nodeInputs);
        logger().debug("Finished Validating the input data.");
        this.context.getManager().getPropertiesCompiler().onNodeStarts(this.context, getId(), getConfigs());
        if (!getContext().getInput().isRunAll()) {
            manager().messagesStore().clearNodeMessages(this.context.iterationId(), getId());
        }
        getExecutionNode().setStatus(ExecutionNodeStatus.RUNNING);
        getExecutionNode().setStartedAt(Instant.now());
        if (getContext().getInput().isRunAll()) {
            getExecutionNode().setExecutionIndex(iterationThread().generateExecutionIndex());
        }
        updateProgressMessage("Preparing to Execute ...");
        updateState();
        logger().debug("Executing {} started ...", getId());
        updateProgressMessage("Execution started ...");
        List<NodeDataWrap<IN>> genericList = nodeInputs.getGenericList();
        if (getInputPortType().equals(NodePortType.SINGLE)) {
            AtomicInteger atomicInteger = new AtomicInteger(0);
            AtomicInteger atomicInteger2 = new AtomicInteger(0);
            this.totalToProcess = genericList.size();
            for (NodeDataWrap<IN> nodeDataWrap : genericList) {
                try {
                } catch (Exception e) {
                    if (genericList.size() == 1) {
                        throw e;
                    }
                    logger().error("Failed to process input {}", nodeDataWrap, e);
                    addError("Input process failed", "Failed to process input " + nodeDataWrap + ", Error: " + e.getMessage(), e);
                    atomicInteger.getAndIncrement();
                }
                if (isForceStopping()) {
                    throw new NodeAbortException();
                }
                logger().debug("Starting {} on the input: {} ...", getId(), nodeDataWrap);
                if (!nodeDataWrap.isEmpty() && nodeDataWrap.getData() == null) {
                    throw new IOException("The input has an empty data!");
                }
                execute(nodeDataWrap, outputDispatcher);
                getExecutionNode().getState().incProcessedInputs();
                updateProgress(atomicInteger2.incrementAndGet());
            }
            this.context.getManager().getPropertiesCompiler().onNodeFinish(this.context.getIterationReadOnly(), getId(), List.of(), this.context.getNodeStates());
            if (atomicInteger2.get() == 0) {
                throw new Exception("Nothing has been executed, check if you have any input data available on init.");
            }
            if (atomicInteger.get() > 1) {
                throw new Exception("You have " + atomicInteger.get() + " of failed inputs!");
            }
        } else {
            this.totalToProcess = genericList.size();
            execute(genericList, outputDispatcher);
            getExecutionNode().getState().incProcessedInputs(this.totalToProcess);
        }
        updateProgress(this.totalToProcess);
        updateProgressMessage("Finished running.");
        getExecutionNode().setEndAt(Instant.now());
        getExecutionNode().setStatus(ExecutionNodeStatus.COMPLETED);
        getExecutionNode().setRevision(getConfigs().getRevision());
        if (logger().isDebugEnabled()) {
            logger().debug("Executing {} #{}, finished in ({})", new Object[]{getId(), Integer.valueOf(getExecutionNode().getExecutionIndex()), TextHelper.endTime(startTime)});
        }
    }

    private void onExecutionFailed(Throwable th) {
        getExecutionNode().setEndAt(Instant.now());
        if (isForceStopping()) {
            getExecutionNode().setStatus(ExecutionNodeStatus.ABORTED);
        } else {
            getExecutionNode().setStatus(ExecutionNodeStatus.FAILED);
        }
        String message = th.getMessage();
        if (message == null) {
            message = "Unknown error occurred with type " + th.getClass().getSimpleName() + ", check the logs for more details";
        }
        getExecutionNode().setRevision(0);
        updateProgressMessage("Runtime Error " + message);
        addError("Runtime Error", message, th);
        logger().error("Process {} Failed", getId(), th);
        setConsiderExecutionError(true);
    }

    public abstract void init(CF cf);

    public abstract void validate(NodeInputs<IN> nodeInputs) throws WorkFlowDataInvalidException;

    public abstract void execute(NodeDataWrap<IN> nodeDataWrap, OutputDispatcher<OUT> outputDispatcher) throws Exception;

    public abstract void execute(List<NodeDataWrap<IN>> list, OutputDispatcher<OUT> outputDispatcher) throws Exception;

    public List<NodePropertyDefinition> definedProperties(@NotNull NodeConfiguration<CF> nodeConfiguration) {
        return new ArrayList();
    }

    public String readConfigValue(String str, String str2) throws ExecutionException {
        String str3 = (str == null || str.isEmpty()) ? "" : str;
        NodePropertyValue propertyById = this.context.getPropertyById(str2);
        return propertyById != null ? propertyById.asText() : this.context.evalExpression(str3);
    }

    public void abort(StopEvent stopEvent) {
        try {
            if (getExecutionNode() != null) {
                setThreadName();
                logger().info("{} Received stop event {}", getId(), stopEvent);
                if (getExecutionNode().getStatus().equals(ExecutionNodeStatus.RUNNING)) {
                    getExecutionNode().setStatus(ExecutionNodeStatus.STOPPING);
                } else if (getExecutionNode().getStatus().equals(ExecutionNodeStatus.QUEUED)) {
                    getExecutionNode().setStatus(ExecutionNodeStatus.ABORTED);
                }
            }
            this.context.setStopping(true);
            try {
                onAbort(stopEvent);
            } catch (Throwable th) {
                logger().error("Failed while aborting using the processor abort() function", th);
            }
        } finally {
            updateState();
        }
    }

    public abstract void onAbort(StopEvent stopEvent);

    public void addInformation(String str, String str2) {
        NodeProcessorMessage buildNodeInfo = NodeProcessorMessage.buildNodeInfo(this.context.iterationId(), getId(), str, str2);
        manager().messagesStore().onNext(buildNodeInfo);
        if (logger().isDebugEnabled()) {
            logger().debug("Added info {}", buildNodeInfo);
        }
    }

    public void addError(String str, String str2, Throwable th) {
        NodeProcessorMessage buildNodeError = NodeProcessorMessage.buildNodeError(this.context.iterationId(), getId(), str, str2, th);
        manager().messagesStore().onNext(buildNodeError);
        logger().error("Added error {}", buildNodeError);
    }

    public void addWarning(String str, String str2) {
        NodeProcessorMessage buildNodeWarning = NodeProcessorMessage.buildNodeWarning(this.context.iterationId(), getId(), str, str2);
        manager().messagesStore().onNext(buildNodeWarning);
        logger().warn("Added warning {}", buildNodeWarning);
    }

    public void addWarningWithData(String str, String str2, JsonNode jsonNode, Collection<String> collection) {
        manager().messagesStore().onNext(NodeProcessorMessage.buildNodeWarningWithData(this.context.iterationId(), getId(), str, str2, jsonNode, collection));
    }

    public void addErrorWithData(String str, String str2, JsonNode jsonNode, Collection<String> collection) {
        manager().messagesStore().onNext(NodeProcessorMessage.buildNodeErrorWithData(this.context.iterationId(), getId(), str, str2, jsonNode, collection));
    }

    public void updateProgressMessage(String str) {
        if (getExecutionNode() != null) {
            getExecutionNode().getState().setProgressMessage(str);
        }
    }

    public void updateProgress(int i) {
        IterationStartInput input = getContext().getInput();
        if (input.isForPreview() && input.getProcessId().equals(getId())) {
            return;
        }
        getExecutionNode().getState().setProgress(Double.valueOf(Math.round(((i * 100.0d) / this.totalToProcess) * 100.0d) / 100.0d));
    }

    public void setTotalToProcess(int i) {
        this.totalToProcess = i;
    }

    public NodeProcessor getAnnotation() {
        return this.annotation;
    }

    public NodePortType getInputPortType() {
        return this.inputPortType;
    }

    public ProcessorContext getContext() {
        return this.context;
    }

    public Class<IN> getInputDataType() {
        return this.inputDataType;
    }

    public Class<OUT> getOutputDataType() {
        return this.outputDataType;
    }

    public Class<? extends AbstractNodeConfigurationData> getConfigDataType() {
        return this.configDataType;
    }

    public int getTotalToProcess() {
        return this.totalToProcess;
    }
}
