RxJS operator waitUntil












1















a: 1---2-3-4--5---6
b: ------T---------

o: ------1234-5---6


Using RxJS, is there some operator that can accomplish the diagram above? I have stream A which is a random stream of events, given a stream B which has a single true event, can I have an output stream that doesn't emit anything until that true event, and then sends everything is had saved up until then and afterwards emits normally?



I thought maybe I could use buffer(), but it seems like there is no way to do a one time buffer like this with that operator.










share|improve this question























  • You can use delay operator for that

    – Oles Savluk
    Jan 19 at 9:10
















1















a: 1---2-3-4--5---6
b: ------T---------

o: ------1234-5---6


Using RxJS, is there some operator that can accomplish the diagram above? I have stream A which is a random stream of events, given a stream B which has a single true event, can I have an output stream that doesn't emit anything until that true event, and then sends everything is had saved up until then and afterwards emits normally?



I thought maybe I could use buffer(), but it seems like there is no way to do a one time buffer like this with that operator.










share|improve this question























  • You can use delay operator for that

    – Oles Savluk
    Jan 19 at 9:10














1












1








1








a: 1---2-3-4--5---6
b: ------T---------

o: ------1234-5---6


Using RxJS, is there some operator that can accomplish the diagram above? I have stream A which is a random stream of events, given a stream B which has a single true event, can I have an output stream that doesn't emit anything until that true event, and then sends everything is had saved up until then and afterwards emits normally?



I thought maybe I could use buffer(), but it seems like there is no way to do a one time buffer like this with that operator.










share|improve this question














a: 1---2-3-4--5---6
b: ------T---------

o: ------1234-5---6


Using RxJS, is there some operator that can accomplish the diagram above? I have stream A which is a random stream of events, given a stream B which has a single true event, can I have an output stream that doesn't emit anything until that true event, and then sends everything is had saved up until then and afterwards emits normally?



I thought maybe I could use buffer(), but it seems like there is no way to do a one time buffer like this with that operator.







rxjs






share|improve this question













share|improve this question











share|improve this question




share|improve this question










asked Jan 18 at 18:52









delashumdelashum

706




706













  • You can use delay operator for that

    – Oles Savluk
    Jan 19 at 9:10



















  • You can use delay operator for that

    – Oles Savluk
    Jan 19 at 9:10

















You can use delay operator for that

– Oles Savluk
Jan 19 at 9:10





You can use delay operator for that

– Oles Savluk
Jan 19 at 9:10












2 Answers
2






active

oldest

votes


















1














I think @ZahiC's solution is correct but personally I'd do it in a single chain using the multicast operator.



a$.pipe(
multicast(new Subject(), s => concat(
s.pipe(
buffer(b$),
take(1),
),
s
)),
)


multicast will basically spit the stream into two where concat will first subscribe to the first one that is buffered until b$ emits. Then it completes immediately because of take(1) and concat subscribe to the same steam again but this time unbuffered.






