package com.nazdaq.workflow.engine.core.processor;

import com.nazdaq.workflow.engine.core.exceptions.InputDataInstanceException;
import com.nazdaq.workflow.engine.core.manager.WorkFlowFactory;
import com.nazdaq.workflow.engine.core.models.connections.AbstractNodeConnectionData;
import com.nazdaq.workflow.engine.core.models.connections.NodesConnection;
import com.nazdaq.workflow.engine.core.storage.models.inout.NodeDataDirection;
import com.nazdaq.workflow.engine.core.storage.models.inout.NodeDataKey;
import com.nazdaq.workflow.engine.core.storage.models.inout.NodeDataWrap;
import com.nazdaq.workflow.engine.core.storage.models.inout.NodeInputs;
import com.nazdaq.workflow.engine.core.storage.models.inout.NodeOutputs;
import com.nazdaq.workflow.engine.core.storage.models.inout.datatypes.AbstractData;
import com.nazdaq.workflow.graphql.models.execution.iteration.IterationStartInput;
import de.jkeylockmanager.manager.LockCallback;
import de.jkeylockmanager.manager.ReturnValueLockCallback;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/nazdaq/workflow/engine/core/processor/OutputDispatcher.class */
/**
 * Hands the output of one processor node to the inputs of the nodes connected to it and
 * records that output in the input/output storage of the current iteration.
 *
 * <p>{@link #send} and {@link #sendThrough} are synchronized on this dispatcher;
 * {@link #reSend()} is not. Updates of a destination node's inputs are additionally
 * wrapped in a keyed lock obtained from {@code WorkFlowFactory.lockManagerLong}.
 *
 * @param <OUT> data type produced by the processor
 */
public class OutputDispatcher<OUT extends AbstractData> {
    /** Class-level logger, used for the trace messages around input storage updates. */
    private static final Logger log = LoggerFactory.getLogger(OutputDispatcher.class);
    /** Logger taken from the processor context. */
    private final Logger logger;
    private final ProcessorContext context;
    private final Class<? extends AbstractData> inputDataType;
    private final Class<? extends AbstractData> outputDataType;
    private final String firstOutputPortId;

    /**
     * Creates a dispatcher bound to one processor context.
     *
     * @param processorContext context of the processor whose output is dispatched; its logger is reused
     * @param cls              input data type of the processor
     * @param cls2             output data type, used by {@link #newDataInstance()}
     * @param str              id of the output port used by {@link #send}
     */
    public OutputDispatcher(@NotNull ProcessorContext processorContext, Class<? extends AbstractData> cls, Class<? extends AbstractData> cls2, String str) {
        this.logger = processorContext.logger();
        this.context = processorContext;
        this.inputDataType = cls;
        this.outputDataType = cls2;
        this.firstOutputPortId = str;
    }

    /**
     * Loads the stored outputs of this processor for the current iteration.
     *
     * @return the stored outputs, never {@code null}
     * @throws NullPointerException if the storage holds no outputs for this processor and iteration
     */
    private NodeOutputs<OUT> getCurrentOutputs() {
        long iteration = this.context.iteration();
        NodeOutputs<OUT> outputs = this.context.storage().getInputOutputStorage().getOutputs(this.context.getProcessorId(), iteration);
        // The message is built lazily and names the processor and iteration that were looked up;
        // the value being checked is null whenever the message is needed, so printing it is useless.
        return Objects.requireNonNull(outputs, () -> "No outputs for node " + this.context.getProcessorId() + "//" + iteration + " found!");
    }

    /**
     * Sends data through the first output port.
     *
     * @param nodeDataWrap data to dispatch
     */
    public synchronized void send(NodeDataWrap<OUT> nodeDataWrap) {
        sendThrough(this.firstOutputPortId, nodeDataWrap);
    }

