import { Observable, RetryConfig, finalize, mergeMap, retry, tap } from 'rxjs';

/** Options accepted by `eventSource`. */
export interface EventSourceConfig {
  /** Query parameters, serialized with `URLSearchParams` and appended to the URL. */
  params?: Record<string, string>;
  /** Configuration handed to RxJS `retry`; `eventSource` falls back to `{ count: 3, delay: 1000 }`. */
  retries?: RetryConfig;
  /** Names of the events to listen for; `eventSource` falls back to `['event']`. */
  events?: string[];
}

/**
 * Opens an `EventSource` for `url` and emits it once the connection is open.
 *
 * The returned observable is cold: a new `EventSource` is created for every
 * subscription, so resubscribing (for example through `retry`) starts a fresh
 * connection instead of waiting on one that was already closed.
 *
 * @param url Address of the server-sent-events endpoint.
 * @returns An observable that emits the opened `EventSource` and completes,
 *   or errors with the closed `EventSource` when the connection fails first.
 */
export function createEventSource(url: string) {
  return new Observable<EventSource>((observer) => {
    const source = new EventSource(url);
    let opened = false;

    const onError = () => {
      source.close();
      // No-op once the observer has completed, i.e. after a successful open.
      observer.error(source);
    };
    const onOpen = () => {
      // Set before completing: `complete()` triggers the teardown below, which
      // must not close a connection that was just handed to the consumer.
      opened = true;
      observer.next(source);
      observer.complete();
    };

    source.addEventListener('error', onError);
    source.addEventListener('open', onOpen);

    return () => {
      source.removeEventListener('open', onOpen);
      if (!opened) {
        // Unsubscribed (or failed) before opening: nobody owns the source, so
        // release it here. After a successful open the error listener stays
        // attached so that a later failure still closes the source.
        source.removeEventListener('error', onError);
        source.close();
      }
    };
  });
}

/**
 * Streams the named events of an `EventSource` as `{ type, data }` objects,
 * where `data` is the event payload parsed as JSON.
 *
 * Unsubscribing removes the listeners and closes `source`.
 *
 * @param source The `EventSource` to listen on.
 * @param events Names of the events to subscribe to.
 * @returns An observable of the parsed events. It never completes by itself.
 */
export function listenEvents(source: EventSource, events: string[]) {
  return new Observable<{ type: string; data: unknown }>((observer) => {
    const nextFn = (e: Event) => {
      // Server-sent events are delivered as MessageEvents with a string payload.
      const message = e as MessageEvent<string>;
      // NOTE(review): JSON.parse throws on a malformed payload; that exception
      // escapes from this listener and is not forwarded to the observer.
      observer.next({
        type: message.type,
        data: JSON.parse(message.data)
      });
    };
    events.forEach((event) => source.addEventListener(event, nextFn));

    // Detach on teardown so the source no longer references this subscriber.
    return () => {
      events.forEach((event) => source.removeEventListener(event, nextFn));
    };
  }).pipe(finalize(() => source.close()));
}

/**
 * Connects to a server-sent-events endpoint and streams its events.
 *
 * @param url Endpoint address. It may already contain a query string.
 * @param config Optional settings: `params` are appended to `url` as query
 *   parameters, `retries` is passed to `retry` on the connection observable
 *   (default `{ count: 3, delay: 1000 }`), and `events` names the events to
 *   listen for (default `['event']`).
 * @returns An observable of whatever `listenEvents` emits for the opened
 *   connection.
 */
export function eventSource(
  url: string,
  config?: EventSourceConfig
): Observable<any> {
  // `config` is optional, so every read has to tolerate `undefined`.
  const parameters = new URLSearchParams(config?.params ?? {}).toString();
  // Extend an existing query string instead of starting a second one.
  const separator = url.includes('?') ? '&' : '?';
  const sourceUrl = parameters ? `${url}${separator}${parameters}` : url;
  const retries = config?.retries ?? { count: 3, delay: 1000 };
  const events = config?.events ?? ['event'];

  return createEventSource(sourceUrl).pipe(
    retry(retries),
    mergeMap((s) => listenEvents(s, events))
  );
}
