/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.agents.runtime.env;

import java.io.File;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.agents.api.Agent;
import org.apache.flink.agents.api.AgentBuilder;
import org.apache.flink.agents.api.AgentsExecutionEnvironment;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.plan.AgentConfiguration;
import org.apache.flink.agents.plan.AgentPlan;
import org.apache.flink.agents.runtime.CompileUtils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.YamlParserUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RemoteExecutionEnvironment
extends AgentsExecutionEnvironment {
    private final StreamExecutionEnvironment env;
    @Nullable
    private StreamTableEnvironment tEnv;
    private final AgentConfiguration config;
    public static final String FLINK_CONF_FILENAME = "config.yaml";

    public RemoteExecutionEnvironment(StreamExecutionEnvironment env, @Nullable StreamTableEnvironment tEnv) {
        this.env = env;
        this.tEnv = tEnv;
        String configDir = System.getenv("FLINK_CONF_DIR");
        this.config = RemoteExecutionEnvironment.loadAgentConfiguration(configDir);
    }

    private StreamTableEnvironment getTableEnvironment() {
        if (this.tEnv == null) {
            this.tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        }
        return this.tEnv;
    }

    @Override
    public AgentConfiguration getConfig() {
        return this.config;
    }

    @Override
    public AgentBuilder fromList(List<Object> input) {
        throw new UnsupportedOperationException("RemoteExecutionEnvironment does not support fromList. Use fromDataStream or fromTable instead.");
    }

    @Override
    public <T, K> AgentBuilder fromDataStream(DataStream<T> input, KeySelector<T, K> keySelector) {
        return new RemoteAgentBuilder<T, K>(input, this.tEnv, keySelector, this.env, this.config, (Map<ResourceType, Map<String, Object>>)this.resources);
    }

    @Override
    public <K> AgentBuilder fromTable(Table input, KeySelector<Object, K> keySelector) {
        return new RemoteAgentBuilder(input, this.getTableEnvironment(), keySelector, this.env, this.config, (Map<ResourceType, Map<String, Object>>)this.resources);
    }

    @Override
    public void execute() throws Exception {
        this.env.execute();
    }

    public static AgentConfiguration loadAgentConfiguration(String configDir) {
        try {
            if (configDir == null) {
                return new AgentConfiguration();
            }
            Map configData = YamlParserUtils.loadYamlFile((File)new File(configDir, FLINK_CONF_FILENAME)).getOrDefault("agent", new HashMap());
            return new AgentConfiguration(configData);
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to load Flink Agents configuration from " + configDir, e);
        }
    }

    private static class RemoteAgentBuilder<T, K>
    implements AgentBuilder {
        private final DataStream<T> inputDataStream;
        private final KeySelector<T, K> keySelector;
        private final StreamExecutionEnvironment env;
        @Nullable
        private StreamTableEnvironment tableEnv;
        private final AgentConfiguration config;
        private final Map<ResourceType, Map<String, Object>> resources;
        private AgentPlan agentPlan;
        private DataStream<Object> outputDataStream;

        public RemoteAgentBuilder(DataStream<T> inputDataStream, @Nullable StreamTableEnvironment tableEnv, KeySelector<T, K> keySelector, StreamExecutionEnvironment env, AgentConfiguration config, Map<ResourceType, Map<String, Object>> resources) {
            this.inputDataStream = inputDataStream;
            this.keySelector = keySelector;
            this.env = env;
            this.tableEnv = tableEnv;
            this.config = config;
            this.resources = resources;
        }

        public RemoteAgentBuilder(Table inputTable, StreamTableEnvironment tableEnv, KeySelector<Object, K> keySelector, StreamExecutionEnvironment env, AgentConfiguration config, Map<ResourceType, Map<String, Object>> resources) {
            this.inputDataStream = tableEnv.toDataStream(inputTable);
            this.keySelector = keySelector;
            this.env = env;
            this.tableEnv = tableEnv;
            this.config = config;
            this.resources = resources;
        }

        private StreamTableEnvironment getTableEnvironment() {
            if (this.tableEnv == null) {
                this.tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
            }
            return this.tableEnv;
        }

        @Override
        public AgentBuilder apply(Agent agent) {
            try {
                agent.addResourcesIfAbsent(this.resources);
                this.agentPlan = new AgentPlan(agent, this.config);
                return this;
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to create agent plan from agent", e);
            }
        }

        @Override
        public List<Map<String, Object>> toList() {
            throw new UnsupportedOperationException("RemoteAgentBuilder does not support toList. Use toDataStream or toTable instead.");
        }

        @Override
        public DataStream<Object> toDataStream() {
            if (this.agentPlan == null) {
                throw new IllegalStateException("Must apply agent before calling toDataStream");
            }
            if (this.outputDataStream == null) {
                this.outputDataStream = this.keySelector != null ? CompileUtils.connectToAgent(this.inputDataStream, this.keySelector, this.agentPlan) : CompileUtils.connectToAgent(this.inputDataStream, (KeySelector & Serializable)x -> x, this.agentPlan);
            }
            return this.outputDataStream;
        }

        @Override
        public Table toTable(Schema schema) {
            DataStream<Object> dataStream = this.toDataStream();
            return this.getTableEnvironment().fromDataStream(dataStream, schema);
        }
    }
}