share|improve this answer































    1

















    const { concat, interval, of, from } = rxjs;
    const { share, delay, toArray, takeUntil, mergeMap } = rxjs.operators;

    const waitUntil = signal$ => source$ => {
    const sharedSource$ = source$.pipe(share());
    return concat(
    sharedSource$.pipe(
    takeUntil(signal$),
    toArray(),
    mergeMap(from)
    ),
    sharedSource$
    );
    }

    const stopWaiting$ = of('signal').pipe(delay(2000));

    const source$ = interval(500).pipe(
    waitUntil(stopWaiting$)
    ).subscribe(console.log);

    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>








    share|improve this answer

























      Your Answer






      StackExchange.ifUsing("editor", function () {
      StackExchange.using("externalEditor", function () {
      StackExchange.using("snippets", function () {
      StackExchange.snippets.init();
      });
      });
      }, "code-snippets");

      StackExchange.ready(function() {
      var channelOptions = {
      tags: "".split(" "),
      id: "1"
      };
      initTagRenderer("".split(" "), "".split(" "), channelOptions);

      StackExchange.using("externalEditor", function() {
      // Have to fire editor after snippets, if snippets enabled
      if (StackExchange.settings.snippets.snippetsEnabled) {
      StackExchange.using("snippets", function() {
      createEditor();
      });
      }
      else {
      createEditor();
      }
      });

      function createEditor() {
      StackExchange.prepareEditor({
      heartbeatType: 'answer',
      autoActivateHeartbeat: false,
      convertImagesToLinks: true,
      noModals: true,
      showLowRepImageUploadWarning: true,
      reputationToPostImages: 10,
      bindNavPrevention: true,
      postfix: "",
      imageUploader: {
      brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
      contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
      allowUrls: true
      },
      onDemand: true,
      discardSelector: ".discard-answer"
      ,immediatelyShowMarkdownHelp:true
      });


      }
      });














      draft saved

      draft discarded


















      StackExchange.ready(
      function () {
      StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f54259886%2frxjs-operator-waituntil%23new-answer', 'question_page');
      }
      );

      Post as a guest















      Required, but never shown

























      2 Answers
      2






      active

      oldest

      votes








      2 Answers
      2






      active

      oldest

      votes









      active

      oldest

      votes






      active

      oldest

      votes









      1














      I think @ZahiC's solution is correct but personally I'd do it in a single chain using the multicast operator.



      a$.pipe(
      multicast(new Subject(), s => concat(
      s.pipe(
      buffer(b$),
      take(1),
      ),
      s
      )),
      )


      multicast will basically spit the stream into two where concat will first subscribe to the first one that is buffered until b$ emits. Then it completes immediately because of take(1) and concat subscribe to the same steam again but this time unbuffered.






      share|improve this answer




























        1














        I think @ZahiC's solution is correct but personally I'd do it in a single chain using the multicast operator.



        a$.pipe(
        multicast(new Subject(), s => concat(
        s.pipe(
        buffer(b$),
        take(1),
        ),
        s
        )),
        )


        multicast will basically spit the stream into two where concat will first subscribe to the first one that is buffered until b$ emits. Then it completes immediately because of take(1) and concat subscribe to the same steam again but this time unbuffered.






        share|improve this answer


























          1












          1








          1







          I think @ZahiC's solution is correct but personally I'd do it in a single chain using the multicast operator.



          a$.pipe(
          multicast(new Subject(), s => concat(
          s.pipe(
          buffer(b$),
          take(1),
          ),
          s
          )),
          )


          multicast will basically spit the stream into two where concat will first subscribe to the first one that is buffered until b$ emits. Then it completes immediately because of take(1) and concat subscribe to the same steam again but this time unbuffered.






          share|improve this answer













          I think @ZahiC's solution is correct but personally I'd do it in a single chain using the multicast operator.



          a$.pipe(
          multicast(new Subject(), s => concat(
          s.pipe(
          buffer(b$),
          take(1),
          ),
          s
          )),
          )


          multicast will basically spit the stream into two where concat will first subscribe to the first one that is buffered until b$ emits. Then it completes immediately because of take(1) and concat subscribe to the same steam again but this time unbuffered.







          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered Jan 20 at 12:14









          martinmartin

          43k1187130




          43k1187130

























              1

















              const { concat, interval, of, from } = rxjs;
              const { share, delay, toArray, takeUntil, mergeMap } = rxjs.operators;

              const waitUntil = signal$ => source$ => {
              const sharedSource$ = source$.pipe(share());
              return concat(
              sharedSource$.pipe(
              takeUntil(signal$),
              toArray(),
              mergeMap(from)
              ),
              sharedSource$
              );
              }

              const stopWaiting$ = of('signal').pipe(delay(2000));

              const source$ = interval(500).pipe(
              waitUntil(stopWaiting$)
              ).subscribe(console.log);

              <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>








              share|improve this answer






























                1

















                const { concat, interval, of, from } = rxjs;
                const { share, delay, toArray, takeUntil, mergeMap } = rxjs.operators;

                const waitUntil = signal$ => source$ => {
                const sharedSource$ = source$.pipe(share());
                return concat(
                sharedSource$.pipe(
                takeUntil(signal$),
                toArray(),
                mergeMap(from)
                ),
                sharedSource$
                );
                }

                const stopWaiting$ = of('signal').pipe(delay(2000));

                const source$ = interval(500).pipe(
                waitUntil(stopWaiting$)
                ).subscribe(console.log);

                <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>








                share|improve this answer




























                  1












                  1








                  1










                  const { concat, interval, of, from } = rxjs;
                  const { share, delay, toArray, takeUntil, mergeMap } = rxjs.operators;

                  const waitUntil = signal$ => source$ => {
                  const sharedSource$ = source$.pipe(share());
                  return concat(
                  sharedSource$.pipe(
                  takeUntil(signal$),
                  toArray(),
                  mergeMap(from)
                  ),
                  sharedSource$
                  );
                  }

                  const stopWaiting$ = of('signal').pipe(delay(2000));

                  const source$ = interval(500).pipe(
                  waitUntil(stopWaiting$)
                  ).subscribe(console.log);

                  <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>








                  share|improve this answer


















                  const { concat, interval, of, from } = rxjs;
                  const { share, delay, toArray, takeUntil, mergeMap } = rxjs.operators;

                  const waitUntil = signal$ => source$ => {
                  const sharedSource$ = source$.pipe(share());
                  return concat(
                  sharedSource$.pipe(
                  takeUntil(signal$),
                  toArray(),
                  mergeMap(from)
                  ),
                  sharedSource$
                  );
                  }

                  const stopWaiting$ = of('signal').pipe(delay(2000));

                  const source$ = interval(500).pipe(
                  waitUntil(stopWaiting$)
                  ).subscribe(console.log);

                  <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>








                  const { concat, interval, of, from } = rxjs;
                  const { share, delay, toArray, takeUntil, mergeMap } = rxjs.operators;

                  const waitUntil = signal$ => source$ => {
                  const sharedSource$ = source$.pipe(share());
                  return concat(
                  sharedSource$.pipe(
                  takeUntil(signal$),
                  toArray(),
                  mergeMap(from)
                  ),
                  sharedSource$
                  );
                  }

                  const stopWaiting$ = of('signal').pipe(delay(2000));

                  const source$ = interval(500).pipe(
                  waitUntil(stopWaiting$)
                  ).subscribe(console.log);

                  <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>





                  const { concat, interval, of, from } = rxjs;
                  const { share, delay, toArray, takeUntil, mergeMap } = rxjs.operators;

                  const waitUntil = signal$ => source$ => {
                  const sharedSource$ = source$.pipe(share());
                  return concat(
                  sharedSource$.pipe(
                  takeUntil(signal$),
                  toArray(),
                  mergeMap(from)
                  ),
                  sharedSource$
                  );
                  }

                  const stopWaiting$ = of('signal').pipe(delay(2000));

                  const source$ = interval(500).pipe(
                  waitUntil(stopWaiting$)
                  ).subscribe(console.log);

                  <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>






                  share|improve this answer














                  share|improve this answer



                  share|improve this answer








                  edited Jan 19 at 12:33

























                  answered Jan 19 at 10:44









                  ZahiCZahiC

                  3,92611221




                  3,92611221






























                      draft saved

                      draft discarded




















































                      Thanks for contributing an answer to Stack Overflow!


                      • Please be sure to answer the question. Provide details and share your research!

                      But avoid



                      • Asking for help, clarification, or responding to other answers.

                      • Making statements based on opinion; back them up with references or personal experience.


                      To learn more, see our tips on writing great answers.




                      draft saved


                      draft discarded














                      StackExchange.ready(
                      function () {
                      StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f54259886%2frxjs-operator-waituntil%23new-answer', 'question_page');
                      }
                      );

                      Post as a guest















                      Required, but never shown





















































                      Required, but never shown














                      Required, but never shown












                      Required, but never shown







                      Required, but never shown

































                      Required, but never shown














                      Required, but never shown












                      Required, but never shown







                      Required, but never shown







                      Popular posts from this blog

                      Callistus III

                      Plistias Cous

                      Index Sanctorum