import * as M from '@most/core';
import { currentTime, delay } from '@most/scheduler';
import { $, def } from 'utils/S';
import { tryEvent, tryEnd } from './tryEvent';

export default function getStreamSource(initialValue) {
  const initialEvents = [];
  let source = e => initialEvents.push(e);
  let error = () => {};
  let dispose = () => {};

  const sourceStream = {
    run: (sink, scheduler) => {
      initialEvents.forEach((e, i) => delay(i, M.propagateEventTask(e, sink), scheduler));
      source = def("source")({})([$.Any, $.Undefined])(e => tryEvent(currentTime(scheduler), e, sink));
      error = def("error")({})([$.Any, $.Undefined])(e => sink.error(currentTime(scheduler), e));
      dispose = def("dispose")({})([$.Undefined])(() => tryEnd(currentTime(scheduler), sink));
      return {
        dispose: () => {
          source = () => {};
          dispose = () => {};
        }
      };
    }
  };

  return {
    stream: typeof initialValue !== "undefined" ? M.startWith(initialValue, sourceStream) : sourceStream,
    push: e => source(e),
    error: e => error(e),
    end: () => dispose(),
  };
}

export const fromObservable = (observable, log = false) => {
  const { stream, push, error, end } = getStreamSource();
  observable.subscribe({
    next: (...x) => (log && console.log(...x)) || push(...x),
    error: e => console.error('[getStreamSource.fromObservable]', e) || error(e),
    complete: end,
  });
  return stream;
};

export function toStream(listen, initialValue) {
  const sourceStream = {
    run: (sink, scheduler) => {
      const dispose = listen(e => tryEvent(currentTime(scheduler), e, sink));
      return { dispose };
    }
  };
  return typeof initialValue !== 'undefined'
    ? M.startWith(initialValue, sourceStream)
    : sourceStream;
}
