package com.nazdaq.workflow.engine.core.manager;

import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import com.nazdaq.core.helpers.AppConfig;
import com.nazdaq.workflow.engine.core.events.StopEvent;
import com.nazdaq.workflow.engine.core.exceptions.LicenseExpiredException;
import com.nazdaq.workflow.engine.core.models.node.AbstractNodeConfigurationData;
import com.nazdaq.workflow.engine.core.models.node.NodeConfiguration;
import com.nazdaq.workflow.engine.core.plugins.models.nodes.RegisteredNodeType;
import com.nazdaq.workflow.engine.core.processor.ProcessorFactory;
import com.nazdaq.workflow.engine.core.processor.interfaces.NodeProcessorTrigger;
import com.nazdaq.workflow.engine.core.storage.models.messages.NodeProcessorMessage;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import models.users.User;
import models.workflow.executions.triggers.TriggerExecutionState;
import models.workflow.executions.triggers.TriggerStatus;
import models.workflow.executions.triggers.WorkFlowExecutionTrigger;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/nazdaq/workflow/engine/core/manager/TriggersThreadPool.class */
public final class TriggersThreadPool {
    private static final Logger log = LoggerFactory.getLogger(TriggersThreadPool.class);
    private final ConcurrentHashMap<String, NodeProcessorTrigger<?, ?>> activeTriggers = new ConcurrentHashMap<>();
    private final Set<String> stopping = Collections.newSetFromMap(new ConcurrentHashMap());
    private final WorkFlowExecutionManager manager;
    private Cancellable intervalExecutorInitializer;

    public TriggersThreadPool(@NotNull WorkFlowExecutionManager workFlowExecutionManager) {
        this.manager = workFlowExecutionManager;
    }

    private void initIntervalExecutor() {
        if (this.intervalExecutorInitializer == null) {
            Duration duration = this.manager.getWorkFlowConfigs().getTriggersSchedulerInterval().get();
            logger().info("Starting interval executor with period: {} ...", duration);
            ActorSystem actorSystem = this.manager.getActorSystem();
            this.intervalExecutorInitializer = actorSystem.scheduler().scheduleWithFixedDelay(scala.concurrent.duration.Duration.create(5L, TimeUnit.SECONDS), scala.concurrent.duration.Duration.create(duration.toMillis(), TimeUnit.MILLISECONDS), this::schedulerProcess, actorSystem.dispatchers().lookup("workflow.triggers"));
        }
    }

    public Set<String> getActiveTriggers() {
        return new HashSet(this.activeTriggers.keySet());
    }

    private void schedulerProcess() {
        try {
            if (!AppConfig.shuttingDown && !this.manager.isStopping() && !this.manager.isStopped() && !this.activeTriggers.isEmpty()) {
                IterationsThreadPool iterationsPool = this.manager.getIterationsPool();
                long queueSize = iterationsPool.getQueueSize();
                long activeSize = iterationsPool.getActiveSize();
                if (queueSize >= this.manager.getWorkFlowConfigs().getBackPressureBuffer().get().intValue()) {
                    logger().warn("The Execution buffer is full (Active {}, Queued {}) skipping this scheduler scan, until freeing up.", Long.valueOf(activeSize), Long.valueOf(queueSize));
                    return;
                }
                for (NodeProcessorTrigger<?, ?> nodeProcessorTrigger : this.activeTriggers.values()) {
                    if (nodeProcessorTrigger.isRunnable()) {
                        nodeProcessorTrigger.scheduledProcess();
                        logger().trace("Running process on processId: {}", nodeProcessorTrigger.getId());
                    }
                }
            }
        } catch (Throwable th) {
            logger().error("Failure occurred while running scheduling check of triggers processors", th);
        }
    }

    private WorkFlowExecutionTrigger createTriggerExecution(String str) {
        return this.manager.getExecution().createOrGetTriggerExecution(logger(), str);
    }