    /**
     * Sends data through the given output port.
     *
     * @param str          id of the source port; when {@code null}, every connection of the node is used
     * @param nodeDataWrap data to dispatch
     */
    public synchronized void sendThrough(String str, NodeDataWrap<OUT> nodeDataWrap) {
        outputToDestinations(str, getCurrentOutputs(), nodeDataWrap, false);
    }

    /**
     * Dispatches every stored output of the current iteration again, passing {@code true} as the
     * last argument of {@code NodeInputs.updateValueFrom}. Does nothing when
     * {@link #outputToDestination()} is {@code false}.
     */
    public void reSend() {
        if (!outputToDestination()) {
            return;
        }
        NodeOutputs<OUT> currentOutputs = getCurrentOutputs();
        currentOutputs.getOutputs().forEach((portId, data) -> outputToDestinations(portId, currentOutputs, data, true));
    }

    /**
     * Creates a new output data wrapper of the configured output data type.
     *
     * @param <R> data type expected by the caller
     * @return the new wrapper
     * @throws InputDataInstanceException if the instance cannot be created
     */
    public <R extends AbstractData> NodeDataWrap<R> newDataInstance() throws InputDataInstanceException {
        return newDataInstance(this.outputDataType);
    }

    /**
     * Creates a new output data wrapper of the given data type, keyed with
     * {@link NodeDataDirection#OUTPUT}.
     *
     * @param cls data type to instantiate
     * @param <R> data type expected by the caller
     * @return the new wrapper
     * @throws InputDataInstanceException wrapping any exception raised while creating the instance
     */
    public <R extends AbstractData> NodeDataWrap<R> newDataInstance(Class<? extends AbstractData> cls) throws InputDataInstanceException {
        try {
            NodeDataKey key = new NodeDataKey(this.context, NodeDataDirection.OUTPUT);
            return NodeDataWrap.createInstance(this.context, key, getContext().getProcessorId(), null, cls, null, true);
        } catch (Exception e) {
            throw new InputDataInstanceException(e);
        }
    }

    /**
     * Decides whether output has to be passed on to destination nodes.
     *
     * @return {@code false} when the iteration is a preview that does not run all nodes and its
     *         input names no process id or names this processor; otherwise {@code true} exactly
     *         when this processor has at least one destination connection
     */
    private boolean outputToDestination() {
        IterationStartInput input = this.context.getInput();
        if (input.isForPreview() && !input.isRunAll()) {
            if (input.getProcessId() == null || input.getProcessId().equals(this.context.getProcessorId())) {
                return false;
            }
        }
        return !this.context.getManager().getNodeDestinations(this.context.getProcessorId()).isEmpty();
    }

    /**
     * Builds the key under which a node is locked for one iteration of one execution.
     *
     * @param str  execution id
     * @param j    iteration number
     * @param str2 node id
     * @return the three parts concatenated in that order
     */
    @Contract(pure = true)
    @NotNull
    public static String lockUniqueId(String str, long j, String str2) {
        // The node id must be part of the key; otherwise all nodes of an iteration share one lock.
        return str + j + str2;
    }

    /**
     * Runs a value-returning callback while holding the lock of {@link #lockUniqueId}.
     *
     * @param str                     execution id
     * @param j                       iteration number
     * @param str2                    node id
     * @param returnValueLockCallback work to run under the lock
     * @param <R>                     result type
     * @return the value produced by the callback
     */
    public static <R> R executeLocked(String str, long j, String str2, ReturnValueLockCallback<R> returnValueLockCallback) {
        return (R) WorkFlowFactory.lockManagerLong.executeLocked(lockUniqueId(str, j, str2), returnValueLockCallback);
    }

    /** Runs a callback without result while holding the lock of {@link #lockUniqueId}. */
    private void executeLocked(String executionId, long iteration, String nodeId, LockCallback lockCallback) {
        WorkFlowFactory.lockManagerLong.executeLocked(lockUniqueId(executionId, iteration, nodeId), lockCallback);
    }

