import type { GraphRun } from '@respell/database';
import { PostHogEvents } from '@respell/utils/tracking/types';
import { useEventSource } from '@vueuse/core';

export function useStreaming() {
  const spellStore = useSpellsStore();
  const { run, isRunning, groupRuns } = storeToRefs(spellStore);

  const agentStore = useAgentsStore();
  const { campaigns } = storeToRefs(agentStore);

  const { $clientPosthog } = useNuxtApp();

  let closeStream: (() => void) | null = null;

  const streamSpell = async ({
    spellId,
    runId,
    groupId,
    type,
  }: {
    spellId: string;
    runId?: string;
    groupId?: string;
    type?: 'test' | 'agent' | 'live';
  }) => {
    isRunning.value = true;

    if (!runId && !groupId) {
      throw new Error('Missing runId or groupId parameter');
    }

    const { data, error, close } = useEventSource(
      `/api/spells/${spellId}/stream?${runId ? `runId=${runId}` : `groupId=${groupId}`}`,
    );

    closeStream = close;

    const unwatch = watch(data, (newData) => {
      if (newData === 'end') {
        $clientPosthog?.capture(PostHogEvents.SpellRunSucceeded, {
          spellId,
          runId,
          groupId,
        });
        stopStreaming();
      } else if (newData) {
        try {
          const parsedData = JSON.parse(newData);
          if (parsedData && parsedData.graph && parsedData.steps) {
            if (groupId) {
              groupRuns.value = groupRuns.value?.map((run: GraphRun) =>
                run.id === parsedData.graph.id
                  ? { ...parsedData.graph, steps: parsedData.steps }
                  : run,
              );
            } else {
              run.value = {
                ...parsedData.graph,
                steps: Object.values(parsedData.steps),
              };
              if (type === 'agent') {
                campaigns.value = campaigns.value?.map((campaign: GraphRun) =>
                  campaign.id === run.value?.id ? run.value : campaign,
                );
              }
              if (
                run.value?.state === 'canceled' ||
                run.value?.state === 'paused'
              ) {
                stopStreaming();
              }
            }
          }
        } catch (parseError) {
          console.warn('Received incomplete or invalid JSON data:', parseError);
        }
      }
    });

    watch(error, (newError) => {
      if (newError) {
        console.error('EventSource error:', newError);
        stopStreaming();
      }
    });

    await until(isRunning).toBe(false);
    unwatch();
  };

  const stopStreaming = () => {
    isRunning.value = false;
    if (closeStream) {
      closeStream();
      closeStream = null;
    }
  };

  return {
    streamSpell,
    stopStreaming,
  };
}