    public void initTriggers(@NotNull User user, boolean z, boolean z2, Set<String> set) throws ExecutionException {
        ConcurrentHashMap<String, NodeConfiguration<? extends AbstractNodeConfigurationData>> nodes = this.manager.getNodes();
        for (Map.Entry<String, NodeConfiguration<? extends AbstractNodeConfigurationData>> entry : nodes.entrySet()) {
            String key = entry.getKey();
            NodeConfiguration<? extends AbstractNodeConfigurationData> value = entry.getValue();
            try {
                if (value.hasStartStop()) {
                    if (value.getUi().isValid()) {
                        WorkFlowExecutionTrigger createTriggerExecution = createTriggerExecution(value.getInstanceId());
                        TriggerExecutionState.TriggerAction lastAction = createTriggerExecution.getState().getLastAction();
                        boolean z3 = (set != null && set.contains(key)) || z2;
                        boolean z4 = lastAction != null && lastAction.equals(TriggerExecutionState.TriggerAction.INIT);
                        boolean z5 = lastAction != null && lastAction.equals(TriggerExecutionState.TriggerAction.START);
                        if ((z3 && z4) || z5) {
                            if (z) {
                                createTriggerExecution.getState().setLastAction(TriggerExecutionState.TriggerAction.STOP);
                                createTriggerExecution.getState().setAutoDisabled(true);
                                createTriggerExecution.updateStopped();
                                logger().info("Auto disabled trigger {} from init allowAutoStart = false", key);
                            } else {
                                startOrRestartIfChanged(value, createTriggerExecution, user);
                            }
                        }
                    } else if (isRunning(key)) {
                        logger().info("Stopping trigger {} because it is not valid, and found running.", key);
                        stopTriggerById(key, user);
                    }
                }
            } catch (Exception e) {
                logger().error("Failed while running trigger {}, {}", new Object[]{key, value.getNodeTypeId(), e});
                this.manager.messagesStore().onNext(NodeProcessorMessage.buildExecutionError("Start Trigger Failed", String.format("Failed while running trigger %s, %s, Error: %s", key, value.getNodeTypeId(), e.getMessage()), e));
            }
        }
        ArrayList arrayList = new ArrayList(this.manager.getExecution().triggers());
        for (int i = 0; i < arrayList.size(); i++) {
            WorkFlowExecutionTrigger workFlowExecutionTrigger = (WorkFlowExecutionTrigger) arrayList.get(i);
            if (workFlowExecutionTrigger != null) {
                try {
                    if (!nodes.containsKey(workFlowExecutionTrigger.getProcessId())) {
                        if (isRunning(workFlowExecutionTrigger.getProcessId())) {
                            stopTriggerById(workFlowExecutionTrigger.getProcessId(), user);
                        }
                        this.manager.getExecution().deleteTriggerById(logger(), workFlowExecutionTrigger.getProcessId());
                    }
                } catch (Exception e2) {
                    logger().error("Failed while deleting and stopping trigger {}", workFlowExecutionTrigger.getProcessId(), e2);
                }
            }
        }
    }

    private void startOrRestartIfChanged(@NotNull NodeConfiguration<? extends AbstractNodeConfigurationData> nodeConfiguration, WorkFlowExecutionTrigger workFlowExecutionTrigger, User user) throws Exception {
        NodeProcessorTrigger<?, ?> activeProcessor = getActiveProcessor(nodeConfiguration.getInstanceId());
        if (activeProcessor == null) {
            logger().info("Starting trigger {} from initTriggers ...", nodeConfiguration.getInstanceId());
            startTrigger(workFlowExecutionTrigger, user);
        } else if (activeProcessor.getRunningRevision() < nodeConfiguration.getRevision()) {
            this.manager.messagesStore().onNext(NodeProcessorMessage.buildTriggerInfo(workFlowExecutionTrigger.getProcessId(), "Config Changed", "Restarting trigger since config has changed from revision " + activeProcessor.getRunningRevision() + " to " + nodeConfiguration.getRevision() + "."));
            stopTrigger(workFlowExecutionTrigger, user, false, false, true);
            try {
                Thread.sleep(100L);
            } catch (Exception e) {
            }
            startTrigger(workFlowExecutionTrigger, user);
        }
    }

