/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.task.dataobjects;

import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.nodes.BaseSource;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.data.Path;
import edu.iu.dsc.tws.data.api.InputPartitioner;
import edu.iu.dsc.tws.data.api.formatters.LocalTextInputPartitioner;
import edu.iu.dsc.tws.data.fs.io.InputSplit;
import edu.iu.dsc.tws.dataset.DataSource;
import edu.iu.dsc.tws.executor.core.ExecutionRuntime;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Source task that reads the input splits assigned to this task index and
 * writes every non-null record to a single outgoing edge.
 *
 * <p>The data directory must be set through {@link #setDataDirectory(String)}
 * before {@link #prepare(Config, TaskContext)} is called, because
 * {@code prepare} builds the input partitioner from that directory.
 *
 * @param <T> record type parameter; not referenced by the code in this class
 */
public class DataFileSplittedReadSource<T>
extends BaseSource {
    private static final Logger LOG = Logger.getLogger(DataFileSplittedReadSource.class.getName());
    private static final long serialVersionUID = -1L;

    /** Input handle created in {@link #prepare(Config, TaskContext)}; splits are pulled from it. */
    private DataSource<?, ?> source;

    /** Name of the edge that records are written to and that is ended after reading. */
    private String edgeName;

    /** Directory handed to the input partitioner in {@link #prepare(Config, TaskContext)}. */
    private String dataDirectory;

    /**
     * Returns the directory the input is read from.
     *
     * @return the configured data directory, or {@code null} if none was set
     */
    public String getDataDirectory() {
        return this.dataDirectory;
    }

    /**
     * Sets the directory the input is read from.
     *
     * @param dataDirectory directory path used to build the input partitioner
     */
    public void setDataDirectory(String dataDirectory) {
        this.dataDirectory = dataDirectory;
    }

    /**
     * Creates a source that writes to the given edge.
     *
     * @param edgename name of the outgoing edge
     */
    public DataFileSplittedReadSource(String edgename) {
        this.edgeName = edgename;
    }

    /**
     * Returns the name of the outgoing edge.
     *
     * @return the edge name
     */
    public String getEdgeName() {
        return this.edgeName;
    }

    /**
     * Sets the name of the outgoing edge.
     *
     * @param edgeName name of the outgoing edge
     */
    public void setEdgeName(String edgeName) {
        this.edgeName = edgeName;
    }

    /**
     * Reads every split handed out for this task index, writes each non-null
     * record to the edge, and finally ends the edge.
     *
     * <p>If reading fails with an {@link IOException}, the failure is logged and
     * no further splits are read; the edge is still ended. Previously the failed
     * split was retried without ever advancing, so a persistently unreadable
     * split made this method loop forever.
     */
    public void execute() {
        int taskIndex = this.context.taskIndex();
        String edge = this.getEdgeName();
        LOG.fine("Context Task Index:" + taskIndex + "\t" + edge);

        InputSplit inputSplit = this.source.getNextSplit(taskIndex);
        while (inputSplit != null) {
            // Stays null when reading fails, which terminates the outer loop
            // instead of re-entering it with the same broken split.
            InputSplit nextSplit = null;
            try {
                while (!inputSplit.reachedEnd()) {
                    Object value = inputSplit.nextRecord(null);
                    // Null records are skipped rather than written.
                    if (value != null) {
                        this.context.write(edge, value);
                    }
                }
                nextSplit = this.source.getNextSplit(taskIndex);
            }
            catch (IOException e) {
                LOG.log(Level.SEVERE, "Failed to read the input", e);
            }
            inputSplit = nextSplit;
        }
        this.context.end(edge);
    }

    /**
     * Prepares the task and creates the input handle for the configured data
     * directory.
     *
     * @param cfg configuration; the execution runtime is looked up in it under
     *            the key {@code "_twister2.runtime_"}
     * @param context task context; supplies the parallelism passed to the
     *                input partitioner
     */
    public void prepare(Config cfg, TaskContext context) {
        super.prepare(cfg, context);
        ExecutionRuntime runtime = (ExecutionRuntime)cfg.get("_twister2.runtime_");
        this.source = runtime.createInput(cfg, context, (InputPartitioner)new LocalTextInputPartitioner(new Path(this.getDataDirectory()), context.getParallelism(), this.config));
    }
}

