package com.nazdaq.workflow.engine.core.storage.utils.dataframe;

import com.nazdaq.workflow.engine.core.processor.ProcessorContext;
import com.nazdaq.workflow.engine.core.storage.models.inout.datatypes.DataFrame;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.tablesaw.api.Table;

/* loaded from: input_file:com/nazdaq/workflow/engine/core/storage/utils/dataframe/DataFrameTableWriter.class */
public class DataFrameTableWriter implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(DataFrameTableWriter.class);
    private final int parallelCount;
    private final ProcessorContext context;
    private final DataFrame dataFrame;
    private final ExecutorService executor;
    private final AtomicInteger isWriting = new AtomicInteger(0);
    private final Duration timeout;

    public DataFrameTableWriter(@NotNull ProcessorContext processorContext, @NotNull DataFrame dataFrame) {
        this.parallelCount = processorContext.getConfigs().getExecution().getParallelSafe(processorContext.getManager().getWorkFlowConfigs());
        this.context = processorContext;
        this.dataFrame = dataFrame;
        this.executor = Executors.newFixedThreadPool(this.parallelCount);
        this.timeout = processorContext.getConfigs().getExecution().getExecutionTimeout();
        processorContext.logger().debug("Started new writer for table: {}", dataFrame.getId());
    }

    public void writeTable(@NotNull Table table, int i) throws InterruptedException {
        if (this.isWriting.get() > this.parallelCount) {
            this.context.logger().debug("Writing queue is full we wait for slots to free to write: {}, Part: {} ...", this.dataFrame.getId(), Integer.valueOf(i));
            Instant now = Instant.now();
            while (this.isWriting.get() > this.parallelCount) {
                synchronized (this.isWriting) {
                    this.isWriting.wait(1000L);
                }
                if (this.context.isStopping()) {
                    throw new RuntimeException("Aborting");
                }
                if (now.plus((TemporalAmount) this.timeout).isBefore(Instant.now())) {
                    throw new RuntimeException("Reached timeout while waiting for free slot for writing table");
                }
            }
        }
        this.isWriting.incrementAndGet();
        this.executor.submit(() -> {
            try {
                try {
                    if (this.context.isStopping()) {
                        throw new RuntimeException("Aborting");
                    }
                    this.dataFrame.updateTablePart(this.context, table, i);
                    this.isWriting.decrementAndGet();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            } catch (Throwable th) {
                this.isWriting.decrementAndGet();
                throw th;
            }
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.executor.shutdown();
        this.context.logger().debug("Closed writer for table: {}, Awaiting {}", this.dataFrame.getId(), Boolean.valueOf(this.executor.awaitTermination(this.context.getConfigs().getExecution().getExecutionTimeout().getSeconds(), TimeUnit.SECONDS)));
    }
}