    private void startTrigger(@NotNull WorkFlowExecutionTrigger workFlowExecutionTrigger, @NotNull User user) throws Exception {
        try {
            if (isRunning(workFlowExecutionTrigger.getProcessId())) {
                logger().warn("Trigger " + workFlowExecutionTrigger.getProcessId() + " already started.");
                return;
            }
            removeStopping(workFlowExecutionTrigger.getProcessId());
            NodeConfiguration nodeConfig = this.manager.getNodeConfig(workFlowExecutionTrigger.getProcessId());
            if (!nodeConfig.getUi().isValid()) {
                throw new Exception("Trigger is " + nodeConfig.getUi().getValidationCause());
            }
            if (!nodeConfig.getNodeType().licensed()) {
                throw new LicenseExpiredException("You don't have a license to use the trigger " + nodeConfig.getNodeType().getId());
            }
            NodeProcessorTrigger<?, ?> createTriggerInstance = ProcessorFactory.createTriggerInstance(this.manager, nodeConfig, workFlowExecutionTrigger);
            addActive(createTriggerInstance);
            createTriggerInstance.setAutoDisabled(false);
            try {
                createTriggerInstance.setThreadName();
                createTriggerInstance.setLastAction(TriggerExecutionState.TriggerAction.START);
                if (createTriggerInstance.isRunnable()) {
                    initIntervalExecutor();
                }
                createTriggerInstance.updateStartedBy(user);
                createTriggerInstance.start();
                logger().info("Allocating processId: {} (Runnable: {})", workFlowExecutionTrigger.getProcessId(), Boolean.valueOf(createTriggerInstance.isRunnable()));
                this.manager.messagesStore().onNext(NodeProcessorMessage.buildTriggerInfo(workFlowExecutionTrigger.getProcessId(), "Trigger Started", "Trigger " + workFlowExecutionTrigger.getProcessId() + " Starting by " + user.getUsername() + " (Runnable: " + createTriggerInstance.isRunnable() + ")"));
                createTriggerInstance.unSetThreadName();
            } catch (Throwable th) {
                createTriggerInstance.unSetThreadName();
                throw th;
            }
        } catch (Exception e) {
            logger().error("Failed while starting trigger {}", workFlowExecutionTrigger.getProcessId(), e);
            publishTriggerStatus(workFlowExecutionTrigger, TriggerStatus.FAILED, e.getMessage());
            stopTrigger(workFlowExecutionTrigger, user, false, false, true);
            throw e;
        }
    }

    private void publishTriggerStatus(@NotNull WorkFlowExecutionTrigger workFlowExecutionTrigger, TriggerStatus triggerStatus, String str) {
        workFlowExecutionTrigger.setStatus(triggerStatus);
        workFlowExecutionTrigger.getState().setLastMessage(str);
        workFlowExecutionTrigger.updateNow();
    }

    private void stopTrigger(@NotNull WorkFlowExecutionTrigger workFlowExecutionTrigger, User user, boolean z, boolean z2, boolean z3) throws Exception {
        String processId = workFlowExecutionTrigger.getProcessId();
        if (!isRunning(processId)) {
            if (workFlowExecutionTrigger.getStatus().equals(TriggerStatus.RUNNING)) {
                workFlowExecutionTrigger.updateStopped();
                logger().warn("Found in db stuck as running, setting it to stopped.");
            }
            throw new Exception("Trigger " + processId + " is not running!");
        }
        NodeProcessorTrigger<?, ?> activeProcessor = getActiveProcessor(processId);
        try {
            try {
                setStopping(processId);
                String str = user != null ? "Stopped by " + user.getUsername() : "";
                logger().debug("Stopping trigger processId: {}", processId);
                if (activeProcessor != null) {
                    activeProcessor.setThreadName();
                    activeProcessor.abort(StopEvent.builder().force(z3).build());
                    if (!z && activeProcessor.executionTrigger().getState().isAutoDisabled()) {
                        str = "Auto Disabled";
                    }
                    if (z2) {
                        activeProcessor.setAutoDisabled(false);
                    }
                    activeProcessor.updateTriggerStatus(TriggerStatus.STOPPED, str);
                    if (z) {
                        activeProcessor.setLastAction(TriggerExecutionState.TriggerAction.STOP);
                    }
                    activeProcessor.updateState();
                } else {
                    publishTriggerStatus(workFlowExecutionTrigger, TriggerStatus.STOPPED, str);
                }
                logger().info("Trigger processId: {} stopped successfully", processId);
                this.manager.messagesStore().onNext(NodeProcessorMessage.buildTriggerInfo(workFlowExecutionTrigger.getProcessId(), "Trigger Stopped", "Trigger " + workFlowExecutionTrigger.getProcessId() + " " + (user != null ? "Stopped by " + user.getUsername() : "System") + " (Manually: " + z + ")"));
                if (activeProcessor != null) {
                    removeActive(activeProcessor);
                    activeProcessor.unSetThreadName();
                }
            } catch (Exception e) {
                Thread.sleep(100L);
                logger().error("Failed while stopping trigger {}", processId, e);
                this.manager.messagesStore().onNext(NodeProcessorMessage.buildTriggerInfo(workFlowExecutionTrigger.getProcessId(), "Trigger Stopped", "Trigger " + workFlowExecutionTrigger.getProcessId() + " Failed to stop: " + e.getMessage()));
                if (activeProcessor != null) {
                    activeProcessor.updateTriggerStatus(TriggerStatus.FAILED, e.getMessage());
                    activeProcessor.updateState();
                } else {
                    publishTriggerStatus(workFlowExecutionTrigger, TriggerStatus.FAILED, e.getMessage());
                }
                throw e;
            }
        } catch (Throwable th) {
            if (activeProcessor != null) {
                removeActive(activeProcessor);
                activeProcessor.unSetThreadName();
            }
            throw th;
        }
    }

