import { Observable } from 'rxjs';
import { z } from 'zod';

/**
 * Couples a response received over a websocket with the request it answers.
 *
 * @typeParam R - Type of the response payload.
 * @typeParam T - Type of the request that was sent.
 */
export interface WebsocketResponse<R, T> {
  /** The payload that came back for the request. */
  response: R;
  /** The request this response belongs to. */
  request: T;
}

/**
 * Streams every websocket message that is valid JSON and matches `schema`.
 *
 * Listeners are attached when the returned Observable is subscribed to and
 * are removed again when the subscription ends — either because the socket
 * emitted `close` (the stream then completes) or because the subscriber
 * unsubscribed.
 *
 * @param ws - Socket to listen on.
 * @param schema - Zod schema used to validate each decoded message.
 * @returns An Observable emitting the parsed data of every matching message.
 */
export function listenForWebsocketMessages<Schema extends z.ZodTypeAny>(
  ws: WebSocket,
  schema: Schema
): Observable<z.infer<Schema>> {
  return new Observable<z.infer<Schema>>(observer => {
    const messageHandler = (event: MessageEvent) => {
      let payload: unknown;
      try {
        payload = JSON.parse(event.data);
      } catch {
        // Not JSON (or not text at all): ignore it just like a schema
        // mismatch instead of throwing out of the event handler.
        return;
      }

      const parsed = schema.safeParse(payload);
      if (!parsed.success) {
        // ignore messages that do not match the schema
        return;
      }

      observer.next(parsed.data);
    };

    // Completing triggers the teardown below, which detaches both listeners.
    const closeHandler = () => observer.complete();

    ws.addEventListener('message', messageHandler);
    ws.addEventListener('close', closeHandler);

    // Teardown: also runs on an early unsubscribe, so listeners never outlive
    // the subscription.
    return () => {
      ws.removeEventListener('message', messageHandler);
      ws.removeEventListener('close', closeHandler);
    };
  });
}

/**
 * Sends `request` over the websocket and waits for the response carrying the
 * same `messageId`.
 *
 * The request is sent as soon as this function is called (or once the socket
 * opens, if it is still connecting) — not when the returned Observable is
 * subscribed to. The response listener, however, is only attached on
 * subscription, so subscribe right away to avoid missing the reply.
 *
 * The Observable emits at most one value and then completes. It completes
 * without emitting if the socket closes before a matching response arrives.
 * Listeners are removed on completion and on unsubscribe.
 *
 * @param ws - Socket to send on and listen to.
 * @param request - Message to send; its `messageId` is used to match the reply.
 * @param schema - Zod schema used to validate each decoded incoming message.
 * @returns An Observable emitting the matched response together with the request.
 */
export function sendWebsocketMessageAndAwaitResponse<
  Schema extends z.ZodTypeAny,
  T extends { messageId: string }
>(
  ws: WebSocket,
  request: T,
  schema: Schema
): Observable<WebsocketResponse<z.infer<Schema>, T>> {
  const sendRequest = () => ws.send(JSON.stringify(request));
  if (ws.readyState === ws.CONNECTING) {
    // Keep a reference to the exact listener that is registered so that it
    // can actually be removed after it fired.
    const openHandler = () => {
      ws.removeEventListener('open', openHandler);
      sendRequest();
    };
    ws.addEventListener('open', openHandler);
  } else {
    sendRequest();
  }

  return new Observable<WebsocketResponse<z.infer<Schema>, T>>(observer => {
    const messageHandler = (event: MessageEvent) => {
      let payload: unknown;
      try {
        payload = JSON.parse(event.data);
      } catch {
        // Not JSON (or not text at all): ignore it just like a schema
        // mismatch instead of throwing out of the event handler.
        return;
      }

      const parsed = schema.safeParse(payload);
      if (!parsed.success) {
        // ignore messages that do not match the schema
        return;
      }
      const response = parsed.data;

      // A schema may legitimately yield null/undefined; such a payload cannot
      // carry a messageId and must not be destructured.
      if (response == null) {
        return;
      }

      const { messageId } = response;
      // skip messages that do not answer our request
      if (!messageId || messageId !== request.messageId) {
        return;
      }

      observer.next({
        response,
        request,
      });
      // Signal completion to the subscriber; this also runs the teardown.
      observer.complete();
    };

    // No response can arrive on a closed socket, so end the stream instead of
    // leaving subscribers waiting forever.
    const closeHandler = () => observer.complete();

    ws.addEventListener('message', messageHandler);
    ws.addEventListener('close', closeHandler);

    // Teardown: runs on completion and on early unsubscribe.
    return () => {
      ws.removeEventListener('message', messageHandler);
      ws.removeEventListener('close', closeHandler);
    };
  });
}
