// Gathers every reply produced by the grpc stream into a single array.
//
// Resolves with the replies in the order streamProcessor hands them over, and
// rejects if streamProcessor rejects.
export async function collectReplies(stream) {
  const collected = [];
  const appendReply = reply => {
    collected.push(reply);
  };
  await streamProcessor(stream, appendReply);
  return collected;
}

// Feeds each reply from the grpc stream to the processor callback, one at a
// time, in the order the replies are yielded by streamGenerator.
//
// The returned promise resolves once the stream has been fully consumed. The
// processor's return value is ignored (it is not awaited).
//
// Example:
//
// const sites = [];
// await streamProcessor(
//   GRPCWebInternalCallbackClient.querySites(new QuerySitesRequest()),
//   (response) => sites.push(...response.getSitesList())
// );
export async function streamProcessor(stream, processor) {
  const replies = streamGenerator(stream);
  for await (const item of replies) {
    processor(item);
  }
}

// Folds the replies of the grpc stream into a single value.
//
// The reducer is called once per reply, in sequence, as reducer(reply, value),
// where value is whatever the previous call returned (initialValue for the
// first call). The promise resolves with the value returned by the last call,
// or with initialValue if the stream produced no replies.
//
// Example:
//
// const sites = await streamReducer(
//   GRPCWebInternalCallbackClient.querySites(new QuerySitesRequest()),
//   (response, value) => value.concat(response.getSitesList()),
//   []
// );
export async function streamReducer(stream, reducer, initialValue) {
  let accumulated = initialValue;
  const step = reply => {
    accumulated = reducer(reply, accumulated);
    return accumulated;
  };
  await streamProcessor(stream, step);
  return accumulated;
}

// Returns an async generator that yields individual responses from the grpc
// stream, in the order their "data" events fire.
//
// The stream only needs an on(eventName, callback) method that reports
// "data", "error" and "end" events. The generator completes once "end" has
// fired and every buffered response has been yielded. If "error" fires, the
// responses buffered before it are yielded first and the error is then thrown
// to the consumer.
//
// As with any generator, the body (and therefore the listener registration)
// does not run until the first next() call; `for await` makes that call
// immediately.
//
// NOTE(review): the listeners are never detached, so a consumer that stops
// iterating early leaves them attached to the stream — confirm whether the
// stream should be cancelled / unsubscribed in that case.
export async function* streamGenerator(stream) {
  // Adapted from https://stackoverflow.com/a/59347615/13241549.
  //
  // The event handlers only record what happened (buffer the response,
  // remember the error, flag the end) and then wake the loop if it is parked.
  // The loop always re-checks that recorded state before parking again, so an
  // event that fires while the consumer is busy with a yielded value is never
  // missed or delayed until the next event.
  let responses = [];
  let done = false;
  let failed = false; // separate flag so a falsy error value still counts
  let failure;
  let wake = null; // resolver of the promise the loop is parked on, if any

  const notify = () => {
    if (wake !== null) {
      const resolveWaiter = wake;
      wake = null;
      resolveWaiter();
    }
  };

  stream.on("data", response => {
    responses.push(response);
    notify();
  });

  stream.on("error", err => {
    // Keep the first error; it is thrown from the loop below rather than by
    // rejecting a promise that nobody may be awaiting.
    if (!failed) {
      failed = true;
      failure = err;
    }
    notify();
  });

  stream.on("end", () => {
    done = true;
    notify();
  });

  while (true) {
    if (responses.length > 0) {
      // Swap the buffer out before yielding so that handlers pushing new
      // responses never modify the array we are iterating over.
      const batch = responses;
      responses = [];
      yield* batch;
      continue;
    }

    // Checked before `done` so an "end" that follows an "error" cannot hide
    // the failure.
    if (failed) {
      throw failure;
    }

    if (done) {
      return;
    }

    // Nothing buffered and the stream is still open: park until the next
    // event. No await happens between the checks above and the assignment of
    // `wake`, so no event can slip in unnoticed.
    await new Promise(res => {
      wake = res;
    });
  }
}