    /**
     * Passes one output value to the connected destination nodes (when dispatching is enabled)
     * and then stores the value in the node's outputs.
     *
     * @param portId      source port id; {@code null} matches every connection
     * @param nodeOutputs outputs of the source node, updated and written back to storage
     * @param data        value to dispatch
     * @param resend      forwarded as the last argument of {@code NodeInputs.updateValueFrom}
     */
    private void outputToDestinations(String portId, NodeOutputs<OUT> nodeOutputs, NodeDataWrap<OUT> data, boolean resend) {
        long iteration = getContext().iteration();
        if (outputToDestination()) {
            deliverToDestinations(portId, nodeOutputs.getNodeId(), iteration, data, resend);
        } else {
            this.logger.debug("Skipped output to destinations.");
        }
        // The output itself is always recorded, whether or not it was passed on.
        nodeOutputs.updateValue(getContext(), portId, data);
        getContext().storage().getInputOutputStorage().setOutputs(nodeOutputs.getNodeId(), iteration, nodeOutputs);
    }

    /** Updates the stored inputs of every destination connected to the given source port. */
    private void deliverToDestinations(String portId, String nodeId, long iteration, NodeDataWrap<OUT> data, boolean resend) {
        Collection<NodesConnection<? extends AbstractNodeConnectionData>> connections = this.context.getManager().getNodeDestinations(nodeId);
        if (connections.isEmpty()) {
            return;
        }
        String executionId = getContext().getManager().getExecution().getId();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Passing output of {} to {} destination input ...", nodeId, connections.size());
        }
        AtomicInteger attempted = new AtomicInteger();
        connections.forEach(connection -> {
            String destinationId = connection.getDestinationId();
            if (portId != null && !portId.equals(connection.getSourcePort())) {
                this.logger.warn("Can't dispatch value from Node {}, Port: {} -> To {}, Error in connection: {}", nodeId, portId, destinationId, connection);
                return;
            }
            // Read-modify-write of the destination's inputs happens under the destination's lock.
            executeLocked(executionId, iteration, destinationId, () -> {
                // Counted once per attempted delivery so the index does not depend on the log level.
                int index = attempted.getAndIncrement();
                NodeInputs inputs = getContext().storage().getInputOutputStorage().getInputs(destinationId, iteration);
                if (inputs == null) {
                    this.logger.warn("- Skipped output of {} to #{}, {} - Node is missing!", nodeId, index, destinationId);
                } else {
                    inputs.updateValueFrom(getContext(), nodeId, portId, connection.getDestinationPort(), data, resend);
                    // Guard on the logger that actually emits the trace messages.
                    if (log.isTraceEnabled()) {
                        log.trace("Setting node input {}:{} ...", iteration, destinationId);
                    }
                    getContext().storage().getInputOutputStorage().setInputs(destinationId, iteration, inputs);
                    if (log.isTraceEnabled()) {
                        log.trace("Setting node input {}:{} Finished total: {}", iteration, destinationId, inputs.getInputs().size());
                    }
                    getContext().getNode().getState().incProcessedOutputs();
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("- Passing output of {} to #{}, {}", nodeId, index, destinationId);
                    }
                }
            });
        });
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Finished Passing output of {}, Port: {} to {} outputs.", nodeId, portId, attempted);
        }
    }

    /** @return the logger taken from the processor context */
    public Logger getLogger() {
        return this.logger;
    }

    /** @return the processor context this dispatcher is bound to */
    public ProcessorContext getContext() {
        return this.context;
    }

    /** @return the input data type given at construction */
    public Class<? extends AbstractData> getInputDataType() {
        return this.inputDataType;
    }

    /** @return the output data type given at construction */
    public Class<? extends AbstractData> getOutputDataType() {
        return this.outputDataType;
    }

    /** @return the id of the port used by {@link #send} */
    public String getFirstOutputPortId() {
        return this.firstOutputPortId;
    }
}
