/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.cancelling;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.io.InputFormat;
import eu.stratosphere.api.common.io.OutputFormat;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.GenericDataSource;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.test.cancelling.CancellingTestBase;
import eu.stratosphere.test.recordJobs.util.DiscardingOutputFormat;
import eu.stratosphere.test.recordJobs.util.InfiniteIntegerInputFormat;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.Collector;

public class MapCancelingITCase
extends CancellingTestBase {
    /**
     * Builds a three-stage plan (infinite integer source, {@link IdentityMapper},
     * discarding sink) with a default parallelism of 4 and hands it to
     * {@code runAndCancelJob}.
     *
     * <p>NOTE(review): the two numeric arguments are presumably the delay before
     * the cancel request and the time allowed for the cancellation to complete,
     * both in milliseconds -- confirm against {@code CancellingTestBase}.
     *
     * @throws Exception whatever plan construction or {@code runAndCancelJob} propagates
     */
    public void testMapCancelling() throws Exception {
        final int parallelism = 4;

        GenericDataSource infiniteSource =
                new GenericDataSource((InputFormat) new InfiniteIntegerInputFormat(), "Source");
        MapOperator identityMap = MapOperator.builder(IdentityMapper.class)
                .input(new Operator[]{infiniteSource})
                .name("Identity Mapper")
                .build();
        GenericDataSink discardingSink =
                new GenericDataSink((OutputFormat) new DiscardingOutputFormat(), (Operator) identityMap, "Sink");

        Plan plan = new Plan(discardingSink);
        plan.setDefaultParallelism(parallelism);

        runAndCancelJob(plan, 5000, 10000);
    }

    /**
     * Builds a three-stage plan (infinite integer source,
     * {@link DelayingIdentityMapper}, discarding sink) with a default parallelism
     * of 4 and hands it to {@code runAndCancelJob}.
     *
     * <p>NOTE(review): the two numeric arguments are presumably the delay before
     * the cancel request and the time allowed for the cancellation to complete,
     * both in milliseconds -- confirm against {@code CancellingTestBase}.
     *
     * @throws Exception whatever plan construction or {@code runAndCancelJob} propagates
     */
    public void testSlowMapCancelling() throws Exception {
        final int parallelism = 4;

        GenericDataSource infiniteSource =
                new GenericDataSource((InputFormat) new InfiniteIntegerInputFormat(), "Source");
        MapOperator delayingMap = MapOperator.builder(DelayingIdentityMapper.class)
                .input(new Operator[]{infiniteSource})
                .name("Delay Mapper")
                .build();
        GenericDataSink discardingSink =
                new GenericDataSink((OutputFormat) new DiscardingOutputFormat(), (Operator) delayingMap, "Sink");

        Plan plan = new Plan(discardingSink);
        plan.setDefaultParallelism(parallelism);

        runAndCancelJob(plan, 5000, 10000);
    }

    /**
     * Builds a three-stage plan (infinite integer source,
     * {@link LongCancelTimeIdentityMapper}, discarding sink) with a default
     * parallelism of 4 and hands it to {@code runAndCancelJob}.
     *
     * <p>NOTE(review): the two numeric arguments are presumably the delay before
     * the cancel request and the time allowed for the cancellation to complete,
     * both in milliseconds -- confirm against {@code CancellingTestBase}.
     *
     * @throws Exception whatever plan construction or {@code runAndCancelJob} propagates
     */
    public void testMapWithLongCancellingResponse() throws Exception {
        final int parallelism = 4;

        GenericDataSource infiniteSource =
                new GenericDataSource((InputFormat) new InfiniteIntegerInputFormat(), "Source");
        MapOperator slowToCancelMap = MapOperator.builder(LongCancelTimeIdentityMapper.class)
                .input(new Operator[]{infiniteSource})
                .name("Long Cancelling Time Mapper")
                .build();
        GenericDataSink discardingSink =
                new GenericDataSink((OutputFormat) new DiscardingOutputFormat(), (Operator) slowToCancelMap, "Sink");

        Plan plan = new Plan(discardingSink);
        plan.setDefaultParallelism(parallelism);

        runAndCancelJob(plan, 10000, 10000);
    }

    /**
     * Builds a three-stage plan (infinite integer source,
     * {@link StuckInOpenIdentityMapper}, discarding sink) with a default
     * parallelism of 4 and hands it to {@code runAndCancelJob}.
     *
     * <p>NOTE(review): the two numeric arguments are presumably the delay before
     * the cancel request and the time allowed for the cancellation to complete,
     * both in milliseconds -- confirm against {@code CancellingTestBase}.
     *
     * @throws Exception whatever plan construction or {@code runAndCancelJob} propagates
     */
    public void testMapPriorToFirstRecordReading() throws Exception {
        final int parallelism = 4;

        GenericDataSource infiniteSource =
                new GenericDataSource((InputFormat) new InfiniteIntegerInputFormat(), "Source");
        MapOperator stuckInOpenMap = MapOperator.builder(StuckInOpenIdentityMapper.class)
                .input(new Operator[]{infiniteSource})
                .name("Stuck-In-Open Mapper")
                .build();
        GenericDataSink discardingSink =
                new GenericDataSink((OutputFormat) new DiscardingOutputFormat(), (Operator) stuckInOpenMap, "Sink");

        Plan plan = new Plan(discardingSink);
        plan.setDefaultParallelism(parallelism);

        runAndCancelJob(plan, 10000, 10000);
    }

    /**
     * Identity mapper whose {@code open} method blocks on this object's monitor
     * and only leaves by throwing (for example when the waiting thread is
     * interrupted).
     */
    public static final class StuckInOpenIdentityMapper extends MapFunction {
        private static final long serialVersionUID = 1L;

        /**
         * Blocks the calling thread indefinitely by waiting on this instance.
         *
         * <p>Nothing in this class ever calls {@code notify} on the instance, so the
         * only intended way out is the {@link InterruptedException} thrown by
         * {@link Object#wait()} when the thread is interrupted.
         *
         * @param parameters not used by this implementation
         * @throws Exception the {@link InterruptedException} raised when the waiting
         *         thread is interrupted
         */
        public void open(Configuration parameters) throws Exception {
            synchronized (this) {
                // Object.wait() is allowed to return spuriously. Waiting in a loop
                // guarantees that open() never returns normally and the mapper
                // really stays stuck until it is interrupted.
                while (true) {
                    wait();
                }
            }
        }

        /**
         * Forwards the incoming record to the collector unchanged.
         *
         * @param record the record to forward
         * @param out the collector that receives the record
         * @throws Exception if the collector fails
         */
        public void map(Record record, Collector<Record> out) throws Exception {
            out.collect(record);
        }
    }

    /**
     * Identity mapper that holds every record for {@link #WAIT_TIME_PER_RECORD}
     * milliseconds before forwarding it and keeps waiting even if its thread is
     * interrupted in the meantime.
     */
    public static final class LongCancelTimeIdentityMapper extends MapFunction {
        private static final long serialVersionUID = 1L;

        /** Minimum time, in milliseconds, spent on each record. */
        private static final int WAIT_TIME_PER_RECORD = 5000;

        /**
         * Sleeps until the full per-record wait time has elapsed, then forwards the
         * record unchanged.
         *
         * @param record the record to forward
         * @param out the collector that receives the record
         * @throws Exception if the collector fails
         */
        public void map(Record record, Collector<Record> out) throws Exception {
            final long start = System.currentTimeMillis();
            long remaining = WAIT_TIME_PER_RECORD;

            // The initial value is positive, so the sleep is attempted at least once.
            while (remaining > 0L) {
                try {
                    Thread.sleep(remaining);
                } catch (InterruptedException ignored) {
                    // Swallowed on purpose: an interrupt must not shorten the wait.
                    // The loop recomputes the remaining time and goes back to sleep,
                    // which is what makes this mapper slow to react to cancellation.
                }
                remaining = WAIT_TIME_PER_RECORD - System.currentTimeMillis() + start;
            }

            out.collect(record);
        }
    }

    /**
     * Identity mapper that sleeps for {@link #WAIT_TIME_PER_RECORD} milliseconds
     * before forwarding each record.
     */
    public static final class DelayingIdentityMapper extends MapFunction {
        private static final long serialVersionUID = 1L;

        /** Time, in milliseconds, slept before each record is forwarded. */
        private static final int WAIT_TIME_PER_RECORD = 10000;

        /**
         * Sleeps for the configured delay and then forwards the record unchanged.
         *
         * @param record the record to forward
         * @param out the collector that receives the record
         * @throws Exception the {@link InterruptedException} from the sleep, or any
         *         failure of the collector
         */
        public void map(Record record, Collector<Record> out) throws Exception {
            // An interrupt during the sleep propagates to the caller.
            Thread.sleep(WAIT_TIME_PER_RECORD);
            out.collect(record);
        }
    }

    /**
     * Mapper that emits every input record unchanged.
     */
    public static final class IdentityMapper extends MapFunction {
        private static final long serialVersionUID = 1L;

        /**
         * Forwards the incoming record to the collector without touching it.
         *
         * @param record the record to forward
         * @param out the collector that receives the record
         * @throws Exception if the collector fails
         */
        public void map(Record record, Collector<Record> out) throws Exception {
            out.collect(record);
        }
    }
}