    public void startTriggerById(String str, User user) throws Exception {
        startTrigger(createTriggerExecution(str), user);
    }

    public void stopTriggerById(String str, User user) throws Exception {
        stopTrigger(createTriggerExecution(str), user, true, false, true);
    }

    public synchronized NodeProcessorTrigger<?, ?> getActiveProcessor(String str) {
        return this.activeTriggers.get(str);
    }

    public synchronized NodeProcessorTrigger<?, ?> getActiveProcessorByType(RegisteredNodeType registeredNodeType) {
        return (NodeProcessorTrigger) this.activeTriggers.searchValues(1L, nodeProcessorTrigger -> {
            if (nodeProcessorTrigger.getNodeType().equals(registeredNodeType)) {
                return nodeProcessorTrigger;
            }
            return null;
        });
    }

    private synchronized boolean isRunning(String str) {
        logger().trace("Checking isRunning: {}", str);
        return getActiveProcessor(str) != null;
    }

    private synchronized void addActive(@NotNull NodeProcessorTrigger<?, ?> nodeProcessorTrigger) {
        removeStopping(nodeProcessorTrigger.getId());
        this.activeTriggers.put(nodeProcessorTrigger.getId(), nodeProcessorTrigger);
        logger().debug("Adding active: {}", nodeProcessorTrigger.getId());
    }

    public synchronized void removeActive(@NotNull NodeProcessorTrigger<?, ?> nodeProcessorTrigger) {
        removeStopping(nodeProcessorTrigger.getId());
        nodeProcessorTrigger.onDestroy();
        this.activeTriggers.remove(nodeProcessorTrigger.getId());
        logger().debug("Removing active: {}", nodeProcessorTrigger.getId());
    }

    private synchronized void setStopping(String str) {
        this.stopping.add(str);
        logger().trace("Set Stopping: {}", str);
    }

    private synchronized void removeStopping(String str) {
        this.stopping.remove(str);
        logger().trace("Remove Stopping: {}", str);
    }

    private synchronized boolean isStopping(String str) {
        return this.stopping.contains(str);
    }

    public synchronized boolean isTriggerStopped(String str) {
        return isStopping(str) || this.manager.isStopped() || this.manager.isStopping();
    }

    public void shutdownTriggers(boolean z) {
        try {
            try {
                if (this.manager.isDev()) {
                    autoDisableTriggers();
                }
                if (this.intervalExecutorInitializer != null) {
                    logger().info("Cancelling Interval executor ...");
                    this.intervalExecutorInitializer.cancel();
                }
                for (NodeProcessorTrigger<?, ?> nodeProcessorTrigger : this.activeTriggers.values()) {
                    try {
                        stopTrigger(nodeProcessorTrigger.executionTrigger(), null, false, true, z);
                    } catch (Exception e) {
                        logger().error("Failed while stopping trigger {}", nodeProcessorTrigger.getId(), e);
                        this.manager.messagesStore().onNext(NodeProcessorMessage.buildTriggerError(nodeProcessorTrigger.getId(), "Trigger Stop", e.getMessage(), e));
                    }
                }
                this.activeTriggers.clear();
                this.activeTriggers.clear();
                this.stopping.clear();
            } catch (Throwable th) {
                this.activeTriggers.clear();
                this.stopping.clear();
                throw th;
            }
        } catch (Exception e2) {
            logger().error("Error while shutting down triggers", e2);
            this.activeTriggers.clear();
            this.stopping.clear();
        }
    }

    private void autoDisableTriggers() {
        NodeProcessorTrigger<?, ?> activeProcessor;
        for (NodeConfiguration<? extends AbstractNodeConfigurationData> nodeConfiguration : this.manager.getWorkFlow().getNodes()) {
            if (nodeConfiguration != null) {
                try {
                    if (nodeConfiguration.hasStartStop() && (activeProcessor = getActiveProcessor(nodeConfiguration.getInstanceId())) != null && activeProcessor.isRunnable()) {
                        activeProcessor.setAutoDisabled(true);
                        logger().info("Auto disabling trigger {}", nodeConfiguration.getInstanceId());
                    }
                } catch (Exception e) {
                    logger().error("Failed while auto disabling trigger {}", nodeConfiguration.getInstanceId(), e);
                }
            }
        }
    }

    public Logger logger() {
        try {
            return this.manager.logger();
        } catch (Exception e) {
            return log;
        }
    }
}
